Wavefrontで学ぶ分散トレーシング Part-6
この文章は、Wavefrontで学ぶ分散トレーシング シリーズの第六回目です。
シリーズ
第一回 : 概要編
第二回 : Spring Bootで分散トレーシング
第三回 : REDメトリクスって何?
第四回 : サービスをつなげてみる
第五回 : Pythonで分散トレーシング
第六回 : AMQPで分散トレーシング**← いまここ**
第七回 ; サービスメッシュで分散トレーシング
始めに
過去の回では、Spring BootおよびPythonを使った分散トレーシングを紹介しました。 もう一度、それらの復習をすると
- 分散トレーシングのキモはTrace IDとSpan ID
- HTTPヘッダーをもとにTrace IDとSpan IDをサービス間で共有することでサービスがつながる
- アプリケーション側では、HTTPヘッダーからTrace IDとSpan IDを展開するようなコーディングを含まないといけない
さて、この中で2点目ですが、いままでの検証ではずっと「HTTPリクエスト」をベースにやってきました。 そこで、すこし疑問に思うかもしれないのが「HTTP以外のリクエスト」でも、これがうまくいくのかという点です。
そこで今回は、RabbitMQを使い、AMQPでも分散トレーシングを行えるか検証します。
準備
今回はRabbitMQを使いますので、PC上にインストールします。 手順はOSによって、違うので以下をもとにインストールおよび起動をしてください。
https://www.rabbitmq.com/download.html
今回はSpring Bootのアプリを使って、RabbitMQにキューを仕込むおよび取り出すコードを書いていきます。 いままで同様、これには最低限以下をインストールしてください。
- Java JDK 8+
Oracle JDKに従いJDKをインストールしてください
ソースコード
ここに公開しています。
https://github.com/mhoshi-vm/wf-demanabu-dis-tracing/tree/master/6
アプリの準備
必要なアプリは2つです、ProducerアプリとConsumerアプリです。
Producerアプリ
さて、第六回にもなったので、いままで通り、start.spring.ioからダウンロードするのではなく、CLIをつかって直接必要なパッケージをダウンロードします。
1curl https://start.spring.io/starter.tgz \
2       -d artifactId=producer \
3       -d baseDir=producer \
4       -d dependencies=web,amqp,cloud-starter-sleuth,wavefront \
5       -d packageName=com.example \
6       -d applicationName=ProducerApplication | tar -xzvf -
以下のように出力されれば成功です。
 1x producer/.mvn/
 2x producer/.mvn/wrapper/
 3x producer/.mvn/wrapper/maven-wrapper.properties
 4x producer/.mvn/wrapper/MavenWrapperDownloader.java
 5x producer/.mvn/wrapper/maven-wrapper.jar
 6x producer/.gitignore
 7x producer/HELP.md
 8x producer/mvnw.cmd
 9x producer/src/
10x producer/src/main/
11x producer/src/main/resources/
12x producer/src/main/resources/templates/
13x producer/src/main/resources/application.properties
14x producer/src/main/resources/static/
15x producer/src/main/java/
16x producer/src/main/java/com/
17x producer/src/main/java/com/example/
18x producer/src/main/java/com/example/producer.java
19x producer/src/test/
20x producer/src/test/java/
21x producer/src/test/java/com/
22x producer/src/test/java/com/example/
23x producer/src/test/java/com/example/producerTests.java
24x producer/pom.xml
これに二つのコードを追加します。
まずproducer/src/main/java/com/example/ProducerRestController.javaというファイルを作り以下の内容にします。
 1package com.example;
 2
 3import java.util.Map;
 4
 5import org.slf4j.Logger;
 6import org.slf4j.LoggerFactory;
 7import org.springframework.beans.factory.annotation.Autowired;
 8import org.springframework.http.ResponseEntity;
 9import org.springframework.web.bind.annotation.GetMapping;
10import org.springframework.web.bind.annotation.RequestHeader;
11import org.springframework.web.bind.annotation.RestController;
12
13@RestController
14public class ProducerRestController {
15
16        private static final Logger LOGGER = LoggerFactory.getLogger(ProducerRestController.class);
17
18        @Autowired
19        Producer producer;
20
21        @GetMapping("/amqp")
22        public ResponseEntity<String> hello (@RequestHeader Map<String, String> header){
23                printAllHeaders(header);
24            producer.send();
25            return ResponseEntity.ok("Hello World!");
26        }
27
28        private void printAllHeaders(Map<String, String> headers) {
29                headers.forEach((key, value) -> {
30                        LOGGER.info(String.format("Header '%s' = %s", key, value));
31                });
32        }
33}
さらにproducer/src/main/java/com/example/Producer.javaを作ります。
 1package com.example;
 2
 3import org.springframework.amqp.core.Queue;
 4import org.springframework.amqp.rabbit.core.RabbitTemplate;
 5import org.springframework.beans.factory.annotation.Autowired;
 6import org.springframework.context.annotation.Bean;
 7import org.springframework.stereotype.Component;
 8
 9@Component
10public class Producer{
11
12    @Bean
13    public Queue hello() {
14        return new Queue("hello");
15    }
16
17    @Autowired
18    private RabbitTemplate template;
19
20    @Autowired
21    private Queue queue;
22
23    public void send() {
24        String message = "Hello World!";
25        this.template.convertAndSend(queue.getName(), message);
26        System.out.println(" [x] Sent '" + message + "'");
27    }
28}
最後にproducer/src/main/resources/application.propertiesを以下の内容にアップデートします。
1spring.application.name=producer
2
3management.endpoints.web.exposure.include=wavefront
4server.port=8086
5
6wavefront.application.name=demo6
7wavefront.application.service=producer
8
9spring.rabbitmq.host=localhost
Consumerアプリ
もうひとつConsumerアプリを作ります。 まず以下のCurlコマンドを実行します。
1curl https://start.spring.io/starter.tgz \
2       -d artifactId=consumer \
3       -d baseDir=consumer \
4       -d dependencies=web,amqp,cloud-starter-sleuth,wavefront \
5       -d packageName=com.example \
6       -d applicationName=ConsumerApplication | tar -xzvf -
これにconsumer/src/main/java/com/example/Consumer.javaを作ります。
 1package com.example;
 2
 3import java.util.Map;
 4
 5import org.slf4j.Logger;
 6import org.slf4j.LoggerFactory;
 7import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9import org.springframework.messaging.handler.annotation.Headers;
10import org.springframework.messaging.handler.annotation.Payload;
11import org.springframework.stereotype.Component;
12
13@Component
14@RabbitListener(queues = "hello")
15public class Consumer {
16
17    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
18
19    @RabbitHandler
20    public void receive(@Payload String body, @Headers Map<String,Object> headers) {
21		LOGGER.info(String.format(" [x] Received '" + body + headers + "'"));
22    }
23
24}
最後にconsumer/src/main/resources/application.propertiesを以下の内容にアップデートします。
1spring.application.name=consumer
2
3management.endpoints.web.exposure.include=wavefront
4server.port=18086
5
6wavefront.application.name=demo6
7wavefront.application.service=consumer
8
9spring.rabbitmq.host=localhost
アプリの起動
準備ができたらアプリを起動していきます。 まず前提としてRabbitMQの起動が必要です。 これはOSによって異なりますが、私はMacなので以下で起動できます。
1/usr/local/sbin/rabbitmq-server
次にProducerアプリを起動します。
1cd producer
2./mvnw spring-boot:run
さらに、Consumerアプリを起動します。
1cd ..
2cd consumer
3./mvnw spring-boot:run
両方起動したら、以下のURLにcurlを流します。
1curl localhost:8086/amqp
うまくいけば、 [x] Sent 'Hello World!'と表示されるはずです。
しばらく何回かcurlを実行して下さい。
そしてしばらくしたら以下のURLをアクセスします。
http://localhost:8086/actuator/wavefront
そして Applications > Application Map(Beta)を選択してください。
うまくいけば、以下のように繋がるはずです。
 
    
つまり、なんとAMQPでも分散トレーシングができていることになります。 Applications > Tracesをみると、ちゃんとTrace IDが一致した状態で、オブジェクトがつながっているのがわかります。
 
    
なにが起きている?
さて、一体何がなんやらと思うので、説明します。 まず、状況をわかりやすくするため、Consumerアプリをとめてください。(Ctrl + c)
Producerアプリを起動したまま、以下を三回程度実行します。
1curl localhost:8086/amqp
この状態で、RabbitMQの管理コンソールにログインします。
http://localhost:15672/#/ ユーザー・パスワードはguest/guestです。
ログイン後、Queueタブをみると、helloというQueueにメッセージがcurlを行った回数分溜まっていることがわかります。
 
    
つまりProducerアプリは、curlがくるたびにhelloキューにメッセージを貯める非常にシンプルなアプリです。
対しConsumerアプリは、このhelloキューのメッセージを読み取るようなアプリケーションです。
x-b3ヘッダーは?
前回から繰り返しているよう、x-b3ヘッダーがTrace IDなどのやりとりをおこなう仕組みでした。 AMQPはこれはどうやっているのか?
先ほどの管理コンソールから、hello キューを選択して、Get Messagesを選択します。
するとメッセージの中にb3ヘッダーが表示されます。
 
    
これはつまり、AMQPでもヘッダーをつかって、Trace IDのやりとりが行われていることがわかります。
なお、これもまたSleuthが透過的にSpring Bootの場合に行ってくれるおかげです。
まとめ
- AMQPでも分散トレーシングはできる
- AMQPでもヘッダーをつかってTrace IDなどをやりとりしている
- Spring Bootだと相変わらず、コードから透過的に分散トレーシングしてくれる
さて、次回はいよいよ「サービスメッシュで分散トレーシング」です。