I wrote this preprocessor just to add a couple of features to Python (pipes, mostly). Is there any way I could improve it?

import tokenize, io, types
PREFIX = """
# ITHON START
import sys as _INTERNAL_SYS
class _INTERNAL_LazyIterable:
    __slots__ = ('x','y')
    def __init__(self, x, y) -> None:
        self.x = iter(x)
        self.y = iter(y)
    def __iter__(self):
        return _INTERNAL_LazyIterator(self)
class _INTERNAL_LazyIterator:
    __slots__ = ('iterable',)
    def __init__(self, iterable) -> None:
        self.iterable = iterable
    def __next__(self):
        try:
            return next(self.iterable.x)
        except StopIteration:
            try:
                return next(self.iterable.y)
            except StopIteration:
                raise StopIteration
class _INTERNAL_LAZYMERGE:
    def __init__(self, val):
        self.val = val
    def __rrshift__(self, other):
        return _INTERNAL_LAZYMERGE(other)
    def __lshift__(self, other):
        return _INTERNAL_LazyIterable(self.val, other)
class _INTERNAL_LPIPE:
    def __init__(self, val):
        self.val = val
    def __rrshift__(self, other):
        return _INTERNAL_LPIPE(other)
    def __lshift__(self, other):
        return other(self.val)
class _INTERNAL_RPIPE:
    def __init__(self, action):
        self.action = action
    def __rrshift__(self, other):
        return _INTERNAL_RPIPE(other)
    def __lshift__(self, other):
        return self.action(other)
_INTERNAL_lpipe = _INTERNAL_LPIPE(None)
_INTERNAL_rpipe = _INTERNAL_RPIPE(None)
_INTERNAL_lazymerge = _INTERNAL_LAZYMERGE(None)
# ITHON END
"""
def translate(file: io.StringIO):
    patched_file = io.StringIO(PREFIX + file.read())
    skip_token = False
    tokens = list(tokenize.generate_tokens(patched_file.readline)) # Precalculate tokens
    print(tokens)
    for n, i in enumerate(tokens):
        type, name, _, _, _ = i
        try:
            next_type, next_name, _, _, _ = tokens[n + 1]
        except IndexError:
            next_type, next_name = (None, None)
        print(type, name)
        if skip_token:
            skip_token = False
            continue
        if type == tokenize.OP and next_type == tokenize.OP:
            # Most likely special operation
            if name == "|" and next_name == ">": # left pipe
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_INTERNAL_lpipe"
                yield tokenize.OP, "<<"
                skip_token = True
            elif name == "<" and next_name == "|": # right pipe
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_INTERNAL_rpipe"
                yield tokenize.OP, "<<"
                skip_token = True
            elif name == ":" and next_name == ":": # lazy merge
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_INTERNAL_lazymerge"
                yield tokenize.OP, "<<"
                skip_token = True
            else:
                yield type, name
        elif type == tokenize.OP:
            if name == "<>": # barry is flufl
                yield tokenize.OP, "!="
            else:
                yield type, name
        else:
            yield type, name
script = """
def x():
 print(1)
a = "Hello, World!"
b = print
a |> b
b <| a
for i in [1,2,3] :: [4,5,6] :: [7,8,9]:
 print(i)
if a <> b:
 print("Barry is FLUFL!")
"Hello, World!" |> print
"""
code = tokenize.untokenize(translate(io.StringIO(script)))
print(code)
exec(code)

Example code output:

# ITHON START
import sys as _INTERNAL_SYS
class _INTERNAL_LazyIterable :
    __slots__ =('x','y')
    def __init__ (self ,x ,y )->None :
        self .x =iter (x )
        self .y =iter (y )
    def __iter__ (self ):
        return _INTERNAL_LazyIterator (self )
class _INTERNAL_LazyIterator :
    __slots__ =('iterable',)
    def __init__ (self ,iterable )->None :
        self .iterable =iterable
    def __next__ (self ):
        try :
            return next (self .iterable .x )
        except StopIteration :
            try :
                return next (self .iterable .y )
            except StopIteration :
                raise StopIteration
class _INTERNAL_LAZYMERGE :
    def __init__ (self ,val ):
        self .val =val
    def __rrshift__ (self ,other ):
        return _INTERNAL_LAZYMERGE (other )
    def __lshift__ (self ,other ):
        return _INTERNAL_LazyIterable (self .val ,other )
class _INTERNAL_LPIPE :
    def __init__ (self ,val ):
        self .val =val
    def __rrshift__ (self ,other ):
        return _INTERNAL_LPIPE (other )
    def __lshift__ (self ,other ):
        return other (self .val )
class _INTERNAL_RPIPE :
    def __init__ (self ,action ):
        self .action =action
    def __rrshift__ (self ,other ):
        return _INTERNAL_RPIPE (other )
    def __lshift__ (self ,other ):
        return self .action (other )
_INTERNAL_lpipe =_INTERNAL_LPIPE (None )
_INTERNAL_rpipe =_INTERNAL_RPIPE (None )
_INTERNAL_lazymerge =_INTERNAL_LAZYMERGE (None )
# ITHON END
def x ():
    print (1 )
a ="Hello, World!"
b =print
a >>_INTERNAL_lpipe <<b
b >>_INTERNAL_rpipe <<a
for i in [1 ,2 ,3 ]>>_INTERNAL_lazymerge <<[4 ,5 ,6 ]>>_INTERNAL_lazymerge <<[7 ,8 ,9 ]:
    print (i )
if a !=b :
    print ("Barry is FLUFL!")
"Hello, World!">>_INTERNAL_lpipe <<print
asked Sep 5, 2024 at 11:33
  • So this is a code generator (though I didn't find a tag for that, so I used the closest relevant one I could find). Can you please show some example output? Commented Sep 5, 2024 at 12:33
  • @Reinderien Just added some example output. It is very bad-looking, but that's what tokenize.untokenize outputs. Commented Sep 5, 2024 at 19:39

1 Answer

Unused

import sys as _INTERNAL_SYS is unused. You can delete this.

slots

Two of your classes define __slots__, whereas three do not. You should probably add __slots__ to the other classes as well.
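
A rough sketch for _INTERNAL_LPIPE (my example; the other two classes would follow the same pattern):

class _INTERNAL_LPIPE:
    __slots__ = ('val',)  # one named attribute; no per-instance __dict__

    def __init__(self, val):
        self.val = val
    def __rrshift__(self, other):
        return _INTERNAL_LPIPE(other)
    def __lshift__(self, other):
        return other(self.val)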

Iterating the hard way

The _INTERNAL_LazyIterator class is entirely unnecessary. Instead, you can use yield from in the __iter__ method to create the necessary iterator.

class _INTERNAL_LazyIterable:
    __slots__ = ('x','y')

    def __init__(self, x, y) -> None:
        self.x = iter(x)
        self.y = iter(y)

    def __iter__(self):
        yield from self.x
        yield from self.y

Redundant Code

This code is repeated over and over, every time you define a new operator.

class _INTERNAL_XXX:
    def __rrshift__(self, other):
        return _INTERNAL_XXX(other)

Moreover, you create a singleton for each class by passing in None as the initial value. This smells.

To reduce the duplicate code, you could have a common base class which defines the __rrshift__ operation and creates a new instance of the current class, but the None initialization would still smell.
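
For illustration, a minimal sketch of that base-class approach (the _INTERNAL_Chainable name is invented here) could look like:

class _INTERNAL_Chainable:
    __slots__ = ('val',)
    def __init__(self, val):
        self.val = val
    def __rrshift__(self, other):
        # type(self) builds a new instance of whichever subclass is in use
        return type(self)(other)

class _INTERNAL_LPIPE(_INTERNAL_Chainable):
    __slots__ = ()
    def __lshift__(self, other):
        return other(self.val)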

Instead, you could have one class for your singleton tokens, and pass an action function into it. When lhs >> TOKEN is evaluated, it could return a second class containing the action function and the "lhs" value. This second class would then respond to (lhs >> TOKEN) << rhs.

class _Token:
    __slots__ = ('action',)

    def __init__(self, action):
        self.action = action
    def __rrshift__(self, lhs):
        return _Operation(self.action, lhs)

class _Operation:
    __slots__ = ('action', 'lhs')
    def __init__(self, action, lhs):
        self.action = action
        self.lhs = lhs
    def __lshift__(self, rhs):
        return self.action(self.lhs, rhs)

Now, you just need to create the required tokens:

_INTERNAL_lazymerge = _Token(lambda lhs, rhs: _INTERNAL_LazyIterable(lhs, rhs))

No additional classes should be needed for the pipes. Just the tokens, and the appropriate lambda functions:

_INTERNAL_lpipe = _Token(lambda lhs, rhs: rhs(lhs))
_INTERNAL_rpipe = _Token(lambda lhs, rhs: lhs(rhs))
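
As a quick sanity check (these example expressions are mine, assuming the _Token and _Operation definitions above are in scope), the translated form of a pipe then evaluates as expected:

"Hello, World!" >> _INTERNAL_lpipe << print            # calls print("Hello, World!")
(lambda s: s.upper()) >> _INTERNAL_rpipe << "shout"    # evaluates to 'SHOUT'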

Precalculating Tokens

tokens = list(tokenize.generate_tokens(patched_file.readline))

tokenize.generate_tokens(...), as its name implies, is a generator. It doesn't tokenize the entire file up front and then return the tokens one at a time. It reads the file one line at a time and emits the tokens as it encounters them. It does this for efficiency; it doesn't need to store millions of tokens in memory when processing a huge file.

list(...) undoes all those savings by realizing all of the tokens and storing them in one big list. You're doing this just so you can access "the next token" (tokens[n + 1]) along with the current token.

The itertools.pairwise(...) function takes an iterable a, b, c, d, e, ... and returns (a, b), (b, c), (c, d), (d, e), .... In short, using this converts your stream of tokens into a stream of pairs of tokens. No need to realize and store them all in a list!
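
For example (pairwise requires Python 3.10 or later):

from itertools import pairwise

print(list(pairwise(['a', 'b', 'c', 'd'])))  # [('a', 'b'), ('b', 'c'), ('c', 'd')]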

The only issue with pairwise is that, given n tokens, it will yield n-1 pairs of tokens. You need to append a dummy token to the stream in order to get n token pairs, with the last real token paired with the dummy. We can do this using itertools.chain.

from itertools import chain, pairwise
...
def translate(file: io.StringIO):
    patched_file = io.StringIO(PREFIX + file.read())
    tokens = tokenize.generate_tokens(patched_file.readline)
    token_pairs = pairwise(chain(tokens, [(None,)*5]))
    skip_token = False
    for first_token, second_token in token_pairs:
        type, name, _, _, _ = first_token
        next_type, next_name, _, _, _ = second_token
        ...

LazyIterable: revisited

The itertools.chain function does effectively the same thing as our _INTERNAL_LazyIterable class. We can remove that class and just use itertools.chain, renamed to avoid any collisions with the given source.

from itertools import chain as _INTERNAL_LazyIterable

Revised code

(The _INTERNAL prefix was shortened to a single underscore for brevity.)

import tokenize, io
from itertools import chain, pairwise
PREFIX = """
# ITHON START
from itertools import chain as _LazyIterable
class _Token:
    __slots__ = ('action',)

    def __init__(self, action):
        self.action = action
    def __rrshift__(self, lhs):
        return _Operation(self.action, lhs)
class _Operation:
    __slots__ = ('action', 'lhs')
    def __init__(self, action, lhs):
        self.action = action
        self.lhs = lhs
    def __lshift__(self, rhs):
        return self.action(self.lhs, rhs)
_LAZY_MERGE = _Token(lambda lhs, rhs: _LazyIterable(lhs, rhs))
_LPIPE = _Token(lambda lhs, rhs: rhs(lhs))
_RPIPE = _Token(lambda lhs, rhs: lhs(rhs))
# ITHON END
"""
def translate(file: io.StringIO):
    patched_file = io.StringIO(PREFIX + file.read())
    tokens = tokenize.generate_tokens(patched_file.readline)
    token_pairs = pairwise(chain(tokens, [(None,)*5]))
    skip_token = False
    for first, second in token_pairs:
        type, name, _, _, _ = first
        next_type, next_name, _, _, _ = second
        if skip_token:
            skip_token = False
            continue

        if type == tokenize.OP and next_type == tokenize.OP:
            # Most likely special operation
            if name == "|" and next_name == ">": # left pipe
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_LPIPE"
                yield tokenize.OP, "<<"
                skip_token = True
            elif name == "<" and next_name == "|": # right pipe
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_RPIPE"
                yield tokenize.OP, "<<"
                skip_token = True
            elif name == ":" and next_name == ":": # lazy merge
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_LAZY_MERGE"
                yield tokenize.OP, "<<"
                skip_token = True
            else:
                yield type, name

        elif type == tokenize.OP:
            if name == "<>":
                yield tokenize.OP, "!="
            else:
                yield type, name

        else:
            yield type, name
script = """
def x():
    print(1)
a = "Hello, World!"
b = print
a |> b
b <| a
for i in [1,2,3] :: [4,5,6] :: [7,8,9]:
    print(i)
if a <> b:
    print("Barry is FLUFL!")
"Hello, World!" |> print
"""
code = tokenize.untokenize(translate(io.StringIO(script)))
exec(code)
answered Sep 5, 2024 at 22:44 by toolic
  • Thanks, but pairwise definitely won't work for me. I don't want to have a pipe operation split in half. Commented Sep 7, 2024 at 20:51
  • @Xandaaah I'm sorry, what do you mean? Your original code is processing the token stream in a pairwise fashion, e.g. tokens[n] and tokens[n+1], for n=0, then n=1, then n=2, and so on. The pairwise operation does exactly this, and my revised code produces the same output yours did. Commented Sep 9, 2024 at 0:00
  • I thought you meant that it would be n=0, then n=2, and so on. I'm going to add new operators that are longer anyway. Commented Sep 15, 2024 at 19:06
  • You certainly can add longer, complex operations; just not in lambda functions. Simply add def _long_complex_operation(lhs, rhs): ... and _LONG_COMPLEX_OPERATION = _Token(_long_complex_operation), then just add the appropriate syntax parsing in your translate() function (a sketch follows below this thread). Commented Sep 17, 2024 at 22:19
  • I meant more than 2 characters, not complex as in does complex operations. Commented Sep 21, 2024 at 2:52
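
A hypothetical sketch of that named-function suggestion, reusing the _Token class from the answer (the _merge_dicts and _MERGE_DICTS names are invented here):

def _merge_dicts(lhs, rhs):
    return {**lhs, **rhs}

_MERGE_DICTS = _Token(_merge_dicts)

# Once translate() rewrites whatever new operator you choose into ">> _MERGE_DICTS <<":
{'a': 1} >> _MERGE_DICTS << {'b': 2}    # evaluates to {'a': 1, 'b': 2}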
