Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

alibaba canal 客户端(Python2 版本)

Notifications You must be signed in to change notification settings

Guanpx/canal-python

Folders and files

NameName
Last commit message
Last commit date

Latest commit

History

10 Commits

Repository files navigation

canal-py2

First, canal-py2 is fork by https://github.com/haozi3156666/canal-python thx.

一.canal-py2 简介

canal-py2 是阿里巴巴开源项目 Canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 python 客户端。为 python 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql数据库binlog的增量订阅&消费组件。

基于日志增量订阅&消费支持的业务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki

二.工作原理

canal-py2 是 Canal 的 python 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。

三.工作流程

1.Canal连接到mysql数据库,模拟slave

2.canal-py2 与 Canal 建立连接

2.数据库发生变更写入到binlog

5.Canal向数据库发送dump请求,获取binlog并解析

4.canal-py2 向 Canal 请求数据库变更

4.Canal 发送解析后的数据给canal-py2

5.canal-py2 收到数据,消费成功,发送回执。(可选)

6.Canal记录消费位置。

四.快速启动

安装Canal

Canal 的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart

环境要求

python 2.7 (canal-python支持python3)

构建canal python客户端

# maybe pip2
pip install canal-py2

建立与Canal的连接

import time
from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2
client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username='canal', password='canal')
client.subscribe(client_id='1001', destination='example', filter='.*\\..*')
while True:
 message = client.get(100)
 entries = message['entries']
 for entry in entries:
 entry_type = entry.entryType
 if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
 continue
 row_change = EntryProtocol_pb2.RowChange()
 row_change.MergeFromString(entry.storeValue)
 event_type = row_change.eventType
 header = entry.header
 database = header.schemaName
 table = header.tableName
 event_type = header.eventType
 for row in row_change.rowDatas:
 format_data = dict()
 if event_type == EntryProtocol_pb2.EventType.DELETE:
 for column in row.beforeColumns:
 format_data = {
 column.name: column.value
 }
 elif event_type == EntryProtocol_pb2.EventType.INSERT:
 for column in row.afterColumns:
 format_data = {
 column.name: column.value
 }
 else:
 format_data['before'] = format_data['after'] = dict()
 for column in row.beforeColumns:
 format_data['before'][column.name] = column.value
 for column in row.afterColumns:
 format_data['after'][column.name] = column.value
 data = dict(
 db=database,
 table=table,
 event_type=event_type,
 data=format_data,
 )
 print(data)
 time.sleep(1)
client.disconnect()

About

alibaba canal 客户端(Python2 版本)

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Python 100.0%

AltStyle によって変換されたページ (->オリジナル) /