香港云主机最佳企业级服务商!

ADSL拨号VPS包含了中国大陆(联通,移动,电信,)

中国香港,国外拨号VPS。

当前位置:云主机 > MYSQL >

电信ADSL拨号VPS
联通ADSL拨号VPS
移动ADSL拨号VPS

MySQL 与 Elasticsearch 数据不对称问题解决办法


时间:2020-11-03 13:32 作者:admin


mysql/' target='_blank'>mysql 与 Elasticsearch 数据不对称问题解决办法

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

MySQL> desc article;+-------------+--------------+------+-----+--------------------------------+-------+| Field    | Type     | Null | Key | Default            | Extra |+-------------+--------------+------+-----+--------------------------------+-------+| id     | int(11)   | NO  |   | 0               |    || title    | mediumtext  | NO  |   | NULL              |    || description | mediumtext  | YES |   | NULL              |    || author   | varchar(100) | YES |   | NULL              |    || source   | varchar(100) | YES |   | NULL              |    || content   | longtext   | YES |   | NULL              |    || status   | enum('Y','N')| NO  |   | 'N'              |    || ctime    | timestamp  | NO  |   | CURRENT_TIMESTAMP       |    || mtime    | timestamp  | YES |   | ON UPDATE CURRENT_TIMESTAMP  |    |+-------------+--------------+------+-----+--------------------------------+-------+7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

jdbc {  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"  jdbc_driver_class => "com.mysql.jdbc.Driver"  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"  jdbc_user => "cms"  jdbc_password => "password"  schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次  statement => "select * from article where mtime > :sql_last_value"  use_column_value => true  tracking_column => "mtime"  tracking_column_type => "timestamp"   record_last_run => true  last_run_metadata_path => "/var/tmp/article-mtime.last" }

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

CREATE TABLE `elasticsearch_trash` ( `id` int(11) NOT NULL, `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROWBEGIN -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。 IF NEW.status = 'N' THEN insert into elasticsearch_trash(id) values(OLD.id); END IF; -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。  IF NEW.status = 'Y' THEN delete from elasticsearch_trash where id = OLD.id; END IF;ENDCREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROWBEGIN -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。 insert into elasticsearch_trash(id) values(OLD.id);END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体

package cn.netkiller.api.domain.elasticsearch;import java.util.Date;import javax.persistence.Column;import javax.persistence.Entity;import javax.persistence.Id;import javax.persistence.Table;@Entity@Tablepublic class ElasticsearchTrash { @Id private int id; @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP") private Date ctime; public int getId() { return id; } public void setId(int id) { this.id = id; } public Date getCtime() { return ctime; } public void setCtime(Date ctime) { this.ctime = ctime; }}

仓库

package cn.netkiller.api.repository.elasticsearch;import org.springframework.data.repository.CrudRepository;import com.example.api.domain.elasticsearch.ElasticsearchTrash;public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{}

定时任务

package cn.netkiller.api.schedule;import org.elasticsearch.action.delete.DeleteResponse;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.rest.RestStatus;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import com.example.api.domain.elasticsearch.ElasticsearchTrash;import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;@Componentpublic class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); @Autowired private TransportClient client; @Autowired private ElasticsearchTrashRepository alasticsearchTrashRepository; public ScheduledTasks() { } @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务 public void cleanTrash() { for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {  DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();  RestStatus status = response.status();  logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());  if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {  alasticsearchTrashRepository.delete(elasticsearchTrash);  } } }}

Spring boot 启动主程序。

package cn.netkiller.api;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication@EnableSchedulingpublic class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); }} 

以上就是MySQL 与 Elasticsearch 数据不对称问题解决办法的讲解,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

(责任编辑:admin)






帮助中心
会员注册
找回密码
新闻中心
快捷通道
域名登录面板
虚机登录面板
云主机登录面板
关于我们
关于我们
联系我们
联系方式

售前咨询:17830004266(重庆移动)

企业QQ:383546523

《中华人民共和国工业和信息化部》 编号:ICP备00012341号

Copyright © 2002 -2018 香港云主机 版权所有
声明:香港云主机品牌标志、品牌吉祥物均已注册商标,版权所有,窃用必究

云官方微信

在线客服

  • 企业QQ: 点击这里给我发消息
  • 技术支持:383546523

  • 公司总台电话:17830004266(重庆移动)
  • 售前咨询热线:17830004266(重庆移动)