4
\$\begingroup\$

I created some classes for easier management of RabbitMQ and to practice OOP in Python.

I would care about a few issues and any code advice would be appreciated :)

  1. is it a good practice to use the isinstance function in the constructor?
  2. does using the context manager in these cases close the channel? It seems to me that any use of, for example, the "send()" function opens the channel and closes it.
  3. not sure if I shouldn't close the connection after the use of each instance is complete.
import pika
import colorlog
import logging
from dataclasses import dataclass
from enum import Enum
handler = logging.StreamHandler()
handler.setFormatter(
 colorlog.ColoredFormatter(
 "033円[35m| 033円[0m033円[32mRabbitMQ-033円[0m%(log_color)s%(levelname)-8s%(reset)s %(blue)s%(message)s",
 log_colors={
 "DEBUG": "cyan",
 "INFO": "green",
 "WARNING": "yellow",
 "ERROR": "red",
 "CRITICAL": "red,bg_white",
 },
 )
)
logger = colorlog.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class Exchange(Enum):
 DEFAULT = ""
 DIRECT = "direct"
 TOPIC = "topic"
 FANOUT = "fanout"
@dataclass
class RabbitDataConnection:
 username: str
 password: str
 host: str = "localhost"
@dataclass
class RabbitDataProducer:
 exchange: Exchange
 routing_key: str
 body: str
@dataclass
class RabbitDataExchange:
 exchange: str
 exchange_type: Exchange
 queue: str
 routing_key: str
 durable: bool = True
@dataclass
class RabbitDataQueue:
 queue: str
 durable: bool = True
@dataclass
class RabbitDataConsumer:
 queue: str
 auto_ack: bool
class RabbitConnection:
 def __init__(
 self,
 creds: RabbitDataConnection,
 ):
 assert isinstance(creds, RabbitDataConnection)
 self.creds = creds
 def _get_connection(self):
 credentials = pika.PlainCredentials(self.creds.username, self.creds.password)
 parameters = pika.ConnectionParameters(
 host=self.creds.host, credentials=credentials
 )
 connection = pika.BlockingConnection(parameters)
 logger.info(f"Connecting to {self.creds.host} as {self.creds.username}")
 return connection.channel()
class RabbitProducer(RabbitConnection):
 def __init__(
 self,
 creds: RabbitDataConnection,
 producer: RabbitDataProducer,
 ):
 assert isinstance(producer, RabbitDataProducer)
 super().__init__(creds)
 self.producer = producer
 def send(self):
 with self._get_connection() as channel:
 channel.basic_publish(
 exchange=self.producer.exchange,
 routing_key=self.producer.routing_key,
 body=self.producer.body,
 properties=pika.BasicProperties(
 delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
 ),
 )
 logger.info(f"Sent {self.producer.body} as {self.producer.exchange}")
class RabbitExchange(RabbitConnection):
 def __init__(
 self,
 creds: RabbitDataConnection,
 exchange: RabbitDataExchange,
 ):
 assert isinstance(exchange, RabbitDataExchange)
 super().__init__(creds)
 self.exchange = exchange
 def create_binding_key(self):
 with self._get_connection() as channel:
 channel.queue_bind(
 exchange=self.exchange.exchange,
 queue=self.exchange.queue,
 routing_key=self.exchange.routing_key,
 )
 logger.info(
 f"Exchange {self.exchange.exchange} has been connected with {self.exchange.queue} by routing key {self.exchange.routing_key}"
 )
 def create_exchange(self):
 with self._get_connection() as channel:
 channel.exchange_declare(
 exchange=self.exchange.exchange,
 exchange_type=self.exchange.exchange_type,
 durable=self.exchange.durable,
 )
 logger.info(
 f"Exchange {self.exchange.exchange} - {self.exchange.exchange_type} has been created"
 )
class RabbitQueue(RabbitConnection):
 def __init__(self, creds: RabbitDataConnection, queue: RabbitDataQueue):
 assert isinstance(queue, RabbitDataQueue)
 super().__init__(creds)
 self.queue = queue
 def create_queue(self):
 with self._get_connection() as channel:
 channel.queue_declare(
 queue=self.queue.queue,
 durable=self.queue.durable,
 )
 logger.info(f"Queue {self.queue.queue} has been created")
class RabbitConsumer(RabbitConnection):
 def __init__(self, creds: RabbitDataConnection, consumer: RabbitDataConsumer):
 assert isinstance(consumer, RabbitDataConsumer)
 super().__init__(creds)
 self.consumer = consumer
 def consume_messages(self):
 with self._get_connection() as channel:
 channel.basic_qos(prefetch_count=1)
 method, properties, body = channel.basic_get(
 queue=self.consumer.queue,
 auto_ack=self.consumer.auto_ack,
 )
 logger.info("Messages was consumed")
 return body
 def stop_consuming(self):
 with self._get_connection() as channel:
 channel.stop_consuming()
 logger.info("Consuming messages has been stopped")
```
asked Apr 20, 2023 at 16:28
\$\endgroup\$

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.