跳到主要內容
版本:5.0

一般訊息

一般訊息是指 Apache RocketMQ 中沒有特殊功能的訊息。它們不同於具有 fifo 訊息、延遲訊息和交易訊息等功能的訊息。本主題說明一般訊息的範例、運作機制、用法和注意事項。

範例

一般訊息通常用於微服務解耦、資料整合和事件驅動的範例。這些範例大多數對處理訊息的時間或順序沒有要求,或要求很低,除了可靠的傳輸通道之外。

範例 1:微服務的非同步解耦 線上訊息處理

上圖顯示線上電子商務交易範例。在此範例中,上游訂單系統將訂單下達和付款封裝成一個獨立的一般訊息,並將訊息傳送至 Apache RocketMQ 代理程式。然後,下游系統會依需求從代理程式訂閱訊息,並根據本地的消費邏輯處理任務。訊息彼此獨立,不需要關聯。

情境 2:數據整合傳輸 數據傳輸

上圖以離線日誌收集為例,通過 Instrumentation 元件收集前端應用程式的操作日誌,並將日誌轉發到 Apache RocketMQ 中。每條消息都是一條日誌資料,Apache RocketMQ 不需要對其做任何處理,只需要將日誌資料傳輸到下游的儲存和分析系統即可,後續的處理工作由後端應用程式負責。

工作機制

普通消息定義

普通消息是 Apache RocketMQ 中具有基本功能的消息,支援生產者和消費者之間的非同步解耦和通訊。 生命週期

普通消息生命週期

  • 初始化:消息由生產者建立並初始化,準備發送到 Broker。

  • 就緒:消息發送到 Broker,消費者可見,可以被消費。

  • 飛行中:消息被消費者獲取,並根據消費者的本地業務邏輯進行處理。

    在此過程中,Broker 會等待消費者完成消費並提交消費結果,如果在一定時間內沒有收到消費者的響應,Apache RocketMQ 會對消息進行重試,詳見 消費重試

  • 已確認:消費者完成消費,並將消費結果提交給 Broker,Broker 標記當前消息是否消費成功。

    預設情況下,Apache RocketMQ 會保留所有消息,當提交消費結果時,消息資料會被邏輯標記為已消費,而不是立即刪除,因此在消息因保留時間到期或儲存空間不足而被刪除之前,消費者可以回溯消息進行再次消費。

  • 已刪除:當消息的保留時間到期或儲存空間不足時,Apache RocketMQ 會以滾動的方式從物理檔案中刪除最早儲存的消息,詳見 消息儲存和清理

使用限制

一般訊息僅支援 MessageType 為 Normal 的主題。

範例

建立主題

建議使用 mqadmin 工具建立 Apache RocketMQ 5.0 中的主題。但請注意,訊息類型需要新增為屬性參數。以下是範例

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL

傳送訊息

您可以設定索引鍵和篩選標籤來篩選或搜尋一般訊息。以下範例程式碼展示如何在 Java 中傳送和接收一般訊息

// Send a normal message. 
MessageBuilder messageBuilder = new MessageBuilder();
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that you can accurately search for the message by using a keyword.
.setKeys("messageKey")
// Specify the message tag so that the consumer can filter the message based on the specified tag.
.setTag("messageTag")
// Message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the message. You need to pay attention to the sending result and capture exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Consumption example 1: When you consume a normal message as a push consumer, you need only to process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: When you consume a normal message as a simple consumer, you must obtain and consume the message, and submit the consumption result.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, you must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}

使用注意事項

設定全域唯一索引鍵以利於進行問題排除

您可以在 Apache RocketMQ 中設定自訂索引鍵,也就是訊息鍵。當您查詢和追蹤訊息時,索引鍵可以協助您有效率且準確地找到這些訊息。

因此,建議您在傳送訊息時,使用服務的唯一資訊,例如訂單 ID 和使用者 ID,作為索引。這有助於您在未來快速找到訊息。