6
\$\begingroup\$

I'd like some feedback on code I wrote to obfuscate text on-disk. The basic idea is the same as ROT13 (which is implemented in the Python standard library), except instead of rotating the English alphabet by 13, the underlying byte representation is rotated by 128. The use case is to hide data from a string-search.

Because this is such a general transformation, I decided that the user should specify a "base" codec, and then the code would register a version that performs ROT128 (e.g. specifying 'utf_8' creates 'utf_8_rot128').

rot128.py
# -*- coding: utf-8 -*-
'''
Provides codecs that perform a ROT128 transformation on their underlying
byte representation.
This module is side-effect free; to register codecs, use e.g.
 register_codecs(UTF_8, ASCII) # for 'utf_8_rot128' and 'ascii_rot128'
 register_rot128_codec() # for the bytes-to-bytes 'rot128'
'''
from typing import Dict, Iterable, Optional as Opt
from collections import defaultdict
from collections.abc import MutableMapping
import codecs
### The ROT128 transformation
# Translation table for `bytes.translate`: entry `b` holds `(b + 128) % 256`.
ROT128_TRANS = bytes(range(128, 256)) + bytes(range(128))


def rot128_transform(input: bytes) -> bytes:
    '''Rotate bytes from `input` by 128.

    `input` may be any bytes-like object. Objects that have their own
    `translate` method (`bytes`, `bytearray`) are translated directly;
    anything else (e.g. a `memoryview`) is first copied into `bytes`.
    Applying the transformation twice returns the original data.

    Raises `TypeError` if `input` is a `str`.
    '''
    if isinstance(input, str):
        # str.translate() would happily accept the 256-entry table and
        # return a *str* of remapped code points instead of failing.
        raise TypeError(
            "rot128_transform() requires a bytes-like object, not 'str'"
        )
    try:
        return input.translate(ROT128_TRANS)
    except AttributeError:
        # no translate() on this type (e.g. memoryview): copy to bytes first
        return bytes(input).translate(ROT128_TRANS)
### Registration function and convenience aliases
# Each constant is one group of alias names for a single base codec, meant to
# be passed as one positional argument to `register_codecs`.
UTF_8 = ('utf_8', 'U8', 'UTF', 'utf8') # in Python 3.8, add 'cp65001'
ASCII = ('ascii', '646', 'us-ascii')
def register_codecs(*encodings: Iterable[str]) -> None:
    '''Create and register codecs (with aliases) that perform ROT128 on
    their underlying byte representations.

    Each argument is an iterable of alias names for one original encoding
    (a bare string is treated as a single name), e.g.
    ```
    register_codecs(('utf_8', 'U8', 'UTF', 'utf8'))
    ```
    creates the codec `utf_8_rot128`, with aliases
    ```
    u8_rot128, utf_rot128, utf8_rot128
    ```
    fetchable via `codecs.lookup(...)`.

    Names that were already registered by an earlier call are skipped, and
    new aliases of an already-built codec reuse its existing `CodecInfo`.

    Raises `ValueError` if the names in one argument do not all resolve to
    the same codec; an unknown name propagates the `LookupError` raised by
    `codecs.lookup`.
    '''
    # register the search function only once
    global _REGISTER_ONCE
    if _REGISTER_ONCE:
        codecs.register(_REGISTERED_CODECS)
        _REGISTER_ONCE = False

    for encoding in encodings:
        # Materialize the aliases: the code below indexes and re-iterates
        # them, and a lone string must not be split into characters.
        if isinstance(encoding, str):
            aliases = (encoding,)
        else:
            aliases = tuple(encoding)

        # check that aliases refer to the same codec
        info_orig = codecs.lookup(aliases[0])
        if any(info_orig != codecs.lookup(alias) for alias in aliases[1:]):
            msg = f'{encoding!r} are not all aliases for the same codec!'
            raise ValueError(msg)

        known = _REGISTERED_ALIASES[info_orig]
        if known:
            # built before: any registered alias leads to the shared info
            info_rot128 = _REGISTERED_CODECS[next(iter(known)) + '_rot128']
        else:
            info_rot128 = _build_codec(info_orig)

        # register the codec under every alias not seen so far
        unregistered = set(aliases) - known
        for name in unregistered:
            _REGISTERED_CODECS[name + '_rot128'] = info_rot128
        known |= unregistered
def _build_codec(codec_info: codecs.CodecInfo) -> codecs.CodecInfo:
    '''Create a ROT128'd codec based on `codec_info`.

    Encoding runs the base codec and then rotates the bytes it produced;
    decoding rotates the incoming bytes and then runs the base codec.
    The returned `CodecInfo` is named `codec_info.name + '_rot128'`.
    '''

    def encode(input, errors: str = 'strict') -> tuple:
        '''Stateless encode: `(rotated bytes, length consumed)`.'''
        encoded, consumed = codec_info.encode(input, errors)
        return rot128_transform(encoded), consumed

    def decode(input: bytes, errors: str = 'strict') -> tuple:
        '''Stateless decode: un-rotate, then defer to the base codec.'''
        return codec_info.decode(rot128_transform(input), errors)

    class Codec(codecs.Codec):
        # Inside the method bodies the bare names `encode` / `decode`
        # resolve to the closures above, not to these class attributes.
        def encode(self, input: str, errors: str = 'strict') -> tuple:
            return encode(input, errors)

        def decode(self, input: bytes, errors: str = 'strict') -> tuple:
            return decode(input, errors)

    class IncrementalEncoder(codec_info.incrementalencoder):
        def encode(self, input, final: bool = False):
            return rot128_transform(super().encode(input, final))

    class IncrementalDecoder(codec_info.incrementaldecoder):
        def decode(self, input, final: bool = False):
            return super().decode(rot128_transform(input), final)

        # The base decoder only ever sees un-rotated bytes, so whatever it
        # buffers is un-rotated too. The decoder state must expose bytes
        # exactly as they were passed to decode(), so rotate the buffer on
        # the way out and back again on the way in.
        def getstate(self):
            buffered, flags = super().getstate()
            return rot128_transform(buffered), flags

        def setstate(self, state):
            buffered, flags = state
            super().setstate((rot128_transform(buffered), flags))

    class StreamWriter(Codec, codec_info.streamwriter):
        pass

    class StreamReader(Codec, codec_info.streamreader):
        pass

    return codecs.CodecInfo(
        name=codec_info.name + '_rot128',
        encode=encode,
        decode=decode,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        streamwriter=StreamWriter,
        streamreader=StreamReader,
    )
### Maintain registration with `codecs` module
class _RegisteredCodecs(MutableMapping):
    '''Mapping from ROT128 codec names to their `CodecInfo`s.

    Keys are normalized (lower-cased, hyphens turned into underscores) on
    every access. Calling the instance with a name returns the stored
    `CodecInfo` or `None`, which lets it be handed to `codecs.register`.
    '''

    def __init__(self) -> None:
        self._store: Dict[str, codecs.CodecInfo] = {}

    @staticmethod
    def _trans(key: str) -> str:
        '''Return the normalized form of a codec name'''
        lowered = key.lower()
        return lowered.replace('-', '_')

    def __call__(self, key: str) -> Opt[codecs.CodecInfo]:
        '''Search-function interface: the codec for `key`, else `None`'''
        try:
            return self[key]
        except KeyError:
            return None

    def __getitem__(self, key: str) -> codecs.CodecInfo:
        normalized = self._trans(key)
        return self._store[normalized]

    def __setitem__(self, key: str, value: codecs.CodecInfo) -> None:
        normalized = self._trans(key)
        self._store[normalized] = value

    def __delitem__(self, key: str) -> None:
        normalized = self._trans(key)
        del self._store[normalized]

    def __contains__(self, key: str) -> bool:
        normalized = self._trans(key)
        return normalized in self._store

    def __iter__(self):
        return iter(self._store.keys())

    def __len__(self) -> int:
        return len(self._store)

    def __str__(self) -> str:
        # the list of normalized names, rendered like a list literal
        return repr(list(self))
# Name -> CodecInfo registry; `register_codecs` passes this object itself to
# `codecs.register` as the search function.
_REGISTERED_CODECS = _RegisteredCodecs()
# Original codec's CodecInfo -> set of alias names already registered for it.
_REGISTERED_ALIASES = defaultdict(set)
# True until `register_codecs` has registered the search function.
_REGISTER_ONCE = True
### ROT128 bytes-to-bytes codec
def register_rot128_codec() -> None:
    '''Make the bytes-to-bytes 'rot128' codec available to `codecs.lookup`.

    Only the first call registers the search function; later calls are
    no-ops.
    '''
    global _REGISTER_ROT128_ONCE
    if not _REGISTER_ROT128_ONCE:
        return
    codecs.register(_rot128_search_function)
    _REGISTER_ROT128_ONCE = False
def rot128_transcode(input: bytes, errors: str = 'strict') -> tuple:
    '''A `codecs`-module-style ROT128 encode/decode method.

    Returns the pair `(rotated data, len(input))`. The same function serves
    as both encoder and decoder. `errors` is accepted for interface
    compatibility and is not used.
    '''
    return rot128_transform(input), len(input)
class Rot128Codec(codecs.Codec):
    '''ROT128 bytes-to-bytes codec'''

    def encode(self, input: bytes, errors: str = 'strict') -> tuple:
        '''Return `(rotated data, length consumed)` via `rot128_transcode`'''
        return rot128_transcode(input, errors)

    # decoding is the very same operation as encoding
    decode = encode
class Rot128IncrementalEncoder(codecs.IncrementalEncoder):
    '''Incremental encoder for the bytes-to-bytes ROT128 codec'''

    def encode(self, input: bytes, final: bool = False) -> bytes:
        '''Rotate this chunk; `final` is not consulted'''
        rotated = rot128_transform(input)
        return rotated
class Rot128IncrementalDecoder(codecs.IncrementalDecoder):
    '''Incremental decoder for the bytes-to-bytes ROT128 codec'''

    def decode(self, input: bytes, final: bool = False) -> bytes:
        '''Rotate this chunk; `final` is not consulted'''
        rotated = rot128_transform(input)
        return rotated
class Rot128StreamWriter(Rot128Codec, codecs.StreamWriter):
    '''ROT128 bytes-to-bytes stream writer.

    `encode`/`decode` come from `Rot128Codec`, listed first among the bases.
    '''
    # need to specify (undocumented) charbuffertype for bytes-to-bytes;
    # see https://github.com/python/cpython/blob/3.8/Lib/encodings/base64_codec.py
    charbuffertype = bytes
class Rot128StreamReader(Rot128Codec, codecs.StreamReader):
    '''ROT128 bytes-to-bytes stream reader.

    `encode`/`decode` come from `Rot128Codec`, listed first among the bases.
    '''
    # bytes-to-bytes codec, so the buffer type is bytes
    # (same setting as on Rot128StreamWriter)
    charbuffertype = bytes
# CodecInfo for the bytes-to-bytes codec; `rot128_transcode` is used for both
# directions.
_ROT128_CODEC_INFO = codecs.CodecInfo(
    name='rot128',
    encode=rot128_transcode,
    decode=rot128_transcode,
    incrementalencoder=Rot128IncrementalEncoder,
    incrementaldecoder=Rot128IncrementalDecoder,
    streamwriter=Rot128StreamWriter,
    streamreader=Rot128StreamReader,
)
def _rot128_search_function(encoding: str) -> Opt[codecs.CodecInfo]:
    '''Search function for `codecs.register`: answers only to 'rot128'
    (compared case-insensitively) and returns `None` for any other name.
    '''
    wanted = encoding.lower() == 'rot128'
    return _ROT128_CODEC_INFO if wanted else None
# True until `register_rot128_codec` has registered the search function.
_REGISTER_ROT128_ONCE = True

And a simple example:

import codecs
import rot128
# Register the bytes-to-bytes 'rot128' codec and the ROT128 variants of the
# UTF-8 aliases before anything below looks them up.
rot128.register_rot128_codec()
rot128.register_codecs(rot128.UTF_8)
if __name__ == '__main__':
    # seamless encoding
    write_text = 'Hello world! 𐍈\n'
    with open('test.txt', 'w', encoding='utf_8_rot128') as f:
        f.write(write_text)

    # seamless decoding
    with open('test.txt', 'r', encoding='utf_8_rot128') as f:
        read_text = f.read()
    assert read_text == write_text

    # bytes-to-bytes is a little meaner: 'rot128' only undoes the rotation,
    # so the UTF-8 decoding has to be done as a separate step
    with codecs.open('test.txt', 'rb', encoding='rot128') as f:
        read_bytes = f.read()
    # codecs.open doesn't have universal newlines
    read_text = codecs.decode(read_bytes, 'utf_8').replace('\r\n', '\n')
    assert read_text == write_text
    # same round trip, reading the raw file and decoding it in one call
    with open('test.txt', 'rb') as f:
        read_bytes = codecs.decode(f.read(), 'rot128')
    read_text = codecs.decode(read_bytes, 'utf_8').replace('\r\n', '\n')
    assert read_text == write_text

    # bytes-like object: a memoryview must encode to the same result as the
    # bytes it wraps
    mybytes = write_text.encode('utf_8')
    memview = memoryview(mybytes)
    assert codecs.encode(memview, 'rot128') == codecs.encode(mybytes, 'rot128')

There are a few ugly things I'd like to draw attention to, namely:

  • _RegisteredCodecs is a reimplementation of dict to look up codecs (which is a lot of boilerplate). It does the same "normalization" as the codecs module, namely, lowercasing names and converting hyphens to underscores, and its __call__ method implements the search function interface to the codecs registry.
  • bytes-like object edge cases: the ROT128 transformation is implemented with translate, but this does not exist for e.g. a memoryview, so it converts to bytes in that case; I'm not sure I should attempt to return the original class
  • The logic in register_codecs is pretty involved, to prevent the user from shooting themself in the foot if they try to register invalid aliases or re-register existing aliases

As an outsider, I'm happy to accept style review as well.

asked Mar 24, 2020 at 22:52
\$\endgroup\$

1 Answer 1

3
\$\begingroup\$
  • Re-implementing a dict is completely unnecessary. codecs.register() expects a search function, so a plain function will work just fine. You can use a regular dict to store codecs (in a closure). Normalization can be implemented in its own function.

    Something as simple as this should work:

    def register_codecs(*encodings: Iterable[str]) -> None:
        """Register a search function backed by a dict local to this call."""
        registered_codecs = {}

        def search(codecs_name):
            # look the name up in its normalized form; None when unknown
            normalized = _normalize(codecs_name)
            return registered_codecs.get(normalized)

        codecs.register(search)
        # Add codecs
    def _normalize(encoding: str) -> str:
        """Lower-case a codec name and turn its hyphens into underscores."""
        lowered = encoding.lower()
        return lowered.replace('-', '_')
    

    Instead of storing codecs in the global variable _REGISTERED_CODECS, we just register another search function each time the user calls register_codecs() (which means _REGISTER_ONCE is also not needed any more; we just got rid of two global variables with one shot!)

  • Now for the error checking in register_codecs(). Checking that aliases refer to the same codec is fine, but I doubt if it's really necessary to check for duplicates. The code works all right even if the same codec is registered twice. So I think it's probably not worth it.

    After removing the check for duplicates, the complete register_codecs() function now looks like this:

    def register_codecs(*encodings: Iterable[str]) -> None:
        """Register ROT128 variants of the given encodings.

        Each argument is a sequence of alias names for one base codec. Every
        alias is registered as `<normalized alias>_rot128`.

        Raises ValueError if the names in one argument do not all refer to
        the same codec.
        """
        registered_codecs = {}

        def search(codecs_name):
            # keys are stored normalized below, so normalize the query too
            return registered_codecs.get(_normalize(codecs_name), None)

        codecs.register(search)
        # then add codecs to registered_codecs
        for encoding in encodings:
            # check that aliases refer to the same codec
            info_orig = codecs.lookup(encoding[0])
            if any(info_orig != codecs.lookup(alias) for alias in encoding[1:]):
                msg = f"{encoding!r} are not all aliases for the same codec!"
                raise ValueError(msg)
            # build once per codec so all of its aliases share one CodecInfo
            info_rot128 = _build_codec(info_orig)
            for name in encoding:
                registered_codecs[_normalize(name) + "_rot128"] = info_rot128
    

    And that's also one less global variable!

  • rot128_transform() takes any bytes-like object as argument and returns bytes. It's OK to return bytes even if the caller passes in something else like a memoryview—the same way python's Iterable interface works.

  • As a side note, the range() function accepts separate start and stop arguments, so instead of slicing with range(256)[128:], you can write range(128, 256) directly.

answered Mar 25, 2020 at 16:56
\$\endgroup\$
4
  • \$\begingroup\$ Hey, thanks for the review! Give me some time to try this out and then I'll mark it as "Accepted" :D \$\endgroup\$ Commented Mar 25, 2020 at 23:08
  • \$\begingroup\$ Fun fact: according to the typing docs, bytes can be used as shorthand for a bytes-like object \$\endgroup\$ Commented Mar 26, 2020 at 12:30
  • \$\begingroup\$ " Normalization is also not needed, since the standard library normalize the names for you before calling your search function." This is untrue, not only does the library not normalize these names before sending them to the search function, but since the codec names ALSO come from the user, they need to be normalized. \$\endgroup\$ Commented Mar 26, 2020 at 17:04
  • 1
    \$\begingroup\$ Yeah, you're right. I thought the standard library did the normalization because the codecs.register() docs say "Search functions are expected to take one argument, being the encoding name in all lower case letters". But it turns out that it only converts upper-case letters to lower case and nothing else. \$\endgroup\$ Commented Mar 27, 2020 at 3:08

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.