I wrote a library implementing the RCON protocol to automate interaction with Minecraft servers.
I know that there are already a couple of implementations out there, but neither I found was convincing me, so I re-invented the wheel.
The library will also be featuring an implementation of the Query protocol, which is not implemented yet.
This is the reason why the RCON stuff is in a rcon
subpackage.
However, this review shall only focus on the fully implemented RCON part.
mcipc.config.py
"""Server configuration parser."""
from configparser import ConfigParser
from pathlib import Path
__all__ = ['SERVERS_INI', 'FORTUNE', 'servers']
SERVERS_INI = Path('/etc/mcipc.d/servers.conf')
_SERVERS = ConfigParser()
FORTUNE = Path('/usr/bin/fortune')
def servers():
"""Yields the respective servers."""
_SERVERS.read(str(SERVERS_INI))
return {
section: (_SERVERS[section]['host'], int(_SERVERS[section]['port']),
_SERVERS[section].get('passwd'))
for section in _SERVERS.sections()}
mcipc.rcon.__init__.py
"""RCON client library."""
from mcipc.rcon.client import Client
from mcipc.rcon.console import rconcmd
from mcipc.rcon.proto import RequestIdMismatch, PacketType, Packet, RawClient
__all__ = [
'RequestIdMismatch',
'rconcmd',
'PacketType',
'Packet',
'RawClient',
'Client']
mcipc.rcon.client.py
"""High level client API."""
from collections import namedtuple
from datetime import datetime
from locale import LC_TIME, getdefaultlocale, setlocale
from logging import getLogger
from subprocess import PIPE, CalledProcessError, check_output
from mcipc.config import FORTUNE
from mcipc.rcon.proto import RequestIdMismatch, RawClient
LOGGER = getLogger(__file__)
_PLAYER_OR_COORDS = TypeError('Must specify either dst_player or coords.')
def _fix_text(text):
"""Fixes text for ascii compliance."""
return text.replace('\t', ' ')
class OnlinePlayers(namedtuple('OnlinePlayers', ('online', 'max', 'players'))):
"""Online players information."""
@classmethod
def from_string(cls, string):
"""Creates the players information from the server response string."""
header, players = string.split(':', maxsplit=1)
players = [player for player in players.split(', ') if player]
_, _, amount, _, _ = header.split()
online, max_ = amount.split('/')
return cls(int(online), int(max_), players)
class Client(RawClient):
"""A high-level RCON client."""
@property
def players(self):
"""Returns the players."""
return OnlinePlayers.from_string(self.run('list'))
def login(self, passwd):
"""Performs a login, returning False on failure."""
try:
return super().login(passwd)
except RequestIdMismatch:
return False
def say(self, message):
"""Broadcast a message to all players."""
LOGGER.debug('Sending text: "%s".', message)
return self.run('say', _fix_text(message))
def tell(self, player, message):
"""Whispers a message to the respective player."""
return self.run('tell', player, _fix_text(message))
def mkop(self, player):
"""Makes the respective player an operator."""
return self.run('op', player)
def deop(self, player):
"""Revokes operator status from the respective player."""
return self.run('deop', player)
def kick(self, player, *reasons):
"""Kicks the respective player."""
return self.run('kick', player, *reasons)
def teleport(self, player, dst_player=None, coords=None, yaw_pitch=None):
"""Teleports players."""
args = [str(player)]
if dst_player is not None and coords is not None:
raise _PLAYER_OR_COORDS
elif dst_player is not None:
args.append(str(dst_player))
elif coords is not None:
coord_x, coord_y, coord_z = coords
args += [str(coord_x), str(coord_y), str(coord_z)]
else:
raise _PLAYER_OR_COORDS
if yaw_pitch is not None:
yaw, pitch = yaw_pitch
args += [str(yaw), str(pitch)]
return self.run('tp', *args)
def fortune(self, short=True, offensive=False):
"""Sends a fortune to all players."""
args = []
if short:
args.append('-s')
if offensive:
args.append('-o')
try:
text = check_output([FORTUNE] + args, stderr=PIPE)
except FileNotFoundError:
LOGGER.error('%s is not available.', FORTUNE)
except CalledProcessError as called_process_error:
LOGGER.error('Error running %s.', FORTUNE)
LOGGER.debug(called_process_error.stderr.decode())
else:
text = text.decode()
LOGGER.debug('Fortune text:\n%s', text)
return self.say(text)
return False
def datetime(self, frmt='%c'):
"""Tells all players the current datetime."""
setlocale(LC_TIME, getdefaultlocale()) # Fix loacale.
text = datetime.now().strftime(frmt)
return self.say(text)
mcipc.rcon.console.py
"""An interactive console."""
from getpass import getpass
from mcipc.rcon.proto import RequestIdMismatch
from mcipc.rcon.client import Client
__all__ = ['rconcmd']
PS1 = 'RCON> '
EXIT_COMMANDS = ('exit', 'quit')
def _read(prompt, type_=None):
"""Reads input and converts it to the respective type."""
while True:
try:
raw = input(prompt)
except EOFError:
continue
if type_ is not None:
try:
return type_(raw)
except (TypeError, ValueError):
print(f'Invalid {type_}: {raw}.')
continue
return raw
def _login(client, passwd):
"""Performs a login."""
if passwd is None:
passwd = getpass('Password: ')
while not client.login(passwd):
print('Invalid password.')
passwd = getpass('Password: ')
return passwd
def rconcmd(host=None, port=None, passwd=None, *, prompt=PS1):
"""Initializes the console."""
if host is None:
try:
host = _read('Host: ')
except KeyboardInterrupt:
print('\nAborted...')
return 1
if port is None:
try:
port = _read('Port: ', type_=int)
except KeyboardInterrupt:
print('\nAborted...')
return 1
with Client(host, port) as client:
try:
passwd = _login(client, passwd)
except (EOFError, KeyboardInterrupt):
print('\nAborted...')
return 1
while True:
try:
command = input(prompt)
except EOFError:
print('\nAborted.')
break
except KeyboardInterrupt:
print()
continue
command, *args = command.split()
if command in EXIT_COMMANDS:
break
try:
result = client.run(command, *args)
except RequestIdMismatch:
print('Session timed out. Please login again.')
try:
passwd = _login(client, passwd)
except (EOFError, KeyboardInterrupt):
print()
continue
print(result)
return 0
mcipc.rcon.credentials.py
"""RCON server credentials."""
from collections import namedtuple
from mcipc.config import servers
__all__ = ['InvalidCredentialsError', 'Credentials']
class InvalidCredentialsError(ValueError):
"""Indicates invalid credentials."""
pass
class Credentials(namedtuple('Credentials', ('host', 'port', 'passwd'))):
"""Represents server credentials."""
@classmethod
def from_string(cls, string):
"""Reads the credentials from the given string."""
try:
host, port = string.split(':')
except ValueError:
try:
return servers()[string]
except KeyError:
raise InvalidCredentialsError(f'No such server: {string}.')
try:
port = int(port)
except ValueError:
InvalidCredentialsError(f'Not an integer: {port}.')
try:
passwd, host = host.rsplit('@', maxsplit=1)
except ValueError:
passwd = None
return cls(host, port, passwd)
mcipc.rcon.proto.py
"""Low-level protocol stuff."""
from collections import namedtuple
from enum import Enum
from itertools import chain
from logging import getLogger
from random import randint
from socket import socket
from struct import pack, unpack
__all__ = [
'RequestIdMismatch',
'PacketType',
'Packet',
'RawClient']
LOGGER = getLogger(__file__)
TAIL = b'0円0円'
class InvalidPacketStructureError(Exception):
"""Indicates an invalid packet structure."""
pass
class RequestIdMismatch(Exception):
"""Indicates that the sent and received request IDs do not match."""
def __init__(self, sent_request_id, received_request_id):
"""Sets the sent and received request IDs."""
super().__init__(sent_request_id, received_request_id)
self.sent_request_id = sent_request_id
self.received_request_id = received_request_id
def _rand_int32():
"""Returns a random int32."""
return randint(0, 2_147_483_647 + 1)
class PacketType(Enum):
"""Available packet types."""
LOGIN = 3
COMMAND = 2
COMMAND_RESPONSE = 0
def __int__(self):
return self.value
class Packet(namedtuple('Packet', ('request_id', 'type', 'payload'))):
"""An RCON packet."""
def __bytes__(self):
"""Returns the packet as bytes."""
payload = pack('<i', self.request_id)
payload += pack('<i', int(self.type))
payload += self.payload.encode()
payload += TAIL
return pack('<i', len(payload)) + payload
@classmethod
def from_bytes(cls, bytes_):
"""Creates a packet from the respective bytes."""
request_id, type_ = unpack('<ii', bytes_[:8])
payload = bytes_[8:-2]
tail = bytes_[-2:]
if tail != TAIL:
raise InvalidPacketStructureError('Invalid tail.', tail)
return cls(request_id, type_, payload.decode())
@classmethod
def from_command(cls, command):
"""Creates a command packet."""
return cls(_rand_int32(), PacketType.COMMAND, command)
@classmethod
def from_login(cls, passwd):
"""Creates a login packet."""
return cls(_rand_int32(), PacketType.LOGIN, passwd)
class RawClient(socket):
"""An RCON client."""
def __init__(self, host, port):
"""Sets host an port."""
super().__init__()
self.host = host
self.port = port
def __enter__(self):
"""Sets up and conntects the socket."""
super().__enter__()
sock = self.socket
LOGGER.debug('Connecting to socket %s.', sock)
self.connect(sock)
return self
def __exit__(self, *args):
"""Disconnects the socket."""
LOGGER.debug('Disconnecting from socket %s.', self.getsockname())
return super().__exit__(*args)
@property
def socket(self):
"""Returns the respective socket."""
return (self.host, self.port)
def sendpacket(self, packet):
"""Sends an Packet."""
bytes_ = bytes(packet)
LOGGER.debug('Sending %i bytes.', len(bytes_))
return self.send(bytes_)
def recvpacket(self):
"""Receives a packet."""
length, = unpack('<i', self.recv(4))
payload = self.recv(length)
return Packet.from_bytes(payload)
def login(self, passwd):
"""Performs a login."""
login_packet = Packet.from_login(passwd)
self.sendpacket(login_packet)
response = self.recvpacket()
if response.request_id == login_packet.request_id:
return True
raise RequestIdMismatch(
login_packet.request_id, response.request_id)
def run(self, command, *arguments):
"""Runs a command."""
command = ' '.join(chain((command,), arguments))
command_packet = Packet.from_command(command)
self.sendpacket(command_packet)
response = self.recvpacket()
if response.request_id == command_packet.request_id:
return response.payload
raise RequestIdMismatch(
command_packet.request_id, response.request_id)
The library also features two scripts. One for an interactive console and one for a oneshot client:
/usr/bin/rconclt
#! /usr/bin/env python3
"""rconclt.
A Minecraft RCON client.
Usage:
rconclt <server> datetime [--format=<format>] [options]
rconclt <server> fortune [--long] [--offensive] [options]
rconclt <server> <command> [<args>...] [options]
Options:
--passwd=<passwd> Specifies the respective RCON password.
--format=<format> Specifies the datetime format [default: %c].
--long Also generate long fortunes.
--offensive Only genenrate offensive fortunes.
--debug Enters debug mode.
--help, -h Shows this page.
"""
from logging import DEBUG, INFO, basicConfig, getLogger
from sys import exit as exit_
from docopt import docopt
from mcipc.rcon.client import Client
from mcipc.rcon.credentials import InvalidCredentialsError, Credentials
LOGGER = getLogger(__file__)
_LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
def main(options):
"""Runs the RCON client."""
log_level = DEBUG if options['--debug'] else INFO
basicConfig(level=log_level, format=_LOG_FORMAT)
try:
host, port, passwd = Credentials.from_string(options['<server>'])
except InvalidCredentialsError as error:
LOGGER.error(error)
exit_(2)
if passwd is None:
passwd = options['--passwd'] or ''
with Client(host, port) as client:
if client.login(passwd):
if options['datetime']:
result = client.datetime(frmt=options['--format'])
elif options['fortune']:
result = client.fortune(
short=not options['--long'],
offensive=options['--offensive'])
else:
result = client.run(options['<command>'], *options['<args>'])
if result:
LOGGER.info(result)
else:
LOGGER.error('Failed to log in.')
exit_(1)
if __name__ == '__main__':
main(docopt(__doc__))
/usr/bin/rconcmd
#! /usr/bin/env python3
"""An interactive RCON console."""
from sys import exit as exit_
from mcipc.rcon.console import rconcmd
if __name__ == '__main__':
exit_(rconcmd())
I'd appreciate any critique.
1 Answer 1
I managed to get rid of the struct
module used in proto.py
, after I learned that int
has a built-in from_bytes
classmethod.
Furthermore I migrated from collections.namedtuple
to typing.NamedTuple
because I personally find this kind of subclassing more readable.
Finally, I outsourced the exceptions into an own module and added type hints:
"""Low-level protocol stuff."""
from enum import Enum
from logging import getLogger
from random import randint
from typing import NamedTuple
from mcipc.common import BaseClient
from mcipc.rcon.exceptions import InvalidPacketStructureError, \
RequestIdMismatchError, InvalidCredentialsError
__all__ = ['Type', 'Packet', 'Client']
LOGGER = getLogger(__file__)
TAIL = b'0円0円'
def _rand_uint32() -> int:
"""Returns a random unsigned int32."""
return randint(0, 4_294_967_295 + 1)
class Type(Enum):
"""Available packet types."""
LOGIN = 3
COMMAND = 2
RESPONSE = 0
def __bytes__(self):
"""Returns the integer value as little endian."""
return self.value.to_bytes(4, 'little') # pylint: disable=E1101
class Packet(NamedTuple):
"""An RCON packet."""
request_id: int
type: Type
payload: bytes
def __bytes__(self):
"""Returns the packet as bytes."""
payload = self.request_id.to_bytes(4, 'little')
payload += bytes(self.type)
payload += self.payload
payload += TAIL
size = len(payload).to_bytes(4, 'little')
return size + payload
@classmethod
def from_bytes(cls, bytes_: bytes):
"""Creates a packet from the respective bytes."""
request_id = int.from_bytes(bytes_[:4], 'little')
type_ = int.from_bytes(bytes_[4:8], 'little')
payload = bytes_[8:-2]
tail = bytes_[-2:]
if tail != TAIL:
raise InvalidPacketStructureError('Invalid tail.', tail)
return cls(request_id, Type(type_), payload)
@classmethod
def from_command(cls, command: str):
"""Creates a command packet."""
return cls(_rand_uint32(), Type.COMMAND, command.encode())
@classmethod
def from_login(cls, passwd: str):
"""Creates a login packet."""
return cls(_rand_uint32(), Type.LOGIN, passwd.encode())
@property
def text(self) -> str:
"""Returns the payload as text."""
return self.payload.decode()
class Client(BaseClient):
"""An RCON client."""
def communicate(self, packet: Packet) -> Packet:
"""Sends and receives a packet."""
self._socket.send(bytes(packet))
header = self._socket.recv(4)
length = int.from_bytes(header, 'little')
payload = self._socket.recv(length)
response = Packet.from_bytes(payload)
if response.request_id == packet.request_id:
return response
raise RequestIdMismatchError(packet.request_id, response.request_id)
def login(self, passwd: str) -> bool:
"""Performs a login."""
packet = Packet.from_login(passwd)
try:
self.communicate(packet)
except RequestIdMismatchError:
raise InvalidCredentialsError()
return True
def run(self, command: str, *arguments: str) -> str:
"""Runs a command."""
command = ' '.join((command,) + arguments)
packet = Packet.from_command(command)
response = self.communicate(packet)
return response.text
Explore related questions
See similar questions with these tags.