Ingesting ALB Access Logs with Grok in AWS Glue and Converting Them to Parquet and ORC


Creating a Grok Pattern

Preliminary Research

Let's build a Grok pattern for parsing ALB logs.

The following references are useful.

ALB access log fields

Access Logs for Your Application Load Balancer - Elastic Load Balancing

Built-in Grok patterns

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

Logstash's ELB Grok pattern

This one targets ELB (CLB), but it's still a useful reference when writing a pattern for ALB.

https://github.com/logstash-plugins/logstash-patterns-core/blob/v4.1.2/patterns/aws#L11

Creating the Pattern

Let's build the pattern while testing it in a debugger.

https://grokdebug.herokuapp.com/

I put together the following Grok pattern for ALB logs. It probably has a few holes, but here it is...

%{NOTSPACE:type} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:client_ip}:%{INT:client_port} (?:(%{IP:target_ip}:?:%{INT:target_port})|-) %{NUMBER:request_processing_time} %{NUMBER:target_processing_time} %{NUMBER:response_processing_time} %{NUMBER:elb_status_code} (?:%{NUMBER:target_status_code}|-) %{NUMBER:received_bytes} %{NUMBER:sent_bytes} "%{NOTSPACE:request_method} %{NOTSPACE:request_url} %{NOTSPACE:protocol}" "%{GREEDYDATA:user_agent}" %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol} %{NOTSPACE:target_group_arn} "%{NOTSPACE:trace_id}" "%{NOTSPACE:domain_name}" "%{NOTSPACE:chosen_cert_arn}" (?:%{NUMBER:matched_rule_priority}|-) %{TIMESTAMP_ISO8601:request_creation_time} "%{GREEDYDATA:actions_executed}" "%{GREEDYDATA:redirect_url}"
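
Before registering the pattern in Glue, you can also sanity-check it locally. Here is a minimal sketch, assuming the pygrok library (pip install pygrok); the sample line is illustrative, and for brevity only the leading fields of the full pattern are tested:

from pygrok import Grok

# Minimal local check of the leading fields of the pattern above.
# pygrok is an assumption here; the Grok debugger linked earlier works the same way.
pattern = ('%{NOTSPACE:type} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} '
           '%{IP:client_ip}:%{INT:client_port}')
sample = ('https 2018-09-25T12:34:56.123456Z app/my-alb/50dc6c495c0c9188 '
          '203.0.113.10:46532 ...')

# match() returns a dict of captured fields, or None when the pattern misses.
print(Grok(pattern).match(sample))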

This was already true back when I matched logs with plain regex: some clients hit the server with request URLs written like [52.1xx.xxx.xxx], so the built-in URL patterns can't be used as-is. That said, I think you can get there by adjusting things, using the built-in patterns below as a starting point (a sketch follows the list). Even so, you occasionally get access logs that make no sense at all, so reaching 100% coverage is genuinely difficult.

URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
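
As one illustration of the kind of adjustment I mean, here is a sketch of a loosened request-URL matcher whose host part also tolerates bracketed literals. The group names are mine, not built-in Grok patterns:

import re

# Sketch: unlike the stock URIHOST, the host alternative here also accepts
# bracketed literals such as "[52.1xx.xxx.xxx]". Group names are illustrative.
URL_RE = re.compile(
    r"(?P<proto>[A-Za-z][A-Za-z0-9+.-]*)://"   # scheme
    r"(?P<host>\[[^\]]+\]|[A-Za-z0-9.-]+)"     # hostname, IP, or [bracketed] literal
    r"(?::(?P<port>\d+))?"                     # optional port
    r"(?P<path>/[^\s?]*)?"                     # path, up to the query string
    r"(?P<query>\?\S*)?"                       # optional query string
)

for url in ("https://example.com:443/index.html?q=1",
            "http://[52.1xx.xxx.xxx]:80/"):
    m = URL_RE.match(url)
    print(url, "->", m.groupdict() if m else "no match")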

Registering the Classifier

The wizard is straightforward, so I'll just show the result. The classifier ends up registered like this:

$ aws glue get-classifier --name alb-access-log
{
    "Classifier": {
        "GrokClassifier": {
            "Name": "alb-access-log",
            "Classification": "ALB Access Log",
            "CreationTime": 1539685175.0,
            "LastUpdated": 1539685175.0,
            "Version": 1,
            "GrokPattern": "%{NOTSPACE:type} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:client_ip}:%{INT:client_port} (?:(%{IP:target_ip}:?:%{INT:target_port})|-) %{NUMBER:request_processing_time} %{NUMBER:target_processing_time} %{NUMBER:response_processing_time} %{NUMBER:elb_status_code} (?:%{NUMBER:target_status_code}|-) %{NUMBER:received_bytes} %{NUMBER:sent_bytes} \"%{NOTSPACE:request_method} %{NOTSPACE:request_url} %{NOTSPACE:protocol}\" \"%{GREEDYDATA:user_agent}\" %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol} %{NOTSPACE:target_group_arn} \"%{NOTSPACE:trace_id}\" \"%{NOTSPACE:domain_name}\" \"%{NOTSPACE:chosen_cert_arn}\" (?:%{NUMBER:matched_rule_priority}|-) %{TIMESTAMP_ISO8601:request_creation_time} \"%{GREEDYDATA:actions_executed}\" \"%{GREEDYDATA:redirect_url}\""
        }
    }
}
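
If you'd rather script the registration than click through the wizard, the equivalent boto3 call would look roughly like this (a sketch; credentials and region are assumed to be configured, and the pattern string is abbreviated here):

import boto3

# Sketch: register the same classifier via the API instead of the console.
# Paste the full pattern string shown above into ALB_GROK_PATTERN.
ALB_GROK_PATTERN = "%{NOTSPACE:type} %{TIMESTAMP_ISO8601:timestamp} ..."  # abbreviated

glue = boto3.client("glue")
glue.create_classifier(
    GrokClassifier={
        "Name": "alb-access-log",
        "Classification": "ALB Access Log",
        "GrokPattern": ALB_GROK_PATTERN,
    }
)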

Crawler

When registering the crawler, set the classifier registered above as a custom classifier. When the crawler runs, the custom classifier appears to be tried first, and the built-in parsers kick in only if it doesn't match.

The rest is straightforward as well, so again just the result:

$ aws glue get-crawler --name alb-access-log-crawler
{
    "Crawler": {
        "Name": "alb-access-log-crawler",
        "Role": "service-role/AWSGlueServiceRole-MyALBLog",
        "Targets": {
            "S3Targets": [
                {
                    "Path": "s3://your-bucket/path/to/log/your-alb-name",
                    "Exclusions": []
                }
            ],
            "JdbcTargets": [],
            "DynamoDBTargets": []
        },
        "DatabaseName": "alblog",
        "Classifiers": [
            "alb-access-log"
        ],
        "SchemaChangePolicy": {
            "UpdateBehavior": "UPDATE_IN_DATABASE",
            "DeleteBehavior": "DEPRECATE_IN_DATABASE"
        },
        "State": "READY",
        "CrawlElapsedTime": 0,
        "CreationTime": 1539685376.0,
        "LastUpdated": 1539685376.0,
        "LastCrawl": {
            "Status": "SUCCEEDED",
            "LogGroup": "/aws-glue/crawlers",
            "LogStream": "alb-access-log-crawler",
            "MessagePrefix": "4740dc11-bdb3-4050-bdc0-0cb6f4f45fc9",
            "StartTime": 1539685378.0
        },
        "Version": 1
    }
}
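
The crawler can likewise be scripted. A boto3 sketch with values matching the output above (bucket path is the same placeholder):

import boto3

glue = boto3.client("glue")

# Sketch: create the crawler via the API. Passing the custom classifier in
# Classifiers makes the crawler try it before the built-in ones.
glue.create_crawler(
    Name="alb-access-log-crawler",
    Role="service-role/AWSGlueServiceRole-MyALBLog",
    DatabaseName="alblog",
    Targets={"S3Targets": [{"Path": "s3://your-bucket/path/to/log/your-alb-name"}]},
    Classifiers=["alb-access-log"],
    SchemaChangePolicy={
        "UpdateBehavior": "UPDATE_IN_DATABASE",
        "DeleteBehavior": "DEPRECATE_IN_DATABASE",
    },
)

# Kick off a crawl.
glue.start_crawler(Name="alb-access-log-crawler")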

Checking the Catalog Database

Checking the Table

At this point you can confirm that the table has been registered in the catalog database.

$ aws glue get-table --database alblog --name elasticloadbalancing --query 'Table.StorageDescriptor.Columns'
[
    {
        "Name": "type",
        "Type": "string"
    },
    {
        "Name": "timestamp",
        "Type": "string"
    },
    {
        "Name": "elb",
        "Type": "string"
    },
    {
        "Name": "client_ip",
        "Type": "string"
    },
    {
        "Name": "client_port",
        "Type": "string"
    },
    {
        "Name": "target_ip",
        "Type": "string"
    },
    {
        "Name": "target_port",
        "Type": "string"
    },
    {
        "Name": "request_processing_time",
        "Type": "string"
    },
    {
        "Name": "target_processing_time",
        "Type": "string"
    },
    {
        "Name": "response_processing_time",
        "Type": "string"
    },
    {
        "Name": "elb_status_code",
        "Type": "string"
    },
    {
        "Name": "target_status_code",
        "Type": "string"
    },
    {
        "Name": "received_bytes",
        "Type": "string"
    },
    {
        "Name": "sent_bytes",
        "Type": "string"
    },
    {
        "Name": "request_method",
        "Type": "string"
    },
    {
        "Name": "request_url",
        "Type": "string"
    },
    {
        "Name": "protocol",
        "Type": "string"
    },
    {
        "Name": "user_agent",
        "Type": "string"
    },
    {
        "Name": "ssl_cipher",
        "Type": "string"
    },
    {
        "Name": "ssl_protocol",
        "Type": "string"
    },
    {
        "Name": "target_group_arn",
        "Type": "string"
    },
    {
        "Name": "trace_id",
        "Type": "string"
    },
    {
        "Name": "domain_name",
        "Type": "string"
    },
    {
        "Name": "chosen_cert_arn",
        "Type": "string"
    },
    {
        "Name": "matched_rule_priority",
        "Type": "string"
    },
    {
        "Name": "request_creation_time",
        "Type": "string"
    },
    {
        "Name": "actions_executed",
        "Type": "string"
    },
    {
        "Name": "redirect_url",
        "Type": "string"
    }
]

Querying with Athena

The data catalog generated by Glue can also be used from Athena. The data is already partitioned, so even this alone is quite handy.
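
As a sketch of querying it programmatically (the output-location bucket is a placeholder; partition_0 through partition_3 are the partition columns the crawler derived from the S3 path segments):

import boto3

athena = boto3.client("athena")

# Sketch: run a query against the crawled table from code. Results land in
# the (placeholder) output location; fetch them later with get_query_results.
resp = athena.start_query_execution(
    QueryString=(
        "SELECT elb_status_code, count(*) AS requests "
        "FROM elasticloadbalancing "
        "GROUP BY elb_status_code"
    ),
    QueryExecutionContext={"Database": "alblog"},
    ResultConfiguration={"OutputLocation": "s3://your-bucket/athena-results/"},
)
print(resp["QueryExecutionId"])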

ETL Job

All that's left is converting to Parquet. For that much, simply walking through Glue's ETL job wizard generates PySpark code like the following, so you just run it.

The reference for Glue's PySpark extensions is here:

AWS Glue PySpark Extensions Reference - AWS Glue

GitHub - awslabs/aws-glue-libs: AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "alblog", table_name = "elasticloadbalancing", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "alblog", table_name = "elasticloadbalancing", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("type", "string", "type", "string"), ("timestamp", "string", "timestamp", "string"), ("elb", "string", "elb", "string"), ("client_ip", "string", "client_ip", "string"), ("client_port", "string", "client_port", "string"), ("target_ip", "string", "target_ip", "string"), ("target_port", "string", "target_port", "string"), ("request_processing_time", "string", "request_processing_time", "string"), ("target_processing_time", "string", "target_processing_time", "string"), ("response_processing_time", "string", "response_processing_time", "string"), ("elb_status_code", "string", "elb_status_code", "string"), ("target_status_code", "string", "target_status_code", "string"), ("received_bytes", "string", "received_bytes", "string"), ("sent_bytes", "string", "sent_bytes", "string"), ("request_method", "string", "request_method", "string"), ("request_url", "string", "request_url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string"), ("target_group_arn", "string", "target_group_arn", "string"), ("trace_id", "string", "trace_id", "string"), ("domain_name", "string", "domain_name", "string"), ("chosen_cert_arn", "string", "chosen_cert_arn", "string"), ("matched_rule_priority", "string", "matched_rule_priority", "string"), ("request_creation_time", "string", "request_creation_time", "string"), ("actions_executed", "string", "actions_executed", "string"), ("redirect_url", "string", "redirect_url", "string"), ("partition_0", "string", "partition_0", "string"), ("partition_1", "string", "partition_1", "string"), ("partition_2", "string", "partition_2", "string"), ("partition_3", "string", "partition_3", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("type", "string", "type", "string"), ("timestamp", "string", "timestamp", "string"), ("elb", "string", "elb", "string"), ("client_ip", "string", "client_ip", "string"), ("client_port", "string", "client_port", "string"), ("target_ip", "string", "target_ip", "string"), ("target_port", "string", "target_port", "string"), ("request_processing_time", "string", "request_processing_time", "string"), ("target_processing_time", "string", "target_processing_time", "string"), ("response_processing_time", "string", "response_processing_time", "string"), ("elb_status_code", "string", "elb_status_code", "string"), ("target_status_code", "string", "target_status_code", "string"), ("received_bytes", "string", "received_bytes", "string"), ("sent_bytes", "string", "sent_bytes", "string"), ("request_method", "string", "request_method", "string"), ("request_url", "string", "request_url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string"), ("target_group_arn", "string", "target_group_arn", "string"), ("trace_id", "string", "trace_id", "string"), ("domain_name", "string", "domain_name", "string"), ("chosen_cert_arn", "string", "chosen_cert_arn", "string"), ("matched_rule_priority", "string", "matched_rule_priority", "string"), ("request_creation_time", "string", "request_creation_time", "string"), ("actions_executed", "string", "actions_executed", "string"), ("redirect_url", "string", "redirect_url", "string"), ("partition_0", "string", "partition_0", "string"), ("partition_1", "string", "partition_1", "string"), ("partition_2", "string", "partition_2", "string"), ("partition_3", "string", "partition_3", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://otomo-devel/glue/parquet"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://otomo-devel/glue/parquet"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
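
One thing to note: the crawler typed every column as string (see the get-table output earlier), and the generated mapping keeps them that way. If you want proper numeric types in the Parquet output, ApplyMapping can cast during the mapping. A sketch covering a few of the fields, with the rest left as generated:

# Sketch: cast numeric fields while mapping, instead of keeping everything
# as string. Only a few columns are shown here.
applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = [
        ("timestamp", "string", "timestamp", "string"),
        ("elb_status_code", "string", "elb_status_code", "int"),
        ("received_bytes", "string", "received_bytes", "long"),
        ("sent_bytes", "string", "sent_bytes", "long"),
        ("target_processing_time", "string", "target_processing_time", "double"),
        # ...remaining columns as in the generated script
    ],
    transformation_ctx = "applymapping1")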

Writing Partitioned Output from Glue

Let's also try writing the output while creating partitions.

Managing Partitions for ETL Output in AWS Glue - AWS Glue

Take the Python code above, add the partitionKeys property to the write_dynamic_frame call, and run it.

datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = dropnullfields3, 
    connection_type = "s3", 
    connection_options = {"path": "s3://otomo-devel/glue/parquet","partitionKeys":["partition_1","partition_2","partition_3"]}, 
    format = "parquet", 
    transformation_ctx = "datasink4")

Measuring Each Format with Athena

Since I'd gone to the trouble of preparing all these formats, I ran some measurements.

  • SQL used for the measurement:
SELECT count(timestamp) 
FROM "db"."table"
where timestamp like '2018-09-25%' or timestamp like '2018-09-26%'; 
  • Results

I tried a variety of combinations.

Format                  Compression   Run time (s)   Data scanned   Data size
Raw                     Gzip          15.19          151.08 MB      151 MB
Raw (partitioned)       Gzip           3.11            3.66 MB      same as above
Raw (regex)             Gzip          16.32          151.12 MB      same as above
Parquet                 Snappy        57.86           17.75 MB      446 MB
Parquet (partitioned)   Snappy         6.54          480.12 KB      same as above
ORC                     Snappy        57.95          307.27 MB      307 MB
JSON                    Gzip          15.58           85.03 MB      85.0 MB
JSON (partitioned)      Gzip           1.29            2.19 MB      same as above
JSON                    None          17.98            1.48 GB      1.5 GB
JSON (partitioned)      None           3.61           42.06 MB      same as above

The Raw (regex) row was measured by creating the table with the DDL from the page below and running the same query against it.

docs.aws.amazon.com

Closing Thoughts

The data volume in this case is small, so how far these results generalize is limited. Still, being able to try out all these different formats so easily with Glue seems like a real strength.