WindowsにFluentdをインストールしてS3にデータ転送して、管理用の設定も追加

WindowsにFluentdを入れる機会があったのでまとめておきます。

f:id:yomon8:20190308110006p:plain

td-agent(Fluentd)インストール

td-agentダウンロード

以下からmsiファイルをダウンロードしてWindowsに配置します。 本手順では td-agent-3.3.0-1-x64.msi を利用します。

https://td-agent-package-browser.herokuapp.com/3/windows

td-agentインストール

ダウンロードしたmsiファイルを実行します。この際にUnknown publisher の警告が表示されますが、無視して実行します。 全てデフォルトの設定で進めればインストールしました。

インストールが完了しました。 f:id:yomon8:20190308112713p:plain

プラグインのインストール

スタートメニューに追加されている Td-agent Command Prompt を実行する。

f:id:yomon8:20190308112817p:plain

S3への出力には fluent-plugin-s3 が必要です。こちらを最新版にアップデートしておきます。

C:\opt\td-agent>fluent-gem update fluent-plugin-s3

入っているプラグインを確認します。 今回主に利用するのは fluent-plugin-s3 です。

C:\opt\td-agent>fluent-gem list | findstr fluent
fluent-config-regexp-type (1.0.0)
fluent-logger (0.7.2)
fluent-plugin-elasticsearch (3.0.1)
fluent-plugin-kafka (0.8.3)
fluent-plugin-record-modifier (1.1.0)
fluent-plugin-rewrite-tag-filter (2.1.1)
fluent-plugin-s3 (1.1.8, 1.1.7)
fluent-plugin-td (1.0.0)
fluent-plugin-td-monitoring (0.2.4)
fluent-plugin-webhdfs (1.2.3)
fluent-plugin-windows-eventlog (0.2.2)
fluentd (1.3.3)

ディレクト

以下でディレクトリを作成しておきます。設定ファイルで調整可能なので、場所はどこでも問題ありません。

PS> mkdir C:\var\run\pos  
PS> mkdir C:\var\run\buffer
PS> mkdir C:\var\log\td-agent  
PS> mkdir C:\data\csv
PS> mkdir C:\opt\td-agent\etc\td-agent\config.d

それぞれ内容はこちらです。

Dir 内容
C:\var\run\pos ポジションファイルを出力
C:\var\run\buffer バッファを出力
C:\var\log\td-agent td-agentのログ出力先
C:\data\csv データを置く場所
C:\opt\td-agent\etc\td-agent\config.d 設定ファイル配置先

設定例

設定ファイルの退避

元のファイルを退避しておきます。

PS> mv C:\opt\td-agent\etc\td-agent\td-agent.conf C:\opt\td-agent\etc\td-agent\td-agent.conf.org

設定ファイル

注意)設定ファイルをメモ帳で開くときにはANSIで保存します

Windowsでの設定ファイルのポイントはパスに \ を使わずに / を使うことです。 \ を使うと動かない部分が出てきます。

メインの設定ファイル( C:\opt\td-agent\etc\td-agent\td-agent.conf )は 以下のように外部の設定ファイルを読む形にしています。

@include config.d/*.conf

参照先の C:\opt\td-agent\etc\td-agent\config.d\ 配下に今回の設定ファイルを追加します。

設定のポイントだけ書いておきます。

  • Windowsでの設定ファイルのポイントはパスに \ を使わずに / を使うことです。 \ を使うと動かない部分が出てきます。(結構ハマりました)
  • 後述しますがUpdateとAppendで動きを変えています
  • S3のパスはHive、Athena等で利用しやすい形式にしています
  • 最初はForest Pluginを利用していましたが、v0.14からtag情報使えると聞いたのでforestプラグインなしで設定しています
  • Bufferセクションの設定は結構難しいです。ドキュメント読むのは当然として、debugログ出しながら試してみて理解することを強く推奨です。
Append用
  • s3.b1.append.conf
<source>
  @type tail
  path C:/data/csv/append.csv
  pos_file C:/var/run/pos/bar_b_append.pos

  <parse>
    @type csv
    keys type,timestamp,value2,value2,value3,value4,value5,value6,value7,value8,value9,value10
    types type:string,timestamp:integer,value1:integer,value2:integer,value3:integer,value4:integer,value5:integer,value6:integer,value7:integer,value8:integer,value9:integer,value10:integer
  </parse>


  # out.bucket_id.method.db.table
  tag s3.b1.append.bar_db.b_tab
</source>

<match s3.b1.append.**>
  @type s3

  aws_key_id YOUR_ACCESS_KEY
  aws_sec_key YOUR_SECRET_ACCESS_KEY
  s3_region ap-northeast-1

  s3_bucket otomo-bucket-fluentd
  path prefix/path/${tag[3]}/${tag[4]}
  s3_object_key_format %{path}/%{time_slice}/${tag[4]}_%{index}.json.%{file_extension}

  time_slice_format year=%Y/month=%m/day=%d

  <buffer tag,time>
    @type file
    path C:/var/run/buffer/s3.b1.append/buffer
    timekey 1m
    timekey_wait 1m
    timekey_zone Asia/Tokyo
    retry_forever true
    compress gzip
  </buffer

  <format>
    @type json
  </format>
</match>
Update用
  • s3.b1.update.conf
<source>
  @type tail
  path C:/data/csv/update.csv
  pos_file C:/var/run/pos/foo_a_update.pos
  
  # for reading whole updated file
  read_from_head true

  <parse>
    @type csv
    keys type,timestamp,value1,value2,value3,value4,value5,value6,value7,value8,value9,value10
    types type:string,timestamp:integer,value1:integer,value2:integer,value3:integer,value4:integer,value5:integer,value6:integer,value7:integer,value8:integer,value9:integer,value10:integer
  </parse>


  # out.bucket_id.method.db.table
  tag s3.b1.update.foo_db.a_tab
</source>

<match s3.b1.update.**>
  @type s3

  aws_key_id YOUR_ACCESS_KEY
  aws_sec_key YOUR_SECRET_ACCESS_KEY
  s3_region ap-northeast-1

  s3_bucket otomo-bucket-fluentd
  path prefix/path/${tag[3]}/${tag[4]}
  s3_object_key_format %{path}/%{time_slice}/${tag[4]}.json.%{file_extension}
  overwrite true

  time_slice_format year=%Y/month=%m/day=%d/hour=%H/min=%M

  <buffer tag,time>
    @type file
    path C:/var/run/buffer/s3.b1.update/buffer
    chunk_limit_size 10g
    total_limit_size 64g

    flush_mode interval
    flush_interval 10m

    timekey_zone Asia/Tokyo
    timekey 10m
    timekey_wait 1m

    retry_forever false
    retry_type periodic
    retry_wait 60s
    retry_max_times 3

    compress gzip
  </buffer>

  <format>
    @type json
  </format>
</match>
Monitor用
  • monitor.conf
<source>
  @type monitor_agent
  bind 127.0.0.1
  port 24220
</source>

AWSクレデンシャル

ここでは aws_key_id & aws_sec_key を使っていますが、EC2のIAMロールや、別ロールをAssumeして使うことも可能です。それぞれ設定値が異なるので、ここで確認可能です。

https://github.com/fluent/fluent-plugin-s3/blob/v1.1.8/lib/fluent/plugin/out_s3.rb#L409-L457

起動してテスト

td-agent command prompt を起動し以下を実行。テスト時はこちらの方式で起動する。 -v はverboseフラグなのでログを詳細に出すことができます、 -vvv とすると更に詳細なログが確認できます。

C:\opt\td-agent>fluentd -v -c etc\td-agent\td-agent.conf

テストデータ準備

以下のCSVデータをS3にJSON Gzip形式でアップロードするテストします。一度 .\data.txt に保存します。

R,1551919932,5,14,23,96,2,17,57,51,64,40
L,1551858639,24,38,86,31,1,40,21,71,71,26
Z,1551845360,26,42,85,29,93,24,90,66,42,54
L,1551916090,36,52,33,61,43,35,48,30,84,36
D,1551874206,76,82,67,43,96,99,53,23,33,56
X,1551916749,41,3,46,60,11,71,25,95,70,61
O,1551898421,24,61,8,3,10,22,19,0,56,11
M,1551868960,6,67,16,42,62,21,64,37,25,5
T,1551866088,5,13,90,33,47,99,98,60,26,83
H,1551867519,78,45,16,61,11,22,62,94,9,57
Y,1551888203,77,76,51,53,74,1,68,12,93,4
K,1551886024,10,5,93,91,81,33,37,39,58,72
I,1551906415,55,5,99,38,26,74,43,29,35,2
P,1551844837,89,51,28,26,60,14,12,14,13,92
K,1551878556,79,21,11,66,63,63,4,58,64,24
L,1551918974,2,28,25,26,59,8,77,97,86,99
append(ファイル追記)の方のテスト。

この設定だと以下のコマンドを打って20秒くらいで、アップロードされます。

PS> cat .\data.txt >> C:\data\csv\append.csv

ログは以下のようになります。

2019-03-08 02:03:17 +0000 [debug]: #0 fluent/log.rb:302:debug: Created new chunk chunk_id="5838ba052a1a341483e701435088d529" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=1552010590, tag="csv.append.bar_db.b_tab", variables=nil>
2019-03-08 02:03:30 +0000 [debug]: #0 fluent/log.rb:302:debug: out_s3: write chunk 5838ba052a1a341483e701435088d529 with metadata #<struct Fluent::Plugin::Buffer::Metadata timekey=1552010590, tag="csv.append.bar_db.b_tab", variables=nil> to s3://otomo-bucket-fluentd/prefix/path/bar_db/b_tab/year=2019/month=03/day=08/b_tab_1.json.gz
update(ファイル更新)の方のテスト

ファイルが掴まれているので、以下のように更新かけます。

PS> mv -Force C:\data\csv\update.csv C:\data\csv\update.csv.done
PS> cat .\data.txt > C:\data\csv\update.csv

こちらは更新なので即時overwriteされます。デグレがおこる場合があったのでバッファリングは極力しない設定にしています。

2019-03-08 02:06:56 +0000 [debug]: #0 fluent/log.rb:302:debug: Created new chunk chunk_id="5838bad6b6a9be5dc75221ad369b7bc5" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=1552003200, tag="csv.update.foo_db.a_tab", variables=nil>
2019-03-08 02:06:56 +0000 [warn]: #0 fluent/log.rb:342:warn: prefix/path/foo_db/a_tab/year=2019/month=03/day=08/a_tab.json.gz already exists, but will overwrite
2019-03-08 02:06:56 +0000 [debug]: #0 fluent/log.rb:302:debug: out_s3: write chunk 5838bad6b6a9be5dc75221ad369b7bc5 with metadata #<struct Fluent::Plugin::Buffer::Metadata timekey=1552003200, tag="csv.update.foo_db.a_tab", variables=nil> to s3://otomo-bucket-fluentd/prefix/path/foo_db/a_tab/year=2019/month=03/day=08/a_tab.json.gz

サービス化

テスト完了したら以下のコマンドでWindowsサービス化を行い完了です。

この際に必要な引数もこちら参考に設定しておきます。

fluentd --reg-winsvc i
fluentd --reg-winsvc-fluentdopt '-c C:/opt/td-agent/etc/td-agent/td-agent.conf -o C:/var/log/td-agent/td-agent.log --log-rotate-age 5 --log-rotate-size 104857600'

管理用RPC API

rcp_endpoint を設定すると管理用のAPIが利用できるようになります。

<system>
  rpc_endpoint 127.0.0.1:24444
</system>
# 処理停止(SIGINT)
curl http://localhost:24444/api/config.interruptWorkers

# プロセス安全停止(SIGTERM)
curl http://localhost:24444/api/config.killWorkers

# Bufferの即時送信(SIGUSR1)
curl http://localhost:24444/api/config.flushbuffers

# 設定のリロード(SIGHUP)
curl http://localhost:24444/api/config.reload

# ログに設定のダンプを出力
curl http://localhost:24444/api/config.dump

# レスポンスとして設定のダンプを返す 。enable_get_dumpの設定が必要
curl http://localhost:24444/api/config.getDump

参考:https://github.com/fluent/fluentd/blob/v1.3.3/lib/fluent/supervisor.rb#L75

モニタリング

以下で各種情報を取得可能です。jq一緒に使うと便利です。

curl http://localhost:24220/api/plugins.json | jq

docs.fluentd.org

参考URL

docs.fluentd.org

github.com