# 49. Redis分布式锁性能优化分析

# 标准答案

✅ Redis分布式锁性能优化策略:

  1. 锁设计优化

    • 使用Lua脚本保证原子性
    • 设置合适的过期时间
    • 实现可重入机制
    • 采用锁续期机制
  2. 性能优化

    • 使用RedLock算法
    • 实现锁分段
    • 优化锁粒度
    • 使用连接池
  3. 可靠性优化

    • 主从一致性保证
    • 故障自动恢复
    • 死锁预防
    • 监控告警

# 答案解析

# 1️⃣ 基础锁实现

public class RedisDistributedLock {
    private final StringRedisTemplate redisTemplate;
    private final String lockKey;
    private final String requestId;
    private final long expireTime;
    
    private static final String LOCK_SCRIPT = 
        "if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then " +
        "return 1 else return 0 end";
        
    private static final String UNLOCK_SCRIPT = 
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "return redis.call('del', KEYS[1]) else return 0 end";
    
    public boolean tryLock() {
        try {
            RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
            Long result = redisTemplate.execute(
                script,
                Collections.singletonList(lockKey),
                requestId,
                String.valueOf(expireTime)
            );
            return result != null && result == 1;
        } catch (Exception e) {
            log.error("Error acquiring lock", e);
            return false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 2️⃣ 性能优化实现

public class OptimizedRedisLock {
    private final RedissonClient redisson;
    private final int segments;
    private final RLock[] locks;
    
    public OptimizedRedisLock(String keyPrefix, int segments) {
        this.segments = segments;
        this.locks = new RLock[segments];
        for (int i = 0; i < segments; i++) {
            locks[i] = redisson.getLock(keyPrefix + ":" + i);
        }
    }
    
    public boolean lockWithSegments(String key) {
        // 计算分段
        int index = getSegment(key);
        RLock lock = locks[index];
        
        try {
            // 带看门狗机制的加锁
            return lock.tryLock(500, 30000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
    
    private int getSegment(String key) {
        return Math.abs(key.hashCode() % segments);
    }
    
    // 批量加锁
    public boolean multiLock(String... keys) {
        RLock[] lockArray = new RLock[keys.length];
        for (int i = 0; i < keys.length; i++) {
            lockArray[i] = locks[getSegment(keys[i])];
        }
        
        RLock multiLock = redisson.getMultiLock(lockArray);
        try {
            return multiLock.tryLock(500, 30000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 常见误区

  • 误区1:过度依赖单个Redis节点
  • 误区2:忽视锁的过期时间设置

# 典型场景与解决方案

# ✅ 库存扣减场景

public class InventoryLockManager {
    private final OptimizedRedisLock lockManager;
    private final InventoryService inventoryService;
    
    public boolean deductStock(String productId, int quantity) {
        String lockKey = "inventory:" + productId;
        boolean locked = false;
        try {
            // 尝试加锁
            locked = lockManager.lockWithSegments(lockKey);
            if (!locked) {
                return false;
            }
            
            // 执行库存扣减
            return inventoryService.deduct(productId, quantity);
        } finally {
            if (locked) {
                lockManager.unlock(lockKey);
            }
        }
    }
    
    // 批量扣减库存
    public boolean batchDeductStock(Map<String, Integer> items) {
        String[] lockKeys = items.keySet().stream()
            .map(id -> "inventory:" + id)
            .toArray(String[]::new);
            
        boolean locked = false;
        try {
            locked = lockManager.multiLock(lockKeys);
            if (!locked) {
                return false;
            }
            
            // 执行批量扣减
            return inventoryService.batchDeduct(items);
        } finally {
            if (locked) {
                lockManager.unlockMulti(lockKeys);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# 企业实战经验

# Situation(业务背景)

秒杀系统库存锁竞争严重,性能下降。

# Task(核心任务)

优化分布式锁性能,提高系统吞吐量。

# Action(解决方案)

  1. 实现锁分段
  2. 使用RedLock算法
  3. 优化锁粒度
  4. 引入连接池

# Result(结果)

  • 锁竞争降低80%
  • 系统吞吐量提升200%
  • 响应时间降低60%

# 深入追问

🔹 如何处理Redis主从切换问题?

  • 使用RedLock算法
  • 设置适当超时时间
  • 实现故障检测

🔹 如何避免死锁问题?

  • 设置锁超时时间
  • 实现锁续期机制
  • 使用锁监控告警

# 相关面试题

  1. RedLock算法的原理是什么?
  2. 如何实现可重入分布式锁?
  3. 分布式锁的性能瓶颈在哪里?