PandasのDataFrameをそのままElasticsearchに入れた方法書きます。
元データ準備
CSVのデータないかなとググったら一番最初に出てきたこちらのデータ使います。
男女別人口-全国,都道府県(大正9年~平成27年)
ダウンロードしたら、とりあえずShift_JISからUTF-8に変換しました。
中身のデータはこんな感じです。
データの前処理
簡単に前処理かけます。
# CSV読み込み df = pd.read_csv("c01.csv") # 不要な列を削除 df = df.drop('注',axis=1) df = df.drop('和暦(年)',axis=1) # 日本語を英字に直す df = df.rename(columns={ '都道府県コード':'ken_code', '都道府県名':'ken_name', '元号':'gengou', '西暦(年)':'year', '人口(総数)':'population_total', '人口(男)':'population_male', '人口(女)':'population_female', }) # nanを削除 df = df.dropna() # 数値に `-` が入っている行があるので削除 import numpy as np df = df[(df.population_total != '-') & (df.population_male != '-') & (df.population_female != '-')] df['population_total'] = df['population_total'].astype(np.int64) df['population_male'] = df['population_male'].astype(np.int64) df['population_female'] = df['population_female'].astype(np.int64) # サマリ値を除去 df = df[df.ken_name != '全国'] df = df[df.ken_name != '人口集中地区以外の地区'] df = df[df.ken_name != '人口集中地区']
この時点でデータはこんな感じです。
Elasticsearchへの接続確認
今回はこの記事で構築したOpen Distro for ElasticsearchのDocker環境にデータ投入します。
PySparkでOpen Distro for Elasticsearchにデータを投入 - YOMON8.NET
Docker環境でオレオレSSL証明書なので、SSLのチェックを verify_cert=False
で無視しています。
認証はデフォルトのadminユーザ使っています。
from elasticsearch import Elasticsearch es = Elasticsearch("https://localhost:9200",verify_certs=False,http_auth=('admin','admin')) es.info()
正常に接続できると以下のように返ってきます。SSL証明書のチェック無視しているので警告も表示されますが、このまま進められます。
....\venv\lib\site-packages\urllib3\connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings InsecureRequestWarning) {'name': 'CEgJqG1', 'cluster_name': 'docker-cluster', 'cluster_uuid': '4Bjcq8DOSjCpelDY3wWbtg', 'version': {'number': '6.5.4', 'build_flavor': 'oss', 'build_type': 'tar', 'build_hash': 'd2ef93d', 'build_date': '2018-12-17T21:17:40.758843Z', 'build_snapshot': False, 'lucene_version': '7.5.0', 'minimum_wire_compatibility_version': '5.6.0', 'minimum_index_compatibility_version': '5.0.0'}, 'tagline': 'You Know, for Search'}
本当はGithubのリポジトリのReadmeに書いてある通り、以下のようにSSLの接続設定します。
>>> from elasticsearch import Elasticsearch >>> from ssl import create_default_context >>> context = create_default_context(cafile="path/to/cafile.pem") >>> es = Elasticsearch("https://elasticsearch.url:port", ssl_context=context, http_auth=('elastic','yourpassword')) >>> es.info()
https://github.com/elastic/elasticsearch-py/tree/7.0.0#example-use
Elasticsearchへインデックスのスキーマテンプレートの定義
次はスキーマの定義します。
PUT _template/population-template { "template": "population*", "mappings": { "_doc": { "_source": { "enabled": true }, "properties": { "ken_code": { "type": "keyword", "store": true, "index": true }, "ken_name": { "type": "keyword", "store": true, "index": true }, "gengou": { "type": "keyword", "store": true, "index": true }, "year": { "type": "keyword", "store": true, "index": true }, "population_total": { "type": "long", "store": true, "index": true }, "population_male": { "type": "long", "store": true, "index": true }, "population_female": { "type": "long", "store": true, "index": true }, "created_at": { "type": "date", "format": "EEE MMM dd HH:mm:ss Z YYYY" } } } } }
Elasticsearchにデータ投入
以下のようなコードでDataFrameからElasticsearchに投入できます。
from elasticsearch import helpers # Generatorを定義 def es_doc_generator(index,df): records = [d[1] for d in df.iterrows()] docs_es = [{key: doc[key] for key in doc.keys()} for doc in records] for doc in docs_es: hashid = hash(frozenset(doc.items())) yield { "_index": index, "_id": hashid, "_type": "_doc", "_source": doc, } # bulkで投入 helpers.bulk(es,es_doc_generator("population-japan",df))
ここではわかりやすくhash関数使ってますが、コメントで以下いただいているように、データにより適切なIDを考えて使うことをオススメします。
特に時系列データの場合,日時をそのままdoc_idにしても良さそうですね.
確認
データ投入されました。
$ curl https://admin:admin@localhost:9200/_cat/indices/population-japan --insecure yellow open population-japan Jp1M9FSCQB2D5p9B3kQ2sw 5 1 939 0 176.8kb 176.8kb
Kibanaでも表示できます。
参考URL
Exporting Pandas Data to Elasticsearch - Towards Data Science