<noframes id="bhrfl"><address id="bhrfl"></address>

    <address id="bhrfl"></address>

    <noframes id="bhrfl"><address id="bhrfl"><th id="bhrfl"></th></address>

    <form id="bhrfl"><th id="bhrfl"><progress id="bhrfl"></progress></th></form>

    <em id="bhrfl"><span id="bhrfl"></span></em>

    全部
    常見問題
    產品動態
    精選推薦

    Redis隊列原理解析:讓你的應用程序運行更加穩定!

    管理 管理 編輯 刪除

    一、消息隊列簡介

    消息隊列(Message Queue),字面意思就是存放消息的隊列。最簡單的消息隊列模型包括 3 個角色:

    • 消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker)
    • 生產者:發送消息到消息隊列
    • 消費者:從消息隊列獲取消息并處理消息

    f3db520231213175325232.png

    消息隊列和阻塞隊列的區別:

    ① 消息隊列是在 JVM 以外的獨立服務,所以不受 JVM 內存的限制

    ② 消息隊列不僅僅做數據存儲,還需要確保數據安全,存入到消息隊列中的所有消息都需要做持久化,這樣不管是服務宕機還是重啟,數據都不會丟失。而且消息隊列還會在消息投遞給消費者后,要求消費者做消息確認,如果消費者沒有確認,那么這條消息就會一直存在于消息隊列中,下一次會繼續投遞給消費者,讓消費者繼續處理,直到消息被成功處理。

    二、Redis 提供的消息隊列

    Redis 提供了三種不同的方式來實現消息隊列:

    • list 結構:基于 List 結構模擬消息隊列
    • PubSub:基本的點對點消息隊列
    • Stream:比較完善的消息隊列模型

    2.1 基于 List 結構模擬消息隊列

    消息隊列(Message Queue),字面意思就是存放消息的隊列。而 Redis 的 List 數據結構是一個雙向鏈表,很容易模擬出隊列效果。

    隊列(先進先出)是入口和出口不在一邊,因此我們可以利用:LPUSH 結合 RPOP、或者 RPUSH 結合 LPOP 來實現。

    不過要注意的是,當隊列中沒有消息時 RPOP 或 LPOP 操作會返回 null,并不像 JVM 的阻塞隊列那樣會阻塞并等待下消息。因此這里應該使用 BRPOP 或者 BLPOP 來實現阻塞效果。

    基于 List 的消息隊列有哪些優缺點?

    優點:

    ① 利用 Redis 存儲,不受限于 JVM 內存上限

    ② 基于 Redis 的持久化機制,數據安全性有保證

    ③ 可以滿足消息有序性

    缺點:

    ① 無法避免消息丟失。假設某個消費者從消息隊列(List 結構)中獲取到一條消息,但還未來得及處理,該消費者出現故障,那么這條消息就會丟失,這是因為 POP 命令是 remove and get,會將消息直接從消息隊列中直接移除,這樣其他消費者就獲取不到。

    ② 只支持單消費者。消息隊列(List 結構)中的消息,一旦被某個消費者取走,就會從隊列中移除,其他消費者就獲取不到了,無法實現一條消息被很多消費者消費的需求。

    2.2 基于 PubSub 的消息隊列

    PubSub(發布訂閱)是 Redis2.0 版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個 channel,生產者向對應 channel 發送消息后,所有訂閱者都能收到相關消息。

    相關命令如下:

    • SUBSCRIBE channel [channel]:訂閱一個或多個頻道
    • PUBLISH channel msg:向一個頻道發送消息
    • PSUBSCRIBE pattern [pattern]:訂閱與 pattern 格式匹配的所有頻道

    關于 PubSub 的具體命令使用方法可以參看官網:?https://redis.io/commands/?group=pubsub

    基于 PubSub 的消息隊列有哪些優缺點:

    優點:

    采用發布訂閱模型,支持多生產、多消費。一條消息可以發給多個消費者,也可以發給一個消費者,而且也支持不同生產者往相同頻道發。

    缺點:

    ① 不支持數據持久化。本身不像 List 結構那樣支持數據持久化,List 結構本身就是用來存儲數據的,而 PubSub 則是用來做消息發送的。因此,當發送一條消息時,但卻沒有任何消費者訂閱,那么該條消息就直接消失了。

    ② 無法避免消息丟失

    ③ 消息堆積有上限,超出時數據丟失。當發送一條消息時,如果有消費者監聽,消費者會將發送過來的消息緩存至消息緩存區,由消費者進行處理。而消費者的緩存空間是有上限的,如果超出了就會丟失。

    2.3 基于 Stream 的消息隊列

    Stream 是 Redis5.0 引入的一種新的數據類型,可以實現一個功能非常完善的消息隊列。

    發送消息的命令:

    1b937202312131756428041.png

    最簡用法如下:

    c6f6320231213175759232.png

    讀取消息的方式之一:XREAD

    159f920231213175856456.png

    使用 XREAD 讀取消息

    50c47202312131801176933.png

    XREAD 阻塞方式,讀取最新的消息:

    6211c202312131802599576.png

    在業務開發中,我們可以循環的調用 XREAD 阻塞方式來查詢最新消息,從而實現持續監聽隊列的效果,偽代碼如下:

    while(true) {
    	// 嘗試讀取隊列中的消息,最多阻塞 2 秒
    	Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
    	if(msg == null) {
    		continue;
    	}
    	// 處理消息
    	handleMessage(msg);
    }

    注意:當我們指定起始 ID 為 $ 時,代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過 1 條以上的消息到達隊列,則下次獲取時也只能獲取到最新的一條,會出現漏讀消息的問題。

    STREAM 類型消息隊列的 XREAD 命令特點:

    • 消息可回溯。消息讀完后不消失,永久保存在隊列中。
    • 一個消息可以被多個消費者讀取
    • 可以阻塞讀取
    • 有消息漏讀的風險

    2.4 基于 Stream 的消息隊列-消費者組

    消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:

    ① 消息分流:隊列中的消息會分流給組內的不同消費者,而不是重復消費,從而加快消息處理的速度。

    處于一個組內的多個消費者實際上是競爭關系,凡是進入到這個組的消息,組內的消費者就會競爭該消息的處理權。這種方式可以大大提高消息的處理速度,避免消息堆積。如果想要一條消息被多個消費者處理,可以添加多個消費者組。

    ② 消息標識:消費者組會維護一個標識,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標識之后讀取消息。確保每一個消息都會被消費。

    ③ 消息確認:消費者獲取消息后,消息處于 pending 狀態,并存入一個 pending-list。當處理完成后需要通過 XACK 來確認消息,標記消息為已處理,才會從 pending-list 移除。

    創建消費者組:

    8ec60202312131804259918.png

    • key:隊列名稱
    • groupName:消費者組名稱
    • ID:起始 ID 標識,$ 代表隊列中最后一個消息,0 代表隊列中第一個消息。
    • MKSTREAM:隊列不存在時自動創建隊列

    其他常見命令:

    bc5e4202312131805209027.png

    從消費者組讀取消息:

    378fd202312131805486853.png

    • group:消費者組名稱
    • consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者
    • count:本次查詢的最大數量
    • BLOCK milliseconds:當沒有消息時最長等待時間
    • NOACK:無需手動 ACK,獲取到消息后自動確認
    • STREAMS key:指定隊列名稱
    • ID:獲取消息的起始 ID:

    “>”:從下一個未消費的消息開始

    其他:根據指定 id 從 pending-list 中獲取已消費但未確認的消息,例如 0,是從 pending-list 中的第一個消息開始。

    使用 Java 代碼處理消費者監聽消息的基本思路:

    whilt(true){
    	// 嘗試監聽隊列,使用阻塞模式,最長等待 2000 毫秒
    	// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
    	// 含義:消費者組 g1 中的消費者 c1 使用阻塞式嘗試從消息隊列 s1 中讀取下一個未被消費的消息,阻塞時長為 2000 毫秒
    	Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
    	if(msg == null){
    		continue;
    	}
    	try{
    		// 處理消息,完成后一定要 ACK
    		handleMessage(msg);
    	} catch(Exception e){
    		while(true){
    			// XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
    			// 含義:消費者組 g1 中的消費者 c1 從消息隊列 s1 的pending-list 中讀取第一個消息
    			Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
    			if(msg == null){ // null 說明沒有異常消息,所有消息都已確認,結束循環
    				break;
    			}
    			try{
    				// 說明有異常消息,再次處理
    				handleMessage(msg);
    			}catch(Exception e){
    				// 再次出現異常,記錄日志,繼續循環
    				continue;
    			}
    		}
    	}
    
    }

    Stream 類型消息隊列的 XREADGROUP 命令特點:

    • 消息可回溯
    • 可以多消費者爭搶消息,加快消費速度
    • 可以阻塞讀取
    • 沒有消息漏讀的風險
    • 有消息確認機制,保證消息至少被消費一次

    三、Redis 消息隊列比對

    41c11202312131807378772.png

    四、基于 Stream 消息隊列實現異步秒殺

    需求:

    ① 創建一個 Stream 類型的消息隊列,名為 stream.orders

    ② 修改之前的秒殺下單 Lua 腳本,在認定有搶購資格后,直接向 stream.orders 中添加消息,內容包含 voucherId、userId、orderId

    ③ 項目啟動時,開啟一個線程任務,嘗試獲取 stream.orders 中的消息,完成下單

    4.1 通過命令行的方式創建消息隊列以及消費者組

    創建隊列名為 stream.orders 且組名為 g1 的消費者組,消息 ID 從 0 開始

    75b0e202312131808304484.png

    4.2 Lua 腳本

    -- 優惠券id
    local voucherId = ARGV[1]
    -- 用戶id
    local userId = ARGV[2]
    -- 訂單id
    local orderId = ARGV[3]
    
    -- 庫存key
    local stockKey = "seckill:stock:"..voucherId
    -- 訂單key
    local orderKey = "seckill:order:"..voucherId
    
    -- 判斷庫存是否充足
    if(tonumber(redis.call('get', stockKey)) <= 0) then
        return 1
    end
    
    -- 判斷用戶是否已經下過單
    if(redis.call('sismember', orderKey, userId) == 1) then
        return 2
    end
    
    -- 扣減庫存
    redis.call('incrby', stockKey, -1)
    
    -- 將 userId 存入當前優惠券的 set 集合
    redis.call('sadd', orderKey, userId)
    
    -- 將訂單信息存入到消息隊列中 xadd stream.orders * k1 v1 k2 v2
    redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
    return 0

    4.3 代碼改進

    @Service
    public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    
        @Autowired
        private ISeckillVoucherService seckillVoucherService;
    
        @Autowired
        private RedisIdWorker redisIdWorker;
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        @Autowired
        private RedissonClient redissonClient;
    
        private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    
        static {
            SECKILL_SCRIPT = new DefaultRedisScript();
            SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
            SECKILL_SCRIPT.setResultType(Long.class);
        }
    
    
        /***
         * 創建線程池
         */
        private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    
        /***
         * 容器啟動時,便開始創建獨立線程,從隊列中讀取數據,創建訂單
         */
        @PostConstruct
        private void init(){
            SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
        }
    
        private class VoucherOrderHandler implements Runnable {
    
            @Override
            public void run() {
                while(true){
                    try {
                        // 獲取消息隊列中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
                        List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                                Consumer.from("g1", "c1"),
                                StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),
                                StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                        );
                        // 判斷訂單信息是否為空
                        if(list == null || list.isEmpty()){
                            // 如果為 null,說明沒有消息,繼續下一次循環
                            continue;
                        }
                        // 解析消息
                        MapRecord<String, Object, Object> record = list.get(0);
                        Map<Object, Object> value = record.getValue();
                        VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                        // 創建訂單
                        createVoucherOrder(voucherOrder);
                        // 確認消息 xack s1 g1 id
                        stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
                    } catch (Exception e) {
                        log.error("處理訂單異常!", e);
                        handlePendingList();
                    }
                }
    
            }
    
            private void handlePendingList() {
                while(true){
                    try {
                        // 獲取 pending-list 中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
                        List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                                Consumer.from("g1", "c1"),
                                StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                                StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                        );
                        // 判斷訂單信息是否為空
                        if(list == null || list.isEmpty()){
                            break;
                        }
                        // 解析消息
                        MapRecord<String, Object, Object> record = list.get(0);
                        Map<Object, Object> value = record.getValue();
                        VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                        // 創建訂單
                        createVoucherOrder(voucherOrder);
                        // 確認消息 xack s1 g1 id
                        stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
                    } catch (Exception e) {
                        log.error("處理訂單異常!", e);
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException interruptedException) {
                            interruptedException.printStackTrace();
                        }
                    }
                }
    
            }
        }
    
        private void createVoucherOrder(VoucherOrder voucherOrder) {
            // 判斷當前優惠券用戶是否已經下過單
            // 用戶 id
            Long userId = voucherOrder.getUserId();
            Long voucherId = voucherOrder.getVoucherId();
    
            RLock lock = redissonClient.getLock("lock:order:" + userId);
            // 獲取互斥鎖
            // 使用空參意味著不會進行重復嘗試獲取鎖
            boolean isLock = lock.tryLock();
            if (!isLock) {
                // 獲取鎖失敗,直接返回失敗或者重試
                log.error("不允許重復下單!");
                return;
            }
    
    
            try {
                // 查詢訂單
                int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
                if (count > 0) {
                    log.error("不允許重復下單!");
                    return;
                }
    
                // 扣減庫存
                boolean success = seckillVoucherService.update().
                        setSql("stock = stock - 1").
                        eq("voucher_id", voucherId).
                        gt("stock", 0).
                        update();
    
                // 扣減失敗
                if (!success) {
                    log.error("庫存不足!");
                    return;
                }
    
                // 創建訂單
                save(voucherOrder);
            } finally {
                // 釋放鎖
                lock.unlock();
            }
        }
    
        @Override
        public Result seckillVoucher(Long voucherId) {
            UserDTO user = UserHolder.getUser();
            // 生成訂單 id
            Long orderId = redisIdWorker.nextId("order");
            // 執行 lua 腳本
            Long result = stringRedisTemplate.execute(
                    SECKILL_SCRIPT,
                    Collections.emptyList(),
                    voucherId.toString(), user.getId().toString(), orderId.toString());
            int r = result.intValue();
    
            // 判斷結果是否為 0
            if(r != 0){
                // 不為 0 ,代表沒有購買資格
                Result.fail(r == 1 ? "庫存不足!" : "不能重復下單!");
            }
    
            // 返回訂單 id
            return Result.ok(orderId);
        }
    }


    請登錄后查看

    CRMEB-慕白寒窗雪 最后編輯于2023-12-13 18:09:50

    快捷回復
    回復
    回復
    回復({{post_count}}) {{!is_user ? '我的回復' :'全部回復'}}
    排序 默認正序 回復倒序 點贊倒序

    {{item.user_info.nickname ? item.user_info.nickname : item.user_name}} LV.{{ item.user_info.bbs_level }}

    作者 管理員 企業

    {{item.floor}}# 同步到gitee 已同步到gitee {{item.is_suggest == 1? '取消推薦': '推薦'}}
    {{item.is_suggest == 1? '取消推薦': '推薦'}}
    沙發 板凳 地板 {{item.floor}}#
    {{item.user_info.title || '暫無簡介'}}
    附件

    {{itemf.name}}

    {{item.created_at}}  {{item.ip_address}}
    打賞
    已打賞¥{{item.reward_price}}
    {{item.like_count}}
    {{item.showReply ? '取消回復' : '回復'}}
    刪除
    回復
    回復

    {{itemc.user_info.nickname}}

    {{itemc.user_name}}

    回復 {{itemc.comment_user_info.nickname}}

    附件

    {{itemf.name}}

    {{itemc.created_at}}
    打賞
    已打賞¥{{itemc.reward_price}}
    {{itemc.like_count}}
    {{itemc.showReply ? '取消回復' : '回復'}}
    刪除
    回復
    回復
    查看更多
    打賞
    已打賞¥{{reward_price}}
    2933
    {{like_count}}
    {{collect_count}}
    添加回復 ({{post_count}})

    相關推薦

    快速安全登錄

    使用微信掃碼登錄
    {{item.label}} 加精
    {{item.label}} {{item.label}} 板塊推薦 常見問題 產品動態 精選推薦 首頁頭條 首頁動態 首頁推薦
    取 消 確 定
    回復
    回復
    問題:
    問題自動獲取的帖子內容,不準確時需要手動修改. [獲取答案]
    答案:
    提交
    bug 需求 取 消 確 定
    打賞金額
    當前余額:¥{{rewardUserInfo.reward_price}}
    {{item.price}}元
    請輸入 0.1-{{reward_max_price}} 范圍內的數值
    打賞成功
    ¥{{price}}
    完成 確認打賞

    微信登錄/注冊

    切換手機號登錄

    {{ bind_phone ? '綁定手機' : '手機登錄'}}

    {{codeText}}
    切換微信登錄/注冊
    暫不綁定
    亚洲欧美字幕
    CRMEB客服

    CRMEB咨詢熱線 咨詢熱線

    400-8888-794

    微信掃碼咨詢

    CRMEB開源商城下載 源碼下載 CRMEB幫助文檔 幫助文檔
    返回頂部 返回頂部
    CRMEB客服