Skip to content

优惠券分发任务 V1 版本流程文档

概述

优惠券分发任务系统负责将优惠券批量分发给指定用户。系统支持两种触发方式:

  • 立即发送:创建任务后立即执行分发
  • 定时发送:在指定时间由定时任务触发分发

整体架构

核心组件

1. 商户后台模块 (merchant-admin)

  • CouponTaskController: 接收创建任务请求
  • CouponTaskServiceImpl: 处理任务创建逻辑
  • CouponTaskExecuteProducer: 发送 RocketMQ 消息
  • CouponTaskJobHandler: XXL-Job 定时任务处理器

2. 分发服务模块 (distribution)

  • CouponTaskExecuteConsumer: 消费 RocketMQ 消息
  • ReadExcelDistributionListener: 逐行处理 Excel 用户数据

流程一:立即发送

时序图

详细步骤

阶段 1: 任务创建 (CouponTaskServiceImpl)

java
public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
    // 1. 查询优惠券模板
    CouponTemplateQueryRespDTO couponTemplate = 
        couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId());
    
    // 2. 创建任务实体
    CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class);
    couponTaskDO.setStatus(CouponTaskStatusEnum.IN_PROGRESS.getStatus()); // 立即发送
    couponTaskDO.setSendNum(null); // 稍后异步统计
    couponTaskMapper.insert(couponTaskDO);
    
    // 3. 异步统计 Excel 行数
    excelAsyncExecutor.execute(() -> refreshCouponTaskSendNum(taskId, fileAddress));
    
    // 4. 立即发送消息到 RocketMQ
    CouponTaskExecuteEvent event = CouponTaskExecuteEvent.builder()
        .couponTaskId(taskId)
        .build();
    couponTaskExecuteProducer.send(event);
}

关键点

  • 任务状态直接设置为 IN_PROGRESS
  • 立即发送 RocketMQ 消息触发分发
  • Excel 行数统计是异步的,不阻塞主流程

阶段 2: 消息消费 (CouponTaskExecuteConsumer)

java
public void onMessage(CouponTaskExecuteEvent event) {
    // 1. 验证任务状态
    CouponTaskDO couponTaskDO = couponTaskMapper.selectById(event.getCouponTaskId());
    if (couponTaskDO.getStatus() != IN_PROGRESS) {
        return; // 状态异常,终止
    }
    
    // 2. 验证模板状态
    CouponTemplateDO couponTemplateDO = couponTemplateMapper.selectOne(...);
    if (couponTemplateDO.getStatus() != ACTIVE) {
        return; // 模板不可用,终止
    }
    
    // 3. 使用 EasyExcel 读取并分发
    ReadExcelDistributionListener listener = new ReadExcelDistributionListener(...);
    EasyExcel.read(couponTaskDO.getFileAddress(), CouponTaskExcelObject.class, listener)
        .sheet()
        .doRead();
}

阶段 3: 逐行分发 (ReadExcelDistributionListener)

java
public void invoke(CouponTaskExcelObject row, AnalysisContext context) {
    // 1. 扣减 Redis 缓存库存
    String key = String.format("one-coupon_engine:template:%s", templateId);
    Long stock = stringRedisTemplate.opsForHash().increment(key, "stock", -1);
    if (stock < 0) return; // 库存不足
    
    // 2. 扣减数据库库存
    int result = couponTemplateMapper.decrementCouponTemplateStock(shopNumber, templateId, 1);
    if (!SqlHelper.retBool(result)) return; // 库存不足
    
    // 3. 插入用户优惠券记录
    UserCouponDO userCouponDO = UserCouponDO.builder()
        .couponTemplateId(templateId)
        .userId(Long.parseLong(row.getUserId()))
        .receiveTime(now)
        .validStartTime(now)
        .validEndTime(validEndTime)
        .status(EFFECTIVE)
        .build();
    userCouponMapper.insert(userCouponDO);
    
    // 4. 更新用户优惠券缓存 (ZSet)
    String userKey = String.format("one-coupon_engine:user-template-list:%s", row.getUserId());
    String member = templateId + "_" + userCouponDO.getId();
    stringRedisTemplate.opsForZSet().add(userKey, member, now.getTime());
}

public void doAfterAllAnalysed(AnalysisContext context) {
    // 所有行处理完成,更新任务状态
    CouponTaskDO update = CouponTaskDO.builder()
        .id(couponTaskId)
        .status(CouponTaskStatusEnum.SUCCESS.getStatus())
        .completionTime(new Date())
        .build();
    couponTaskMapper.updateById(update);
}

流程二:定时发送

时序图

详细步骤

阶段 1: 任务创建 (CouponTaskServiceImpl)

java
public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
    // 1. 查询优惠券模板
    CouponTemplateQueryRespDTO couponTemplate = 
        couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId());
    
    // 2. 创建任务实体
    CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class);
    couponTaskDO.setStatus(CouponTaskStatusEnum.PENDING.getStatus()); // 定时发送
    couponTaskDO.setSendTime(requestParam.getSendTime()); // 设置执行时间
    couponTaskDO.setSendNum(null);
    couponTaskMapper.insert(couponTaskDO);
    
    // 3. 异步统计 Excel 行数
    excelAsyncExecutor.execute(() -> refreshCouponTaskSendNum(taskId, fileAddress));
    
    // 4. 不发送消息,等待定时任务触发
    log.info("定时发送任务创建成功,任务ID:{},将由 xxl-job 扫描并执行", taskId);
}

关键点

  • 任务状态设置为 PENDING
  • 不立即发送消息,等待定时任务扫描

阶段 2: 定时任务扫描 (CouponTaskJobHandler)

java
@XxlJob(value = "couponTemplateTask")
public void execute() throws Exception {
    long initId = 0;
    Date now = new Date();
    
    while (true) {
        // 1. 查询待执行任务 (分页查询,每次最多 100 条)
        List<CouponTaskDO> tasks = fetchPendingTasks(initId, now);
        if (CollUtil.isEmpty(tasks)) break;
        
        // 2. 逐个处理任务
        for (CouponTaskDO task : tasks) {
            distributeCoupon(task);
        }
        
        // 3. 更新游标
        if (tasks.size() < MAX_LIMIT) break;
        initId = tasks.stream().mapToLong(CouponTaskDO::getId).max().orElse(initId);
    }
}

private List<CouponTaskDO> fetchPendingTasks(long initId, Date now) {
    return couponTaskMapper.selectList(
        Wrappers.lambdaQuery(CouponTaskDO.class)
            .eq(CouponTaskDO::getStatus, PENDING)
            .le(CouponTaskDO::getSendTime, now)  // 已到执行时间
            .gt(CouponTaskDO::getId, initId)     // 游标分页
            .last("LIMIT " + MAX_LIMIT)
    );
}

private void distributeCoupon(CouponTaskDO task) {
    // 1. 更新任务状态为 IN_PROGRESS
    couponTaskMapper.updateById(
        CouponTaskDO.builder()
            .id(task.getId())
            .status(IN_PROGRESS)
            .build()
    );
    
    // 2. 发送消息到 RocketMQ
    CouponTaskExecuteEvent event = CouponTaskExecuteEvent.builder()
        .couponTaskId(task.getId())
        .build();
    couponTaskExecuteProducer.send(event);
}

关键点

  • 使用游标分页避免深分页问题
  • 每次最多处理 100 条任务
  • 先更新状态再发送消息,避免重复执行

阶段 3: 消息消费

后续流程与立即发送完全相同,参考流程一的阶段 2 和阶段 3。


数据流转

1. 任务状态流转

2. Redis 数据结构

优惠券模板库存 (Hash)

Key: one-coupon_engine:template:{templateId}
Type: Hash

Field       Value
stock       1000    (剩余库存)

用户优惠券列表 (ZSet)

Key: one-coupon_engine:user-template-list:{userId}
Type: ZSet

Member (优惠券标识)    Score (领取时间戳)
"100_5001"            1705123456789
"200_5002"            1705123567890
"100_5003"            1705123678901

Member 格式{模板ID}_{用户优惠券记录ID}

  • 支持同一用户领取多张相同模板的优惠券
  • Score 使用时间戳,支持按时间排序查询

3. 数据库表

coupon_task (优惠券任务表)

sql
id                  BIGINT       -- 任务ID
batch_id            BIGINT       -- 批次ID
coupon_template_id  BIGINT       -- 优惠券模板ID
shop_number         VARCHAR      -- 商户编号
send_type           TINYINT      -- 发送类型 (1:立即 2:定时)
send_time           DATETIME     -- 发送时间
send_num            INT          -- 发送数量 (异步统计)
status              TINYINT      -- 状态 (0:待执行 1:执行中 2:成功 3:失败)
file_address        VARCHAR      -- Excel 文件地址
operator_id         BIGINT       -- 操作人ID
completion_time     DATETIME     -- 完成时间

user_coupon (用户优惠券表)

sql
id                  BIGINT       -- 记录ID
coupon_template_id  BIGINT       -- 优惠券模板ID
user_id             BIGINT       -- 用户ID
receive_time        DATETIME     -- 领取时间
receive_count       INT          -- 领取次数
valid_start_time    DATETIME     -- 有效期开始
valid_end_time      DATETIME     -- 有效期结束
source              TINYINT      -- 来源 (1:平台发放 2:用户领取)
status              TINYINT      -- 状态 (1:有效 2:已使用 3:已过期)

V1 版本的性能问题

问题分析

1. 逐行数据库操作(最严重)

java
// 每读一行 Excel 就执行:
- 1UPDATE (扣减模板库存)
- 1INSERT (插入用户优惠券)

影响

  • 10 万用户 = 20 万次数据库操作
  • 每次操作都有网络往返开销
  • 数据库连接池压力大
  • 执行时间长(可能数小时)

2. 逐行 Redis 操作

java
// 每行都操作 Redis:
- 1HINCRBY (扣减库存)
- 1ZADD (添加用户优惠券)

影响

  • 10 万用户 = 20 万次 Redis 操作
  • 虽然 Redis 快,但网络往返仍有开销

3. 库存扣减的双重操作

java
// 每个用户都要扣减一次模板库存
stringRedisTemplate.opsForHash().increment(key, "stock", -1);  // Redis -1
couponTemplateMapper.decrementCouponTemplateStock(..., 1);     // MySQL -1

问题

  • 应该批量扣减,而不是逐个扣减
  • 例如:100 个用户一次性扣减 100

4. 没有事务保护

java
// Redis 扣减成功,但数据库失败
Long stock = stringRedisTemplate.opsForHash().increment(..., -1);
// 如果这里失败,Redis 库存已扣减,无法回滚
int result = couponTemplateMapper.decrementCouponTemplateStock(...);

问题

  • Redis 和 MySQL 数据不一致
  • 没有补偿机制

5. 单线程阻塞式处理

java
// EasyExcel 逐行回调,单线程顺序处理
public void invoke(CouponTaskExcelObject row, AnalysisContext context) {
    // 处理用户 1
    // 处理用户 2
    // 处理用户 3
    // ...
}

问题

  • 没有利用多核 CPU
  • 无法并发处理

性能测试估算

假设:

  • Excel 文件:10 万行用户
  • 单次数据库操作:10ms
  • 单次 Redis 操作:1ms

当前性能

总时间 = 100,000 × (10ms + 10ms + 1ms + 1ms)
      = 100,000 × 22ms
      = 2,200,000ms
      = 36.7 分钟

优化后性能(批量 + 并发)

批量大小:1000
并发线程:10

总时间 = (100,000 / 1000 / 10) × (100ms + 10ms)
      = 10 × 110ms
      = 1,100ms
      = 1.1 秒

性能提升:约 2000 倍


优化建议

1. 批量操作

java
// 攒够 1000 条再批量插入
List<UserCouponDO> batch = new ArrayList<>(1000);
for (CouponTaskExcelObject row : rows) {
    batch.add(buildUserCoupon(row));
    if (batch.size() >= 1000) {
        userCouponMapper.insertBatch(batch);
        batch.clear();
    }
}

2. 库存预扣

java
// 一次性扣减总数
int totalCount = excelRowCount;
couponTemplateMapper.decrementCouponTemplateStock(shopNumber, templateId, totalCount);

3. 并发处理

java
// 使用线程池并发处理
ExecutorService executor = Executors.newFixedThreadPool(10);
List<List<CouponTaskExcelObject>> partitions = partition(allRows, 1000);
for (List<CouponTaskExcelObject> partition : partitions) {
    executor.submit(() -> processBatch(partition));
}

4. 消息拆分

java
// 将大任务拆分成多个小任务
for (int i = 0; i < totalPages; i++) {
    CouponTaskExecuteEvent event = CouponTaskExecuteEvent.builder()
        .couponTaskId(taskId)
        .pageNumber(i)
        .pageSize(1000)
        .build();
    producer.send(event);
}

5. 事务补偿

java
// 使用分布式事务或补偿机制
@Transactional
public void distributeCoupon() {
    try {
        // 扣减库存
        // 插入记录
    } catch (Exception e) {
        // 回滚 Redis
        stringRedisTemplate.opsForHash().increment(key, "stock", 1);
        throw e;
    }
}

总结

V1 版本特点

优点

  • 逻辑清晰,易于理解
  • 功能完整,能正常工作
  • 支持立即发送和定时发送

缺点

  • 性能差,处理大量用户时耗时长
  • 逐行数据库操作,资源消耗大
  • 没有事务保护,可能数据不一致
  • 单线程处理,无法利用多核

适用场景

  • 小规模分发(< 1000 用户)
  • 对实时性要求不高
  • 原型验证阶段

改进方向

  • 批量操作
  • 并发处理
  • 消息拆分
  • 事务补偿
  • 异步化

附录:关键代码位置

组件文件路径
任务创建服务merchant-admin/service/impl/CouponTaskServiceImpl.java
定时任务处理器merchant-admin/job/CouponTaskJobHandler.java
消息生产者merchant-admin/mq/producer/CouponTaskExecuteProducer.java
消息消费者distribution/mq/consumer/CouponTaskExecuteConsumer.java
Excel 处理监听器distribution/service/excel/ReadExcelDistributionListener.java