PandasのDataFrameを使ってElasticsearchにデータを投入

PandasのDataFrameをそのままElasticsearchに入れた方法書きます。

元データ準備

CSVのデータないかなとググったら一番最初に出てきたこちらのデータ使います。

男女別人口-全国,都道府県(大正9年~平成27年)

www.e-stat.go.jp

f:id:yomon8:20190412173201p:plain

ダウンロードしたら、とりあえずShift_JISからUTF-8に変換しました。

中身のデータはこんな感じです。

f:id:yomon8:20190412173425p:plain

データの前処理

簡単に前処理かけます。

# CSV読み込み
df = pd.read_csv("c01.csv")

# 不要な列を削除
df = df.drop('注',axis=1)
df = df.drop('和暦(年)',axis=1)


# 日本語を英字に直す
df = df.rename(columns={
    '都道府県コード':'ken_code',
    '都道府県名':'ken_name',
    '元号':'gengou',
    '西暦(年)':'year',
    '人口(総数)':'population_total',
    '人口(男)':'population_male',
    '人口(女)':'population_female',
})

# nanを削除
df = df.dropna()

# 数値に `-` が入っている行があるので削除
import numpy as np
df = df[(df.population_total != '-') & (df.population_male != '-') & (df.population_female != '-')]
df['population_total'] = df['population_total'].astype(np.int64)
df['population_male'] = df['population_male'].astype(np.int64)
df['population_female'] = df['population_female'].astype(np.int64)

# サマリ値を除去
df = df[df.ken_name != '全国']
df = df[df.ken_name != '人口集中地区以外の地区']
df = df[df.ken_name != '人口集中地区']

この時点でデータはこんな感じです。

f:id:yomon8:20190412174533p:plain

Elasticsearchへの接続確認

今回はこの記事で構築したOpen Distro for ElasticsearchのDocker環境にデータ投入します。

PySparkでOpen Distro for Elasticsearchにデータを投入 - YOMON8.NET

Docker環境でオレオレSSL証明書なので、SSLのチェックを verify_cert=False で無視しています。

認証はデフォルトのadminユーザ使っています。

from elasticsearch import Elasticsearch
es = Elasticsearch("https://localhost:9200",verify_certs=False,http_auth=('admin','admin'))
es.info()

正常に接続できると以下のように返ってきます。SSL証明書のチェック無視しているので警告も表示されますが、このまま進められます。

....\venv\lib\site-packages\urllib3\connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
{'name': 'CEgJqG1',
 'cluster_name': 'docker-cluster',
 'cluster_uuid': '4Bjcq8DOSjCpelDY3wWbtg',
 'version': {'number': '6.5.4',
  'build_flavor': 'oss',
  'build_type': 'tar',
  'build_hash': 'd2ef93d',
  'build_date': '2018-12-17T21:17:40.758843Z',
  'build_snapshot': False,
  'lucene_version': '7.5.0',
  'minimum_wire_compatibility_version': '5.6.0',
  'minimum_index_compatibility_version': '5.0.0'},
 'tagline': 'You Know, for Search'}

本当はGithubのリポジトリのReadmeに書いてある通り、以下のようにSSLの接続設定します。

>>> from elasticsearch import Elasticsearch
>>> from ssl import create_default_context

>>> context = create_default_context(cafile="path/to/cafile.pem")
>>> es = Elasticsearch("https://elasticsearch.url:port", ssl_context=context, http_auth=('elastic','yourpassword'))
>>> es.info()

https://github.com/elastic/elasticsearch-py/tree/7.0.0#example-use

Elasticsearchへインデックスのスキーマテンプレートの定義

次はスキーマの定義します。

PUT _template/population-template
{
  "template": "population*",
  "mappings": {
    "_doc": {
      "_source": {
        "enabled": true
      },
      "properties": {
        "ken_code": {
          "type": "keyword",
          "store": true, 
          "index": true
        },
        "ken_name": {
          "type": "keyword",
          "store": true, 
          "index": true
        },
        "gengou": {
          "type": "keyword",
          "store": true,
          "index": true
        },
        "year": {
          "type": "keyword",
          "store": true,
          "index": true
        },
        "population_total": {
          "type": "long",
          "store": true,
          "index": true
        },
        "population_male": {
          "type": "long",
          "store": true,
          "index": true
        },
        "population_female": {
          "type": "long",
          "store": true,
          "index": true
        },
        "created_at": {
          "type": "date",
          "format": "EEE MMM dd HH:mm:ss Z YYYY"
        }
      }
    }
  }
}

Elasticsearchにデータ投入

以下のようなコードでDataFrameからElasticsearchに投入できます。

from elasticsearch import helpers

# Generatorを定義
def es_doc_generator(index,df):
    records =  [d[1] for d in df.iterrows()]
    docs_es = [{key: doc[key] for key in doc.keys()} for doc in records]
    for doc in docs_es:
        hashid = hash(frozenset(doc.items()))
        yield {
            "_index": index,
            "_id": hashid,
            "_type": "_doc",
            "_source": doc,
        }

# bulkで投入
helpers.bulk(es,es_doc_generator("population-japan",df))

ここではわかりやすくhash関数使ってますが、コメントで以下いただいているように、データにより適切なIDを考えて使うことをオススメします。

特に時系列データの場合,日時をそのままdoc_idにしても良さそうですね.

確認

データ投入されました。

$ curl https://admin:admin@localhost:9200/_cat/indices/population-japan --insecure
yellow open population-japan Jp1M9FSCQB2D5p9B3kQ2sw 5 1 939 0 176.8kb 176.8kb

Kibanaでも表示できます。

f:id:yomon8:20190412191359p:plain

参考URL

Exporting Pandas Data to Elasticsearch - Towards Data Science