Pythonスクリプト内からCuratorを使ってElasticsearchのIndexを操作する

Curatorは通常、YAML形式の定義ファイルを使ってコマンドラインから操作することが多いと思いますが、PythonからAPIを経由で操作することも可能です。

基本的な操作のざっくりコードですが例を書きます。

準備

まずは準備をしておきます。

Elasticsearchクライアントの生成

elasticsearch-pyでElasticsearchに接続するクライアントを作成しておきます。

from elasticsearch import Elasticsearch
client = Elasticsearch("https://localhost:9200",verify_certs=False,http_auth=('admin','admin'))

CuratorのImportと変数定義

curatorをインポートして、この記事のコードで使う変数定義しておきます。

import curator
my_index_base = 'yomon8'
my_alias = my_index_base

インデックス作成

まずはIndexの作成方法です。こんなIndexを作成してみます。

Index名: yomon8-YYYYmmddHHMMSS

例: yomon8-20190416214427

スキーマは以下のように設定したいです。

{
  "mappings": {
    "_doc": {
      "_source": {
        "enabled": true
      },
      "properties": {
        "field1": {
          "type": "keyword",
          "store": true, 
          "index": true
        },
        "field2": {
          "type": "integer",
          "store": true, 
          "index": true
        },
        "field3": {
          "type": "float",
          "store": true,
          "index": true
        }
      }
    }
  }
}

Curatorを使うと以下のようになります。 深いdictを書くのが少し面倒です。

from datetime import datetime
index_name = '{}-{}'.format(
    my_index_base,
    datetime.now().strftime('%Y%m%d%H%M%S'))

settings = {}
settings['number_of_shards'] = 2
settings['number_of_replicas'] = 3
properties = {}
properties['field1'] = {'type':'keyword','store':True}
properties['field2'] = {'type':'integer','store':True}
properties['field3'] = {'type':'float','store':True}
mappings = {}
mappings['_doc'] = {'properties':properties}

extra_settings = {
    'settings': settings,
    'mappings': mappings,
}

ci = curator.CreateIndex(client,index_name,extra_settings=extra_settings)
ci.do_action()

Indexの参照の取得

Indexの参照の取得は、curator.IndexList(client) で行います。 取得した変数はfilter関数でどんどんフィルターします。

フィルタリングされたインデックスは変数から排除されていくので、内容が破壊的に内容が変わっていきます。

all_my_indecis = curator.IndexList(client)
# yomon8で始まるインデックスをフィルター
all_my_indecis.filter_by_regex(kind='prefix',value=my_index_base)

途中でフィルタの状況を見たい場合は以下のように indices プロパティを見るか、 working_list() の関数を利用します。

all_my_indecis.indices
all_my_indecis.working_list()

色々なIndexのフィルタリング

次のAliasのローテションをしたいのですが、その前の準備をします。

Aliasが設定されているIndexを取得するフィルタリング。

alias = curator.IndexList(client)
alias.filter_by_regex(kind='prefix',value=my_index_base)
if alias.indices:
    alias.filter_by_alias(aliases=my_alias)

Aliasが入ってない最新のインデックスを一つ取得するフィルタリング。

non_alias = curator.IndexList(client)
non_alias.filter_by_regex(kind='prefix',value=my_index_base)
if non_alias.indices:
    # aliasが設定されていないインデックスを取得
    non_alias.filter_by_alias(aliases=my_alias,exclude=True) 
if non_alias.indices and len(non_alias.indices) > 1:
    # 名前が英数でソートされるので、timestringを埋め込んだインデックス名の場合は最新のもの以外をリストから削除
    non_alias.filter_by_count(count=len(non_alias.indices)-1,reverse=False)  

AliasとIndexの紐付けローテーション

上記で取得したAliasとインデックスの情報から、ローテーションをかけてみます。

これで最新のインデックス一つにAliasがついている状態になります。

def rotate_alias(client,alias,old_index,new_index):
    ac = curator.Alias(client=client,name=alias)
    ac.add(new_index)
    ac.remove(old_index)
    ac.do_action()

def set_alias(client,alias,new_index):
    ac = curator.Alias(client=client,name=alias)
    ac.add(new_index)
    ac.do_action()

if alias.indices:
    rotate_alias(client,my_alias,alias,non_alias)
else:
    set_alias(client,my_alias,non_alias)

インデックスの削除

最後に削除の方法を書いておきます。

def delete_indices(index_list):
    if index_list.indices and len(index_list.indices) > 0:
        delete_indeces = curator.DeleteIndices(ilo=index_list)
        delete_indeces.do_action()
    else:
        print('target not found')


delete_indices(all_my_indecis)

参考URL

レポジトリ

github.com

マニュアル

https://curator.readthedocs.io/en/latest/index.html