# secKill **Repository Path**: xrea/secKill ## Basic Information - **Project Name**: secKill - **Description**: 基于Redis分布式来实现秒杀业务 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2019-01-12 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 《基于redis分布式锁实现秒杀》——总结 本文章参考简书上: [TOC] [基于redis分布式锁实现“秒杀”](https://www.jianshu.com/p/7a0f24e3d90f) > 秒杀: 1. 从业务角度来说,是用户对同一资源进行争抢 2. 从业务角度来说,是多个线程对资源进行操作
总结:秒杀是对控制线程对资源的争抢,**既要保证高效并发,也要保证操作的正确性** ### 1.实现的方法 对于线程的控制,常用的可以有以下三种方法: 1. 对争夺资源的方法入口加锁synchronized,这个方法是最粗暴的,会降低系统的性能。 2. 在上面的基础上进行优化,我们可以对操作数据库的代码进行加锁,也就是对代码块进行加锁。这个加锁的的粒度也是比较大的,假设两个用户对不同的资源进行操作,比如说购买不同商品,从业务逻辑上来说是对不同的资源进行争抢,所以应该不是秒杀的业务实现,但是从技术层面来说,都对于商品这张表进行了操作,所以也就产生了竞争关系,所以也会降低系统的性能。 3. 既然是并发的问题,理论上说将所有的请求进行串行,使用队列进行管理,自然就不会有并发问题,这样的话对于队列的负载就会很大,一旦消息出错,容易造成消息阻塞和消息丢失情况。这也不是一个理想的方法。 ### 2.解决思维 针对上面出现的问题,我们可以深入思考下,秒杀所出现的竞争关系是对同一个商品进行争抢,对于不同的商品是不应该出现竞争关系,所以我们需要在同一个商品上进行加锁。**分布式锁**可以解决上面的问题。 ### 3.分布式锁 > 分布式锁:控制分布式系统之间同步访问共享资源的一种方式。 很官方的解释,理解起来的话就是不同系统或者同一系统不同主机共享资源,那么访问这些资源,需要互斥来彼此进行干扰,保持一致性。 ### 4.模拟场景 目前分布式锁使用比较广泛的是redis,redis是key-value存储系统,他的特性很适合用来处理高并发:
1. 数据存储在内存中,处理速度非常快 2. 键可以设置过期时间,使用redis键来操作锁,设置过期时间可以有效的防止死锁 3. 单线程,消除了传统数据库串行控制的开销 4. 支持事务,操作都是原子性 现在我们来模拟秒杀的场景:
数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀两件商品,500个线程秒杀第一个商品,500个线程秒杀第二个商品 ### 5. 具体的实现 #### 5.1 redis的命令 ``` ## 如果key不存在就设置key以及对应的value, ## 如果存在就不做任何操作 SETNX key value ## 设置键的过期时间 EXPIRE key sceonds ## 删除键 DEL key ``` #### 5.2 需要思考的问题 1. java如何操作redis 2. 怎么实现加锁 3. 如果释放锁 4. 阻塞还是非阻塞 5. 针对异常的处理 >在Spring中已经针对Redis的操作封装了jar包
我们针对商品的操作,其实是针对数据库中对应商品的id进行操作,对商品加锁,可以将商品对应的id来作为key存储在redis中,在对该商品进行操作时,先查看下是否在redis中存在,如果存在的话说明已经有用户在对该商品进行操作了,此时需要等待上面的用户处理完成。用户处理完成之后,可以操作删除redis中对应的键,相当于释放了锁。
采用阻塞方式,当发现已经上锁了,在特定的时间里轮询锁
业务由于种种原因导致失败,没有及时的释放锁,也就是删除redis中对应的key,我们可以添加键的失效时间来自动让锁释放。这样的话就避免了死锁的问题 #### 5.3 代码实现 以上都是理论性的讨论,现在开始基于之前的思考,来使用代码实现(代码基于博客上的代码进行了修改): #### 5.3.1 自定义AOP需要切入的注解 ```java /** * 方法级注解 * * @author zhuqb */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface CacheLock { /** * redis 锁key的前缀 * * @return */ String lockedPrefix() default ""; /** * 轮询锁的时间 默认是2s * * @return */ long timeOut() default 2000; /** * key在redis里存在的时间,1000S * * @return */ int expireTime() default 1000;// } ``` ---- ```java /** * 参数级注解 * 自定义注解 * * @author zhuqb */ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface LockedComplexObject { /** * 含有成员变量的复杂对象中需要加锁的成员变量,如一个商品对象的商品ID * * @return */ String field() default ""; } ``` ---- ```java /** * 参数级注解 * * @author zhuqb */ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface LockedObject { /** * 传入的参数值 * * @return */ String value() default ""; } ``` 注解比较简单,下面贴上主要的AOP切入代码,在切入方法执行前进行加锁,方法执行之后释放锁 ```java /** * 针对 含有 @CacheLock 注解的方法 进行切入 * * @author zhuqb */ @Component @Aspect @ComponentScan @EnableAspectJAutoProxy public class AspectAop { public static Logger logger = LoggerFactory.getLogger(AspectAop.class); @Autowired CacheLockService cacheLockService; /** * 对所有方法注解了CacheLock 进行拦截 */ @Pointcut("@annotation(com.amos.ms.type.CacheLock)") public void intercepter() { } /** * 方法执行前进行加锁 * * @param joinPoint */ @Before("intercepter()") public void doBeforeAdvice(JoinPoint joinPoint) { logger.info("这是前置通知"); MethodParams params = this.getValuesFromMethod(joinPoint); // 加锁 boolean lock = cacheLockService.lock(params.getKey(), params.getTimeout(), params.getExpireTime()); if (!lock) { CacheLockUtils.count++; logger.info("获取锁失败"); // 这里不能抛出异常 否则会造成程序死锁 不知道为什么 // throw new CacheLockException("获取锁失败"); } } /** * 后置通知 只要方法执行完成了 就会执行该操作 * * @param joinPoint */ @After("intercepter()") public void doAfterAdvice(JoinPoint joinPoint) { logger.info("这是后置通知"); MethodParams params = this.getValuesFromMethod(joinPoint); cacheLockService.unlock(params.getKey()); } /** * 获取锁操作 需要的参数 * * @param joinPoint * @return */ private MethodParams getValuesFromMethod(JoinPoint joinPoint) { // 获取所有的参数值 Object[] paramVaules = joinPoint.getArgs(); MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); // 获取 @CacheLock Annotation annotation = method.getAnnotation(CacheLock.class); // 拦截的参数 String key = CacheLockUtils.getCacheLockKey(method,paramVaules); if (StringUtils.isBlank(key)) { throw new CacheLockException("没有需要加锁的参数"); } // 对拦截的参数进行封装 String lockKey = LockUtils.getLockKey(((CacheLock) annotation).lockedPrefix(), key); MethodParams params = new MethodParams(); params.setExpireTime(((CacheLock) annotation).expireTime()); params.setTimeout(((CacheLock) annotation).timeOut()); params.setPrefix(((CacheLock) annotation).lockedPrefix()); params.setKey(lockKey); return params; } @Data private static class MethodParams { /** * 拦截锁前缀 */ private String prefix; /** * 拦截对象 */ private String key; /** * 轮询时间 */ private long timeout; /** * 过期时间 */ private long expireTime; } } ``` 加锁采用的是向redis数据库添加键,通过判断键是否存在来判断是否已经加锁,如果已经加锁了,需要在轮询时间内看看是否释放锁,加锁的代码如下: ```java /** * 给键添加锁 * * @param key * @param timeout 轮询时间 * @param expireSeconds 过期时间 * @return */ @Override public boolean lock(String key, long timeout, long expireSeconds) { boolean flag = false; long nanoTime = System.nanoTime(); // 轮询时间 timeout *= MILLI_NANO_TIME; try{ while (System.nanoTime() - nanoTime < timeout) { if (redisService.setnx(key,System.currentTimeMillis()+"",expireSeconds)) { return true; } logger.info("出现锁等待"); // 短暂休眠,避免可能的活锁 Thread.sleep(3); } }catch (Exception e) { if (logger.isDebugEnabled()) { e.printStackTrace(); } flag = false; } return flag; } ``` 释放锁即是删除redis中对应的键 ```java /** * 释放锁 * * @param key * @return */ @Override public boolean unlock(String key) { return redisService.del(key); } ``` SpringBoot可以集成对于Redis的操作 ```java @Autowired StringRedisTemplate stringRedisTemplate; /** * 设置键,并且设置过期时间 * * @param key 键的名称 * @param value 键的值 * @param expireSeconds 过期时间 单位 ms * @return */ @Override public boolean setnx(String key, String value, long expireSeconds) { return stringRedisTemplate.opsForValue().setIfAbsent(key,value,expireSeconds,TimeUnit.MILLISECONDS); } /** * 删除键 * * @param key * @return */ @Override public boolean del(String key) { return stringRedisTemplate.delete(key); } ``` 以上就是主要的业务代码,下面可以通过编写测试方法来测试,测试代码如下: ```java public String secKill() { int threadCount = 1000; int splitPoint = 500; CountDownLatch endCount = new CountDownLatch(threadCount); CountDownLatch beginCount = new CountDownLatch(1); Thread[] threads = new Thread[threadCount]; //起500个线程,秒杀第一个商品 for (int i = 0; i < splitPoint; i++) { threads[i] = new Thread(new Runnable() { @Override public void run() { try { // 等待在一个信号量上,挂起 beginCount.await(); businessService.secKill("test", 10000001L); endCount.countDown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); threads[i].start(); } //再起500个线程,秒杀第二件商品 for (int i = splitPoint; i < threadCount; i++) { threads[i] = new Thread(new Runnable() { @Override public void run() { try { // 等待在一个信号量上,挂起 beginCount.await(); businessService.secKill("test", 10000002L); endCount.countDown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); threads[i].start(); } long startTime = System.currentTimeMillis(); //主线程释放开始信号量,并等待结束信号量,这样做保证1000个线程做到完全同时执行,保证测试的正确性 beginCount.countDown(); try { //主线程等待结束信号量 endCount.await(); //观察秒杀结果是否正确 System.out.println(BusinessServiceImpl.inventory.get(10000001L)); System.out.println(BusinessServiceImpl.inventory.get(10000002L)); System.out.println("error count" + CacheLockUtils.count); System.out.println("total cost " + (System.currentTimeMillis() - startTime)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return "aaaa"; } ``` 本文是参考理解《基于redis分布式锁实现秒杀》一文进行的总结以及调整,在此感谢简书作者:lsfire,感谢分享秒杀业务类的实现思路和方法。
代码存放在github上:[基于Redis分布式来实现秒杀业务](https://github.com/endyzhu/secKill)