1. 设计思路
Redis ZSet 通过 分值(Score)存储任务的执行时间戳,并利用 有序查询 实现定时任务的调度。
主要操作包括:
ZADD:插入任务,
score
设为任务执行时间。ZRANGEBYSCORE:获取到期任务(score ≤ 当前时间)。
ZREM:任务执行后删除,防止重复处理。
2. Java 代码示例
生产者(添加任务)
import redis.clients.jedis.Jedis;
public class DelayQueueProducer {
private static final String DELAY_QUEUE = "delay_queue";
private static final Jedis redis = new Jedis("localhost", 6379);
public static void addTask(String taskId, long delaySeconds) {
long executeTime = System.currentTimeMillis() + (delaySeconds * 1000);
redis.zadd(DELAY_QUEUE, executeTime, taskId);
System.out.println("任务 " + taskId + " 已加入延迟队列,执行时间:" + executeTime);
}
public static void main(String[] args) {
addTask("task_1", 5); // 5秒后执行
addTask("task_2", 10); // 10秒后执行
}
}
消费者(轮询处理任务)
import redis.clients.jedis.Jedis;
import java.util.Set;
public class DelayQueueConsumer {
private static final String DELAY_QUEUE = "delay_queue";
private static final Jedis redis = new Jedis("localhost", 6379);
public static void processTasks() {
while (true) {
long now = System.currentTimeMillis();
Set<String> tasks = redis.zrangeByScore(DELAY_QUEUE, 0, now, 0, 1);
if (!tasks.isEmpty()) {
String task = tasks.iterator().next();
System.out.println("执行任务: " + task);
redis.zrem(DELAY_QUEUE, task); // 任务执行后删除
}
try {
Thread.sleep(1000); // 每秒检查一次
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
processTasks();
}
}
3. 方案优化
减少轮询
使用
ZPOPMIN
代替ZRANGEBYSCORE
,可以原子性获取并删除任务:redis.zpopMin(DELAY_QUEUE);
结合 Redis 订阅(Pub/Sub)机制,避免高频轮询。
保证任务唯一执行
多个消费者竞争式消费任务,确保同一任务不会被重复执行。
使用 Lua 脚本保证原子操作:
local task = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1) if #task > 0 then redis.call('ZREM', KEYS[1], task[1]) return task[1] end return nil
4. 适用场景
5. 总结
ZSet 通过 时间戳排序 + 有序查询 实现了高效的延迟队列,适用于定时任务、延时消息、订单超时处理等场景。
相比传统轮询方式,ZSet 能精准获取可执行任务,提高性能并减少资源浪费。