PySparkでOpen Distro for Elasticsearchにデータを投入

タイトルの件、作業メモ。

AWSが出してきたOpen Distro for ElasticsearchにPySpark使ってデータを投入する部分を下調べしたので残しておきます。

利用するデータ

AWSの方で公開されている、こちらのParquetファイルを利用させていただこうと思います。

$ aws s3 ls s3://athena-examples-ap-northeast-1/flight/parquet/year=2016/
2017-05-08 13:12:32   32198164 000012_0

f:id:yomon8:20190327151047p:plain

Open Distro for Elasticsearchの起動

docker-compose使ってサクッと起動します。 Volumeは作成していないので、docker-compose down したらデータは消えてしまいます。

※ Volumeを保存したい場合は補足を確認してください

$ cd your/work/dir
$ cat <<EOF > docker-compose.yaml && docker-compose up -d
version: '3.1'
services:
  odfe-node:
    image: amazon/opendistro-for-elasticsearch:1.0.0
    container_name: odfe-node
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    ports:
      - 9200:9200
      - 9600:9600 # required for Performance Analyzer
    networks:
      - odfe-net
  kibana:
    image: amazon/opendistro-for-elasticsearch-kibana:1.0.0
    container_name: odfe-kibana
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      ELASTICSEARCH_URL: https://odfe-node:9200
      ELASTICSEARCH_HOSTS: https://odfe-node:9200
    networks:
      - odfe-net
 
networks:
  odfe-net:

起動しました。

$ docker-compose ps
   Name                  Command               State                            Ports
---------------------------------------------------------------------------------------------------------------
odfe-kibana   /usr/local/bin/kibana-docker     Up      0.0.0.0:5601->5601/tcp
odfe-node     /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 9300/tcp, 0.0.0.0:9600->9600/tcp

ElasticSearchにもアクセス可能です。

$ curl https://admin:admin@localhost:9200/ --insecure
{
  "name" : "8EHjdor",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "QHScaqmQR-eVf_v1uj-atw",
  "version" : {
    "number" : "6.5.4",
    "build_flavor" : "oss",
    "build_type" : "tar",
    "build_hash" : "d2ef93d",
    "build_date" : "2018-12-17T21:17:40.758843Z",
    "build_snapshot" : false,
    "lucene_version" : "7.5.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

Kibanaもアクセス確認しておきます。初期ユーザ admin パスワード admin でログオン可能です。

http://localhost:5601/app/kibana

f:id:yomon8:20190327152817p:plain

PySpark準備

PySparkのインストールは他にも記事沢山あるので飛ばします。

Windowsなら私もこちらに書いています。

EC2のWindows上にpyspark+JupyterでS3上のデータ扱うための開発環境を作る - YOMON8.NET

まず、今回はS3のデータ使うので、hadoop-aws 使います。

$ cd $SPARK_HOME/jars
$ wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar
$ wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar

次に elasticsearch-hadoop の中にある、elasticsearch-spark$SPARK_HOME/jars/ にコピーします。

$ wget https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-6.7.0.zip
$ unzip elasticsearch-hadoop-6.7.0.zip  elasticsearch-hadoop-6.7.0/dist/elasticsearch-spark-20_2.11-6.7.0.jar
Archive:  elasticsearch-hadoop-6.7.0.zip
  inflating: elasticsearch-hadoop-6.7.0/dist/elasticsearch-spark-20_2.11-6.7.0.jar
mv elasticsearch-hadoop-6.7.0/dist/elasticsearch-spark-20_2.11-6.7.0.jar $SPARK_HOME/jars/

ElasticSearchに投入

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# 公開されているAthena用のサンプルデータ借りています
flights_data_path = "s3a://athena-examples-ap-northeast-1/flight/parquet/year=2016/"

#全件投入だとPCのスペックによっては重いのでテスト用にデータ件数絞ってます
limit_count = 100000

spark = SparkSession.builder.appName("flight").getOrCreate()

# どこかのIAM認証が必要。AnonymousAWSCredentialsProvider使えるバージョンなら大丈夫かも。
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", 'ACCESS_KEY')
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 'SEC_ACCESS_KEY')

# S3よりデータを読み込み。limit_countの件数
df = spark.read.parquet(flights_data_path).limit(limit_count)

(
  df.write
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes.wan.only", "true") #これ設定しないとDockerの内部ネットワークにつなぎにいってしまう
    .option("es.nodes", "localhost")
    .option("es.port","9200")
    .option("es.net.http.auth.user","admin") # Open Distro for Elasticsearchの初期管理ユーザ
    .option("es.net.http.auth.pass","admin")
    .option("es.net.ssl.cert.allow.self.signed","true") # オレオレ証明書を許可
    .option("es.net.ssl","true")
    .mode("Overwrite")
    .save("flight/records") # index/type
)

設定どおり100,000件登録されました。

$ curl "https://admin:admin@localhost:9200/_cat/indices?v&s=index" --insecure
health status index                        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana_1                    dkUEZGO2SJqaQkiLE0dfNw   1   0          0            0       261b           261b
yellow open   .kibana_92668751_admin       uahrDT9vRN-J0auI99ywog   1   1          1            0      3.6kb          3.6kb
green  open   .opendistro_security         -FYfctMtTT-ena_OtA8hAw   1   0          5            0     31.9kb         31.9kb
yellow open   flight                       KwqZmxXtSsOM8GhpfvHBhQ   5   1     100000            0     59.9mb         59.9mb
yellow open   security-auditlog-2019.03.27 pWiZnOYDT0O16j_UMNnzzA   5   1          1            0     13.4kb         13.4kb

Kibanaで見てもちゃんと入っているように見えますが、型変換上手くいってない場合もよくあるのでしっかり確認します。

f:id:yomon8:20190327182821p:plain

参考URL

Open Distro for Elasticsearchのホーム

Open Distro for Elasticsearch | Open Distro

Hadoop AWSの設定

hadoop/index.md at trunk · apache/hadoop · GitHub

補足

現行0.7.0の場合、DockerでVolumeにすると、2回目立ち上げ時 plugin:opendistro_security@6.5.4 Tenant indices migration failed というエラーが発生してしまします。

ここにあるように一部機能を無効にすることで、このエラーを回避するワークアラウンドがあります。 Kibana status is Yellow - "plugin:opendistro_security@6.5.4 Tenant indices migration failed" · Issue #1 · opendistro-for-elasticsearch/security-kibana-plugin · GitHub

opendistro_security.multitenancy.enabled: false

docker-compose.yamlに直すと以下のとおりです。

version: '3'
services:
  odfe-node1:
    image: amazon/opendistro-for-elasticsearch:0.7.0
    container_name: odfe-node
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - odfe-data1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
      - 9600:9600 # required for Performance Analyzer
    networks:
      - odfe-net
  kibana:
    image: amazon/opendistro-for-elasticsearch-kibana:0.7.0
    container_name: odfe-kibana
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      ELASTICSEARCH_URL: https://odfe-node1:9200
      OPENDISTRO_SECURITY_MULTITENANCY_ENABLED: "false"
    networks:
      - odfe-net
 
volumes:
  odfe-data1:
 
networks:
  odfe-net:

elasticsearch-hadoopのドキュメント

Elasticsearch for Apache Hadoop [master] | Elastic

特にConfigurationの項目は確認必要です。

Configuration | Elasticsearch for Apache Hadoop [master] | Elastic

GitHub - elastic/elasticsearch-hadoop: Elasticsearch real-time search and analytics natively integrated with Hadoop