First, canal-py2 is fork by https://github.com/haozi3156666/canal-python thx.
canal-py2 是阿里巴巴开源项目 Canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 python 客户端。为 python 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql数据库binlog的增量订阅&消费组件。
基于日志增量订阅&消费支持的业务:
- 数据库镜像
- 数据库实时备份
- 多级索引 (卖家和买家各自分库索引)
- search build
- 业务cache刷新
- 价格变化等重要业务消息
关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki
canal-py2 是 Canal 的 python 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。
1.Canal连接到mysql数据库,模拟slave
2.canal-py2 与 Canal 建立连接
2.数据库发生变更写入到binlog
5.Canal向数据库发送dump请求,获取binlog并解析
4.canal-py2 向 Canal 请求数据库变更
4.Canal 发送解析后的数据给canal-py2
5.canal-py2 收到数据,消费成功,发送回执。(可选)
6.Canal记录消费位置。
Canal 的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart
python 2.7 (canal-python支持python3)
# maybe pip2
pip install canal-py2import time from canal.client import Client from canal.protocol import EntryProtocol_pb2 from canal.protocol import CanalProtocol_pb2 client = Client() client.connect(host='127.0.0.1', port=11111) client.check_valid(username='canal', password='canal') client.subscribe(client_id='1001', destination='example', filter='.*\\..*') while True: message = client.get(100) entries = message['entries'] for entry in entries: entry_type = entry.entryType if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]: continue row_change = EntryProtocol_pb2.RowChange() row_change.MergeFromString(entry.storeValue) event_type = row_change.eventType header = entry.header database = header.schemaName table = header.tableName event_type = header.eventType for row in row_change.rowDatas: format_data = dict() if event_type == EntryProtocol_pb2.EventType.DELETE: for column in row.beforeColumns: format_data = { column.name: column.value } elif event_type == EntryProtocol_pb2.EventType.INSERT: for column in row.afterColumns: format_data = { column.name: column.value } else: format_data['before'] = format_data['after'] = dict() for column in row.beforeColumns: format_data['before'][column.name] = column.value for column in row.afterColumns: format_data['after'][column.name] = column.value data = dict( db=database, table=table, event_type=event_type, data=format_data, ) print(data) time.sleep(1) client.disconnect()