WindowsにFluentdを入れる機会があったのでまとめておきます。
td-agent(Fluentd)インストール
td-agentダウンロード
以下からmsiファイルをダウンロードしてWindowsに配置します。
本手順では td-agent-3.3.0-1-x64.msi
を利用します。
https://td-agent-package-browser.herokuapp.com/3/windows
td-agentインストール
ダウンロードしたmsiファイルを実行します。この際にUnknown publisher の警告が表示されますが、無視して実行します。 全てデフォルトの設定で進めればインストールしました。
インストールが完了しました。
プラグインのインストール
スタートメニューに追加されている Td-agent Command Prompt
を実行する。
S3への出力には fluent-plugin-s3
が必要です。こちらを最新版にアップデートしておきます。
C:\opt\td-agent>fluent-gem update fluent-plugin-s3
入っているプラグインを確認します。
今回主に利用するのは fluent-plugin-s3
です。
C:\opt\td-agent>fluent-gem list | findstr fluent fluent-config-regexp-type (1.0.0) fluent-logger (0.7.2) fluent-plugin-elasticsearch (3.0.1) fluent-plugin-kafka (0.8.3) fluent-plugin-record-modifier (1.1.0) fluent-plugin-rewrite-tag-filter (2.1.1) fluent-plugin-s3 (1.1.8, 1.1.7) fluent-plugin-td (1.0.0) fluent-plugin-td-monitoring (0.2.4) fluent-plugin-webhdfs (1.2.3) fluent-plugin-windows-eventlog (0.2.2) fluentd (1.3.3)
ディレクトリ
以下でディレクトリを作成しておきます。設定ファイルで調整可能なので、場所はどこでも問題ありません。
PS> mkdir C:\var\run\pos PS> mkdir C:\var\run\buffer PS> mkdir C:\var\log\td-agent PS> mkdir C:\data\csv PS> mkdir C:\opt\td-agent\etc\td-agent\config.d
それぞれ内容はこちらです。
Dir | 内容 |
---|---|
C:\var\run\pos | ポジションファイルを出力 |
C:\var\run\buffer | バッファを出力 |
C:\var\log\td-agent | td-agentのログ出力先 |
C:\data\csv | データを置く場所 |
C:\opt\td-agent\etc\td-agent\config.d | 設定ファイル配置先 |
設定例
設定ファイルの退避
元のファイルを退避しておきます。
PS> mv C:\opt\td-agent\etc\td-agent\td-agent.conf C:\opt\td-agent\etc\td-agent\td-agent.conf.org
設定ファイル
注意)設定ファイルをメモ帳で開くときにはANSIで保存します
Windowsでの設定ファイルのポイントはパスに \
を使わずに /
を使うことです。 \
を使うと動かない部分が出てきます。
メインの設定ファイル( C:\opt\td-agent\etc\td-agent\td-agent.conf
)は 以下のように外部の設定ファイルを読む形にしています。
@include config.d/*.conf
参照先の C:\opt\td-agent\etc\td-agent\config.d\
配下に今回の設定ファイルを追加します。
設定のポイントだけ書いておきます。
- Windowsでの設定ファイルのポイントはパスに
\
を使わずに/
を使うことです。\
を使うと動かない部分が出てきます。(結構ハマりました) - 後述しますがUpdateとAppendで動きを変えています
- S3のパスはHive、Athena等で利用しやすい形式にしています
- 最初はForest Pluginを利用していましたが、v0.14からtag情報使えると聞いたのでforestプラグインなしで設定しています
- Bufferセクションの設定は結構難しいです。ドキュメント読むのは当然として、debugログ出しながら試してみて理解することを強く推奨です。
Append用
- s3.b1.append.conf
<source> @type tail path C:/data/csv/append.csv pos_file C:/var/run/pos/bar_b_append.pos <parse> @type csv keys type,timestamp,value2,value2,value3,value4,value5,value6,value7,value8,value9,value10 types type:string,timestamp:integer,value1:integer,value2:integer,value3:integer,value4:integer,value5:integer,value6:integer,value7:integer,value8:integer,value9:integer,value10:integer </parse> # out.bucket_id.method.db.table tag s3.b1.append.bar_db.b_tab </source> <match s3.b1.append.**> @type s3 aws_key_id YOUR_ACCESS_KEY aws_sec_key YOUR_SECRET_ACCESS_KEY s3_region ap-northeast-1 s3_bucket otomo-bucket-fluentd path prefix/path/${tag[3]}/${tag[4]} s3_object_key_format %{path}/%{time_slice}/${tag[4]}_%{index}.json.%{file_extension} time_slice_format year=%Y/month=%m/day=%d <buffer tag,time> @type file path C:/var/run/buffer/s3.b1.append/buffer timekey 1m timekey_wait 1m timekey_zone Asia/Tokyo retry_forever true compress gzip </buffer <format> @type json </format> </match>
Update用
- s3.b1.update.conf
<source> @type tail path C:/data/csv/update.csv pos_file C:/var/run/pos/foo_a_update.pos # for reading whole updated file read_from_head true <parse> @type csv keys type,timestamp,value1,value2,value3,value4,value5,value6,value7,value8,value9,value10 types type:string,timestamp:integer,value1:integer,value2:integer,value3:integer,value4:integer,value5:integer,value6:integer,value7:integer,value8:integer,value9:integer,value10:integer </parse> # out.bucket_id.method.db.table tag s3.b1.update.foo_db.a_tab </source> <match s3.b1.update.**> @type s3 aws_key_id YOUR_ACCESS_KEY aws_sec_key YOUR_SECRET_ACCESS_KEY s3_region ap-northeast-1 s3_bucket otomo-bucket-fluentd path prefix/path/${tag[3]}/${tag[4]} s3_object_key_format %{path}/%{time_slice}/${tag[4]}.json.%{file_extension} overwrite true time_slice_format year=%Y/month=%m/day=%d/hour=%H/min=%M <buffer tag,time> @type file path C:/var/run/buffer/s3.b1.update/buffer chunk_limit_size 10g total_limit_size 64g flush_mode interval flush_interval 10m timekey_zone Asia/Tokyo timekey 10m timekey_wait 1m retry_forever false retry_type periodic retry_wait 60s retry_max_times 3 compress gzip </buffer> <format> @type json </format> </match>
Monitor用
- monitor.conf
<source> @type monitor_agent bind 127.0.0.1 port 24220 </source>
AWSクレデンシャル
ここでは aws_key_id
& aws_sec_key
を使っていますが、EC2のIAMロールや、別ロールをAssumeして使うことも可能です。それぞれ設定値が異なるので、ここで確認可能です。
https://github.com/fluent/fluent-plugin-s3/blob/v1.1.8/lib/fluent/plugin/out_s3.rb#L409-L457
起動してテスト
td-agent command prompt
を起動し以下を実行。テスト時はこちらの方式で起動する。
-v
はverboseフラグなのでログを詳細に出すことができます、
-vvv
とすると更に詳細なログが確認できます。
C:\opt\td-agent>fluentd -v -c etc\td-agent\td-agent.conf
テストデータ準備
以下のCSVデータをS3にJSON Gzip形式でアップロードするテストします。一度 .\data.txt
に保存します。
R,1551919932,5,14,23,96,2,17,57,51,64,40 L,1551858639,24,38,86,31,1,40,21,71,71,26 Z,1551845360,26,42,85,29,93,24,90,66,42,54 L,1551916090,36,52,33,61,43,35,48,30,84,36 D,1551874206,76,82,67,43,96,99,53,23,33,56 X,1551916749,41,3,46,60,11,71,25,95,70,61 O,1551898421,24,61,8,3,10,22,19,0,56,11 M,1551868960,6,67,16,42,62,21,64,37,25,5 T,1551866088,5,13,90,33,47,99,98,60,26,83 H,1551867519,78,45,16,61,11,22,62,94,9,57 Y,1551888203,77,76,51,53,74,1,68,12,93,4 K,1551886024,10,5,93,91,81,33,37,39,58,72 I,1551906415,55,5,99,38,26,74,43,29,35,2 P,1551844837,89,51,28,26,60,14,12,14,13,92 K,1551878556,79,21,11,66,63,63,4,58,64,24 L,1551918974,2,28,25,26,59,8,77,97,86,99
append(ファイル追記)の方のテスト。
この設定だと以下のコマンドを打って20秒くらいで、アップロードされます。
PS> cat .\data.txt >> C:\data\csv\append.csv
ログは以下のようになります。
2019-03-08 02:03:17 +0000 [debug]: #0 fluent/log.rb:302:debug: Created new chunk chunk_id="5838ba052a1a341483e701435088d529" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=1552010590, tag="csv.append.bar_db.b_tab", variables=nil> 2019-03-08 02:03:30 +0000 [debug]: #0 fluent/log.rb:302:debug: out_s3: write chunk 5838ba052a1a341483e701435088d529 with metadata #<struct Fluent::Plugin::Buffer::Metadata timekey=1552010590, tag="csv.append.bar_db.b_tab", variables=nil> to s3://otomo-bucket-fluentd/prefix/path/bar_db/b_tab/year=2019/month=03/day=08/b_tab_1.json.gz
update(ファイル更新)の方のテスト
ファイルが掴まれているので、以下のように更新かけます。
PS> mv -Force C:\data\csv\update.csv C:\data\csv\update.csv.done PS> cat .\data.txt > C:\data\csv\update.csv
こちらは更新なので即時overwriteされます。デグレがおこる場合があったのでバッファリングは極力しない設定にしています。
2019-03-08 02:06:56 +0000 [debug]: #0 fluent/log.rb:302:debug: Created new chunk chunk_id="5838bad6b6a9be5dc75221ad369b7bc5" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=1552003200, tag="csv.update.foo_db.a_tab", variables=nil> 2019-03-08 02:06:56 +0000 [warn]: #0 fluent/log.rb:342:warn: prefix/path/foo_db/a_tab/year=2019/month=03/day=08/a_tab.json.gz already exists, but will overwrite 2019-03-08 02:06:56 +0000 [debug]: #0 fluent/log.rb:302:debug: out_s3: write chunk 5838bad6b6a9be5dc75221ad369b7bc5 with metadata #<struct Fluent::Plugin::Buffer::Metadata timekey=1552003200, tag="csv.update.foo_db.a_tab", variables=nil> to s3://otomo-bucket-fluentd/prefix/path/foo_db/a_tab/year=2019/month=03/day=08/a_tab.json.gz
サービス化
テスト完了したら以下のコマンドでWindowsサービス化を行い完了です。
この際に必要な引数もこちら参考に設定しておきます。
fluentd --reg-winsvc i fluentd --reg-winsvc-fluentdopt '-c C:/opt/td-agent/etc/td-agent/td-agent.conf -o C:/var/log/td-agent/td-agent.log --log-rotate-age 5 --log-rotate-size 104857600'
管理用RPC API
rcp_endpoint
を設定すると管理用のAPIが利用できるようになります。
<system> rpc_endpoint 127.0.0.1:24444 </system>
# 処理停止(SIGINT) curl http://localhost:24444/api/config.interruptWorkers # プロセス安全停止(SIGTERM) curl http://localhost:24444/api/config.killWorkers # Bufferの即時送信(SIGUSR1) curl http://localhost:24444/api/config.flushbuffers # 設定のリロード(SIGHUP) curl http://localhost:24444/api/config.reload # ログに設定のダンプを出力 curl http://localhost:24444/api/config.dump # レスポンスとして設定のダンプを返す 。enable_get_dumpの設定が必要 curl http://localhost:24444/api/config.getDump
参考:https://github.com/fluent/fluentd/blob/v1.3.3/lib/fluent/supervisor.rb#L75
モニタリング
以下で各種情報を取得可能です。jq一緒に使うと便利です。
curl http://localhost:24220/api/plugins.json | jq