Made QueryEvent decode options class variables #296


Open
joy13975 wants to merge 1 commit into julien-duponchelle:master

joy13975 commented Jul 14, 2019

QueryEvent decodes the packet using utf-8, which is fine except it's hardcoded and there's no option to set the errors argument for decode().

In a project I'm working on, we need cp932 plus the errors='ignore' argument. With this change, both can be set through class variables right after importing QueryEvent.
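
A minimal sketch of the class-variable idea, assuming a simplified stand-in class: `SketchQueryEvent` is hypothetical and is not the library's `QueryEvent`. The attribute names `charset` and `on_errors` follow the ones used later in this thread.

```python
class SketchQueryEvent:
    """Hypothetical stand-in for an event class; not the real QueryEvent."""

    # Class-level defaults that callers may override after import.
    charset = "utf-8"
    on_errors = "strict"

    def __init__(self, raw_query: bytes):
        # Look the options up through self so class-level overrides apply.
        self.query = raw_query.decode(self.charset, errors=self.on_errors)


raw = "comment 'コラム'".encode("cp932")

# Override the defaults once, before any event is constructed.
SketchQueryEvent.charset = "cp932"
SketchQueryEvent.on_errors = "ignore"

event = SketchQueryEvent(raw)
print(event.query)
```

Because the options live on the class, the override applies to every event created afterwards, which is convenient when all schemas share one encoding.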

Collaborator

baloo commented Jul 14, 2019

That's an elegant solution to a problem I've been complaining about for way too long :D So thank you!

Would you mind sharing how you use this? Are you using a subclass or just setting QueryEvent.charset as a global variable?

Re-pushed because I forgot to add self. to charset and on_errors.
Also added keywords to make it more explicit.
Author

joy13975 commented Jul 15, 2019

Sorry about the re-pushes. It was a bit late and I missed the self. in the arguments and wrote in bad style.

@baloo Just set the proper charset/on_errors right after import. An example that I've tested is as follows:

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import QueryEvent
QueryEvent.charset = 'cp932'
QueryEvent.on_errors = 'ignore'
stream = BinLogStreamReader(
    connection_settings={'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': 'nopasswd'},
    blocking=True,
    server_id=100)
for e in stream:
    e.dump()

For those not sure why this is needed, try sending the statement

create table test_table (some_column text comment 'コラムおおおおお') engine=innodb;

using cp932 (e.g. using iconv -t cp932) then running the above test script with the charset and on_errors settings commented out. You should get a

UnicodeDecodeError: 'utf8' codec can't decode byte 0x83 in position 47: invalid start byte
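
The failure can also be reproduced without a MySQL server. The sketch below only assumes that the statement bytes reach the decoder encoded as cp932; it does not use the library at all.

```python
statement = (
    "create table test_table "
    "(some_column text comment 'コラムおおおおお') engine=innodb;"
)

# Bytes as they might arrive when the client sent the statement in cp932.
raw = statement.encode("cp932")

try:
    raw.decode("utf-8")
    utf8_failed = False
except UnicodeDecodeError as exc:
    utf8_failed = True
    print("utf-8 decode failed:", exc.reason)

# Decoding with the matching charset recovers the original text.
decoded = raw.decode("cp932", errors="ignore")
print(decoded)
```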

Collaborator

baloo commented Jul 15, 2019

On second thoughts, I would prefer to not call decode at all, break the API, and store and return a bytestring to the user of this.

This change as you propose would make it impossible to have a different charset per schema, and I would sincerely prefer to push that responsibility to the consumer of the API.
Just wondering, have you checked the binlog recently to see whether the charset is specified in the transaction in the replication stream or something? That's what I would expect, but I haven't checked.
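
A rough sketch of what consumer-side decoding could look like if the event exposed raw bytes. The helper `decode_query` and the `schema_charsets` mapping are hypothetical names for illustration; neither exists in the library.

```python
from typing import Dict


def decode_query(raw_query: bytes, schema: str,
                 schema_charsets: Dict[str, str],
                 default: str = "utf-8") -> str:
    """Decode a raw query using a charset chosen per schema (hypothetical helper)."""
    charset = schema_charsets.get(schema, default)
    return raw_query.decode(charset)


# The consumer, not the library, decides which schema uses which charset.
schema_charsets = {"legacy_jp": "cp932", "app": "utf-8"}

jp_query = decode_query("select 'コラム'".encode("cp932"), "legacy_jp", schema_charsets)
app_query = decode_query("select 'é'".encode("utf-8"), "app", schema_charsets)
other_query = decode_query(b"select 1", "unknown_schema", schema_charsets)
```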

@noplay any thoughts?

Owner

julien-duponchelle commented Jul 15, 2019 via email

It seems possible to get the information: https://dev.mysql.com/doc/internals/en/query-event.html. In the status_vars there is a Q_CHARSET_CODE and a Q_CHARSET_DATABASE_CODE. I didn't find the mapping to a charset, but I guess it can be extracted from this.
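
As an illustration of how such a mapping might work, the sketch below unpacks a Q_CHARSET_CODE payload (three little-endian 2-byte values, per the page linked above) and looks the IDs up in a small hand-written table. The helper names and the table are assumptions for this example; the table covers only a few well-known MySQL collation IDs and is far from complete.

```python
import struct

# Partial, hand-written mapping from MySQL collation IDs to Python codec names.
# Assumption: only these few IDs are covered; a real mapping needs many more.
COLLATION_TO_CODEC = {
    8: "cp1252",   # latin1_swedish_ci (MySQL's latin1 is close to cp1252)
    33: "utf-8",   # utf8_general_ci
    45: "utf-8",   # utf8mb4_general_ci
    95: "cp932",   # cp932_japanese_ci
}


def parse_charset_code(payload: bytes) -> dict:
    """Unpack the 6-byte Q_CHARSET_CODE payload into its three IDs."""
    client, connection, server = struct.unpack("<HHH", payload)
    return {
        "character_set_client": client,
        "collation_connection": connection,
        "collation_server": server,
    }


def codec_for(collation_id: int, default: str = "utf-8") -> str:
    """Return a Python codec name for a collation ID, falling back to a default."""
    return COLLATION_TO_CODEC.get(collation_id, default)


payload = struct.pack("<HHH", 95, 95, 33)
parsed = parse_charset_code(payload)
codec = codec_for(parsed["character_set_client"])
```

With a codec resolved this way, the query bytes of each event could be decoded with the charset the client actually used instead of a single global setting.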

Collaborator

baloo commented Jul 16, 2019

I got this parsed:

diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py
index 1ee7655e5ca9a..385b7dbae0c6a 100644
--- a/pymysqlreplication/binlogstream.py
+++ b/pymysqlreplication/binlogstream.py
@@ -8,7 +8,7 @@ from pymysql.cursors import DictCursor
 from pymysql.util import int2byte
 from .packet import BinLogPacketWrapper
-from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
+from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, START_EVENT_V3, FORMAT_DESCRIPTION_EVENT
 from .gtid import GtidSet
 from .event import (
     QueryEvent, RotateEvent, FormatDescriptionEvent,
@@ -219,6 +219,8 @@ class BinLogStreamReader(object):
         else:
             self.pymysql_wrapper = pymysql.connect
+        self.binlog_version = None
+
     def close(self):
         if self.__connected_stream:
             self._stream_connection.close()
@@ -452,9 +454,14 @@ class BinLogStreamReader(object):
                                                self.__only_schemas,
                                                self.__ignored_schemas,
                                                self.__freeze_schema,
-                                               self.__fail_on_table_metadata_unavailable)
-
-            if binlog_event.event_type == ROTATE_EVENT:
+                                               self.__fail_on_table_metadata_unavailable,
+                                               self.binlog_version)
+
+            if binlog_event.event_type == FORMAT_DESCRIPTION_EVENT:
+                self.binlog_version = binlog_event.event.version
+            elif binlog_event.event_type == START_EVENT_V3:
+                self.binlog_version = binlog_event.event.version
+            elif binlog_event.event_type == ROTATE_EVENT:
                 self.log_pos = binlog_event.event.position
                 self.log_file = binlog_event.event.next_binlog
                 # Table Id in binlog are NOT persistent in MySQL - they are in-memory identifiers
diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py
index 97ff58172cb55..4e90faf95f3d7 100644
--- a/pymysqlreplication/event.py
+++ b/pymysqlreplication/event.py
@@ -3,6 +3,12 @@
 import binascii
 import struct
 import datetime
+import warnings
+
+try:
+    from io import BytesIO
+except ImportError:
+    from cStringIO import StringIO as BytesIO
 from pymysql.util import byte2int, int2byte
@@ -98,8 +104,38 @@ class RotateEvent(BinLogEvent):
         print()
+# FormatDescriptionEvent and StartEventV3 are used to determine binlog version, see:
+# https://dev.mysql.com/doc/internals/en/determining-the-binlog-version.html
 class FormatDescriptionEvent(BinLogEvent):
-    pass
+    def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
+        super(FormatDescriptionEvent, self).__init__(from_packet, event_size, table_map,
+                                                     ctl_connection, **kwargs)
+        self.version = struct.unpack('<H', self.packet.read(2))[0]
+
+        # Note: the implementation is incomplete, if you need more, please submit
+        # a pull request
+        # see https://dev.mysql.com/doc/internals/en/format-description-event.html
+
+    def dump(self):
+        print("=== %s ===" % (self.__class__.__name__))
+        print("Version: %d" % self.version)
+        print()
+
+
+class StartEventV3(BinLogEvent):
+    def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
+        super(StartEventV3, self).__init__(from_packet, event_size, table_map,
+                                           ctl_connection, **kwargs)
+
+        self.version = struct.unpack('<H', self.packet.read(2))[0]
+        # Note: the implementation is incomplete, if you need more, please submit
+        # a pull request
+        # https://dev.mysql.com/doc/internals/en/start-event-v3.html
+
+    def dump(self):
+        print("=== %s ===" % (self.__class__.__name__))
+        print("Version: %d" % self.version)
+        print()
 class StopEvent(BinLogEvent):
@@ -166,15 +202,27 @@ class QueryEvent(BinLogEvent):
         self.execution_time = self.packet.read_uint32()
         self.schema_length = byte2int(self.packet.read(1))
         self.error_code = self.packet.read_uint16()
-        self.status_vars_length = self.packet.read_uint16()
+        if from_packet.binlog_version >= 4:
+            self.status_vars_length = self.packet.read_uint16()
+        else:
+            self.status_vars_length = 0
         # Payload
-        self.status_vars = self.packet.read(self.status_vars_length)
+        if self.status_vars_length > 0:
+            data = self.packet.read(self.status_vars_length)
+            print('data', data)
+            self.status_vars = QueryEvent._parse_status_vars(data)
+        else:
+            self.status_vars = {}
         self.schema = self.packet.read(self.schema_length)
         self.packet.advance(1)
         self.query = self.packet.read(event_size - 13 - self.status_vars_length
                                       - self.schema_length - 1).decode("utf-8")
+        print(self.dump())
+        print(self.status_vars)
+        raise "foo"
+
         #string[EOF] query
     def _dump(self):
@@ -184,6 +232,121 @@ class QueryEvent(BinLogEvent):
         print("Query: %s" % (self.query))
+    @staticmethod
+    def _parse_status_vars(data):
+        # https://dev.mysql.com/doc/internals/en/query-event.html
+        # 0x00 Q_FLAGS2_CODE 4
+        # 0x01 Q_SQL_MODE_CODE 8
+        # 0x02 Q_CATALOG 1 + n + 1
+        # 0x03 Q_AUTO_INCREMENT 2 + 2
+        # 0x04 Q_CHARSET_CODE 2 + 2 + 2
+        # 0x05 Q_TIME_ZONE_CODE 1 + n
+        # 0x06 Q_CATALOG_NZ_CODE 1 + n
+        # 0x07 Q_LC_TIME_NAMES_CODE 2
+        # 0x08 Q_CHARSET_DATABASE_CODE 2
+        # 0x09 Q_TABLE_MAP_FOR_UPDATE_CODE 8
+        # 0x0a Q_MASTER_DATA_WRITTEN_CODE 4
+        # 0x0b Q_INVOKERS 1 + n + 1 + n
+        # 0x0c Q_UPDATED_DB_NAMES 1 + n*nul-term-string
+        # 0x0d Q_MICROSECONDS 3
+        data = BytesIO(data) # TODO: fixup python2 support
+        status_vars = {}
+        while len(data.getvalue()[data.tell():]) > 0:
+            code = struct.unpack('<B', data.read(1))[0]
+            print('code', code)
+            if code == 0x00:
+                # 0x00 Q_FLAGS2_CODE 4
+                value = struct.unpack('<I', data.read(4))[0]
+                status_vars['flags2'] = value
+            elif code == 0x01:
+                # 0x01 Q_SQL_MODE_CODE 8
+                value = struct.unpack('<Q', data.read(8))[0]
+                status_vars['sql_mode'] = value
+            elif code == 0x02:
+                # 0x02 Q_CATALOG 1 + n + 1
+                length = struct.unpack('<B', data.read(1))[0]
+                value = struct.unpack('%ds' % length, data.read(length))[0]
+                data.read(1)
+                status_vars['catalog'] = value
+            elif code == 0x03:
+                # 0x03 Q_AUTO_INCREMENT 2 + 2
+                incr = struct.unpack('<H', data.read(2))[0]
+                offset = struct.unpack('<H', data.read(2))[0]
+                status_vars['auto_increment'] = (incr, offset)
+            elif code == 0x04:
+                # 0x04 Q_CHARSET_CODE 2 + 2 + 2
+                charset = data.read(6)
+                set_client = struct.unpack('<H', charset[0:2])[0]
+                connection = struct.unpack('<H', charset[2:4])[0]
+                server = struct.unpack('<H', charset[4:])[0]
+                status_vars['charset_code'] = {
+                    'set_client': set_client,
+                    'connection': connection,
+                    'server': server,
+                }
+            elif code == 0x05:
+                # 0x05 Q_TIME_ZONE_CODE 1 + n
+                length = struct.unpack('<B', data.read(1))[0]
+                value = struct.unpack('%ds' % length, data.read(length))[0]
+                status_vars['time_zone_code'] = value
+            elif code == 0x06:
+                # 0x06 Q_CATALOG_NZ_CODE 1 + n
+                length = struct.unpack('<B', data.read(1))[0]
+                value = struct.unpack('%ds' % length, data.read(length))[0]
+                status_vars['catalog_nz_code'] = value
+            elif code == 0x07:
+                # 0x07 Q_LC_TIME_NAMES_CODE 2
+                value = struct.unpack('<H', data.read(2))[0]
+                status_vars['time_names_code'] = value
+            elif code == 0x08:
+                # 0x08 Q_CHARSET_DATABASE_CODE 2
+                value = struct.unpack('<H', data.read(2))[0]
+                status_vars['charset_database_code'] = value
+            elif code == 0x09:
+                # 0x09 Q_TABLE_MAP_FOR_UPDATE_CODE 8
+                value = struct.unpack('<Q', data.read(8))[0]
+                status_vars['table_map_for_update_code'] = value
+            elif code == 0x0a:
+                # 0x0a Q_MASTER_DATA_WRITTEN_CODE 4
+                # 4 bytes, so unpack with '<I' ('<Q' expects 8 bytes)
+                value = struct.unpack('<I', data.read(4))[0]
+                status_vars['master_data_written_code'] = value
+            elif code == 0x0b:
+                # 0x0b Q_INVOKERS 1 + n + 1 + n
+                length = struct.unpack('<B', data.read(1))[0]
+                username = struct.unpack('%ds' % length, data.read(length))[0]
+                length = struct.unpack('<B', data.read(1))[0]
+                hostname = struct.unpack('%ds' % length, data.read(length))[0]
+                status_vars['invokers'] = {
+                    'username': username,
+                    'hostname': hostname,
+                }
+            elif code == 0x0c:
+                # 0x0c Q_UPDATED_DB_NAMES 1 + n*nul-term-string
+                length = struct.unpack('<B', data.read(1))[0]
+                out = []
+                for _ in range(0, length):
+                    def read_string(inp):
+                        out = BytesIO()
+                        while True:
+                            c = inp.read(1)
+                            if c == b'\0':
+                                return out.getvalue()
+                            out.write(c)
+                    out.append(read_string(data))
+                status_vars['updated_db_names'] = out
+            elif code == 0x0d:
+                # 0x0d Q_MICROSECONDS 3
+                high, low = struct.unpack('<BH', data.read(3))
+                status_vars['microseconds'] = (high << 16) + low
+            else:
+                # Do not make an infinite loop :)
+                # TODO: should send a warning
+                warnings.warn("status var %x is not handled, please report to "
+                              "https://github.com/noplay/python-mysql-replication/issues/new" % code) # noqa
+                _ = data.read()
+        return status_vars
+
+
 class BeginLoadQueryEvent(BinLogEvent):
     """
diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py
index 936dc8c5b39d5..aedf58ef6e65d 100644
--- a/pymysqlreplication/packet.py
+++ b/pymysqlreplication/packet.py
@@ -68,6 +68,7 @@ class BinLogPacketWrapper(object):
         constants.INTVAR_EVENT: event.IntvarEvent,
         constants.GTID_LOG_EVENT: event.GtidEvent,
         constants.STOP_EVENT: event.StopEvent,
+        constants.START_EVENT_V3: event.StartEventV3,
         constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
         constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
         constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
@@ -92,7 +93,8 @@ class BinLogPacketWrapper(object):
                  only_schemas,
                  ignored_schemas,
                  freeze_schema,
-                 fail_on_table_metadata_unavailable):
+                 fail_on_table_metadata_unavailable,
+                 binlog_version):
         # -1 because we ignore the ok byte
         self.read_bytes = 0
         # Used when we want to override a value in the data buffer
@@ -100,6 +102,7 @@ class BinLogPacketWrapper(object):
         self.packet = from_packet
         self.charset = ctl_connection.charset
+        self.binlog_version = binlog_version
         # OK value
         # timestamp

The patch was straightforward enough, BUT:

Author

joy13975 commented Jul 16, 2019

@baloo You’re right. This is only a temporary solution that at least allows my project to move since all schemas have the same encoding.

What about directly querying the server for charset names?

SELECT default_character_set_name FROM information_schema.SCHEMATA 
WHERE schema_name = "example_schema";
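
A sketch of how that lookup might be wrapped in Python, assuming a DB-API cursor that returns rows as tuples and accepts `%s` placeholders (as pymysql's default cursor does). `schema_charset` and `FakeCursor` are hypothetical; the fake cursor only stands in for a real connection so the example runs on its own. Note that this yields the schema's default charset, which may differ from the charset of the client that issued a given statement.

```python
SCHEMA_CHARSET_SQL = (
    "SELECT default_character_set_name "
    "FROM information_schema.SCHEMATA "
    "WHERE schema_name = %s"
)


def schema_charset(cursor, schema_name):
    """Return the default charset name of a schema, or None if it is unknown."""
    # Pass the schema name as a parameter instead of formatting it into the SQL.
    cursor.execute(SCHEMA_CHARSET_SQL, (schema_name,))
    row = cursor.fetchone()
    return row[0] if row else None


class FakeCursor:
    """Hypothetical stand-in for a database cursor, used only for this demo."""

    def __init__(self, charsets):
        self._charsets = charsets
        self._row = None

    def execute(self, sql, params):
        name = params[0]
        self._row = (self._charsets[name],) if name in self._charsets else None

    def fetchone(self):
        return self._row


cursor = FakeCursor({"example_schema": "cp932"})
found = schema_charset(cursor, "example_schema")
missing = schema_charset(cursor, "no_such_schema")
```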


@baloo I am dealing with a similar issue. When a query inserting binary data gets parsed, the reader is failing with the same exception @joy13975 reported above.

Is it possible to make a change to allow the errors argument to be specified? This could be passed in as an argument when instantiating BinLogStreamReader or set as a class variable in QueryEvent as suggested by @joy13975 . I am happy to submit a pull request. Please let me know your thoughts.

Collaborator

baloo commented Oct 10, 2019

@joy13975 well no, that does not work; you can't query the server to get this afaict.

@cmayo117 I would prefer to go the "call decode with values passed from status_vars" way. But I do not have time to handle this myself. If you'd like to throw in a PR, that would be great.


@baloo Unfortunately I don't think that will solve our issue as it isn't a problem of using the wrong charset. Some queries in our case have embedded binary data which cannot be decoded easily. Instead, it would be helpful to have the option of just ignoring decoding errors altogether.
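
To illustrate the difference the errors argument makes for a query with embedded binary data, here is a small sketch using only the standard `bytes.decode` error handlers. The query text is made up for the example.

```python
# A made-up query with two bytes that are not valid UTF-8.
raw = b"INSERT INTO t (blob_col) VALUES ('" + b"\xff\xfe" + b"')"

results = {}
for handler in ("ignore", "replace", "backslashreplace"):
    # None of these handlers raise; they differ in what replaces the bad bytes.
    results[handler] = raw.decode("utf-8", errors=handler)

for handler, text in results.items():
    print(handler, "->", text)
```

`ignore` silently drops the undecodable bytes, `replace` substitutes U+FFFD for each of them, and `backslashreplace` keeps them visible as escape sequences, which may be preferable when the lost bytes matter for debugging.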
