Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit c66e2c2

Browse files
authored
Merge pull request #86 from php-enqueue/async-events
Symfony. Async event dispatching
2 parents ee23c9a + e8ff445 commit c66e2c2

27 files changed

+1529
-1
lines changed

‎docs/bundle/async_events.md‎

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
# Async events
2+
3+
The EnqueueBundle allows you to dispatch events asynchronously.
4+
Behind the scene it replaces your listener with one that sends a message to MQ.
5+
The message contains the event object.
6+
The consumer, once it receives the message, restores the event and dispatches it to only async listeners.
7+
8+
Async listeners benefits:
9+
10+
* The response time lesser. It has to do less work.
11+
* Better fault tolerance. Bugs in async listener does not affect user. Messages will wait till you fix bugs.
12+
* Better scaling. Add more consumers to meet the load.
13+
14+
_**Note**: The php serializer transformer (the default one) does not work on Symfony prior 3.0. The event contains eventDispatcher and therefor could not be serialized. You have to register a transformer for every async event. Read the [event transformer](#event-transformer)._
15+
16+
## Configuration
17+
18+
I suppose you already [installed the bundle](quick_tour.md#install).
19+
Now, you have to enable `async_events`.
20+
If you do not enable it, events will be processed as before: synchronously.
21+
22+
```yaml
23+
# app/config/config.yml
24+
25+
enqueue:
26+
async_events:
27+
enabled: true
28+
# if you'd like to send send messages onTerminate use spool_producer (it makes response time even lesser):
29+
# spool_producer: true
30+
```
31+
32+
## Usage
33+
34+
To make your listener async you have add `async: true` attribute to the tag `kernel.event_listener`, like this:
35+
36+
```yaml
37+
# app/config/config.yml
38+
39+
services:
40+
acme.foo_listener:
41+
class: 'AcmeBundle\Listener\FooListener'
42+
tags:
43+
- { name: 'kernel.event_listener', async: true, event: 'foo', method: 'onEvent' }
44+
```
45+
46+
That's basically it. The rest of the doc describes advanced features.
47+
48+
## Advanced Usage.
49+
50+
You can also add an async listener directly and register a custom message processor for it:
51+
52+
```yaml
53+
# app/config/config.yml
54+
55+
services:
56+
acme.async_foo_listener:
57+
class: 'Enqueue\Bundle\Events\AsyncListener'
58+
public: false
59+
arguments: ['@enqueue.client.producer', '@enqueue.events.registry']
60+
tags:
61+
- { name: 'kernel.event_listener', event: 'foo', method: 'onEvent' }
62+
```
63+
64+
The message processor must subscribe to `event.foo` topic. The message queue topics names for event follow this patter `event.{eventName}`.
65+
66+
```php
67+
<?php
68+
69+
use Enqueue\Bundle\Events\Registry;
70+
use Enqueue\Client\TopicSubscriberInterface;
71+
use Enqueue\Psr\PsrContext;
72+
use Enqueue\Psr\PsrMessage;
73+
use Enqueue\Psr\PsrProcessor;
74+
75+
class FooEventProcessor implements PsrProcessor, TopicSubscriberInterface
76+
{
77+
/**
78+
* @var Registry
79+
*/
80+
private $registry;
81+
82+
/**
83+
* @param Registry $registry
84+
*/
85+
public function __construct(Registry $registry)
86+
{
87+
$this->registry = $registry;
88+
}
89+
90+
public function process(PsrMessage $message, PsrContext $context)
91+
{
92+
if (false == $eventName = $message->getProperty('event_name')) {
93+
return self::REJECT;
94+
}
95+
if (false == $transformerName = $message->getProperty('transformer_name')) {
96+
return self::REJECT;
97+
}
98+
99+
// do what you want with the event.
100+
$event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message);
101+
102+
103+
return self::ACK;
104+
}
105+
106+
public static function getSubscribedTopics()
107+
{
108+
return ['event.foo'];
109+
}
110+
}
111+
```
112+
113+
114+
## Event transformer
115+
116+
The bundle uses [php serializer](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Events/PhpSerializerEventTransformer.php) transformer by default to pass events through MQ.
117+
You could create a transformer for the given event type. The transformer must implement `Enqueue\Bundle\Events\EventTransformer` interface.
118+
Consider the next example. It shows how to send an event that contains Doctrine entity as a subject
119+
120+
```php
121+
<?php
122+
namespace AcmeBundle\Listener;
123+
124+
// src/AcmeBundle/Listener/FooEventTransformer.php
125+
126+
use Enqueue\Client\Message;
127+
use Enqueue\Consumption\Result;
128+
use Enqueue\Psr\PsrMessage;
129+
use Enqueue\Util\JSON;
130+
use Symfony\Component\EventDispatcher\Event;
131+
use Enqueue\Bundle\Events\EventTransformer;
132+
use Doctrine\Bundle\DoctrineBundle\Registry;
133+
use Symfony\Component\EventDispatcher\GenericEvent;
134+
135+
class FooEventTransformer implements EventTransformer
136+
{
137+
/** @var Registry @doctrine */
138+
private $doctrine;
139+
140+
public function __construct(Registry $doctrine)
141+
{
142+
$this->doctrine = $doctrine;
143+
}
144+
145+
/**
146+
* {@inheritdoc}
147+
*
148+
* @param GenericEvent $event
149+
*/
150+
public function toMessage($eventName, Event $event = null)
151+
{
152+
$entity = $event->getSubject();
153+
$entityClass = get_class($event);
154+
155+
$manager = $this->doctrine->getManagerForClass($entityClass);
156+
$meta = $manager->getClassMetadata($entityClass);
157+
158+
$id = $meta->getIdentifierValues($entity);
159+
160+
$message = new Message();
161+
$message->setBody([
162+
'entityClass' => $entityClass,
163+
'entityId' => $id,
164+
'arguments' => $event->getArguments()
165+
]);
166+
167+
return $message;
168+
}
169+
170+
/**
171+
* {@inheritdoc}
172+
*/
173+
public function toEvent($eventName, PsrMessage $message)
174+
{
175+
$data = JSON::decode($message->getBody());
176+
177+
$entityClass = $data['entityClass'];
178+
179+
$manager = $this->doctrine->getManagerForClass($entityClass);
180+
if (false == $entity = $manager->find($entityClass, $data['entityId'])) {
181+
return Result::reject('The entity could not be found.');
182+
}
183+
184+
return new GenericEvent($entity, $data['arguments']);
185+
}
186+
}
187+
```
188+
189+
and register it:
190+
191+
```yaml
192+
# app/config/config.yml
193+
194+
services:
195+
acme.foo_event_transofrmer:
196+
class: 'AcmeBundle\Listener\FooEventTransformer'
197+
arguments: ['@doctrine']
198+
tags:
199+
- {name: 'enqueue.event_transformer', eventName: 'foo' }
200+
```
201+
202+
The `eventName` attribute accepts a regexp. You can do next `eventName: '/foo\..*?/'`.
203+
It uses this transformer for all event with the name beginning with `foo.`
204+
205+
[back to index](../index.md)

‎docs/bundle/quick_tour.md‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ It adds easy to use [configuration layer](config_reference.md), register service
99
$ composer require enqueue/enqueue-bundle enqueue/amqp-ext
1010
```
1111

12+
_**Note**: You could use not only AMQP transport but other available: STOMP, Amazon SQS, Redis, Filesystem, Doctrine DBAL and others._
13+
1214
## Enable the Bundle
1315

1416
Then, enable the bundle by adding `new Enqueue\Bundle\EnqueueBundle()` to the bundles array of the registerBundles method in your project's `app/AppKernel.php` file:

‎docs/index.md‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
- [Cli commands](bundle/cli_commands.md)
2828
- [Message producer](bundle/message_producer.md)
2929
- [Message processor](bundle/message_processor.md)
30+
- [Async events](bundle/async_events.md)
3031
- [Job queue](bundle/job_queue.md)
3132
- [Consumption extension](bundle/consumption_extension.md)
3233
- [Production settings](bundle/production_settings.md)

‎pkg/enqueue-bundle/DependencyInjection/Configuration.php‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public function getConfigTreeBuilder()
5151
->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end()
5252
->end()->end()
5353
->booleanNode('job')->defaultFalse()->end()
54+
->arrayNode('async_events')
55+
->canBeEnabled()
56+
->children()
57+
->booleanNode('spool_producer')->defaultFalse()->end()
58+
->end()
59+
->end()
5460
->arrayNode('extensions')->addDefaultsIfNotSet()->children()
5561
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
5662
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()

‎pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ public function load(array $configs, ContainerBuilder $container)
113113
$loader->load('job.yml');
114114
}
115115

116+
if (isset($config['async_events']['enabled'])) {
117+
$loader->load('events.yml');
118+
119+
if (false == empty($config['async_events']['spool_producer'])) {
120+
$container->getDefinition('enqueue.events.async_listener')
121+
->replaceArgument(0, new Reference('enqueue.client.spool_producer'))
122+
;
123+
}
124+
}
125+
116126
if ($config['extensions']['doctrine_ping_connection_extension']) {
117127
$loader->load('extensions/doctrine_ping_connection_extension.yml');
118128
}

‎pkg/enqueue-bundle/EnqueueBundle.php‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
1313
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
1414
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
15+
use Enqueue\Bundle\Events\DependencyInjection\AsyncEventsPass;
16+
use Enqueue\Bundle\Events\DependencyInjection\AsyncTransformersPass;
1517
use Enqueue\Dbal\DbalContext;
1618
use Enqueue\Dbal\Symfony\DbalTransportFactory;
1719
use Enqueue\Fs\FsContext;
@@ -23,6 +25,7 @@
2325
use Enqueue\Stomp\StompContext;
2426
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
2527
use Enqueue\Stomp\Symfony\StompTransportFactory;
28+
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
2629
use Symfony\Component\DependencyInjection\ContainerBuilder;
2730
use Symfony\Component\HttpKernel\Bundle\Bundle;
2831

@@ -68,5 +71,8 @@ public function build(ContainerBuilder $container)
6871
if (class_exists(SqsContext::class)) {
6972
$extension->addTransportFactory(new SqsTransportFactory());
7073
}
74+
75+
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
76+
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
7177
}
7278
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Enqueue\Client\Message;
6+
use Enqueue\Client\ProducerInterface;
7+
use Symfony\Component\EventDispatcher\Event;
8+
9+
class AsyncListener
10+
{
11+
/**
12+
* @var ProducerInterface
13+
*/
14+
private $producer;
15+
16+
/**
17+
* @var Registry
18+
*/
19+
private $registry;
20+
21+
/**
22+
* @var bool
23+
*/
24+
private $syncMode;
25+
26+
/**
27+
* @param ProducerInterface $producer
28+
* @param Registry $registry
29+
*/
30+
public function __construct(ProducerInterface $producer, Registry $registry)
31+
{
32+
$this->producer = $producer;
33+
$this->registry = $registry;
34+
}
35+
36+
public function resetSyncMode()
37+
{
38+
$this->syncMode = [];
39+
}
40+
41+
/**
42+
* @param string $eventName
43+
*/
44+
public function syncMode($eventName)
45+
{
46+
$this->syncMode[$eventName] = true;
47+
}
48+
49+
/**
50+
* @param Event $event
51+
* @param string $eventName
52+
*/
53+
public function onEvent(Event $event = null, $eventName)
54+
{
55+
if (false == isset($this->syncMode[$eventName])) {
56+
$transformerName = $this->registry->getTransformerNameForEvent($eventName);
57+
58+
$message = $this->registry->getTransformer($transformerName)->toMessage($eventName, $event);
59+
$message->setScope(Message::SCOPE_APP);
60+
$message->setProperty('event_name', $eventName);
61+
$message->setProperty('transformer_name', $transformerName);
62+
63+
$this->producer->send('event.'.$eventName, $message);
64+
}
65+
}
66+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Events;
4+
5+
use Enqueue\Consumption\Result;
6+
use Enqueue\Psr\PsrContext;
7+
use Enqueue\Psr\PsrMessage;
8+
use Enqueue\Psr\PsrProcessor;
9+
10+
class AsyncProcessor implements PsrProcessor
11+
{
12+
/**
13+
* @var Registry
14+
*/
15+
private $registry;
16+
17+
/**
18+
* @var ProxyEventDispatcher
19+
*/
20+
private $eventDispatcher;
21+
22+
/**
23+
* @param Registry $registry
24+
* @param ProxyEventDispatcher $eventDispatcher
25+
*/
26+
public function __construct(Registry $registry, ProxyEventDispatcher $eventDispatcher)
27+
{
28+
$this->registry = $registry;
29+
$this->eventDispatcher = $eventDispatcher;
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function process(PsrMessage $message, PsrContext $context)
36+
{
37+
if (false == $eventName = $message->getProperty('event_name')) {
38+
return Result::reject('The message is missing "event_name" property');
39+
}
40+
if (false == $transformerName = $message->getProperty('transformer_name')) {
41+
return Result::reject('The message is missing "transformer_name" property');
42+
}
43+
44+
$event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message);
45+
46+
$this->eventDispatcher->dispatchAsyncListenersOnly($eventName, $event);
47+
48+
return self::ACK;
49+
}
50+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /