本文共 2485 字,大约阅读时间需要 8 分钟。
在设置数据库到Elasticsearch(ES)的数据同步任务时,选择合适的同步边界至关重要。以下是两种常见的同步边界选择及其优缺点:
####字段id
当数据库中新增字段时,使用字段id作为同步边界可以确保新字段被同步到ES。这是因为每当数据库有新增字段时,Logstash会自动将其推送到ES。但其缺点在于,当数据库中的数据进行更新时,Logstash无法自动感知这些更新,可能导致数据不完整或延迟。####updateTime
相比之下,选择updateTime作为同步边界是一个更好的选择。updateTime字段在数据库中无论是新增还是修改都会产生变化。Logstash能够识别这些变化,从而在定期同步过程中自动拉取数据库中更新的数据。这种方式能够有效避免数据延迟问题,确保数据的一致性。Logstash 推出了一个内置的JDBC插件,可以用来直接从数据库中读取数据并将其导入Elasticsearch。以下是该插件的使用步骤和配置示例:
####准备步骤
tar -zxzf logstash-x.x.x-linux-x86_64.tar.gz
mkdir -p logstash-x.x.x/sync
vim logstash-x.x.x/sync/logstash-db-sync.conf
cp ~/mysql-connector-java-5.1.41.jar logstash-x.x.x/sync/
####配置文件示例(logstash-db-sync.conf)
input { jdbc { # 数据库连接信息 jdbc_driver_library => "/usr/local/sql/mysql-connector-java-5.1.41.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://你的数据库地址:3306/你的数据库名" jdbc_user => "数据库用户名" jdbc_password => "数据库密码" # 分页配置 jdbc_paging_enabled => "true" jdbc_page_size => "10000" # 定时执行 SQL schedule => "* * * * *" statement => "SELECT * FROM 表名 WHERE update_time >= :sql_last_value" # 其他配置 use_column_value => "true" last_run_metadata_path => "/usr/local/logstash-x.x.x/sync/syncpoint_table" tracking_column_type => "timestamp" tracking_column => "update_time" clear_run => "false" lowercase_column_names => "false" }}output { elasticsearch { hosts => ["你的 ES 地址:9200", "你的 ES 地址:9201"] index => "你的 ES 索引名" document_id => "%{id}" document_type => "_doc" } stdout { codec => "json_lines" }}
cd logstash-x.x.x/bin/
./logstash -f ../sync/logstash-db-sync.conf
如果需要对数据进行分词处理,可以遵循以下步骤:
####添加分词模板
vim logstash-ik.json
{ "analyzer": "ik_max_word"}
http://your_es_ip:9200/_template/logstash
####修改配置文件在 logstash-db-sync.conf
中添加以下配置:
output { elasticsearch { # 模板名称 template_name => "你的模板名称" # 模板路径 template => "../sync/logstash-ik.json" template_overwrite => "true" manager_template => "false" }}
通过以上配置,你可以实现从数据库到Elasticsearch的实时数据同步。如果有更多问题,可以参考 Logstash 官方文档或查阅社区资料。
转载地址:http://bztyk.baihongyu.com/