Made QueryEvent decode options class variables #296
Conversation
That's an elegant solution to a problem I've been complaining about for way too long :D So thank you!
Would you mind sharing how you use this? Are you subclassing QueryEvent, or just injecting QueryEvent.charset as a global variable?
Force-pushed 001ace2 to 6c033f8.
Re-pushed because I forgot to add self. to charset and on_errors. Also added keyword arguments to make it more explicit.
Force-pushed 6c033f8 to 2ce0580.
Sorry about the re-pushes. It was a bit late and I missed the self. in the arguments and wrote in bad style.
@baloo Just set the proper charset/on_errors right after import. An example that I've tested is as follows:

```python
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import QueryEvent

QueryEvent.charset = 'cp932'
QueryEvent.on_errors = 'ignore'

stream = BinLogStreamReader(
    connection_settings={'host': '127.0.0.1', 'port': 3306,
                         'user': 'root', 'passwd': 'nopasswd'},
    blocking=True,
    server_id=100)

for e in stream:
    e.dump()
```
For those not sure why this is needed, try stuffing a

```sql
create table test_table (some_column text comment 'コラムおおおおお') engine=innodb;
```

into the server encoded as cp932 (e.g. using `iconv -t cp932`), then running the above test script with the charset and on_errors settings commented out. You should get a

```
UnicodeDecodeError: 'utf8' codec can't decode byte 0x83 in position 47: invalid start byte
```
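To see the failure mode without a MySQL server at all, here is a minimal sketch (not the library's code) of why the strict utf-8 decode blows up on cp932 bytes, and what errors='ignore' changes:

```python
# A minimal sketch of the decode failure described above: cp932-encoded
# DDL text cannot be decoded strictly as utf-8.
text = "create table t (c text comment 'コラム')"
raw = text.encode('cp932')  # what arrives in the binlog if the DDL was cp932

try:
    raw.decode('utf-8')
    failed = False
except UnicodeDecodeError:
    failed = True  # 0x83 is an invalid utf-8 start byte

print('strict utf-8 decode failed:', failed)
print(raw.decode('cp932') == text)            # correct charset round-trips
lossy = raw.decode('utf-8', errors='ignore')  # no exception, but lossy
print('コラム' in lossy)                       # the comment text is gone
```

Note that errors='ignore' avoids the exception but silently drops bytes; only decoding with the right charset recovers the original query.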
On second thought, I would prefer not to call decode at all, break the API, and store and return the raw bytestring to the user of this.
This change as you propose it would make it impossible to have a different charset per schema, and I would sincerely prefer to push that responsibility to the consumer of the API.
Just wondering, have you checked the binlog recently to see if the charset is specified in the transaction in the replication stream or something? That's what I would expect, but I haven't checked.
@noplay any thoughts?
I got this parsed:
diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py
index 1ee7655e5ca9a..385b7dbae0c6a 100644
--- a/pymysqlreplication/binlogstream.py
+++ b/pymysqlreplication/binlogstream.py
@@ -8,7 +8,7 @@ from pymysql.cursors import DictCursor
from pymysql.util import int2byte
from .packet import BinLogPacketWrapper
-from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
+from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, START_EVENT_V3, FORMAT_DESCRIPTION_EVENT
from .gtid import GtidSet
from .event import (
QueryEvent, RotateEvent, FormatDescriptionEvent,
@@ -219,6 +219,8 @@ class BinLogStreamReader(object):
else:
self.pymysql_wrapper = pymysql.connect
+ self.binlog_version = None
+
def close(self):
if self.__connected_stream:
self._stream_connection.close()
@@ -452,9 +454,14 @@ class BinLogStreamReader(object):
self.__only_schemas,
self.__ignored_schemas,
self.__freeze_schema,
- self.__fail_on_table_metadata_unavailable)
-
- if binlog_event.event_type == ROTATE_EVENT:
+ self.__fail_on_table_metadata_unavailable,
+ self.binlog_version)
+
+ if binlog_event.event_type == FORMAT_DESCRIPTION_EVENT:
+ self.binlog_version = binlog_event.event.version
+ elif binlog_event.event_type == START_EVENT_V3:
+ self.binlog_version = binlog_event.event.version
+ elif binlog_event.event_type == ROTATE_EVENT:
self.log_pos = binlog_event.event.position
self.log_file = binlog_event.event.next_binlog
# Table Id in binlog are NOT persistent in MySQL - they are in-memory identifiers
diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py
index 97ff58172cb55..4e90faf95f3d7 100644
--- a/pymysqlreplication/event.py
+++ b/pymysqlreplication/event.py
@@ -3,6 +3,12 @@
import binascii
import struct
import datetime
+import warnings
+
+try:
+ from io import BytesIO
+except ImportError:
+ from cStringIO import StringIO as BytesIO
from pymysql.util import byte2int, int2byte
@@ -98,8 +104,38 @@ class RotateEvent(BinLogEvent):
print()
+# FormatDescriptionEvent and StartEventV3 are used to determine binlog version, see:
+# https://dev.mysql.com/doc/internals/en/determining-the-binlog-version.html
class FormatDescriptionEvent(BinLogEvent):
- pass
+ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
+ super(FormatDescriptionEvent, self).__init__(from_packet, event_size, table_map,
+ ctl_connection, **kwargs)
+ self.version = struct.unpack('<H', self.packet.read(2))[0]
+
+ # Note: the implementation is incomplete, if you need more, please submit
+ # a pull request
+ # see https://dev.mysql.com/doc/internals/en/format-description-event.html
+
+ def dump(self):
+ print("=== %s ===" % (self.__class__.__name__))
+ print("Version: %d" % self.version)
+ print()
+
+
+class StartEventV3(BinLogEvent):
+ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
+ super(StartEventV3, self).__init__(from_packet, event_size, table_map,
+ ctl_connection, **kwargs)
+
+ self.version = struct.unpack('<H', self.packet.read(2))[0]
+ # Note: the implementation is incomplete, if you need more, please submit
+ # a pull request
+ # https://dev.mysql.com/doc/internals/en/start-event-v3.html
+
+ def dump(self):
+ print("=== %s ===" % (self.__class__.__name__))
+ print("Version: %d" % self.version)
+ print()
class StopEvent(BinLogEvent):
@@ -166,15 +202,27 @@ class QueryEvent(BinLogEvent):
self.execution_time = self.packet.read_uint32()
self.schema_length = byte2int(self.packet.read(1))
self.error_code = self.packet.read_uint16()
- self.status_vars_length = self.packet.read_uint16()
+ if from_packet.binlog_version >= 4:
+ self.status_vars_length = self.packet.read_uint16()
+ else:
+ self.status_vars_length = 0
# Payload
- self.status_vars = self.packet.read(self.status_vars_length)
+ if self.status_vars_length > 0:
+ data = self.packet.read(self.status_vars_length)
+ print('data', data)
+ self.status_vars = QueryEvent._parse_status_vars(data)
+ else:
+ self.status_vars = {}
self.schema = self.packet.read(self.schema_length)
self.packet.advance(1)
self.query = self.packet.read(event_size - 13 - self.status_vars_length
- self.schema_length - 1).decode("utf-8")
+ print(self.dump())
+ print(self.status_vars)
+ raise Exception("foo")  # debug stop; raising a bare string is invalid in Python 3
+
#string[EOF] query
def _dump(self):
@@ -184,6 +232,121 @@ class QueryEvent(BinLogEvent):
print("Query: %s" % (self.query))
+ @staticmethod
+ def _parse_status_vars(data):
+ # https://dev.mysql.com/doc/internals/en/query-event.html
+ # 0x00 Q_FLAGS2_CODE 4
+ # 0x01 Q_SQL_MODE_CODE 8
+ # 0x02 Q_CATALOG 1 + n + 1
+ # 0x03 Q_AUTO_INCREMENT 2 + 2
+ # 0x04 Q_CHARSET_CODE 2 + 2 + 2
+ # 0x05 Q_TIME_ZONE_CODE 1 + n
+ # 0x06 Q_CATALOG_NZ_CODE 1 + n
+ # 0x07 Q_LC_TIME_NAMES_CODE 2
+ # 0x08 Q_CHARSET_DATABASE_CODE 2
+ # 0x09 Q_TABLE_MAP_FOR_UPDATE_CODE 8
+ # 0x0a Q_MASTER_DATA_WRITTEN_CODE 4
+ # 0x0b Q_INVOKERS 1 + n + 1 + n
+ # 0x0c Q_UPDATED_DB_NAMES 1 + n*nul-term-string
+ # 0x0d Q_MICROSECONDS 3
+ data = BytesIO(data) # TODO: fixup python2 support
+ status_vars = {}
+ while len(data.getvalue()[data.tell():]) > 0:
+ code = struct.unpack('<B', data.read(1))[0]
+ print('code', code)
+ if code == 0x00:
+ # 0x00 Q_FLAGS2_CODE 4
+ value = struct.unpack('<I', data.read(4))[0]
+ status_vars['flags2'] = value
+ elif code == 0x01:
+ # 0x01 Q_SQL_MODE_CODE 8
+ value = struct.unpack('<Q', data.read(8))[0]
+ status_vars['sql_mode'] = value
+ elif code == 0x02:
+ # 0x02 Q_CATALOG 1 + n + 1
+ length = struct.unpack('<B', data.read(1))[0]
+ value = struct.unpack('%ds' % length, data.read(length))[0]
+ data.read(1)
+ status_vars['catalog'] = value
+ elif code == 0x03:
+ # 0x03 Q_AUTO_INCREMENT 2 + 2
+ incr = struct.unpack('<H', data.read(2))[0]
+ offset = struct.unpack('<H', data.read(2))[0]
+ status_vars['auto_increment'] = (incr, offset)
+ elif code == 0x04:
+ # 0x04 Q_CHARSET_CODE 2 + 2 + 2
+ charset = data.read(6)
+ set_client = struct.unpack('<H', charset[0:2])[0]
+ connection = struct.unpack('<H', charset[2:4])[0]
+ server = struct.unpack('<H', charset[4:])[0]
+ status_vars['charset_code'] = {
+ 'set_client': set_client,
+ 'connection': connection,
+ 'server': server,
+ }
+ elif code == 0x05:
+ # 0x05 Q_TIME_ZONE_CODE 1 + n
+ length = struct.unpack('<B', data.read(1))[0]
+ value = struct.unpack('%ds' % length, data.read(length))[0]
+ status_vars['time_zone_code'] = value
+ elif code == 0x06:
+ # 0x06 Q_CATALOG_NZ_CODE 1 + n
+ length = struct.unpack('<B', data.read(1))[0]
+ value = struct.unpack('%ds' % length, data.read(length))[0]
+ status_vars['catalog_nz_code'] = value
+ elif code == 0x07:
+ # 0x07 Q_LC_TIME_NAMES_CODE 2
+ value = struct.unpack('<H', data.read(2))[0]
+ status_vars['time_names_code'] = value
+ elif code == 0x08:
+ # 0x08 Q_CHARSET_DATABASE_CODE 2
+ value = struct.unpack('<H', data.read(2))[0]
+ status_vars['charset_database_code'] = value
+ elif code == 0x09:
+ # 0x09 Q_TABLE_MAP_FOR_UPDATE_CODE 8
+ value = struct.unpack('<Q', data.read(8))[0]
+ status_vars['table_map_for_update_code'] = value
+ elif code == 0x0a:
+ # 0x0a Q_MASTER_DATA_WRITTEN_CODE 4
+ value = struct.unpack('<I', data.read(4))[0]
+ status_vars['master_data_written_code'] = value
+ elif code == 0x0b:
+ # 0x0b Q_INVOKERS 1 + n + 1 + n
+ length = struct.unpack('<B', data.read(1))[0]
+ username = struct.unpack('%ds' % length, data.read(length))[0]
+ length = struct.unpack('<B', data.read(1))[0]
+ hostname = struct.unpack('%ds' % length, data.read(length))[0]
+ status_vars['invokers'] = {
+ 'username': username,
+ 'hostname': hostname,
+ }
+ elif code == 0x0c:
+ # 0x0c Q_UPDATED_DB_NAMES 1 + n*nul-term-string
+ length = struct.unpack('<B', data.read(1))[0]
+ out = []
+ for _ in range(0, length):
+ def read_string(inp):
+ out = BytesIO()
+ while True:
+ c = inp.read(1)
+ if c == b'\x00':
+ return out.getvalue()
+ out.write(c)
+ out.append(read_string(data))
+ status_vars['updated_db_names'] = out
+ elif code == 0x0d:
+ # 0x0d Q_MICROSECONDS 3
+ high, low = struct.unpack('<BH', data.read(3))
+ status_vars['microseconds'] = (high << 16) + low
+ else:
+ # Do not make an infinite loop :)
+ warnings.warn("status var %x is not handled, please report to "
+ "https://github.com/noplay/python-mysql-replication/issues/new" % code) # noqa
+ _ = data.read()
+ return status_vars
+
+
class BeginLoadQueryEvent(BinLogEvent):
"""
diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py
index 936dc8c5b39d5..aedf58ef6e65d 100644
--- a/pymysqlreplication/packet.py
+++ b/pymysqlreplication/packet.py
@@ -68,6 +68,7 @@ class BinLogPacketWrapper(object):
constants.INTVAR_EVENT: event.IntvarEvent,
constants.GTID_LOG_EVENT: event.GtidEvent,
constants.STOP_EVENT: event.StopEvent,
+ constants.START_EVENT_V3: event.StartEventV3,
constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
@@ -92,7 +93,8 @@ class BinLogPacketWrapper(object):
only_schemas,
ignored_schemas,
freeze_schema,
- fail_on_table_metadata_unavailable):
+ fail_on_table_metadata_unavailable,
+ binlog_version):
# -1 because we ignore the ok byte
self.read_bytes = 0
# Used when we want to override a value in the data buffer
@@ -100,6 +102,7 @@ class BinLogPacketWrapper(object):
self.packet = from_packet
self.charset = ctl_connection.charset
+ self.binlog_version = binlog_version
# OK value
# timestamp
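To make the Q_CHARSET_CODE (0x04) branch of the patch above easier to follow, here is a standalone sketch of that status var's wire layout: a marker byte followed by three little-endian uint16 collation ids (the cp932 collation id 95 in the example is illustrative):

```python
import struct
from io import BytesIO

def parse_charset_code(data):
    # 0x04 Q_CHARSET_CODE: character_set_client, collation_connection,
    # collation_server, each a little-endian uint16
    buf = BytesIO(data)
    assert buf.read(1) == b'\x04'  # Q_CHARSET_CODE marker byte
    client, connection, server = struct.unpack('<HHH', buf.read(6))
    return {'set_client': client, 'connection': connection, 'server': server}

# A status var blob claiming cp932 (collation id 95) everywhere:
blob = b'\x04' + struct.pack('<HHH', 95, 95, 95)
print(parse_charset_code(blob))
```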
the patch was straightforward enough, BUT:
- the charset code is platform-dependent on the MySQL server: if MySQL is running on a big-endian server, the charset code I read will be jammed up. Not sure this is a big issue; I do not expect too many people to run MySQL on big-endian servers.
- we need to provide the collation-id-to-charset mapping manually, which means going through all the files in the strings directory and fetching all lines like:
https://github.com/percona/percona-server/blob/5e7e0bad4e028d9b4010b7b5886aafbbf688a230/strings/ctype-latin1.cc#L358
8 is indeed the latin1 charset
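As a hedged illustration of what that manual mapping might look like: a hand-maintained excerpt of the collation-id to charset table (the ids below come from the MySQL sources; a real mapping would need to cover every collation):

```python
# Hand-maintained excerpt of MySQL's collation-id -> charset mapping.
# Deliberately incomplete: a real implementation needs all collations.
COLLATION_CHARSET = {
    8: 'latin1',     # latin1_swedish_ci
    33: 'utf8',      # utf8_general_ci
    45: 'utf8mb4',   # utf8mb4_general_ci
    63: 'binary',
    95: 'cp932',     # cp932_japanese_ci
}

def charset_for(collation_id, default='utf-8'):
    """Map a Q_CHARSET_CODE collation id to a Python codec name."""
    return COLLATION_CHARSET.get(collation_id, default)

print(charset_for(8))    # latin1
print(charset_for(999))  # falls back to utf-8 for unknown ids
```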
@baloo You’re right. This is only a temporary solution that at least allows my project to move since all schemas have the same encoding.
What about directly querying the server for charset names?

```sql
SELECT default_character_set_name
FROM information_schema.SCHEMATA
WHERE schema_name = "example_schema";
```
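A sketch of that idea: fetch every schema's default charset once and look it up per event. The FakeCursor below is a stand-in so the snippet runs without a server; a real implementation would use a pymysql cursor instead.

```python
def load_schema_charsets(cursor):
    """Build {schema_name: default charset} from information_schema.SCHEMATA."""
    cursor.execute(
        "SELECT schema_name, default_character_set_name "
        "FROM information_schema.SCHEMATA")
    return dict(cursor.fetchall())

class FakeCursor:
    # Stand-in for a pymysql cursor; returns canned rows for the sketch.
    def execute(self, sql):
        self.rows = [('example_schema', 'cp932'), ('mysql', 'utf8mb4')]
    def fetchall(self):
        return self.rows

charsets = load_schema_charsets(FakeCursor())
print(charsets['example_schema'])  # cp932
```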
chadmayo commented on Oct 10, 2019:
@baloo I am dealing with a similar issue. When a query inserting binary data gets parsed, the reader is failing with the same exception @joy13975 reported above.
Is it possible to make a change to allow the errors argument to be specified? This could be passed in as an argument when instantiating BinLogStreamReader or set as a class variable in QueryEvent as suggested by @joy13975 . I am happy to submit a pull request. Please let me know your thoughts.
@joy13975 well no, that does not work; you can't query the server to get this, AFAICT.
@cmayo117 I would prefer to go with calling decode using the values parsed from status_vars. But I do not have time to handle this myself. If you'd like to throw in a PR, that would be great.
chadmayo commented on Oct 18, 2019:
@baloo Unfortunately I don't think that will solve our issue as it isn't a problem of using the wrong charset. Some queries in our case have embedded binary data which cannot be decoded easily. Instead, it would be helpful to have the option of just ignoring decoding errors altogether.
QueryEvent decodes the packet using utf-8, which is fine except it's hardcoded and there's no option to set the errors argument for decode(). In a project I'm working on, we need cp932 plus the errors='ignore' argument. In general these can just be set through class variables right after importing them.
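For context, here is a simplified stand-in (not the library's actual class) showing the class-variable pattern this PR proposes: decode settings live on the class, so a consumer can override them once, right after import, without threading new constructor arguments through the stream reader.

```python
class QueryEventSketch:
    # Class-level decode defaults, overridable before any instance exists.
    charset = 'utf-8'
    on_errors = 'strict'

    def __init__(self, raw_query):
        # Instances read the (possibly overridden) class attributes.
        self.query = raw_query.decode(self.charset, errors=self.on_errors)

# Consumer overrides once, right after import:
QueryEventSketch.charset = 'cp932'
QueryEventSketch.on_errors = 'ignore'

event = QueryEventSketch("コラム".encode('cp932'))
print(event.query)  # コラム
```

The trade-off baloo raises still applies: class-level settings are global, so two schemas with different encodings cannot be handled at once.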