Skip to content

秒杀问题


全局 ID 生成器

基本介绍

符号位:1 bit,永远为 0

时间戳:31 bit,以秒为单位,可以使用 69 年

序列号:32 bit,秒内的计数器,支持每秒产生 232 个不同 ID

计算开始时间

java
public static void main(String[] args) {
    LocalDateTime time = LocalDateTime.of(
        year: 2022,
        month: 1,
        dayOfMonth: 1,
        hour: 0,
        minute: 0,
        second: 0
    );
    long second = time.toEpochSecond(ZoneOffset.UTC);
    System.out.println("second = " + second);
}

代码实现

java
@Component
public class RedisIdWorker {
    /**
     * 开始时间戳
     */
    private static final long BEGIN_TIMESTAMP = 1640995200L;
    /**
     * 序列号的位数
     */
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public long nextId(String keyPrefix) {
        // 1.生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;

        // 2.生成序列号
        // 2.1.获取当前日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        // 2.2.自增长
        long count = stringRedisTemplate
                        .opsForValue()
                        .increment("icr:" + keyPrefix + ":" + date);

        // 3.拼接并返回
        return timestamp << COUNT_BITS | count;
    }
}

秒杀下单(多线程并发安全)

基本思路


java
@Override
public Result seckillVoucher(Long voucherId) {
    // 1.查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    // 2.判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        // 尚未开始
        return Result.fail("秒杀尚未开始!");
    }
    // 3.判断秒杀是否已经结束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        // 尚未开始
        return Result.fail("秒杀已经结束!");
    }
    // 4.判断库存是否充足
    if (voucher.getStock() < 1) {
        // 库存不足
        return Result.fail("库存不足!");
    }
    //5,扣减库存
    boolean success = seckillVoucherService.update()
            .setSql("stock= stock -1")
            .eq("voucher_id", voucherId).update();
    if (!success) {
        //扣减库存
        return Result.fail("库存不足!");
    }
    //6.创建订单
    VoucherOrder voucherOrder = new VoucherOrder();
    // 6.1.订单id
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    // 6.2.用户id
    Long userId = UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);
    // 6.3.代金券id
    voucherOrder.setVoucherId(voucherId);
    save(voucherOrder);

    return Result.ok(orderId);

}

超卖问题


乐观锁

(1)锁类型介绍

悲观锁:可以实现对于数据的串行化执行,比如 syn,和 lock 都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等


(2)乐观锁的两种实现

CAS 法:在扣减库存的时候查询当前库存是否存在,如果存在,则说明没有其他线程操作过

版本号法:增加 version 属性,具体如下图


乐观锁解决超卖:方案一

以下逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设 100 个线程同时都拿到了 100 的库存,然后大家一起去进行扣减,但是 100 个人中只有 1 个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败

java
boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1") //set stock = stock -1
                .eq("voucher_id", voucherId)
                .eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?

乐观锁解决超卖:方案二

之前的方式要修改前后都保持一致,但是这样我们分析过,成功的概率太低,所以我们的乐观锁需要变一下,改成 stock 大于 0 即可

java
boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId)
                .update()
                .gt("stock",0); //where id = ? and stock > 0

一人一单


存在问题:现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作

(1)添加依赖

xml
<!--aspecj-->
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
</dependency>

(2)启动类配置

java
@EnableAspectJAutoProxy(exposeProxy = true)

(3)代码实现

先开启事务再获取锁,由于事务还未提交,线程查询还没有查询到其他线程事务应该提交的数据,进而可能导致超卖问题

解决方案:先获取锁,再开启事务理论上是可以解决超卖问题的,但是在多线程下还是会出现超卖的情况,然而在非事务方法中调用事务方法,会产生事务失效问题,需要通过代理对象来避免

java
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1. 查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        // 2. 判断秒杀是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀尚未开始!");
        }
        // 3. 判断秒杀是否已经结束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 尚未开始
            return Result.fail("秒杀已经结束!");
        }
        // 4. 判断库存是否充足
        if (voucher.getStock() < 1) {
            // 库存不足
            return Result.fail("库存不足!");
        }

       Long userId = UserHolder.getUser().getId();
       /*
        * 1. 当前方法被spring的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,
        * 但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题
        *
        * 2. 事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象来操作事务
        *
        * 3. intern() 这个方法是从常量池中拿到数据,如果我们直接使用 userId.toString()
        * 他拿到的对象实际上是不同的对象,new 出来的对象,我们使用锁必须保证锁必须是同一把,
        * 所以我们需要使用 intern() 方法
        */
       synchronized (userId.toString().intern()) {
           IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
           return proxy.createVoucherOrder(voucherId);
       }
    }

    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        // 一人一单
        Long userId = UserHolder.getUser().getId();
        int count = query()
                .eq("user_id", userId)
                .eq("voucher_id", voucherId)
                .count();
        if (count > 0) {
            return Result.fail("用户已经购买过一次!");
        }

        // 5. 扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId)
                // 乐观锁解决在多线程下的超卖问题
                .gt("stock", 0)
                .update();
        if (!success) {
            // 扣减失败
            return Result.fail("库存不足!");
        }
        // 6. 创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        // 6.1. 订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 6.2. 用户id
        voucherOrder.setUserId(userId);
        // 6.3. 代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        // 7. 返回订单id
        return Result.ok(orderId);
    }
}

分布式锁(集群并发安全)

集群环境下锁失效

由于现在我们部署了多个 tomcat,每个 tomcat 都有一个属于自己的 jvm,那么假设在服务器 A 的 tomcat 内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器 B 的 tomcat 内部,又有两个线程,但是他们的锁对象写的虽然和服务器 A 一样,但是锁对象却不是同一个,所以线程 3 和线程 4 可以实现互斥,但是却无法和线程 1 和线程 2 实现互斥,这就是集群环境下, syn 锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题


实现原理

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁

核心思想:让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行

分布式锁需要满足的条件

(1)可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思

(2)互斥:互斥是分布式锁的最基本的条件,使得程序串行执行

(3)高可用:程序不易崩溃,时时刻刻都保证较高的可用性

(4)高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能

(5)安全性:安全也是程序中必不可少的一环

实现方式


Mysql:mysql 本身就带有锁机制,但是由于 mysql 性能本身一般,所以采用分布式锁的情况下,其实使用 mysql 作为分布式锁比较少见

Redis:redis 作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用 redis 或者 zookeeper 作为分布式锁,利用 setnx 这个方法,如果插入 key 成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

Zookeeper:zookeeper 也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解 zookeeper 的原理和分布式锁的实现,所以不过多阐述

Redis 实现

(1)定义接口

java
public interface ILock {

    /**
     * 尝试获取锁
     * @param timeoutSec 锁持有的超时时间,过期后自动释放
     * @return true 代表获取锁成功;false 代表获取锁失败
     */
    boolean tryLock(Long timeoutSec);

    /**
     * 释放锁
     */
    void unLock();
}

(2)编写实现类

java
public class SimpleRedisLock implements ILock {

    private String name;

    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public static final String KEY_PREFIX = "lock:";

    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标示
        String threadId = Thread.currentThread().getId()
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        //通过del删除锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

(3)修改业务代码

java
@Override
public Result seckillVoucher(Long voucherId) {
    // 1. 查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    // 2. 判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        // 尚未开始
        return Result.fail("秒杀尚未开始!");
    }
    // 3. 判断秒杀是否已经结束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        // 尚未开始
        return Result.fail("秒杀已经结束!");
    }
    // 4. 判断库存是否充足
    if (voucher.getStock() < 1) {
        // 库存不足
        return Result.fail("库存不足!");
    }

    Long userId = UserHolder.getUser().getId();

    // 创建锁对象
    SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
    // 获取锁
    boolean isLock = lock.tryLock(1200L);
    // 判断是否获取锁成功
    if (!isLock){
        return Result.fail("不允许重复下单");
    }
    try {
        // 获取代理对象(事务)
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
    } finally {
        // 释放锁
        lock.unLock();
    }
}

锁误删问题

(1)问题描述

持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程 2 来尝试获得锁,就拿到了这把锁,然后线程 2 在持有锁执行过程中,线程 1 反应过来,继续执行,而线程 1 执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程 2 的锁进行删除,这就是误删别人锁的情况说明

(2)解决方案

在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除,假设还是上边的情况,线程 1 卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程 1 反应过来,然后删除锁,但是线程 1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程 2 走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁

核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除

(3)代码改造

java
private String name;

private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
    this.name = name;
    this.stringRedisTemplate = stringRedisTemplate;
}

public static final String KEY_PREFIX = "lock:";

// true 可以取消随机字符串间的横杠
public static final String ID_PRIFIX = UUID.randomUUID().toString(true) + "-";

@Override
public boolean tryLock(long timeoutSec) {
   // 获取线程标示
   String threadId = ID_PREFIX + Thread.currentThread().getId();
   // 获取锁
   Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
   return Boolean.TRUE.equals(success);
}

public void unlock() {
    // 获取线程标示
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取锁中的标示
    String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    // 判断标示是否一致
    if(threadId.equals(id)) {
        // 释放锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

原子性问题

更为极端的误删逻辑说明:线程 1 现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程 2 进来,但是线程 1 他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程 1 的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生


lua 脚本解决原子性问题

(1)lua 脚本基本介绍

基本语法参考网站:https://www.runoob.com/lua/lua-tutorial.html,

Redis提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性

Lua 是一种编程语言,这里重点介绍 Redis 提供的调用函数,我们可以使用 lua 去操作 redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了,作为 Java 程序员这一块并不需要大家过于精通,只需要知道他有什么作用即可

这里重点介绍Redis提供的调用函数,语法如下

lua
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样

lua
# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下

lua
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

(2)lua 脚本实现

释放锁的业务流程是这样的

1、获取锁中的线程标示

2、判断是否与指定的标示(当前线程标示)一致

3、如果一致则释放锁(删除)

4、如果不一致则什么都不做

lua
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
  -- 一致,则删除锁
  return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

(3)Java 代码调用

java
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

public void unlock() {
    // 调用lua脚本
    stringRedisTemplate.execute(
            UNLOCK_SCRIPT,
            Collections.singletonList(KEY_PREFIX + name),
            ID_PREFIX + Thread.currentThread().getId());
}

Redission

问题分析

基于 setnx 实现的分布式锁存在下面的问题

重入问题:重入问题是指获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如 HashTable 这样的代码中,他的方法都是使用 synchronized 修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的 synchronized 和 Lock 锁都是可重入的

不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁

超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了 lua 表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

主从一致性:如果 Redis 提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题



基本介绍

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid),它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现

Redission提供了分布式锁的多种多样的功能,具体如下


快速入门

(1)引入依赖

xml
<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson</artifactId>
	<version>3.13.6</version>
</dependency>

(2)配置类

java
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.150.101:6379")
            .setPassword("123321");
        // 创建RedissonClient对象
        return Redisson.create(config);
    }
}

(3)业务实现

java
@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
    // 1.查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    // 2.判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        // 尚未开始
        return Result.fail("秒杀尚未开始!");
    }
    // 3.判断秒杀是否已经结束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        // 尚未开始
        return Result.fail("秒杀已经结束!");
    }
    // 4.判断库存是否充足
    if (voucher.getStock() < 1) {
        // 库存不足
        return Result.fail("库存不足!");
    }

    Long userId = UserHolder.getUser().getId();

    //创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
    //SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);

    RLock lock = redissonClient.getLock("lock:order:" + userId);

    //获取锁对象
    //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock();

    //加锁失败
    if (!isLock) {
        return Result.fail("不允许重复下单");
    }
    try {
        //获取代理对象(事务)
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
    } finally {
        //释放锁
        lock.unlock();
    }
}

高并发优化思路

(1)合并写请求

合并写请求比较适合应用在写频率较高,写数据比较简单的场景

(2)异步写

异步写则更适合应用在业务比较复杂,业务链较长的场景

1. lua 脚本实现异步秒杀


我们现在来看看整体思路:当用户下单之后,判断库存是否充足只需要导 redis 中去根据 key 找对应的 value 是否大于 0 即可,如果不充足,则直接结束,如果充足,继续在 redis 中判断用户是否可以下单,如果 set 集合中没有这条数据,说明他可以下单,如果 set 集合中没有这条记录,则将 userId 和优惠卷存入到 redis 中,并且返回 0,整个过程需要保证是原子性的,我们可以使用 lua 来操作

当以上判断逻辑走完之后,我们可以判断当前 redis 中返回的结果是否是 0 ,如果是 0,则表示可以下单,则将之前说的信息存入到到 queue 中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功

优化思路

新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中

基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

如果抢购成功,将优惠券 id 和用户 id 封装后存入阻塞队列

开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

存在的问题

基于阻塞队列的异步秒杀存在内存限制问题、数据安全问题,后续基于 Redis 消息队列继续优化

代码实现

(1)lua 脚本

lua
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

(2)业务改造

java
//异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 用于线程池处理的任务
// 当初始化完毕后,就会去从对列中去拿信息
private class VoucherOrderHandler implements Runnable {

    @Override
    public void run() {
        while (true){
            try {
                // 1.获取队列中的订单信息
                VoucherOrder voucherOrder = orderTasks.take();
                // 2.创建订单
                handleVoucherOrder(voucherOrder);
            } catch (Exception e) {
                log.error("处理订单异常", e);
            }
        }
    }

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //1.获取用户
        Long userId = voucherOrder.getUserId();
        // 2.创建锁对象
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 3.尝试获取锁
        boolean isLock = redisLock.lock();
        // 4.判断是否获得锁成功
        if (!isLock) {
            // 获取锁失败,直接返回失败或者重试
            log.error("不允许重复下单!");
            return;
        }
        try {
            //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            // 释放锁
            redisLock.unlock();
        }
    }

    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        // 1.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
        int r = result.intValue();
        // 2.判断结果是否为0
        if (r != 0) {
            // 2.1.不为0 ,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        VoucherOrder voucherOrder = new VoucherOrder();
        // 2.3.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 2.4.用户id
        voucherOrder.setUserId(userId);
        // 2.5.代金券id
        voucherOrder.setVoucherId(voucherId);
        // 2.6.放入阻塞队列
        orderTasks.add(voucherOrder);
        //3.获取代理对象
        proxy = (IVoucherOrderService)AopContext.currentProxy();
        //4.返回订单id
        return Result.ok(orderId);
    }

    @Transactional
    public  void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            log.error("用户已经购买过了");
            return ;
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            log.error("库存不足");
            return ;
        }
        save(voucherOrder);
    }
}

2. Redis与 MQ 异步领卷

(1)业务流程


(2)优化思路

当用户请求来领券时,不是直接领券,而是通过MQ发送一个领券消息。有一个监听器监听消息,完成领券动作

并不是每一个用户都有领券资格,具体要校验了资格才知道。那我们在发送MQ消息后,就要返回给用户结果了,此时该告诉用户是领券成功还是失败呢?

显然,无论告诉他哪种结果都不一定正确。因此,我们应该将校验领券资格的逻辑前置,在校验完成后再发MQ消息,完成数据库写操作

但是,校验领券资格的部分依然会有多次数据库查询,还需要加锁。效率提升并不明显,怎么办?

为了进一步提高效率,我们可以把优惠券相关数据缓存到 Redis 中,这样就可以基于 Redis 完成资格校验,不用访问数据库,效率自然会进一步提高了

(3)优惠卷缓存设计

既然要在缓存中保存优惠券库存,并且校验库存是否充足。那就必须在每次校验通过后,立刻扣减Redis中缓存的库存,否则缓存中库存一直不变,起不到校验是否超发的目的

优惠券一旦发放,就可能有用户来领券,因此应该在发放优惠券的同时直接添加优惠券缓存。而暂停发放时则应该将优惠券的缓存删除,下次再次发放时重新添加

(4)缓存查询优化

在兑换资格校验的时候,或者领券资格校验的时候,会有多次与 Redis 的交互,每一次交互都需要发起一次网络请求,在并发较高的时候可能导致网络拥堵,甚至导致业务变慢

我们能不能在一次请求 Redis 中完成所有校验呢?

普通的 Redis 命令做不到,不过 Redis 提供了一种脚本语法,可以在脚本中编写复杂业务判断。我们只需要向 Redis 发起一次请求,就可以完成对脚本调用,即可实现复杂业务校验,这个脚本就是 LUA 脚本

⚠️ 获取锁逻辑封装

需求分析


使用分布式锁的场景中,只有获取锁后的业务处理不同,其余的步骤都相同,可以通过 AOP 切面编程实现方法的抽取,实现一个通用分布式锁的组件

获取锁失败策略


相关依赖

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
</dependency>

定义注解

java
/**
 * 分布式锁
 **/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Lock {
    /**
     * 加锁key的表达式,支持SPEL表达式
     */
    String name();

    /**
     * 阻塞超时时长,不指定 waitTime 则按照Redisson默认时长
     */
    long waitTime() default 1;

    /**
     * 锁自动释放时长,默认是-1,其实是30秒 + watchDog模式
     */
    long leaseTime() default -1;

    /**
     * 时间单位,默认为秒
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

    /**
     * 如果设定了false,则方法结束不释放锁,而是等待leaseTime后自动释放
     */
    boolean autoUnlock() default true;

    /**
     * 锁的类型,包括:可重入锁、公平锁、读锁、写锁
     */
    LockType lockType() default LockType.DEFAULT;

    /**
     * 锁策略,包括5种,默认策略是 不断尝试获取锁,直到成功或超时,超时后抛出异常
     */
    LockStrategy lockStrategy() default LockStrategy.FAIL_AFTER_RETRY_TIMEOUT;
}

定义锁类型

java
public enum LockType {
    // 可重入锁
    DEFAULT() {
        @Override
        public RLock getLock(RedissonClient redissonClient, String name) {
            return redissonClient.getLock(name);
        }
    },
    // 公平锁
    FAIR_LOCK() {
        @Override
        public RLock getLock(RedissonClient redissonClient, String name) {
            return redissonClient.getFairLock(name);
        }
    },
    // 读锁
    READ_LOCK() {
        @Override
        public RLock getLock(RedissonClient redissonClient, String name) {
            return redissonClient.getReadWriteLock(name).readLock();
        }
    },
    // 写锁
    WRITE_LOCK() {
        @Override
        public RLock getLock(RedissonClient redissonClient, String name) {
            return redissonClient.getReadWriteLock(name).writeLock();
        }
    },
    ;

    public abstract RLock getLock(RedissonClient redissonClient, String name);
}

定义锁策略

java
public enum LockStrategy {
    /**
     * 不重试,直接结束,返回false
     */
    SKIP_FAST() {
        @Override
        public boolean tryLock(RLock lock, Lock properties) throws InterruptedException {
            return lock.tryLock(0, properties.leaseTime(), properties.timeUnit());
        }
    },
    /**
     * 不重试,直接结束,抛出异常
     */
    FAIL_FAST() {
        @Override
        public boolean tryLock(RLock lock, Lock properties) throws InterruptedException {
            boolean success = lock.tryLock(0, properties.leaseTime(), properties.timeUnit());
            if (!success) {
                throw new RuntimeException("请求太频繁");
            }
            return true;
        }
    },
    /**
     * 重试,直到超时后,直接结束
     */
    SKIP_AFTER_RETRY_TIMEOUT() {
        @Override
        public boolean tryLock(RLock lock, Lock properties) throws InterruptedException {
            return lock.tryLock(properties.waitTime(), properties.leaseTime(), properties.timeUnit());
        }
    },
    /**
     * 重试,直到超时后,抛出异常
     */
    FAIL_AFTER_RETRY_TIMEOUT() {
        @Override
        public boolean tryLock(RLock lock, Lock properties) throws InterruptedException {
            boolean success = lock.tryLock(properties.waitTime(), properties.leaseTime(), properties.timeUnit());
            if (!success) {
                throw new RuntimeException("请求超时");
            }
            return true;
        }
    },
    /**
     * 不停重试,直到成功为止
     */
    KEEP_RETRY() {
        @Override
        public boolean tryLock(RLock lock, Lock properties) throws InterruptedException {
            lock.lock(properties.leaseTime(), properties.timeUnit());
            return true;
        }
    },
    ;

    public abstract boolean tryLock(RLock lock, Lock properties) throws InterruptedException;
}

定义切面类

基于 SPEL 表达式,可在注解中实现动态传递锁名

java
@Aspect
@Component
public class LockAspect {

    private final RedissonClient redissonClient;

    public LockAspect(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    //通过环绕加锁,方法执行前加锁,方法执行后根据注解使用解锁
    @Around("@annotation(properties)")
    public Object handleLock(ProceedingJoinPoint pjp, Lock properties) throws Throwable {
        if (!properties.autoUnlock() && properties.leaseTime() <= 0) {
            // 不手动释放锁时,必须指定leaseTime时间
            throw new BizIllegalException("leaseTime不能为空");
        }
        // 1.基于SPEL表达式解析锁的 name
        String name = getLockName(properties.name(), pjp);
        // 2.得到锁对象
        RLock rLock = properties.lockType().getLock(redissonClient, name);
        // 3.尝试获取锁
        boolean success = properties.lockStrategy().tryLock(rLock, properties);
        if (!success) {
            // 获取锁失败,结束
            return null;
        }
        try {
            // 4.执行被代理方法
            return pjp.proceed();
        } finally {
            // 5.释放锁
            if (properties.autoUnlock()) {
                rLock.unlock();
            }
        }
    }

    /**
     * SPEL的正则规则
     */
    private static final Pattern pattern = Pattern.compile("\\#\\{([^\\}]*)\\}");
    /**
     * 方法参数解析器
     */
    private static final ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();

    /**
     * 解析锁名称
     * @param name 原始锁名称
     * @param pjp 切入点
     * @return 解析后的锁名称
     */
    private String getLockName(String name, ProceedingJoinPoint pjp) {
        // 1.判断是否存在spel表达式
        if (StringUtils.isBlank(name) || !name.contains("#")) {
            // 不存在,直接返回
            return name;
        }
        // 2.构建context
        EvaluationContext context = new MethodBasedEvaluationContext(
                TypedValue.NULL, resolveMethod(pjp), pjp.getArgs(), parameterNameDiscoverer);
        // 3.构建解析器
        ExpressionParser parser = new SpelExpressionParser();
        // 3.循环处理
        Matcher matcher = pattern.matcher(name);
        while (matcher.find()) {
            // 2.1.获取表达式
            String tmp = matcher.group();
            // 2.2.尝试解析
            Expression expression = parser.parseExpression("#" + matcher.group(1));
            Object value = expression.getValue(context);
            name = name.replace(tmp, ObjectUtils.nullSafeToString(value));
        }
        return name;
    }

    private Method resolveMethod(ProceedingJoinPoint pjp) {
        // 1.获取方法签名
        MethodSignature signature = (MethodSignature)pjp.getSignature();
        // 2.获取字节码
        Class<?> clazz = pjp.getTarget().getClass();
        // 3.方法名称
        String name = signature.getName();
        // 4.方法参数列表
        Class<?>[] parameterTypes = signature.getMethod().getParameterTypes();
        return tryGetDeclaredMethod(clazz, name, parameterTypes);
    }

    private Method tryGetDeclaredMethod(Class<?> clazz, String name, Class<?> ... parameterTypes){
        try {
            // 5.反射获取方法
            return clazz.getDeclaredMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            Class<?> superClass = clazz.getSuperclass();
            if (superClass != null) {
                // 尝试从父类寻找
                return tryGetDeclaredMethod(superClass, name, parameterTypes);
            }
        }
        return null;
    }
}

定义配置类

java
/**
 * Redisson 配置类
 *
 * 适用于单体 Spring Boot 项目:
 * 1. 从 Spring Boot 的 Redis 配置中读取连接信息
 * 2. 创建 RedissonClient
 * 3. 注册分布式锁切面
 */
@Slf4j
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedissonConfig {

    private static final String REDIS_PROTOCOL_PREFIX = "redis://";
    private static final String REDISS_PROTOCOL_PREFIX = "rediss://";

    /**
     * 注册分布式锁切面 Bean
     *
     * LockAspect 不需要加 @Component
     * 因为这里已经通过 @Bean 注册进 Spring 容器
     */
    @Bean
    public LockAspect lockAspect(RedissonClient redissonClient) {
        return new LockAspect(redissonClient);
    }

    /**
     * 创建 RedissonClient
     *
     * 这里的 RedisProperties 来自 Spring Boot 对 application.yml 的自动绑定
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient(RedisProperties properties) {
        log.debug("开始初始化 RedissonClient");

        RedisProperties.Cluster cluster = properties.getCluster();
        RedisProperties.Sentinel sentinel = properties.getSentinel();
        String password = properties.getPassword();

        int timeout = 3000;
        Duration duration = properties.getTimeout();
        if (duration != null) {
            timeout = (int) duration.toMillis();
        }

        Config config = new Config();

        // 优先使用集群模式
        if (cluster != null && !CollectionUtil.isEmpty(cluster.getNodes())) {
            config.useClusterServers()
                    .addNodeAddress(convert(cluster.getNodes()))
                    .setConnectTimeout(timeout)
                    .setPassword(password);

        // 其次使用哨兵模式
        } else if (sentinel != null && StrUtil.isNotBlank(sentinel.getMaster())) {
            config.useSentinelServers()
                    .setMasterName(sentinel.getMaster())
                    .addSentinelAddress(convert(sentinel.getNodes()))
                    .setConnectTimeout(timeout)
                    // 保持原始功能不变,固定使用 0 号库
                    .setDatabase(0)
                    .setPassword(password);

        // 默认使用单机模式
        } else {
            config.useSingleServer()
                    .setAddress(String.format("redis://%s:%d", properties.getHost(), properties.getPort()))
                    .setConnectTimeout(timeout)
                    // 保持原始功能不变,固定使用 0 号库
                    .setDatabase(0)
                    .setPassword(password);
        }

        return Redisson.create(config);
    }

    /**
     * 将节点地址统一转换为 Redisson 支持的格式
     *
     * 例如:
     * 127.0.0.1:6379 -> redis://127.0.0.1:6379
     */
    private String[] convert(List<String> sourceNodes) {
        List<String> nodes = new ArrayList<>(sourceNodes.size());
        for (String node : sourceNodes) {
            if (!node.startsWith(REDIS_PROTOCOL_PREFIX) && !node.startsWith(REDISS_PROTOCOL_PREFIX)) {
                nodes.add(REDIS_PROTOCOL_PREFIX + node);
            } else {
                nodes.add(node);
            }
        }
        return nodes.toArray(new String[0]);
    }
}

配置实例

yaml
# 单机模式
spring:
  data:
    redis:
      host: 127.0.0.1
      port: 6379
      password: 123456
      timeout: 3s

# 哨兵模式
spring:
  data:
    redis:
      password: 123456
      timeout: 3s
      sentinel:
        master: mymaster
        nodes:
          - 192.168.10.11:26379
          - 192.168.10.12:26379
          - 192.168.10.13:26379

# 集群模式
spring:
spring:
  data:
    redis:
      password: 123456
      timeout: 3s
      cluster:
        nodes:
          - 192.168.10.21:6379
          - 192.168.10.22:6379
          - 192.168.10.23:6379
          - 192.168.10.24:6379
          - 192.168.10.25:6379
          - 192.168.10.26:6379

使用注解

(1)直接传递参数

java
@Lock(name = "lock:coupon:#{userId}")

(2)通过对象获取属性传递参数

语法:T(类名) . 方法名()就是调用静态方法

java
@Lock(name = "lock:coupon:#{T(con.tianji.common.utils.UserContext).getUser()}")