タイトルの通り、*.soなどの形式で提供されるNative Libraryを必要とするjarファイルをDataflowで使おうとして、ハマったので記録しておきます。
問題となっていたエラー
まず出たエラーはこちら。このエラーの時点では「あー、ローカルでは動いたのに、Workerでは見つからないか・・」という感じでした。
java.lang.UnsatisfiedLinkError: no mylibrary in java.library.path: [/usr/java/packages/lib, /usr/lib64, /lib64, /lib, /usr/lib]
それではと、次にDataflowのStaging場所である、 gs://my-bucket/staging
にライブラリを配置してみました。
DataflowのWorkerからは /var/opt/google/dataflow/mylibrary .so
として配置されるように見えるはずです。ここで以下のエラーが発生しました。
failed to map segment from shared object
調査
色々方法をためしたのですが、動かない。ソース読んで、Googleと格闘して調査し、やっとのことで以下の内容を見つけました。
Then inside the DoFn.setup() method download the archive and unpack into /usr/lib. It's little messier then I'd like but it works.
確かにSetupでダウンロードして /usr/lib
に置けばさっきのエラーは回避できそう。
SetupなどのDoFnのライフサイクルは以下の図の「DoFn Lifecycle」で確認できます。
引用元 : https://beam.apache.org/documentation/programming-guide/
しかも、Apache BeamのリポジトリにExampleもあると書いてあったので、以下を読み始めました。
サンプルコード
上記のExampleから、GCSから /usr/lib
にダウンロードする部分だけ抜き出して以下のように実装したら動くようになりました。
import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.transforms.DoFn; /** 省略 */ @SuppressWarnings("serial") public static class SomethingDoFn extends DoFn<String, String> { public ResourceId getFileResourceId(String directory, String fileName) { ResourceId resourceID = FileSystems.matchNewResource(directory, true); return resourceID.getCurrentDirectory().resolve(fileName, StandardResolveOptions.RESOLVE_FILE); } public void copyFile(ResourceId sourceFile, ResourceId destinationFile) throws IOException { try (WritableByteChannel writeChannel = FileSystems.create(destinationFile, "text/plain")) { try (ReadableByteChannel readChannel = FileSystems.open(sourceFile)) { final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024); while (readChannel.read(buffer) != -1) { buffer.flip(); writeChannel.write(buffer); buffer.compact(); } buffer.flip(); while (buffer.hasRemaining()) { writeChannel.write(buffer); } } } } @Setup public void setUp() throws Exception { // 静的に設定していますが、通常はパラメータ等で指定 String sourcePath = "gs://mybucket/staging"; String workerPath = "/usr/lib"; String binaryFileName = "mylibrary.so"; // WorkerにGCSからsoファイルをダウンロードします Path path = Paths.get(workerPath); if (!path.toFile().exists()) { Files.createDirectories(path); } copyFile(getFileResourceId(sourcePath, binaryFileName), getFileResourceId(workerPath, binaryFileName)); } @ProcessElement public void processElement(@Element String in, OutputReceiver<String> out) throws Exception { // このsomethingConvert()の中でsoファイルを読み込む out.output(somethingConvert(in)); } /** 省略 */