<noframes id="bhrfl"><address id="bhrfl"></address>

    <address id="bhrfl"></address>

    <noframes id="bhrfl"><address id="bhrfl"><th id="bhrfl"></th></address>

    <form id="bhrfl"><th id="bhrfl"><progress id="bhrfl"></progress></th></form>

    <em id="bhrfl"><span id="bhrfl"></span></em>

    全部
    常見問題
    產品動態
    精選推薦

    消息隊列常見問題整理

    管理 管理 編輯 刪除

    前言

    消息隊列(Message Queue),從廣義上講是一種消息隊列服務中間件,提供一套完整的信息生產、傳遞、消費的軟件系統。

    消息隊列所涵蓋的功能遠不止于隊列(Queue),其本質是兩個進程傳遞信息的一種方法。兩個進程可以分布在同一臺機器上,亦可以分布在不同的機器上。

    眾所周知,進程通信可以通過 RPC(Remote Procedure Call,遠程過程調用)進行,那么我們為什么要用消息隊列這種軟件服務來傳遞消息呢?

    我們通過一個快遞員送快遞的栗子來描述下消息隊列的作用。

    消息隊列

    現實世界的例子

    小明是一名快遞員,通常給一個客戶送快遞分為三步:

    1. 第一步:把快遞運到客戶家門口;
    2. 第二步:敲門;
    3. 第三步:客戶開門取走快遞。

    好了上邊是送快遞最簡單的三步,讓我們想想,這簡單的三步會有什么問題?

    (1)耦合

    小明什么時候完成這一單,完全依賴于客戶的響應速度。

    如果客戶還沒起床,聽見敲門聲再穿衣服開門,可能消耗很多時間。如果客戶沒在家呢?那就要配送失敗了,如何判斷配送失敗呢?小明需要判斷等多久開門(超時時間),打電話判斷是否在家(健康檢查),最終郁悶的離開,下次再來一次(重試)。

    小明直接與客戶交互,對客戶的狀態強依賴,產生了耦合現象。

    (2)同步影響性能

    小明的配送速度受到客戶的響應速度影響極大,有一兩個需要長時間等待的快件,小明的配送效率(吞吐率)會受到很大影響。

    (3)高峰期負載很高

    每次到雙11、618 購物節的時候,小明都很煩躁。快遞太多,來的比送得快,這可如何是好。一旦有客戶因為聯系不上影響了配送效率,就會影響后面客戶的配送,嚴重了還會收到投訴。

    這個時候有個叫X巢的快遞柜出現了,小明可以把快遞放到柜子里,發條短信通知客戶過來取快遞。這樣就不強依賴客戶的響應,大大提高了配送效率。

    這里的快遞柜就相當于是編程世界的消息隊列,讓我們看看消息隊列到底起到了什么作用。

    消息隊列解決什么問題

    • 解耦:此時,小明只需要把快遞放到柜子里,不需要關心客戶是否在家,是否在睡覺??蛻粢膊恍枰恢钡却o小明開門,兩個人解耦了。
    • 異步:小明把快遞放到柜子里發個信息就可以去送下一件,不需同步等待結果。這樣每個快遞的處理速度(響應時間)都變得極短,每天送的快遞數量(吞吐量)也變多了。
    • 削峰:這次又到了雙十一,以前小明一天只能配送 100 個快遞,現在有了快遞柜,配送量(吞吐量和響應速度)增加了好幾倍,甚至數十倍,大大提升了小明的工作效率。這下小明再也不擔心接到投訴了。

    總結

    讓我們簡單總結一下消息隊列的作用,首先需要肯定的是使用消息組件有很多好處,其中最核心的三個是:解耦、異步、削峰

    • 解耦:生產端和消費端不需要相互依賴;
    • 異步:生產端不需要等待消費端響應,直接返回,提高了響應時間和吞吐量;
    • 削峰:打平高峰期的流量,消費端可以以自己的速度處理,同時也無需在高峰期增加太多資源,提高資源利用率。

    引入消息隊列后讓我們子系統間耦合性降低了,異步處理機制減少了系統的響應時間,同時能夠有效的應對請求峰值問題,提升系統的穩定性。但同時引入消息隊列也會帶來一些問題。

    下面我們以 RocketMQ 為例來分析引入 MQ 帶來的問題以及解決方案。

    MQ 常見問題分析

    消息丟失

    消息丟失可以說是 MQ 中普遍存在的問題,不管用哪種 MQ 都無法避免。

    那么有哪些場景會出現消息丟失問題呢?

    我們下面來看一下,整個消息從生產到消費的過程中,哪些地方可能會導致丟消息,以及應該如何避免消息丟失。

    一條消息從生產到被消費,將會經歷三個階段:

    c2e8a202305271633564157.png

    • 生產階段,生產者新建消息,然后通過網絡將消息投遞給 MQ 服務器
    • 存儲階段,消息將會存儲在服務器磁盤中,如果是集群,消息會在這個階段被復制到其他的副本上
    • 消費階段, 消費者將會從 MQ 服務器拉取消息

    以上任一階段都可能會丟失消息:

    • 生產階段:生產者發送消息時,由于網絡原因,發送到 MQ 失敗了。
    • 存儲階段:MQ 服務器持久化時,服務器宕機、重啟導致丟失消息。
    • 消費階段:消息消費者剛讀取消息,已經 ack 確認了,但業務還沒處理完,服務就被重啟了。

    消息丟失解決方案

    生產階段

    RocketMQ 提供了 3 種發送消息方式,分別是:

    • 同步發送:生產者向 MQ 發送消息,阻塞當前線程等待 MQ 服務器響應發送結果。
    • 異步發送:生產者首先構建一個向服務器發送消息的任務,把該任務提交給線程池,等執行完該任務時,回調用戶自定義的回調函數,執行處理結果。
    • Oneway 發送:生產者發起消息發送請求后并不會等待服務器的響應結果,也不會調用回調函數,即不關心消息的最終發送結果。

    Oneway 相對前兩種發送方式來說是一種不可靠的消息發送方式,因此要保證消息發送的可靠性,我們只考慮同步和異步的發送方式。

    (1)同步發送可靠性保證

    采用同步阻塞式的發送,然后同步檢查 MQ 服務器返回的狀態來判斷消息是否持久化成功。如果發送超時或者失敗,則會自動重試,如果重試再失敗,就會以返回值或者異常的方式告知用戶。

    我們在編寫發送消息代碼時,需要注意,正確處理返回值或者捕獲異常,就可以保證這個階段的消息不會丟失。

    同步發送,代碼如下:

    public void send() throws Exception {
        String message = "test producer";
        Message sendMessage = new Message("topic1", "tag1", message.getBytes());
        sendMessage.putUserProperty("name1","value1");
        SendResult sendResult = null;
    
        DefaultMQProducer producer = new DefaultMQProducer("testGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.setRetryTimesWhenSendFailed(3);
        try {
            sendResult = producer.send(sendMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (sendResult != null) {
            System.out.println(sendResult.getSendStatus());
        }
    }
    

    同步發送會返回 4 個狀態碼:

    SEND_OK:消息發送成功。

    • 需要注意的是,消息發送到服務器后,還有兩個操作:「消息刷盤」和「消息同步到 slave 節點」,默認這兩個操作都是異步的,只有把這兩個操作都改為同步,SEND_OK 這個狀態才能真正表示發送成功。
    • FLUSH_DISK_TIMEOUT:消息發送成功但是消息刷盤超時。
    • FLUSH_SLAVE_TIMEOUT:消息發送成功但是消息同步到 slave 節點時超時。
    • SLAVE_NOT_AVAILABLE:消息發送成功但是 broker 的 slave 節點不可用。

    根據返回的狀態碼,可以做消息重試,這里設置的重試次數是 3。

    消息重試時,消費端一定要做好冪等處理。

    既然是同步發送肯定就比較耗費一些時間,如果你的業務比較注重 RT 那就可以使用異步發送的方式。

    (2)異步發送可靠性保證

    異步發送時,則需要在回調方法里進行檢查。這個地方是需要特別注意的,很多丟消息的原因就是,我們使用了異步發送,卻沒有在回調中檢查發送結果。

    具體的業務實現可以根據發送的結果信息來判斷是否需要重試來保證消息的可靠性。

    異步發送,代碼如下:

    public void sendAsync() throws Exception {
        String message = "test producer";
        Message sendMessage = new Message("topic1", "tag1", message.getBytes());
        sendMessage.putUserProperty("name1","value1");
    
        DefaultMQProducer producer = new DefaultMQProducer("testGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.setRetryTimesWhenSendFailed(3);
        producer.send(sendMessage, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                
            }
    
            @Override
            public void onException(Throwable e) {
                // TODO 可以在這里加入重試邏輯
            }
        });
    }
    

    異步發送,可以重寫回調函數,回調函數捕獲到 Exception 時表示發送失敗,這時可以進行重試,這里設置的重試次數是 3。

    存儲階段

    默認的情況下,消息隊列為了快速響應,在接受到生產者的請求,將消息保存在內存成功之后,就會立刻返回 ACK 響應給生產者。

    RocketMQ 的刷盤方式分為「同步刷盤」和「異步刷盤」兩種。

    • 異步刷盤:消息寫入 CommitLog 時,并不會直接寫入磁盤,而是先寫入 PageCache 緩存后返回成功,然后用后臺線程異步把消息刷入磁盤。異步刷盤提高了消息吞吐量,但是可能會有消息丟失的情況,比如斷點導致機器停機,PageCache 中沒來得及刷盤的消息就會丟失。

    同步刷盤:消息寫入內存的 PageCache 后,立刻通知刷盤線程刷盤,然后等待刷盤完成,如果消息未在約定的時間內(默認 5 s)刷盤成功,就返回FLUSH_DISK_TIMEOUT,Producer 收到這個響應后,可以進行重試。同步刷盤策略保證了消息的可靠性,同時降低了吞吐量,增加了延遲。

    • 要開啟同步刷盤,需要增加下面配置:flushDiskType=SYNC_FLUSH

    RocketMQ 默認的是異步刷盤,就有可能導致消息還未刷到硬盤上就丟失了,可以通過設置為同步刷盤的方式來保證消息可靠性,這樣即使 MQ 掛了,恢復的時候也可以從磁盤中去恢復消息。

    如果是 Broker 是由多個節點組成的集群,需要將 Broker 集群配置成:至少將消息發送到 2 個以上的節點,再給客戶端回復發送確認響應。 這樣當某個 Broker 宕機時,其他的 Broker 可以替代宕機的 Broker,也不會發生消息丟失。

    Broker 采用集群配置時,需要注意的一點是:消息發送到 master 節點后,slave 節點會從 master 拉取消息保持跟 master 的一致。這個過程默認是異步的,即 master 收到消息后,不等 slave 節點復制消息就直接給 Producer 返回成功。

    這樣會有一個問題,如果 slave 節點還沒有完成消息復制,這時 master 宕機了,進行主備切換后就會有消息丟失。

    為了避免這個問題,可以采用 slave 節點同步復制消息,即等 slave 節點復制消息成功后再給 Producer 返回發送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER。

    消費階段

    消費階段采用和生產階段類似的確認機制來保證消息的可靠傳遞,客戶端從 Broker 拉取消息后,執行用戶的消費業務邏輯:

    • 如果 Consumer 消費成功,返回 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批消息
    • 如果 Consumer 消費失敗,下次拉消息的時候還會返回同一條消息,即進行消費重試。
    消費重試

    RocketMQ 認為消息消費失敗需要重試的場景有三種:

    • 返回 RECONSUME_LATER
    • 返回 null
    • 拋出異常

    Broker 收到這個響應后,會把這條消息放入重試隊列,Topic 名字為%RETRY% + consumerGroup。

    注意:

    Broker 默認最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入「死信隊列」,Consumer 可以訂閱死信隊列進行消費。

    重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。

    Consumer 端一定要做好冪等處理。

    其實重試 3 次都失敗就可以說明代碼有問題,這時 Consumer 可以把消息存入本地,給 Broker 返回 CONSUME_SUCCESS 來結束重試。

    死信隊列:未能成功消費的消息,消息隊列并不會立刻將消息丟棄,而是將消息發送到死信隊列,其名稱是在原隊列名稱前加 %DLQ%,如果消息最終進入了死信隊列,則可以通過 RocketMQ 提供的相關接口從死信隊列獲取到相應的消息,保證了消息消費的可靠性。

    上面方案看似萬無一失,每個階段都能保證消息的不丟失,但在分布式系統中,故障不可避免,作為消息生產端,你并不能保證 MQ 是不是弄丟了你的消息,消費者是否消費了你的消息,所以,本著 Design for Failure 的設計原則,我們需要一種機制,來 Check 消息是否丟失了。

    檢測消息丟失的方法

    總體方案解決思路為:在消息生產端,給每個發出的消息都指定一個全局唯一 ID,或者附加一個連續遞增的版本號,然后在消費端做對應的版本校驗。

    可以利用攔截器機制。在生產端發送消息之前,通過攔截器將消息版本號注入消息中(版本號可以采用連續遞增的 ID 生成,也可以通過分布式全局唯一 ID生成)。然后在消費端收到消息后,再通過攔截器檢測版本號的連續性或消費狀態,這樣實現的好處是消息檢測的代碼不會侵入到業務代碼中,可以通過單獨的任務來定位丟失的消息,做進一步的排查。

    如果同時存在多個消息生產端和消息消費端,通過版本號遞增的方式就很難實現了,因為不能保證版本號的唯一性,此時只能通過全局唯一 ID 的方案來進行消息檢測,具體的實現原理和版本號遞增的方式一致。

    重復消息

    RocketMQ 為了保證消息的可靠性,選擇 「至少傳輸成功一次」 的消息模型。

    在消息領域有一個對消息投遞的 QoS 定義,分為:

    • 最多一次(At most once)
    • 至少一次(At least once)
    • 僅一次( Exactly once)

    既然是至少一次,那避免不了消息重復,尤其是在分布式網絡環境下。比如:網絡原因閃斷,ACK 返回失敗等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道該消息已經被消費了,再次將該消息分發給其他的消費者。

    那么如何解決這個問題?

    這個問題其實可以換一種說法,就是如何解決消費端冪等性問題(冪等性,就是一條命令,任意多次執行所產生的影響均與一次執行的影響相同),只要消費端具備了冪等性,那么重復消費消息的問題也就解決了。

    那如何保證消息隊列消費的冪等性? 我們還是得結合業務來思考,這里給幾個思路:

    • 利用數據庫的唯一約束實現:比如收到數據時要寫庫,通過創建唯一索引的方式保證冪等。
    • 為更新的數據設置前置條件:給數據變更設置一個前置條件,如果滿足條件就更新數據,否則拒絕更新數據。 比如可以通過判斷狀態是否允許操作,不滿足的拒絕更新。

    如果上面提到的兩種實現冪等方法都不能適用于你的場景,我們還有一種通用性最強,適用范圍最廣的實現冪等性方法。

    更通用的解決方案

    終極方法:「基于消息冪等表的非事務方案」,實現的思路特別簡單:在執行業務代碼之前,先檢查一下是否處理過這個條消息。

    具體的實現方法是:

    首先,在數據庫中建一張消息日志表,這個表有兩個字段:「消息 ID」和「消息執行狀態(消費中、已消費)」。

    然后給消息 ID 來創建一個唯一約束,這樣對于相同的消息 ID,表里至多只能存在一條記錄。

    在發送消息時,給每條消息指定一個全局唯一的 ID,消費時,先根據這個 ID 檢查這條消息是否有被消費過,如果沒有消費過,才執行業務代碼,然后將消費狀態置為已消費。

    544ed202305271707048457.png

    可以看到,此方案是無事務的,而是針對消息表本身做了狀態的區分:消費中、消費完成。只有消費完成的消息才會被冪等處理掉。

    而對于已有消費中的消息,后面重復的消息會觸發延遲消費,這樣主要是為了控制并發場景下,第二條消息在第一條消息沒完成的過程中,去控制消息不丟(如果直接冪等,那么會丟失消息(同一個消息id的話),因為上一條消息如果沒有消費完成的時候,第二條消息你已經告訴broker成功了,那么第一條消息這時候失敗broker也不會重新投遞了)。

    我們分析下這種方案是否解決了冪等問題:

    1. 消息已經消費成功了,第二條消息將被直接冪等處理掉(消費成功)
    2. 并發場景下的消息,依舊能滿足不會出現消息重復
    3. 支持上游業務生產者重發的業務重復的消息冪等問題

    第一個問題明顯解決了。

    第二個問題也已經解決,主要是依靠插入消息表的這個動作做控制的,因為「消息 ID」的惟一的,后面的消息插入會由于主鍵沖突而失敗,走向延遲消費的分支,然后后面延遲消費的時候就會變成上面第一個場景的問題。

    關于第三個問題,只要我們設計去重的消息鍵讓其支持業務的主鍵(例如訂單號、請求流水號等),而不僅僅是 messageId 即可。所以也不是問題。

    此方案是否有消息丟失的風險?

    細心的讀者可能會發現這里實際上是有邏輯漏洞的,問題出在上面聊到的三個問題中的第 2 個問題「并發場景」。

    在并發場景下我們依賴于消息狀態做并發控制,使得第二條重復的消息會不斷延遲消費(重試)。

    但如果這時候第一條消息也由于一些異常原因(例如機器重啟了、外部異常導致消費失?。]有成功消費成功呢?

    也就是說這時候延遲消費實際上每次下來看到的都是「消費中」的狀態,最后消費就會被視為消費失敗而被投遞到死信隊列中。

    對于此,我們解決的方法是,插入的消息表必須要帶一個最長消費過期時間,例如 10 分鐘,意思是如果一個消息處于消費中超過 10 分鐘,就需要從消息表中刪除(需要程序自行實現)。

    所以最后這個消息的流程會是這樣的:

    fa417202305271708553882.png

    消息積壓

    如果出現積壓,那一定是性能問題,想要解決消息從生產到消費上的性能問題,就首先要知道哪些環節可能出現消息積壓,然后在考慮如何解決。

    因為消息發送之后才會出現積壓的問題,所以和消息生產端沒有關系,又因為絕大部分的消息隊列單節點都能達到每秒鐘幾萬的處理能力,相對于業務邏輯來說,性能不會出現在中間件的消息存儲上面。

    毫無疑問,出問題的肯定是消息消費階段。

    如果是線上突發問題,要臨時擴容,增加消費端的數量,與此同時,降級一些非核心的業務。通過擴容和降級承擔流量。

    其次,才是排查解決異常問題,如通過監控,日志等手段分析是否消費端的業務邏輯代碼出現了問題,優化消費端的業務處理邏輯。

    最后,如果是消費端的處理能力不足,可以通過水平擴容來提供消費端的并發處理能力。

    在擴容消費者的是時候有一點需要注意,如果當前 Topic 的 Message Queue 的數量大于消費者數量,就可以對消費者進行擴容,增加消費者,來提高消費能力,盡快把積壓的消息消費完。如果消費者的數量大于等于 Message Queue 的數量,增加消費者是沒有用的。

    順序消費

    我們知道,RocketMQ 在主題上是無序的。但是在有些場景下,使用 MQ 需要保證消息的順序性,比如在電商系統中:下單、付款、發貨、買家確認收貨,消費端需要嚴格按照業務狀態機的順序處理,否則,就會出現業務問題。

    我們發現,消息帶上了狀態,不再是一個個獨立的個體,有了上下文依賴關系!

    那么 MQ 是如何來保證消息順序的?

    我們通常發送消息的時候,消息發送默認是會采用輪詢的方式發送到不同的 queue。

    而消費端消費的時候,是會分配到多個 queue 的,多個 queue 是同時拉取提交消費。

    但是同一條 queue 里面,RocketMQ 的確是能保證 FIFO 的。那么要做到順序消息,應該怎么實現呢——把消息確保投遞到同一條 queue。

    對于 RocketMQ 來說,主要是通過 Producer 和 Consumer 來保證消息順序的。

    生產端

    生產端提供了一個接口 MessageQueueSelector

    public interface MessageQueueSelector {
       MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }
    

    接口內定義一個 select 方法,具體參數含義:

    • mqs:該 Topic 下所有的隊列分片
    • msg:待發送的消息
    • arg:發送消息時傳遞的參數

    示例代碼

    模擬訂單消息的發送,共有 3 個訂單,每個訂單都包含下單、付款、發貨買家確認收貨四個流程,對應 4 條消息。同一個訂單的消息要求嚴格按照順序消費,不同訂單的消息可以并發執行。

    首先實現 MessageQueueSelector 接口,定制 MessageQueue 選擇策略:

    public class OrderMessageQueueSelector implements MessageQueueSelector {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            //選擇以參數arg為索引的MessageQueue
            Integer id = (Integer) arg;
            int index = id % mqs.size();
            return mqs.get(index);
        }
    }
    

    下面實現發送消息邏輯:

    @Slf4j
    @Service
    public class OrderMessageProducer {
        @Value("${spring.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        private static final DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
    
        private static final String[] ORDER_MESSAGES = {"下單", "付款", "發貨", "買家確認收貨"};
    
        @PostConstruct
        public void sendMessage() {
            try {
                //設置namesrv
                producer.setNamesrvAddr(namesrvAddr);
    
                //啟動Producer
                producer.start();
    
                System.err.println("Order Message Producer Start...");
    
                //創建3組消息,每組消息發往同一個Queue,保證消息的局部有序性
                String tags = "Tags";
    
                OrderMessageQueueSelector orderMessageQueueSelector = new OrderMessageQueueSelector();
    
                //注:要實現順序消費,必須同步發送消息
                for (int i = 0; i < 3; i++) {
                    String orderId = "" + (i + 1);
                    for (int j = 0, size = ORDER_MESSAGES.length; j < size; j++) {
                        String message = "Order-" + orderId + "-" + ORDER_MESSAGES[j];
                        String keys = message;
                        byte[] messageBody = message.getBytes(RemotingHelper.DEFAULT_CHARSET);
                        Message mqMsg = new Message("TEST_TOPIC_NAME", tags, keys, messageBody);
                        producer.send(mqMsg, orderMessageQueueSelector, i);
                    }
                }
            } catch (Exception e) {
                log.error("Message Producer: Send Message Error ", e);
            }
        }
    }
    

    使用 DefaultMQProducersend() 方法,指定 MessageQueueSelector 和參數,Broker 將會將邏輯上需要保證順序性的消息發往同一隊列。

    注意:上面的代碼把 orderId 相同的消息都會發送到同一個 MessageQueue,這樣同一個 orderId 的消息是有序的,這也叫做局部有序。對應的另一種是全局有序,這需要把所有的消息都發到同一個 MessageQueue。

    注:想要實現順序消費,發送方式必須為同步發送,異步發送無法保證消息的發送順序!

    這樣同一批我們需要做到順序消費訂單肯定會投遞到同一個隊列,同一個隊列肯定會投遞到同一個消費實例,同一個消費實例肯定是順序拉取并順序提交線程池的,只要保證消費端順序消費,則大功告成!

    消費端

    消費端想要實現順序消費,只要設置監聽器實現 MessageListenerOrderly 接口即可。

    示例代碼

    首先自定義 MessageListenerOrderly 接口實現類,實現順序消費:

    public class OrderMessageListener implements MessageListenerOrderly {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            if (CollectionUtils.isEmpty(msgs)){
                return ConsumeOrderlyStatus.SUCCESS;
            }
            //設置自動提交
            context.setAutoCommit(true);
            msgs.stream()
                    .forEach(msg -> {
                        try {
                            String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            System.err.println("Handle Order Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: "
                                    + msg.getTags() + ",keys: " + msg.getKeys() + ",messageBody: " + messageBody);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }
    

    下面就是消費邏輯:

    @Service
    public class OrderMessageConsumer {
        @Value("${spring.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
    
        @PostConstruct
        public void start() {
            try {
                //設置namesrv地址
                consumer.setNamesrvAddr(namesrvAddr);
    
                //從消息隊列頭部開始消費
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
                //集群消費模式
                consumer.setMessageModel(MessageModel.CLUSTERING);
    
                //訂閱主題
                consumer.subscribe("TEST_TOPIC_NAME", "*");
    
                //注冊消息監聽器,這里因為要實現順序消費,所以必須注冊MessageListenerOrderly
                consumer.registerMessageListener(new OrderMessageListener());
    
                //啟動消費端
                consumer.start();
    
                System.err.println("Order Message Consumer Start...");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
    
        }
    }
    

    要保證消息的順序性,就需要保證同一個 MessageQueue 只能被同一個 Consumer 消費。簡單來說就是通過對 MessageQueueLock 進行加鎖,這樣就保證只有一個線程在處理當前 MessageQueue。感興趣的同學可以深入研究下。

    總結

    在項目中引入 MQ 解決了我們系統之間的耦合度過高的問題、提高系統的靈活性和峰值處理能力。但同時也帶來了一些問題:消息丟失、重復消息和消息積壓。

    消息丟失可分三個階段進行分析:

    • 生產階段:采用同步發送,通過正確處理返回值或者捕獲異常,保證消息可靠性;采用異步發送則需要在回調方法里進行檢查。
    • 存儲階段:存儲端的可靠性依靠持久化策略、備份(主從復制)保證。
    • 消費階段:消費失敗可以依靠重試策略保證可靠性。

    對于重復消息,我們最后也給出一個終極方案:「基于消息冪等表的非事務方案」。不依賴事務而實現消息的去重,那么方案就能推廣到更復雜的場景例如:RPC、跨庫等。

    而消息積壓,絕大部分問題出現在消費端,我們可以通過水平擴容增加 Consumer 的實例數量來解決,需要注意的是,增加并發需要同步擴容分區數量,否則是起不到效果的。

    最后介紹了順序消費,RocketMQ 采用了局部順序一致性的機制,實現了單個隊列中的消息嚴格有序。


    請登錄后查看

    CRMEB-慕白寒窗雪 最后編輯于2023-05-27 17:14:09

    快捷回復
    回復
    回復
    回復({{post_count}}) {{!is_user ? '我的回復' :'全部回復'}}
    排序 默認正序 回復倒序 點贊倒序

    {{item.user_info.nickname ? item.user_info.nickname : item.user_name}} LV.{{ item.user_info.bbs_level }}

    作者 管理員 企業

    {{item.floor}}# 同步到gitee 已同步到gitee {{item.is_suggest == 1? '取消推薦': '推薦'}}
    {{item.is_suggest == 1? '取消推薦': '推薦'}}
    沙發 板凳 地板 {{item.floor}}#
    {{item.user_info.title || '暫無簡介'}}
    附件

    {{itemf.name}}

    {{item.created_at}}  {{item.ip_address}}
    打賞
    已打賞¥{{item.reward_price}}
    {{item.like_count}}
    {{item.showReply ? '取消回復' : '回復'}}
    刪除
    回復
    回復

    {{itemc.user_info.nickname}}

    {{itemc.user_name}}

    回復 {{itemc.comment_user_info.nickname}}

    附件

    {{itemf.name}}

    {{itemc.created_at}}
    打賞
    已打賞¥{{itemc.reward_price}}
    {{itemc.like_count}}
    {{itemc.showReply ? '取消回復' : '回復'}}
    刪除
    回復
    回復
    查看更多
    打賞
    已打賞¥{{reward_price}}
    2895
    {{like_count}}
    {{collect_count}}
    添加回復 ({{post_count}})

    相關推薦

    快速安全登錄

    使用微信掃碼登錄
    {{item.label}} 加精
    {{item.label}} {{item.label}} 板塊推薦 常見問題 產品動態 精選推薦 首頁頭條 首頁動態 首頁推薦
    取 消 確 定
    回復
    回復
    問題:
    問題自動獲取的帖子內容,不準確時需要手動修改. [獲取答案]
    答案:
    提交
    bug 需求 取 消 確 定
    打賞金額
    當前余額:¥{{rewardUserInfo.reward_price}}
    {{item.price}}元
    請輸入 0.1-{{reward_max_price}} 范圍內的數值
    打賞成功
    ¥{{price}}
    完成 確認打賞

    微信登錄/注冊

    切換手機號登錄

    {{ bind_phone ? '綁定手機' : '手機登錄'}}

    {{codeText}}
    切換微信登錄/注冊
    暫不綁定
    亚洲欧美字幕
    CRMEB客服

    CRMEB咨詢熱線 咨詢熱線

    400-8888-794

    微信掃碼咨詢

    CRMEB開源商城下載 源碼下載 CRMEB幫助文檔 幫助文檔
    返回頂部 返回頂部
    CRMEB客服