跳至主要內容
版本:5.0

交易訊息

交易訊息是 Apache RocketMQ 中的高階訊息類型。此主題說明交易訊息的應用場景、工作機制、限制、用法和使用注意事項。

場景

分散式交易

當核心業務邏輯在分散式系統中執行時,會同時呼叫多個下游業務來處理邏輯。因此,確保核心業務與下游業務之間執行結果的一致性,是分散式交易需要解決的最大挑戰。事務消息訴求

在電子商務場景中,當使用者下訂單時,會觸發下游系統進行相應的變更。例如,物流系統必須啟動出貨,信用系統必須更新使用者的信用點數,而購物車系統必須清除使用者的購物車。處理分支包括

  • 訂單系統將訂單狀態從未付款改為已付款。

  • 物流系統新增待出貨紀錄,並建立訂單物流紀錄。

  • 信用系統更新使用者的信用點數。

  • 購物車系統清空購物車,並更新使用者的購物車紀錄。

傳統 XA 基礎方案:效能不佳

傳統確保各分支結果一致性的作法,是使用基於 eXtended Architecture(XA)協定的分散式交易系統,將四個呼叫分支封裝成一個包含四個獨立交易分支的大交易,雖然此方案可以確保結果一致性,但為了達成此目的,需要鎖定大量的資源,且隨著分支數量的增加而增加,導致系統並行度低,隨著下游分支的增加,系統效能會越來越差。

普通消息基礎方案:結果一致性差

一個較簡化的作法,是將 XA 方案中的訂單系統變更視為一個 local transaction,而下游系統的變更視為下游任務,將交易分支視為普通訊息,加上訂單資料表的交易,此方案透過非同步處理訊息,縮短處理生命週期,提升系統並行度。 普通消息方案

但此方案容易造成核心交易與交易分支之間結果不一致,例如:

  • 訊息已發送,但訂單未執行,導致整個交易需要 rollback。

  • 訂單已執行,但訊息未發送,此時需要重新發送訊息讓其被消費。

  • 無法可靠地偵測到 timeout 錯誤,導致難以判斷訂單是否需要 rollback 或訂單變更是否需要 commit。

Apache RocketMQ 分散式事務消息方案:徹底一致

上述方案之所以無法保證一致性,是因為普通訊息不具備獨立資料庫交易的 commit、rollback 與統一協調能力。

Apache RocketMQ 的事務訊息功能在一般訊息解決方案的基礎上支援兩階段提交。此功能結合兩階段提交和本機交易,以達成提交結果的整體一致性。事務訊息

Apache RocketMQ 的事務訊息解決方案功能強大、可擴充,且容易開發。如需瞭解事務訊息的工作機制和流程,請參閱工作機制。

工作機制

定義

事務訊息是 Apache RocketMQ 提供的一種進階訊息類型,用於確保訊息產生和本機交易之間的最終一致性。處理工作流程

下圖顯示事務訊息的互動流程。事務訊息

  1. 生產者將訊息傳送至 Apache RocketMQ 代理程式。

  2. Apache RocketMQ 代理程式儲存訊息並標記為尚未準備好遞送。處於此狀態的訊息稱為半訊息。之後,代理程式會將確認訊息 (ACK) 傳回給生產者。

  3. 生產者執行本機交易。

  4. 生產者傳送第二個 ACK 至代理程式,以提交本機交易的執行結果。執行結果可能是提交或回滾。

    • 如果代理程式收到的訊息狀態為「提交」,則代理程式會將半訊息標記為可遞送,並將訊息遞送至消費者。

    • 如果代理程式收到的訊息狀態為「回滾」,則代理程式會回滾交易,且不會將半訊息遞送至消費者。

  5. 如果網路中斷或生產者應用程式重新啟動,且代理程式未收到第二個 ACK 或半訊息狀態為「未知」,則代理程式會等待一段時間,並向生產者叢集中的生產者發送要求,以查詢半訊息的狀態。注意如需瞭解這段時間的長度和最大查詢次數,請參閱參數限制

  1. 生產者收到要求後,會檢查與半訊息對應的本機交易執行結果。

  2. 生產者會根據本機交易的執行結果,向 Apache RocketMQ 代理程式傳送另一個 ACK。然後,代理程式會依照步驟 4 處理半訊息。

事務訊息的生命週期 事務訊息

  • 已初始化:訊息由生產者建立並初始化,準備傳送至代理程式。

  • 交易待處理:半訊息已傳送至代理程式。但並未立即寫入磁碟以進行永久儲存。而是儲存在交易儲存系統中。直到系統驗證本機交易的第二階段成功,訊息才會提交。在此期間,訊息對下游消費者而言是不可見的。

  • 回滾:在第二階段,如果交易執行結果為回滾,代理程式將回滾半訊息並終止工作流程。

  • 準備就緒:訊息已傳送至代理程式,對消費者可見且可供使用。

  • 傳輸中:訊息已由消費者取得,並根據消費者的本機商業邏輯進行處理。

    在此過程中,代理程式會等待消費者完成使用並提交使用結果。如果在特定時間內未收到消費者的回應,Apache RocketMQ 會重試訊息。如需詳細資訊,請參閱使用重試

  • 已確認:消費者完成使用並將使用結果提交至代理程式。代理程式會標記目前訊息是否已成功使用。

    預設情況下,Apache RocketMQ 會保留所有訊息。當提交使用結果時,訊息資料會以邏輯方式標記為已使用,而不是立即刪除。因此,消費者可以在訊息因保留期間到期或儲存空間不足而刪除前,回溯訊息以重新使用。

  • 已刪除:當訊息的保留期間到期或儲存空間不足時,Apache RocketMQ 會從實體檔案中以滾動方式刪除最早儲存的訊息。如需詳細資訊,請參閱訊息儲存和清理

使用限制

訊息類型一致性

交易訊息只能用於 MessageType 為交易的主題。

以交易為中心的消費

Apache RocketMQ 的交易訊息功能可保證相同的交易可以在本機核心交易和下游分支之間處理。但無法保證訊息使用結果與上游執行結果之間的一致性。因此,下游業務必須確保訊息已正確處理。我們建議消費者使用重試,以確保訊息在發生故障時已正確處理。

中間狀態可見性

Apache RocketMQ 的交易訊息功能僅確保最終一致性,表示在訊息傳送給消費者之前,上游交易和下游分支之間無法保證狀態一致性。因此,交易訊息僅適用於接受非同步執行的交易場景。

交易逾時機制

Apache RocketMQ 為事務訊息實作逾時機制,當 Broker 收到半訊息後,若在一定時間內無法確定要提交或回滾該事務,則 Broker 預設會回滾該訊息,詳細逾時時間可參考參數限制

範例

建立 Topic

在 Apache RocketMQ 5.0 中建立 Topic,建議使用 mqadmin 工具,但要注意的是,需要將訊息類型加入為屬性參數,範例如下

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Transaction

發送訊息

發送事務訊息與一般訊息不同的地方在於

  • 在發送事務訊息前,必須先啟用事務檢查器,並與本地事務執行關聯。

  • 建立 Producer 時,必須設定事務檢查器,並綁定要發送訊息的 Topic 清單,如此一來,當發生異常時,才能讓 Client 內建的事務檢查器還原 Topic。

建立 TRANSACTION Topic

NORMAL Topic 不支援傳遞 TRANSACTION 訊息,若傳送 TRANSACTION 訊息到 NORMAL topic 會收到錯誤。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION
  • -c 集群名稱
  • -t Topic 名稱
  • -n nameserver 地址
  • -a 額外屬性,我們加入一個 message.type 屬性,並設定為 TRANSACTION,以支援傳遞 TRANSACTION 訊息。

以下範例以 Java 為例,說明如何發送事務訊息

    // The demo is used to simulate the order table query service to check whether the order transaction is submitted. 
private static boolean checkOrderById(String orderId) {
return true;
}
// The demo is used to simulate the execution result of a local transaction.
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilder();
// Build a transaction producer: The transactional message requires the producer to build a transaction checker to check the intermediate status of an exceptional half message.
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* The transaction checker checks whether the local transaction is correctly committed or rolled back based on the business ID, for example, an order ID.
* If this order is found in the order table, the order insertion action is committed correctly by the local transaction. If no order is found in the order table, the local transaction has been rolled back.
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// Message error. Rollback is returned.
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
// Create a transaction branch.
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
// If the transaction branch fails to be created, the transaction is terminated.
return;
}
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that the system can use a keyword to accurately locate the message.
.setKeys("messageKey")
// Specify the message tag so that consumers can use the tag to filter the message.
.setTag("messageTag")
// For transactional messages, a unique ID associated with the local transaction is created to verify the query of the local transaction status.
.addProperty("OrderId", "xxx")
// Message body.
.setBody("messageBody".getBytes())
.build();
// Send a half message.
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
// If the half message fails to be sent, the transaction can be terminated and the message is rolled back.
return;
}
/**
* Execute the local transaction and check the execution result.
* 1. If the result is Commit, deliver the message.
* 2. If the result is Rollback, roll back the message.
* 3. If an unknown exception occurs, no action is performed until a response is obtained from a half message status query.
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// You can determine whether to retry the message based on your business requirements. If you do not want to retry the message, you can use the half message status query to submit the transaction status.
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// We recommend that you record the exception information. This enables you to submit the transaction status based on the half message status query in the event of a rollback exception. Otherwise, you have to retry the message.
e.printStackTrace();
}
}
}

使用注意事項

避免大量半訊息逾時。

Apache RocketMQ 允許在事務提交時發生異常時檢查事務,以確保事務的一致性,但 Producer 應盡量避免本地事務回傳未知的結果,大量的事務檢查會導致系統效能下降,拖慢事務處理速度。

正確處理進行中的事務。

在半訊息狀態查詢時,對於進行中的事務,不要回傳 Rollback 或 Commit,應維持該事務為 Unknown 狀態。

一般來說,事務進行中的原因是事務執行較慢,而查詢頻率過快,建議有兩種解法

  • 將第一次查詢的時間間隔設定較大,但可能會造成依賴查詢結果的訊息有較大的延遲。

  • 讓程式正確的辨識進行中的事務。