跳至主要內容
版本:5.0

消費者類型

Apache RocketMQ 支援下列消費者類型:PushConsumer、SimpleConsumer 和 PullConsumer。本主題說明三種消費者類型的用法、運作和重試機制,以及場景。

背景資訊

Apache RocketMQ 提供 PushConsumer、SimpleConsumer 和 PullConsumer 消費者類型。三種消費者類型具有不同的整合和控制方式,可讓您滿足不同商業場景中的訊息傳遞需求。下列因素有助於您為您的商業場景選擇合適的消費者類型

  • 同時消費:消費者如何使用多執行緒技術來實作同時訊息消費,以提高訊息處理效率?

  • 同步或非同步訊息處理:對於不同的整合場景,消費者可能需要將收到的訊息非同步地分發到商業邏輯系統進行處理。我如何實作非同步訊息處理?

  • 可靠訊息處理:消費者在處理訊息時如何傳回回應結果?當發生訊息錯誤時,我如何實作訊息重試以確保可靠的訊息處理?

有關前述問題的解答,請參閱 PushConsumerSimpleConsumer

功能概述

消息消费流程

上圖顯示 Apache RocketMQ 中消費者使用訊息的階段包括:接收訊息、處理訊息和提交使用狀態。

三種類型的消費者提供不同的實作方法和 API 操作,適用於各種訊息使用情境。下表說明三種類型消費者的差異。

資訊

PullConsumer 僅建議整合到串流處理架構中。PushConsumer 和 simpleConsumer 可以滿足大多數情境。

您可以根據業務情境在 PushConsumer 和 SimpleConsumer 之間切換。切換到不同的消費者類型時,Apache RocketMQ 中現有資源和現有業務處理任務的使用不會受到影響。

危險

嚴禁在同一個 consumerGroup 中混合使用 pullConsumer 和其他消費者類型。

項目PushConsumerSimpleConsumerPullConsumer
API 操作呼叫呼叫回呼操作以使用訊息監聽器傳回使用結果。消費者只能在訊息監聽器的範圍內處理使用邏輯。業務應用程式實作訊息處理,並呼叫對應的操作以傳回使用結果。業務應用程式實作訊息擷取和處理,並呼叫對應的操作以傳回使用結果。
使用管理Apache RocketMQ SDK 用於管理訊息使用中並行的執行緒數。訊息使用中並行的執行緒數基於個別業務應用程式的使用邏輯。訊息使用中並行的執行緒數基於個別業務應用程式的使用邏輯。
負載平衡機制5.0 版本中為訊息型負載平衡,早期版本為佇列型負載平衡。訊息型負載平衡。佇列型負載平衡。
API 靈活性API 操作被封裝,靈活性較差。原子操作,靈活性較高。原子操作,靈活性較高。
場景此消費者類型適合不需自訂處理流程的開發場景。此消費者類型適合需自訂處理流程的開發場景。建議僅在串流處理框架場景中整合

PushConsumer

PushConsumer 是一種封裝程度高的消費者類型,訊息消費與消費結果提交僅透過訊息監聽器處理,訊息拉取、消費狀態提交、消費重試等操作都由 Apache RocketMQ client SDK 完成。

使用方式

PushConsumer 使用方式固定,消費者初始化時向 PushConsumer 註冊訊息監聽器,在訊息監聽器中實現訊息處理邏輯,訊息拉取、監聽器呼叫觸發、訊息重試等操作都由 Apache RocketMQ SDK 完成。

範例程式碼

// Message consumption example: Use a PushConsumer consumer to consume messages. 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Set the message listener.
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// Consume the messages and return the consumption result.
return ConsumeResult.SUCCESS;
}
})
.build();

PushConsumer 消費者的訊息監聽器回傳結果有以下幾種

  • 消費成功:例如使用 Apache RocketMQ SDK for Java,消費訊息成功時回傳 ConsumeResult.SUCCESS,伺服器會根據消費結果更新消費進度。

  • 消費失敗:例如使用 Apache RocketMQ SDK for Java,消費訊息失敗時回傳 ConsumeResult.FAILURE,Apache RocketMQ 是否重試消費訊息取決於消費重試策略。

  • 意外失敗:例如拋出意外異常,導致訊息消費失敗,Apache RocketMQ 是否重試消費訊息取決於消費重試策略。

如果訊息處理邏輯中意外錯誤持續導致 PushConsumer 消費者無法消費訊息,SDK 會判定為消費逾時,強制提交消費失敗結果,訊息會根據消費重試策略處理,消費逾時詳情請參閱 Push Consumer 重試策略

資訊

當消費逾時發生時,SDK 會提交消費失敗結果,但當前的消費執行緒可能無法及時響應結果,繼續處理訊息::

工作原理

PushConsumer 的訊息即時處理,基於 SDK 內建的典型 Reactor 執行緒模型,SDK 內建長輪詢執行緒,會拉取訊息並暫存於佇列中,再由佇列將訊息發送給個別訊息消費執行緒,訊息監聽器根據訊息消費邏輯進行處理,以下為 PushConsumer 消費者的訊息消費流程圖 PushConsumer原理

重試以確保可靠性

對於 PushConsumer,客戶端 SDK 和消費邏輯單元之間的通訊僅透過使用訊息監聽器來實作。客戶端 SDK 會根據訊息監聽器傳回的結果來檢查訊息是否已消費,並根據消費重試邏輯執行重試以確保訊息可靠性。所有訊息都必須以同步方式消費。在監聽器操作呼叫結束時傳回消費結果。不允許非同步分配。如需瞭解關於訊息重試的更多資訊,請參閱 推播消費者的重試政策

為了確保訊息傳遞的可靠性,Apache RocketMQ 禁止 PushConsumer 消費者在訊息消費中執行下列行為。

  • 在訊息消費完成之前傳回消費結果。例如,對於稍後無法消費的訊息,會提前傳回消費成功結果。在這種情況下,Apache RocketMQ 無法檢查實際的消費結果,也不會重試消費訊息。

  • 從訊息監聽器將訊息分配到其他自訂執行緒,並提前傳回消費結果。如果訊息無法消費,但已提前傳回消費成功結果,Apache RocketMQ 無法檢查實際的消費結果,也不會重試消費訊息。

確保訊息順序

對於 Apache RocketMQ 中的 先進先出 (FIFO) 訊息,如果已為消費者群組設定已排序訊息消費,PushConsumer 消費者會依消費順序消費訊息。當 PushConsumer 消費者消費訊息時,會確保消費順序,而不需要個別業務應用程式在業務邏輯中定義消費順序。

在 Apache RocketMQ 中,同步提交是已排序訊息處理的前提。如果在業務邏輯中定義了非同步分配,Apache RocketMQ 無法確保訊息順序。::

場景

PushConsumer 將訊息處理限制為同步處理,並限制處理每則訊息的逾時時間。PushConsumer 適用於下列場景

  • 可預測的訊息處理時間:如果未限制訊息處理時間,則會持續觸發訊息重試,以確保訊息可靠性,而這些訊息需要較長的處理時間。這會導致大量重複訊息。

  • 沒有非同步化且沒有自訂流程:PushConsumer 將消費邏輯的執行緒模式限制為 Reactor 執行緒模式。客戶端 SDK 會根據最大通量來處理訊息。此模式容易開發,但不允許非同步或自訂流程。

SimpleConsumer

SimpleConsumer 是一種消費者類型,支援訊息處理的原子操作。此類型的消費者會呼叫作業來取得訊息、提交使用狀態,並根據業務邏輯執行訊息重試。

使用方式

SimpleConsumer 涉及多個 API 作業。會根據需要呼叫對應的作業,以取得訊息並將其分配給業務執行緒進行處理。然後,呼叫提交作業來提交訊息處理結果。範例程式碼

// Consumption example: When a SimpleConsumer consumer consumes normal messages, the consumer obtain messages and commit message consumption results. 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Specify the max await time when receive messages from the server.
.setAwaitDuration(Duration.ofSeconds(1))
.build();
try {
// A SimpleConsumer consumer must obtain and process messages.
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
logger.error("Failed to receive message", e);
}

下表說明 SimpleConsumer 提供的 API 作業。

作業說明可修改參數
ReceiveMessage消費者可以呼叫此作業從伺服器取得訊息。注意由於伺服器使用分散式儲存,因此即使請求的訊息實際上存在於伺服器上,伺服器仍可能傳回空結果。您可以再次呼叫 ReceiveMessage 作業或增加 ReceiveMessage 作業中的並行值。批次提取大小:一次取得的訊息數量。SimpleConsumer 消費者可以取得多則訊息進行批次使用。訊息隱藏持續時間:訊息的最大處理時間。此參數控制使用失敗時的訊息重試間隔。如需更多資訊,請參閱SimpleConsumer 的重試政策。呼叫 ReceiveMessage 作業時需要此參數。
AckMessage訊息被消費者使用後,消費者會呼叫此作業將使用成功結果傳回伺服器。
ChangeInvisibleDuration在使用重試的情況下,消費者可以呼叫此作業變更訊息處理時間,以控制訊息重試間隔。訊息隱藏持續時間:訊息的最大處理時間。您可以呼叫此作業變更 ReceiveMessage 作業中指定的訊息隱藏持續時間。在多數情況下,此作業用於您想要增加訊息處理時間的情況。

重試以確保可靠性

當 SimpleConsumer 消費者使用訊息時,用戶端 SDK 與 Apache RocketMQ 伺服器之間的通訊是透過使用 ReceiveMessageAckMessage 作業來實作的。當用戶端 SDK 成功處理訊息時,會呼叫 AckMessage 作業。當訊息處理失敗時,不會傳回確認訊息,在指定的訊息隱藏持續時間過後觸發訊息重試機制。如需更多資訊,請參閱簡易消費者的重試政策

確保訊息順序

在 Apache RocketMQ 中,SimpleConsumer 消費者會依儲存順序取得Fifo 訊息。如果一組已排序訊息中的某則訊息尚未完全處理,則無法取得該組已排序訊息中的下一則訊息。

場景

SimpleConsumer 提供原子 API 作業來取得訊息並提交使用結果。與 PushConsumer 相較,SimpleConsumer 提供更好的彈性。SimpleConsumer 適用於下列情況

  • 無法控制訊息處理時間:如果訊息處理時間無法估計,建議您使用 SimpleConsumer 來防止訊息處理時間過長。您可以在訊息使用期間指定估計訊息處理時間。如果現有的處理時間不適合您的業務情境,您可以呼叫對應的 API 操作來變更訊息處理時間。

  • 非同步處理和批次使用:SimpleConsumer 沒有包含 SDK 中的複雜執行緒封裝。商業應用程式可以使用自訂設定。這樣,SimpleConsumer 使用者可以實作非同步分發、批次使用和其他自訂情境。

  • 自訂訊息使用率:使用 SimpleConsumer 時,商業應用程式會呼叫 ReceiveMessage 操作來取得訊息。您可以調整取得訊息的頻率來控制訊息使用率。

PullConsumer

待續。

使用說明

為 PushConsumer 指定適當的使用時間限制

建議您限制 PushConsumer 使用者的訊息使用時間,以防止訊息處理時間過長。訊息處理時間過長可能會因為訊息處理逾時而導致訊息重複,並持續讓下一則訊息等待使用。如果訊息經常處理過長,建議您使用 SimpleConsumer,並根據您的業務需求指定適當的訊息隱藏時間。