前言:
最近又重新在看ElasticSearch的文档,发现那些DSL语法全都忘记了,所以准备写一个用ES做储存的demo小项目。其实是用DSL代替之前项目的SQL,但是数据以及一些字段还是需要,所以就需要将以前的MySQL数据导入到ElasticSearch中。
以前的做法是写一个脚本,通过创建索引,创建文档,将MySQL数据插入到ElasticSearch中,现在想通过Elastic中的成员之一——Logstash,来完成初步的导入工作。下面就从基础简单介绍该方式的导入过程吧。
工具清单:
1. ElasticSearch
2. Logstash
3. Java JDK
4. mysql-connector-java
环境搭建:
首先可以先安装JDK,配置环境变量,版本1.8之类的都可以。ElasticSearch和Logstash可以在Elastic官网下载最新版本,最好是两个工具的版本保持一致,目前我这里使用的是7.8版本。mysql-connector-java是一个同步驱动,只需要在启动logstash指定目录就可以。
配置MySQL同步:
input {
stdin {
}
jdbc {
# mysql 数据库链接,shop为数据库名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/rebuild?characterEncoding=UTF-8&useSSL=false"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 驱动
jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/run.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "content"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "hhyp"
document_type => "room"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
1. jdbc_connection_string
配置示例:jdbc:mysql://IP地址:端口/数据库名?字符集&其他配置参数。
2. jdbc_driver_library
同步驱动工具绝对地址,全称mysql-connector-java-5.1.7-bin.jar,网上可以自行搜索,版本我这里使用的是5.17。
3. statement_filepath
需要进行执行的sql 文件,主要是通过SQL语句将数据通过到ES指令,以下就表示将room记录同步到ES。
select * from rm_room where delete_time = 0
4. 附加
导入到ElasticSearch的配置,hosts指定ES服务的地址,index导入时创建的索引名称,document_type类似于MYSQL的表,ducument_id类似与该表的主键。
启动同步:
1. 在Logstash根目录下创建一个新文件夹mysqletc,里面放置驱动文件,MySQL同步配置,MySQL查询同步指令。
2. 在当前目录下启动cmd,输入命令,bin\logstash -f mysqletc\mysql.conf 就可以了。
3. 最后在kibana中的索引管理就可以看到新同步过来的索引了,要数据可以通过DSL查看。