跳至主要內容
版本:5.0

消費重試

如果訊息無法被消費,Apache RocketMQ 會根據消費重試政策重新傳送訊息。這有助於移除一些錯誤。本主題說明消費重試功能的工作機制、版本相容性和使用說明。

場景

Apache RocketMQ 的消費重試功能確保消費完整性,而完整性可能會受到業務處理邏輯失敗的影響。此功能是針對業務邏輯失敗的保護措施。它不能用於控制業務流程。

此功能適用於下列場景

  • 由於訊息內容導致業務失敗。例如,交易狀態未傳回,預期業務會在特定時間內復原。

  • 消費失敗的原因不會影響業務持續性。失敗發生的機率很小,後續訊息很可能會傳送並如預期般被消費。在這些情況下,您可以使用重試機制重新傳送訊息,以避免阻擋流程。

請勿在下列場景中使用此功能

  • 消費失敗用於將訊息流程轉移至處理邏輯中的條件。處理邏輯假設許多訊息無法被消費。

  • 消費失敗用於限制訊息處理速率。速率限制應當用於暫時將過多訊息堆疊在佇列中,以供稍後處理,而不是讓訊息進入重試連結。

目的

非同步解耦中訊息中介軟體的常見問題是如何確保下游服務無法處理訊息時整個呼叫連結的完整性。作為金融級別可靠訊息中介軟體服務,Apache RocketMQ 使用精心設計的訊息確認和重試機制來確保每則訊息都根據業務預期進行處理。

了解 Apache RocketMQ 的訊息確認和重試機制有助於解決下列問題

  • 如何確保每則訊息都已處理:您可以根據其消費者邏輯和業務狀態一致性來確保每則訊息都已處理。

  • 如何確保在發生例外情況時正在處理的訊息狀態正確:當發生例外情況(例如斷電)時,您可以確保正確的訊息狀態。

政策概觀

當啟用消費重試功能時,Apache RocketMQ 代理會在訊息無法被消費時重新傳送訊息。如果訊息在經過指定次數的重試後仍然無法被消費,代理會將訊息傳送至死信佇列。

觸發條件

  • 訊息無法被消費。在這種情況下,消費者會傳回失敗狀態或系統會擲回例外情況。

  • 發生逾時錯誤或訊息在推播消費者佇列中停留過長時間。

行為

  • 重試處理狀態機:控制重試處理中訊息的狀態和變更邏輯。

  • 重試間隔:從發生消費失敗或逾時到重試訊息所經過的時間。

  • 最大重試次數:訊息可供消費的重試次數上限。

政策差異

訊息重試政策會根據消費者類型使用不同的重試機制和組態方法。下表說明各政策之間的差異。

消費者類型重試程序狀態機器重試間隔最大重試次數
PushConsumer 準備中 進行中 等待重試 已提交 * DLQ在建立消費者群組時,在元資料中指定。 無序訊息:遞增 有序訊息:固定在建立消費者群組時,在元資料中指定。
SimpleConsumer 準備中 進行中 已提交 DLQ在 API 中的 InvisibleDuration 參數中指定。在建立消費者群組時,在元資料中指定。

如需進一步瞭解重試政策,請參閱 Push 消費者的重試政策Simple 消費者的重試政策

PushConsumer 的重試政策

重試程序狀態機器

當 Push 消費者使用訊息時,訊息可以處於下列狀態之一:Push 消費者的狀態機器

  • 準備中 訊息正在等待在 Apache RocketMQ 代理程式上使用。
  • 進行中 訊息已取得,且正由消費者使用。不過,尚未傳回使用結果。
  • 等待重試 此狀態僅限於 Push 消費者。當代理程式等待消費者傳回使用狀態時,訊息使用失敗或發生逾時錯誤。在這些情況下,會觸發使用重試邏輯。如果尚未達到最大重試次數,訊息會在重試間隔經過後返回準備中狀態。處於準備中狀態的訊息可以再次使用。您可以增加重試之間的間隔,以防止頻繁重試。
  • 已提交 訊息已使用。在消費者傳回成功回應後,可以終止狀態機器。
  • DLQ 使用邏輯的預防措施。如果訊息在達到最大重試次數後仍無法使用,訊息將不再重試,並傳送至死信佇列。您可以使用死信佇列中的訊息,以復原您的業務。

當訊息重試時,其狀態會從 Ready 變更為 Inflight,然後再變更為 WaitingRetry。兩次消費之間的間隔是實際花費在消費的時間與重試間隔的總和。最大消費間隔由代理伺服器的系統參數指定,且無法超過此間隔。 重試間隔

最大重試次數

推播消費者的最大重試次數會在建立消費者群組時,指定在元資料中。如需詳細資訊,請參閱 消費者群組

例如,如果最大重試次數為 3,則訊息可以傳送 4 次:一次原始嘗試和 3 次重試。

重試間隔

  • 未排序的訊息(非排序訊息):遞增。下表說明詳細資訊。

    重試次數間隔重試次數間隔
    110 秒97 分鐘
    230 秒108 分鐘
    31 分鐘119 分鐘
    42 分鐘1210 分鐘
    53 分鐘1320 分鐘
    64 分鐘1430 分鐘
    75 分鐘151 小時
    86 分鐘162 小時
資訊

如果重試次數超過 16,則每個後續重試的間隔為 2 小時。

  • 已排序訊息:固定。如需詳細資訊,請參閱參數限制

範例

對於推播消費者,訊息重試僅由消費失敗的狀態碼觸發。SDK 也會擷取意外例外狀況。

        SimpleConsumer simpleConsumer = null;
// Consumption example: Consume normal messages as a push consumer and trigger a message retry by using a consumption failure.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Retry the message until the maximum number of retries is reached.
return ConsumeResult.FAILURE;
}
};

SimpleConsumer 的重試政策

重試程序狀態機器

當簡易消費者消費訊息時,訊息可以處於下列狀態之一:推播消費者狀態機

  • 準備中 訊息正在等待在 Apache RocketMQ 代理程式上使用。

  • 進行中 訊息已取得,且正由消費者使用。不過,尚未傳回使用結果。

  • 已提交 訊息已使用。在消費者傳回成功回應後,可以終止狀態機器。

  • DLQ 使用邏輯的預防措施。如果訊息在達到最大重試次數後仍無法使用,訊息將不再重試,並傳送至死信佇列。您可以使用死信佇列中的訊息,以復原您的業務。

重試間隔是固定的且預先配置的。消費者在呼叫 API 時,會在 InvisibleDuration 參數中設定此間隔。此參數指定訊息的最大處理時間。當訊息重試時,會重複使用此參數的值。您不需要設定後續重試的間隔。 簡易消費者重試

由於 InvisibleDuration 值是預先配置的,因此可能無法符合您的業務需求。您可以在用於呼叫 API 的程式碼中變更它。

例如,如果您將 InvisibleDuration 值設定為 20 毫秒,而訊息無法在時間內處理,則可以將值變更為較大的值,以避免觸發重試機制。

在變更 InvisibleDuration 值之前,必須符合下列條件

  • 當前訊息未發生超時錯誤。

  • 未返回當前訊息的消費狀態。

如下圖所示,修改後立即生效,即從調用 API 的時間點重新計算 InvisibleDuration 值。修改 InvisibleDuration 值

最大重試次數

對於普通消費者,最大重試次數在創建消費者組時在元數據中指定。更多資訊,請參閱 消費者組

訊息重試間隔

訊息重試間隔 = InvisibleDuration 值 − 訊息實際處理時長

因此,消費重試間隔由 InvisibleDuration 值控制。例如,如果 InvisibleDuration 值為 30 毫秒,並且在處理開始後 10 毫秒返回消費失敗,則下次重試的時間為 20 毫秒,這意味著重試間隔為 20 毫秒。如果在 30 毫秒內未返回消費結果,則會發生超時錯誤並觸發重試。此時,重試間隔為 0 毫秒。

範例

普通消費者只需等待訊息重試即可。

 // Consumption example: Consume normal messages as a simple consumer. If you want a message to be retried, do not process the message. Wait for it to time out, and the broker retries it automatically. 
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// If you want a message to be retried after it fails to be consumed, ignore the failure and wait for the message to be visible. Then try to obtain it again from the broker.
});
} catch (ClientException e) {
// If the message fails to be pulled due to throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}

使用說明

不要使用消費重試來處理消費限流

場景 中所述,訊息重試適用於業務處理和訊息消費失敗是小機率事件的場景。訊息重試不適用於失敗持續發生的場景,例如消費限流。

  • 錯誤範例:當前消費速率高於上限時,返回消費失敗以觸發重試。

  • 正確範例:如果當前消費速率高於限制,則稍後獲取並消費訊息。

設定適當的重試次數以避免無限重試

儘管 Apache RocketMQ 支援自訂消費重試次數,但建議您設定較少的重試次數和較長的重試間隔,以減輕系統負擔。避免大量重試或無限重試。