C# Redis分布式锁(RedLock)
Redis单节点的分布式锁只需要注意三点就可以了:
1.加锁并设置锁的过期时间必须是原子操作;
2.锁的value值必须要有唯一性;
3.释放锁的时候要验证其value值,不是自己加的锁不能释放.
但是单节点分布式锁最大的缺点就是,它只作用在一个Redis节点上,如果该节点挂了,那就挂了.
那可不可以通过哨兵机制来保证高可用呢?
答案是不行.
因为Redis在进行主从复制的时候是异步的.
假设 clientA 拿到锁后,在 master 还没同步到 slave 时,master 发生了故障,这时候 salve 升级为 master,导致锁丢失.
RedLock 的思想是:假设有5个Redis节点.这些节点完全相互独立,不存在主从或者集群机制,都是 master.并且这5个Redis实例运行在5台机器上,这样保证他们不会同时宕掉.
客户端应该按照以下操作来获取锁:
1.获取当前时间戳,假设是T1.
2.依次尝试从这5个Redis实例获取锁.当客户端向Redis请求获取锁时,客户端应该设置超时时间,并且这个超时时间应该小于锁的失效时间.比如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间.这样可以避免Redis已经挂掉的情况下,客户端还在等待响应结果.如果Redis没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁.
3.请求完所有的Redis节点后,只有满足如下两点,才算真正的获取到锁:
1)当前时间 – T1 的时间差小于锁的过期时间.比如T1=00:00:00,然后从5个Redis节点都拿到了锁,当前时间是 00:00:05,也就是说获取锁一共用了5秒钟.假设锁的过期时间是3秒,那么这次获取锁的操作就算失败了.
2)从(N/2+1)个Redis节点都获取到锁.这个很好理解,5个节点,你拿2个,我拿2个,到底算谁的?
总结一句话就是:从开始获取锁计时,只要在锁的过期时间内成功获取到一半以上的锁便算成功,否则算失败.
4.当客户端获取到了锁,锁的真正有效时间 = 锁的过期时间 – 获取锁所使用的时间(也就是第3步计算出来的时间).
5.如果客户端由于某些原因(比如获取锁的实例个数小于N/2+1,或者已经超过了有效时间),没有获取到锁,客户端便会在所有的Redis实例上进行解锁(即使某些Redis实例根本就没有加锁成功),因为可能已经获取了小于 N/2+1个锁,必须释放掉,否则会影响其他客户端获取锁.
关于是否启动AOF永久存储,需要有所取舍.
1.永久启动,由于Redis的过期机制是按照unix时间戳走的,所以当我们重启Redis后,依然会按照规定的时间过期.但是永久启动对性能有一定影响;
2.采用默认的1秒1次.如果在1秒内断电,会导致数据丢失,这时候如果立刻重启会导致锁的互斥性实效.
所以有效的解决方案是,采用AOF,1秒1次,不管什么原因宕机后,等待一定时间再重启.这个时间就是锁的过期时间.
Demo:
安装官方提供的 RedLock.net
Startup:
public class Startup { private RedLockFactory _redLockFactory; public void ConfigureServices(IServiceCollection services) { services.AddControllers(); var endPoints = new List<RedLockEndPoint> { new DnsEndPoint("127.0.0.1", 6379), new DnsEndPoint("127.0.0.1", 6380), new DnsEndPoint("127.0.0.1", 6381) }; _redLockFactory = RedLockFactory.Create(endPoints); services.AddSingleton(typeof(IDistributedLockFactory), _redLockFactory); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime applicationLifetime) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } //应用程序结束时释放,因为不是容器创建的对象 applicationLifetime.ApplicationStopping.Register(() => { _redLockFactory.Dispose(); }); app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); } }
测试api:
[ApiController] public class ValuesController : ControllerBase { private static int _stock = 10; private readonly IDistributedLockFactory _distributedLockFactory; public ValuesController(IDistributedLockFactory distributedLockFactory) { _distributedLockFactory = distributedLockFactory; } [Route("lockTest")] [HttpGet] public async Task<int> DistributedLockTest() { // resource 锁定的资源 var resource = "the-thing-we-are-locking-on"; // expiryTime 锁的过期时间 var expiry = TimeSpan.FromSeconds(5); // waitTime 等待时间 var wait = TimeSpan.FromSeconds(1); // retryTime 等待时间内,多久重试一次 var retry = TimeSpan.FromMilliseconds(250); using (var redLock = await _distributedLockFactory.CreateLockAsync(resource, expiry, wait, retry)) { if (redLock.IsAcquired) { // 模拟执行业务逻辑 await Task.Delay(new Random().Next(100, 500)); if (stock > 0) { stock--; return stock; } return stock; } Console.WriteLine($"{DateTime.Now} : 获取锁失败"); } return -99; } }
测试控制台:
static void Main(string[] args) { HttpClient client = new HttpClient(); var result = Parallel.For(0, 20, (i) => { var stopwatch = new Stopwatch(); stopwatch.Start(); var response = client.GetAsync($"http://localhost:5000/locktest").Result; stopwatch.Stop(); var data = response.Content.ReadAsStringAsync().Result; Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}"); }); client.Dispose(); Console.ReadKey(); }
测试结果: