Google Cloud DataflowでNative Library(*.so)が必要なjarを利用する方法

タイトルの通り、*.soなどの形式で提供されるNative Libraryを必要とするjarファイルをDataflowで使おうとして、ハマったので記録しておきます。

問題となっていたエラー

まず出たエラーはこちら。このエラーの時点では「あー、ローカルでは動いたのに、Workerでは見つからないか・・」という感じでした。

java.lang.UnsatisfiedLinkError: no mylibrary in java.library.path: [/usr/java/packages/lib, /usr/lib64, /lib64, /lib, /usr/lib]

それではと、次にDataflowのStaging場所である、 gs://my-bucket/staging にライブラリを配置してみました。

DataflowのWorkerからは /var/opt/google/dataflow/mylibrary .so として配置されるように見えるはずです。ここで以下のエラーが発生しました。

failed to map segment from shared object

調査

色々方法をためしたのですが、動かない。ソース読んで、Googleと格闘して調査し、やっとのことで以下の内容を見つけました。

stackoverflow.com

Then inside the DoFn.setup() method download the archive and unpack into /usr/lib. It's little messier then I'd like but it works.

確かにSetupでダウンロードして /usr/lib に置けばさっきのエラーは回避できそう。

SetupなどのDoFnのライフサイクルは以下の図の「DoFn Lifecycle」で確認できます。

f:id:yomon8:20200715183505p:plain

引用元 : https://beam.apache.org/documentation/programming-guide/

しかも、Apache BeamのリポジトリにExampleもあると書いてあったので、以下を読み始めました。

beam/examples/java/src/main/java/org/apache/beam/examples/subprocess at v2.22.0 · apache/beam · GitHub

サンプルコード

上記のExampleから、GCSから /usr/lib にダウンロードする部分だけ抜き出して以下のように実装したら動くようになりました。

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.transforms.DoFn;

/** 省略 */
    @SuppressWarnings("serial")
    public static class SomethingDoFn extends DoFn<String, String> {
        public ResourceId getFileResourceId(String directory, String fileName) {
            ResourceId resourceID = FileSystems.matchNewResource(directory, true);
            return resourceID.getCurrentDirectory().resolve(fileName, StandardResolveOptions.RESOLVE_FILE);
        }

        public void copyFile(ResourceId sourceFile, ResourceId destinationFile) throws IOException {

            try (WritableByteChannel writeChannel = FileSystems.create(destinationFile, "text/plain")) {
                try (ReadableByteChannel readChannel = FileSystems.open(sourceFile)) {

                    final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
                    while (readChannel.read(buffer) != -1) {
                        buffer.flip();
                        writeChannel.write(buffer);
                        buffer.compact();
                    }
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        writeChannel.write(buffer);
                    }
                }
            }
        }

        @Setup
        public void setUp() throws Exception {
            // 静的に設定していますが、通常はパラメータ等で指定
            String sourcePath = "gs://mybucket/staging";
            String workerPath = "/usr/lib";
            String binaryFileName = "mylibrary.so";

            // WorkerにGCSからsoファイルをダウンロードします
            Path path = Paths.get(workerPath);
            if (!path.toFile().exists()) {
                Files.createDirectories(path);
            }
            copyFile(getFileResourceId(sourcePath, binaryFileName), getFileResourceId(workerPath, binaryFileName));
        }

        @ProcessElement
        public void processElement(@Element String in, OutputReceiver<String> out) throws Exception {
            // このsomethingConvert()の中でsoファイルを読み込む
            out.output(somethingConvert(in));
        }
/** 省略 */