GCPのCloud Dataflowでも使われている、Apache BeamでJavaの内部で持っているデータをParquetに出力するやり方です。
サンプルコードの構成
以下のリポジトリに今回書いているコードを置いておきました。
こちらで補足書いておきます。
元にしたMaven ArcheType
こちらのMaven ArcheType元に作成しています。バージョンは現在の最新の2.22.0を利用しています。
利用するPOJO
以下の構造のデータをParquetに変換してみます。
static class PC { String name; int cpuCoreCount; double memoryGB; double diskGB; OS os; PC(String name, int cpuCoreCount, double memoryGB, double diskGB, OS os) { this.name = name; this.cpuCoreCount = cpuCoreCount; this.memoryGB = memoryGB; this.diskGB = diskGB; this.os = os; } } static class OS { String name; String version; OS(String name, String version) { this.name = name; this.version = version; } }
コード内では以下のように初期化しています。
List<PC> pcList = Arrays.asList( new PC("HomePC1", 4, 16.0, 128.0, new OS("macOS", "Catalina")), new PC("HomePC2", 8, 8.0, 128.0, new OS("Windows", "10")), new PC("OfficePC", 8, 16.0, 512.0, new OS("Windows", "10")));
GenericRecordへの変換
こちらのJavaDocにも以下の記載があるように、POJOを GenericRecord
型に変換してからParquet出力しています。
pipeline .apply(...) // PCollection<GenericRecord> .apply(FileIO .<GenericRecord>write() .via(ParquetIO.sink(SCHEMA) .withCompressionCodec(CompressionCodecName.SNAPPY)) .to("destination/path") .withSuffix(".parquet"));
サンプル内では以下のように書いています。
private static List<GenericRecord> generateGenericRecords() { // Create Sample Data List<PC> pcList = Arrays.asList(new PC("HomePC1", 4, 16.0, 128.0, new OS("macOS", "Catalina")), new PC("HomePC2", 8, 8.0, 128.0, new OS("Windows", "10")), new PC("OfficePC", 8, 16.0, 512.0, new OS("Windows", "10"))); // Convert from POJOs to GenericRecords Schema pcSchema = ReflectData.get().getSchema(PC.class); Schema osSchema = ReflectData.get().getSchema(OS.class); ArrayList<GenericRecord> list = new ArrayList<>(); pcList.forEach(p -> { Record osRecord = new Record(osSchema); osRecord.put("name", p.os.name); osRecord.put("version", p.os.version); Record pcRecord = new Record(pcSchema); pcRecord.put("name", p.name); pcRecord.put("cpuCoreCount", p.cpuCoreCount); pcRecord.put("memoryGB", p.memoryGB); pcRecord.put("diskGB", p.diskGB); pcRecord.put("os", osRecord); list.add(pcRecord); }); return list; }
もっと汎用的にやりたい場合の方法などは以下でも議論されいます。
出力先の切り替え
ローカルに出すか、S3かGCSかは渡したスキーマの情報からBeam側で判断してくれます。
サンプルコード上は以下を切り替えてください。
S3への出力の場合だけ、region
の指定が必要になります。
// ローカルへの出力用 String outputPath = "./tmp/parquet"; // GCSへの出力用 // String outputPath = "gs://your-bucket-name/parquet"; // AWS S3への出力用(AWSは最低限Regoinの指定が必要) // String outputPath = "s3://your-bucket-name/parquet"; // options.as(AwsOptions.class).setAwsRegion("ap-northeast-1");
ローカルに出力してみる
まずはローカルに出力してみます。そのまま実行すると以下のようにカレントディレクトリ配下にtmp/parquet
というフォルダが作成され、ファイルが出力されると思います。
parquet-toolから見てみます。(parquet-toolインストールが面倒なのでDockerHubにあったDockerイメージ使っています)
まずはメタデータから。入れ子のスキーマまで反映できています。
$ docker run --rm -v $(pwd)/tmp:/tmp nathanhowell/parquet-tools meta /tmp/parquet/output-00001-of-00003.parquet file: file:/tmp/parquet/output-00001-of-00003.parquet creator: parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) extra: parquet.avro.schema = {"type":"record","name":"PC","namespace":"com.yomon8.StarterPipeline$","fields":[{"name":"name","type":"string"},{"name":"cpuCoreCount","type":"int"},{"name":"memoryGB","type":"double"},{"name":"diskGB","type":"double"},{"name":"os","type":{"type":"record","name":"OS","fields":[{"name":"name","type":"string"},{"name":"version","type":"string"}]}}]} extra: writer.model.name = avro file schema: com.yomon8.StarterPipeline$.PC -------------------------------------------------------------------------------- name: REQUIRED BINARY O:UTF8 R:0 D:0 cpuCoreCount: REQUIRED INT32 R:0 D:0 memoryGB: REQUIRED DOUBLE R:0 D:0 diskGB: REQUIRED DOUBLE R:0 D:0 os: REQUIRED F:2 .name: REQUIRED BINARY O:UTF8 R:0 D:0 .version: REQUIRED BINARY O:UTF8 R:0 D:0 row group 1: RC:1 TS:386 OFFSET:4 -------------------------------------------------------------------------------- name: BINARY SNAPPY DO:0 FPO:4 SZ:70/68/0.97 VC:1 ENC:BIT_PACKED,PLAIN cpuCoreCount: INT32 SNAPPY DO:0 FPO:74 SZ:51/49/0.96 VC:1 ENC:BIT_PACKED,PLAIN memoryGB: DOUBLE SNAPPY DO:0 FPO:125 SZ:71/69/0.97 VC:1 ENC:BIT_PACKED,PLAIN diskGB: DOUBLE SNAPPY DO:0 FPO:196 SZ:71/69/0.97 VC:1 ENC:BIT_PACKED,PLAIN os: .name: BINARY SNAPPY DO:0 FPO:267 SZ:60/58/0.97 VC:1 ENC:BIT_PACKED,PLAIN .version: BINARY SNAPPY DO:0 FPO:327 SZ:75/73/0.97 VC:1 ENC:BIT_PACKED,PLAIN
次にデータの方。3つのサンプルデータの情報が入っています。
$ docker run --rm -v $(pwd)/tmp:/tmp nathanhowell/parquet-tools cat /tmp/parquet/output-00001-of-00003.parquet name = HomePC1 cpuCoreCount = 4 memoryGB = 16.0 diskGB = 128.0 os: .name = macOS .version = Catalina $ docker run --rm -v $(pwd)/tmp:/tmp nathanhowell/parquet-tools cat /tmp/parquet/output-00002-of-00003.parquet name = HomePC2 cpuCoreCount = 8 memoryGB = 8.0 diskGB = 128.0 os: .name = Windows .version = 10 name = OfficePC cpuCoreCount = 8 memoryGB = 16.0 diskGB = 512.0 os: .name = Windows .version = 10
GCSに出力してみる
GCSに出力するには、バケットを作成したプロジェクトに対して権限持つアカウントに、gcloudコマンドでログオンしておく必要があります。 application-default
を入れる必要があります。何が違うか知りたいかたは、こちらが参考になります。
$ gcloud auth application-default login
以下のようにGCP用の設定を有効化して実行してみます。自身のテスト用のバケット名を入れることもお忘れ無く。
// ローカルへの出力用 // String outputPath = "./tmp/parquet"; // GCSへの出力用 String outputPath = "gs://your-bucket-name/parquet";
GCSに出力されました。
AWS S3に出力してみる
最後にAWSに出力してみます。AWSは最低限Regionの指定が必要です。
このままだとDefaultのAWS Credentialが使われますが、 setAwsCredentialsProvider
で設定したりも可能ですし、AWS_ACCESS_KEY等を環境変数に指定しておけば、それを読み取ってくれます。
// ローカルへの出力用 // String outputPath = "./tmp/parquet"; // GCSへの出力用 // String outputPath = "gs://your-bucket-name/parquet"; // AWS S3への出力用(AWSは最低限Regoinの指定が必要) String outputPath = "s3://your-bucket-name/parquet"; options.as(AwsOptions.class).setAwsRegion("ap-northeast-1");
S3にもParquetのアップロードができました。
せっかくなので、こちらはS3 Selectで読み込んでみます。S3 selectのCLIコマンドは結構複雑なのですがinput-serializationのところでParquetを指定して読み込んでいます。
$ BUCKET_NAME=your-bucket-name $ aws s3api select-object-content \ --bucket=${BUCKET_NAME} \ --key=parquet/output-00000-of-00003.parquet \ --expression "select * from s3object s" \ --expression-type SQL \ --input-serialization '{"Parquet":{}}' \ --output-serialization '{"JSON": {"RecordDelimiter": "\n"}}' \ ./tmp.json
出力されたファイルを見てみます。ちゃんと構造化されたデータになっています。
$ cat tmp.json | jq { "name": "HomePC2", "cpuCoreCount": 8, "memoryGB": 8, "diskGB": 128, "os": { "name": "Windows", "version": "10" } } { "name": "OfficePC", "cpuCoreCount": 8, "memoryGB": 16, "diskGB": 512, "os": { "name": "Windows", "version": "10" } }