I'd like some feedback on code I wrote to obfuscate text on-disk. The basic idea is the same as ROT13 (which is implemented in the Python standard library), except instead of rotating the English alphabet by 13, the underlying byte representation is rotated by 128. The use case is to hide data from a string-search.
Because this is such a general transformation, I decided that the user should specify a "base" codec, and then the code would register a version that performs ROT128 (e.g. specifying `'utf_8'` creates `'utf_8_rot128'`).
rot128.py
# -*- coding: utf-8 -*-
'''
Provides codecs that perform a ROT128 transformation on their underlying
byte representation.
This module is side-effect free; to register codecs, use e.g.
register_codecs(UTF_8, ASCII) # for 'utf_8_rot128' and 'ascii_rot128'
register_rot128_codec() # for the bytes-to-bytes 'rot128'
'''
import codecs
from collections import defaultdict
from collections.abc import MutableMapping
from typing import Dict, Iterable, Optional as Opt, Tuple
### The ROT128 transformation
# Translation table for `bytes.translate`: byte value i maps to (i + 128) % 256.
ROT128_TRANS = bytes(range(128, 256)) + bytes(range(128))


def rot128_transform(input: bytes) -> bytes:
    '''Rotate every byte of `input` by 128 and return the result as `bytes`.

    `input` may be any bytes-like object (`bytes`, `bytearray`,
    `memoryview`, ...); the result is always a new `bytes` object,
    whatever the input class. Applying the function twice gives back the
    original data.

    Raises `TypeError` if `input` is a `str`.
    '''
    # `str` also has a `.translate` method, so without this guard text would
    # be silently remapped by code point and handed back as `str`.
    if isinstance(input, str):
        raise TypeError(
            "rot128_transform() requires a bytes-like object, not 'str'"
        )
    # Normalise first so that e.g. a bytearray does not come back as a
    # bytearray and a memoryview (which has no `.translate`) still works.
    if not isinstance(input, bytes):
        input = bytes(input)
    return input.translate(ROT128_TRANS)
### Registration function and convenience aliases
# Ready-made alias groups to pass to `register_codecs`: each tuple lists names
# of one existing codec, and `register_codecs` registers `<name>_rot128` for
# every name in the tuple.
UTF_8 = ('utf_8', 'U8', 'UTF', 'utf8') # in Python 3.8, add 'cp65001'
ASCII = ('ascii', '646', 'us-ascii')
def register_codecs(*encodings: Iterable[str]) -> None:
    '''Create and register codecs (with aliases) that perform ROT128 on
    their underlying byte representations.

    Each argument is an iterable of names that all refer to the same
    existing codec (a bare string is treated as a single name), e.g.
    ```
    register_codecs(('utf_8', 'U8', 'UTF', 'utf8'))
    ```
    creates the codec `utf_8_rot128`, with aliases
    ```
    u8_rot128, utf_rot128, utf8_rot128
    ```
    fetchable via `codecs.lookup(...)`.

    Calling the function again with names that are already registered is
    harmless: the codec built the first time is reused.

    Raises `LookupError` if a name is unknown to `codecs.lookup`, and
    `ValueError` if the names in one argument do not all resolve to the
    same codec.
    '''
    # register the search function only once
    global _REGISTER_ONCE
    if _REGISTER_ONCE:
        codecs.register(_REGISTERED_CODECS)
        _REGISTER_ONCE = False

    for encoding in encodings:
        # Materialise the names once: the argument is only promised to be an
        # iterable, so it may be a set or a one-shot generator, and a bare
        # string must not be split into single characters.
        if isinstance(encoding, str):
            aliases = (encoding,)
        else:
            aliases = tuple(encoding)

        # check that aliases refer to the same codec
        info_orig = codecs.lookup(aliases[0])
        if any(info_orig != codecs.lookup(alias) for alias in aliases[1:]):
            msg = f'{encoding!r} are not all aliases for the same codec!'
            raise ValueError(msg)

        # names of this base codec that already have a `_rot128` entry
        # (the defaultdict hands back an empty set for a new codec)
        known = _REGISTERED_ALIASES[info_orig]
        if known:
            # built before: every known alias maps to the same CodecInfo,
            # so fetch it through whichever alias comes first
            info_rot128 = _REGISTERED_CODECS[next(iter(known)) + '_rot128']
        else:
            info_rot128 = _build_codec(info_orig)

        # register the codec under the names not seen before
        unregistered = set(aliases) - known
        for name in unregistered:
            _REGISTERED_CODECS[name + '_rot128'] = info_rot128
        known |= unregistered
def _build_codec(codec_info: codecs.CodecInfo) -> codecs.CodecInfo:
    '''Create a ROT128'd codec based on `codec_info`.

    The returned `CodecInfo` is named `<base name>_rot128`. Encoding runs
    the base codec and then rotates the produced bytes; decoding rotates
    the incoming bytes and then runs the base codec. ROT128 maps bytes
    one-to-one, so the lengths reported by the base codec stay valid.
    '''
    def encode(input, errors: str = 'strict') -> Tuple[bytes, int]:
        data, consumed = codec_info.encode(input, errors)
        return rot128_transform(data), consumed

    def decode(input: bytes, errors: str = 'strict'):
        # returns whatever the base decoder returns: (decoded, consumed)
        return codec_info.decode(rot128_transform(input), errors)

    class IncrementalEncoder(codec_info.incrementalencoder):
        def encode(self, input, final: bool = False) -> bytes:
            return rot128_transform(super().encode(input, final))

    class IncrementalDecoder(codec_info.incrementaldecoder):
        def decode(self, input, final: bool = False):
            return super().decode(rot128_transform(input), final)

        # The base decoder buffers bytes *after* rotation, but the state
        # API speaks in terms of the raw input stream (io.TextIOWrapper
        # re-feeds the buffered bytes through decode() for tell()/seek()).
        # Rotate the buffer on the way out and on the way in so it is
        # always expressed as raw, still-rotated input.
        def getstate(self):
            buffered, flags = super().getstate()
            return rot128_transform(buffered), flags

        def setstate(self, state) -> None:
            buffered, flags = state
            super().setstate((rot128_transform(buffered), flags))

    # The stream classes wrap the base *stream* encode/decode rather than
    # the stateless functions above. A stream reader is handed arbitrary
    # chunks and relies on decode() leaving an incomplete trailing
    # character unconsumed, whereas a stateless decode (e.g. UTF-8's)
    # treats every call as final and raises. Likewise stateful writers
    # (e.g. UTF-16's) emit the BOM only once.
    class StreamWriter(codec_info.streamwriter):
        def encode(self, input, errors: str = 'strict') -> Tuple[bytes, int]:
            data, consumed = super().encode(input, errors)
            return rot128_transform(data), consumed

    class StreamReader(codec_info.streamreader):
        def decode(self, input, errors: str = 'strict'):
            return super().decode(rot128_transform(input), errors)

    return codecs.CodecInfo(
        name=codec_info.name + '_rot128',
        encode=encode,
        decode=decode,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        streamwriter=StreamWriter,
        streamreader=StreamReader,
    )
### Maintain registration with `codecs` module
class _RegisteredCodecs(MutableMapping):
'''`dict`-like class that maps ROT128 codec names to their `CodecInfo`s'''
def __init__(self) -> None:
self._store: Dict[str, codecs.CodecInfo] = {}
@staticmethod
def _trans(key: str) -> str:
'''Normalize codec name'''
return key.lower().replace('-', '_')
def __call__(self, key: str) -> Opt[codecs.CodecInfo]:
'''Provide the search function interface to `codecs.register`'''
return self.get(key, None)
def __getitem__(self, key: str) -> codecs.CodecInfo:
return self._store[self._trans(key)]
def __setitem__(self, key: str, value: codecs.CodecInfo) -> None:
self._store[self._trans(key)] = value
def __delitem__(self, key: str) -> None:
del self._store[self._trans(key)]
def __contains__(self, key: str) -> bool:
return self._trans(key) in self._store
def __iter__(self):
return iter(self._store)
def __len__(self) -> int:
return len(self._store)
def __str__(self) -> str:
return str(list(self.keys()))
# Name -> CodecInfo registry of the generated codecs; the instance itself is
# the search function that `register_codecs` passes to `codecs.register`.
_REGISTERED_CODECS = _RegisteredCodecs()
# Maps each base CodecInfo to the set of names already given a `_rot128`
# entry, so that `register_codecs` builds each ROT128 codec only once.
_REGISTERED_ALIASES = defaultdict(set)
# True until `register_codecs` has installed the search function.
_REGISTER_ONCE = True
### ROT128 bytes-to-bytes codec
def register_rot128_codec() -> None:
    '''Make the bytes-to-bytes codec 'rot128' available through `codecs`.

    Only the first call installs the search function; later calls do
    nothing.
    '''
    global _REGISTER_ROT128_ONCE
    if not _REGISTER_ROT128_ONCE:
        # search function already installed by an earlier call
        return
    codecs.register(_rot128_search_function)
    _REGISTER_ROT128_ONCE = False
def rot128_transcode(input: bytes, errors: str = 'strict') -> Tuple[bytes, int]:
    '''A `codecs`-module-style ROT128 encode/decode function.

    Returns the pair `(rotated, length)` that the `codecs` machinery
    expects from a stateless encoder or decoder. `errors` is accepted for
    interface compatibility only; it is not used.
    '''
    output = rot128_transform(input)
    # ROT128 maps bytes one-to-one, so the size of the output is the number
    # of input bytes consumed -- also for buffers whose `len()` counts
    # multi-byte items rather than bytes.
    return output, len(output)
class Rot128Codec(codecs.Codec):
    '''ROT128 bytes-to-bytes codec (stateless encode/decode methods)'''

    def encode(self, input: bytes, errors: str = 'strict') -> Tuple[bytes, int]:
        '''Return `(rotated bytes, length)` for `input`'''
        return rot128_transcode(input, errors)

    # Rotating by 128 twice restores the original bytes, so decoding is the
    # very same operation as encoding.
    decode = encode
class Rot128IncrementalEncoder(codecs.IncrementalEncoder):
    '''Incremental encoder for the bytes-to-bytes ROT128 codec'''

    def encode(self, input: bytes, final: bool = False) -> bytes:
        # the chunk is rotated as-is; `final` is not consulted
        rotated = rot128_transform(input)
        return rotated
class Rot128IncrementalDecoder(codecs.IncrementalDecoder):
    '''Incremental decoder for the bytes-to-bytes ROT128 codec'''

    def decode(self, input: bytes, final: bool = False) -> bytes:
        # the chunk is rotated as-is; `final` is not consulted
        restored = rot128_transform(input)
        return restored
class Rot128StreamWriter(Rot128Codec, codecs.StreamWriter):
    '''ROT128 bytes-to-bytes stream writer.

    `Rot128Codec` is listed first so that its `encode` is the one the
    stream writer machinery picks up.
    '''
    # need to specify (undocumented) charbuffertype for bytes-to-bytes;
    # see https://github.com/python/cpython/blob/3.8/Lib/encodings/base64_codec.py
    charbuffertype = bytes
class Rot128StreamReader(Rot128Codec, codecs.StreamReader):
    '''ROT128 bytes-to-bytes stream reader.

    `Rot128Codec` is listed first so that its `decode` is the one the
    stream reader machinery picks up.
    '''
    # bytes-to-bytes codec: the reader buffers `bytes`, not `str`
    charbuffertype = bytes
# CodecInfo handed out by `_rot128_search_function` for the name 'rot128'.
# The same function is used for both directions.
_ROT128_CODEC_INFO = codecs.CodecInfo(
    name = 'rot128',
    encode = rot128_transcode,
    decode = rot128_transcode,
    incrementalencoder = Rot128IncrementalEncoder,
    incrementaldecoder = Rot128IncrementalDecoder,
    streamwriter = Rot128StreamWriter,
    streamreader = Rot128StreamReader
)
def _rot128_search_function(encoding: str) -> Opt[codecs.CodecInfo]:
if encoding.lower() == 'rot128':
return _ROT128_CODEC_INFO
else:
return None
_REGISTER_ROT128_ONCE = True
And a simple example:
import codecs
import os
import tempfile

import rot128

rot128.register_rot128_codec()
rot128.register_codecs(rot128.UTF_8)

if __name__ == '__main__':
    write_text = 'Hello world! π\n'

    # Work inside a throwaway directory so the demo neither overwrites an
    # existing test.txt in the current directory nor leaves one behind.
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, 'test.txt')

        # seamless encoding
        with open(path, 'w', encoding='utf_8_rot128') as f:
            f.write(write_text)

        # seamless decoding
        with open(path, 'r', encoding='utf_8_rot128') as f:
            read_text = f.read()
        assert read_text == write_text

        # bytes-to-bytes is a little meaner
        with codecs.open(path, 'rb', encoding='rot128') as f:
            read_bytes = f.read()
        # codecs.open doesn't have universal newlines
        read_text = codecs.decode(read_bytes, 'utf_8').replace('\r\n', '\n')
        assert read_text == write_text

        with open(path, 'rb') as f:
            read_bytes = codecs.decode(f.read(), 'rot128')
        read_text = codecs.decode(read_bytes, 'utf_8').replace('\r\n', '\n')
        assert read_text == write_text

    # bytes-like object
    mybytes = write_text.encode('utf_8')
    memview = memoryview(mybytes)
    assert codecs.encode(memview, 'rot128') == codecs.encode(mybytes, 'rot128')
There are a few ugly things I'd like to draw attention to, namely:
- `_RegisteredCodecs` is a reimplementation of `dict` to look up codecs (which is a lot of boilerplate). It does the same "normalization" as the `codecs` module, namely, lowercasing names and converting hyphens to underscores, and its `__call__` method implements the search function interface to the `codecs` registry.
- `bytes`-like object edge cases: the ROT128 transformation is implemented with `translate`, but this does not exist for e.g. a `memoryview`, so it converts to `bytes` in that case; I'm not sure I should attempt to return the original class.
- The logic in `register_codecs` is pretty involved, to prevent the user from shooting themself in the foot if they try to register invalid aliases or re-register existing aliases.
As an outsider, I'm happy to accept style review as well.
1 Answer 1
Re-implementing a dict is completely unnecessary. `codecs.register()` expects a search function, so a plain function will work just fine. You can use a regular dict to store codecs (in a closure). Normalization can be implemented in its own function. Something as simple as this should work:
def register_codecs(*encodings: Iterable[str]) -> None: registered_codecs = {} def search(codecs_name): return registered_codecs.get(_normalize(codecs_name), None) codecs.register(search) # Add codecs def _normalize(encoding: str) -> str: return encoding.lower().replace('-', '_')
Instead of storing codecs in the global variable `_REGISTERED_CODECS`, we just register another search function each time the user calls `register_codecs()` (which means `_REGISTER_ONCE` is also not needed any more; we just got rid of two global variables with one shot!)

Now for the error checking in `register_codecs()`. Checking that aliases refer to the same codec is fine, but I doubt if it's really necessary to check for duplicates. The code works all right even if the same codec is registered twice. So I think it's probably not worth it.

After removing the check for duplicates, the complete
`register_codecs()` function now looks like this:

```python
def register_codecs(*encodings: Iterable[str]) -> None:
    registered_codecs = {}

    def search(codecs_name):
        return registered_codecs.get(codecs_name, None)

    codecs.register(search)

    # then add codecs to registered_codecs
    for encoding in encodings:
        # check that aliases refer to the same codec
        info_orig = codecs.lookup(encoding[0])
        if any(info_orig != codecs.lookup(alias) for alias in encoding[1:]):
            msg = f"{encoding!r} are not all aliases for the same codec!"
            raise ValueError(msg)

        for name in encoding:
            registered_codecs[_normalize(name) + "_rot128"] = _build_codec(info_orig)
```
And that's also one less global variable!

`rot128_transform()` takes any bytes-like object as argument and returns bytes. It's OK to return bytes even if the caller passes in something else like a `memoryview`—the same way Python's `Iterable` interface works.

As a side note, the `range()` function can take two arguments: `start` and `stop`. So instead of `range(256)[128:]`, try `range(128, 256)`.
-
Hey, thanks for the review! Give me some time to try this out and then I'll mark it as "Accepted" :D – LimeHunter7, 2020年03月25日 23:08:14 +00:00 (commented Mar 25, 2020 at 23:08)
-
Fun fact: according to the `typing` docs, `bytes` can be used as shorthand for a bytes-like object – LimeHunter7, 2020年03月26日 12:30:35 +00:00 (commented Mar 26, 2020 at 12:30)
-
"Normalization is also not needed, since the standard library normalize the names for you before calling your search function." This is untrue: not only does the library not normalize these names before sending them to the search function, but since the codec names ALSO come from the user, they need to be normalized. – LimeHunter7, 2020年03月26日 17:04:02 +00:00 (commented Mar 26, 2020 at 17:04)
-
1 Yeah, you're right. I thought the standard library does normalization because the `codecs.register()` docs say "Search functions are expected to take one argument, being the encoding name in all lower case letters". But it turns out that it only converts upper case letters to lower case and nothing else. – Yizhe Sun, 2020年03月27日 03:08:47 +00:00 (commented Mar 27, 2020 at 3:08)
Explore related questions
See similar questions with these tags.