This is the rewrite of Read stdin like a dictator.
From that post:
All too often I find myself wanting to allow only a certain list of characters to be written to stdin, and only recently did I actually bother to implement it. In Python, of all languages!
Essentially, this module provides a few APIs that allow a very tailored approach to reading characters from a terminal. By intercepting individual keypresses at the instant they occur, we can make cool decisions about closing the input stream whenever we want -- we dictate who says what in our terminal!
The standard input stream, after being opened, can be closed after a number of characters, or, at caller's option, any combination of a number of characters and allowed inputs.
... Yep, all still true, just way better, and actually cross-platform now.
It might seem like there's a lot of dead code / overkill stuff here; that's because I'm about to shift its focus from "input-constrainer-thing" to "poor-man's libreadline/curses" and so there's some provisioning in effect, but nevermind that.
I have some unittests but I won't supply them because writing them for an IO module is a lot more work than they're worth at this point.
#!/usr/bin/env python3
import sys
import struct
from platform import system
SYSTEM = system().lower()
class CHAR:
"""essentially an enum, to avoid clouding the ns"""
NUL = chr(0)
INT = chr(3)
EOF = chr(4)
BEL = chr(7)
BKS = chr(8)
LFD = chr(10)
CRR = chr(13)
ESC = chr(27)
SPC = chr(32)
DEL = chr(127)
CONDS = [
(lambda i, chars: i in chars),
(lambda i, chars: i not in chars),
(lambda *args, **kwargs: False),
]
def init(TERM_BUFSIZE=4096):
"""module initialiser: calls constructors so you don't have to
you must call this before other functions!"""
global reader
reader = read_class(TERMCTL_SPECIAL_BUFSIZE=TERM_BUFSIZE)
def checkinit(func, *args, **kwargs):
def isdefined(*args, **kwargs):
if "reader" not in globals().keys():
print("\n\tfatal: init() not called\n")
msg = "must call init() first, or call init() again before {}()".format(func.__name__)
raise TypeError(msg)
return func(*args, **kwargs)
return isdefined
class _nt_reader():
def __init__(self, *args, **kwargs):
"""reader on nt"""
self.NAME = "NT"
if SYSTEM != "windows":
util.writer("\n\there be dragons; ye COWER in the SHADOW of", self.NAME, "\n\n")
self.msvcrt = __import__("msvcrt")
self.ctypes = __import__("ctypes")
try:
self.colorama = __import__("colorama")
self.colorama.init()
except (AttributeError, ImportError):
print(
"""
you must install colorama to use this module on windows
do this by:
$ cd colorama
$ python setup.py install
"""
)
exit(2)
def getch(self):
"""use msvcrt to get a char"""
return self.msvcrt.getch()
def drain_buf(self):
"""while buffer, pseudo-nonblocking read bytes from buffer using msvcrt"""
y = []
while self.msvcrt.kbhit():
y.append(self.msvcrt.getch())
return "".join(y)
class _posix_reader():
def __init__(self, TERMCTL_SPECIAL_BUFSIZE=4096):
"""reader on posix"""
self.NAME = "POSIX"
if SYSTEM == "windows":
util.writer("\n\there be dragons; ye COWER in the SHADOW of", self.NAME, "\n\n")
self.tty = __import__("tty")
self.termios = __import__("termios")
self.fcntl = __import__("fcntl")
self.O_NONBLOCK = __import__("os").O_NONBLOCK
self.TERM_BUFSIZE = TERMCTL_SPECIAL_BUFSIZE
def getch(self):
"""use old fashioned termios to getch"""
if sys.stdin.isatty(): # fixes "Inappropriate ioctl for device"
fd = sys.stdin.fileno()
old_settings = self.termios.tcgetattr(fd)
try:
self.tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
finally:
self.termios.tcsetattr(fd, self.termios.TCSADRAIN, old_settings)
return ch
else:
return sys.stdin.read(1)
def drain_buf(self):
"""read TERM_BUFSIZE of waiting keypresses"""
if sys.stdin.isatty():
fd = sys.stdin.fileno()
fl = self.fcntl.fcntl(fd, self.fcntl.F_GETFL)
self.fcntl.fcntl(fd, self.fcntl.F_SETFL, fl | self.O_NONBLOCK)
try:
# if nothing is waiting on sys.stdin, then TypeError
# because "can't concat NoneType and str"
chars = sys.stdin.read(self.TERM_BUFSIZE)
except TypeError:
chars = ""
finally:
self.fcntl.fcntl(fd, self.fcntl.F_SETFL, fl) # restore settings
return chars
else:
return sys.stdin.read(self.TERM_BUFSIZE) # ???
read_class = {
"windows": _nt_reader,
}.get(
SYSTEM,
_posix_reader # default
)
class util():
"""utilities"""
def parsenum(num):
"""sys.maxsize if num is negative"""
num = int(num)
return sys.maxsize if num < 0 else num
def writer(*args):
"""write a string to stdout and flush.
should be used by all stdout-writing"""
if not args:
raise TypeError("writer requires at least one argument")
if len(args) > 1:
args = " ".join(str(i) for i in args).strip(" ")
else:
args = "".join(str(i) for i in args)
sys.stdout.write(args)
sys.stdout.flush()
def esc_filter(x, y):
"""append x to y as long as x is not DEL or backspace or esc"""
if x in (CHAR.DEL, CHAR.BKS):
try:
y.pop()
except IndexError:
pass
return y
y.append(x)
return y
@checkinit
def readkey(raw=False):
"""interface for getch + drain_buf
if raw, behave like getch but with flushing for multibyte inputs"""
ch = reader.getch()
more = reader.drain_buf()
if raw:
return ch + more
# cooked
if ch == CHAR.INT: raise KeyboardInterrupt
if ch == CHAR.EOF: raise EOFError
if ch in (CHAR.BKS, CHAR.DEL):
util.writer(CHAR.BKS + CHAR.SPC + CHAR.BKS)
return CHAR.BKS
elif ch in (CHAR.CRR, CHAR.LFD):
util.writer(CHAR.CRR if SYSTEM == "Windows" else "")
return CHAR.LFD
elif ch == CHAR.ESC:
if more:
if more[0] == "[":
sp = more[1:]
if sp in ("D", "C"):
return "033円[" + sp
elif sp == "3~":
return CHAR.SPC
else:
return CHAR.BEL
else:
return CHAR.ESC + more
ch += more
return ch
@checkinit
def raw_readkey():
"""alias for readkey(raw=True)"""
return readkey(raw=True)
@checkinit
def pretty_press(raw=False):
"""literally just read any fancy char from stdin let caller do whatever"""
y = []
i = readkey(raw=raw)
if (not raw) and (i not in (CHAR.BKS, CHAR.DEL, CHAR.ESC)):
util.writer(i)
return util.esc_filter(i, y)
@checkinit
def _do_condition(
end_chars,
end_condition,
count,
ignore_chars=(),
ignore_condition=CHAR.CONDS[True + 1], # always false
raw=False
):
"""singular interface to reading strings from readkey, to minimise duplication"""
y = []
count = util.parsenum(count)
while len(y) <= count:
i = readkey(raw=raw)
if end_condition(i, end_chars):
break
if not ignore_condition(i, ignore_chars):
if (not raw) and (i not in (CHAR.BKS, CHAR.DEL)):
util.writer(i)
y = util.esc_filter(i, y)
return "".join(y)
@checkinit
def thismany(count, raw=False):
"""read exactly count chars"""
return _do_condition(
"",
CHAR.CONDS[True + 1], # more than true == never expires :D
count,
raw=raw
)
@checkinit
def until(chars, invert=False, count=-1, raw=False):
"""get chars of stdin until any of chars is read,
or until count chars have been read, whichever comes first"""
return _do_condition(
chars,
CHAR.CONDS[invert],
count,
raw=raw
)
@checkinit
def until_not(chars, count=-1, raw=False):
"""read stdin until any of chars stop being read,
or until count chars have been read; whichever comes first"""
return until(
chars,
invert=True,
count=count,
raw=raw
)
@checkinit
def ignore(
ignore_these,
end_on,
end_cond=True,
count=-1,
raw=False,
invert=False
):
"""ignore_these keypresses, and stop reading at end_on or count,
whichever comes first"""
return _do_condition(
end_on,
CHAR.CONDS[not end_cond],
count,
ignore_chars=ignore_these,
ignore_condition=CHAR.CONDS[invert],
raw=raw
)
@checkinit
def ignore_not(
ignore_these,
end_on,
end_cond=True,
count=-1,
raw=False
):
"""ignore everything that isn't these keypresses
and stop reading at end_on or count, whichever comes first"""
return ignore(
ignore_these,
end_on,
end_cond=end_cond,
count=count,
raw=raw,
invert=True
)
A trivial example of how this might be used:
>>> from pmlr import *
>>> init()
>>> print("enter your age:", end=" "); x = until_not("0123456789", count=2)
enter your age: 123<"a" pressed>
>>> x
123
It may seem like I'm abusing OOP and classes but those are partially for namespace-hygiene and partially because the way I did it previously was really fragile.
Additionally, self.NAME
is for debugging, mostly, but I left it in for the same reason read_class
is public and asking to be fiddled with: why not?
Finally, there is a "magic number" 4096. That's from this excellent answer by the creator of xterm
.
You can get this on github, colorama included.
2 Answers 2
Take care of the docs
Please avoid destroying your documentation after writing it. checkinit
destroys the documentation of the functions it is applied to. You should use functools.wraps
.
-
\$\begingroup\$ @cat Please unaccept this answer as more comprehensive ones are probably coming \$\endgroup\$Caridorc– Caridorc2016年02月26日 11:21:59 +00:00Commented Feb 26, 2016 at 11:21
-
\$\begingroup\$ Whose choice is accepting an answer? For now, I'll accept this one, as it's pretty valuable to me. If new ones appear, I'll change it. \$\endgroup\$cat– cat2016年02月26日 12:27:07 +00:00Commented Feb 26, 2016 at 12:27
-
\$\begingroup\$ @cat Ok, of course it is your choice. Just keep in mind that waiting some time before accepting an answer is standard practice here. \$\endgroup\$Caridorc– Caridorc2016年02月26日 12:32:21 +00:00Commented Feb 26, 2016 at 12:32
I've always hated when there is a program that requires me to call some setup function before doing anything. I also dislike when the module keeps some global state and everything is constrained by that. I think a much cleaner solution is something like this.
_readers = {}
def get_reader(buffer_size=4096):
if buffer_size not in _readers:
if SYSTEM == 'windows'
_readers[buffer_size] = _nt_reader(buffer_size)
else:
_readers[buffer_size] = _posix_reader(buffer_size)
return _readers[buffer_size]
This gives us two things - the first, is that we don't need some global variable that the user then accesses, and also (if they so desire) gives them multiple readers of different buffer size, if that were to matter. This will require restructuring a lot of other code to not rely on a global variable. Also, it may be worthwhile to make all other global names prefixed with an _
so they aren't imported by from x import *
, or make an __all__
list.
The first thing we'll have to do to get rid of that global state is move all of those functions that rely on global state into a class - we'll make a base class that encompasses them.
class _ReaderBaseClass:
def __init__(self, buffer_size):
self.buffer_size = buffer_size
@abc.abstractmethod
def getch(self):
raise NotImplementedError
@abc.abstractmethod
def drain_buf(self):
raise NotImplementedError
@abc.abstractproperty
def newline(self):
return NotImplemented
From here, we'll need to add the other functions in as methods - I'll take those one by one.
readkey
I don't have a lot of specific suggestions here, except that you should return self.newline
in the case of a carriage return or line feed - this way you abstract that away to the subclass.
_do_condition
ignore_condition=CHAR.CONDS[True + 1], # always false
This is a massive code smell - if it is always False
, then just say False
. I'm more confused, though, because True + 1
is 2
, which is True
... then once I actually go and look at what that returns, I realize the lambda this returns is always false. Then you should actually just do CONDS[2]
, or even better you should define these lambdas as named functions somewhere so they're actually understandable. They really don't belong in CHAR
- I just made them static methods of the base class, but they could be just normal functions if you want.
overall
In general, avoid the hacky indexing by booleans - afaik that they have numeric values of 0 and 1 is a CPython implementation detail (I might be wrong about this), and either way is horrifically unreadable.
I fear that maybe you've tried to get to be too generic using _do_condition
as well - it has gotten to the point that the other functions are fairly hard to read. I don't have a good suggestion for this, and if you find it workable then keep going, but there is a balance you have to strike, and I'm not sure you're there.
It also bothers me that a reader
actually writes something - imo it should only return values, and then the caller should display them, if at all. Otherwise, you should allow another parameter of an output stream for them to enter - they might not want to print to stdout by default.
Miscellaneous
Why are you making CHAR
a class (and not an Enum
)? You're on Python 3, so unless you're using 3.3 or below for some reason (please don't) you have access to the enum module.
You actually have a bug - your util
class's methods should all be static methods - as is, you have instance methods with strangely named parameters (i.e. you're using num
instead of self
).
I also feel like these might be worth making helper functions in reader
instead of some util
class.
Here is the complete code.
#!/usr/bin/env python3
import abc
import sys
import struct
from platform import system
SYSTEM = system().lower()
class CHAR:
NUL = chr(0)
INT = chr(3)
EOF = chr(4)
BEL = chr(7)
BKS = chr(8)
LFD = chr(10)
CRR = chr(13)
ESC = chr(27)
SPC = chr(32)
DEL = chr(127)
class _ReaderBaseClass:
def __init__(self, buffer_size):
self.buffer_size = buffer_size
@abc.abstractproperty
def newline_char(self):
return NotImplemented
@abc.abstractmethod
def getch(self):
raise NotImplementedError
@abc.abstractmethod
def drain_buf(self):
raise NotImplementedError
@staticmethod
def char_in_list(ch, chars):
return ch in chars
@staticmethod
def char_not_in_list(ch, chars):
return ch not in chars
@staticmethod
def return_false(*args, **kwargs):
return False
def readkey(self, raw=False):
"""interface for getch + drain_buf
if raw, behave like getch but with flushing for multibyte inputs"""
ch = self.getch()
more = self.drain_buf()
if raw:
return ch + more
# cooked
if ch == CHAR.INT: raise KeyboardInterrupt
if ch == CHAR.EOF: raise EOFError
if ch in (CHAR.BKS, CHAR.DEL):
util.writer(CHAR.BKS + CHAR.SPC + CHAR.BKS)
return CHAR.BKS
elif ch in (CHAR.CRR, CHAR.LFD):
util.writer(self.newline)
return CHAR.LFD
elif ch == CHAR.ESC:
if more:
if more[0] == "[":
sp = more[1:]
if sp in ("D", "C"):
return "033円[" + sp
elif sp == "3~":
return CHAR.SPC
else:
return CHAR.BEL
else:
return CHAR.ESC + more
return ch + more
def raw_readkey(self):
"""alias for readkey(raw=True)"""
return self.readkey(raw=True)
def pretty_press(self, raw=False):
"""literally just read any fancy char from stdin let caller do whatever"""
i = self.readkey(raw=raw)
if not raw and i not in (CHAR.BKS, CHAR.DEL, CHAR.ESC):
util.writer(i)
return util.esc_filter(i, [])
def _do_condition(
self,
end_chars,
end_condition,
count,
ignore_chars=(),
ignore_condition=return_false,
raw=False
):
"""singular interface to reading strings from readkey, to minimise duplication"""
y = []
count = util.parsenum(count)
while len(y) <= count:
i = self.readkey(raw=raw)
if end_condition(i, end_chars):
break
if not ignore_condition(i, ignore_chars):
if not raw and i not in (CHAR.BKS, CHAR.DEL):
util.writer(i)
y = util.esc_filter(i, y)
return "".join(y)
def thismany(self, count, raw=False):
"""read exactly count chars"""
return self._do_condition(
"",
_ReaderBaseClass.return_false,
count,
raw=raw
)
def until(self, chars, invert=False, count=-1, raw=False):
"""get chars of stdin until any of chars is read,
or until count chars have been read, whichever comes first"""
if invert:
f = _ReaderBaseClass.char_not_in_list
else:
f = _ReaderBaseClass.char_in_list
return self._do_condition(
chars,
f,
count,
raw=raw
)
def until_not(self, chars, count=-1, raw=False):
"""read stdin until any of chars stop being read,
or until count chars have been read; whichever comes first"""
return self.until(
chars,
invert=True,
count=count,
raw=raw
)
def ignore(
self,
ignore_these,
end_on,
end_cond=True,
count=-1,
raw=False,
invert=False
):
"""ignore_these keypresses, and stop reading at end_on or count,
whichever comes first"""
if end_cond:
end_condition = _ReaderBaseClass.char_in_list
else:
end_condition = _ReaderBaseClass.char_not_in_list
if invert:
ignore_condition = _ReaderBaseClass.char_not_in_list
else:
ignore_condition = _ReaderBaseClass.char_in_list
return self._do_condition(
end_on,
end_condition,
count,
ignore_chars=ignore_these,
ignore_condition=ignore_condition,
raw=raw
)
def ignore_not(
self,
ignore_these,
end_on,
end_cond=True,
count=-1,
raw=False
):
"""ignore everything that isn't these keypresses
and stop reading at end_on or count, whichever comes first"""
return self.ignore(
ignore_these,
end_on,
end_cond=end_cond,
count=count,
raw=raw,
invert=True
)
class _nt_reader(_ReaderBaseClass):
@property
def newline(self):
return CHAR.CRR
def __init__(self, buffer_size):
"""reader on nt"""
super().__init__(buffer_size)
if SYSTEM != "windows":
util.writer("\n\there be dragons; ye COWER in the SHADOW of NT\n\n")
self.msvcrt = __import__("msvcrt")
self.ctypes = __import__("ctypes")
try:
self.colorama = __import__("colorama")
self.colorama.init()
except (AttributeError, ImportError):
print(
"""
you must install colorama to use this module on windows
do this by:
$ cd colorama
$ python setup.py install
"""
)
exit(2)
def getch(self):
"""use msvcrt to get a char"""
return self.msvcrt.getch()
def drain_buf(self):
"""while buffer, pseudo-nonblocking read bytes from buffer using msvcrt"""
y = []
while self.msvcrt.kbhit():
y.append(self.msvcrt.getch())
return "".join(y)
class _posix_reader(_ReaderBaseClass):
@property
def newline(self):
return ""
def __init__(self, buffer_size):
"""reader on posix"""
super().__init__(buffer_size)
if SYSTEM == "windows":
util.writer("\n\there be dragons; ye COWER in the SHADOW of POSIX\n\n")
self.tty = __import__("tty")
self.termios = __import__("termios")
self.fcntl = __import__("fcntl")
self.O_NONBLOCK = __import__("os").O_NONBLOCK
self.TERM_BUFSIZE = TERMCTL_SPECIAL_BUFSIZE
def getch(self):
"""use old fashioned termios to getch"""
if sys.stdin.isatty(): # fixes "Inappropriate ioctl for device"
fd = sys.stdin.fileno()
old_settings = self.termios.tcgetattr(fd)
try:
self.tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
finally:
self.termios.tcsetattr(fd, self.termios.TCSADRAIN, old_settings)
return ch
else:
return sys.stdin.read(1)
def drain_buf(self):
"""read TERM_BUFSIZE of waiting keypresses"""
if sys.stdin.isatty():
fd = sys.stdin.fileno()
fl = self.fcntl.fcntl(fd, self.fcntl.F_GETFL)
self.fcntl.fcntl(fd, self.fcntl.F_SETFL, fl | self.O_NONBLOCK)
try:
# if nothing is waiting on sys.stdin, then TypeError
# because "can't concat NoneType and str"
chars = sys.stdin.read(self.TERM_BUFSIZE)
except TypeError:
chars = ""
finally:
self.fcntl.fcntl(fd, self.fcntl.F_SETFL, fl) # restore settings
return chars
else:
return sys.stdin.read(self.TERM_BUFSIZE) # ???
_readers = {}
def get_reader(buffer_size=4096):
if buffer_size not in _readers:
if SYSTEM == 'windows'
_readers[buffer_size] = _nt_reader(buffer_size)
else:
_readers[buffer_size] = _posix_reader(buffer_size)
return _readers[buffer_size]
class util():
"""utilities"""
@staticmethod
def parsenum(num):
"""sys.maxsize if num is negative"""
num = int(num)
return sys.maxsize if num < 0 else num
@staticmethod
def writer(*args):
"""write a string to stdout and flush.
should be used by all stdout-writing"""
if not args:
raise TypeError("writer requires at least one argument")
if len(args) > 1:
args = " ".join(str(i) for i in args).strip(" ")
else:
args = "".join(str(i) for i in args)
sys.stdout.write(args)
sys.stdout.flush()
@staticmethod
def esc_filter(x, y):
"""append x to y as long as x is not DEL or backspace or esc"""
if x in (CHAR.DEL, CHAR.BKS):
try:
y.pop()
except IndexError:
pass
return y
y.append(x)
return y
-
\$\begingroup\$ Thank you for the extensive review, despite that I really don't like your implementation and some suggestions. Also, please note the act of returning LFD vs CRR was informed by the writer of xterm. This is not about printing, it's about what that Enter key actually does. \$\endgroup\$cat– cat2016年02月25日 22:19:54 +00:00Commented Feb 25, 2016 at 22:19
-
\$\begingroup\$ @cat, sorry, what do you mean about LFD vs CRR? I don't believe that I misrepresented that point here. Do you mind expanding on what exactly you dislike about my implementation? \$\endgroup\$Dan Oberlam– Dan Oberlam2016年02月25日 22:24:19 +00:00Commented Feb 25, 2016 at 22:24
-
\$\begingroup\$ @Dannno It seems like you're saying
self.newline
should be platform-dependent: "...you should return self.newline in the case of a carriage return or line feed - this way you abstract that away to the subclass." Is that a correct interpretation? \$\endgroup\$cat– cat2016年02月25日 22:26:10 +00:00Commented Feb 25, 2016 at 22:26 -
1\$\begingroup\$ @cat I see, my implementation differs from yours. I can fix that. I should have written that you always return a newline, but what you write is platform independent. \$\endgroup\$Dan Oberlam– Dan Oberlam2016年02月25日 22:31:58 +00:00Commented Feb 25, 2016 at 22:31
-
\$\begingroup\$ In raw mode, I would expect 13, in cooked mode, 10. The question was about raw mode, of course. - Thomas Dickey That goes for both Windows and Posix \$\endgroup\$cat– cat2016年02月25日 22:34:04 +00:00Commented Feb 25, 2016 at 22:34
Explore related questions
See similar questions with these tags.