3
\$\begingroup\$

Rags, again.


This is the rewrite of Read stdin like a dictator.

From that post:

All too often I find myself wanting to allow only a certain list of characters to be written to stdin, and only recently did I actually bother to implement it. In Python, of all languages!

Essentially, this module provides a few APIs that allow a very tailored approach to reading characters from a terminal. By intercepting individual keypresses at the instant they occur, we can make cool decisions about closing the input stream whenever we want -- we dictate who says what in our terminal!

The standard input stream, after being opened, can be closed after a number of characters, or, at caller's option, any combination of a number of characters and allowed inputs.

... Yep, all still true, just way better, and actually cross-platform now.

It might seem like there's a lot of dead code / overkill stuff here; that's because I'm about to shift its focus from "input-constrainer-thing" to "poor-man's libreadline/curses" and so there's some provisioning in effect, but never mind that.

I have some unittests but I won't supply them because writing them for an IO module is a lot more work than they're worth at this point.

#!/usr/bin/env python3
import sys
import struct
from platform import system
SYSTEM = system().lower()
class CHAR:
    """essentially an enum, to avoid clouding the ns"""
    NUL = chr(0)
    INT = chr(3)
    EOF = chr(4)
    BEL = chr(7)
    BKS = chr(8)
    LFD = chr(10)
    CRR = chr(13)
    ESC = chr(27)
    SPC = chr(32)
    DEL = chr(127)
    CONDS = [
        (lambda i, chars: i in chars),
        (lambda i, chars: i not in chars),
        (lambda *args, **kwargs: False),
    ]
def init(TERM_BUFSIZE=4096):
    """module initialiser: calls constructors so you don't have to
    you must call this before other functions!"""
    global reader
    reader = read_class(TERMCTL_SPECIAL_BUFSIZE=TERM_BUFSIZE)
def checkinit(func, *args, **kwargs):
    def isdefined(*args, **kwargs):
        if "reader" not in globals().keys():
            print("\n\tfatal: init() not called\n")
            msg = "must call init() first, or call init() again before {}()".format(func.__name__)
            raise TypeError(msg)
        return func(*args, **kwargs)
    return isdefined
class _nt_reader():
    def __init__(self, *args, **kwargs):
        """reader on nt"""
        self.NAME = "NT"
        if SYSTEM != "windows":
            util.writer("\n\there be dragons; ye COWER in the SHADOW of", self.NAME, "\n\n")
        self.msvcrt = __import__("msvcrt")
        self.ctypes = __import__("ctypes")
        try:
            self.colorama = __import__("colorama")
            self.colorama.init()
        except (AttributeError, ImportError):
            print(
                """
                you must install colorama to use this module on windows
                do this by:
                $ cd colorama
                $ python setup.py install
                """
            )
            exit(2)
    def getch(self):
        """use msvcrt to get a char"""
        return self.msvcrt.getch()
    def drain_buf(self):
        """while buffer, pseudo-nonblocking read bytes from buffer using msvcrt"""
        y = []
        while self.msvcrt.kbhit():
            y.append(self.msvcrt.getch())
        return "".join(y)
class _posix_reader():
    def __init__(self, TERMCTL_SPECIAL_BUFSIZE=4096):
        """reader on posix"""
        self.NAME = "POSIX"
        if SYSTEM == "windows":
            util.writer("\n\there be dragons; ye COWER in the SHADOW of", self.NAME, "\n\n")
        self.tty = __import__("tty")
        self.termios = __import__("termios")
        self.fcntl = __import__("fcntl")
        self.O_NONBLOCK = __import__("os").O_NONBLOCK
        self.TERM_BUFSIZE = TERMCTL_SPECIAL_BUFSIZE
    def getch(self):
        """use old fashioned termios to getch"""
        if sys.stdin.isatty(): # fixes "Inappropriate ioctl for device"
            fd = sys.stdin.fileno()
            old_settings = self.termios.tcgetattr(fd)
            try:
                self.tty.setraw(sys.stdin.fileno())
                ch = sys.stdin.read(1)
            finally:
                self.termios.tcsetattr(fd, self.termios.TCSADRAIN, old_settings)
            return ch
        else:
            return sys.stdin.read(1)
    def drain_buf(self):
        """read TERM_BUFSIZE of waiting keypresses"""
        if sys.stdin.isatty():
            fd = sys.stdin.fileno()
            fl = self.fcntl.fcntl(fd, self.fcntl.F_GETFL)
            self.fcntl.fcntl(fd, self.fcntl.F_SETFL, fl | self.O_NONBLOCK)
            try:
                # if nothing is waiting on sys.stdin, then TypeError
                # because "can't concat NoneType and str"
                chars = sys.stdin.read(self.TERM_BUFSIZE)
            except TypeError:
                chars = ""
            finally:
                self.fcntl.fcntl(fd, self.fcntl.F_SETFL, fl) # restore settings
            return chars
        else:
            return sys.stdin.read(self.TERM_BUFSIZE) # ???
read_class = {
    "windows": _nt_reader,
}.get(
    SYSTEM,
    _posix_reader # default
)
class util():
    """utilities"""
    def parsenum(num):
        """sys.maxsize if num is negative"""
        num = int(num)
        return sys.maxsize if num < 0 else num
    def writer(*args):
        """write a string to stdout and flush.
        should be used by all stdout-writing"""
        if not args:
            raise TypeError("writer requires at least one argument")
        if len(args) > 1:
            args = " ".join(str(i) for i in args).strip(" ")
        else:
            args = "".join(str(i) for i in args)
        sys.stdout.write(args)
        sys.stdout.flush()
    def esc_filter(x, y):
        """append x to y as long as x is not DEL or backspace or esc"""
        if x in (CHAR.DEL, CHAR.BKS):
            try:
                y.pop()
            except IndexError:
                pass
            return y
        y.append(x)
        return y
@checkinit
def readkey(raw=False):
    """interface for getch + drain_buf
    if raw, behave like getch but with flushing for multibyte inputs"""
    ch = reader.getch()
    more = reader.drain_buf()
    if raw:
        return ch + more
    # cooked
    if ch == CHAR.INT: raise KeyboardInterrupt
    if ch == CHAR.EOF: raise EOFError
    if ch in (CHAR.BKS, CHAR.DEL):
        util.writer(CHAR.BKS + CHAR.SPC + CHAR.BKS)
        return CHAR.BKS
    elif ch in (CHAR.CRR, CHAR.LFD):
        util.writer(CHAR.CRR if SYSTEM == "Windows" else "")
        return CHAR.LFD
    elif ch == CHAR.ESC:
        if more:
            if more[0] == "[":
                sp = more[1:]
                if sp in ("D", "C"):
                    return "\033[" + sp
                elif sp == "3~":
                    return CHAR.SPC
                else:
                    return CHAR.BEL
            else:
                return CHAR.ESC + more
    ch += more
    return ch
@checkinit
def raw_readkey():
    """alias for readkey(raw=True)"""
    return readkey(raw=True)
@checkinit
def pretty_press(raw=False):
    """literally just read any fancy char from stdin let caller do whatever"""
    y = []
    i = readkey(raw=raw)
    if (not raw) and (i not in (CHAR.BKS, CHAR.DEL, CHAR.ESC)):
        util.writer(i)
    return util.esc_filter(i, y)
@checkinit
def _do_condition(
        end_chars,
        end_condition,
        count,
        ignore_chars=(),
        ignore_condition=CHAR.CONDS[True + 1], # always false
        raw=False
    ):
    """singular interface to reading strings from readkey, to minimise duplication"""
    y = []
    count = util.parsenum(count)
    while len(y) <= count:
        i = readkey(raw=raw)
        if end_condition(i, end_chars):
            break
        if not ignore_condition(i, ignore_chars):
            if (not raw) and (i not in (CHAR.BKS, CHAR.DEL)):
                util.writer(i)
            y = util.esc_filter(i, y)
    return "".join(y)
@checkinit
def thismany(count, raw=False):
    """read exactly count chars"""
    return _do_condition(
        "",
        CHAR.CONDS[True + 1], # more than true == never expires :D
        count,
        raw=raw
    )
@checkinit
def until(chars, invert=False, count=-1, raw=False):
    """get chars of stdin until any of chars is read,
    or until count chars have been read, whichever comes first"""
    return _do_condition(
        chars,
        CHAR.CONDS[invert],
        count,
        raw=raw
    )
@checkinit
def until_not(chars, count=-1, raw=False):
    """read stdin until any of chars stop being read,
    or until count chars have been read; whichever comes first"""
    return until(
        chars,
        invert=True,
        count=count,
        raw=raw
    )
@checkinit
def ignore(
        ignore_these,
        end_on,
        end_cond=True,
        count=-1,
        raw=False,
        invert=False
    ):
    """ignore_these keypresses, and stop reading at end_on or count,
    whichever comes first"""
    return _do_condition(
        end_on,
        CHAR.CONDS[not end_cond],
        count,
        ignore_chars=ignore_these,
        ignore_condition=CHAR.CONDS[invert],
        raw=raw
    )
@checkinit
def ignore_not(
        ignore_these,
        end_on,
        end_cond=True,
        count=-1,
        raw=False
    ):
    """ignore everything that isn't these keypresses
    and stop reading at end_on or count, whichever comes first"""
    return ignore(
        ignore_these,
        end_on,
        end_cond=end_cond,
        count=count,
        raw=raw,
        invert=True
    )

A trivial example of how this might be used:

>>> from pmlr import *
>>> init()
>>> print("enter your age:", end=" "); x = until_not("0123456789", count=2)
enter your age: 123<"a" pressed>
>>> x
'123'

It may seem like I'm abusing OOP and classes but those are partially for namespace-hygiene and partially because the way I did it previously was really fragile.

Additionally, self.NAME is for debugging, mostly, but I left it in for the same reason read_class is public and asking to be fiddled with: why not?

Finally, there is a "magic number" 4096. That's from this excellent answer by the creator of xterm.

You can get this on github, colorama included.

asked Feb 25, 2016 at 1:25
\$\endgroup\$

2 Answers

3
\$\begingroup\$

Take care of the docs

Please avoid destroying your documentation after writing it. checkinit destroys the documentation of the functions it is applied to. You should use functools.wraps.
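
To make that concrete, here is a minimal sketch of checkinit rewritten with functools.wraps. The _state dict and the stub init() are hypothetical stand-ins for the module's global reader, used only so the snippet runs on its own; the real module would keep its own bookkeeping.

```python
import functools

_state = {}  # hypothetical stand-in for the module-level `reader` global


def checkinit(func):
    """Refuse to run func until init() has been called."""
    @functools.wraps(func)  # copies __name__, __doc__, etc. onto the wrapper
    def isdefined(*args, **kwargs):
        if "reader" not in _state:
            msg = "must call init() first, or call init() again before {}()".format(func.__name__)
            raise TypeError(msg)
        return func(*args, **kwargs)
    return isdefined


def init():
    """Stub initialiser; the real module would construct a reader here."""
    _state["reader"] = object()


@checkinit
def raw_readkey():
    """alias for readkey(raw=True)"""
    return "stub"  # placeholder body, no terminal access in this sketch


print(raw_readkey.__name__)  # raw_readkey
print(raw_readkey.__doc__)   # alias for readkey(raw=True)
```

Without the functools.wraps line, both prints would describe the inner isdefined wrapper instead (its name, and None for the docstring), which is what help() and documentation tools would then show.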

answered Feb 25, 2016 at 14:17
\$\endgroup\$
3
  • \$\begingroup\$ @cat Please unaccept this answer as more comprehensive ones are probably coming \$\endgroup\$ Commented Feb 26, 2016 at 11:21
  • \$\begingroup\$ Whose choice is accepting an answer? For now, I'll accept this one, as it's pretty valuable to me. If new ones appear, I'll change it. \$\endgroup\$ Commented Feb 26, 2016 at 12:27
  • \$\begingroup\$ @cat Ok, of course it is your choice. Just keep in mind that waiting some time before accepting an answer is standard practice here. \$\endgroup\$ Commented Feb 26, 2016 at 12:32
1
\$\begingroup\$

I've always hated when there is a program that requires me to call some setup function before doing anything. I also dislike when the module keeps some global state and everything is constrained by that. I think a much cleaner solution is something like this.

_readers = {}
def get_reader(buffer_size=4096):
    if buffer_size not in _readers:
        if SYSTEM == 'windows':
            _readers[buffer_size] = _nt_reader(buffer_size)
        else:
            _readers[buffer_size] = _posix_reader(buffer_size)
    return _readers[buffer_size]

This gives us two things: first, we don't need a global variable that the user then accesses; second, users can (if they so desire) have multiple readers with different buffer sizes, if that were to matter. This will require restructuring a lot of other code to not rely on a global variable. Also, it may be worthwhile to prefix all other global names with an _ so they aren't imported by from x import *, or to define an __all__ list.
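
As a rough illustration of the __all__ suggestion, the sketch below builds a throwaway module in memory and star-imports from it. The module name pmlr_demo and its contents are made up for the demonstration; in practice you would simply put the __all__ line at the top of the real module.

```python
import sys
import types

# Source of a hypothetical module; only get_reader is meant to be public.
source = '''
__all__ = ["get_reader"]
SYSTEM = "posix"
_readers = {}
def get_reader(buffer_size=4096):
    return _readers.setdefault(buffer_size, object())
'''

# Build the module by hand and register it so that `import` can find it.
demo = types.ModuleType("pmlr_demo")
exec(source, demo.__dict__)
sys.modules["pmlr_demo"] = demo

# Star-import into a fresh namespace and see which names arrive.
namespace = {}
exec("from pmlr_demo import *", namespace)
exported = sorted(name for name in namespace if not name.startswith("__"))
print(exported)  # ['get_reader']
```

Note that SYSTEM has no leading underscore, yet it is not star-imported, because once __all__ exists it is the only thing from x import * consults.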

The first thing we'll have to do to get rid of that global state is move all of those functions that rely on global state into a class - we'll make a base class that encompasses them.

class _ReaderBaseClass(abc.ABC):
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
    @abc.abstractmethod
    def getch(self):
        raise NotImplementedError
    @abc.abstractmethod
    def drain_buf(self):
        raise NotImplementedError
    @property
    @abc.abstractmethod
    def newline(self):
        return NotImplemented

From here, we'll need to add the other functions in as methods - I'll take those one by one.

readkey

I don't have a lot of specific suggestions here, except that you should return self.newline in the case of a carriage return or line feed - this way you abstract that away to the subclass.

_do_condition

ignore_condition=CHAR.CONDS[True + 1], # always false

This is a massive code smell - if it is always False, then just say False. I'm more confused, though, because True + 1 is 2, which is truthy... then once I actually go and look at what that returns, I realize the lambda at that index always returns False. In that case you should just write CHAR.CONDS[2], or even better, define these lambdas as named functions somewhere so they're actually understandable. They really don't belong in CHAR - I just made them static methods of the base class, but they could be normal functions if you want.
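
For illustration, this is roughly what the named-function version could look like as plain module-level functions. take_until is a hypothetical, simplified stand-in for _do_condition that consumes an iterable of characters instead of calling readkey(), so it can run without a terminal.

```python
def char_in(ch, chars):
    """True when ch is one of chars."""
    return ch in chars


def char_not_in(ch, chars):
    """True when ch is not one of chars."""
    return ch not in chars


def never(*args, **kwargs):
    """A condition that never fires."""
    return False


def take_until(keys, end_chars, end_condition=char_in,
               ignore_chars=(), ignore_condition=never):
    """Collect keys until end_condition fires, skipping ignored ones.

    keys is any iterable of characters, standing in for repeated readkey() calls.
    """
    out = []
    for key in keys:
        if end_condition(key, end_chars):
            break
        if not ignore_condition(key, ignore_chars):
            out.append(key)
    return "".join(out)


print(take_until("12a34", "abc"))                                    # 12
print(take_until("12a34", "0123456789", end_condition=char_not_in))  # 12
print(take_until("1-2-3\n", "\n", ignore_chars="-",
                 ignore_condition=char_in))                          # 123
```

The call sites now say which condition they want by name, so nobody has to work out what CONDS[True + 1] evaluates to.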

overall

In general, avoid the hacky indexing by booleans - it works because bool is a subclass of int (True == 1 and False == 0 are guaranteed by the language, not a CPython implementation detail), but it is horrifically unreadable.

I fear that maybe you've tried to get to be too generic using _do_condition as well - it has gotten to the point that the other functions are fairly hard to read. I don't have a good suggestion for this, and if you find it workable then keep going, but there is a balance you have to strike, and I'm not sure you're there.

It also bothers me that a reader actually writes something - imo it should only return values, and then the caller should display them, if at all. Otherwise, you should allow another parameter of an output stream for them to enter - they might not want to print to stdout by default.
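
A minimal sketch of the output-stream idea, assuming a hypothetical echo helper and a simplified read_digits that takes its keys from an iterable rather than from the terminal:

```python
import io
import sys


def echo(text, stream=None):
    """Write text to stream (default: sys.stdout) and flush."""
    if stream is None:
        stream = sys.stdout  # looked up at call time so redirection is honoured
    stream.write(text)
    stream.flush()


def read_digits(keys, stream=None, echo_keys=True):
    """Collect leading digits from keys, optionally echoing each one.

    keys is any iterable of characters, standing in for readkey() calls.
    """
    out = []
    for key in keys:
        if not key.isdigit():
            break
        if echo_keys:
            echo(key, stream)
        out.append(key)
    return "".join(out)


buffer = io.StringIO()
value = read_digits("42x", stream=buffer)      # echoes into buffer, not stdout
silent = read_digits("42x", echo_keys=False)   # no echo at all
```

With this shape the caller decides where the echo goes (or whether there is one), and the reading logic can be exercised against an io.StringIO without touching the real terminal.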

Miscellaneous

Why are you making CHAR a class (and not an Enum)? You're on Python 3, so unless you're using 3.3 or below for some reason (please don't) you have access to the enum module.
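
One possible shape for that, as a sketch: mixing str into the Enum keeps members comparable with the plain one-character strings the reader returns. The name Char is my own choice here, not something from the question.

```python
import enum


class Char(str, enum.Enum):
    """Control characters; the str mixin lets members compare equal to plain strings."""
    NUL = "\x00"
    INT = "\x03"
    EOF = "\x04"
    BEL = "\x07"
    BKS = "\x08"
    LFD = "\n"
    CRR = "\r"
    ESC = "\x1b"
    SPC = " "
    DEL = "\x7f"


key = "\r"  # what a raw read of the Enter key would hand back
print(key == Char.CRR)               # True
print(key in (Char.CRR, Char.LFD))   # True
print(Char(key) is Char.CRR)         # True: lookup by value
erase = Char.BKS + Char.SPC + Char.BKS  # concatenation yields a plain str
```

One caveat: str() and format() on mixed-in enum members have changed behaviour between Python versions, so when writing a member to a stream it is safest to pass member.value explicitly.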

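You actually have a latent bug - your util class's methods should all be static methods. As is, they are plain functions in the class body with strangely named first parameters (i.e. you're using num where self would go): they happen to work when called through the class, as in util.parsenum(x), but fail as soon as anyone calls them on an instance.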
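
The difference can be seen in a small sketch. The class names util_plain and util_static are hypothetical, chosen only to put the two variants side by side:

```python
import sys


class util_plain:
    def parsenum(num):  # no self and no @staticmethod, as in the question
        num = int(num)
        return sys.maxsize if num < 0 else num


class util_static:
    @staticmethod
    def parsenum(num):
        num = int(num)
        return sys.maxsize if num < 0 else num


# Through the class, Python 3 hands back a plain function, so this works.
print(util_plain.parsenum(-1) == sys.maxsize)  # True

# With @staticmethod, instance access works too.
print(util_static().parsenum(5))               # 5

# Without it, the instance is passed as num and 5 becomes an extra argument.
try:
    util_plain().parsenum(5)
    failed = False
except TypeError:
    failed = True
print(failed)                                  # True
```

So the original code runs as long as every call goes through util itself, but @staticmethod makes the intent explicit and removes the trap.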

I also feel like these might be worth making helper functions in reader instead of some util class.

Here is the complete code.

#!/usr/bin/env python3
import abc
import sys
import struct
from platform import system
SYSTEM = system().lower()
class CHAR:
    NUL = chr(0)
    INT = chr(3)
    EOF = chr(4)
    BEL = chr(7)
    BKS = chr(8)
    LFD = chr(10)
    CRR = chr(13)
    ESC = chr(27)
    SPC = chr(32)
    DEL = chr(127)
class _ReaderBaseClass(abc.ABC):
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
    # named newline to match the subclasses and its use in readkey()
    @property
    @abc.abstractmethod
    def newline(self):
        return NotImplemented
    @abc.abstractmethod
    def getch(self):
        raise NotImplementedError
    @abc.abstractmethod
    def drain_buf(self):
        raise NotImplementedError
    @staticmethod
    def char_in_list(ch, chars):
        return ch in chars
    @staticmethod
    def char_not_in_list(ch, chars):
        return ch not in chars
    @staticmethod
    def return_false(*args, **kwargs):
        return False
    def readkey(self, raw=False):
        """interface for getch + drain_buf
        if raw, behave like getch but with flushing for multibyte inputs"""
        ch = self.getch()
        more = self.drain_buf()
        if raw:
            return ch + more
        # cooked
        if ch == CHAR.INT: raise KeyboardInterrupt
        if ch == CHAR.EOF: raise EOFError
        if ch in (CHAR.BKS, CHAR.DEL):
            util.writer(CHAR.BKS + CHAR.SPC + CHAR.BKS)
            return CHAR.BKS
        elif ch in (CHAR.CRR, CHAR.LFD):
            util.writer(self.newline)
            return CHAR.LFD
        elif ch == CHAR.ESC:
            if more:
                if more[0] == "[":
                    sp = more[1:]
                    if sp in ("D", "C"):
                        return "\033[" + sp
                    elif sp == "3~":
                        return CHAR.SPC
                    else:
                        return CHAR.BEL
                else:
                    return CHAR.ESC + more
        return ch + more
    def raw_readkey(self):
        """alias for readkey(raw=True)"""
        return self.readkey(raw=True)
    def pretty_press(self, raw=False):
        """literally just read any fancy char from stdin let caller do whatever"""
        i = self.readkey(raw=raw)
        if not raw and i not in (CHAR.BKS, CHAR.DEL, CHAR.ESC):
            util.writer(i)
        return util.esc_filter(i, [])
    def _do_condition(
            self,
            end_chars,
            end_condition,
            count,
            ignore_chars=(),
            ignore_condition=None,
            raw=False
        ):
        """singular interface to reading strings from readkey, to minimise duplication"""
        if ignore_condition is None:
            # a bare staticmethod object is not callable as a default before Python 3.10
            ignore_condition = self.return_false
        y = []
        count = util.parsenum(count)
        while len(y) <= count:
            i = self.readkey(raw=raw)
            if end_condition(i, end_chars):
                break
            if not ignore_condition(i, ignore_chars):
                if not raw and i not in (CHAR.BKS, CHAR.DEL):
                    util.writer(i)
                y = util.esc_filter(i, y)
        return "".join(y)
    def thismany(self, count, raw=False):
        """read exactly count chars"""
        return self._do_condition(
            "",
            _ReaderBaseClass.return_false,
            count,
            raw=raw
        )
    def until(self, chars, invert=False, count=-1, raw=False):
        """get chars of stdin until any of chars is read,
        or until count chars have been read, whichever comes first"""
        if invert:
            f = _ReaderBaseClass.char_not_in_list
        else:
            f = _ReaderBaseClass.char_in_list
        return self._do_condition(
            chars,
            f,
            count,
            raw=raw
        )
    def until_not(self, chars, count=-1, raw=False):
        """read stdin until any of chars stop being read,
        or until count chars have been read; whichever comes first"""
        return self.until(
            chars,
            invert=True,
            count=count,
            raw=raw
        )
    def ignore(
            self,
            ignore_these,
            end_on,
            end_cond=True,
            count=-1,
            raw=False,
            invert=False
        ):
        """ignore_these keypresses, and stop reading at end_on or count,
        whichever comes first"""
        if end_cond:
            end_condition = _ReaderBaseClass.char_in_list
        else:
            end_condition = _ReaderBaseClass.char_not_in_list
        if invert:
            ignore_condition = _ReaderBaseClass.char_not_in_list
        else:
            ignore_condition = _ReaderBaseClass.char_in_list
        return self._do_condition(
            end_on,
            end_condition,
            count,
            ignore_chars=ignore_these,
            ignore_condition=ignore_condition,
            raw=raw
        )
    def ignore_not(
            self,
            ignore_these,
            end_on,
            end_cond=True,
            count=-1,
            raw=False
        ):
        """ignore everything that isn't these keypresses
        and stop reading at end_on or count, whichever comes first"""
        return self.ignore(
            ignore_these,
            end_on,
            end_cond=end_cond,
            count=count,
            raw=raw,
            invert=True
        )
class _nt_reader(_ReaderBaseClass):
    @property
    def newline(self):
        return CHAR.CRR
    def __init__(self, buffer_size):
        """reader on nt"""
        super().__init__(buffer_size)
        if SYSTEM != "windows":
            util.writer("\n\there be dragons; ye COWER in the SHADOW of NT\n\n")
        self.msvcrt = __import__("msvcrt")
        self.ctypes = __import__("ctypes")
        try:
            self.colorama = __import__("colorama")
            self.colorama.init()
        except (AttributeError, ImportError):
            print(
                """
                you must install colorama to use this module on windows
                do this by:
                $ cd colorama
                $ python setup.py install
                """
            )
            exit(2)
    def getch(self):
        """use msvcrt to get a char"""
        return self.msvcrt.getch()
    def drain_buf(self):
        """while buffer, pseudo-nonblocking read bytes from buffer using msvcrt"""
        y = []
        while self.msvcrt.kbhit():
            y.append(self.msvcrt.getch())
        return "".join(y)
class _posix_reader(_ReaderBaseClass):
    @property
    def newline(self):
        return ""
    def __init__(self, buffer_size):
        """reader on posix"""
        super().__init__(buffer_size)
        if SYSTEM == "windows":
            util.writer("\n\there be dragons; ye COWER in the SHADOW of POSIX\n\n")
        self.tty = __import__("tty")
        self.termios = __import__("termios")
        self.fcntl = __import__("fcntl")
        self.O_NONBLOCK = __import__("os").O_NONBLOCK
        # the old TERMCTL_SPECIAL_BUFSIZE parameter no longer exists here
        self.TERM_BUFSIZE = buffer_size
    def getch(self):
        """use old fashioned termios to getch"""
        if sys.stdin.isatty(): # fixes "Inappropriate ioctl for device"
            fd = sys.stdin.fileno()
            old_settings = self.termios.tcgetattr(fd)
            try:
                self.tty.setraw(sys.stdin.fileno())
                ch = sys.stdin.read(1)
            finally:
                self.termios.tcsetattr(fd, self.termios.TCSADRAIN, old_settings)
            return ch
        else:
            return sys.stdin.read(1)
    def drain_buf(self):
        """read TERM_BUFSIZE of waiting keypresses"""
        if sys.stdin.isatty():
            fd = sys.stdin.fileno()
            fl = self.fcntl.fcntl(fd, self.fcntl.F_GETFL)
            self.fcntl.fcntl(fd, self.fcntl.F_SETFL, fl | self.O_NONBLOCK)
            try:
                # if nothing is waiting on sys.stdin, then TypeError
                # because "can't concat NoneType and str"
                chars = sys.stdin.read(self.TERM_BUFSIZE)
            except TypeError:
                chars = ""
            finally:
                self.fcntl.fcntl(fd, self.fcntl.F_SETFL, fl) # restore settings
            return chars
        else:
            return sys.stdin.read(self.TERM_BUFSIZE) # ???
_readers = {}
def get_reader(buffer_size=4096):
    if buffer_size not in _readers:
        if SYSTEM == 'windows':
            _readers[buffer_size] = _nt_reader(buffer_size)
        else:
            _readers[buffer_size] = _posix_reader(buffer_size)
    return _readers[buffer_size]
class util():
    """utilities"""
    @staticmethod
    def parsenum(num):
        """sys.maxsize if num is negative"""
        num = int(num)
        return sys.maxsize if num < 0 else num
    @staticmethod
    def writer(*args):
        """write a string to stdout and flush.
        should be used by all stdout-writing"""
        if not args:
            raise TypeError("writer requires at least one argument")
        if len(args) > 1:
            args = " ".join(str(i) for i in args).strip(" ")
        else:
            args = "".join(str(i) for i in args)
        sys.stdout.write(args)
        sys.stdout.flush()
    @staticmethod
    def esc_filter(x, y):
        """append x to y as long as x is not DEL or backspace or esc"""
        if x in (CHAR.DEL, CHAR.BKS):
            try:
                y.pop()
            except IndexError:
                pass
            return y
        y.append(x)
        return y
answered Feb 25, 2016 at 22:12
\$\endgroup\$
6
  • \$\begingroup\$ Thank you for the extensive review, despite that I really don't like your implementation and some suggestions. Also, please note the act of returning LFD vs CRR was informed by the writer of xterm. This is not about printing, it's about what that Enter key actually does. \$\endgroup\$ Commented Feb 25, 2016 at 22:19
  • \$\begingroup\$ @cat, sorry, what do you mean about LFD vs CRR? I don't believe that I misrepresented that point here. Do you mind expanding on what exactly you dislike about my implementation? \$\endgroup\$ Commented Feb 25, 2016 at 22:24
  • \$\begingroup\$ @Dannno It seems like you're saying self.newline should be platform-dependent: "...you should return self.newline in the case of a carriage return or line feed - this way you abstract that away to the subclass." Is that a correct interpretation? \$\endgroup\$ Commented Feb 25, 2016 at 22:26
  • 1
    \$\begingroup\$ @cat I see, my implementation differs from yours. I can fix that. I should have written that you always return a newline, but what you write is platform independent. \$\endgroup\$ Commented Feb 25, 2016 at 22:31
  • \$\begingroup\$ In raw mode, I would expect 13, in cooked mode, 10. The question was about raw mode, of course. - Thomas Dickey That goes for both Windows and Posix \$\endgroup\$ Commented Feb 25, 2016 at 22:34
