訊息過濾
消費者訂閱主題後,Apache RocketMQ 會將主題中的所有訊息傳遞給消費者。但是,如果您希望消費者僅接收與您的業務相關的訊息,您可以在 Apache RocketMQ 代理程式上設定篩選器。本主題說明訊息過濾功能及其運作方式。本主題也說明訊息的分類方式,並提供如何使用不同過濾方法的範例。
情境
Apache RocketMQ 遵循發布訂閱模式。Apache RocketMQ 是一種面向訊息的中介軟體,廣泛用於促進分散式上游和下游應用程式之間的通訊。在真實世界情境中,應用程式可以使用不同的方法來消費訊息。這些應用程式都可以訂閱同一個 Apache RocketMQ 主題,而且可以設定篩選器,讓這些應用程式僅接收與它們相關的訊息。
透過使用 Apache RocketMQ 的訊息過濾功能,您可以有效管理傳送至不同消費者的訊息。這可以防止您的系統因大量非任務關鍵訊息而過載。
Apache RocketMQ 的訊息過濾功能在主題層級生效,讓您可以管理分散在多個服務中的單一業務訊息。如果您想要管理不同業務的訊息,您可以訂閱不同的主題。
功能概覽
定義
訊息過濾功能會根據消費者設定的條件過濾訊息,並將符合條件的訊息傳送給消費者。
首先,在 Apache RocketMQ 的生產者和消費者上定義訊息屬性和標籤。然後在消費者上設定過濾條件,Apache RocketMQ 代理程式會根據條件過濾訊息,並將過濾後的訊息傳送給消費者。
運作機制
訊息過濾包含下列步驟
生產者:生產者會在初始化訊息之前,將屬性和標籤附加到訊息。這些屬性和標籤用於比對消費者設定的過濾條件。
消費者:消費者會呼叫訂閱註冊操作,在訊息初始化和使用期間,將訂閱的主題和訊息,或過濾條件,告知代理程式。
代理程式:在收到消費者要求訊息時,Apache RocketMQ 代理程式會根據消費者提交的過濾條件表達式,動態過濾訊息,並將符合過濾條件的訊息傳送給消費者。
分類
Apache RocketMQ 支援基於標籤的過濾和基於屬性的 SQL 過濾。下表比較這兩種方法。
項目 | 基於標籤的過濾 | 基於屬性的 SQL 過濾 |
---|---|---|
過濾目標 | 訊息標籤。 | 訊息屬性,包括自訂屬性和系統屬性。訊息標籤是系統屬性 (TAGS)。 |
過濾能力 | 精確比對。 | 基於 SQL 語法的比對。 |
情境 | 基於標籤的簡單過濾。 | 涉及標籤和屬性之間關係的複雜過濾。 |
如需瞭解如何使用過濾方法的更多資訊,請參閱 基於標籤的過濾 和 基於屬性的 SQL 過濾。
訂閱一致性
過濾表達式是訂閱的一部分。根據 Apache RocketMQ 的發佈/訂閱模式,一個消費者群組內的消費者訂閱必須與另一個消費者一致,包括其過濾表達式,以避免無法使用某些訊息的情況。如需更多資訊,請參閱 訂閱。
基於標籤的過濾
基於標籤的過濾是 Apache RocketMQ 提供的基本訊息過濾功能。此功能會根據生產者上設定的標籤過濾訊息。消費者使用標籤指定要使用的訊息。
情境
下圖展示了一個電子商務交易場景中的範例。在從下訂單到收到產品的過程中,會產生一系列訊息,例如
訂單訊息
付款訊息
物流訊息
這些訊息會傳送到名為 Trade_Topic 的主題,其訂閱者有多個系統,包括
付款系統:僅訂閱付款訊息。
物流系統:僅訂閱物流訊息。
交易成功率分析系統:訂閱訂單和付款訊息。
即時運算系統:訂閱所有訊息。
標籤設定
生產者在傳送訊息之前,只會將一個標籤附加到每個訊息。
標籤是一個字串。建議的最大字串長度為 128 個字元。
篩選規則
基於標籤的篩選根據字串執行精確篩選。您可以設定下列篩選規則
單一標籤比對:您可以將篩選條件設定為單一標籤,以僅接收帶有該標籤的訊息。
多重標籤比對:您可以在篩選條件中設定多個標籤,以接收帶有其中任何一個標籤的訊息。請使用兩個直線 (||) 分隔標籤。例如,Tag1||Tag2||Tag3 表示附加了 Tag1、Tag2 或 Tag3 的訊息都會傳送到使用者。
全部比對:您可以使用星號 (*) 比對所有標籤,表示主題中的所有訊息都會傳送到使用者。
範例
設定標籤並傳送訊息
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that the system can use a keyword to accurately locate the message.
.setKeys("messageKey")
// Specify the message tag so that consumers can use the tag to filter the message.
// This example indicates that the tag of the message is set to "TagA".
.setTag("TagA")
// Message body.
.setBody("messageBody".getBytes())
.build();
指定標籤並訂閱訊息
String topic = "Your Topic";
// Subscribe to messages that carry tag "TagA".
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
指定多個標籤並訂閱訊息
String topic = "Your Topic";
// Subscribe to messages that carry tag TagA, TagB, or TagC.
FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
訂閱主題中的所有訊息
String topic = "Your Topic";
// Subscribe to all messages.
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
基於屬性的 SQL 篩選
基於屬性的 SQL 篩選是 Apache RocketMQ 提供的一種進階訊息篩選方法。它會根據生產者為訊息設定的屬性和屬性值(也稱為金鑰和值)來篩選訊息。生產者可以為訊息設定多個屬性。然後,使用者可以在 SQL 條件中指定屬性以接收特定訊息。
由於標籤是系統屬性,所以基於標籤的過濾是一種基於屬性的 SQL 過濾。在 SQL 語法中,標籤屬性由 TAGS 表示。
情境
下圖顯示電子商務交易場景中的範例。在這個過程中,從下訂單到收到產品,會產生一系列訊息。這些訊息會分類為訂單訊息和物流訊息。物流訊息會設定區域屬性,而區域屬性的值為杭州和上海。
訂單訊息
物流訊息
區域屬性值為杭州的物流訊息
區域屬性值為上海的物流訊息
這些訊息會傳送至名為 Trade_Topic 的主題,其訂閱者為以下系統
物流系統 1:僅訂閱區域屬性值為杭州的物流訊息。
物流系統 2:訂閱所有物流訊息。
訂單追蹤系統:僅訂閱訂單訊息。
即時運算系統:訂閱所有訊息。
訊息屬性設定
在傳送訊息之前,生產者可以為訊息設定自訂屬性。每個屬性都是自訂的鍵值對。
可以為訊息設定多個屬性。
篩選規則
撰寫過濾條件式時,必須遵循 SQL92 語法。特別是
語法 | 說明 | 範例 |
---|---|---|
IS NULL | 指定屬性不存在。 | a IS NULL :屬性 a 不存在。 |
IS NOT NULL | 指定屬性存在。 | a IS NOT NULL :屬性 a 存在。 |
> >= < <= | 比較數字值。此語法無法用於比較字串。如果用於比較字串,則在啟動使用者端時會回報錯誤。注意可以轉換為數字值的字串也會被視為數字值。 | a IS NOT NULL AND a > 100 :屬性 a 存在,且屬性 a 的值大於 100。 a IS NOT NULL AND a > 'abc' :錯誤範例。abc 是字串。因此,無法將 a 與 abc 比較。 |
BETWEEN xxx AND xxx | 比較數字值。此語法無法用於比較字串。如果用於比較字串,則在啟動使用者端時會回報錯誤。此語法等於 >= xxx AND \<= xxx。這表示屬性的值介於兩個數字值之間,或等於其中一個數字值。 | a IS NOT NULL AND (a BETWEEN 10 AND 100) :屬性 a 存在,且屬性 a 的值大於或等於 10,且小於或等於 100。 |
NOT BETWEEN xxx AND xxx | 比較數值。語法無法用於比較字串。如果用於比較字串,則在啟動使用者時會回報錯誤。語法等同於 < xxx OR > xxx。表示屬性的值小於左側數值或大於右側數值。 | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100) :屬性 a 存在且屬性 a 的值小於 10 或大於 100。 |
IN (xxx, xxx) | 表示屬性的值包含在集合中。集合中的元素只能是字串。 | a IS NOT NULL AND (a IN ('abc', 'def')) :屬性 a 存在且屬性 a 的值為 abc 或 def。 |
= <> | 等於運算子與不等於運算子。可用於比較數值與字串。 | a IS NOT NULL AND (a = 'abc' OR a<>'def') :屬性 a 存在且屬性 a 的值為 abc 或屬性 a 的值不為 def。 |
AND OR | 邏輯 AND 運算子與邏輯 OR 運算子。可用於組合簡單的邏輯函數,且每個邏輯函數都必須放在括號中。 | a IS NOT NULL AND (a > 100) OR (b IS NULL) :屬性 a 存在且屬性 a 的值大於 100 或屬性 b 不存在。 |
SQL 基於屬性的過濾是透過設定自訂訊息屬性並定義 SQL 過濾器表達式來實作的。過濾器表達式可能不會產生有效的結果。Apache RocketMQ 仲介根據下列邏輯處理訊息
例外處理:如果在評估過濾器表達式時回報例外,則仲介預設會過濾掉接收到的訊息,且不會將訊息傳遞給使用者。例如,在比較數值與非數值時會發生例外。
空值處理:如果過濾器表達式的計算結果為 NULL 或值不是布林值,則仲介預設會過濾掉接收到的訊息,且不會將訊息傳遞給使用者。布林值表示一個真值,可能是真或假。假設您沒有為生產者傳送的訊息設定自訂屬性,但此自訂屬性用於 SQL 表達式中的過濾條件。在此情況下,過濾器表達式的評估結果為 NULL。
不一致數值處理:如果自訂訊息屬性的值是浮點數,但過濾器表達式中使用的屬性值是整數,則代理程式會預設過濾掉接收到的訊息,不會將訊息傳遞給消費者。
範例
為訊息設定標籤和屬性,並傳送訊息
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that the system can use a keyword to accurately locate the message.
.setKeys("messageKey")
// Specify the message tag so that consumers can use the tag to filter the message.
// This example indicates that the message tag is set to "messageTag".
.setTag("messageTag")
// You can also set custom attributes for the messages, such as environment, region, and logical branch.
// In this example, the custom attribute is region and the attribute value is Hangzhou.
.addProperty("Region", "Hangzhou")
// Message body.
.setBody("messageBody".getBytes())
.build();
根據自訂屬性訂閱和過濾訊息
String topic = "topic";
// Subscribe only to messages whose value of the region attribute is Hangzhou.
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
根據多個自訂屬性訂閱和過濾訊息
String topic = "topic";
// Subscribe to messages whose value of the region attribute is Hangzhou and value of the price attribute is greater than 30.
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
訂閱主題中的所有訊息
String topic = "topic";
// Subscribe to all the messages.
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
使用說明
正確設定訊息的主題和標籤。
可以使用主題、標籤和屬性來區分訊息。在區分訊息時,請注意下列事項
訊息類型:不同類型的訊息,例如順序訊息和一般訊息,必須使用不同的主題來區分。請勿使用標籤來區分訊息類型。
業務領域:不同的業務領域和部門必須使用不同的主題。例如,物流訊息和付款訊息的主題必須不同。物流訊息可以使用標籤進一步區分為一般訊息和緊急訊息。
訊息數量和重要性:數量或連結重要性不同的訊息必須區分到不同的主題中。