Real-time MySQL-to-PG data sync with go-mysql-postgresql
By 我的二狗呢

MySQL-to-PG data synchronization can be done with canal or bireme, but both are fairly involved to set up.
A former colleague of mine modified go-mysql-elasticsearch, changing the target from ES to PG; the resulting tool is called go-mysql-postgresql. Its biggest advantage is one-step deployment, with no dependency on any other components.
Project address: https://github.com/frainmeng/go-mysql-elasticsearch
At the time of my test, the latest release was: go-mysql-postgresql3.0.0-linux-amd64.tar.gz
Below are my setup notes:
1、 Create a dedicated replication account on the source MySQL
grant replication slave, replication client, process, select on *.* to dts@'%' identified by 'dts';
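One more prerequisite worth checking: go-mysql-based tools read changes by posing as a replica and parsing the binlog, so the source must run with binlog_format=ROW. A quick check, sketched as a shell helper (host and credentials are the ones used in this article; the function is defined here, not executed):

```shell
# Verify the source MySQL writes row-based binlogs, which
# go-mysql-based replication tools require. -N suppresses the
# column header so the output is just: binlog_format  ROW
check_binlog_format() {
  mysql -h 172.31.10.100 -u dts -pdts -N \
    -e "SHOW VARIABLES LIKE 'binlog_format'"
}
# Expected output: binlog_format  ROW
```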
The table on the MySQL side:
use testdb;
testdb >show create table t_order \G
*************************** 1. row ***************************
Table: t_order
Create Table: CREATE TABLE `t_order` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`aid` int(10) unsigned NOT NULL,
`uid` int(10) unsigned NOT NULL,
`type` tinyint(3) unsigned NOT NULL,
`status` tinyint(4) unsigned NOT NULL,
`price` int(10) unsigned NOT NULL COMMENT '',
`num` int(10) unsigned NOT NULL,
`city` varchar(64) NOT NULL,
`category` varchar(64) NOT NULL,
PRIMARY KEY (`id`),
KEY `uid` (`uid`)
) ENGINE=InnoDB AUTO_INCREMENT=1000 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPRESSED COMMENT=''
1 row in set (0.00 sec)

2、 Create the same table on the PG side
create database testdb;
\c testdb
CREATE TABLE t_order (
    id bigint NOT NULL,
    aid bigint NOT NULL,
    uid bigint NOT NULL,
    type bigint NOT NULL,
    status bigint NOT NULL,
    price bigint NOT NULL,
    num bigint NOT NULL,
    city varchar(64) NOT NULL,
    category varchar(64) NOT NULL,
    PRIMARY KEY (id)
);
CREATE USER dts REPLICATION LOGIN CONNECTION LIMIT 10 ENCRYPTED PASSWORD 'dts';
grant connect on database testdb to dts;
grant usage on schema public to dts;
grant select on all tables in schema public to dts;
grant all on table t_order to dts;
Deploying go-mysql-postgresql:
Unpack the tarball into the /var/lib/pgsql/go-mysql-postgresql directory.
vim /var/lib/pgsql/go-mysql-postgresql/master.info — write the binlog coordinates you want to start syncing from into this file:
bin_name = "mysql-bin.000167"
bin_pos = 13389413
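The coordinates above are the File and Position columns of SHOW MASTER STATUS on the source. A small sketch that writes them into master.info (the values are the ones from this article; substitute your own):

```shell
# bin_name/bin_pos correspond to the File/Position columns of
# SHOW MASTER STATUS on the source MySQL; write them out in the
# format master.info expects.
BIN_NAME="mysql-bin.000167"
BIN_POS=13389413
cat > master.info <<EOF
bin_name = "${BIN_NAME}"
bin_pos = ${BIN_POS}
EOF
```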
cat /var/lib/pgsql/go-mysql-postgresql/river.toml
# Source MySQL connection
my_addr = "172.31.10.100:3306"
my_user = "dts"
my_pass = "dts"
my_charset = "utf8"

# Target PG connection
pg_host = "192.168.2.4"
pg_port = 5434
pg_user = "dts"
pg_pass = "dts"
pg_dbname = "testdb"

# Directory for the saved replication position
data_dir = "./var"

# Inner HTTP status address
stat_addr = "192.168.2.4:12800"

# statsd monitor
statsd_host = "127.0.0.1"
statsd_port = 8125
statsd_prefix = "dbsync"

# server-id used when posing as a slave
server_id = 1001
flavor = "mysql"

# minimal items to be inserted in one bulk
bulk_size = 1

# force flush the pending requests if we don't have enough items >= bulk_size
flush_bulk_time = "500ms"

# Ignore tables without a primary key
skip_no_pk_table = false

# concurrency conf
concurrent_size = 6
concurrent_ack_win = 2048

# MySQL data source
[[source]]
schema = "testdb"
tables = ["t_order"]

# Target PG connection config
[[target]]
pg_name = "172.31.10.100_testdb_t_order"
pg_host = "192.168.2.4"
pg_port = 5434
pg_user = "dts"
pg_pass = "dts"
pg_dbname = "testdb"

# Routing rules for MySQL data landing in PG
[[rule]]
# MySQL schema/table
schema = "testdb"
table = "t_order"
# PG schema/table
pg_schema = "public"
pg_table = "t_order"
# Important: this line binds the rule to its target
pg_name = "172.31.10.100_testdb_t_order"
Start it:
Just run: sh start.sh
The log looks roughly like this:
[2019年08月21日 13:02:36] [info] pgclient.go:199 pg delete event execute success! Schema[public] Table[t_order], Id[166773984],result[{0xc000182b00 1}],reqId[503]
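Besides the log, the process exposes an internal HTTP status endpoint on the stat_addr configured in river.toml. A sketch of a health probe (the /stat path is an assumption carried over from upstream go-mysql-elasticsearch; verify it against your build):

```shell
# Probe the river's internal HTTP status endpoint.
# stat_addr is 192.168.2.4:12800 in the river.toml above;
# the /stat path is assumed from the upstream project.
health_check() {
  curl -s "http://192.168.2.4:12800/stat"
}
```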
Test results:
5k rows over a leased line: 33s from transfer to write into PG.
2019年08月20日 23:33:29.289 CST [112184] LOG: duration: 0.321 ms
2019年08月20日 23:34:02.769 CST [112184] LOG: duration: 0.085 ms

20k rows over the leased line: 140s from transfer to write into PG.
2019年08月20日 23:35:20.216 CST [112189] LOG: duration: 0.347 ms
2019年08月20日 23:37:39.848 CST [85173] LOG: duration: 6.648 ms
One final note:
When doing heterogeneous data sync, you usually need to copy over the existing MySQL data in full before using go-mysql-postgresql; only then can it consume the binlog to keep the data in sync. For the full-copy step, see my previous blog post: https://blog.51cto.com/lee90/2436325
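For reference, one way to do that initial full copy, sketched as a shell function (this is my own sketch, not necessarily the method from the linked post; it assumes mysqldump and psql are on PATH, and uses the hosts, table, and credentials from this article):

```shell
# One-time full copy of t_order from the source MySQL into PG.
# --no-create-info: data only, since the PG table already exists.
# --skip-extended-insert: one INSERT per row, easier for PG to parse.
# --compatible=ansi: keep the dump close to standard SQL; the sed
# strips any remaining MySQL backquotes. Run once, before starting
# go-mysql-postgresql from the recorded binlog position.
full_sync() {
  mysqldump -h 172.31.10.100 -u dts -pdts \
    --no-create-info --skip-extended-insert --compatible=ansi \
    testdb t_order \
  | sed 's/`//g' \
  | PGPASSWORD=dts psql -h 192.168.2.4 -p 5434 -U dts -d testdb
}
```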