消費者
本節說明 Apache RocketMQ 中消費者的定義、模型關係、內部屬性和行為限制。此主題還提供消費者的版本相容性資訊和使用注意事項。
定義
消費者是接收並處理 Apache RocketMQ 中訊息的實體。
消費者通常整合在商業系統中。他們從 Apache RocketMQ 代理程式取得訊息,並將訊息轉換成商業邏輯可以感知和處理的資訊。
下列項目決定消費者的行為
消費者身分:消費者必須與消費者群組關聯,才能取得行為設定和使用狀態。
消費者類型:Apache RocketMQ 提供各種消費者類型,以適用於不同的開發情境,包括推播消費者、簡單消費者和拉取消費者。如需更多資訊,請參閱 消費者類型。
消費者的本機設定:這些設定會根據消費者類型指定消費者用戶端執行的內容。例如,您可以設定消費者上的執行緒數目和並行設定,以達成不同的傳輸效果。
模型關係
下圖顯示消費者在 Apache RocketMQ 的網域模型中的定位。
訊息由生產者初始化,並傳送到 Apache RocketMQ 伺服器。
訊息儲存在 Apache RocketMQ 伺服器中主題的指定佇列中,依據它們到達 Apache RocketMQ 伺服器的順序。
消費者根據指定的訂閱關係從 Apache RocketMQ 伺服器取得並使用訊息。
內部屬性
消費者群組名稱
定義:與目前消費者相關的消費者群組名稱。消費者會從消費者群組繼承其行為。如需詳細資訊,請參閱消費者群組。
值:消費者群組是 Apache RocketMQ{#product-name} 的邏輯資源。
您必須使用主控台或提前呼叫 API 作業來建立消費者群組。如需詳細瞭解此作業的限制,請參閱參數限制。
客戶端 ID
定義:消費者客戶端的識別碼。此屬性用於區分不同的消費者。此值在叢集中必須是唯一的。
值:客戶端 ID 會由 Apache RocketMQ SDK 自動產生。它主要用於 O&M 目的,例如檢視日誌和找出問題。客戶端 ID 無法修改。
通訊參數
端點 (必要):用於連線到伺服器的端點。此端點用於識別叢集。
存取點必須以格式設定。我們建議您使用網域名稱,避免使用 IP 位址,以防止節點變更無法執行熱點遷移。
憑證 (選用):客戶端用於驗證的憑證。
僅當伺服器啟用身分識別和驗證時才需要傳輸。
- 要求逾時 (選用):網路要求的逾時期間。如需詳細瞭解值範圍和預設值,請參閱參數限制。
預先繫結訂閱清單
定義:指定消費者的訂閱清單。Apache RocketMQ 代理伺服器可以在消費者初始化期間使用預先繫結訂閱清單來驗證訂閱主題的權限和有效性,而不是在應用程式啟動後。
值:我們建議您在消費者初始化期間指定訂閱或訂閱主題清單。如果未指定訂閱或訂閱主題已變更,Apache RocketMQ 會動態驗證主題。
訊息監聽器
定義:消費者用於在 Apache RocketMQ 代理伺服器將訊息推播到消費者後呼叫訊息使用邏輯的監聽器。
值:訊息監聽器的值會設定在消費者客戶端上。
限制:當您以推播消費者的身分使用訊息時,您必須在消費者用戶端上設定訊息監聽器。如需有關消費者類型的詳細資訊,請參閱 消費者類型。
行為限制
在 Apache RocketMQ 領域模型中,消費者管理是透過消費者分組來實作,而同一個群組中的消費者會共用訊息以進行使用。因此,為確保群組中訊息的正常載入和使用,Apache RocketMQ 要求同一個群組中的所有消費者保持以下使用行為一致
傳遞順序
使用重試政策
版本相容性
如行為限制中所述,同一個群組中所有消費者的傳遞順序和使用重試政策需要一致。
Apache RocketMQ 伺服器版本 5.x:前一個消費者的使用行為會從關聯的消費者群組取得。因此,同一個群組中所有消費者的使用行為必須一致,而用戶端不需要注意它。
Apache RocketMQ 伺服器版本 3.x/ 4.x 歷程記錄:前一個使用邏輯是由消費者用戶端介面定義的。因此,當您設定消費者用戶端時,您必須確保同一個群組中消費者的使用行為一致。
如果您使用 Apache RocketMQ 伺服器版本 5.x,而用戶端使用先前版本的 SDK,則消費者的使用邏輯會受到消費者用戶端介面設定的影響。
使用注意事項
我們建議您限制個別程序上的消費者數量。
Apache RocketMQ 的消費者支援在通訊協定層級上的非封鎖傳輸模式。非封鎖傳輸模式具有較高的通訊效率,並支援多個執行緒同時存取。因此,在大部分的情況下,只需在單一程序中為消費者群組初始化一個消費者。在開發階段,請避免初始化具有相同組態的多個消費者。
我們建議您不要定期建立和銷毀消費者。
Apache RocketMQ 的消費者是基礎資源,可以重複使用,就像資料庫的連線池一樣。您不需要每次接收訊息時都建立消費者,或在使用訊息後銷毀消費者。如果您定期建立和銷毀消費者,會在代理程式上產生大量的短連線要求。這會對您的系統造成高負載。
正確範例
Consumer c = ConsumerBuilder.build();
for (int i =0;i<n;i++)
{
Message m= c.receive();
//process message
}
c.shutdown();
錯誤範例
for (int i =0;i<n;i++)
{
Consumer c = ConsumerBuilder.build();
Message m= c.receive();
//process message
c.shutdown();
}