<noframes id="bhrfl"><address id="bhrfl"></address>

    <address id="bhrfl"></address>

    <noframes id="bhrfl"><address id="bhrfl"><th id="bhrfl"></th></address>

    <form id="bhrfl"><th id="bhrfl"><progress id="bhrfl"></progress></th></form>

    <em id="bhrfl"><span id="bhrfl"></span></em>

    全部
    常見問題
    產品動態
    精選推薦

    Redis實現延遲隊列方法介紹

    管理 管理 編輯 刪除

    延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。那么,是在什么場景下我才需要這樣的隊列呢?

    1. 背景

    我們先看看以下業務場景:

    • 當訂單一直處于未支付狀態時,如何及時的關閉訂單
    • 如何定期檢查處于退款狀態的訂單是否已經退款成功
    • 在訂單長時間沒有收到下游系統的狀態通知的時候,如何實現階梯式的同步訂單狀態的策略
    • 在系統通知上游系統支付成功終態時,上游系統返回通知失敗,如何進行異步通知實行分頻率發送:15s 3m 10m 30m 30m 1h 2h 6h 15h

    1.1 解決方案

    • 最簡單的方式,定時掃表。例如對于訂單支付失效要求比較高的,每2S掃表一次檢查過期的訂單進行主動關單操作。優點是簡單,缺點是每分鐘全局掃表,浪費資源,如果遇到表數據訂單量即將過期的訂單量很大,會造成關單延遲。
    • 使用RabbitMq或者其他MQ改造實現延遲隊列,優點是,開源,現成的穩定的實現方案,缺點是:MQ是一個消息中間件,如果團隊技術棧本來就有MQ,那還好,如果不是,那為了延遲隊列而去部署一套MQ成本有點大
    • 使用Redis的zset、list的特性,我們可以利用redis來實現一個延遲隊列RedisDelayQueue

    2. 設計目標

    • 實時性:允許存在一定時間的秒級誤差
    • 高可用性:支持單機、支持集群
    • 支持消息刪除:業務會隨時刪除指定消息
    • 消息可靠性:保證至少被消費一次
    • 消息持久化:基于Redis自身的持久化特性,如果Redis數據丟失,意味著延遲消息的丟失,不過可以做主備和集群保證。這個可以考慮后續優化將消息持久化到MangoDB中

    3. 設計方案

    設計主要包含以下幾點:

    • 將整個Redis當做消息池,以KV形式存儲消息
    • 使用ZSET做優先隊列,按照Score維持優先級
    • 使用LIST結構,以先進先出的方式消費
    • ZSET和LIST存儲消息地址(對應消息池的每個KEY)
    • 自定義路由對象,存儲ZSET和LIST名稱,以點對點的方式將消息從ZSET路由到正確的LIST
    • 使用定時器維護路由
    • 根據TTL規則實現消息延遲

    3.1 設計圖

    還是基于有贊的延遲隊列設計,進行優化改造及代碼實現。有贊設計

    4fd36202305041604464350.png

    3.2 數據結構

    • ZING:DELAY_QUEUE:JOB_POOL 是一個Hash_Table結構,里面存儲了所有延遲隊列的信息。KV結構:K=prefix+projectName field = topic+jobId V=CONENT;V由客戶端傳入的數據,消費的時候回傳
    • ZING:DELAY_QUEUE:BUCKET 延遲隊列的有序集合ZSET,存放K=ID和需要的執行時間戳,根據時間戳排序
    • ZING:DELAY_QUEUE:QUEUE LIST結構,每個Topic一個LIST,list存放的都是當前需要被消費的JOB

    69fbc202305041605422122.png

    圖片僅供參考,基本可以描述整個流程的執行過程

    3.3 任務的生命周期

    1. 新增一個JOB,會在ZING:DELAY_QUEUE:JOB_POOL中插入一條數據,記錄了業務方消費方。ZING:DELAY_QUEUE:BUCKET也會插入一條記錄,記錄執行的時間戳
    2. 搬運線程會去ZING:DELAY_QUEUE:BUCKET中查找哪些執行時間戳的RunTimeMillis比現在的時間小,將這些記錄全部刪除;同時會解析出每個任務的Topic是什么,然后將這些任務PUSH到TOPIC對應的列表ZING:DELAY_QUEUE:QUEUE
    3. 每個TOPIC的LIST都會有一個監聽線程去批量獲取LIST中的待消費數據,獲取到的數據全部扔給這個TOPIC的消費線程池
    4. 消費線程池執行會去ZING:DELAY_QUEUE:JOB_POOL查找數據結構,返回給回調結構,執行回調方法。

    3.4 設計要點

    3.4.1 基本概念

    • JOB:需要異步處理的任務,是延遲隊列里的基本單元
    • Topic:一組相同類型Job的集合(隊列)。供消費者來訂閱

    3.4.2 消息結構

    每個JOB必須包含以下幾個屬性

    • jobId:Job的唯一標識。用來檢索和刪除指定的Job信息
    • topic:Job類型??梢岳斫獬删唧w的業務名稱
    • delay:Job需要延遲的時間。單位:秒。(服務端會將其轉換為絕對時間)
    • body:Job的內容,供消費者做具體的業務處理,以json格式存儲
    • retry:失敗重試次數
    • url:通知URL

    3.5 設計細節

    3.5.1 如何快速消費ZING:DELAY_QUEUE:QUEUE

    最簡單的實現方式就是使用定時器進行秒級掃描,為了保證消息執行的時效性,可以設置每1S請求Redis一次,判斷隊列中是否有待消費的JOB。但是這樣會存在一個問題,如果queue中一直沒有可消費的JOB,那頻繁的掃描就失去了意義,也浪費了資源,幸好LIST中有一個BLPOP阻塞原語,如果list中有數據就會立馬返回,如果沒有數據就會一直阻塞在那里,直到有數據返回,可以設置阻塞的超時時間,超時會返回NULL;具體的實現方式及策略會在代碼中進行具體的實現介紹

    3.5.2 避免定時導致的消息重復搬運及消費

    • 使用Redis的分布式鎖來控制消息的搬運,從而避免消息被重復搬運導致的問題
    • 使用分布式鎖來保證定時器的執行頻率

    4. 核心代碼實現

    4.1 技術說明

    技術棧:SpringBoot,Redisson,Redis,分布式鎖,定時器

    注意:本項目沒有實現設計方案中的多Queue消費,只開啟了一個QUEUE,這個待以后優化

    4.2 核心實體

    4.2.1 Job新增對象

    /**
     * 消息結構
     *
     * @author 睜眼看世界
     * @date 2020年1月15日
     */
    @Data
    public class Job implements Serializable {
     
     private static final long serialVersionUID = 1L;
     
     /**
     * Job的唯一標識。用來檢索和刪除指定的Job信息
     */
     @NotBlank
     private String jobId;
     
     
     /**
     * Job類型。可以理解成具體的業務名稱
     */
     @NotBlank
     private String topic;
     
     /**
     * Job需要延遲的時間。單位:秒。(服務端會將其轉換為絕對時間)
     */
     private Long delay;
     
     /**
     * Job的內容,供消費者做具體的業務處理,以json格式存儲
     */
     @NotBlank
     private String body;
     
     /**
     * 失敗重試次數
     */
     private int retry = 0;
     
     /**
     * 通知URL
     */
     @NotBlank
     private String url;
    }
    4.2.2 Job刪除對象
    /**
     * 消息結構
     *
     * @author 睜眼看世界
     * @date 2020年1月15日
     */
    @Data
    public class JobDie implements Serializable {
     
     private static final long serialVersionUID = 1L;
     
     /**
     * Job的唯一標識。用來檢索和刪除指定的Job信息
     */
     @NotBlank
     private String jobId;
     
     
     /**
     * Job類型??梢岳斫獬删唧w的業務名稱
     */
     @NotBlank
     private String topic;
    }

    4.3 搬運線程

    /**
     * 搬運線程
     *
     * @author 睜眼看世界
     * @date 2020年1月17日
     */
    @Slf4j
    @Component
    public class CarryJobScheduled {
     
     @Autowired
     private RedissonClient redissonClient;
     
     /**
     * 啟動定時開啟搬運JOB信息
     */
     @Scheduled(cron = "*/1 * * * * *")
     public void carryJobToQueue() {
     System.out.println("carryJobToQueue --->");
     RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
     try {
     boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
     if (!lockFlag) {
     throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
     }
     RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
     long now = System.currentTimeMillis();
     Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
     List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
     RList<String> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
     readyQueue.addAll(jobList);
     bucketSet.removeAllAsync(jobList);
     } catch (InterruptedException e) {
     log.error("carryJobToQueue error", e);
     } finally {
     if (lock != null) {
     lock.unlock();
     }
     }
     }
    }

    4.4 消費線程

    @Slf4j
    @Component
    public class ReadyQueueContext {
     
     @Autowired
     private RedissonClient redissonClient;
     
     @Autowired
     private ConsumerService consumerService;
     
     /**
     * TOPIC消費線程
     */
     @PostConstruct
     public void startTopicConsumer() {
     TaskManager.doTask(this::runTopicThreads, "開啟TOPIC消費線程");
     }
     
     /**
     * 開啟TOPIC消費線程
     * 將所有可能出現的異常全部catch住,確保While(true)能夠不中斷
     */
     @SuppressWarnings("InfiniteLoopStatement")
     private void runTopicThreads() {
     while (true) {
     RLock lock = null;
     try {
     lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
     } catch (Exception e) {
     log.error("runTopicThreads getLock error", e);
     }
     try {
     if (lock == null) {
     continue;
     }
     // 分布式鎖時間比Blpop阻塞時間多1S,避免出現釋放鎖的時候,鎖已經超時釋放,unlock報錯
     boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
     if (!lockFlag) {
     continue;
     }
     
     // 1. 獲取ReadyQueue中待消費的數據
     RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
     String topicId = queue.poll(60, TimeUnit.SECONDS);
     if (StringUtils.isEmpty(topicId)) {
     continue;
     }
     
     // 2. 獲取job元信息內容
     RMap<String, Job> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
     Job job = jobPoolMap.get(topicId);
     
     // 3. 消費
     FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消費JobId-->" + job.getJobId());
     if (taskResult.get()) {
     // 3.1 消費成功,刪除JobPool和DelayBucket的job信息
     jobPoolMap.remove(topicId);
     } else {
     int retrySum = job.getRetry() + 1;
     // 3.2 消費失敗,則根據策略重新加入Bucket
     
     // 如果重試次數大于5,則將jobPool中的數據刪除,持久化到DB
     if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
     jobPoolMap.remove(topicId);
     continue;
     }
     job.setRetry(retrySum);
     long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
     log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
     RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
     delayBucket.add(nextTime, topicId);
     // 3.3 更新元信息失敗次數
     jobPoolMap.put(topicId, job);
     }
     } catch (Exception e) {
     log.error("runTopicThreads error", e);
     } finally {
     if (lock != null) {
     try {
     lock.unlock();
     } catch (Exception e) {
     log.error("runTopicThreads unlock error", e);
     }
     }
     }
     }
     }
    }

    4.5 添加及刪除JOB

    /**
     * 提供給外部服務的操作接口
     *
     * @author why
     * @date 2020年1月15日
     */
    @Slf4j
    @Service
    public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
     
     @Autowired
     private RedissonClient redissonClient;
     
     
     /**
     * 添加job元信息
     *
     * @param job 元信息
     */
     @Override
     public void addJob(Job job) {
     
     RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
     try {
     boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
     if (!lockFlag) {
     throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
     }
     String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
     
     // 1. 將job添加到 JobPool中
     RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
     if (jobPool.get(topicId) != null) {
     throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
     }
     
     jobPool.put(topicId, job);
     
     // 2. 將job添加到 DelayBucket中
     RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
     delayBucket.add(job.getDelay(), topicId);
     } catch (InterruptedException e) {
     log.error("addJob error", e);
     } finally {
     if (lock != null) {
     lock.unlock();
     }
     }
     }
     
     
     /**
     * 刪除job信息
     *
     * @param job 元信息
     */
     @Override
     public void deleteJob(JobDie jobDie) {
     
     RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
     try {
     boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
     if (!lockFlag) {
     throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
     }
     String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
     
     RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
     jobPool.remove(topicId);
     
     RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
     delayBucket.remove(topicId);
     } catch (InterruptedException e) {
     log.error("addJob error", e);
     } finally {
     if (lock != null) {
     lock.unlock();
     }
     }
     }
    }

    5. 待優化的內容

    1. 目前只有一個Queue隊列存放消息,當需要消費的消息大量堆積后,會影響消息通知的時效。改進的辦法是,開啟多個Queue,進行消息路由,再開啟多個消費線程進行消費,提供吞吐量
    2. 消息沒有進行持久化,存在風險,后續會將消息持久化到MangoDB中

    6. 源碼

    更多詳細源碼請在下面地址中獲取

    百度網盤: https://pan.baidu.com/s/14G-bpVthImHD4eosZUNSFA?pwd=yu27

    請登錄后查看

    CRMEB-慕白寒窗雪 最后編輯于2023-05-04 16:12:24

    快捷回復
    回復
    回復
    回復({{post_count}}) {{!is_user ? '我的回復' :'全部回復'}}
    排序 默認正序 回復倒序 點贊倒序

    {{item.user_info.nickname ? item.user_info.nickname : item.user_name}} LV.{{ item.user_info.bbs_level }}

    作者 管理員 企業

    {{item.floor}}# 同步到gitee 已同步到gitee {{item.is_suggest == 1? '取消推薦': '推薦'}}
    {{item.is_suggest == 1? '取消推薦': '推薦'}}
    沙發 板凳 地板 {{item.floor}}#
    {{item.user_info.title || '暫無簡介'}}
    附件

    {{itemf.name}}

    {{item.created_at}}  {{item.ip_address}}
    打賞
    已打賞¥{{item.reward_price}}
    {{item.like_count}}
    {{item.showReply ? '取消回復' : '回復'}}
    刪除
    回復
    回復

    {{itemc.user_info.nickname}}

    {{itemc.user_name}}

    回復 {{itemc.comment_user_info.nickname}}

    附件

    {{itemf.name}}

    {{itemc.created_at}}
    打賞
    已打賞¥{{itemc.reward_price}}
    {{itemc.like_count}}
    {{itemc.showReply ? '取消回復' : '回復'}}
    刪除
    回復
    回復
    查看更多
    打賞
    已打賞¥{{reward_price}}
    2611
    {{like_count}}
    {{collect_count}}
    添加回復 ({{post_count}})

    相關推薦

    快速安全登錄

    使用微信掃碼登錄
    {{item.label}} 加精
    {{item.label}} {{item.label}} 板塊推薦 常見問題 產品動態 精選推薦 首頁頭條 首頁動態 首頁推薦
    取 消 確 定
    回復
    回復
    問題:
    問題自動獲取的帖子內容,不準確時需要手動修改. [獲取答案]
    答案:
    提交
    bug 需求 取 消 確 定
    打賞金額
    當前余額:¥{{rewardUserInfo.reward_price}}
    {{item.price}}元
    請輸入 0.1-{{reward_max_price}} 范圍內的數值
    打賞成功
    ¥{{price}}
    完成 確認打賞

    微信登錄/注冊

    切換手機號登錄

    {{ bind_phone ? '綁定手機' : '手機登錄'}}

    {{codeText}}
    切換微信登錄/注冊
    暫不綁定
    亚洲欧美字幕
    CRMEB客服

    CRMEB咨詢熱線 咨詢熱線

    400-8888-794

    微信掃碼咨詢

    CRMEB開源商城下載 源碼下載 CRMEB幫助文檔 幫助文檔
    返回頂部 返回頂部
    CRMEB客服