优惠券分发任务 V1 版本流程文档
概述
优惠券分发任务系统负责将优惠券批量分发给指定用户。系统支持两种触发方式:
- 立即发送:创建任务后立即执行分发
- 定时发送:在指定时间由定时任务触发分发
整体架构
核心组件
1. 商户后台模块 (merchant-admin)
- CouponTaskController: 接收创建任务请求
- CouponTaskServiceImpl: 处理任务创建逻辑
- CouponTaskExecuteProducer: 发送 RocketMQ 消息
- CouponTaskJobHandler: XXL-Job 定时任务处理器
2. 分发服务模块 (distribution)
- CouponTaskExecuteConsumer: 消费 RocketMQ 消息
- ReadExcelDistributionListener: 逐行处理 Excel 用户数据
流程一:立即发送
时序图
详细步骤
阶段 1: 任务创建 (CouponTaskServiceImpl)
java
public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
// 1. 查询优惠券模板
CouponTemplateQueryRespDTO couponTemplate =
couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId());
// 2. 创建任务实体
CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class);
couponTaskDO.setStatus(CouponTaskStatusEnum.IN_PROGRESS.getStatus()); // 立即发送
couponTaskDO.setSendNum(null); // 稍后异步统计
couponTaskMapper.insert(couponTaskDO);
// 3. 异步统计 Excel 行数
excelAsyncExecutor.execute(() -> refreshCouponTaskSendNum(taskId, fileAddress));
// 4. 立即发送消息到 RocketMQ
CouponTaskExecuteEvent event = CouponTaskExecuteEvent.builder()
.couponTaskId(taskId)
.build();
couponTaskExecuteProducer.send(event);
}关键点:
- 任务状态直接设置为
IN_PROGRESS - 立即发送 RocketMQ 消息触发分发
- Excel 行数统计是异步的,不阻塞主流程
阶段 2: 消息消费 (CouponTaskExecuteConsumer)
java
public void onMessage(CouponTaskExecuteEvent event) {
// 1. 验证任务状态
CouponTaskDO couponTaskDO = couponTaskMapper.selectById(event.getCouponTaskId());
if (couponTaskDO.getStatus() != IN_PROGRESS) {
return; // 状态异常,终止
}
// 2. 验证模板状态
CouponTemplateDO couponTemplateDO = couponTemplateMapper.selectOne(...);
if (couponTemplateDO.getStatus() != ACTIVE) {
return; // 模板不可用,终止
}
// 3. 使用 EasyExcel 读取并分发
ReadExcelDistributionListener listener = new ReadExcelDistributionListener(...);
EasyExcel.read(couponTaskDO.getFileAddress(), CouponTaskExcelObject.class, listener)
.sheet()
.doRead();
}阶段 3: 逐行分发 (ReadExcelDistributionListener)
java
public void invoke(CouponTaskExcelObject row, AnalysisContext context) {
// 1. 扣减 Redis 缓存库存
String key = String.format("one-coupon_engine:template:%s", templateId);
Long stock = stringRedisTemplate.opsForHash().increment(key, "stock", -1);
if (stock < 0) return; // 库存不足
// 2. 扣减数据库库存
int result = couponTemplateMapper.decrementCouponTemplateStock(shopNumber, templateId, 1);
if (!SqlHelper.retBool(result)) return; // 库存不足
// 3. 插入用户优惠券记录
UserCouponDO userCouponDO = UserCouponDO.builder()
.couponTemplateId(templateId)
.userId(Long.parseLong(row.getUserId()))
.receiveTime(now)
.validStartTime(now)
.validEndTime(validEndTime)
.status(EFFECTIVE)
.build();
userCouponMapper.insert(userCouponDO);
// 4. 更新用户优惠券缓存 (ZSet)
String userKey = String.format("one-coupon_engine:user-template-list:%s", row.getUserId());
String member = templateId + "_" + userCouponDO.getId();
stringRedisTemplate.opsForZSet().add(userKey, member, now.getTime());
}
public void doAfterAllAnalysed(AnalysisContext context) {
// 所有行处理完成,更新任务状态
CouponTaskDO update = CouponTaskDO.builder()
.id(couponTaskId)
.status(CouponTaskStatusEnum.SUCCESS.getStatus())
.completionTime(new Date())
.build();
couponTaskMapper.updateById(update);
}流程二:定时发送
时序图
详细步骤
阶段 1: 任务创建 (CouponTaskServiceImpl)
java
public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
// 1. 查询优惠券模板
CouponTemplateQueryRespDTO couponTemplate =
couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId());
// 2. 创建任务实体
CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class);
couponTaskDO.setStatus(CouponTaskStatusEnum.PENDING.getStatus()); // 定时发送
couponTaskDO.setSendTime(requestParam.getSendTime()); // 设置执行时间
couponTaskDO.setSendNum(null);
couponTaskMapper.insert(couponTaskDO);
// 3. 异步统计 Excel 行数
excelAsyncExecutor.execute(() -> refreshCouponTaskSendNum(taskId, fileAddress));
// 4. 不发送消息,等待定时任务触发
log.info("定时发送任务创建成功,任务ID:{},将由 xxl-job 扫描并执行", taskId);
}关键点:
- 任务状态设置为
PENDING - 不立即发送消息,等待定时任务扫描
阶段 2: 定时任务扫描 (CouponTaskJobHandler)
java
@XxlJob(value = "couponTemplateTask")
public void execute() throws Exception {
long initId = 0;
Date now = new Date();
while (true) {
// 1. 查询待执行任务 (分页查询,每次最多 100 条)
List<CouponTaskDO> tasks = fetchPendingTasks(initId, now);
if (CollUtil.isEmpty(tasks)) break;
// 2. 逐个处理任务
for (CouponTaskDO task : tasks) {
distributeCoupon(task);
}
// 3. 更新游标
if (tasks.size() < MAX_LIMIT) break;
initId = tasks.stream().mapToLong(CouponTaskDO::getId).max().orElse(initId);
}
}
private List<CouponTaskDO> fetchPendingTasks(long initId, Date now) {
return couponTaskMapper.selectList(
Wrappers.lambdaQuery(CouponTaskDO.class)
.eq(CouponTaskDO::getStatus, PENDING)
.le(CouponTaskDO::getSendTime, now) // 已到执行时间
.gt(CouponTaskDO::getId, initId) // 游标分页
.last("LIMIT " + MAX_LIMIT)
);
}
private void distributeCoupon(CouponTaskDO task) {
// 1. 更新任务状态为 IN_PROGRESS
couponTaskMapper.updateById(
CouponTaskDO.builder()
.id(task.getId())
.status(IN_PROGRESS)
.build()
);
// 2. 发送消息到 RocketMQ
CouponTaskExecuteEvent event = CouponTaskExecuteEvent.builder()
.couponTaskId(task.getId())
.build();
couponTaskExecuteProducer.send(event);
}关键点:
- 使用游标分页避免深分页问题
- 每次最多处理 100 条任务
- 先更新状态再发送消息,避免重复执行
阶段 3: 消息消费
后续流程与立即发送完全相同,参考流程一的阶段 2 和阶段 3。
数据流转
1. 任务状态流转
2. Redis 数据结构
优惠券模板库存 (Hash)
Key: one-coupon_engine:template:{templateId}
Type: Hash
Field Value
stock 1000 (剩余库存)用户优惠券列表 (ZSet)
Key: one-coupon_engine:user-template-list:{userId}
Type: ZSet
Member (优惠券标识) Score (领取时间戳)
"100_5001" 1705123456789
"200_5002" 1705123567890
"100_5003" 1705123678901Member 格式:{模板ID}_{用户优惠券记录ID}
- 支持同一用户领取多张相同模板的优惠券
- Score 使用时间戳,支持按时间排序查询
3. 数据库表
coupon_task (优惠券任务表)
sql
id BIGINT -- 任务ID
batch_id BIGINT -- 批次ID
coupon_template_id BIGINT -- 优惠券模板ID
shop_number VARCHAR -- 商户编号
send_type TINYINT -- 发送类型 (1:立即 2:定时)
send_time DATETIME -- 发送时间
send_num INT -- 发送数量 (异步统计)
status TINYINT -- 状态 (0:待执行 1:执行中 2:成功 3:失败)
file_address VARCHAR -- Excel 文件地址
operator_id BIGINT -- 操作人ID
completion_time DATETIME -- 完成时间user_coupon (用户优惠券表)
sql
id BIGINT -- 记录ID
coupon_template_id BIGINT -- 优惠券模板ID
user_id BIGINT -- 用户ID
receive_time DATETIME -- 领取时间
receive_count INT -- 领取次数
valid_start_time DATETIME -- 有效期开始
valid_end_time DATETIME -- 有效期结束
source TINYINT -- 来源 (1:平台发放 2:用户领取)
status TINYINT -- 状态 (1:有效 2:已使用 3:已过期)V1 版本的性能问题
问题分析
1. 逐行数据库操作(最严重)
java
// 每读一行 Excel 就执行:
- 1 次 UPDATE (扣减模板库存)
- 1 次 INSERT (插入用户优惠券)影响:
- 10 万用户 = 20 万次数据库操作
- 每次操作都有网络往返开销
- 数据库连接池压力大
- 执行时间长(可能数小时)
2. 逐行 Redis 操作
java
// 每行都操作 Redis:
- 1 次 HINCRBY (扣减库存)
- 1 次 ZADD (添加用户优惠券)影响:
- 10 万用户 = 20 万次 Redis 操作
- 虽然 Redis 快,但网络往返仍有开销
3. 库存扣减的双重操作
java
// 每个用户都要扣减一次模板库存
stringRedisTemplate.opsForHash().increment(key, "stock", -1); // Redis -1
couponTemplateMapper.decrementCouponTemplateStock(..., 1); // MySQL -1问题:
- 应该批量扣减,而不是逐个扣减
- 例如:100 个用户一次性扣减 100
4. 没有事务保护
java
// Redis 扣减成功,但数据库失败
Long stock = stringRedisTemplate.opsForHash().increment(..., -1);
// 如果这里失败,Redis 库存已扣减,无法回滚
int result = couponTemplateMapper.decrementCouponTemplateStock(...);问题:
- Redis 和 MySQL 数据不一致
- 没有补偿机制
5. 单线程阻塞式处理
java
// EasyExcel 逐行回调,单线程顺序处理
public void invoke(CouponTaskExcelObject row, AnalysisContext context) {
// 处理用户 1
// 处理用户 2
// 处理用户 3
// ...
}问题:
- 没有利用多核 CPU
- 无法并发处理
性能测试估算
假设:
- Excel 文件:10 万行用户
- 单次数据库操作:10ms
- 单次 Redis 操作:1ms
当前性能:
总时间 = 100,000 × (10ms + 10ms + 1ms + 1ms)
= 100,000 × 22ms
= 2,200,000ms
= 36.7 分钟优化后性能(批量 + 并发):
批量大小:1000
并发线程:10
总时间 = (100,000 / 1000 / 10) × (100ms + 10ms)
= 10 × 110ms
= 1,100ms
= 1.1 秒性能提升:约 2000 倍
优化建议
1. 批量操作
java
// 攒够 1000 条再批量插入
List<UserCouponDO> batch = new ArrayList<>(1000);
for (CouponTaskExcelObject row : rows) {
batch.add(buildUserCoupon(row));
if (batch.size() >= 1000) {
userCouponMapper.insertBatch(batch);
batch.clear();
}
}2. 库存预扣
java
// 一次性扣减总数
int totalCount = excelRowCount;
couponTemplateMapper.decrementCouponTemplateStock(shopNumber, templateId, totalCount);3. 并发处理
java
// 使用线程池并发处理
ExecutorService executor = Executors.newFixedThreadPool(10);
List<List<CouponTaskExcelObject>> partitions = partition(allRows, 1000);
for (List<CouponTaskExcelObject> partition : partitions) {
executor.submit(() -> processBatch(partition));
}4. 消息拆分
java
// 将大任务拆分成多个小任务
for (int i = 0; i < totalPages; i++) {
CouponTaskExecuteEvent event = CouponTaskExecuteEvent.builder()
.couponTaskId(taskId)
.pageNumber(i)
.pageSize(1000)
.build();
producer.send(event);
}5. 事务补偿
java
// 使用分布式事务或补偿机制
@Transactional
public void distributeCoupon() {
try {
// 扣减库存
// 插入记录
} catch (Exception e) {
// 回滚 Redis
stringRedisTemplate.opsForHash().increment(key, "stock", 1);
throw e;
}
}总结
V1 版本特点
✅ 优点:
- 逻辑清晰,易于理解
- 功能完整,能正常工作
- 支持立即发送和定时发送
❌ 缺点:
- 性能差,处理大量用户时耗时长
- 逐行数据库操作,资源消耗大
- 没有事务保护,可能数据不一致
- 单线程处理,无法利用多核
适用场景
- 小规模分发(< 1000 用户)
- 对实时性要求不高
- 原型验证阶段
改进方向
- 批量操作
- 并发处理
- 消息拆分
- 事务补偿
- 异步化
附录:关键代码位置
| 组件 | 文件路径 |
|---|---|
| 任务创建服务 | merchant-admin/service/impl/CouponTaskServiceImpl.java |
| 定时任务处理器 | merchant-admin/job/CouponTaskJobHandler.java |
| 消息生产者 | merchant-admin/mq/producer/CouponTaskExecuteProducer.java |
| 消息消费者 | distribution/mq/consumer/CouponTaskExecuteConsumer.java |
| Excel 处理监听器 | distribution/service/excel/ReadExcelDistributionListener.java |
