GRIB2フォーマットをpygrib.fromstringとio.BytesIOでメモリ上で扱う

課題

気象庁から取得したGRIB2ファイルをPythonで解析しようとしています。

対象となるファイル

www.data.jma.go.jp

  • GSM 全球数値予報モデルGPV
  • MSM メソ数値予報モデルGPV
  • LFM 局地数値予報モデルGPV

利用するライブラリ

pypi.org

課題詳細

やりたいことに関してはこちらの記事と大枠は同じ内容です。

1点違うのは、S3上などに置いてあるGRIB2形式のファイルを、ファイルシステムにダウンロードするのではなく、メモリに展開して扱いたいと思います。

qiita.com

イメージとしては以下のような感じです。

s3 = boto3.client("s3")
object_key = "data/Z__C_RJTD_20171205000000_MSM_GPV_Rjp_Lsurf_FH00-15_grib2.bin"
with io.BytesIO() as f:
    s3.download_fileobj(Bucket=S3_BUCKET, Fileobj=f, Key=object_key)
    gpv_file = pygrib.open(f)
    t_messages = gpv_file.select(name="Temperature")
# 省略

ただ残念ながら、こちらはエラーとなります。

それもそのはずで、この辺りを追うとわかるのですが pygrib はファイルシステム上のファイルを引数に取るように書かれています。

github.com

調査

何か渡し方を工夫すれば pygrib.open() が使えるかと思い試行錯誤したのですが、難しそうです。。。

tmpfileとか作って動かしてはいたのですが、メモリに展開すれば早いところをディスクI/Oを起こしたり、ファイルの掃除とかも面倒です。

何とか良い手が無いかと探していたら、pygrib.fromstring() という正にStringから読み込めるようなメソッドを見つけました。

def fromstring(gribstring):
    """
    fromstring(string)
    Create a :py:class:`gribmessage` instance from a python bytes object
    representing a binary grib message (the reverse of :py:meth:`gribmessage.tostring`).
    """

https://github.com/jswhit/pygrib/blob/v2.1.4rel/src/pygrib/_pygrib.pyx#L609-L615

ただ何渡せば良いかわからないです。気象庁のデータをByteとして渡しても動きません。

もう少し調べるとこんなIssueを見つけました。つまりは自分でGRIB2の仕様調べてデータを抜き出せば pygrib.fromstring() を使えるというところでした。

github.com

対応

ということで気象庁のGRIB2データをメモリだけで扱える処理を書きました。

参考資料

こちらのページから取得したPDFを読み込みながら作りました。 www.data.jma.go.jp

読み込み処理

特徴は節毎に意味を持っていて、そこで処理を分岐しているところです。特に一つのファイルに4節~7節が複数含まれているところが特徴です。

import io
import struct

import pygrib

class Grib2BufferException(Exception):
    pass


class Grib2Buffer:
    def __init__(self, msgs):
        self._msgs = msgs

    def select_by_name(self, name):
        return [m for m in self._msgs if m["name"] == name]

    def select_by_short_name(self, short_name):
        return [m for m in self._msgs if m["shortName"] == short_name]


def read_grib2(f: io.BytesIO) -> Grib2Buffer:
    # GRIB2ファイルであることを確認
    f.seek(0)
    grib_start_mark = f.read(4).decode("ascii", "ignore")
    if grib_start_mark != "GRIB":
        raise Grib2BufferException(f"Invalid Grib2 File")

    # GRIB2のファイル終端である7777を確認
    END_GRIB = "7777".encode()
    size = f.seek(0, 2)
    f.seek(size - 4)
    io_end_mark = f.read(4)
    if io_end_mark != END_GRIB:
        raise Grib2BufferException(f"Invalid Grib2 File")
    f.seek(0)

    msgs = []
    bitmap = None
    # 0節目を読み込み
    s_0_half = f.read(8)
    # 0節目のサイズ部分を読み飛ばし
    f.seek(f.tell()+8)
    header = bytes()
    while 1:
        # 節の開始4bytesを取得(値取得後は節開始場所に戻す)
        s_start_pos = f.tell()
        first_4_bytes = f.read(4)

        # 7777(926365495)の場合は終端
        if first_4_bytes == END_GRIB:
            break

        # 最初の4bytesは節の長さを示す
        s_len = struct.unpack(">i", first_4_bytes)[0]

        # 次の1byteは節の番号を示す
        s_no = int(struct.unpack("b", f.read(1))[0])

        # 節の最初に戻し、節番号を取得し場合分け処理
        f.seek(s_start_pos)
        if s_no < 4:
            header = header + f.read(s_len)
        elif s_no == 4:
            data = None
            data = f.read(s_len)
        elif s_no == 5:
            data = data + f.read(s_len)
        elif s_no == 6:
            f.seek(f.tell() + 4 + 1)
            bitmap_ope = int(struct.unpack("B", f.read(1))[0])
            f.seek(s_start_pos)
            if bitmap_ope == 0:
                bitmap = f.read(s_len)
                data = data + bitmap
            elif bitmap_ope == 254:
                f.seek(f.tell() + s_len)
                data = data + bitmap
            elif bitmap_ope == 255:
                data = data + f.read(s_len)
            else:
                raise Grib2BufferException(f"invalid section 6 -> ope ({bitmap_ope})")
        elif s_no == 7:
            data = data + f.read(s_len)
            # メッセージサイズを計算し配置
            msg_size = 16 + len(header) + len(data) + len(END_GRIB)
            msg = pygrib.fromstring(s_0_half + struct.pack('>Q',msg_size) + header + data + END_GRIB)
            msgs.append(msg)
        else:
            raise Grib2BufferException(f"invalid grib2 section -> {s_no}")

    return Grib2Buffer(msgs)

使い方

例として上記の記事と同じファイルに処理を行っています。

mport boto3

S3_BUCKET = "my-bucket"
object_key = "data/Z__C_RJTD_20171205000000_MSM_GPV_Rjp_Lsurf_FH00-15_grib2.bin"
s3 = boto3.client("s3")

with io.BytesIO() as f: 
    s3.download_fileobj(Bucket=S3_BUCKET, Fileobj=f, Key=object_key)
    t_messages = read_grib2(f)
    for msg in t_messages.select_by_name(name="Temperature"):
        value = msg.data( 
            lat1=35.6745-0.025,
            lat2=35.6745+0.025,
            lon1=139.7169-0.03125,
            lon2=139.7169+0.03125,
        )[0][0][0] - 273.15
        print(msg.validDate,value)

タイムゾーンの調整していないので9時間ズレがあるままですが、同じデータが取得できています。

2017-12-05 00:00:00 9.810708618164085
2017-12-05 01:00:00 11.409219360351585
2017-12-05 02:00:00 12.358789062500023
2017-12-05 03:00:00 13.116860961914085
2017-12-05 04:00:00 13.77051696777346
2017-12-05 05:00:00 14.698541259765648
2017-12-05 06:00:00 14.488687133789085
2017-12-05 07:00:00 13.063195800781273
2017-12-05 08:00:00 11.46772155761721
2017-12-05 09:00:00 10.320886230468773
2017-12-05 10:00:00 9.59211120605471
2017-12-05 11:00:00 8.797952270507835
2017-12-05 12:00:00 8.171685791015648
2017-12-05 13:00:00 7.83240661621096
2017-12-05 14:00:00 7.461450195312523
2017-12-05 15:00:00 6.88440856933596