Section 2: Importing Data into Elasticsearch with the JDBC Plugin

1. Syncing Database Data to Elasticsearch

Requirement: sync data from the database to ES and use ES full-text search to speed up queries

  • New user records need to be synced to Elasticsearch
  • When a user's information is updated, the change must be propagated to Elasticsearch
  • Incremental updates must be supported
  • Once a user account is deactivated, it must no longer be searchable in ES

2. JDBC Input Plugin & Implementation Approach

  • The JDBC Input Plugin reads data from a database into Logstash
    • You need to supply the required JDBC driver yourself
  • Scheduling (see the examples after this list)
    • Syntax comes from Rufus-scheduler
    • Extends cron and adds time zone support
  • State
    • Tracking_column / sql_last_value
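
For reference, the schedule option takes a Rufus-scheduler cron line: a sixth field adds second-level precision, and a time zone can be appended as the last field. The lines below are illustrative examples only (the demo config later in this section uses the every-second form):

# "* * * * * *"               six fields (seconds first): run every second, as in the demo below
# "0 * * * *"                 standard five-field cron: run at minute 0 of every hour
# "0 6 * * * America/Chicago" run at 6:00am every day, Chicago time
schedule => "* * * * * *"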

https://www.elastic.co/downloads/jdbc-client

# Download the MySQL JDBC driver and put the jar on Logstash's classpath
cd /usr/share/logstash/logstash-core/lib/jars
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.49.zip
sudo yum install unzip
unzip mysql-connector-java-5.1.49.zip
cd mysql-connector-java-5.1.49
mv mysql-connector-java-5.1.49.jar ../

use dbtest;

CREATE TABLE user (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(320),
    last_updated INT(11) NOT NULL DEFAULT '0',
    is_deleted TINYINT(1) NOT NULL DEFAULT '0'
);
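
As an aside, the transcript below inserts empty strings into the numeric columns, which is what triggers the MySQL warnings shown there. A minimal sketch of the same writes with explicit column lists, stamping last_updated from UNIX_TIMESTAMP() so the incremental sync always sees the change (values are illustrative):

-- Insert a user; AUTO_INCREMENT assigns the id and last_updated is set explicitly
INSERT INTO user (name, email, last_updated, is_deleted)
VALUES ('jacob', 'jacob@test.com', UNIX_TIMESTAMP(), 0);

-- Updates (including soft deletes) also bump last_updated so the next poll picks them up
UPDATE user SET is_deleted = 1, last_updated = UNIX_TIMESTAMP() WHERE id = 1;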


mysql> insert into user values('','jacob','jacob@test.com','','');
Query OK, 1 row affected, 3 warnings (0.20 sec)

mysql> select * from user;
+----+-------+----------------+--------------+------------+
| id | name  | email          | last_updated | is_deleted |
+----+-------+----------------+--------------+------------+
|  1 | jacob | jacob@test.com |            0 |          0 |
+----+-------+----------------+--------------+------------+
1 row in set (0.19 sec)

mysql-demo.conf

input {
  jdbc {

    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://db_host:3306/dbtest?useSSL=false"
    # useSSL=false disables SSL certificate verification
    jdbc_user => "admin"
    jdbc_password => "password"
    # Enable value tracking; when true, tracking_column must be specified
    use_column_value => true
    # The column whose value is tracked between runs
    tracking_column => "last_updated"
    # Type of the tracking column: only numeric and timestamp are supported; defaults to numeric
    tracking_column_type => "numeric"
    # Record the value from the last run
    record_last_run => true
    # Where the last-run value is persisted
    last_run_metadata_path => "jdbc-position.txt"
    statement => "SELECT * FROM user where last_updated >:sql_last_value;"
    schedule => "* * * * * *"
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    document_type => "_doc"
    index => "users"
    hosts => ["http://localhost:9200"]
  }
  stdout{
    codec => rubydebug
  }
}
$ logstash -f mysql-demo.conf
[2020-12-13T05:15:46,179][INFO ][logstash.inputs.jdbc     ][main][6b0ee2f856c602fc2c0158f436cb05489d0e96fe565728ffb597961d7dc64a68] (0.217996s) SELECT * FROM user where last_updated >0;
[2020-12-13T05:15:48,815][INFO ][logstash.inputs.jdbc     ][main][6b0ee2f856c602fc2c0158f436cb05489d0e96fe565728ffb597961d7dc64a68] (0.215261s) SELECT * FROM user where last_updated >0;
[2020-12-13T05:15:51,512][INFO ][logstash.inputs.jdbc     ][main][6b0ee2f856c602fc2c0158f436cb05489d0e96fe565728ffb597961d7dc64a68] (0.214389s) SELECT * FROM user where last_updated >0;
...
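
On the first run, :sql_last_value defaults to 0 for a numeric tracking column, which is why every query above reads last_updated >0. After each run the latest last_updated value seen is persisted to the file named in last_run_metadata_path and substituted into the next query. The file is a small YAML document; its exact content depends on your data, but it typically looks something like:

$ cat jdbc-position.txt
--- 1608174096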

3. Demo

3-1 Insert New Data

mysql> insert into user values('','ham','ham@test.com',UNIX_TIMESTAMP() ,'');
Query OK, 1 row affected, 2 warnings (0.20 sec)

mysql> select * from user;
+----+-------+----------------+--------------+------------+
| id | name  | email          | last_updated | is_deleted |
+----+-------+----------------+--------------+------------+
|  1 | jacob | jacob@test.com |            0 |          0 |
|  2 | ham   | ham@test.com   |   1608174096 |          0 |
+----+-------+----------------+--------------+------------+
2 rows in set (0.20 sec)
$ logstash -f mysql-demo.conf

[2020-12-13T05:40:18,761][INFO ][logstash.inputs.jdbc     ][main][6b0ee2f856c602fc2c0158f436cb05489d0e96fe565728ffb597961d7dc64a68] (0.215617s) SELECT * FROM user where last_updated >1608174096;
{
           "email" => "ham@test.com",
            "name" => "ham",
      "@timestamp" => 2020-12-13T05:40:15.249Z,
              "id" => 2,
        "@version" => "1",
      "is_deleted" => false,
    "last_updated" => 1608174096
}
[2020-12-13T05:40:21,395][INFO ][logstash.inputs.jdbc     ][main][6b0ee2f856c602fc2c0158f436cb05489d0e96fe565728ffb597961d7dc64a68] (0.213273s) SELECT * FROM user where last_updated >1608174096;
[2020-12-13T05:40:25,689][INFO ][logstash.inputs.jdbc     ][main][6b0ee2f856c602fc2c0158f436cb05489d0e96fe565728ffb597961d7dc64a68] (0.217912s) SELECT * FROM user where last_updated >1608174096;
insert into user values('','mike','mike@test.com',UNIX_TIMESTAMP() ,'');
insert into user values('','kate','kate@test.com',UNIX_TIMESTAMP() ,'');
[2020-12-13T05:42:54,156][INFO ][logstash.inputs.jdbc     ][main][6b0ee2f856c602fc2c0158f436cb05489d0e96fe565728ffb597961d7dc64a68] (0.216099s) SELECT * FROM user where last_updated >1608174096;
{
           "email" => "mike@test.com",
            "name" => "mike",
      "@timestamp" => 2020-12-13T05:42:54.162Z,
              "id" => 3,
        "@version" => "1",
      "is_deleted" => false,
    "last_updated" => 1608174346
}

....
[2020-12-13T05:43:20,257][INFO ][logstash.inputs.jdbc     ][main][6b0ee2f856c602fc2c0158f436cb05489d0e96fe565728ffb597961d7dc64a68] (0.218078s) SELECT * FROM user where last_updated >1608174346;
{
           "email" => "kate@test.com",
            "name" => "kate",
      "@timestamp" => 2020-12-13T05:43:20.259Z,
              "id" => 4,
        "@version" => "1",
      "is_deleted" => false,
    "last_updated" => 1608174368
}
...

3-2 Check Inserted Users

GET users/_search
{
  "took" : 28,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "users",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "email" : "ham@test.com",
          "name" : "ham",
          "@timestamp" : "2020-12-13T05:40:15.249Z",
          "id" : 2,
          "@version" : "1",
          "is_deleted" : false,
          "last_updated" : 1608174096
        }
      },
      {
        "_index" : "users",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "email" : "mike@test.com",
          "name" : "mike",
          "@timestamp" : "2020-12-13T05:42:54.162Z",
          "id" : 3,
          "@version" : "1",
          "is_deleted" : false,
          "last_updated" : 1608174346
        }
      },
      {
        "_index" : "users",
        "_type" : "_doc",
        "_id" : "4",
        "_score" : 1.0,
        "_source" : {
          "email" : "kate@test.com",
          "name" : "kate",
          "@timestamp" : "2020-12-13T05:43:20.259Z",
          "id" : 4,
          "@version" : "1",
          "is_deleted" : false,
          "last_updated" : 1608174368
        }
      }
    ]
  }
}

3-3 Update Data

UPDATE user SET name="ham2", last_updated=UNIX_TIMESTAMP() where id=2;
...
[2020-12-13T06:20:02,467][INFO ][logstash.inputs.jdbc     ][main][6b0ee2f856c602fc2c0158f436cb05489d0e96fe565728ffb597961d7dc64a68] (0.215766s) SELECT * FROM user where last_updated >1608174368;
{
           "email" => "ham@test.com",
            "name" => "ham2",
      "@timestamp" => 2020-12-13T06:20:02.542Z,
              "id" => 2,
        "@version" => "1",
      "is_deleted" => false,
    "last_updated" => 1608176401
}
...
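
Because the Elasticsearch output uses document_id => "%{id}", the updated row is re-indexed over the existing document with _id 2 instead of creating a new document. A quick way to verify (request only; once the next poll has run, the returned _source should show "name" : "ham2"):

GET users/_doc/2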

3-4 Check Deleted User Data

# Create an alias that only shows users not marked as deleted
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "users",
        "alias": "view_users",
         "filter" : { "term" : { "is_deleted" : false } }
      }
    }
  ]
}
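
To double-check that the filtered alias was created, ask Elasticsearch for its definition; the response echoes the term filter on is_deleted (request only):

GET users/_alias/view_users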

Set one user's is_deleted to true

UPDATE user SET is_deleted=1, last_updated=UNIX_TIMESTAMP() WHERE id=4;


select * from user;
+----+-------+----------------+--------------+------------+
| id | name  | email          | last_updated | is_deleted |
+----+-------+----------------+--------------+------------+
|  1 | jacob | jacob@test.com |            0 |          0 |
|  2 | ham2  | ham@test.com   |   1608176401 |          0 |
|  3 | mike  | mike@test.com  |   1608174346 |          0 |
|  4 | kate  | kate@test.com  |   1608213235 |          1 |
+----+-------+----------------+--------------+------------+
4 rows in set (0.20 sec)
...
{
      "@timestamp" => 2020-12-13T07:11:09.431Z,
            "name" => "kate",
              "id" => 4,
           "email" => "kate@test.com",
    "last_updated" => 1608213235,
        "@version" => "1",
      "is_deleted" => true
}
...
# Query through the alias; users marked as deleted are not returned
POST view_users/_search
{

}

Output: kate no longer appears

"hits" : [
      {
        "_index" : "users",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "email" : "mike@test.com",
          "name" : "mike",
          "@timestamp" : "2020-12-13T05:42:54.162Z",
          "id" : 3,
          "@version" : "1",
          "is_deleted" : false,
          "last_updated" : 1608174346
        }
      },
      {
        "_index" : "users",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "email" : "ham@test.com",
          "name" : "ham2",
          "@timestamp" : "2020-12-13T06:20:02.542Z",
          "id" : 2,
          "@version" : "1",
          "is_deleted" : false,
          "last_updated" : 1608176401
        }
      }
    ]
  }

Query view_users, which only shows users that have not been deleted

POST view_users/_search
{
  "query": {
    "term": {
      "name.keyword": {
        "value": "kate"
      }
    }
  }
}
"hits" : [ ]

Query the users index, which contains all users

POST users/_search
{
  "query": {
    "term": {
      "name.keyword": {
        "value": "kate"
      }
    }
  }
}
"hits" : [
      {
        "_index" : "users",
        "_type" : "_doc",
        "_id" : "4",
        "_score" : 0.6931471,
        "_source" : {
          "@timestamp" : "2020-12-13T07:11:09.431Z",
          "name" : "kate",
          "id" : 4,
          "email" : "kate@test.com",
          "last_updated" : 1608213235,
          "@version" : "1",
          "is_deleted" : true
        }
      }
    ]
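
The term queries above rely on name.keyword, the keyword sub-field that the default dynamic mapping adds to string fields. If a term query unexpectedly returns no hits, the index mapping is the first thing to check:

GET users/_mapping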

4. Q&A

  • Polling the table like this probably isn't well suited to production; tailing the MySQL binlog is a better fit for production environments

    • Tailing the binlog does have advantages that the JDBC approach lacks
  • Does Logstash's polling approach have any performance issues when fetching data? We currently subscribe to the binlog through Kafka