Elasticsearch でバルクインサート

Elasticsearch の Python クライアントを使って、 CloudWatch からダウンロードしたメトリクスの データポイントを 1 件ずつ登録していたけど、さすがに遅い。 遅すぎる。

我慢して使えるレベルではなかったので、 Elasticsearch のバルクインサートを使って高速化を試みた。

# -*- coding: utf-8 -*-
import os
import sys
import json
from elasticsearch import Elasticsearch, helpers

ELASTICSEARCH_URL = "localhost:9200"
METRICS_ROOT_DIR = "/var/log/perform/my-app-name"
INSTANCES = [
    "rds",
]
METRICS = [
    "CPUUtilization",
    "DatabaseConnections",
    "DiskQueueDepth",
    "FreeableMemory",
    "FreeStorageSpace",
    "ReadIOPS",
    "WriteIOPS",
    "ReadLatency",
    "WriteLatency",
    "NetworkReceiveThroughput",
    "NetworkTransmitThroughput",
]


es = Elasticsearch(ELASTICSEARCH_URL)


def post_instance_metrics(dir_path, instance):
    for metric in METRICS:
        file_path = os.path.join(dir_path, metric + ".json")
        with open(file_path) as f:
            data = json.load(f)

            # バルクインサートするデータを作成
            actions = []
            for datapoint in data["Datapoints"]:
                actions.append({
                  "_index": instance,
                  "_type": metric,
                  "_id": datapoint["Timestamp"],
                  "_source": datapoint
                })

            # バルクインサート!!!!!
            helpers.bulk(es, actions)


for instance in INSTANCES:
    metrics_dir = os.path.join(METRICS_ROOT_DIR, instance)
    post_instance_metrics(metrics_dir, instance)

バルクインサートに変えることで、 30 分くらいかかっていたのが 1 分くらいに短縮した。 30 倍速い。 赤い彗星もビックリだ。