使用 Logstash 迁移 Elasticsearch 数据记录

· devopsnote's blog


_tagsparsefailure 问题 #

Logstash(8.x)在迁移 Elasticsearch 索引数据时,如果源文档中的保留字段 tags 含有非法值(例如是一个对象,而不是字符串或字符串数组),默认会把该值移到 _tags 字段,并在事件的 tags 中添加 _tagsparsefailure 标签,用来标记这些有问题的文档。

如果想保持与旧版本一致的行为,可以在 logstash.yml 中添加如下配置:

event_api.tags.illegal: warn

设置为 warn 后,允许为保留字段 tags 赋非法值;该配置的默认值为 rename。

迁移索引别名时,同时迁移 is_write_index 等别名配置 #

POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "topic-2024.01.01",
        "alias": "aw.topic",
        "is_write_index": true
      }
    }
  ]
}

可以用下面的 Python 脚本批量迁移(从源集群读取每个索引的别名,再在目标集群上重新创建):

import os
import pprint

import requests
from requests.auth import HTTPBasicAuth

# Connection settings for the source ("old") and target ("new") clusters; adjust to
# your environment. Every value can also be overridden through the environment
# variable named in its comment, so credentials need not be hard-coded here. When a
# variable is unset, the literal default shown is used.
config = {
    # Source cluster "host:port" (OLD_CLUSTER_HOST)
    'old_cluster_host': os.environ.get('OLD_CLUSTER_HOST', 'xxx:9200'),
    # Source cluster username (OLD_CLUSTER_USER)
    'old_cluster_user': os.environ.get('OLD_CLUSTER_USER', 'elastic'),
    # Source cluster password (OLD_CLUSTER_PASSWORD); empty means no basic auth is sent
    'old_cluster_password': os.environ.get('OLD_CLUSTER_PASSWORD', ''),
    # Source cluster protocol, 'http' or 'https' (OLD_CLUSTER_PROTOCOL)
    'old_cluster_protocol': os.environ.get('OLD_CLUSTER_PROTOCOL', 'http'),
    # Target cluster "host:port" (NEW_CLUSTER_HOST)
    'new_cluster_host': os.environ.get('NEW_CLUSTER_HOST', 'xxxxx:9200'),
    # Target cluster username (NEW_CLUSTER_USER)
    'new_cluster_user': os.environ.get('NEW_CLUSTER_USER', 'elastic'),
    # Target cluster password (NEW_CLUSTER_PASSWORD); empty means no basic auth is sent
    'new_cluster_password': os.environ.get('NEW_CLUSTER_PASSWORD', ''),
    # Target cluster protocol, 'http' or 'https' (NEW_CLUSTER_PROTOCOL)
    'new_cluster_protocol': os.environ.get('NEW_CLUSTER_PROTOCOL', 'http'),
}

# Generic HTTP helper shared by the alias functions below.
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http', timeout=None):
    """Send an HTTP request to a cluster node and return the decoded JSON body.

    Args:
        method: HTTP method, e.g. 'GET' or 'POST'.
        host: "host:port" of the node.
        endpoint: Request path starting with '/', e.g. '/_aliases'.
        username, password: Basic-auth credentials; auth is only sent when both
            are non-empty.
        params: Optional query-string parameters.
        json_body: Optional object serialized as the JSON request body.
        protocol: 'http' or 'https'.
        timeout: Optional timeout in seconds passed to requests; None (the
            default) keeps the previous behavior of waiting indefinitely.

    Returns:
        The parsed JSON response, or None when the server answers with an HTTP
        error status (the status code and response body are printed instead).

    Raises:
        ValueError: If a successful response body is not valid JSON; the raw
            body is printed before re-raising.
        requests.RequestException: For connection-level or malformed-URL
            failures, which are deliberately not caught here.
    """
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    # Issue the request outside any try: some requests URL errors subclass
    # ValueError and must not be misreported as "Invalid JSON response".
    response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers, timeout=timeout)
    try:
        response.raise_for_status()
    except requests.HTTPError as e:
        # Best-effort: report the failure and let the caller decide what to do.
        print(f"HTTP Error: {e.response.status_code} for {url}")
        print(e.response.text)
        return None
    try:
        return response.json()
    except ValueError:
        # Show the raw body so the non-JSON response can be diagnosed, then re-raise.
        print("Invalid JSON response:")
        print(response.text)
        raise

# Fetch the alias definitions of one index from the source cluster.
def get_index_alias(index_name):
    """Return the ``aliases`` mapping of ``index_name`` on the source cluster.

    Sends ``GET /<index_name>/_alias`` using the source-cluster settings in
    ``config`` and returns ``response[index_name]["aliases"]``, a mapping of
    alias name to its properties.

    Raises:
        RuntimeError: If the request failed with an HTTP error
            (``send_http_request`` printed the error and returned None).
        KeyError: If the response has no top-level entry for ``index_name``.
    """
    endpoint = f"/{index_name}/_alias"
    index_alias = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    # Fail with a clear message instead of "'NoneType' object is not subscriptable".
    if index_alias is None:
        raise RuntimeError(f"Failed to fetch aliases for index {index_name!r} from the source cluster")
    return index_alias[index_name]["aliases"]

def create_index_alias(index_name, meta_config):
    """Recreate the given aliases for ``index_name`` on the target cluster.

    Args:
        index_name: Index on the target cluster the aliases should point to.
        meta_config: Mapping of alias name -> alias properties, as returned by
            ``get_index_alias`` for the source cluster.

    For each alias, one ``POST /_aliases`` request with a single ``add`` action
    is sent to the target cluster. Besides ``is_write_index``, the other alias
    properties present in the source definition (``is_hidden``, ``filter``,
    ``index_routing``, ``search_routing``) are copied as well, so filtered,
    routed or hidden aliases are migrated faithfully.
    """
    endpoint = "/_aliases"
    # Alias properties to carry over from the source definition into the
    # "add" action; only keys actually present in the source are copied.
    copied_properties = ("is_write_index", "is_hidden", "filter", "index_routing", "search_routing")
    pprint.pprint(meta_config)
    for alias_name, meta in meta_config.items():
        print(f"Creating alias {alias_name} for index {index_name}")
        pprint.pprint(meta)
        add_alias_config = {"index": index_name, "alias": alias_name}
        add_alias_config.update({key: meta[key] for key in copied_properties if key in meta})

        create_alias_body = {
            "actions": [
                {"add": add_alias_config}
            ]
        }
        pprint.pprint(create_alias_body)
        send_http_request('POST', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], json_body=create_alias_body, protocol=config['new_cluster_protocol'])


# Entry point.
def main(index_list_str="a,b,c"):
    """Copy the aliases of each listed index from the source to the target cluster.

    Args:
        index_list_str: Comma-separated index names. Whitespace around names and
            empty entries (e.g. from a trailing comma) are ignored. The default
            "a,b,c" is a placeholder; replace it with the real index names.
    """
    index_list = [name.strip() for name in index_list_str.split(',') if name.strip()]
    for index_name in index_list:
        meta_config = get_index_alias(index_name)
        create_index_alias(index_name, meta_config)


if __name__ == '__main__':
    main()

链接 #