I am running a gunicorn server with 1 eventlet worker through which I communicate with the FE via websockets.

However, I also need to listen to an external pod running some code that will emit a redis pubsub event. Thus, my approach was to start a socketio background thread which inits the redis pubsub listener.

It appears to work fine until, at some point, it simply stops listening, and I cannot determine why. This is how I start socketio in an extensions.py:

socketio = SocketIO(logger=False, engineio_logger=False, path='/socket.io', cors_allowed_origins='*', async_mode='eventlet', ping_timeout=10, ping_interval=60)

This is my listener code:

class RedisConnection:
    def __init__()
        ....
    def subscription_listener(self):
        while True:
            try:
                pubsub = self.redis.pubsub()
                pubsub.subscribe(self.websocket_channel)
                try:
                    for message in pubsub.listen():
                        try:
                            m = TypeAdapter(RedisEnvelopeMessage).validate_python(message)
                            self.logger.debug(f"Received from REDIS: {message}")
                            if m.type == 'message':
                                d = m.data
                                self.logger.debug(f"Received message from REDIS: {d}")
                                with self.app.app_context():
                                    # use d.module instead of RedisWS
                                    WSS = self.app.extensions.get(d.module, "RedisWS").ws_services.get(d.company_id)
                                    # TODO the response model should route to a WSService or to something different
                                    if d.message is not None:
                                        if isinstance(d.message, list):
                                            getattr(WSS, d.method)(*d.message)
                                        elif isinstance(d.message, dict):
                                            getattr(WSS, d.method)(d.message)
                                        elif isinstance(d.message, str):
                                            getattr(WSS, d.method)(d.message)
                                    else:
                                        getattr(WSS, d.method)()
                            elif m.type == 'subscribe':
                                self.logger.info(f"Subscribed to REDIS channel: {m.channel}")
                            else:
                                self.logger.info(f"Received message from REDIS but NOT PROCESSED: {message}")
                        except Exception as e:
                            self.logger.catch_exception(e)
                            self.logger.error(f"Pubsub parsing error: {e}").save("pubsub_listener")
                except Exception as e:
                    self.logger.catch_exception(e, level="critical")
                    self.logger.error(f"Pubsub listener error: {e}").save("pubsub_listener")
                    #socketio.sleep(2)
            except Exception as e:
                self.logger.catch_exception(e, level="critical")
                self.logger.error(f"Pubsub loop error: {e}").save("pubsub_listener")

In my flask application init:

def create_app(config_class=Config):
    ....
    socketio.init_app(app)
    with app.app_context():
        app.extensions["pubsub"] = socketio.start_background_task(redis_db_static.subscription_listener)

I don't know if someone has struggled with something similar. Many thanks!

It actually works, but after some time it just stops, and I don't see any exception or anything else. Restarting the server brings it back online. I was expecting the thread to run continuously until the main gunicorn process gets killed and restarted, which would restart the thread as well.

  • Not sure if the solution would be this one: stackoverflow.com/questions/49113148/… Commented Jun 27, 2025 at 10:15
  • Or maybe this one? I'm just trying it, will report in a couple days. github.com/miguelgrinberg/Flask-SocketIO/discussions/2023 Commented Jun 27, 2025 at 11:00
  • First of all, eventlet is not a great package to use in 2025 because it hasn't been properly maintained in the last few years. I would suggest you retest your application using either regular threads (async_mode='threading') or gevent (async_mode='gevent') and then update the question if there are any issues with those. Commented Jun 28, 2025 at 11:10

1 Answer

It is now working, solved thanks to @miguel-grinberg!

First, I switched from eventlet to gevent and changed my code so that, instead of running the pubsub listener in the background as a thread, I rely on socketio's own mechanism (the message_queue and channel arguments of init_app).

# extensions.py
socketio = SocketIO(path='/socket.io', async_mode='gevent', cors_allowed_origins='*', ping_timeout=15, ping_interval=60)
# __init__.py
def create_app(config_class=Config):
    ....
    socketio.init_app(app, message_queue=redis_db_static.redis_url, channel=app.config.get("WEBSOCKET_CHANNEL"))

Then I changed my redis publish method so that it works both with websockets and with other channels/services, while keeping my websocket dispatcher service class (bear in mind that this code runs in a celery worker):

    def publish(self, pubsub_message: RedisPubSubMessage):
        try:
            if pubsub_message.module == "RedisWS":
                WSS = self.app.extensions.get("RedisWS").ws_services.get(pubsub_message.company_id)
                # TODO the response model should route to a WSService or to something different
                if pubsub_message.message is not None:
                    if isinstance(pubsub_message.message, list):
                        getattr(WSS, pubsub_message.method)(*pubsub_message.message)
                    elif isinstance(pubsub_message.message, dict):
                        getattr(WSS, pubsub_message.method)(pubsub_message.message)
                    elif isinstance(pubsub_message.message, str):
                        getattr(WSS, pubsub_message.method)(pubsub_message.message)
                else:
                    getattr(WSS, pubsub_message.method)()
                self.logger.debug(f"Event emitted in socketio {self.socketio}: {pubsub_message.model_dump()}")
                return "emitted to sockets"
            else:
                # GENERIC PUBLISH
                return self.redis.publish(self.channel, pubsub_message.model_dump_json())
        except Exception as e:
            self.logger.error(f"Pubsub publish error: {e}").save("pubsub_published")

class WSService:
    def __init__(self, company, socketio):
        self._version = '2.2'
        self.socket = socketio
        self.logger = logger
    ...
    def new_message(self, message):
        if message.tracking_status != "hidden":
            message_payload = message.to_dict()
            self.socket.emit('new_message', message_payload, room=message.user.id)
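
As a side note, the isinstance chain in publish could probably be folded into one small helper. The following is only a sketch of that idea, not part of my actual code: call_with_payload and RecordingService are hypothetical names made up for illustration, and RecordingService merely stands in for WSService so the example runs without Flask or Redis. One deliberate difference: my publish method silently does nothing for payload types other than list, dict, and str, whereas this sketch raises a TypeError.

```python
from typing import Any


def call_with_payload(target: Any, method: str, payload: Any = None) -> Any:
    """Look up `method` on `target` and call it, unpacking `payload`.

    None        -> called with no arguments
    list        -> elements passed as positional arguments
    dict or str -> passed as a single argument
    Anything else raises TypeError (an assumption of this sketch; the
    original publish() silently skips unsupported payload types).
    """
    handler = getattr(target, method)
    if payload is None:
        return handler()
    if isinstance(payload, list):
        return handler(*payload)
    if isinstance(payload, (dict, str)):
        return handler(payload)
    raise TypeError(f"Unsupported payload type: {type(payload).__name__}")


class RecordingService:
    """Hypothetical stand-in for WSService: records calls instead of emitting."""

    def __init__(self) -> None:
        self.calls = []

    def new_message(self, *args: Any) -> int:
        # Remember what was passed and report how many arguments arrived.
        self.calls.append(("new_message", args))
        return len(args)


if __name__ == "__main__":
    service = RecordingService()
    call_with_payload(service, "new_message", ["first", "second"])
    call_with_payload(service, "new_message", {"text": "hello"})
    call_with_payload(service, "new_message")
    print(service.calls)
```

With something like this, the RedisWS branch of publish would reduce to a single call such as call_with_payload(WSS, pubsub_message.method, pubsub_message.message), assuming the TypeError behaviour is acceptable.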
 
answered Jun 30, 2025 at 13:13