Apache BeamでJavaのObjectをParquet形式でS3とGCSとローカルに出力する

GCPのCloud Dataflowでも使われている、Apache BeamでJavaの内部で持っているデータをParquetに出力するやり方です。

サンプルコードの構成

以下のリポジトリに今回書いているコードを置いておきました。

github.com

こちらで補足書いておきます。

元にしたMaven ArcheType

こちらのMaven ArcheType元に作成しています。バージョンは現在の最新の2.22.0を利用しています。

mvnrepository.com

利用するPOJO

以下の構造のデータをParquetに変換してみます。

  static class PC {
    String name;
    int cpuCoreCount;
    double memoryGB;
    double diskGB;
    OS os;

    PC(String name, int cpuCoreCount, double memoryGB, double diskGB, OS os) {
      this.name = name;
      this.cpuCoreCount = cpuCoreCount;
      this.memoryGB = memoryGB;
      this.diskGB = diskGB;
      this.os = os;
    }
  }

  static class OS {
    String name;
    String version;

    OS(String name, String version) {
      this.name = name;
      this.version = version;
    }
  }

コード内では以下のように初期化しています。

List<PC> pcList = Arrays.asList(
        new PC("HomePC1", 4, 16.0, 128.0, new OS("macOS", "Catalina")),
        new PC("HomePC2", 8, 8.0, 128.0, new OS("Windows", "10")),
        new PC("OfficePC", 8, 16.0, 512.0, new OS("Windows", "10")));

GenericRecordへの変換

こちらのJavaDocにも以下の記載があるように、POJOを GenericRecord 型に変換してからParquet出力しています。

 pipeline
   .apply(...) // PCollection<GenericRecord>
   .apply(FileIO
     .<GenericRecord>write()
     .via(ParquetIO.sink(SCHEMA)
       .withCompressionCodec(CompressionCodecName.SNAPPY))
     .to("destination/path")
     .withSuffix(".parquet"));

サンプル内では以下のように書いています。

 private static List<GenericRecord> generateGenericRecords() {

    // Create Sample Data
    List<PC> pcList = Arrays.asList(new PC("HomePC1", 4, 16.0, 128.0, new OS("macOS", "Catalina")),
        new PC("HomePC2", 8, 8.0, 128.0, new OS("Windows", "10")),
        new PC("OfficePC", 8, 16.0, 512.0, new OS("Windows", "10")));

    // Convert from POJOs to GenericRecords
    Schema pcSchema = ReflectData.get().getSchema(PC.class);
    Schema osSchema = ReflectData.get().getSchema(OS.class);
    ArrayList<GenericRecord> list = new ArrayList<>();
    pcList.forEach(p -> {
      Record osRecord = new Record(osSchema);
      osRecord.put("name", p.os.name);
      osRecord.put("version", p.os.version);

      Record pcRecord = new Record(pcSchema);
      pcRecord.put("name", p.name);
      pcRecord.put("cpuCoreCount", p.cpuCoreCount);
      pcRecord.put("memoryGB", p.memoryGB);
      pcRecord.put("diskGB", p.diskGB);
      pcRecord.put("os", osRecord);
      list.add(pcRecord);
    });
    return list;
  }

もっと汎用的にやりたい場合の方法などは以下でも議論されいます。

stackoverflow.com

出力先の切り替え

ローカルに出すか、S3かGCSかは渡したスキーマの情報からBeam側で判断してくれます。

サンプルコード上は以下を切り替えてください。

S3への出力の場合だけ、region の指定が必要になります。

    // ローカルへの出力用
    String outputPath = "./tmp/parquet";

    // GCSへの出力用
    // String outputPath = "gs://your-bucket-name/parquet";

    // AWS S3への出力用(AWSは最低限Regoinの指定が必要)
    // String outputPath = "s3://your-bucket-name/parquet";
    // options.as(AwsOptions.class).setAwsRegion("ap-northeast-1");

ローカルに出力してみる

まずはローカルに出力してみます。そのまま実行すると以下のようにカレントディレクトリ配下にtmp/parquet というフォルダが作成され、ファイルが出力されると思います。

f:id:yomon8:20200710162427p:plain

parquet-toolから見てみます。(parquet-toolインストールが面倒なのでDockerHubにあったDockerイメージ使っています)

まずはメタデータから。入れ子のスキーマまで反映できています。

$ docker run --rm  -v $(pwd)/tmp:/tmp nathanhowell/parquet-tools meta /tmp/parquet/output-00001-of-00003.parquet
file:         file:/tmp/parquet/output-00001-of-00003.parquet
creator:      parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:        parquet.avro.schema = {"type":"record","name":"PC","namespace":"com.yomon8.StarterPipeline$","fields":[{"name":"name","type":"string"},{"name":"cpuCoreCount","type":"int"},{"name":"memoryGB","type":"double"},{"name":"diskGB","type":"double"},{"name":"os","type":{"type":"record","name":"OS","fields":[{"name":"name","type":"string"},{"name":"version","type":"string"}]}}]}
extra:        writer.model.name = avro

file schema:  com.yomon8.StarterPipeline$.PC
--------------------------------------------------------------------------------
name:         REQUIRED BINARY O:UTF8 R:0 D:0
cpuCoreCount: REQUIRED INT32 R:0 D:0
memoryGB:     REQUIRED DOUBLE R:0 D:0
diskGB:       REQUIRED DOUBLE R:0 D:0
os:           REQUIRED F:2
.name:        REQUIRED BINARY O:UTF8 R:0 D:0
.version:     REQUIRED BINARY O:UTF8 R:0 D:0

row group 1:  RC:1 TS:386 OFFSET:4
--------------------------------------------------------------------------------
name:          BINARY SNAPPY DO:0 FPO:4 SZ:70/68/0.97 VC:1 ENC:BIT_PACKED,PLAIN
cpuCoreCount:  INT32 SNAPPY DO:0 FPO:74 SZ:51/49/0.96 VC:1 ENC:BIT_PACKED,PLAIN
memoryGB:      DOUBLE SNAPPY DO:0 FPO:125 SZ:71/69/0.97 VC:1 ENC:BIT_PACKED,PLAIN
diskGB:        DOUBLE SNAPPY DO:0 FPO:196 SZ:71/69/0.97 VC:1 ENC:BIT_PACKED,PLAIN
os:
.name:         BINARY SNAPPY DO:0 FPO:267 SZ:60/58/0.97 VC:1 ENC:BIT_PACKED,PLAIN
.version:      BINARY SNAPPY DO:0 FPO:327 SZ:75/73/0.97 VC:1 ENC:BIT_PACKED,PLAIN

次にデータの方。3つのサンプルデータの情報が入っています。

$ docker run --rm  -v $(pwd)/tmp:/tmp nathanhowell/parquet-tools cat /tmp/parquet/output-00001-of-00003.parquet
name = HomePC1
cpuCoreCount = 4
memoryGB = 16.0
diskGB = 128.0
os:
.name = macOS
.version = Catalina

$ docker run --rm  -v $(pwd)/tmp:/tmp nathanhowell/parquet-tools cat /tmp/parquet/output-00002-of-00003.parquet
name = HomePC2
cpuCoreCount = 8
memoryGB = 8.0
diskGB = 128.0
os:
.name = Windows
.version = 10

name = OfficePC
cpuCoreCount = 8
memoryGB = 16.0
diskGB = 512.0
os:
.name = Windows
.version = 10

GCSに出力してみる

GCSに出力するには、バケットを作成したプロジェクトに対して権限持つアカウントに、gcloudコマンドでログオンしておく必要があります。 application-default を入れる必要があります。何が違うか知りたいかたは、こちらが参考になります。

$ gcloud auth application-default login

以下のようにGCP用の設定を有効化して実行してみます。自身のテスト用のバケット名を入れることもお忘れ無く。

    // ローカルへの出力用
    // String outputPath = "./tmp/parquet";

    // GCSへの出力用
    String outputPath = "gs://your-bucket-name/parquet";

GCSに出力されました。

f:id:yomon8:20200710164103p:plain

AWS S3に出力してみる

最後にAWSに出力してみます。AWSは最低限Regionの指定が必要です。

このままだとDefaultのAWS Credentialが使われますが、 setAwsCredentialsProvider で設定したりも可能ですし、AWS_ACCESS_KEY等を環境変数に指定しておけば、それを読み取ってくれます。

    // ローカルへの出力用
    // String outputPath = "./tmp/parquet";

    // GCSへの出力用
    // String outputPath = "gs://your-bucket-name/parquet";

    // AWS S3への出力用(AWSは最低限Regoinの指定が必要)
    String outputPath = "s3://your-bucket-name/parquet";
    options.as(AwsOptions.class).setAwsRegion("ap-northeast-1");

S3にもParquetのアップロードができました。

f:id:yomon8:20200710165923p:plain

せっかくなので、こちらはS3 Selectで読み込んでみます。S3 selectのCLIコマンドは結構複雑なのですがinput-serializationのところでParquetを指定して読み込んでいます。

$ BUCKET_NAME=your-bucket-name
$ aws s3api select-object-content \
  --bucket=${BUCKET_NAME} \
  --key=parquet/output-00000-of-00003.parquet \
  --expression "select * from s3object s" \
  --expression-type SQL \
  --input-serialization '{"Parquet":{}}' \
  --output-serialization '{"JSON": {"RecordDelimiter": "\n"}}' \
  ./tmp.json

出力されたファイルを見てみます。ちゃんと構造化されたデータになっています。

$ cat tmp.json | jq
{
  "name": "HomePC2",
  "cpuCoreCount": 8,
  "memoryGB": 8,
  "diskGB": 128,
  "os": {
    "name": "Windows",
    "version": "10"
  }
}
{
  "name": "OfficePC",
  "cpuCoreCount": 8,
  "memoryGB": 16,
  "diskGB": 512,
  "os": {
    "name": "Windows",
    "version": "10"
  }
}