跳至主要內容
版本:5.0

訊息過濾

消費者訂閱主題後,Apache RocketMQ 會將主題中的所有訊息傳遞給消費者。但是,如果您希望消費者僅接收與您的業務相關的訊息,您可以在 Apache RocketMQ 代理程式上設定篩選器。本主題說明訊息過濾功能及其運作方式。本主題也說明訊息的分類方式,並提供如何使用不同過濾方法的範例。

情境

Apache RocketMQ 遵循發布訂閱模式。Apache RocketMQ 是一種面向訊息的中介軟體,廣泛用於促進分散式上游和下游應用程式之間的通訊。在真實世界情境中,應用程式可以使用不同的方法來消費訊息。這些應用程式都可以訂閱同一個 Apache RocketMQ 主題,而且可以設定篩選器,讓這些應用程式僅接收與它們相關的訊息。

透過使用 Apache RocketMQ 的訊息過濾功能,您可以有效管理傳送至不同消費者的訊息。這可以防止您的系統因大量非任務關鍵訊息而過載。

Apache RocketMQ 的訊息過濾功能在主題層級生效,讓您可以管理分散在多個服務中的單一業務訊息。如果您想要管理不同業務的訊息,您可以訂閱不同的主題。

功能概覽

定義

訊息過濾功能會根據消費者設定的條件過濾訊息,並將符合條件的訊息傳送給消費者。

首先,在 Apache RocketMQ 的生產者和消費者上定義訊息屬性和標籤。然後在消費者上設定過濾條件,Apache RocketMQ 代理程式會根據條件過濾訊息,並將過濾後的訊息傳送給消費者。

運作機制 消息过滤

訊息過濾包含下列步驟

  • 生產者:生產者會在初始化訊息之前,將屬性和標籤附加到訊息。這些屬性和標籤用於比對消費者設定的過濾條件。

  • 消費者:消費者會呼叫訂閱註冊操作,在訊息初始化和使用期間,將訂閱的主題和訊息,或過濾條件,告知代理程式。

  • 代理程式:在收到消費者要求訊息時,Apache RocketMQ 代理程式會根據消費者提交的過濾條件表達式,動態過濾訊息,並將符合過濾條件的訊息傳送給消費者。

分類

Apache RocketMQ 支援基於標籤的過濾和基於屬性的 SQL 過濾。下表比較這兩種方法。

項目基於標籤的過濾基於屬性的 SQL 過濾
過濾目標訊息標籤。訊息屬性,包括自訂屬性和系統屬性。訊息標籤是系統屬性 (TAGS)。
過濾能力精確比對。基於 SQL 語法的比對。
情境基於標籤的簡單過濾。涉及標籤和屬性之間關係的複雜過濾。

如需瞭解如何使用過濾方法的更多資訊,請參閱 基於標籤的過濾基於屬性的 SQL 過濾

訂閱一致性

過濾表達式是訂閱的一部分。根據 Apache RocketMQ 的發佈/訂閱模式,一個消費者群組內的消費者訂閱必須與另一個消費者一致,包括其過濾表達式,以避免無法使用某些訊息的情況。如需更多資訊,請參閱 訂閱

基於標籤的過濾

基於標籤的過濾是 Apache RocketMQ 提供的基本訊息過濾功能。此功能會根據生產者上設定的標籤過濾訊息。消費者使用標籤指定要使用的訊息。

情境

下圖展示了一個電子商務交易場景中的範例。在從下訂單到收到產品的過程中,會產生一系列訊息,例如

  • 訂單訊息

  • 付款訊息

  • 物流訊息

這些訊息會傳送到名為 Trade_Topic 的主題,其訂閱者有多個系統,包括

  • 付款系統:僅訂閱付款訊息。

  • 物流系統:僅訂閱物流訊息。

  • 交易成功率分析系統:訂閱訂單和付款訊息。

  • 即時運算系統:訂閱所有訊息。

Tag过滤

標籤設定

  • 生產者在傳送訊息之前,只會將一個標籤附加到每個訊息。

  • 標籤是一個字串。建議的最大字串長度為 128 個字元。

篩選規則

基於標籤的篩選根據字串執行精確篩選。您可以設定下列篩選規則

  • 單一標籤比對:您可以將篩選條件設定為單一標籤,以僅接收帶有該標籤的訊息。

  • 多重標籤比對:您可以在篩選條件中設定多個標籤,以接收帶有其中任何一個標籤的訊息。請使用兩個直線 (||) 分隔標籤。例如,Tag1||Tag2||Tag3 表示附加了 Tag1、Tag2 或 Tag3 的訊息都會傳送到使用者。

  • 全部比對:您可以使用星號 (*) 比對所有標籤,表示主題中的所有訊息都會傳送到使用者。

範例

  • 設定標籤並傳送訊息

    Message message = messageBuilder.setTopic("topic")
    // Specify the message index key so that the system can use a keyword to accurately locate the message.
    .setKeys("messageKey")
    // Specify the message tag so that consumers can use the tag to filter the message.
    // This example indicates that the tag of the message is set to "TagA".
    .setTag("TagA")
    // Message body.
    .setBody("messageBody".getBytes())
    .build();
  • 指定標籤並訂閱訊息

    String topic = "Your Topic";
    // Subscribe to messages that carry tag "TagA".
    FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);
  • 指定多個標籤並訂閱訊息

    String topic = "Your Topic";
    // Subscribe to messages that carry tag TagA, TagB, or TagC.
    FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);
  • 訂閱主題中的所有訊息

    String topic = "Your Topic";
    // Subscribe to all messages.
    FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
    pushConsumer.subscribe(topic, filterExpression);

基於屬性的 SQL 篩選

基於屬性的 SQL 篩選是 Apache RocketMQ 提供的一種進階訊息篩選方法。它會根據生產者為訊息設定的屬性和屬性值(也稱為金鑰和值)來篩選訊息。生產者可以為訊息設定多個屬性。然後,使用者可以在 SQL 條件中指定屬性以接收特定訊息。

資訊

由於標籤是系統屬性,所以基於標籤的過濾是一種基於屬性的 SQL 過濾。在 SQL 語法中,標籤屬性由 TAGS 表示。

情境

下圖顯示電子商務交易場景中的範例。在這個過程中,從下訂單到收到產品,會產生一系列訊息。這些訊息會分類為訂單訊息和物流訊息。物流訊息會設定區域屬性,而區域屬性的值為杭州和上海。

  • 訂單訊息

  • 物流訊息

    • 區域屬性值為杭州的物流訊息

    • 區域屬性值為上海的物流訊息

這些訊息會傳送至名為 Trade_Topic 的主題,其訂閱者為以下系統

  • 物流系統 1:僅訂閱區域屬性值為杭州的物流訊息。

  • 物流系統 2:訂閱所有物流訊息。

  • 訂單追蹤系統:僅訂閱訂單訊息。

  • 即時運算系統:訂閱所有訊息。

sql过滤

訊息屬性設定

在傳送訊息之前,生產者可以為訊息設定自訂屬性。每個屬性都是自訂的鍵值對。

可以為訊息設定多個屬性。

篩選規則

撰寫過濾條件式時,必須遵循 SQL92 語法。特別是

語法說明範例
IS NULL指定屬性不存在。a IS NULL:屬性 a 不存在。
IS NOT NULL指定屬性存在。a IS NOT NULL:屬性 a 存在。
> >= < <=比較數字值。此語法無法用於比較字串。如果用於比較字串,則在啟動使用者端時會回報錯誤。注意可以轉換為數字值的字串也會被視為數字值。 a IS NOT NULL AND a > 100:屬性 a 存在,且屬性 a 的值大於 100。 a IS NOT NULL AND a > 'abc':錯誤範例。abc 是字串。因此,無法將 a 與 abc 比較。
BETWEEN xxx AND xxx比較數字值。此語法無法用於比較字串。如果用於比較字串,則在啟動使用者端時會回報錯誤。此語法等於 >= xxx AND \<= xxx。這表示屬性的值介於兩個數字值之間,或等於其中一個數字值。a IS NOT NULL AND (a BETWEEN 10 AND 100):屬性 a 存在,且屬性 a 的值大於或等於 10,且小於或等於 100。
NOT BETWEEN xxx AND xxx比較數值。語法無法用於比較字串。如果用於比較字串,則在啟動使用者時會回報錯誤。語法等同於 < xxx OR > xxx。表示屬性的值小於左側數值或大於右側數值。a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):屬性 a 存在且屬性 a 的值小於 10 或大於 100。
IN (xxx, xxx)表示屬性的值包含在集合中。集合中的元素只能是字串。a IS NOT NULL AND (a IN ('abc', 'def')):屬性 a 存在且屬性 a 的值為 abc 或 def。
= <>等於運算子與不等於運算子。可用於比較數值與字串。a IS NOT NULL AND (a = 'abc' OR a<>'def'):屬性 a 存在且屬性 a 的值為 abc 或屬性 a 的值不為 def。
AND OR邏輯 AND 運算子與邏輯 OR 運算子。可用於組合簡單的邏輯函數,且每個邏輯函數都必須放在括號中。a IS NOT NULL AND (a > 100) OR (b IS NULL):屬性 a 存在且屬性 a 的值大於 100 或屬性 b 不存在。

SQL 基於屬性的過濾是透過設定自訂訊息屬性並定義 SQL 過濾器表達式來實作的。過濾器表達式可能不會產生有效的結果。Apache RocketMQ 仲介根據下列邏輯處理訊息

  • 例外處理:如果在評估過濾器表達式時回報例外,則仲介預設會過濾掉接收到的訊息,且不會將訊息傳遞給使用者。例如,在比較數值與非數值時會發生例外。

  • 空值處理:如果過濾器表達式的計算結果為 NULL 或值不是布林值,則仲介預設會過濾掉接收到的訊息,且不會將訊息傳遞給使用者。布林值表示一個真值,可能是真或假。假設您沒有為生產者傳送的訊息設定自訂屬性,但此自訂屬性用於 SQL 表達式中的過濾條件。在此情況下,過濾器表達式的評估結果為 NULL。

  • 不一致數值處理:如果自訂訊息屬性的值是浮點數,但過濾器表達式中使用的屬性值是整數,則代理程式會預設過濾掉接收到的訊息,不會將訊息傳遞給消費者。

範例

  • 為訊息設定標籤和屬性,並傳送訊息

    Message message = messageBuilder.setTopic("topic")
    // Specify the message index key so that the system can use a keyword to accurately locate the message.
    .setKeys("messageKey")
    // Specify the message tag so that consumers can use the tag to filter the message.
    // This example indicates that the message tag is set to "messageTag".
    .setTag("messageTag")
    // You can also set custom attributes for the messages, such as environment, region, and logical branch.
    // In this example, the custom attribute is region and the attribute value is Hangzhou.
    .addProperty("Region", "Hangzhou")
    // Message body.
    .setBody("messageBody".getBytes())
    .build();
  • 根據自訂屬性訂閱和過濾訊息

    String topic = "topic";
    // Subscribe only to messages whose value of the region attribute is Hangzhou.
    FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);
  • 根據多個自訂屬性訂閱和過濾訊息

    String topic = "topic";
    // Subscribe to messages whose value of the region attribute is Hangzhou and value of the price attribute is greater than 30.
    FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);
  • 訂閱主題中的所有訊息

    String topic = "topic";
    // Subscribe to all the messages.
    FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
    simpleConsumer.subscribe(topic, filterExpression);

使用說明

正確設定訊息的主題和標籤。

可以使用主題、標籤和屬性來區分訊息。在區分訊息時,請注意下列事項

  • 訊息類型:不同類型的訊息,例如順序訊息和一般訊息,必須使用不同的主題來區分。請勿使用標籤來區分訊息類型。

  • 業務領域:不同的業務領域和部門必須使用不同的主題。例如,物流訊息和付款訊息的主題必須不同。物流訊息可以使用標籤進一步區分為一般訊息和緊急訊息。

  • 訊息數量和重要性:數量或連結重要性不同的訊息必須區分到不同的主題中。