Wavefrontで学ぶ分散トレーシング Part-5

この文章は、Wavefrontで学ぶ分散トレーシング シリーズの第五回目です。

シリーズ

第一回 : 概要編
第二回 : Spring Bootで分散トレーシング
第三回 : REDメトリクスって何?
第四回 : サービスをつなげてみる
第五回 : Pythonで分散トレーシング ← いまここ
第六回 : AMQPで分散トレーシング
第七回 ; サービスメッシュで分散トレーシング

始めに

過去の回では、Spring Bootをつかった分散トレーシングを紹介しました。 そして、それらの復習をすると

  • 分散トレーシングのキモはTrace IDとSpan ID
  • HTTPヘッダーをもとにTrace IDとSpan IDをサービス間で共有することでサービスがつながる
  • Spring BootではSlueth をつかうことによって、ほぼコードからは透過的にTrace idとSpan idを取り扱うことができる

Spring Bootだとほとんど、コーディングで気にすることなく分散トレーシングが行えてしまいます。 これは便利なのですが、今回はあえてより大変な方法で理解を深めようと思います。

今回はPythonでやります。WavefrontではPythonのコードから分散トレーシングを行うための専用のSDKである、OpenTracing SDKを提供しています。

https://github.com/wavefrontHQ/wavefront-opentracing-sdk-python

準備編

今回必要なのは以下です。

  • Python 3

インストール方法はこちらを参照してください。 相変わらずですが高度なエディターは不要です。

Pythonがインストールできたら、まずは、依存関係をローカルでのみテストしたいので、virtualenvを作ります。任意のディレクトリーで以下を実行してください。

1virtualenv env1
2source env1/bin/activate

ソースコード

ここに公開しています。

https://github.com/mhoshi-vm/wf-demanabu-dis-tracing/tree/master/5

コードの準備

hello.py

まず第一段階として、簡単なコードを用意します。 今回はREST APIにはFlaskを使用します。以下のファイルをrequirements.txtとして保存します。

1flask
2flask-jsonpify
3flask-sqlalchemy
4flask-restful

そうしたら、依存関係をインストールします。

1pip install -r requirements.txt

コードは以下のようにしてください。ファイル名はhello.pyとして保存してください。

 1
 2from flask import Flask
 3
 4app = Flask(__name__)
 5
 6@app.route("/")
 7def hello():
 8    return "Hello World!"
 9
10
11if __name__ == '__main__':
12    app.run(debug=True,host='0.0.0.0')

これを起動します。

1python hello.py

Curlでアクセスできることを確認します。

1curk localhost:5000

この時点でHello World!と帰ってくれば成功です。 現状はそれ以上、特になんの面白味もないです。当然Wavefront側には何も表示されません。 これに分散トレーシングの仕組みを追加していきます。

コードのアップデート

さて、分散トレーシングのコードですが、まず依存関係を修正します。 requirements.txtを以下のファイルにアップデートしてください。

1flask
2flask-jsonpify
3flask-sqlalchemy
4flask-restful
5wavefront-sdk-python
6wavefront-opentracing-sdk-python

そうしたら、依存関係をインストールします。

1pip install -r requirements.txt

そしてコードを以下の内容に差し替えます。

 1from flask import Flask,request
 2
 3# Set up sender
 4import opentracing
 5
 6from wavefront_opentracing_sdk import WavefrontTracer
 7from wavefront_opentracing_sdk import span_context
 8from wavefront_opentracing_sdk.reporting import CompositeReporter
 9from wavefront_opentracing_sdk.reporting import ConsoleReporter
10from wavefront_opentracing_sdk.reporting import WavefrontSpanReporter
11
12import wavefront_sdk
13import argparse
14
15
16app = Flask(__name__)
17
18@app.route("/")
19def hello():
20    span_ctx=None
21    with tracer.start_active_span('hello', child_of=span_ctx, ignore_active_span=True, finish_on_close=True):
22      return "Hello World!"
23
24
25if __name__ == '__main__':
26    parser = argparse.ArgumentParser()
27    parser.add_argument('token')
28    args = parser.parse_args()
29    application_tag = wavefront_sdk.common.ApplicationTags(
30        application='demo5',
31        service='hello-python')
32    # Create Wavefront Span Reporter using Wavefront Direct Client.
33    direct_client = wavefront_sdk.WavefrontDirectClient(
34        server="https://wavefront.surf",
35        token=args.token,
36        max_queue_size=50000,
37        batch_size=10000,
38        flush_interval_seconds=5)
39    direct_reporter = WavefrontSpanReporter(direct_client)
40
41
42    # Create Composite reporter.
43    # Use ConsoleReporter to output span data to console.
44    composite_reporter = CompositeReporter(
45        direct_reporter, ConsoleReporter())
46
47    # Create Tracer with Composite Reporter.
48    tracer = WavefrontTracer(reporter=composite_reporter,
49                             application_tags=application_tag)
50
51
52    app.run(debug=True,host='0.0.0.0')

数行のコードが突然複雑になったように感じると思いますが、一旦は起動します。 引数には、wavefrontのIDが必要ですが、前回までのSpring Bootを行っている場合、~/.wavefront_freemiumというファイルにIDがはいっているはずです。なので以下のように起動します。

1
2python hello.py `cat ~/.wavefront_freemium`

~/.wavefront_freemiumが存在しない場合、[第二回] (https://qiita.com/hmachi/items/d3ab73238b8c9e3b16c9)の検証を実施してください。

Curlでアクセスできることを確認します。

1curk localhost:5000

この時点でHello World!と帰ってくれば成功です。 何回か実行して以下のURLにアクセスしてください。

https://wavefront.surf

そして[Applications] > [Applications Map(Beta)] を選択し、さらにShow Single Service Nodesをオンにします。 すると、demo5, hello-pythonが見えるはずです。

フォーカスをあてるとPythonだということも認識しています。

“View Service Dashboard"や"View Traces for Service"も選択してください。前回と同じような画面がみえるかと思います。(今回は詳細はふれません)

コードの分析

さて、今回のコードですが、注目するべき点はただの2点です。 1点目がTracerオブジェクトを作成している以下の箇所です。

1
2    # Create Tracer with Composite Reporter.
3    tracer = WavefrontTracer(reporter=composite_reporter,
4                             application_tags=application_tag)

このコードを含めた前段でどのようにWavefrontに接続するかを定義しています。 Tracerを生成したら、Pythonのwith構文でどの箇所をトレースするかを定義します。

コードでいう以下の箇所です。

1
2@app.route("/")
3def hello():
4...
5    with tracer.start_active_span('hello', child_of=span_ctx, ignore_active_span=True, finish_on_close=True):
6      return "Hello World!"

このように任意の箇所でトレースしたいものをコーディングするのがライブラリーを使った分散トレーシングのやり方です。 逆にいうと注意が必要なのが正しい位置にこのようなコーディングを入れ込まないと意図しないトレース情報を送付してしまうかもしれません。

サービスをつなげてみる

さて、前回つくったHUBアプリで今回新しくつくったPythonのアプリをつなげてみたいと思います。 前回のHUBアプリの準備に従って用意してください。

最後に起動コマンドを以下のようにします。こうすることでhub.urlsの値をオーバーライドしてpythonのコードにアクセスを行おうとします。

1./mvnw spring-boot:run -Dspring-boot.run.arguments=--hub.urls=http://localhost:5000

そして以下のURLにアクセスをしてみます。

1curl localhost:8083/hub

うまくいくと以下のようなメッセージがみれるはずです。

1REST Complete

すべて成功してしまっているように見えますが、WavefrontのURLにログインしてみます。

つながっていないじゃん

おそらくしばらくまっても、以下のようになり、2つのサービスがつながらないと思います。

なぜか、Traceダッシュボードをみてましょう。 すると、Trace IDが2つのサービス間で一致していないことが確認できます。 たとえば、この例では、hello-pythonはa3e9e320-d07a-11ea-a7cc-faffc269bbf7ですが、

呼び出した方のサービスでは、5f1f8e56-c571-7942-f160-b2c0d33116aaとなっています。

前回にもまとめたようサービス間はHTTPヘッダーをつかいながら、お互いのTrace IDを交換しています。 このときアプリケーション側でただしくTrace IDを展開しないと関連のないサービスとして見えてきません。 これをさけるためにコードをもう一段階修正しなくてはいけません。

コードを修正

Pythonのコードを以下に修正してください。

 1
 2from flask import Flask,request
 3
 4# Set up sender
 5import opentracing
 6
 7from wavefront_opentracing_sdk import WavefrontTracer
 8from wavefront_opentracing_sdk import span_context
 9from wavefront_opentracing_sdk.reporting import CompositeReporter
10from wavefront_opentracing_sdk.reporting import ConsoleReporter
11from wavefront_opentracing_sdk.reporting import WavefrontSpanReporter
12
13import wavefront_sdk
14import argparse
15import uuid
16
17app = Flask(__name__)
18
19@app.route("/")
20def hello():
21    _BAGGAGE_PREFIX = 'x-b3-'
22    _TRACE_ID = _BAGGAGE_PREFIX + 'traceid'
23    _SPAN_ID = _BAGGAGE_PREFIX + 'spanid'
24    _SAMPLE = _BAGGAGE_PREFIX + 'sample'
25
26    trace_id = None
27    span_id = None
28    sampling = None
29    baggage = {}
30    for key, val in dict(request.headers).items():
31        key = key.lower()
32        if key == _TRACE_ID:
33            trace_id = uuid.UUID(val.zfill(32))
34        elif key == _SPAN_ID:
35            span_id = uuid.UUID(val.zfill(32))
36        elif key == _SAMPLE:
37            sampling = bool(val == 'True')
38        elif key.startswith(_BAGGAGE_PREFIX):
39            baggage.update({strip_prefix(_BAGGAGE_PREFIX, key): val})
40    if trace_id is None or span_id is None:
41       span_ctx=None
42    else:
43       span_ctx = span_context.WavefrontSpanContext(trace_id, span_id, baggage,
44                                                 sampling)
45    # Create span1, return a newly started and activated Scope.
46    with tracer.start_active_span('hello', child_of=span_ctx, ignore_active_span=True, finish_on_close=True):
47      return "Hello World!"
48
49def strip_prefix(prefix, key):
50    """
51    Strip the prefix of baggage items.
52    :param prefix: Prefix to be stripped.
53    :type prefix: str
54    :param key: Baggage item to be striped
55    :type key: str
56    :return: Striped baggage item
57    :rtype: str
58    """
59    return key[len(prefix):]
60
61
62if __name__ == '__main__':
63    parser = argparse.ArgumentParser()
64    parser.add_argument('token')
65    args = parser.parse_args()
66    application_tag = wavefront_sdk.common.ApplicationTags(
67        application='demo5',
68        service='hello-python')
69    # Create Wavefront Span Reporter using Wavefront Direct Client.
70    direct_client = wavefront_sdk.WavefrontDirectClient(
71        server="https://wavefront.surf",
72        token=args.token,
73        max_queue_size=50000,
74        batch_size=10000,
75        flush_interval_seconds=5)
76    direct_reporter = WavefrontSpanReporter(direct_client)
77
78
79    # Create Composite reporter.
80    # Use ConsoleReporter to output span data to console.
81    composite_reporter = CompositeReporter(
82        direct_reporter, ConsoleReporter())
83
84    # Create Tracer with Composite Reporter.
85    tracer = WavefrontTracer(reporter=composite_reporter,
86                             application_tags=application_tag)
87
88
89    app.run(debug=True,host='0.0.0.0')

さらに長くなりましたが、修正後もう一度アプリを起動します。

1python hello.py `cat ~/.wavefront_freemium`

そして、しばらくcurlを実行します。

1curl localhost:8083/hub

するとうまくいけばWavefront上の画面でサービスがつながります。

修正したコードで注目すべきは以下の箇所です。 ここでHTTPヘッダーを解釈して、正しいTrace IDを抽出しています。

 1
 2@app.route("/")
 3def hello():
 4...
 5    for key, val in dict(request.headers).items():
 6
 7        key = key.lower()
 8        if key == _TRACE_ID:
 9            trace_id = uuid.UUID(val.zfill(32))
10        elif key == _SPAN_ID:
11            span_id = uuid.UUID(val.zfill(32))
12        elif key == _SAMPLE:
13            sampling = bool(val == 'True')
14        elif key.startswith(_BAGGAGE_PREFIX):
15            baggage.update({strip_prefix(_BAGGAGE_PREFIX, key): val})

再びTraceのダッシュボードを参照すると、接続されたサービスが同じTrace IDをもっていることが記録されているはずです。

以上でサービスをつなげる方法を紹介しました。

まとめ

  • 分散トレーシングを行う上では、コーディング側でどこをTraceしたいか明示的に記載しないといけない
  • 他のサービスとの連携をする際、受け取ったHTTPヘッダーからTrace IDを取り出さないとつながって表示されない

今回はSpring Bootを使わず、他言語でどのように分散トレーシングができるようになるか紹介しました。 次回は「AMQPで分散トレーシング」です。