Redis 分布式锁
概述
Redis 分布式锁是一种在分布式系统中实现互斥访问共享资源的机制。它利用 Redis 的原子性操作来确保在多个服务实例之间只有一个实例能够获得锁,从而保证数据的一致性和安全性。
什么时候会用到分布式锁
1. 库存扣减场景
场景描述:电商系统中,多个用户同时购买同一商品时,需要确保库存不会超卖。
问题:如果不使用分布式锁,可能出现以下情况:
- 用户 A 和用户 B 同时购买商品,库存为 1
- 两个请求同时读取库存为 1
- 两个请求都认为可以购买,导致库存变为-1
解决方案:使用分布式锁确保同一时间只有一个请求能修改库存。
2. 订单创建场景
场景描述:防止用户重复提交订单,确保订单号的唯一性。
问题:用户网络延迟时可能重复点击提交按钮,导致创建多个相同订单。
解决方案:使用用户 ID 或订单号作为锁的 key,确保同一用户只能同时创建一个订单。
3. 定时任务调度
场景描述:在分布式环境中,确保定时任务只在一个节点上执行。
问题:多个服务实例同时运行定时任务,导致任务重复执行。
解决方案:使用分布式锁确保任务只在一个节点执行。
4. 缓存更新场景
场景描述:多个服务同时更新缓存时,防止缓存击穿。
问题:缓存失效时,多个请求同时查询数据库并更新缓存。
解决方案:使用分布式锁确保只有一个请求能更新缓存。
5. 分布式计数器
场景描述:实现分布式环境下的原子计数器。
问题:多个服务同时操作计数器时,可能出现数据不一致。
解决方案:使用分布式锁确保计数操作的原子性。
核心特性
1. 互斥性
- 在任意时刻,只有一个客户端能持有锁
- 通过 Redis 的原子性操作保证
2. 防死锁
- 锁具有过期时间,避免持有锁的客户端崩溃导致死锁
- 支持锁的自动释放机制
3. 可重入性
- 同一个客户端可以多次获取同一个锁
- 通过计数器机制实现
4. 高性能
- 基于内存操作,响应速度快
- 支持高并发场景
实现方式
1. SET 命令实现
# 基本实现
SET key value NX EX seconds
# 示例
SET lock:order:123 "client-id" NX EX 30
参数说明:
NX:只有当 key 不存在时才设置EX:设置过期时间(秒)PX:设置过期时间(毫秒)
2. Lua 脚本实现
-- 获取锁
if redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) then
return 1
else
return 0
end
-- 释放锁
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
Spring Boot 实现案例
1. 项目依赖配置
<!-- pom.xml -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
</dependencies>
2. Redis 配置
# application.yml
spring:
redis:
host: localhost
port: 6379
database: 0
timeout: 10000ms
lettuce:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
3. 分布式锁注解
// DistributedLock.java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
String key(); // 锁的key
long expireTime() default 30; // 过期时间(秒)
long waitTime() default 5; // 等待时间(秒)
String keyPrefix() default "lock:"; // key前缀
}
4. 分布式锁切面
// DistributedLockAspect.java
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Aspect
@Component
public class DistributedLockAspect {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Around("@annotation(distributedLock)")
public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {
String lockKey = distributedLock.keyPrefix() + distributedLock.key();
String lockValue = UUID.randomUUID().toString();
boolean locked = false;
try {
// 尝试获取锁
locked = tryLock(lockKey, lockValue, distributedLock.expireTime(), distributedLock.waitTime());
if (locked) {
return joinPoint.proceed();
} else {
throw new RuntimeException("获取分布式锁失败");
}
} finally {
if (locked) {
releaseLock(lockKey, lockValue);
}
}
}
private boolean tryLock(String lockKey, String lockValue, long expireTime, long waitTime) {
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < waitTime * 1000) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(result)) {
return true;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
private void releaseLock(String lockKey, String lockValue) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(lockKey), lockValue);
}
}
5. 库存扣减示例
// ProductService.java
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class ProductService {
@Autowired
private ProductRepository productRepository;
@DistributedLock(key = "product:#{#productId}", expireTime = 30, waitTime = 10)
@Transactional
public boolean reduceStock(Long productId, Integer quantity) {
Product product = productRepository.findById(productId)
.orElseThrow(() -> new RuntimeException("商品不存在"));
if (product.getStock() < quantity) {
throw new RuntimeException("库存不足");
}
product.setStock(product.getStock() - quantity);
productRepository.save(product);
return true;
}
}
6. 订单创建示例
// OrderService.java
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@DistributedLock(key = "order:user:#{#userId}", expireTime = 60, waitTime = 5)
public Order createOrder(Long userId, OrderRequest request) {
// 检查用户是否已有未支付订单
List<Order> unpaidOrders = orderRepository.findByUserIdAndStatus(userId, "UNPAID");
if (!unpaidOrders.isEmpty()) {
throw new RuntimeException("用户已有未支付订单");
}
// 创建新订单
Order order = new Order();
order.setUserId(userId);
order.setOrderNo(generateOrderNo());
order.setStatus("UNPAID");
order.setAmount(request.getAmount());
return orderRepository.save(order);
}
private String generateOrderNo() {
return "ORDER" + System.currentTimeMillis() + RandomStringUtils.randomNumeric(6);
}
}
7. 定时任务示例
// ScheduledTaskService.java
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class ScheduledTaskService {
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
@DistributedLock(key = "task:daily-report", expireTime = 3600, waitTime = 10)
public void generateDailyReport() {
// 生成日报逻辑
System.out.println("开始生成日报...");
// 执行报表生成逻辑
System.out.println("日报生成完成");
}
@Scheduled(fixedRate = 60000) // 每分钟执行一次
@DistributedLock(key = "task:cleanup", expireTime = 300, waitTime = 5)
public void cleanupExpiredData() {
// 清理过期数据逻辑
System.out.println("开始清理过期数据...");
// 执行清理逻辑
System.out.println("过期数据清理完成");
}
}
8. 缓存更新示例
// CacheService.java
import org.springframework.stereotype.Service;
@Service
public class CacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductRepository productRepository;
@DistributedLock(key = "cache:product:#{#productId}", expireTime = 30, waitTime = 5)
public Product getProductWithCache(Long productId) {
String cacheKey = "product:" + productId;
// 先从缓存获取
Product product = (Product) redisTemplate.opsForValue().get(cacheKey);
if (product != null) {
return product;
}
// 缓存未命中,从数据库查询
product = productRepository.findById(productId)
.orElseThrow(() -> new RuntimeException("商品不存在"));
// 更新缓存
redisTemplate.opsForValue().set(cacheKey, product, 30, TimeUnit.MINUTES);
return product;
}
}
总结
Redis 分布式锁是构建高可用分布式系统的重要组件。通过合理的设计和实现,可以有效地解决分布式环境下的并发控制问题。在实际应用中,需要根据具体的业务场景选择合适的实现策略,并建立完善的监控和运维体系。
