并发请求update如何保证数据的一致性? - CNode技术社区

并发请求update如何保证数据的一致性?
发布于 9 年前 作者 Fov6363 9551 次浏览 来自 问答

一.情景介绍: 1.有一个接口提供update数据的功能,但这个接口在update时,需要做一些额外的操作,即需要通过先find后update的方式来修改数据,这样在并发的情况下如何保证数据的一致性? 2.使用mongoose 二.相关伪代码如下:

const user = await user.find({"id":id}).exec();
//要对user的属性做一些修改
user.xxx = xxxxxx;
await user.save();

当这个接口被并发调用时,就会出现值覆盖的问题,比如第一个请求是把user.age + 1,第二个请求同第一个请求,但实际效果user.age可能只加了1,因为第二个请求取到的user.age时,第一个请求还没有执行完save操作,导致第二个请求读到了脏数据,所以后面的值会覆盖前面的值. 三.局限性 1.中间的这部分修改操作是必须的,且无法转换成mongo原生支持的操作符(指$inc等),通常见于利用原属性值计算新值,比如user.age = (new Date().getTime() - user.birthday).必须在内存中计算属性值. 2.这个接口是并发调用,不能很好的实现队列的机制. 3.mongodb不支持行级锁,且没有暴露出来,所以通过mongodb的锁机制去搞这个感觉也没什么思路 四.方法一 1.把最后那个user.save()改成 user.findOneAndUpdate({"id":id,"updated_at":user.updated_at},{update属性}) ,返回找不到对应的document时,可以重新走一遍这个流程,如果返回数据了,证明修改成功,这个依赖的是每次update操作,都会更新其updated_at字段,变相的用程序实现乐观锁,这样做会不会陷入死循环状态?这样会不会有其它问题? 2.希望各位大神有其它的思路和方法指教一下.

10 回复

mongodb其实不是很适合做这类操作,因为它只支持文档级别的原子操作,并且没有事务,由于node单线程,天生异步,因此node里面没有锁的概念,而且node也不需要锁。所以在这种情况下只能考虑从第三方来实现,比如可以借助redis实现一个分布式锁。

@nullcc 基于redis实现一个分布式锁,刚才刷其它网站时看到了,对redis的分布式锁没有经验,我要去看下如何实现...因为目前的这个项目是以cluster模式启动了(以后可能是发展为集群),所以借助redis来实现分布式锁我觉得是很不错的方向.感觉回答.

@Fov6363 具体来说,你有一个资源A,你想通过一个接口来更新这个资源A,并且在这个接口中除了更新A还有一些其他操作。比如我们需要先从DB中获取A,做一些相关操作,计算出A的一个值,然后真正更新A。这个过程中有可能产生竞争,导致数据一致性有问题。 如果用redis实现一个分布式锁,考虑最简单的情况,一般来说mongodb的文档都有一个_id,就拿这个_id做key,value为1,表示同一时间只能有一个操作方操作资源A。redis的incr和decr操作都是原子的,所以在你要操作这个资源A的时候,要像在多线程编程中先acquire lock,操作完毕要release lock。这是最基本的情况,如果要做到稳定高效,你还要考虑获取锁超时、执行失败时release lock的问题。大致是这个思路。

@nullcc 非常感谢,这个过程我明白,关键是您说的获取锁超时,执行失败释放锁的问题不太好处理,我在看 redlock这个包,看是否能满足我现在的需求.

@Fov6363 这个库应该是可以用的。文档的第一句已经表明了它的用处: This is a node.js implementation of the redlock algorithm for distributed redis locks. It provides strong guarantees in both single-redis and multi-redis environments, and provides fault tolerance through use of multiple independent redis instances or clusters.

具体到例子里面也看到了一些ttl的设置,error handling之类的,仔细研究一下用法就行。

const debug = require('debug');
const Promise = require('bluebird');
const http = require('http');
var client1 = require('redis').createClient(6379, '127.0.0.1');
var Redlock = require('redlock');
const Db = require('mongodb').Db;
const MongoClient = require('mongodb').MongoClient;
const Server = require('mongodb').Server;
var collection;
var redlock = new Redlock(
 // you should have one client for each redis node
 // in your cluster
 [client1],
 {
 // the expected clock drift; for more details
 // see http://redis.io/topics/distlock
 driftFactor: 0.01, // time in ms
 // the max number of times Redlock will attempt
 // to lock a resource before erroring
 retryCount: 10,
 // the time in ms between attempts
 retryDelay: 200, // time in ms
 // the max time in ms randomly added to retries
 // to improve performance under high contention
 // see https://www.awsarchitectureblog.com/2015/03/backoff.html
 retryJitter: 200 // time in ms
 }
);
const hostname = '127.0.0.1';
const port = 3000;
const server = http.createServer((req, res) => {
 res.statusCode = 200;
 res.setHeader('Content-Type', 'text/plain');
 // res.end('Hello World\n');
 update(res);
});
var resource = "1219924275036161";
var ttl = 10000;
MongoClient.connect('mongodb://localhost:27017/baas',async function(err, db) {
 // Create a collection
 collection = db.collection('users');
 // Insert the docs
 server.listen(port, hostname, () => {
 console.log(`Server running at http://${hostname}:${port}/`);
 });
});
function update(res) {
 console.log('-----');
 redlock.lock(resource,ttl).then(lock => {
 collection.find({'id':1219924275036161}).toArray(function (err,result) {
 let age = result[0].user_data.age;
 age += 1;
 // age = 0;
 collection.updateOne({'id':1219924275036161},{'$set':{'user_data.age':age}},function (err,result) {
 lock.unlock();
 res.end('nihao');
 })
 });
 })
}

赋上一个小demo,其中运行了两次,第一次不使用redlock包,第二次使用redloack包.使用wrk工具测试如下: 第一次

➜ wrk wrk -t4 -c200 -d30s --latency http://127.0.0.1:3000
Running 30s test @ http://127.0.0.1:3000
 4 threads and 200 connections
 Thread Stats Avg Stdev Max +/- Stdev
 Latency 82.95ms 35.47ms 408.34ms 83.17%
 Req/Sec 620.24 185.87 1.15k 67.79%
 Latency Distribution
 50% 74.00ms
 75% 95.12ms
 90% 121.55ms
 99% 236.99ms
 73916 requests in 30.10s, 9.16MB read
 Socket errors: connect 0, read 67, write 0, timeout 0
Requests/sec: 2455.65
Transfer/sec: 311.75KB

最后的age为976,起始为0. 第二次:

➜ wrk wrk -t4 -c200 -d30s --latency http://127.0.0.1:3000
Running 30s test @ http://127.0.0.1:3000
 4 threads and 200 connections
 Thread Stats Avg Stdev Max +/- Stdev
 Latency 155.27ms 341.91ms 2.00s 86.25%
 Req/Sec 165.63 144.66 0.91k 76.46%
 Latency Distribution
 50% 1.38ms
 75% 3.83ms
 90% 658.86ms
 99% 1.50s
 18377 requests in 30.09s, 2.28MB read
 Socket errors: connect 0, read 87, write 0, timeout 64
Requests/sec: 610.74
Transfer/sec: 77.53KB

QPS显示下降的很厉害,age此时为19380,起始值为976,其中在运行时,报一些未捕捉reject的错误,应该是这些错误导致了实际结果与预测结果有误.

第二个例子里的

Socket errors: connect 0, read 87, write 0, timeout 64

出现了64个连接超时的,这部分影响也很大。 还有就是一些参数的调优,因为这个锁是独占锁,可以计算出在一段时间内,一共能够成功进行几次操作。

./wrk -t4 -c200 -d30s --latency http://127.0.0.1:3000
Running 30s test @ http://127.0.0.1:3000
 4 threads and 200 connections
 Thread Stats Avg Stdev Max +/- Stdev
 Latency 52.63ms 209.61ms 2.00s 93.22%
 Req/Sec 437.32 414.88 1.65k 73.17%
 Latency Distribution
 50% 766.00us
 75% 0.88ms
 90% 1.51ms
 99% 1.17s
 30797 requests in 30.09s, 3.82MB read
 Socket errors: connect 0, read 90, write 0, timeout 32
Requests/sec: 1023.42
Transfer/sec: 129.93KB

@Fov6363 我机器上的结果。

@nullcc 收到,很强

回到顶部

AltStyle によって変換されたページ (->オリジナル) /