跳至主要內容
版本:5.0

快速入門

本節將說明快速部署單一節點 RocketMQ 集群的步驟;其中也包含用於傳送和接收訊息的指令,作為運作證明。

系統需求
  1. 建議使用 64 位元作業系統,例如 Linux/Unix/macOS
  2. 64 位元 JDK 1.8+

1.取得 Apache RocketMQ

下載 RocketMQ

Apache RocketMQ 以二進制和原始碼套件進行分發。按一下 這裡 下載 Apache RocketMQ 5.2.0 原始碼套件。您可能偏好 預先建置的二進制套件,由於已編譯,因此可以直接執行。

以下說明以 Linux 環境中 RocketMQ 5.2.0 原始碼套件的應用為範例,說明 RocketMQ 的安裝流程。

解壓縮 RocketMQ 5.2.0 的原始碼套件,然後編譯和建置二進制可執行檔

$ unzip rocketmq-all-5.2.0-source-release.zip
$ cd rocketmq-all-5.2.0-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
$ cd distribution/target/rocketmq-5.2.0/rocketmq-5.2.0

2. 啟動 NameServer

安裝 RocketMQ 後,啟動 NameServer

### start namesrv
$ nohup sh bin/mqnamesrv &

### verify namesrv
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
資訊

一旦我們從 namesrv.log 中看到 'The Name Server boot success..',表示 NameServer 已成功啟動。

3. 啟動 Broker 和 Proxy

在 nameserver 啟動後,我們需要啟動 broker 和 proxy。我們建議使用本機部署模式,其中 Broker 和 Proxy 部署在同一個程序中。我們也支援叢集部署模式。進一步瞭解 部署簡介

### start broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

### verify broker
$ tail -f ~/logs/rocketmqlogs/proxy.log
The broker[broker-a,192.169.1.2:10911] boot success...
資訊

一旦我們從 proxy.log 中看到 “The broker[brokerName,ip:port] boot success..”,表示 Broker 已成功啟動。

注意

到目前為止,已部署單一主控端 RocketMQ 叢集,我們能夠透過腳本傳送和接收簡訊。

4. 使用工具傳送和接收訊息

在使用工具測試之前,我們需要將 nameserver 位址設定為系統。例如系統環境變數 NAMESRV_ADDR

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

5. 使用 SDK 傳送和接收訊息

我們也可以嘗試使用客戶端 SDK 傳送和接收訊息,您可以在 rocketmq-clients 中看到更多詳細資訊。

  1. 建立一個 Java 專案。

  2. 將 SDK 相依性加入 pom.xml,請記得將 rocketmq-client-java-version 取代為 最新版本

    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>${rocketmq-client-java-version}</version>
    </dependency>
  3. 使用 mqadmin cli 工具建立主題。

    $ sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
  4. 在您建立的 Java 專案中,建立一個傳送訊息的程式,並使用以下程式碼執行它

    import java.io.IOException;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.message.Message;
    import org.apache.rocketmq.client.apis.producer.Producer;
    import org.apache.rocketmq.client.apis.producer.SendReceipt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);

    public static void main(String[] args) throws ClientException, IOException {
    String endpoint = "localhost:8081";
    String topic = "TestTopic";
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
    ClientConfiguration configuration = builder.build();
    Producer producer = provider.newProducerBuilder()
    .setTopics(topic)
    .setClientConfiguration(configuration)
    .build();
    Message message = provider.newMessageBuilder()
    .setTopic(topic)
    .setKeys("messageKey")
    .setTag("messageTag")
    .setBody("messageBody".getBytes())
    .build();
    try {
    SendReceipt sendReceipt = producer.send(message);
    logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    } catch (ClientException e) {
    logger.error("Failed to send message", e);
    }
    // producer.close();
    }
    }
  5. 在您建立的 Java 專案中,建立一個消費者範例程式並執行它。Apache RocketMQ 支援 SimpleConsumerPushConsumer

    import java.io.IOException;
    import java.util.Collections;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
    import org.apache.rocketmq.client.apis.consumer.FilterExpression;
    import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
    import org.apache.rocketmq.client.apis.consumer.PushConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
    final ClientServiceProvider provider = ClientServiceProvider.loadService();
    String endpoints = "localhost:8081";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .build();
    String tag = "*";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    String consumerGroup = "YourConsumerGroup";
    String topic = "TestTopic";
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setConsumerGroup(consumerGroup)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
    return ConsumeResult.SUCCESS;
    })
    .build();
    Thread.sleep(Long.MAX_VALUE);
    // pushConsumer.close();
    }
    }

6. 關閉伺服器

完成練習後,我們可以使用以下指令關閉服務。

$ sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

$ sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK