跳至主要內容
版本:5.0

延遲訊息

延遲訊息是 Apache RocketMQ 中具有進階功能的訊息。本主題說明延遲訊息和延遲訊息的場景、運作機制、限制、使用範例和使用注意事項。

注意

排程訊息和延遲訊息本質上是相同的。兩者都根據訊息設定的計時時間,在固定時間將訊息傳遞給消費者。因此,以下各節使用延遲訊息。

場景

在分散式計時排程和工作逾時處理等場景中,需要精確且可靠的基於時間的事件觸發。Apache RocketMQ 提供延遲訊息,協助您簡化計時排程工作開發,並實作效能高、可擴充且可靠的計時觸發。

情境 1:分布式定時排程 定时消息

分布式定時排程情境涉及需要各種時間粒度層級的任務,例如每天下午 5 點執行檔案清理的任務或每 2 分鐘觸發推播訊息的任務。傳統基於資料集的定時排程解決方案在分布式情境中既複雜又低效。相較之下,Apache RocketMQ 中的延遲訊息允許您封裝多種類型的時間觸發器。

情境 2:任務逾時處理 超时任务处理

涉及任務逾時處理的典型情境是電子商務付款,其中未付款訂單在未付款一段特定時間後會被取消,而不是立即取消。在這種情況下,您可以使用 Apache RocketMQ 中的延遲訊息來檢查和觸發逾時任務。

基於延遲訊息的任務逾時處理提供以下好處

  • 各種時間粒度層級和簡化的開發:Apache RocketMQ 中的排程訊息沒有固定時間增量的限制。您可以在任何時間粒度層級觸發任務,而且不需要重複資料刪除。

  • 高性能和可擴充性:Apache RocketMQ 中的延遲訊息提供高並發性和可擴充性。這優於傳統的資料庫掃描方法,後者實作複雜,而且由於頻繁的 API 掃描呼叫,可能會造成效能瓶頸。

工作機制

延遲訊息的定義

延遲訊息是 Apache RocketMQ 中具有進階功能的訊息。延遲訊息允許消費者在指定時間間隔後或在指定時間才使用傳送到伺服器的訊息。您可以在分布式情境中使用延遲訊息來實作延遲排程和觸發。

時間設定規則

  • Apache RocketMQ 中延遲訊息的排程或延遲時間表示為時間戳記,而不是時間間隔。

  • 排程時間以毫秒級 Unix 時間戳記格式呈現。您必須將訊息傳遞的排程時間轉換為毫秒級 Unix 時間戳記。您可以使用Unix 時間戳記轉換器將時間轉換為毫秒級 Unix 時間戳記。

  • 排程時間必須在允許的時間範圍內。如果排程時間超出範圍,則排程時間不會生效,訊息會立即從伺服器端傳遞。

  • 預設情況下,延遲訊息的最大時間範圍為 24 小時。您無法變更預設值。如需更多資訊,請參閱參數限制

  • 排程時間必須晚於目前時間。如果排程時間設定為早於目前時間,則排程時間不會生效,訊息會立即從伺服器端傳遞。

以下部分提供兩個時間設定範例

  • 延遲訊息:如果目前時間為 2022-06-09 17:30:00,您希望在 2022-06-09 19:20:00 傳遞訊息,則排程時間的毫秒級 Unix 時間戳記為 1654773600000。

  • 延遲訊息:如果目前時間為 2022-06-09 17:30:00,您希望在 1 小時後傳遞訊息,則訊息傳遞時間為 2022-06-09 18:30:00,毫秒級 Unix 時間戳記為 1654770600000。

排程訊息的生命週期

定时消息生命周期

  • 已初始化:訊息由生產者建立並初始化,並準備傳送至伺服器。

  • 計時:訊息傳送至伺服器端,訊息儲存在基於時間的儲存系統中,直到指定傳遞時間。不會立即為訊息建立索引。

  • 準備就緒:在指定時間,訊息會寫入一般儲存引擎,訊息對使用者可見,並等待使用者使用。

  • 傳送中:使用者取得訊息,並根據使用者的當地商業邏輯處理訊息。

    在此過程中,broker 會等待 consumer 完成消費並提交消費結果。如果在一定時間內未收到 consumer 的回應,Apache RocketMQ 會重試該訊息。更多資訊,請參閱 消費重試

  • 已確認:consumer 完成消費並將消費結果提交給 broker。broker 會標記當前訊息是否已成功消費。

    預設情況下,Apache RocketMQ 會保留所有訊息。當提交消費結果時,訊息資料會被邏輯標記為已消費,而不是立即刪除。因此,在訊息因保留時間到期或儲存空間不足而被刪除之前,consumer 可以回溯訊息以重新消費。

  • 已刪除:當訊息的保留時間到期或儲存空間不足時,Apache RocketMQ 會以滾動的方式從實體檔案中刪除最早儲存的訊息。更多資訊,請參閱 訊息儲存和清理

使用限制

訊息類型一致性

延遲訊息只能傳送給 MessageType 為 Delay 的主題。

時間粒度

Apache RocketMQ 中延遲訊息的時間粒度低至毫秒。預設粒度值為 1000 毫秒。

Apache RocketMQ 中延遲訊息的狀態可以持續儲存。如果訊息系統發生故障並重新啟動,訊息仍會根據指定的傳遞時間傳遞。但是,如果儲存系統發生例外或重新啟動,則傳遞延遲訊息可能會發生延遲。

範例

建立主題

建議使用 mqadmin 工具在 Apache RocketMQ 5.0 中建立主題。但是,值得注意的是,訊息類型需要新增為屬性參數。以下是一個範例

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Delay

傳送訊息

與一般訊息不同,延遲訊息必須為其指定傳遞時間戳記。

建立 DELAY 主題

/bin/mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY
  • -c 集群名稱
  • -t 主題名稱
  • -n 名稱伺服器的地址
  • -a 額外屬性,我們新增一個 message.type 屬性,其值為 DELAY,以支援傳遞 DELAY 訊息。

以下程式碼提供傳遞和使用延遲訊息的 Java 範例

        // Send delay messages.
MessageBuilder messageBuilder = null;
// Specify a millisecond-level Unix timestamp. In this example, the specified timestamp indicates that the message will be delivered in 10 minutes from the current time.
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter messages.
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Consumption example 1: If a scheduled message is consumed by a push consumer, the consumer needs to process the message only in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If a scheduled message is consumed by a simple consumer, the consumer must obtain the message for consumption and submit the consumption result.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
}

使用說明

建議您不要為大量訊息安排相同的傳遞時間。

延遲訊息會儲存在基於時間的儲存系統中,然後在指定的傳遞時間傳遞給使用者。如果您為大量延遲訊息指定相同的傳遞時間,系統必須在傳遞時間同時處理這些訊息。這會讓系統負載過重,導致訊息傳遞延遲。