读书人

在MongoDB里兑现循环序列功能

发布时间: 2013-06-26 14:29:32 作者: rapoo

在MongoDB里实现循环序列功能
db.counters.save({_id:"SerialNo1", val:0, maxval:99})

?

然后我们想system.js里添加一个Javascript函数

db.system.js.save({_id:"getNextUniqueSeq",value:function (keyName) {    var seqObj = db.counters.findOne({_id:keyName});    if (seqObj == null) {print("can not find record with key: " + keyName);        return -1;    }    // the max value of sequence    var maxVal = seqObj.maxval;    // the current value of sequence    var curVal = seqObj.val;while(true){// if curVal reach max, reset itif(curVal >= maxVal){db.counters.update({_id : keyName, val : curVal}, { $set : { val : 0 }}, false, false);var err = db.getLastErrorObj();if( err && err.code ) {print( "unexpected error reset data: " + tojson( err ) );                return -2;        } else if (err.n == 0){// fail to reset value, may be reseted by othersprint("fail to reset value: ");} // get current value again.seqObj = db.counters.findOne({_id:keyName});maxVal = seqObj.maxval;curVal = seqObj.val;continue;} // if curVal not reach the max, inc it;// increase db.counters.update({_id : keyName, val : curVal}, { $inc : { val : 1 }}, false, false);var err = db.getLastErrorObj();if( err && err.code ) {print( "unexpected error inc val: " + tojson( err ) );               return -3;        } else if (err.n == 0){// fail to reset value, may be increased by othersprint("fail to inc value: ");// get current value again.seqObj = db.counters.findOne({_id:keyName});maxVal = seqObj.maxval;curVal = seqObj.val;continue;} else {var retVal = curVal + 1;print("success to get seq : " + retVal);// increase successfulreturn retVal;}}}});

上面这段会把指定的序列号的val值+1,如果val达到上限则从0开始。所以叫循环序列。

?

其实上面的实现在原理上和Java里的AtomicInteger系列的功能实现是类似的,利用循环重试和原子性的CAS来实现。这种实现方式在多线程的环境里由于锁(Monitor)的范围很小,所以并发性上比排他锁要好一些。

?

下面我们用Java来测试一下这个函数的正确性。 即在多线程的情况下会不会得到重复的序列号。

?

第一个测试,val=0, maxval=2000, Java端20个线程每个线程循环调用100次。 共2000次。 所以正确的情况下,从0到1999应该每个数字只出现一次。

?

    @Test    public void testGetNextUniqueSeq1() throws Exception {        final int THREAD_COUNT = 20;        final int LOOP_COUNT = 100;        Mongo mongoClient = new Mongo("172.17.2.100", 27017);        DB db = mongoClient.getDB("im");        db.authenticate("imadmin", "imadmin".toCharArray());        BasicDBObject q = new BasicDBObject();        q.put("_id", "UNIQUE_KEY");        BasicDBObject upd = new BasicDBObject();        BasicDBObject set = new BasicDBObject();        set.put("val", 0);        set.put("maxval", THREAD_COUNT * LOOP_COUNT);        upd.put("$set", set);        db.getCollection("counters").update(q, upd);        Thread[] threads = new Thread[THREAD_COUNT];        final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];        for (int i = 0; i < THREAD_COUNT; i++) {            final int temp_i = i;            threads[i] = new Thread("" + i) {                @Override                public void run() {                    try {                        Mongo mongoClient = new Mongo("172.17.2.100", 27017);                        DB db = mongoClient.getDB("im");                        db.authenticate("imadmin", "imadmin".toCharArray());                        for (int j = 0; j < LOOP_COUNT; j++) {                            Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");                            System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());                            results[temp_i][j] = ((Double) result).intValue();                        }                    } catch (UnknownHostException e) {                        e.printStackTrace();                    }                }            };        }        for (Thread thread : threads) {            thread.start();        }        for (Thread thread : threads) {            thread.join();        }        for (int num = 1; num <= LOOP_COUNT * THREAD_COUNT; num++) {            // every number appear 1 times only!            int times = 0;            for (int j = 0; j < THREAD_COUNT; j++) {                for (int k = 0; k < LOOP_COUNT; k++) {                    if (results[j][k] == num)                        times++;                }            }            assertEquals(1, times);        }    }

?

然后我们再测试一下循环的情况。 val=0, maxval=99。 同样是Java端20个线程每个线程循环调用100次。 共2000次。这次从0到99的数字每个应该取得20次。

?

    @Test    public void testGetNextUniqueSeq2() throws Exception {        final int THREAD_COUNT = 20;        final int LOOP_COUNT = 100;        Mongo mongoClient = new Mongo("172.17.2.100", 27017);        DB db = mongoClient.getDB("im");        db.authenticate("imadmin", "imadmin".toCharArray());        BasicDBObject q = new BasicDBObject();        q.put("_id", "UNIQUE_KEY");        BasicDBObject upd = new BasicDBObject();        BasicDBObject set = new BasicDBObject();        set.put("val", 0);        set.put("maxval", LOOP_COUNT);        upd.put("$set", set);        db.getCollection("counters").update(q, upd);        Thread[] threads = new Thread[THREAD_COUNT];        final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];        for (int i = 0; i < THREAD_COUNT; i++) {            final int temp_i = i;            threads[i] = new Thread("" + i) {                @Override                public void run() {                    try {                        Mongo mongoClient = new Mongo("172.17.2.100", 27017);                        DB db = mongoClient.getDB("im");                        db.authenticate("imadmin", "imadmin".toCharArray());                        for (int j = 0; j < LOOP_COUNT; j++) {                            Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");                            System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());                            results[temp_i][j] = ((Double) result).intValue();                        }                    } catch (UnknownHostException e) {                        e.printStackTrace();                    }                }            };        }        for (Thread thread : threads) {            thread.start();        }        for (Thread thread : threads) {            thread.join();        }        for (int num = 1; num <= LOOP_COUNT; num++) {            // every number appear 20 times only!            int times = 0;            for (int j = 0; j < THREAD_COUNT; j++) {                for (int k = 0; k < LOOP_COUNT; k++) {                    if (results[j][k] == num)                        times++;                }            }            assertEquals(20, times);        }    }

?

这个测试跑了几次都是正确的。

?

由于没有可以进行对比其他的实现方式(例如排他锁)所以没有做性能测试。

?

写在最后。 虽然MongoDB支持类似于存储过程的Stored Javascript,但是其实不建议使用这个来解决复杂问题。主要原因是没法调试,维护起来太不方便。而且在2.4之前MongoDB对服务端Javascript支持并不是很好, 一个mongod进程同时只能执行一段Javascript。如果能在应用层解决掉还是在应用层里实现逻辑比较好。

读书人网 >其他数据库

热点推荐