Glueで色々な日付フォーマットの文字列をtimestamp型に変換してみた

掲題の件、JSON等にある日付フォーマットの文字列から、Timestamp形式に変換したかったのですが、どの書式だと変換できるのか良くわかってなかったので、実際に実行してみました。

初期化

初期化してから動かしてみます。

from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
glueContext = GlueContext(sc)

日付文字列

ISO8601を中心に少しずつ違うのを並べて準備してみました。

time_strings = sc.parallelize([
    (1,"20060102"),
    (2,"2006-01-02"),
    (3,"2006/01/02"),
    (4,"2006-01-02 15:04:05"),
    (5,"2006-01-02 15:04:05.123"),
    (6,"2006-01-02 15:04:05.123456789"),
    (7,"2006-01-02T15:04:05"),  
    (8,"2006-01-02T15:04:05Z0700"),
    (9,"2006-01-02T15:04:05Z070000"),
    (10,"2006-01-02T15:04:05-0700"),
    (11,"2006-01-02T15:04:05-070000"),
    (12,"2006-01-02T15:04:05-07"),
    (13,"2006-01-02T15:04:05-07:00"),
    (14,"2006-01-02T15:04:05.999Z"),
    (15,"2006-01-02T15:04:05.999-0700"),
    (16,"2006-01-02T15:04:05Z"),
    (17,"2006-01-02T15:04:05Z07"),
    (18,"2006-01-02T15:04:05Z07:00"), 
    (19,"2006-01-02T15:04:05Z0700"), 
    (20,"2006-01-02T15:04:05.999999999Z07:00"),
]).toDF(['id','time_string'])
time_strings.printSchema()
root
 |-- id: long (nullable = true)
 |-- time_string: string (nullable = true)

Glueでマッピング

GlueのDynamicFrame使って直接 timestamp 型にマッピングしてみます。

time_strings_dyf = DynamicFrame.fromDF(
    dataframe = time_strings,
    glue_ctx = glueContext,
    name = ''
)

time_parsed = time_strings_dyf.apply_mapping([
    ('id','long','id','long'),
    ('time_string','string','time_string','string'),
    ('time_string','string','parsed_time','timestamp')
])
time_parsed.printSchema()
root
|-- id: long
|-- time_string: string
|-- parsed_time: timestamp

結果

結果を出力してみます。

time_parsed.toDF().show(truncate=False)

いくつか失敗してnullになってしまいました。細かくできたりできないのがあります。

+---+-----------------------------------+--------------------------+
|id |time_string                        |parsed_time               |
+---+-----------------------------------+--------------------------+
|1  |20060102                           |null                      |
|2  |2006-01-02                         |2006-01-02 00:00:00       |
|3  |2006/01/02                         |null                      |
|4  |2006-01-02 15:04:05                |2006-01-02 15:04:05       |
|5  |2006-01-02 15:04:05.123            |2006-01-02 15:04:05.123   |
|6  |2006-01-02 15:04:05.123456789      |2006-01-02 15:04:05.123456|
|7  |2006-01-02T15:04:05                |2006-01-02 15:04:05       |
|8  |2006-01-02T15:04:05Z0700           |null                      |
|9  |2006-01-02T15:04:05Z070000         |null                      |
|10 |2006-01-02T15:04:05-0700           |null                      |
|11 |2006-01-02T15:04:05-070000         |null                      |
|12 |2006-01-02T15:04:05-07             |2006-01-02 22:04:05       |
|13 |2006-01-02T15:04:05-07:00          |2006-01-02 22:04:05       |
|14 |2006-01-02T15:04:05.999Z           |2006-01-02 15:04:05.999   |
|15 |2006-01-02T15:04:05.999-0700       |null                      |
|16 |2006-01-02T15:04:05Z               |2006-01-02 15:04:05       |
|17 |2006-01-02T15:04:05Z07             |2006-01-02 08:04:05       |
|18 |2006-01-02T15:04:05Z07:00          |2006-01-02 08:04:05       |
|19 |2006-01-02T15:04:05Z0700           |null                      |
|20 |2006-01-02T15:04:05.999999999Z07:00|2006-01-02 08:04:05.999999|
+---+-----------------------------------+--------------------------+

DynamicFrame使わない方法も試してみました。TimestampType型にキャストしてみてます。

この方法では結果は一緒でした。特別なことはしてなさそうです。

from pyspark.sql.types import TimestampType
time_strings.select(
    'id',
    'time_string',
    time_strings['time_string'].cast(TimestampType()).alias('parsed_time')
).show(truncate=False)

UnixTimeの変換

同様にUnixtimeも試してみました。10桁と13桁のものです。

unixtime_strings = sc.parallelize([
    ("unixts", 1136181845),
    ("unixtsms", 1136181845999)
]).toDF(['id','time_long'])

こちらはGlueのマッピング使ったものと、Sparkでキャストしたものが結果が異なりました。

Glue

Glueのライブラリでは13桁ミリ秒の方が正しく変換できています。

unixtime_strings_dyf = DynamicFrame.fromDF(
    dataframe = unixtime_strings,
    glue_ctx = glueContext,
    name = ''
)

parsed_unixtime = unixtime_strings_dyf.apply_mapping([
    ('id','string','id','string'),
    ('time_long','long','time_long','long'),
    ('time_long','long','parsed_time','timestamp')
])
parsed_unixtime.toDF().show(truncate=False)
+--------+-------------+-----------------------+
|id      |time_long    |parsed_time            |
+--------+-------------+-----------------------+
|unixts  |1136181845   |1970-01-14 03:36:21.845|
|unixtsms|1136181845999|2006-01-02 06:04:05.999|
+--------+-------------+-----------------------+

Spark

こちらは10桁の秒のUnixTimeが正しく変換できています。

from pyspark.sql.types import TimestampType
from pyspark.sql.functions import from_unixtime
unixtime_strings.select(
    'id',
    'time_long',
    unixtime_strings['time_long'].cast(TimestampType()).alias('parsed_time'),
    from_unixtime(unixtime_strings['time_long']).alias('parsed_time_by_functions') # これはString型
).show(truncate=False)
+--------+-------------+-----------------------+------------------------+
|id      |time_long    |parsed_time            |parsed_time_by_functions|
+--------+-------------+-----------------------+------------------------+
|unixts  |1136181845   |2006-01-02 06:04:05    |2006-01-02 06:04:05     |
|unixtsms|1136181845999|37974-03-03 20:19:599.0|37974-03-03 20:19:59    |
+--------+-------------+-----------------------+------------------------+

sparkのソースや、pysparkの情報見てみたら、以下のようにあるので、Glue側が別の処理をしているのかも。

Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format. http://spark.apache.org/docs/2.2.1/api/python/_modules/pyspark/sql/functions.html#from_unixtime

https://github.com/apache/spark/blob/v2.2.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L677-L682