Wavefrontで学ぶ分散トレーシング Part-6

この文章は、Wavefrontで学ぶ分散トレーシング シリーズの第六回目です。

シリーズ

第一回 : 概要編
第二回 : Spring Bootで分散トレーシング
第三回 : REDメトリクスって何?
第四回 : サービスをつなげてみる
第五回 : Pythonで分散トレーシング
第六回 : AMQPで分散トレーシング**← いまここ**
第七回 ; サービスメッシュで分散トレーシング

始めに

過去の回では、Spring BootおよびPythonを使った分散トレーシングを紹介しました。 もう一度、それらの復習をすると

  • 分散トレーシングのキモはTrace IDとSpan ID
  • HTTPヘッダーをもとにTrace IDとSpan IDをサービス間で共有することでサービスがつながる
  • アプリケーション側では、HTTPヘッダーからTrace IDとSpan IDを展開するようなコーディングを含まないといけない

さて、この中で2点目ですが、いままでの検証ではずっと「HTTPリクエスト」をベースにやってきました。 そこで、すこし疑問に思うかもしれないのが「HTTP以外のリクエスト」でも、これがうまくいくのかという点です。

そこで今回は、RabbitMQを使い、AMQPでも分散トレーシングを行えるか検証します。

準備

今回はRabbitMQを使いますので、PC上にインストールします。 手順はOSによって、違うので以下をもとにインストールおよび起動をしてください。

https://www.rabbitmq.com/download.html

今回はSpring Bootのアプリを使って、RabbitMQにキューを仕込むおよび取り出すコードを書いていきます。 いままで同様、これには最低限以下をインストールしてください。

  • Java JDK 8+

Oracle JDKに従いJDKをインストールしてください

ソースコード

ここに公開しています。

https://github.com/mhoshi-vm/wf-demanabu-dis-tracing/tree/master/6

アプリの準備

必要なアプリは2つです、ProducerアプリとConsumerアプリです。

Producerアプリ

さて、第六回にもなったので、いままで通り、start.spring.ioからダウンロードするのではなく、CLIをつかって直接必要なパッケージをダウンロードします。

1curl https://start.spring.io/starter.tgz \
2       -d artifactId=producer \
3       -d baseDir=producer \
4       -d dependencies=web,amqp,cloud-starter-sleuth,wavefront \
5       -d packageName=com.example \
6       -d applicationName=ProducerApplication | tar -xzvf -

以下のように出力されれば成功です。

 1x producer/.mvn/
 2x producer/.mvn/wrapper/
 3x producer/.mvn/wrapper/maven-wrapper.properties
 4x producer/.mvn/wrapper/MavenWrapperDownloader.java
 5x producer/.mvn/wrapper/maven-wrapper.jar
 6x producer/.gitignore
 7x producer/HELP.md
 8x producer/mvnw.cmd
 9x producer/src/
10x producer/src/main/
11x producer/src/main/resources/
12x producer/src/main/resources/templates/
13x producer/src/main/resources/application.properties
14x producer/src/main/resources/static/
15x producer/src/main/java/
16x producer/src/main/java/com/
17x producer/src/main/java/com/example/
18x producer/src/main/java/com/example/producer.java
19x producer/src/test/
20x producer/src/test/java/
21x producer/src/test/java/com/
22x producer/src/test/java/com/example/
23x producer/src/test/java/com/example/producerTests.java
24x producer/pom.xml

これに二つのコードを追加します。 まずproducer/src/main/java/com/example/ProducerRestController.javaというファイルを作り以下の内容にします。

 1package com.example;
 2
 3import java.util.Map;
 4
 5import org.slf4j.Logger;
 6import org.slf4j.LoggerFactory;
 7import org.springframework.beans.factory.annotation.Autowired;
 8import org.springframework.http.ResponseEntity;
 9import org.springframework.web.bind.annotation.GetMapping;
10import org.springframework.web.bind.annotation.RequestHeader;
11import org.springframework.web.bind.annotation.RestController;
12
13@RestController
14public class ProducerRestController {
15
16        private static final Logger LOGGER = LoggerFactory.getLogger(ProducerRestController.class);
17
18        @Autowired
19        Producer producer;
20
21        @GetMapping("/amqp")
22        public ResponseEntity<String> hello (@RequestHeader Map<String, String> header){
23                printAllHeaders(header);
24            producer.send();
25            return ResponseEntity.ok("Hello World!");
26        }
27
28        private void printAllHeaders(Map<String, String> headers) {
29                headers.forEach((key, value) -> {
30                        LOGGER.info(String.format("Header '%s' = %s", key, value));
31                });
32        }
33}

さらにproducer/src/main/java/com/example/Producer.javaを作ります。

 1package com.example;
 2
 3import org.springframework.amqp.core.Queue;
 4import org.springframework.amqp.rabbit.core.RabbitTemplate;
 5import org.springframework.beans.factory.annotation.Autowired;
 6import org.springframework.context.annotation.Bean;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10public class Producer{
11
12    @Bean
13    public Queue hello() {
14        return new Queue("hello");
15    }
16
17    @Autowired
18    private RabbitTemplate template;
19
20    @Autowired
21    private Queue queue;
22
23    public void send() {
24        String message = "Hello World!";
25        this.template.convertAndSend(queue.getName(), message);
26        System.out.println(" [x] Sent '" + message + "'");
27    }
28}

最後にproducer/src/main/resources/application.propertiesを以下の内容にアップデートします。

1spring.application.name=producer
2
3management.endpoints.web.exposure.include=wavefront
4server.port=8086
5
6wavefront.application.name=demo6
7wavefront.application.service=producer
8
9spring.rabbitmq.host=localhost

Consumerアプリ

もうひとつConsumerアプリを作ります。 まず以下のCurlコマンドを実行します。

1curl https://start.spring.io/starter.tgz \
2       -d artifactId=consumer \
3       -d baseDir=consumer \
4       -d dependencies=web,amqp,cloud-starter-sleuth,wavefront \
5       -d packageName=com.example \
6       -d applicationName=ConsumerApplication | tar -xzvf -

これにconsumer/src/main/java/com/example/Consumer.javaを作ります。

 1package com.example;
 2
 3import java.util.Map;
 4
 5import org.slf4j.Logger;
 6import org.slf4j.LoggerFactory;
 7import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9import org.springframework.messaging.handler.annotation.Headers;
10import org.springframework.messaging.handler.annotation.Payload;
11import org.springframework.stereotype.Component;
12
13@Component
14@RabbitListener(queues = "hello")
15public class Consumer {
16
17    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
18
19    @RabbitHandler
20    public void receive(@Payload String body, @Headers Map<String,Object> headers) {
21		LOGGER.info(String.format(" [x] Received '" + body + headers + "'"));
22    }
23
24}

最後にconsumer/src/main/resources/application.propertiesを以下の内容にアップデートします。

1spring.application.name=consumer
2
3management.endpoints.web.exposure.include=wavefront
4server.port=18086
5
6wavefront.application.name=demo6
7wavefront.application.service=consumer
8
9spring.rabbitmq.host=localhost

アプリの起動

準備ができたらアプリを起動していきます。 まず前提としてRabbitMQの起動が必要です。 これはOSによって異なりますが、私はMacなので以下で起動できます。

1/usr/local/sbin/rabbitmq-server

次にProducerアプリを起動します。

1cd producer
2./mvnw spring-boot:run

さらに、Consumerアプリを起動します。

1cd ..
2cd consumer
3./mvnw spring-boot:run

両方起動したら、以下のURLにcurlを流します。

1curl localhost:8086/amqp

うまくいけば、 [x] Sent 'Hello World!'と表示されるはずです。 しばらく何回かcurlを実行して下さい。

そしてしばらくしたら以下のURLをアクセスします。

http://localhost:8086/actuator/wavefront

そして Applications > Application Map(Beta)を選択してください。

うまくいけば、以下のように繋がるはずです。

つまり、なんとAMQPでも分散トレーシングができていることになります。 Applications > Tracesをみると、ちゃんとTrace IDが一致した状態で、オブジェクトがつながっているのがわかります。

なにが起きている?

さて、一体何がなんやらと思うので、説明します。 まず、状況をわかりやすくするため、Consumerアプリをとめてください。(Ctrl + c)

Producerアプリを起動したまま、以下を三回程度実行します。

1curl localhost:8086/amqp

この状態で、RabbitMQの管理コンソールにログインします。

http://localhost:15672/#/  ユーザー・パスワードはguest/guestです。

ログイン後、Queueタブをみると、helloというQueueにメッセージがcurlを行った回数分溜まっていることがわかります。

つまりProducerアプリは、curlがくるたびにhelloキューにメッセージを貯める非常にシンプルなアプリです。 対しConsumerアプリは、このhelloキューのメッセージを読み取るようなアプリケーションです。

x-b3ヘッダーは?

前回から繰り返しているよう、x-b3ヘッダーがTrace IDなどのやりとりをおこなう仕組みでした。 AMQPはこれはどうやっているのか?

先ほどの管理コンソールから、hello キューを選択して、Get Messagesを選択します。 するとメッセージの中にb3ヘッダーが表示されます。

これはつまり、AMQPでもヘッダーをつかって、Trace IDのやりとりが行われていることがわかります。 なお、これもまたSleuthが透過的にSpring Bootの場合に行ってくれるおかげです。

https://docs.spring.io/spring-cloud-sleuth/docs/current-SNAPSHOT/reference/html/#sleuth-with-zipkin-over-rabbitmq-or-kafka

まとめ

  • AMQPでも分散トレーシングはできる
  • AMQPでもヘッダーをつかってTrace IDなどをやりとりしている
  • Spring Bootだと相変わらず、コードから透過的に分散トレーシングしてくれる

さて、次回はいよいよ「サービスメッシュで分散トレーシング」です。