分布式锁
# 前言
本文介绍了分布式锁遇到的问题及对应的解决方案。
# 数据库更新问题
在数据库中创建一个商品表,包含id、name、count字段,这里使用的peewee来操作数据库。
from peewee import *
db = SqliteDatabase('people.db')
class Goods(Model):
id = IntegerField()
count = IntegerField()
name = CharField()
class Meta:
database = db # This model uses the "people.db" database.
2
3
4
5
6
7
8
9
10
11
12
初始化数据表中数据。给id=1的商品初始化商品数量为100
id | name | count |
---|---|---|
1 | clothes | 100 |
使用两个线程消费商品卖出的场景,每次消费数量为10,当商品数量充足时,商品数量减少10。因为程序可能会在任何地方暂停运行,我们使用time.Sleep来构造程序暂停的场景。
import time
def main():
# 卖出的数量
num = 10
goods = Goods.get(Goods.id == 1)
time.sleep(random.randint(1, 3))
if goods.count < num:
print("商品数量不足")
else:
goods.count -= num
goods.save()
if __name__ == '__main__':
import threading
t1 = threading.Thread(target=main)
t2 = threading.Thread(target=main)
t1.start()
t2.start()
t1.join()
t2.join()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
运行后会发现,商品数量变成了90,这明显不符合我们的预期。两个线程都消费了10个,预期结果应该是80才对。
id | name | count |
---|---|---|
1 | clothes | 90 |
执行过程
在t1线程查询到的goods的商品数量为100,保存在变量中,停止,然后t2线程开始查询,查询到的数量也是100,然后往下执行时,t1减10,调用save时是告诉数据库保存的数量为90,结束。t2线程也是100-10,save时,也是保存90。
解决方案
应该让数据库根据自己当前的值更新,而不是使用变量中的值进行更新。
我们恢复商品的数量到100,然后修改代码如下,使用update来让数据库根据当前的值进行更新。
def main():
# 卖出的数量
num = 10
goods = Goods.get(Goods.id == 1)
time.sleep(random.randint(1, 3))
if goods.count < num:
print("商品数量不足")
else:
query = Goods.update(count=Goods.count - num).where(Goods.id == 1)
ok = query.execute()
if ok:
print("更新成功")
else:
print("更新失败")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
运行结果符合我们的预期,商品数量变成了80。
id | name | count |
---|---|---|
1 | clothes | 80 |
# 超卖问题
虽然解决了更新数量不一致的问题,依然没有解决商品超卖问题。商品数量依然是100,但是我们两个线程都想买99件。
def main():
# 卖出的数量
num = 99
goods = Goods.get(Goods.id == 1)
time.sleep(random.randint(1, 3))
if goods.count < num:
print("商品数量不足")
else:
query = Goods.update(count=Goods.count - num).where(Goods.id == 1)
ok = query.execute()
if ok:
print("更新成功")
else:
print("更新失败")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
运行结果是,两个线程都成功买入,数据库表中的数量变成了-98。我们肯定期望一个线程买入成功,而另一个线程执行失败。
id | name | count |
---|---|---|
1 | clothes | -98 |
加锁解决
我们在买入之前,加一个锁,这样同时只能有一个用户在执行买入。
import threading
R = threading.Lock()
def main():
# 卖出的数量
num = 99
R.acquire()
goods = Goods.get(Goods.id == 1)
time.sleep(random.randint(1, 3))
if goods.count < num:
print("商品数量不足")
else:
query = Goods.update(count=Goods.count - num).where(Goods.id == 1)
ok = query.execute()
if ok:
print("更新成功")
else:
print("更新失败")
R.release()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
运行之后,可以看到库存为1件,只有一个线程更新成功,另一个更新失败。
当前是同一个服务的两个线程中,可以拿到通一把锁,但是如果在微服务中,每一次请求因为负载均衡可能请求在不同的服务中,这两个服务甚至不在同一台服务器上,那么这个锁就失效了。
这个时候需要使用分布式锁来解决该问题。
# 基于mysql的乐观锁机制实现
什么是乐观锁?
乐观锁,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制,乐观锁适用于多读的应用类型,这样可以提高吞吐量
实现
从业务中实现乐观锁机制。
在之前的Goods表中添加version字段
class Goods(Model):
id = IntegerField()
count = IntegerField()
name = CharField()
version = IntegerField()
class Meta:
database = db # This model uses the "people.db" database.
2
3
4
5
6
7
8
数据表如下:
id | name | count | version |
---|---|---|---|
1 | clothes | 100 | 1 |
在更新条件中添加版本的判断,确认在更新库存数量时,是否有其他服务更改了该条记录,如果没有则进行更新。并且在更新库存时,给版本号+1,代表着该记录已被修改。
如果没有更新成功,则一直重试,直至成功为止。
def main():
# 卖出的数量
num = 99
while True:
goods = Goods.get(Goods.id == 1)
time.sleep(random.randint(1, 3))
if goods.count < num:
print("商品数量不足")
break
else:
query = Goods.update(count=Goods.count - num, version=Goods.version + 1).where(Goods.id == 1,
Goods.version == goods.version)
ok = query.execute()
if ok:
print("更新成功")
break
else:
print("更新失败")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
运行结果:
更新成功
更新失败
商品数量不足
2
3
库存剩下1件,版本号更新为2,符合我们的预期。
id | name | count | version |
---|---|---|---|
1 | clothes | 1 | 2 |
优点:
- 简单
- 不需要额外的组件
缺点:
并发高时,不断的对数据库进行查询,一样会增加数据库的压力。性能差。
# 基于mysql的悲观锁机制实现
悲观锁,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
缺点:并发性不高,不建议使用
# redis分布式锁
# 分布式锁需要解决的问题
互斥性,任何时刻只能有一个客户拥有锁,不能同时多个客户获取
安全性,只有被持有该锁的用户删除,而不能被其他用户删除
死锁,获取锁的客户单因为某些原因而宕机,而未能释放锁,其他客户端无法获取锁,需要有机制来避免该类问题的发生
- 代码异常,导致无法运行到release
- 你的当前服务器网络问题-脑裂
# 抛出问题
我创建一个redis锁的类,使用acquire加锁,release解锁。
class Lock:
def __init__(self, name):
self.redis_client = redis.Redis(host="10.61.74.37")
self.name = name
def acquire(self):
if not self.redis_client.get(self.name):
self.redis_client.set(self.name, 1)
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.get(self.name):
self.redis_client.set(self.name, 1)
return True
def release(self):
self.redis_client.delete(self.name)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
在入口处加锁,在出口处释放锁,这样同时只有一个服务能够执行更新操作。
def main():
# 卖出的数量
num = 99
# 商品ID
goods_id = 1
lock = Lock("lock:goods_{}".format(goods_id))
lock.acquire()
goods = Goods.get(Goods.id == goods_id)
time.sleep(random.randint(1, 3))
if goods.count < num:
print("商品数量不足")
else:
query = Goods.update(count=Goods.count - num).where(Goods.id == 1)
ok = query.execute()
if ok:
print("更新成功")
else:
print("更新失败")
lock.release()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
运行之后,发现库存的数量是-98,没有达到预期的效果。
id | name | count |
---|---|---|
1 | clothes | -98 |
我通过打日志的方式,在redis_client.get之后和release中打日志。
class Lock:
def __init__(self, name):
self.redis_client = redis.Redis(host="10.61.74.37")
self.name = name
def acquire(self):
if not self.redis_client.get(self.name):
print("acquire\n")
self.redis_client.set(self.name, 1)
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.get(self.name):
self.redis_client.set(self.name, 1)
return True
def release(self):
print("release")
self.redis_client.delete(self.name)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
运行结果如下:
acquireacquire
更新成功
release
更新成功
release
2
3
4
5
6
在没有释放锁的时候,两个线程竟然都拿到锁了?
因为,线程t1在执行redis_client.get(self.name)
之后还没有redis_client.set(self.name, 1)
时,线程t2也进来到这一步了,也就是两个线程同时在self.redis_client.get(self.name)
和self.redis_client.set(self.name, 1)
之间。
我们需要保证get和set是原子性的,才能解决该问题。
# redis中原子操作setnx
redis中自带了一个原子性操作setnx,可以进行查询并更新。
class Lock:
def __init__(self, name):
self.redis_client = redis.Redis(host="10.61.74.37")
self.name = name
def acquire(self):
# # 如果不存在,设置值为1,返回1. 否则返回0. 原子操作。
if self.redis_client.setnx(self.name, 1):
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.setnx(self.name, 1):
return True
def release(self):
self.redis_client.delete(self.name)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
运行后,库存数量为1,符合我们的预期。
id | name | count |
---|---|---|
1 | clothes | 1 |
# 死锁问题
获取锁的客户单因为某些原因而宕机,而未能释放锁,其他客户端无法获取锁,需要有机制来避免该类问题的发生
- 代码异常,导致无法运行到release
- 断点
解决方案:
通过设置过期时间来解决,每次在拿锁时,给redis中对应的key设置一个过期时间,即使出现上面的问题,key也能自动被删除,解决死锁问题。
class Lock:
def __init__(self, name):
self.redis_client = redis.Redis(host="10.61.74.37")
self.name = name
def acquire(self):
if self.redis_client.set(self.name, 1, nx=True, ex=15):
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.set(self.name, 1, nx=True, ex=15):
return True
2
3
4
5
6
7
8
9
10
11
12
13
14
但是会有新问题:
- 当前线程如果在一段时间后没有执行完,当前的程序没有执行完,然后key过期
- 不安全,另一个线程进来以后会将当前的key给删掉,另一个线程删掉了本该属于我设置的值。
解决方案:
如果当前线程没有执行完,那我的这个线程还应该在适当的时候去续租,将过期时间重新设置。一般是在快要过期的2/3的时候去续租。定时程序可以使用另一个线程去完成。
class Lock:
def __init__(self, name):
self.redis_client = redis.Redis(host="10.61.74.37")
self.name = name
def acquire(self):
if self.redis_client.set(self.name, 1, nx=True, ex=15):
# 启动一个线程然后去定时的刷新这个过期,这个操作最好也是使用lua脚本来完成。
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.set(self.name, 1, nx=True, ex=15):
return True
2
3
4
5
6
7
8
9
10
11
12
13
14
15
如何防止我设置的值被其他的线程给删除掉?
解决方法
可以拿锁的时候生成一个ID,并将其设置redis中键对应的值,在删除的时候,判断从redis中拿出的值是否为该程序设置的ID,如果不是,则删除失败。
class Lock:
def __init__(self, name, id=None):
self.redis_client = redis.Redis(host="10.61.74.37")
self.name = name
self.id = id if id else str(uuid.uuid4())
def acquire(self):
if self.redis_client.set(self.name, self.id, nx=True, ex=15):
# 启动一个线程然后去定时的刷新这个过期,这个操作最好也是使用lua脚本来完成。
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.set(self.name, self.id, nx=True, ex=15):
return True
def release(self):
val = str(self.redis_client.get(self.name), encoding="utf8")
if val == self.id:
self.redis_client.delete(self.name)
else:
print("不能删除自己的锁")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
但是还会有新的问题,上面的release方法,get和delete redis中key分成了两个步骤,还是有可能在两者之间中断,所以需要使用redis的lua脚本来实现两者的原子操作
# py-redis-lock和redis-py
该库是开源的分布式锁py实现库,解决了上面的问题。后面有空可以分析下该库的源码。
# redis的分布式锁优缺点
优点
- 性能高
- 简单
- redis本身使用很频繁,不需要额外维护
缺点
依赖了第三方组件
单机的redis挂掉的可能性相对较高,需要引入哨兵机制
redis的cluster的引入会导致刚才的redis的锁会有问题 - redlock