一致訂閱關係
簡介
訂閱關係是 RocketMQ 網域模型中非常重要的部分,用於表達消費者使用訊息的控制元資料。如需完整概念,請參閱 訂閱關係模型。
當同一個消費者群組中的所有消費者實例對主題和標籤的訂閱完全相同時,訂閱關係就是一致的。如果訂閱關係(消費者群組名稱-主題-標籤)不一致,可能會導致使用訊息混淆,甚至遺失訊息。
1 正確訂閱關係範例
1.1 訂閱的主題相同,且過濾器表達式一致
如下圖所示,同一個 ConsumerGroup 中的 3 個 Consumer 實例 C1、C2、C3 都訂閱了 TopicA,且對 TopicA 的 Tag 訂閱都為 Tag1,符合訂閱關係一致性的原則。
正確範例程式碼 1
C1、C2、C3 的訂閱關係一致,表示 C1、C2、C3 訂閱訊息的程式碼必須完全相同,範例程式碼如下
PushConsumer consumer1 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer1.subscribe("TopicA", new FilterExpression("TagA", FilterExpressionType.TAG));
PushConsumer consumer2 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer2.subscribe("TopicA", new FilterExpression("TagA", FilterExpressionType.TAG));
PushConsumer consumer3 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer3.subscribe("TopicA", new FilterExpression("TagA", FilterExpressionType.TAG));
RocketMQ 強調訂閱關係的一致性,表示同一個 ConsumerGroup 內的每個 Consumer 都應保持一致,因為從伺服器的角度來看,同一個 Group 內的 Consumer 都應為同一個邏輯副本。
強調訂閱關係的一致性,並不代表一個 Consumer 不能訂閱多個 Topic,每個 Consumer 仍可按需求訂閱多個 Topic,但前提是同一個消費群組內的 Consumer 必須保持一致。
2 訂閱關係不一致的排查
問題描述
使用 RocketMQ 版本的消息佇列時,可能會發生訂閱關係不一致的問題,具體問題如下
- RocketMQ 版本的消息佇列控制台中的訂閱關係一致性顯示為否。
- Consumer 實例收不到訂閱訊息。
請參考以下步驟進行檢查
您可以在 Apache RocketMQ 控制台或 CLi 工具中檢查指定 Group 的訂閱關係是否一致,若查詢結果為不一致,請參考本文常見的訂閱關係不一致問題,對 Consumer 實例的消費程式碼進行排查。
- 檢查 Consumer 實例中與訂閱相關的設定程式碼,確保同一個 ConsumerGroup 中的 Consumer 實例都訂閱相同的 Topic 與 Tag。
- 使用控制台或 Cli 指令 ConsumerConnection 檢查實際的訂閱關係是否一致。
- 測試並確認預期的 Consumer 實例可以消費訊息。
3 訂閱關係不一致的常見問題
3.1 同一個 ConsumerGroup 中,Consumer 實例訂閱的 Topic 不同(適用於 3.x、4.x SDK)
在 SDK 的早期 3.x/4.x 版本中,如以下圖所示,同一個 ConsumerGroup 中的 Consumer 實例 C1、C2、C3 分別訂閱了 TopicA、TopicB、TopicC,它們訂閱的主題不一致,不符合訂閱關係一致性的原則。
SDK 的 5.x 版本現已支援同一個 ConsumerGroup 中的 Consumer 實例訂閱不同的主題。
3.2 同一個 ConsumerGroup 中的 Consumer 實例訂閱同一個主題,但訂閱標籤不同。
如以下圖所示,同一個 ConsumerGroup 中的 Consumer 實例 C1、C2、C3 都訂閱了 TopicA,但 C1 訂閱了 TopicA 的 Tag1,而 C2 和 C3 訂閱了 TopicA 的 Tag2,同一個主題的訂閱標籤不一致,不符合訂閱關係一致性的原則。
錯誤範例程式碼 2
Consumer 範例 2-1:
PushConsumer consumer1 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer1.subscribe("TopicA", new FilterExpression("Tag1", FilterExpressionType.TAG));
Consumer 範例 2-2:
PushConsumer consumer2 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer2.subscribe("TopicA", new FilterExpression("Tag2", FilterExpressionType.TAG));Consumer 範例 2-3:
PushConsumer consumer3 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer3.subscribe("TopicA", new FilterExpression("Tag2", FilterExpressionType.TAG));