I wrote this preprocessor to add a couple of features to Python (pipes, mostly). Is there any way I could improve it?
import tokenize, io, types
PREFIX = """
# ITHON START
import sys as _INTERNAL_SYS
class _INTERNAL_LazyIterable:
__slots__ = ('x','y')
def __init__(self, x, y) -> None:
self.x = iter(x)
self.y = iter(y)
def __iter__(self):
return _INTERNAL_LazyIterator(self)
class _INTERNAL_LazyIterator:
__slots__ = ('iterable',)
def __init__(self, iterable) -> None:
self.iterable = iterable
def __next__(self):
try:
return next(self.iterable.x)
except StopIteration:
try:
return next(self.iterable.y)
except StopIteration:
raise StopIteration
class _INTERNAL_LAZYMERGE:
def __init__(self, val):
self.val = val
def __rrshift__(self, other):
return _INTERNAL_LAZYMERGE(other)
def __lshift__(self, other):
return _INTERNAL_LazyIterable(self.val, other)
class _INTERNAL_LPIPE:
def __init__(self, val):
self.val = val
def __rrshift__(self, other):
return _INTERNAL_LPIPE(other)
def __lshift__(self, other):
return other(self.val)
class _INTERNAL_RPIPE:
def __init__(self, action):
self.action = action
def __rrshift__(self, other):
return _INTERNAL_RPIPE(other)
def __lshift__(self, other):
return self.action(other)
_INTERNAL_lpipe = _INTERNAL_LPIPE(None)
_INTERNAL_rpipe = _INTERNAL_RPIPE(None)
_INTERNAL_lazymerge = _INTERNAL_LAZYMERGE(None)
# ITHON END
"""
def translate(file: io.StringIO):
 patched_file = io.StringIO(PREFIX + file.read())
 skip_token = False
 tokens = list(tokenize.generate_tokens(patched_file.readline)) # Precalculate tokens
 print(tokens)
 for n, i in enumerate(tokens):
 type, name,_,_,_ = i
 try:
 next_type, next_name,_,_,_ = tokens[n + 1]
 except IndexError:
 next_type, next_name = (None, None)
 print(type, name)
 if skip_token:
 skip_token = False
 continue
 if type == tokenize.OP and next_type == tokenize.OP:
 # Most likely special operation
 if name == "|" and next_name == ">": # left pipe
 yield tokenize.OP, ">>"
 yield tokenize.NAME, "_INTERNAL_lpipe"
 yield tokenize.OP, "<<"
 skip_token = True
 elif name == "<" and next_name == "|": # right pipe
 yield tokenize.OP, ">>"
 yield tokenize.NAME, "_INTERNAL_rpipe"
 yield tokenize.OP, "<<"
 skip_token = True
 elif name == ":" and next_name == ":": # lazy merge
 yield tokenize.OP, ">>"
 yield tokenize.NAME, "_INTERNAL_lazymerge"
 yield tokenize.OP, "<<"
 skip_token = True
 else:
 yield type,name
 elif type == tokenize.OP:
 if name == "<>": # barry is flufl
 yield tokenize.OP, "!="
 else:
 yield type, name
 else:
 yield type,name
script = """
def x():
print(1)
a = "Hello, World!"
b = print
a |> b
b <| a
for i in [1,2,3] :: [4,5,6] :: [7,8,9]:
print(i)
if a <> b:
print("Barry is FLUFL!")
"Hello, World!" |> print
"""
code = tokenize.untokenize(translate(io.StringIO(script)))
print(code)
exec(code)
Example code output:
# ITHON START
import sys as _INTERNAL_SYS
class _INTERNAL_LazyIterable :
 __slots__ =('x','y')
 def __init__ (self ,x ,y )->None :
 self .x =iter (x )
 self .y =iter (y )
 def __iter__ (self ):
 return _INTERNAL_LazyIterator (self )
class _INTERNAL_LazyIterator :
 __slots__ =('iterable',)
 def __init__ (self ,iterable )->None :
 self .iterable =iterable
 def __next__ (self ):
 try :
 return next (self .iterable .x )
 except StopIteration :
 try :
 return next (self .iterable .y )
 except StopIteration :
 raise StopIteration
class _INTERNAL_LAZYMERGE :
 def __init__ (self ,val ):
 self .val =val
 def __rrshift__ (self ,other ):
 return _INTERNAL_LAZYMERGE (other )
 def __lshift__ (self ,other ):
 return _INTERNAL_LazyIterable (self .val ,other )
class _INTERNAL_LPIPE :
 def __init__ (self ,val ):
 self .val =val
 def __rrshift__ (self ,other ):
 return _INTERNAL_LPIPE (other )
 def __lshift__ (self ,other ):
 return other (self .val )
class _INTERNAL_RPIPE :
 def __init__ (self ,action ):
 self .action =action
 def __rrshift__ (self ,other ):
 return _INTERNAL_RPIPE (other )
 def __lshift__ (self ,other ):
 return self .action (other )
_INTERNAL_lpipe =_INTERNAL_LPIPE (None )
_INTERNAL_rpipe =_INTERNAL_RPIPE (None )
_INTERNAL_lazymerge =_INTERNAL_LAZYMERGE (None )
# ITHON END
def x ():
 print (1 )
a ="Hello, World!"
b =print
a >>_INTERNAL_lpipe <<b
b >>_INTERNAL_rpipe <<a
for i in [1 ,2 ,3 ]>>_INTERNAL_lazymerge <<[4 ,5 ,6 ]>>_INTERNAL_lazymerge <<[7 ,8 ,9 ]:
 print (i )
if a !=b :
 print ("Barry is FLUFL!")
"Hello, World!">>_INTERNAL_lpipe <<print
- Reinderien (Sep 5, 2024): So this is a code generator (though I didn't find a tag for that, so I used the closest relevant one I could find). Can you please show some example output?
- Xandaaah (Sep 5, 2024): @Reinderien Just added some example output. It is very bad-looking, but that's what tokenize.untokenize outputs.
1 Answer
Unused
import sys as _INTERNAL_SYS is unused. You can delete this.
slots
Two of your classes define __slots__, whereas three do not. You should probably add __slots__ to the other classes as well.
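For instance, a minimal sketch of the missing declarations (operator methods omitted for brevity; names taken from the posted PREFIX):

class _INTERNAL_LAZYMERGE:
 __slots__ = ('val',) # the only attribute this class ever stores
 def __init__(self, val):
 self.val = val
class _INTERNAL_LPIPE:
 __slots__ = ('val',)
 def __init__(self, val):
 self.val = val
class _INTERNAL_RPIPE:
 __slots__ = ('action',)
 def __init__(self, action):
 self.action = action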
Iterating the hard way
The _INTERNAL_LazyIterator class is entirely unnecessary. Instead, you can use yield from in the __iter__ method to create the necessary iterator.
class _INTERNAL_LazyIterable:
 __slots__ = ('x','y')
 def __init__(self, x, y) -> None:
 self.x = iter(x)
 self.y = iter(y)
 def __iter__(self):
 yield from self.x
 yield from self.y
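A quick sanity check of the rewritten class (a hypothetical snippet, not from the original post):

merged = _INTERNAL_LazyIterable([1, 2], [3, 4])
print(list(merged)) # [1, 2, 3, 4] -- both sources, in order

As with the original version, the result can only be walked once, because the source iterators are consumed.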
Redundant Code
This code is repeated over and over, every time you define a new operator.
class _INTERNAL_XXX:
 def __rrshift__(self, other):
 return _INTERNAL_XXX(other)
Moreover, you create a singleton for each class by passing in None for the "other" argument. This smells.
To reduce the duplicate code, you could have a common base class which defines the __rrshift__ operation and creates a new instance of the current class, but the None initialization still smells.
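For illustration, that base-class variant might look something like this (a sketch only, not a recommendation):

class _INTERNAL_PIPEBASE:
 def __rrshift__(self, other):
 # Build a fresh instance of whichever subclass this is.
 return type(self)(other)
class _INTERNAL_LPIPE(_INTERNAL_PIPEBASE):
 def __init__(self, val):
 self.val = val
 def __lshift__(self, other):
 return other(self.val)
_INTERNAL_lpipe = _INTERNAL_LPIPE(None) # the smelly None initialization remains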
Instead, you could have one class for your singleton tokens, and pass an action function into it. When lhs >> TOKEN is evaluated, it could return a second class containing the action function and the "lhs" value. This second class would respond to (lhs >> TOKEN) << rhs.
class _Token:
 __slots__ = ('action',)
 def __init__(self, action):
 self.action = action
 def __rrshift__(self, lhs):
 return _Operation(self.action, lhs)
class _Operation:
 __slots__ = ('action', 'lhs')
 def __init__(self, action, lhs):
 self.action = action
 self.lhs = lhs
 def __lshift__(self, rhs):
 return self.action(self.lhs, rhs)
Now, you just need to create the required tokens:
_INTERNAL_lazymerge = _Token(lambda lhs, rhs: _INTERNAL_LazyIterable(lhs, rhs))
No additional classes should be needed for the pipes. Just the tokens, and the appropriate lambda functions:
_INTERNAL_lpipe = _Token(lambda lhs, rhs: rhs(lhs))
_INTERNAL_rpipe = _Token(lambda lhs, rhs: lhs(rhs))
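To trace how a rewritten pipe evaluates with these tokens (a hypothetical example, assuming the _Token/_Operation classes above):

# "Hello, World!" |> print is translated to:
"Hello, World!" >> _INTERNAL_lpipe << print # prints: Hello, World!
# str has no __rshift__, so Python falls back to _Token.__rrshift__, which
# wraps the left-hand side in an _Operation; the following << then calls the
# stored lambda as action(lhs, rhs), i.e. print("Hello, World!").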
Precalculating Tokens
tokens = list(tokenize.generate_tokens(patched_file.readline))
tokenize.generate_tokens(...), as its name implies, is a generator. It doesn't tokenize the entire file up front and then return the tokens one at a time. It reads the file one line at a time, and emits the tokens as it encounters them. It does this for efficiency; it doesn't need to store millions of tokens in memory when processing a huge file.
list(...) undoes all those savings, by realizing all of the tokens and storing them all in one big list. You're doing this just so you can access "the next token" (tokens[n + 1]) along with the current token.
The itertools.pairwise(...) function takes an iterable a, b, c, d, e, ... and returns (a, b), (b, c), (c, d), (d, e), .... In short, using this would convert your stream of tokens into a stream of pairs of tokens. No need to realize and store them all in a list!
The only issue with pairwise is, given n tokens, it will yield n-1 pairs of tokens. You need to append a dummy token to the stream, in order to get n token pairs, with the last real token paired with the dummy. We can do this using itertools.chain.
from itertools import chain, pairwise

...

def translate(file: io.StringIO):
 patched_file = io.StringIO(PREFIX + file.read())
 tokens = tokenize.generate_tokens(patched_file.readline)
 token_pairs = pairwise(chain(tokens, [(None,)*5]))
 skip_token = False
 for first_token, second_token in token_pairs:
 type, name, _, _, _ = first_token
 next_type, next_name, _, _, _ = second_token
 ...
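For reference, a tiny standalone demonstration of the pairwise-plus-sentinel pattern (with letters standing in for tokens; not part of the answer's code):

from itertools import chain, pairwise
tokens = ["a", "b", "c"]
print(list(pairwise(tokens))) # [('a', 'b'), ('b', 'c')] -- only n-1 pairs
print(list(pairwise(chain(tokens, [None])))) # [('a', 'b'), ('b', 'c'), ('c', None)]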
LazyIterable: revisited
The itertools.chain function does effectively the same thing as our _INTERNAL_LazyIterable class. We can remove that class and just use itertools.chain, renamed to avoid any collisions with the given source.
from itertools import chain as _INTERNAL_LazyIterable
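A quick check that the replacement behaves the same way (hypothetical snippet):

merged = _INTERNAL_LazyIterable([1, 2], [3, 4])
print(list(merged)) # [1, 2, 3, 4], produced lazily, same as the hand-written class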
Revised code
(The _INTERNAL prefix was shortened to a bare leading underscore for brevity.)
import tokenize, io
from itertools import chain, pairwise

PREFIX = """
# ITHON START
from itertools import chain as _LazyIterable
class _Token:
 __slots__ = ('action',)
 def __init__(self, action):
 self.action = action
 def __rrshift__(self, lhs):
 return _Operation(self.action, lhs)
class _Operation:
 __slots__ = ('action', 'lhs')
 def __init__(self, action, lhs):
 self.action = action
 self.lhs = lhs
 def __lshift__(self, rhs):
 return self.action(self.lhs, rhs)
_LAZY_MERGE = _Token(lambda lhs, rhs: _LazyIterable(lhs, rhs))
_LPIPE = _Token(lambda lhs, rhs: rhs(lhs))
_RPIPE = _Token(lambda lhs, rhs: lhs(rhs))
# ITHON END
"""

def translate(file: io.StringIO):
 patched_file = io.StringIO(PREFIX + file.read())
 tokens = tokenize.generate_tokens(patched_file.readline)
 token_pairs = pairwise(chain(tokens, [(None,)*5]))
 skip_token = False
 for first, second in token_pairs:
 type, name, _, _, _ = first
 next_type, next_name, _, _, _ = second
 if skip_token:
 skip_token = False
 continue
 if type == tokenize.OP and next_type == tokenize.OP:
 # Most likely special operation
 if name == "|" and next_name == ">": # left pipe
 yield tokenize.OP, ">>"
 yield tokenize.NAME, "_LPIPE"
 yield tokenize.OP, "<<"
 skip_token = True
 elif name == "<" and next_name == "|": # right pipe
 yield tokenize.OP, ">>"
 yield tokenize.NAME, "_RPIPE"
 yield tokenize.OP, "<<"
 skip_token = True
 elif name == ":" and next_name == ":": # lazy merge
 yield tokenize.OP, ">>"
 yield tokenize.NAME, "_LAZY_MERGE"
 yield tokenize.OP, "<<"
 skip_token = True
 else:
 yield type,name
 elif type == tokenize.OP:
 if name == "<>":
 yield tokenize.OP, "!="
 else:
 yield type, name
 else:
 yield type, name

script = """
def x():
 print(1)
a = "Hello, World!"
b = print
a |> b
b <| a
for i in [1,2,3] :: [4,5,6] :: [7,8,9]:
 print(i)
if a <> b:
 print("Barry is FLUFL!")
"Hello, World!" |> print
"""

code = tokenize.untokenize(translate(io.StringIO(script)))
exec(code)
- Xandaaah (Sep 7, 2024): Thanks, but pairwise definitely won't work for me. I don't want to have a pipe operation split in half.
- AJNeufeld (Sep 9, 2024): @Xandaaah I'm sorry, what do you mean? Your original code is processing the token stream in a pairwise fashion, e.g. tokens[n] & tokens[n + 1], for n=0, then n=1, then n=2, and so on. The pairwise operation does exactly this, and my revised code produces the same output yours did.
- Xandaaah (Sep 15, 2024): I thought you meant that it would be n=0, then n=2, and like that. I'm going to add new operators that are longer anyway.
- AJNeufeld (Sep 17, 2024): You certainly can add longer, complex operations; just not in lambda functions. Simply add def _long_complex_operation(lhs, rhs): ... and _LONG_COMPLEX_OPERATION = _Token(_long_complex_operation), then add the appropriate syntax parsing in your translate() function.
- Xandaaah (Sep 21, 2024): I meant more than two characters, not complex as in does complex operations.