基本最佳實務
生產者
傳送訊息的注意事項
標籤的使用
應用程式可以識別為主題,訊息子類型可以識別為標籤。標籤可以由應用程式自由設定。只有當生產者在傳送訊息時設定標籤,消費者才能在訂閱訊息時透過代理程式使用標籤來篩選訊息。5.x SDK 可以呼叫 messageBuilder.setTag("messageTag"),舊版本可以呼叫 message.setTags("messageTag")。
金鑰的使用
在服務層級,建議將每則訊息對應到一個唯一的服務識別碼,並設定為 keys 欄位,以便在未來找出訊息遺失的問題。伺服器會為每則訊息建立一個索引(雜湊索引),應用程式可以透過主題和金鑰,以及訊息由誰使用來查詢訊息的內容。由於它是雜湊索引,請確定金鑰盡可能唯一,以避免潛在的雜湊衝突。常見的設定政策使用離散的唯一識別碼,例如訂單 ID、使用者 ID 和要求 ID。
列印記錄
如果訊息傳送成功或失敗,您需要列印訊息記錄以進行故障排除服務。傳送表示只要沒有引發例外狀況,訊息就已傳送成功。
訊息傳送失敗的處理方法
生產者本身的傳送方法支援內部重試,5.x 重試邏輯參考 傳送重試政策:
上述策略在一定程度上也保障了消息发送的成功率,如果业务要求消息必须无损发送,还需要针对可能出现的异常情况进行兜底,比如发送同步方式调用发送失败,则尝试将消息存储到 db 中,由后台线程定时重试,确保消息最终能到达 Broker。
上述 DB 重试方式没有集成到 MQ 客户端中,而是需要应用自己完成,主要基于以下几个方面的考虑:一是 MQ 客户端设计为无状态模式,方便任意水平扩展,机器资源消耗仅为 cpu、内存、网络。二是如果 MQ 客户端内部集成 KV 存储模块,只有同步刷盘才能保证数据可靠,而同步刷盘本身有较大的性能开销,所以通常采用异步刷盘,并且由于应用关闭过程不受 MQ 运维人员控制,经常会出现 kill -9 这种暴力关闭。导致数据来不及刷盘丢失。三是 Producer 所在机器可靠性较低,一般为虚拟机,不适合存储重要数据。综上,建议由应用自己控制重试流程。
Consumer
消费过程做到幂等
RocketMQ 无法避免消息重复消费(Exactly Once),所以如果业务对消费重复非常敏感,需要在业务层面进行去重处理。这可以通过借助关系型数据库来实现,首先需要为消息确定一个唯一键,可以是 msgId,也可以是消息内容中的某个唯一标识字段,比如订单 id。消费前先判断该唯一键在关系型数据库中是否存在,如果不存在则插入并消费,否则跳过。(实际过程中应该考虑原子性问题,判断是否存在主键冲突,则插入失败,直接跳过)
MsgId 必须是全局唯一标识,但在实际使用中,可能会出现同一条消息拥有两个不同 msgId 的情况(消费者主动重传、客户端重投机制导致重复等),这就需要对业务字段进行去重。
消费过程变慢
增加消费并行度
絕大多數的消息消費都是 IO 密集型的,即可能在操作資料庫或調用 RPC,而這種消費的消費速率取決於後端資料庫或外部系統的吞吐量。通過增加消費並行度,可以提升總體的消費吞吐量,但當並行度增加到一定程度後,反而會下降,因此應用必須設定一個合理的並行度。修改消費並行度有以下幾種方式
- 在同一個 ConsumerGroup 中,我們增加 Consumer 實例的數量來提升並行度(注意超過訂閱隊列數量的 Consumer 實例是無效的)。可以增加機器,或在已有的機器上啟動多個進程。
- 提升單個 Consumer 的消費並行線程,5.x PushConsumer SDK 可以通過 PushConsumerBuilder.setConsumptionThreadCount() 來設定線程數,SimpleConsumer 則可以通過業務線程自由增加併發度,底層會安全地進行線程安全;歷史的 SDK PushConsumer 可以通過修改參數 consumeThreadMin 和 consumeThreadMax 來實現。
批量消費
如果某些業務場景支援批量消費,可以大幅提升消費吞吐量。例如,訂單扣減的應用,單次處理一個訂單需要 1s,而一次處理 10 個訂單可能只需要 2s,這樣可以大幅提升消費吞吐量。建議使用 5.x SDK 的 SimpleConsumer,設定每個介面呼叫的批次大小,一次拉取多條消息。
重置位點跳過不重要的訊息
在消息堆積的情況下,如果消費速率跟不上投遞速率,且業務對資料要求不那麼嚴格,可以選擇捨棄不重要的訊息。建議使用重置位點功能,直接調整消費位點到指定的時間或位置。
優化單條訊息的消費流程
例如,一條訊息的消費流程如下
- 根據訊息從 DB 查 [資料 1]
- 根據訊息從 DB 查 [資料 2]
- 複雜的業務計算
- 將 [資料 3] 寫入 DB
- 將 [資料 4] 寫入 DB
在消費此訊息期間,與資料庫有四次互動。如果我們將每次互動計算為 5 毫秒,總時間為 20 毫秒。假設服務運算花費 5 毫秒,總時間為 25 毫秒。因此,如果四次資料庫互動可以最佳化為兩次,則總時間可以最佳化為 15 毫秒,這表示整體效能提升了 40%。因此,如果應用程式對延遲很敏感,則資料庫可以部署在 SSD 磁碟上。與 SCSI 磁碟相比,前者的 RT 小很多。
消費列印記錄檔
如果訊息數量很少,建議您在消費進入方法中列印訊息,這需要很長的時間來消費。
new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
LOGGER.info("Consume message={}", messageView);
//Do your consume process
return ConsumeResult.SUCCESS;
}
}
如果您能列印每個訊息的消費時間,將會更方便排除線上問題,例如消費速度慢。
代理程式
代理程式角色
代理程式角色分為 ASYNC_MASTER、SYNC_MASTER 和 SLAVE。如果您對訊息可靠性有嚴格的要求,請部署 SYNC_MASTER 加上 SLAVE。如果不需要訊息可靠性,請部署 ASYNC_MASTER 加上 SLAVE。如果測試只求方便,您可以選擇僅 ASYNC_MASTER 或僅 SYNC_MASTER 部署。
FlushDiskType
與 ASYNC_FLUSH 相比,SYNC_FLUSH 雖然效能較差,但可靠性較高。因此,必須根據實際服務場景進行權衡。
代理程式設定
參數 | 預設值 | 說明 |
---|---|---|
listenPort | 10911 | 接受用戶端連線的監聽埠 |
namesrvAddr | null | 名稱伺服器地址 |
brokerIP1 | 網路 InetAddress | 代理程式目前監聽的 IP 位址 |
brokerIP2 | 與 brokerIP1 相同 | 當存在主從架構的 Broker 時,若在 Broker 主節點上配置了 brokerIP2 屬性,則 Broker 從節點會連線到主節點上配置的 brokerIP2 進行同步 |
brokerName | null | Broker 名稱 |
brokerClusterName | DefaultCluster | 此 Broker 所屬的叢集名稱 |
brokerId | 0 | broker id 0 表示主節點,其他正整數表示從節點 |
storePathCommitLog | $HOME/store/commitlog/ | 儲存提交記錄的目錄 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 消費佇列儲存的目錄 |
mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | 提交記錄映射檔案大小 |
deleteWhen | 04 | 刪除超過檔案保留時間的提交記錄的時間點 |
fileReservedTime | 72 | 檔案保留時間(小時) |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH 處於 SYNC_FLUSH 模式的 Broker 保證在收到生產者確認之前將訊息刷新。ASYNC_FLUSH Broker 使用此刷新模式刷新一組訊息以提升效能。 |