タイトルの件、作業メモ。
AWSが出してきたOpen Distro for ElasticsearchにPySpark使ってデータを投入する部分を下調べしたので残しておきます。
利用するデータ
AWSの方で公開されている、こちらのParquetファイルを利用させていただこうと思います。
$ aws s3 ls s3://athena-examples-ap-northeast-1/flight/parquet/year=2016/ 2017-05-08 13:12:32 32198164 000012_0
Open Distro for Elasticsearchの起動
docker-compose使ってサクッと起動します。
Volumeは作成していないので、docker-compose down
したらデータは消えてしまいます。
※ Volumeを保存したい場合は補足を確認してください
$ cd your/work/dir $ cat <<EOF > docker-compose.yaml && docker-compose up -d version: '3.1' services: odfe-node: image: amazon/opendistro-for-elasticsearch:1.0.0 container_name: odfe-node environment: - discovery.type=single-node - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 ports: - 9200:9200 - 9600:9600 # required for Performance Analyzer networks: - odfe-net kibana: image: amazon/opendistro-for-elasticsearch-kibana:1.0.0 container_name: odfe-kibana ports: - 5601:5601 expose: - "5601" environment: ELASTICSEARCH_URL: https://odfe-node:9200 ELASTICSEARCH_HOSTS: https://odfe-node:9200 networks: - odfe-net networks: odfe-net:
起動しました。
$ docker-compose ps Name Command State Ports --------------------------------------------------------------------------------------------------------------- odfe-kibana /usr/local/bin/kibana-docker Up 0.0.0.0:5601->5601/tcp odfe-node /usr/local/bin/docker-entr ... Up 0.0.0.0:9200->9200/tcp, 9300/tcp, 0.0.0.0:9600->9600/tcp
ElasticSearchにもアクセス可能です。
$ curl https://admin:admin@localhost:9200/ --insecure { "name" : "8EHjdor", "cluster_name" : "docker-cluster", "cluster_uuid" : "QHScaqmQR-eVf_v1uj-atw", "version" : { "number" : "6.5.4", "build_flavor" : "oss", "build_type" : "tar", "build_hash" : "d2ef93d", "build_date" : "2018-12-17T21:17:40.758843Z", "build_snapshot" : false, "lucene_version" : "7.5.0", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "tagline" : "You Know, for Search" }
Kibanaもアクセス確認しておきます。初期ユーザ admin
パスワード admin
でログオン可能です。
http://localhost:5601/app/kibana
PySpark準備
PySparkのインストールは他にも記事沢山あるので飛ばします。
Windowsなら私もこちらに書いています。
EC2のWindows上にpyspark+JupyterでS3上のデータ扱うための開発環境を作る - YOMON8.NET
まず、今回はS3のデータ使うので、hadoop-aws
使います。
$ cd $SPARK_HOME/jars $ wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar $ wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
次に elasticsearch-hadoop
の中にある、elasticsearch-spark
を $SPARK_HOME/jars/
にコピーします。
$ wget https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-6.7.0.zip $ unzip elasticsearch-hadoop-6.7.0.zip elasticsearch-hadoop-6.7.0/dist/elasticsearch-spark-20_2.11-6.7.0.jar Archive: elasticsearch-hadoop-6.7.0.zip inflating: elasticsearch-hadoop-6.7.0/dist/elasticsearch-spark-20_2.11-6.7.0.jar mv elasticsearch-hadoop-6.7.0/dist/elasticsearch-spark-20_2.11-6.7.0.jar $SPARK_HOME/jars/
ElasticSearchに投入
from pyspark.sql import SparkSession from pyspark.conf import SparkConf # 公開されているAthena用のサンプルデータ借りています flights_data_path = "s3a://athena-examples-ap-northeast-1/flight/parquet/year=2016/" #全件投入だとPCのスペックによっては重いのでテスト用にデータ件数絞ってます limit_count = 100000 spark = SparkSession.builder.appName("flight").getOrCreate() # どこかのIAM認証が必要。AnonymousAWSCredentialsProvider使えるバージョンなら大丈夫かも。 spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", 'ACCESS_KEY') spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 'SEC_ACCESS_KEY') # S3よりデータを読み込み。limit_countの件数 df = spark.read.parquet(flights_data_path).limit(limit_count) ( df.write .format("org.elasticsearch.spark.sql") .option("es.nodes.wan.only", "true") #これ設定しないとDockerの内部ネットワークにつなぎにいってしまう .option("es.nodes", "localhost") .option("es.port","9200") .option("es.net.http.auth.user","admin") # Open Distro for Elasticsearchの初期管理ユーザ .option("es.net.http.auth.pass","admin") .option("es.net.ssl.cert.allow.self.signed","true") # オレオレ証明書を許可 .option("es.net.ssl","true") .mode("Overwrite") .save("flight/records") # index/type )
設定どおり100,000件登録されました。
$ curl "https://admin:admin@localhost:9200/_cat/indices?v&s=index" --insecure health status index uuid pri rep docs.count docs.deleted store.size pri.store.size green open .kibana_1 dkUEZGO2SJqaQkiLE0dfNw 1 0 0 0 261b 261b yellow open .kibana_92668751_admin uahrDT9vRN-J0auI99ywog 1 1 1 0 3.6kb 3.6kb green open .opendistro_security -FYfctMtTT-ena_OtA8hAw 1 0 5 0 31.9kb 31.9kb yellow open flight KwqZmxXtSsOM8GhpfvHBhQ 5 1 100000 0 59.9mb 59.9mb yellow open security-auditlog-2019.03.27 pWiZnOYDT0O16j_UMNnzzA 5 1 1 0 13.4kb 13.4kb
Kibanaで見てもちゃんと入っているように見えますが、型変換上手くいってない場合もよくあるのでしっかり確認します。
参考URL
Open Distro for Elasticsearchのホーム
Open Distro for Elasticsearch | Open Distro
Hadoop AWSの設定
hadoop/index.md at trunk · apache/hadoop · GitHub
補足
現行0.7.0の場合、DockerでVolumeにすると、2回目立ち上げ時 plugin:opendistro_security@6.5.4
Tenant indices migration failed
というエラーが発生してしまします。
ここにあるように一部機能を無効にすることで、このエラーを回避するワークアラウンドがあります。 Kibana status is Yellow - "plugin:opendistro_security@6.5.4 Tenant indices migration failed" · Issue #1 · opendistro-for-elasticsearch/security-kibana-plugin · GitHub
opendistro_security.multitenancy.enabled: false
docker-compose.yamlに直すと以下のとおりです。
version: '3' services: odfe-node1: image: amazon/opendistro-for-elasticsearch:0.7.0 container_name: odfe-node environment: - discovery.type=single-node - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM ulimits: memlock: soft: -1 hard: -1 volumes: - odfe-data1:/usr/share/elasticsearch/data ports: - 9200:9200 - 9600:9600 # required for Performance Analyzer networks: - odfe-net kibana: image: amazon/opendistro-for-elasticsearch-kibana:0.7.0 container_name: odfe-kibana ports: - 5601:5601 expose: - "5601" environment: ELASTICSEARCH_URL: https://odfe-node1:9200 OPENDISTRO_SECURITY_MULTITENANCY_ENABLED: "false" networks: - odfe-net volumes: odfe-data1: networks: odfe-net:
elasticsearch-hadoopのドキュメント
Elasticsearch for Apache Hadoop [master] | Elastic
特にConfigurationの項目は確認必要です。
Configuration | Elasticsearch for Apache Hadoop [master] | Elastic