Requirement: python:3.7
yapf -dr . | (! grep '.')
pylava .
isort -rc . --check-only --diff
1. cd examples
2. setup docker container by `sudo docker-compose up -d`
3. enter container terminal_1 by `sudo docker exec -it your_container_id bash` and run main.py
4. enter a new container terminal_2 and run send_message.py
# event_manager.py import os from event import Manager from event.server.kafka import Server as KafkaServer from event.server.rabbitmq import Server as RabbitmqServer from event.server.redis import Server as RedisServer BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) manager = Manager(base_dir=BASE_DIR) manager.register_server('redis', RedisServer(url='redis://redis/3')) manager.register_server('kafka', KafkaServer(url='kafka:9092')) # wsgi.py import os from django.core.wsgi import get_wsgi_application from .event_manager import manager os.environ.setdefault("DJANGO_SETTINGS_MODULE", "example.settings") application = get_wsgi_application() manager.run_in_django() # handlers.py from example.event_manager import manager redis_server = manager.get_server('redis') kafka_server = manager.get_server('kafka') @redis_server.handler(channels=['example:test:django']) def handle_example_django_redis(message): print('Received redis message', message) @kafka_server.handler(topics=['example-test-django']) def handle_example_django_kafka(message): print('Received kafka message: ', message, message.value) # views.py from django.shortcuts import HttpResponse from django.views import View from example.event_manager import manager redis_server = manager.get_server('redis') kafka_server = manager.get_server('kafka') class TestView(View): def get(self, request, *args, **kwargs): redis_server.publish_wait(channel='example:test:django', message={ 'test_id': 'redis_id', 'message': 'redis good' }) kafka_server.publish_wait(topic='example-test-django', message={ 'test_id': 'kafka_id', 'message': 'kafka good' }) return HttpResponse('good')
# Run server import os from event import Manager from event.server.redis import Server BASE_DIR = os.path.dirname(os.path.abspath(__file__)) manager = Manager(base_dir=BASE_DIR) config = { 'url': 'redis://redis/3', } server = Server(**config) manager.register_server('default', server) @server.handler(channels=['example:test:redis']) def handle_example_message(message): print('Received message: ', message) manager.run_forever() # send message await server.publish(channel='example-test-redis', message={ 'test_id': 'redis', 'message': 'good test' }) # OR server.publish_soon(channel='example-test-redis', message={ 'test_id': 'redis', 'message': 'good test' }) # OR server.publish_wait(channel='example-test-redis', message={ 'test_id': 'redis', 'message': 'good test' })
# Run server import os from event import Manager from event.server.kafka import Server BASE_DIR = os.path.dirname(os.path.abspath(__file__)) manager = Manager(base_dir=BASE_DIR) config = { 'url': 'kafka:9092', } server = Server(**config) manager.register_server('default', server) @server.handler(topics=['example-test-kafka']) def handle_example_message(message): print('Received message: ', message, message.value) manager.run_forever() # send message await server.publish(topic='example-test-kafka', message={ 'test_id': 'kafka', 'message': 'good test' }) # OR server.publish_soon(topic='example-test-kafka', message={ 'test_id': 'kafka', 'message': 'good test' })
# Run server import os from event import Manager from event.server.rabbitmq import Server BASE_DIR = os.path.dirname(os.path.abspath(__file__)) manager = Manager(base_dir=BASE_DIR) config = { 'url': 'amqp://rabbitmq:5672', 'exchange': 'test', 'exchange_type': 'topic', } server = Server(**config) manager.register_server('default', server) @server.handler(routing_key='example-test-rabbitmq', queue='example-test-rabbitmq-queue') def handle_example_message(message): print('Received message: ', message) manager.run_forever() # send message await server.publish(routing_key='example-test-rabbitmq', message={ 'test_id': 'rabbitmq', 'message': 'good test' }) # OR server.publish_soon(routing_key='example-test-rabbitmq', message={ 'test_id': 'rabbitmq', 'message': 'good test' })
- 消息异常处理
- 消息处理确认
- 消息处理重试