6
\$\begingroup\$

I wrote a library implementing the RCON protocol to automate interaction with Minecraft servers.
I know that there are already a couple of implementations out there, but none of the ones I found convinced me, so I re-invented the wheel.

The library will also feature an implementation of the Query protocol, which is not implemented yet. This is the reason why the RCON code lives in an rcon subpackage.
However, this review shall only focus on the fully implemented RCON part.

mcipc.config.py

"""Server configuration parser."""
from configparser import ConfigParser
from pathlib import Path
__all__ = ['SERVERS_INI', 'FORTUNE', 'servers']
# Filesystem locations used by the library.
FORTUNE = Path('/usr/bin/fortune')
SERVERS_INI = Path('/etc/mcipc.d/servers.conf')

# Module-wide parser instance; servers() reads SERVERS_INI into it.
_SERVERS = ConfigParser()
def servers():
    """Returns the configured servers.

    Reads SERVERS_INI into the module-wide parser and returns a dict that
    maps each section name to a ``(host, port, passwd)`` tuple.  The port
    is converted to int; passwd is None when the section has no
    ``passwd`` option.
    """
    _SERVERS.read(str(SERVERS_INI))
    result = {}

    for name in _SERVERS.sections():
        section = _SERVERS[name]
        result[name] = (
            section['host'], int(section['port']), section.get('passwd'))

    return result

mcipc.rcon.__init__.py

"""RCON client library."""
from mcipc.rcon.client import Client
from mcipc.rcon.console import rconcmd
from mcipc.rcon.proto import RequestIdMismatch, PacketType, Packet, RawClient
__all__ = [
 'RequestIdMismatch',
 'rconcmd',
 'PacketType',
 'Packet',
 'RawClient',
 'Client']

mcipc.rcon.client.py

"""High level client API."""
from collections import namedtuple
from datetime import datetime
from locale import LC_TIME, getdefaultlocale, setlocale
from logging import getLogger
from subprocess import PIPE, CalledProcessError, check_output
from mcipc.config import FORTUNE
from mcipc.rcon.proto import RequestIdMismatch, RawClient
# Name the logger after the module, not the file path: a path contains
# dots and slashes that produce a bogus logger hierarchy and cannot be
# configured via the package name.
LOGGER = getLogger(__name__)
# Raised by Client.teleport() when not exactly one destination is given.
_PLAYER_OR_COORDS = TypeError('Must specify either dst_player or coords.')
def _fix_text(text):
    """Returns the text with every tab character replaced by a space."""
    return ' '.join(text.split('\t'))
class OnlinePlayers(namedtuple('OnlinePlayers', ('online', 'max', 'players'))):
    """Player summary: online count, maximum count and player names."""

    @classmethod
    def from_string(cls, string):
        """Parses the server response string into an OnlinePlayers tuple.

        The text before the first colon must consist of exactly five
        whitespace-separated words, the third being ``<online>/<max>``.
        The text after the colon is a ``', '``-separated list of names.
        """
        header, names = string.split(':', maxsplit=1)
        _, _, amount, _, _ = header.split()
        online, max_ = amount.split('/')
        # Drop empty entries so an empty name list yields [].
        players = list(filter(None, names.split(', ')))
        return cls(int(online), int(max_), players)
class Client(RawClient):
    """A high-level RCON client.

    Wraps frequently used server commands in methods that delegate to
    the inherited run() method.
    """

    @property
    def players(self):
        """Returns the players parsed from the ``list`` command output."""
        return OnlinePlayers.from_string(self.run('list'))

    def login(self, passwd):
        """Performs a login, returning False on failure.

        A RequestIdMismatch raised by the parent class is reported as
        a failed login instead of being propagated.
        """
        try:
            return super().login(passwd)
        except RequestIdMismatch:
            return False

    def say(self, message):
        """Broadcasts a message to all players."""
        LOGGER.debug('Sending text: "%s".', message)
        return self.run('say', _fix_text(message))

    def tell(self, player, message):
        """Whispers a message to the respective player."""
        return self.run('tell', player, _fix_text(message))

    def mkop(self, player):
        """Makes the respective player an operator."""
        return self.run('op', player)

    def deop(self, player):
        """Revokes operator status from the respective player."""
        return self.run('deop', player)

    def kick(self, player, *reasons):
        """Kicks the respective player, passing on any reason words."""
        return self.run('kick', player, *reasons)

    def teleport(self, player, dst_player=None, coords=None, yaw_pitch=None):
        """Teleports a player to another player or to coordinates.

        Exactly one of dst_player and coords must be given; coords is an
        (x, y, z) triple.  yaw_pitch is an optional (yaw, pitch) pair.
        Raises TypeError if both or neither destination is specified.
        """
        args = [str(player)]

        if dst_player is not None and coords is not None:
            raise _PLAYER_OR_COORDS
        elif dst_player is not None:
            args.append(str(dst_player))
        elif coords is not None:
            coord_x, coord_y, coord_z = coords
            args += [str(coord_x), str(coord_y), str(coord_z)]
        else:
            raise _PLAYER_OR_COORDS

        if yaw_pitch is not None:
            yaw, pitch = yaw_pitch
            args += [str(yaw), str(pitch)]

        return self.run('tp', *args)

    def fortune(self, short=True, offensive=False):
        """Sends the output of the fortune program to all players.

        Returns the result of say() on success, or False if the fortune
        program is missing or exits with an error.
        """
        args = []

        if short:
            args.append('-s')

        if offensive:
            args.append('-o')

        try:
            text = check_output([FORTUNE] + args, stderr=PIPE)
        except FileNotFoundError:
            LOGGER.error('%s is not available.', FORTUNE)
        except CalledProcessError as called_process_error:
            LOGGER.error('Error running %s.', FORTUNE)
            LOGGER.debug(called_process_error.stderr.decode())
        else:
            text = text.decode()
            LOGGER.debug('Fortune text:\n%s', text)
            return self.say(text)

        return False

    def datetime(self, frmt='%c'):
        """Tells all players the current datetime."""
        # An empty locale string selects the user's environment settings.
        # Unlike the deprecated getdefaultlocale(), this also honours the
        # LC_TIME environment variable.
        setlocale(LC_TIME, '')
        text = datetime.now().strftime(frmt)
        return self.say(text)

mcipc.rcon.console.py

"""An interactive console."""
from getpass import getpass
from mcipc.rcon.proto import RequestIdMismatch
from mcipc.rcon.client import Client
__all__ = ['rconcmd']
# Commands that end the interactive session.
EXIT_COMMANDS = 'exit', 'quit'
# Default prompt of the interactive console.
PS1 = 'RCON> '
def _read(prompt, type_=None):
    """Prompts until a usable value has been entered.

    Without type_ the raw input string is returned.  With type_ the
    input is passed to it and the result returned; on TypeError or
    ValueError a message is printed and the prompt repeats.  EOFError
    from input() is ignored and the prompt repeats.
    """
    while True:
        try:
            text = input(prompt)
        except EOFError:
            continue

        if type_ is None:
            return text

        try:
            value = type_(text)
        except (TypeError, ValueError):
            print(f'Invalid {type_}: {text}.')
        else:
            return value
def _login(client, passwd):
    """Logs the client in, asking for the password until it is accepted.

    If passwd is None the password is requested via getpass() first.
    Returns the password that was finally accepted by client.login().
    """
    secret = getpass('Password: ') if passwd is None else passwd

    while True:
        if client.login(secret):
            return secret

        print('Invalid password.')
        secret = getpass('Password: ')
def rconcmd(host=None, port=None, passwd=None, *, prompt=PS1):
    """Runs the interactive RCON console.

    host and port are prompted for when not given; the password is
    requested by _login() when needed.  Commands are read in a loop
    until one of EXIT_COMMANDS or end-of-file is entered.

    Returns 0 after a regular session and 1 if the user aborted while
    entering host, port or password.
    """
    if host is None:
        try:
            host = _read('Host: ')
        except KeyboardInterrupt:
            print('\nAborted...')
            return 1

    if port is None:
        try:
            port = _read('Port: ', type_=int)
        except KeyboardInterrupt:
            print('\nAborted...')
            return 1

    with Client(host, port) as client:
        try:
            passwd = _login(client, passwd)
        except (EOFError, KeyboardInterrupt):
            print('\nAborted...')
            return 1

        while True:
            try:
                line = input(prompt)
            except EOFError:
                print('\nAborted.')
                break
            except KeyboardInterrupt:
                print()
                continue

            words = line.split()

            # A blank line has no command word; unpacking it would raise
            # ValueError, so just show the prompt again.
            if not words:
                continue

            command, *args = words

            if command in EXIT_COMMANDS:
                break

            try:
                result = client.run(command, *args)
            except RequestIdMismatch:
                print('Session timed out. Please login again.')

                try:
                    passwd = _login(client, passwd)
                except (EOFError, KeyboardInterrupt):
                    print()

                # The command produced no result; never fall through to
                # print an undefined or stale one.
                continue

            print(result)

    return 0

mcipc.rcon.credentials.py

"""RCON server credentials."""
from collections import namedtuple
from mcipc.config import servers
__all__ = ['InvalidCredentialsError', 'Credentials']
class InvalidCredentialsError(ValueError):
    """Raised when a credentials string cannot be resolved to a server."""
class Credentials(namedtuple('Credentials', ('host', 'port', 'passwd'))):
    """Represents server credentials as a (host, port, passwd) tuple."""

    @classmethod
    def from_string(cls, string):
        """Reads the credentials from the given string.

        The string is either ``[<passwd>@]<host>:<port>`` or, if it does
        not split into exactly two parts at ``:``, the name of a server
        returned by servers().

        Raises InvalidCredentialsError if the named server is unknown or
        the port is not an integer.
        """
        try:
            host, port = string.split(':')
        except ValueError:
            # Not of the form host:port, so treat it as a server name.
            try:
                # Wrap the plain tuple so both paths return Credentials.
                return cls(*servers()[string])
            except KeyError:
                raise InvalidCredentialsError(
                    f'No such server: {string}.') from None

        try:
            port = int(port)
        except ValueError:
            # The original built this exception without raising it and
            # silently returned the port as a string.
            raise InvalidCredentialsError(
                f'Not an integer: {port}.') from None

        try:
            # Split at the last '@' so the password may itself contain '@'.
            passwd, host = host.rsplit('@', maxsplit=1)
        except ValueError:
            passwd = None

        return cls(host, port, passwd)

mcipc.rcon.proto.py

"""Low-level protocol stuff."""
from collections import namedtuple
from enum import Enum
from itertools import chain
from logging import getLogger
from random import randint
from socket import socket
from struct import pack, unpack
__all__ = [
 'RequestIdMismatch',
 'PacketType',
 'Packet',
 'RawClient']
# Name the logger after the module, not the file path, so it fits into
# the package's logger hierarchy.
LOGGER = getLogger(__name__)
# Two NUL bytes that terminate every packet (restored from a garbled
# rendering of the backslash escapes).
TAIL = b'\0\0'
class InvalidPacketStructureError(Exception):
    """Raised when received bytes do not form a valid packet."""
class RequestIdMismatch(Exception):
    """Raised when a response carries a different request ID than sent.

    Both IDs are available as attributes and as the exception's args.
    """

    def __init__(self, sent_request_id, received_request_id):
        """Stores the sent and the received request ID."""
        self.sent_request_id = sent_request_id
        self.received_request_id = received_request_id
        super().__init__(sent_request_id, received_request_id)
def _rand_int32():
    """Returns a random non-negative integer that fits a signed int32.

    randint() includes both end points, so the upper bound has to be
    2**31 - 1; the former bound of 2**31 could yield a value that does
    not fit the '<i' struct format.
    """
    return randint(0, 2_147_483_647)
class PacketType(Enum):
    """Packet types with their numeric codes."""

    LOGIN = 3
    COMMAND = 2
    COMMAND_RESPONSE = 0

    def __int__(self):
        """Returns the numeric code so that int(member) works."""
        code = self.value
        return code
class Packet(namedtuple('Packet', ('request_id', 'type', 'payload'))):
    """An RCON packet made of request ID, type and text payload."""

    def __bytes__(self):
        """Serializes the packet.

        Layout: little-endian int32 length, int32 request ID, int32
        type, the encoded payload and finally TAIL.  The length counts
        everything after the length field itself.
        """
        body = pack('<ii', self.request_id, int(self.type))
        body = body + self.payload.encode() + TAIL
        return pack('<i', len(body)) + body

    @classmethod
    def from_bytes(cls, bytes_):
        """Creates a packet from bytes that follow the length field.

        Raises InvalidPacketStructureError if the last two bytes are
        not TAIL.  The type is kept as the plain integer read.
        """
        request_id, type_ = unpack('<ii', bytes_[:8])
        text, tail = bytes_[8:-2], bytes_[-2:]

        if tail != TAIL:
            raise InvalidPacketStructureError('Invalid tail.', tail)

        return cls(request_id, type_, text.decode())

    @classmethod
    def from_command(cls, command):
        """Creates a command packet with a random request ID."""
        return cls(_rand_int32(), PacketType.COMMAND, command)

    @classmethod
    def from_login(cls, passwd):
        """Creates a login packet with a random request ID."""
        return cls(_rand_int32(), PacketType.LOGIN, passwd)
class RawClient(socket):
    """An RCON client built directly on a socket."""

    def __init__(self, host, port):
        """Sets host and port."""
        super().__init__()
        self.host = host
        self.port = port

    def __enter__(self):
        """Sets up and connects the socket."""
        super().__enter__()
        sock = self.socket
        LOGGER.debug('Connecting to socket %s.', sock)

        try:
            self.connect(sock)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so close
            # the socket here instead of leaking it.
            self.close()
            raise

        return self

    def __exit__(self, *args):
        """Disconnects the socket."""
        LOGGER.debug('Disconnecting from socket %s.', self.getsockname())
        return super().__exit__(*args)

    @property
    def socket(self):
        """Returns the (host, port) address tuple."""
        return (self.host, self.port)

    def sendpacket(self, packet):
        """Sends a packet and returns the number of bytes sent."""
        bytes_ = bytes(packet)
        LOGGER.debug('Sending %i bytes.', len(bytes_))
        # send() may transmit only part of the buffer; sendall() keeps
        # going until everything has been written.
        self.sendall(bytes_)
        return len(bytes_)

    def _recvall(self, size):
        """Receives exactly size bytes.

        recv() may return fewer bytes than requested, so keep reading
        until the buffer is complete.  Raises ConnectionError if the
        peer closes the connection first.
        """
        chunks = []
        remaining = size

        while remaining > 0:
            chunk = self.recv(remaining)

            if not chunk:
                raise ConnectionError(
                    'Connection closed while receiving a packet.')

            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def recvpacket(self):
        """Receives a packet: a 4-byte length followed by the body."""
        length, = unpack('<i', self._recvall(4))
        payload = self._recvall(length)
        return Packet.from_bytes(payload)

    def login(self, passwd):
        """Performs a login.

        Returns True if the response carries the request ID that was
        sent, otherwise raises RequestIdMismatch.
        """
        login_packet = Packet.from_login(passwd)
        self.sendpacket(login_packet)
        response = self.recvpacket()

        if response.request_id == login_packet.request_id:
            return True

        raise RequestIdMismatch(
            login_packet.request_id, response.request_id)

    def run(self, command, *arguments):
        """Runs a command and returns the response payload.

        Command and arguments are joined with single spaces.  Raises
        RequestIdMismatch if the response's request ID differs.
        """
        command = ' '.join(chain((command,), arguments))
        command_packet = Packet.from_command(command)
        self.sendpacket(command_packet)
        response = self.recvpacket()

        if response.request_id == command_packet.request_id:
            return response.payload

        raise RequestIdMismatch(
            command_packet.request_id, response.request_id)

The library also features two scripts. One for an interactive console and one for a oneshot client:

/usr/bin/rconclt

#! /usr/bin/env python3
"""rconclt.
A Minecraft RCON client.
Usage:
 rconclt <server> datetime [--format=<format>] [options]
 rconclt <server> fortune [--long] [--offensive] [options]
 rconclt <server> <command> [<args>...] [options]
Options:
 --passwd=<passwd> Specifies the respective RCON password.
 --format=<format> Specifies the datetime format [default: %c].
 --long Also generate long fortunes.
 --offensive Only genenrate offensive fortunes.
 --debug Enters debug mode.
 --help, -h Shows this page.
"""
from logging import DEBUG, INFO, basicConfig, getLogger
from sys import exit as exit_
from docopt import docopt
from mcipc.rcon.client import Client
from mcipc.rcon.credentials import InvalidCredentialsError, Credentials
# Layout of the log records emitted by this script.
_LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
# Logger of this script, named after the script file.
LOGGER = getLogger(__file__)
def main(options):
    """Runs the RCON client with the parsed command line options.

    Exits with status 2 if the server cannot be resolved and with
    status 1 if the login fails.
    """
    basicConfig(
        level=DEBUG if options['--debug'] else INFO, format=_LOG_FORMAT)

    try:
        credentials = Credentials.from_string(options['<server>'])
    except InvalidCredentialsError as error:
        LOGGER.error(error)
        exit_(2)

    host, port, passwd = credentials

    if passwd is None:
        passwd = options['--passwd'] or ''

    with Client(host, port) as client:
        if not client.login(passwd):
            LOGGER.error('Failed to log in.')
            exit_(1)

        if options['datetime']:
            result = client.datetime(frmt=options['--format'])
        elif options['fortune']:
            result = client.fortune(
                short=not options['--long'],
                offensive=options['--offensive'])
        else:
            result = client.run(options['<command>'], *options['<args>'])

        if result:
            LOGGER.info(result)
# Script entry point: parse the command line against the usage text in
# the module docstring and hand the resulting options to main().
if __name__ == '__main__':
    main(docopt(__doc__))

/usr/bin/rconcmd

#! /usr/bin/env python3
"""An interactive RCON console."""
from sys import exit as exit_
from mcipc.rcon.console import rconcmd
# Script entry point: the console's return value is the exit status.
if __name__ == '__main__':
    raise SystemExit(rconcmd())

I'd appreciate any critique.

asked Aug 10, 2018 at 10:24
\$\endgroup\$

1 Answer 1

1
\$\begingroup\$

I managed to get rid of the struct module used in proto.py, after I learned that int has a built-in from_bytes classmethod.

Furthermore I migrated from collections.namedtuple to typing.NamedTuple because I personally find this kind of subclassing more readable.

Finally, I moved the exceptions into their own module and added type hints:

"""Low-level protocol stuff."""
from enum import Enum
from logging import getLogger
from random import randint
from typing import NamedTuple
from mcipc.common import BaseClient
from mcipc.rcon.exceptions import InvalidPacketStructureError, \
 RequestIdMismatchError, InvalidCredentialsError
__all__ = ['Type', 'Packet', 'Client']
# Name the logger after the module, not the file path, so it fits into
# the package's logger hierarchy.
LOGGER = getLogger(__name__)
# Two NUL bytes that terminate every packet (restored from a garbled
# rendering of the backslash escapes).
TAIL = b'\0\0'
def _rand_uint32() -> int:
    """Returns a random unsigned int32.

    randint() includes both end points, so the upper bound has to be
    2**32 - 1; the former bound of 2**32 could yield a value that does
    not fit into four bytes.
    """
    return randint(0, 4_294_967_295)
class Type(Enum):
    """Packet types with their numeric codes."""

    LOGIN = 3
    COMMAND = 2
    RESPONSE = 0

    def __bytes__(self):
        """Returns the numeric code as four little-endian bytes."""
        return int.to_bytes(self.value, 4, 'little')
class Packet(NamedTuple):
    """An RCON packet made of request ID, type and raw payload."""

    # ID used to match a response to its request.
    request_id: int
    # Kind of packet.
    type: Type
    # Encoded payload without the terminating TAIL.
    payload: bytes

    def __bytes__(self):
        """Serializes the packet.

        Layout: 4-byte little-endian size, 4-byte request ID, 4-byte
        type, the payload and finally TAIL.  The size counts everything
        after the size field itself.
        """
        body = b''.join((
            self.request_id.to_bytes(4, 'little'),
            bytes(self.type),
            self.payload,
            TAIL))
        return len(body).to_bytes(4, 'little') + body

    @classmethod
    def from_bytes(cls, bytes_: bytes):
        """Creates a packet from the bytes that follow the size field.

        Raises InvalidPacketStructureError if the last two bytes are
        not TAIL.
        """
        tail = bytes_[-2:]

        if tail != TAIL:
            raise InvalidPacketStructureError('Invalid tail.', tail)

        request_id = int.from_bytes(bytes_[:4], 'little')
        type_ = Type(int.from_bytes(bytes_[4:8], 'little'))
        return cls(request_id, type_, bytes_[8:-2])

    @classmethod
    def from_command(cls, command: str):
        """Creates a command packet with a random request ID."""
        return cls(_rand_uint32(), Type.COMMAND, command.encode())

    @classmethod
    def from_login(cls, passwd: str):
        """Creates a login packet with a random request ID."""
        return cls(_rand_uint32(), Type.LOGIN, passwd.encode())

    @property
    def text(self) -> str:
        """Returns the decoded payload."""
        return self.payload.decode()
class Client(BaseClient):
    """An RCON client."""

    def _recvall(self, size: int) -> bytes:
        """Receives exactly size bytes from the socket.

        recv() may return fewer bytes than requested, so keep reading
        until the buffer is complete.  Raises ConnectionError if the
        peer closes the connection first.
        """
        chunks = []
        remaining = size

        while remaining > 0:
            chunk = self._socket.recv(remaining)

            if not chunk:
                raise ConnectionError(
                    'Connection closed while receiving a packet.')

            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def communicate(self, packet: Packet) -> Packet:
        """Sends a packet and returns the matching response.

        Raises RequestIdMismatchError if the response carries a
        different request ID than the packet sent.
        """
        # send() may transmit only part of the buffer; sendall() keeps
        # going until everything has been written.
        # NOTE(review): assumes self._socket is a socket.socket as set up
        # by BaseClient -- confirm.
        self._socket.sendall(bytes(packet))
        header = self._recvall(4)
        length = int.from_bytes(header, 'little')
        payload = self._recvall(length)
        response = Packet.from_bytes(payload)

        if response.request_id == packet.request_id:
            return response

        raise RequestIdMismatchError(packet.request_id, response.request_id)

    def login(self, passwd: str) -> bool:
        """Performs a login.

        Returns True on success; a request ID mismatch in the response
        is reported as InvalidCredentialsError.
        """
        packet = Packet.from_login(passwd)

        try:
            self.communicate(packet)
        except RequestIdMismatchError:
            raise InvalidCredentialsError()

        return True

    def run(self, command: str, *arguments: str) -> str:
        """Runs a command and returns the response text.

        Command and arguments are joined with single spaces.
        """
        command = ' '.join((command,) + arguments)
        packet = Packet.from_command(command)
        response = self.communicate(packet)
        return response.text
answered Sep 13, 2018 at 12:36
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.