_tagsparsefailure 问题 #
Logstash 在处理 Elasticsearch 索引数据迁移时,如果遇到字段类型不匹配或者解析错误的情况,可能会给该字段添加一个 _tagsparsefailure 标签来标记这些有问题的文档。
如果想保持和之前的行为一致,可以在 Logstash 配置文件中添加如下配置:
event_api.tags.illegal: warn
设置为 warn,允许对保留tags字段分配非法值。默认是使用的 rename
迁移索引别名时候,迁移 is_write_index 之类的配置 #
POST /_aliases
{
"actions": [
{
"add": {
"index": "topic-2024.01.01",
"alias": "aw.topic",
"is_write_index": true
}
}
]
}
可以用 python 脚本批量迁移
import requests
import pprint
from requests.auth import HTTPBasicAuth
# 配置信息,按照实际环境调整
config = {
# 源集群host
'old_cluster_host': 'xxx:9200',
# 源集群用户名
'old_cluster_user': 'elastic',
# 源集群密码
'old_cluster_password': '',
# 源集群http协议,可选 http/https
'old_cluster_protocol': 'http',
'new_cluster_host': 'xxxxx:9200',
# 目标集群用户名
'new_cluster_user': 'elastic',
# 目标集群密码
'new_cluster_password': '',
# 目标集群http协议,可选 http/https
'new_cluster_protocol': 'http',
}
# 通用的 HTTP 请求函数
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
url = f"{protocol}://{host}{endpoint}"
auth = (username, password) if username and password else None
headers = {'Content-Type': 'application/json'} if method != 'GET' else None
try:
response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
# 打印错误信息
print(f"HTTP Error: {e.response.status_code} for {url}")
print(e.response.text)
except ValueError as e:
# 如果响应不是 JSON 格式,打印错误并返回原始内容
print("Invalid JSON response:")
raise
# 获取所有索引列表
def get_index_alias(index_name):
endpoint = f"/{index_name}/_alias"
index_alias = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
return index_alias[index_name]["aliases"]
def create_index_alias(index_name, meta_config):
endpoint = f"/_aliases"
index_alias = get_index_alias(index_name)
pprint.pprint(meta_config)
for alias_name, meta in meta_config.items():
print(f"Creating alias {alias_name} for index {index_name}")
pprint.pprint(meta)
is_write_index = meta.get("is_write_index", False)
add_alias_config = {"index": index_name, "alias": alias_name}
if is_write_index:
add_alias_config["is_write_index"] = is_write_index
create_alias_body = {
"actions": [
{"add": add_alias_config}
]
}
pprint.pprint(create_alias_body)
send_http_request('POST', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], json_body=create_alias_body, protocol=config['new_cluster_protocol'])
# 主函数
def main():
index_list_str = "a,b,c"
index_list = index_list_str.split(',')
for index_name in index_list:
meta_config = get_index_alias(index_name)
create_index_alias(index_name, meta_config)
if __name__ == '__main__':
main()