順序訊息
順序訊息是 Apache RocketMQ 中具有進階功能的一種訊息類型。本主題說明順序訊息的場景、工作機制、限制、使用範例和使用注意事項。
場景
異質系統使用狀態同步來在順序事件處理、交易撮合和即時增量資料同步等場景中維持強一致性。當事件變更發生時,前述場景需要從上游應用程式順序傳遞訊息到下游應用程式。Apache RocketMQ 提供順序訊息來協助您實作順序訊息傳輸。
場景 1:交易撮合
例如,在證券和股票交易場景中,如果多個競標者為競標訂單提供相同的競標價格,則最早提供競標價格的競標者得標。因此,下游訂單處理系統必須設計為按照提供價格的順序來處理訂單。
場景 2:即時增量資料同步
一般訊息 先進先出訊息
例如,您希望執行與資料庫修改相關資料的增量同步。您可以使用 Apache RocketMQ 中提供的已排序訊息,將訊息從上游來源資料庫傳輸至下游查詢系統。訊息可以是新增、刪除和修改作業的二進位記錄檔。下游系統會按訊息傳送順序擷取訊息,以相同的順序更新資料庫狀態。已排序訊息可協助您確保上游系統中的作業與下游系統中的狀態資料之間的一致性。如果您在此情況下使用一般訊息,可能會發生狀態不一致的情況。
運作機制
已排序訊息的定義
已排序訊息是 Apache RocketMQ 中一種進階的訊息類型。已排序訊息會按訊息傳送順序傳遞給使用者。此訊息類型讓您可以在商業情境中實作已排序處理。
已排序訊息的定義特徵是訊息傳送、儲存和傳遞的順序。
Apache RocketMQ 使用訊息群組來決定已排序訊息的順序。您必須為已排序訊息設定訊息群組。訊息群組中的訊息會按先進先出 (FIFO) 順序處理。訊息排序不適用於不同的訊息群組或不在訊息群組中的訊息。
基於訊息群組的訊息排序讓您可以根據您的商業邏輯指定細緻的訊息排序。這可協助您在您的商業系統中實作部分訊息排序,並提升商業系統的並行度和處理量。
訊息排序
Apache RocketMQ 中適用兩種訊息順序:生產順序和使用順序。
生產順序:Apache RocketMQ 使用在生產者和伺服器之間建立的通訊協定,以確保訊息會按序從生產者傳送至伺服器,並按訊息傳送順序儲存和保留訊息。
為確保訊息的生產順序,請確認符合下列條件
單一生產者:訊息的生產順序適用於個別生產者。Apache RocketMQ 無法判斷不同系統中不同生產者的訊息順序,即使您為訊息設定相同的訊息群組。
序列傳輸:Apache RocketMQ 中的生產者支援使用多個執行緒進行安全存取。如果生產者使用多個執行緒同時傳送訊息,Apache RocketMQ 無法確定來自不同執行緒的訊息順序。
如果符合前述條件的生產者將訊息傳送至 Apache RocketMQ,則屬於同一個訊息群組的訊息會依據訊息傳送順序儲存在同一個佇列中。下圖說明 Apache RocketMQ 的順序儲存邏輯。
在前述圖形中,來自 MessageGroup 1 和 MessageGroup 4 的訊息儲存在同一個佇列(MessageQueue 1)中。Apache RocketMQ 可確保來自 MessageGroup 1 的訊息 G1-M1、G1-M2 和 G1-M3 依據訊息傳送順序儲存在佇列中。來自 MessageGroup 4 的訊息 G4-M1 和 G4-M2 也會依據訊息傳送順序儲存。然而,來自 MessageGroup 1 和 MessageGroup 4 的訊息並未依據特定順序儲存。
消費順序 :
Apache RocketMQ 使用消費者與伺服器之間建立的通訊協定,以確保訊息依據儲存順序進行消費。
若要確保訊息的消費順序,請確認符合下列條件
傳送順序:Apache RocketMQ 使用客戶端 SDK 和伺服器端的通訊協定,確保訊息依據伺服器上的訊息儲存順序傳送。當消費者應用程式消費訊息時,應用程式必須遵循接收處理回覆路徑,以防止非同步處理所造成的訊息順序錯誤。
註解- 當 PushConsumer 消費者消費訊息時,Apache RocketMQ 可確保訊息依據儲存順序逐一傳送至消費者。
- 當 SimpleConsumer 消費者消費訊息時,消費者可能會一次提取多則訊息,而業務應用程式必須有解決方案來實作訊息消費順序。如需瞭解消費者類型的詳細資訊,請參閱 消費者類型。
限制重試次數:Apache RocketMQ 會限制順序訊息的傳送重試次數。如果訊息達到最大傳送重試次數,Apache RocketMQ 會停止重試傳送訊息以供消費。這可防止佇列中的其他訊息持續等待傳送。
在消費順序至關重要的場景中,建議您指定適當的重試次數,以防止訊息處理順序錯誤。
生產順序和消費順序的組合
如果您希望訊息根據 FIFO 原則進行處理,則需要生產順序和消費順序。在大多數業務場景中,一個生產者可能對應多個消費者,並且並非所有消費者都需要按順序消費訊息。您可以組合生產順序和消費順序的設定以滿足您在不同業務場景中的需求。例如,您可以發送順序訊息並使用非順序並發消費來提高吞吐量。下表描述了生產順序和消費順序設定的不同組合。
生產順序 | 消費順序 | 效果 |
---|---|---|
配置訊息群組以實作訊息的順序傳遞。 | 訊息的順序消費 | 在訊息群組層級確保訊息的順序。同一訊息群組中的訊息會以相同的順序發送和消費。 |
配置訊息群組以實作訊息的順序傳遞。 | 並發消費 | 訊息會並發且按時間順序消費。 |
不配置訊息群組以實作訊息的非順序傳遞。 | 訊息的順序消費 | 在佇列層級確保訊息的順序。訊息消費基於佇列的屬性。Apache RocketMQ 確保消費順序與佇列中的儲存順序相同,但未必與訊息傳送順序相同。 |
不配置訊息群組以實作訊息的非順序傳遞。 | 並發消費 | 訊息會並發且按時間順序消費。 |
順序訊息的生命週期
已初始化:訊息由生產者建立和初始化,並準備傳送至代理程式。
已準備好:訊息已傳送至代理程式,消費者可見並可供消費。
傳輸中:消費者已取得訊息並根據消費者的當地業務邏輯進行處理。
在此過程中,代理程式會等待消費者完成消費並提交消費結果。如果在特定時間內未收到消費者的回應,Apache RocketMQ 會重新嘗試傳送訊息。如需更多資訊,請參閱 消費重試。
已確認:消費者完成消費並將消費結果提交至代理程式。代理程式會標記目前訊息是否已成功消費。
預設情況下,Apache RocketMQ 會保留所有訊息。當提交消費結果時,訊息資料會被邏輯標記為已消費,而不是立即刪除。因此,消費者可以在訊息因保留期間到期或儲存空間不足而被刪除之前回溯訊息以重新消費。
已刪除:當訊息的保留期間到期或儲存空間不足時,Apache RocketMQ 會以滾動方式從實體檔案中刪除最早儲存的訊息。如需更多資訊,請參閱 訊息儲存和清理。
訊息消費失敗或逾時會觸發伺服器的重試邏輯。如果觸發訊息的消費重試,訊息便會達到其生命週期的結束。原始訊息會被視為具有新訊息 ID 的新訊息。
如果觸發已排序訊息的消費重試,只有在已排序訊息被消費後,緊接在已排序訊息之後的訊息才能被處理。
使用限制
已排序訊息僅支援 MessageType 為 FIFO 的主題。
範例
建立主題
建議使用 mqadmin 工具在 Apache RocketMQ 5.0 中建立主題。但值得注意的是,訊息類型需要新增為屬性參數。以下是範例
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO
傳送訊息
與一般訊息相比,已排序訊息必須為其設定訊息群組。建議您根據您的業務需求,以細緻的層級設定訊息群組,以允許工作負載解耦和並行擴充。
建立 FIFO 主題
./bin/mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 127.0.0.1:9876 -a +message.type=FIFO
- -c 群集名稱
- -t 主題名稱
- -n 名稱伺服器的位址
- -o 建立已排序主題的旗標
下列範例程式碼提供如何使用 Java 傳送和接收已排序訊息的範例
// Send ordered messages.
MessageBuilder messageBuilder = null;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter the message.
.setTag("messageTag")
// Configure a message group for the ordered messages. We recommend that you do not include a large number of messages in the group.
.setMessageGroup("fifoGroup001")
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Make sure that ordered delivery is applied to the consumer group. Otherwise, the messages are delivered concurrently and in no particular order.
// Consumption example 1: If the consumer type is PushConsumer, the consumer needs to only process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If the consumer type is SimpleConsumer, the consumer must actively obtain the message for consumption and submit the consumption result.
// If the consumption of a message in the message group has not finished, the next message in the message group cannot be retrieved if you call the Receive function.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
e.printStackTrace();
}
使用注意事項
使用序列消費以防止訊息處理順序錯誤。
建議您使用序列訊息消費,而非批次消費。同時消費多則訊息可能會導致訊息處理順序錯誤。
例如,訊息 1、2、3 和 4 以 1-2-3-4 的順序傳送,而批次消費的順序為 1-[2、3](以批次處理,但失敗)-[2、3](重試)-4。如果訊息 3 處理失敗,系統可能會重複處理訊息 2。因此,會發生訊息消費順序錯誤。
避免在訊息群組中包含大量訊息。
Apache RocketMQ 確保同一訊息群組中的訊息儲存在同一個佇列中。包含大量訊息的訊息群組會導致對應的佇列過載。這會影響訊息傳遞效能,並阻礙擴充性。設定訊息群組時,您可以使用訂單 ID 和使用者 ID 作為訊息排序條件。這可確保同一使用者的訊息順序。
建議您在您的業務應用程式中,依訊息群組拆分訊息。例如,您可以使用訂單 ID 和使用者 ID 作為訊息群組關鍵字,以實作同一使用者的訊息的已排序處理。您不需要確保不同使用者的訊息順序。