Creating a Grok Pattern
Preliminary research
Let's build a Grok pattern for parsing ALB logs.
The following references are useful.
ALB access log fields
Access Logs for Your Application Load Balancer - Elastic Load Balancing
Built-in Grok patterns
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
Logstash's Grok pattern for ELB
This is the ELB (CLB) version, but it is still a useful reference for ALB.
https://github.com/logstash-plugins/logstash-patterns-core/blob/v4.1.2/patterns/aws#L11
Building the pattern
Let's build the pattern, testing it as we go with the Grok debugger.
https://grokdebug.herokuapp.com/
For ALB logs I came up with the following Grok pattern. It may well have some holes, though...
%{NOTSPACE:type} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:client_ip}:%{INT:client_port} (?:(%{IP:target_ip}:?:%{INT:target_port})|-) %{NUMBER:request_processing_time} %{NUMBER:target_processing_time} %{NUMBER:response_processing_time} %{NUMBER:elb_status_code} (?:%{NUMBER:target_status_code}|-) %{NUMBER:received_bytes} %{NUMBER:sent_bytes} "%{NOTSPACE:request_method} %{NOTSPACE:request_url} %{NOTSPACE:protocol}" "%{GREEDYDATA:user_agent}" %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol} %{NOTSPACE:target_group_arn} "%{NOTSPACE:trace_id}" "%{NOTSPACE:domain_name}" "%{NOTSPACE:chosen_cert_arn}" (?:%{NUMBER:matched_rule_priority}|-) %{TIMESTAMP_ISO8601:request_creation_time} "%{GREEDYDATA:actions_executed}" "%{GREEDYDATA:redirect_url}"
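To sanity-check the pattern outside the online debugger, the head of it can be expanded into an equivalent plain regular expression and run locally. This is a minimal sketch covering only the first few fields; the log line is a made-up sample, not real ALB output:

```python
import re

# Hypothetical sample ALB log line (truncated; all values are invented)
line = ('https 2018-10-16T10:00:00.123456Z app/my-alb/abc123 '
        '203.0.113.10:54321 10.0.1.5:80 0.001 0.005 0.000 200 200 '
        '120 512 "GET https://example.com:443/index.html HTTP/1.1"')

# A few of the Grok tokens expanded into plain regex, roughly equivalent to:
# %{NOTSPACE:type} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb}
# %{IP:client_ip}:%{INT:client_port} (?:(%{IP:target_ip}:%{INT:target_port})|-)
pattern = re.compile(
    r'(?P<type>\S+) '
    r'(?P<timestamp>\d{4}-\d{2}-\d{2}T\S+) '
    r'(?P<elb>\S+) '
    r'(?P<client_ip>\d{1,3}(?:\.\d{1,3}){3}):(?P<client_port>\d+) '
    r'(?:(?P<target_ip>\d{1,3}(?:\.\d{1,3}){3}):(?P<target_port>\d+)|-) '
    r'(?P<request_processing_time>[\d.-]+) '
)

m = pattern.match(line)
print(m.group('type'), m.group('client_ip'), m.group('target_port'))
# → https 203.0.113.10 80
```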
Note: this was also true when I matched with plain regular expressions, but some clients send requests with the Request URL written like [52.1xx.xxx.xxx], so the built-in URI patterns cannot be used as-is. Adjusting them along the lines of the definitions below should work, though. Even so, semantically odd access log entries still show up from time to time, so getting to 100% coverage is quite difficult.
URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
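The effect of that kind of adjustment can be checked with a tiny experiment. Below is a sketch where the two patterns are deliberately simplified stand-ins for a host expression (not the real URIHOST definition), and the URL is an invented example of the bracketed-host style mentioned above:

```python
import re

# Requests sometimes arrive with a literal bracketed address as the host,
# e.g. "GET http://[52.10.0.1]/ HTTP/1.1"; strict host patterns reject it.
strict_host = re.compile(r'https?://[A-Za-z0-9.\-]+(?::\d+)?/')
# Adding \[ and \] to the character class lets the odd requests through.
relaxed_host = re.compile(r'https?://[A-Za-z0-9.\-\[\]]+(?::\d+)?/')

url = 'http://[52.10.0.1]/'   # made-up example of the odd access pattern
print(bool(strict_host.match(url)), bool(relaxed_host.match(url)))
# → False True
```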
Classifier registration
The wizard leaves little room for confusion, so here is just the result. Register a classifier like this:
$ aws glue get-classifier --name alb-access-log
{
    "Classifier": {
        "GrokClassifier": {
            "Name": "alb-access-log",
            "Classification": "ALB Access Log",
            "CreationTime": 1539685175.0,
            "LastUpdated": 1539685175.0,
            "Version": 1,
            "GrokPattern": "%{NOTSPACE:type} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:client_ip}:%{INT:client_port} (?:(%{IP:target_ip}:?:%{INT:target_port})|-) %{NUMBER:request_processing_time} %{NUMBER:target_processing_time} %{NUMBER:response_processing_time} %{NUMBER:elb_status_code} (?:%{NUMBER:target_status_code}|-) %{NUMBER:received_bytes} %{NUMBER:sent_bytes} \"%{NOTSPACE:request_method} %{NOTSPACE:request_url} %{NOTSPACE:protocol}\" \"%{GREEDYDATA:user_agent}\" %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol} %{NOTSPACE:target_group_arn} \"%{NOTSPACE:trace_id}\" \"%{NOTSPACE:domain_name}\" \"%{NOTSPACE:chosen_cert_arn}\" (?:%{NUMBER:matched_rule_priority}|-) %{TIMESTAMP_ISO8601:request_creation_time} \"%{GREEDYDATA:actions_executed}\" \"%{GREEDYDATA:redirect_url}\""
        }
    }
}
Crawler
When registering the crawler, set the classifier registered above as a custom classifier. When the crawler runs, the custom classifier is tried first, and the built-in parsers appear to kick in only if it does not match.
Nothing else should be confusing, so here is just the result.
$ aws glue get-crawler --name alb-access-log-crawler
{
    "Crawler": {
        "Name": "alb-access-log-crawler",
        "Role": "service-role/AWSGlueServiceRole-MyALBLog",
        "Targets": {
            "S3Targets": [
                {
                    "Path": "s3://your-bucket/path/to/log/your-alb-name",
                    "Exclusions": []
                }
            ],
            "JdbcTargets": [],
            "DynamoDBTargets": []
        },
        "DatabaseName": "alblog",
        "Classifiers": [
            "alb-access-log"
        ],
        "SchemaChangePolicy": {
            "UpdateBehavior": "UPDATE_IN_DATABASE",
            "DeleteBehavior": "DEPRECATE_IN_DATABASE"
        },
        "State": "READY",
        "CrawlElapsedTime": 0,
        "CreationTime": 1539685376.0,
        "LastUpdated": 1539685376.0,
        "LastCrawl": {
            "Status": "SUCCEEDED",
            "LogGroup": "/aws-glue/crawlers",
            "LogStream": "alb-access-log-crawler",
            "MessagePrefix": "4740dc11-bdb3-4050-bdc0-0cb6f4f45fc9",
            "StartTime": 1539685378.0
        },
        "Version": 1
    }
}
Checking the Catalog Database
Checking the table
At this point you can confirm the table has been registered in the catalog database.
$ aws glue get-table --database alblog --name elasticloadbalancing --query 'Table.StorageDescriptor.Columns'
[
    { "Name": "type", "Type": "string" },
    { "Name": "timestamp", "Type": "string" },
    { "Name": "elb", "Type": "string" },
    { "Name": "client_ip", "Type": "string" },
    { "Name": "client_port", "Type": "string" },
    { "Name": "target_ip", "Type": "string" },
    { "Name": "target_port", "Type": "string" },
    { "Name": "request_processing_time", "Type": "string" },
    { "Name": "target_processing_time", "Type": "string" },
    { "Name": "response_processing_time", "Type": "string" },
    { "Name": "elb_status_code", "Type": "string" },
    { "Name": "target_status_code", "Type": "string" },
    { "Name": "received_bytes", "Type": "string" },
    { "Name": "sent_bytes", "Type": "string" },
    { "Name": "request_method", "Type": "string" },
    { "Name": "request_url", "Type": "string" },
    { "Name": "protocol", "Type": "string" },
    { "Name": "user_agent", "Type": "string" },
    { "Name": "ssl_cipher", "Type": "string" },
    { "Name": "ssl_protocol", "Type": "string" },
    { "Name": "target_group_arn", "Type": "string" },
    { "Name": "trace_id", "Type": "string" },
    { "Name": "domain_name", "Type": "string" },
    { "Name": "chosen_cert_arn", "Type": "string" },
    { "Name": "matched_rule_priority", "Type": "string" },
    { "Name": "request_creation_time", "Type": "string" },
    { "Name": "actions_executed", "Type": "string" },
    { "Name": "redirect_url", "Type": "string" }
]
Querying with Athena
The data catalog generated by Glue can also be used from Athena. Partitioning is already in place, so even this alone is quite convenient.
ETL Job
All that remains is converting to Parquet. For something this simple, just stepping through Glue's ETL job wizard generates PySpark code like the following, and running it as-is is enough.
The reference for Glue's PySpark extensions is here:
AWS Glue PySpark Extensions Reference - AWS Glue
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "alblog", table_name = "elasticloadbalancing", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "alblog", table_name = "elasticloadbalancing", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("type", "string", "type", "string"), ("timestamp", "string", "timestamp", "string"), ("elb", "string", "elb", "string"), ("client_ip", "string", "client_ip", "string"), ("client_port", "string", "client_port", "string"), ("target_ip", "string", "target_ip", "string"), ("target_port", "string", "target_port", "string"), ("request_processing_time", "string", "request_processing_time", "string"), ("target_processing_time", "string", "target_processing_time", "string"), ("response_processing_time", "string", "response_processing_time", "string"), ("elb_status_code", "string", "elb_status_code", "string"), ("target_status_code", "string", "target_status_code", "string"), ("received_bytes", "string", "received_bytes", "string"), ("sent_bytes", "string", "sent_bytes", "string"), ("request_method", "string", "request_method", "string"), ("request_url", "string", "request_url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string"), ("target_group_arn", "string", "target_group_arn", "string"), ("trace_id", "string", "trace_id", "string"), ("domain_name", "string", "domain_name", "string"), ("chosen_cert_arn", "string", "chosen_cert_arn", "string"), ("matched_rule_priority", "string", "matched_rule_priority", "string"), ("request_creation_time", "string", "request_creation_time", "string"), ("actions_executed", "string", "actions_executed", "string"), ("redirect_url", "string", "redirect_url", "string"), ("partition_0", "string", "partition_0", "string"), ("partition_1", "string", "partition_1", "string"), ("partition_2", "string", "partition_2", "string"), ("partition_3", "string", "partition_3", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("type", "string", "type", "string"), ("timestamp", "string", "timestamp", "string"), ("elb", "string", "elb", "string"), ("client_ip", "string", "client_ip", "string"), ("client_port", "string", "client_port", "string"), ("target_ip", "string", "target_ip", "string"), ("target_port", "string", "target_port", "string"), ("request_processing_time", "string", "request_processing_time", "string"), ("target_processing_time", "string", "target_processing_time", "string"), ("response_processing_time", "string", "response_processing_time", "string"), ("elb_status_code", "string", "elb_status_code", "string"), ("target_status_code", "string", "target_status_code", "string"), ("received_bytes", "string", "received_bytes", "string"), ("sent_bytes", "string", "sent_bytes", "string"), ("request_method", "string", "request_method", "string"), ("request_url", "string", "request_url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string"), ("target_group_arn", "string", "target_group_arn", "string"), ("trace_id", "string", "trace_id", "string"), ("domain_name", "string", "domain_name", "string"), ("chosen_cert_arn", "string", "chosen_cert_arn", "string"), ("matched_rule_priority", "string", "matched_rule_priority", "string"), ("request_creation_time", "string", "request_creation_time", "string"), ("actions_executed", "string", "actions_executed", "string"), ("redirect_url", "string", "redirect_url", "string"), ("partition_0", "string", "partition_0", "string"), ("partition_1", "string", "partition_1", "string"), ("partition_2", "string", "partition_2", "string"), ("partition_3", "string", "partition_3", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://otomo-devel/glue/parquet"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://otomo-devel/glue/parquet"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
Partitioned Writes from Glue
Let's also try the pattern of creating partitions while writing.
Managing Partitions for ETL Output in AWS Glue - AWS Glue
Take the Python code above, add the partitionKeys property to the write_dynamic_frame call, and run it.
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = dropnullfields3,
    connection_type = "s3",
    connection_options = {"path": "s3://otomo-devel/glue/pqrquet", "partitionKeys": ["partition_1", "partition_2", "partition_3"]},
    format = "parquet",
    transformation_ctx = "datasink4")
Measuring Each Format with Athena
Since I went to the trouble of preparing several formats, I measured them.
- SQL used for measurement
SELECT count(timestamp) FROM "db"."table" where timestamp like '2018-09-25%' or timestamp like '2018-09-26%';
- Results
I tried various combinations.
Format | Compression | Run time (s) | Data scanned | Data size |
---|---|---|---|---|
Raw | Gzip | 15.19 | 151.08MB | 151MB |
Raw (with partitions) | Gzip | 3.11 | 3.66MB | same as above |
Raw (regex) | Gzip | 16.32 | 151.12MB | same as above |
Parquet | snappy | 57.86 | 17.75MB | 446MB |
Parquet (with partitions) | snappy | 6.54 | 480.12KB | same as above |
ORC | snappy | 57.95 | 307.27MB | 307MB |
JSON | Gzip | 15.58 | 85.03MB | 85.0MB |
JSON (with partitions) | Gzip | 1.29 | 2.19MB | same as above |
JSON | None | 17.98 | 1.48GB | 1.5GB |
JSON (with partitions) | None | 3.61 | 42.06MB | same as above |
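The "data scanned" column matters because Athena bills by bytes scanned. A quick back-of-the-envelope comparison of the first two rows, assuming the US$5-per-TB list price (verify the current price for your region):

```python
# Athena pricing assumption: US$5.00 per TB scanned (list price at the time
# of writing; not taken from the measurements above).
PRICE_PER_TB_USD = 5.0
TB = 1024 ** 4

def query_cost(bytes_scanned):
    """USD cost of a single query that scans `bytes_scanned` bytes."""
    return bytes_scanned / TB * PRICE_PER_TB_USD

full_scan = query_cost(151.08 * 1024 ** 2)   # Raw/Gzip, no partition pruning
pruned = query_cost(3.66 * 1024 ** 2)        # Raw/Gzip, with partition pruning
print(f'full: ${full_scan:.6f}  pruned: ${pruned:.6f}  '
      f'ratio: {full_scan / pruned:.0f}x')
```

The absolute amounts are tiny at this data volume, but the roughly 40x reduction from partition pruning scales linearly with log size.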
The Raw (regex) row used a table created with this DDL, with the same processing applied.
Closing Thoughts
In this case the data volume is small, so these results are of limited general use, but being able to easily try out various formats with Glue seems very handy.