Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 49e3936

Browse files
authored
Merge pull request #1091 from atrauzzi/feature/stomp-artemis-support
Add first pass for Apache ActiveMQ Artemis support
2 parents caf0afa + e3d07c9 commit 49e3936

11 files changed

+139
-79
lines changed

‎pkg/enqueue/Tests/Client/Driver/RabbitMqStompDriverTest.php‎

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Enqueue\Client\MessagePriority;
1313
use Enqueue\Client\Route;
1414
use Enqueue\Client\RouteCollection;
15+
use Enqueue\Stomp\ExtensionType;
1516
use Enqueue\Stomp\StompContext;
1617
use Enqueue\Stomp\StompDestination;
1718
use Enqueue\Stomp\StompMessage;
@@ -47,7 +48,7 @@ public function testShouldBeSubClassOfStompDriver()
4748

4849
public function testShouldCreateAndReturnStompQueueInstance()
4950
{
50-
$expectedQueue = new StompDestination();
51+
$expectedQueue = new StompDestination(ExtensionType::RABBITMQ);
5152

5253
$context = $this->createContextMock();
5354
$context
@@ -185,10 +186,10 @@ public function testShouldInitDeliveryDelayIfDelayPropertyOnSendToProcessor()
185186

186187
public function shouldSendMessageToDelayExchangeIfDelaySet()
187188
{
188-
$queue = new StompDestination();
189+
$queue = new StompDestination(ExtensionType::RABBITMQ);
189190
$queue->setStompName('queueName');
190191

191-
$delayTopic = new StompDestination();
192+
$delayTopic = new StompDestination(ExtensionType::RABBITMQ);
192193
$delayTopic->setStompName('delayTopic');
193194

194195
$transportMessage = new StompMessage();
@@ -339,7 +340,7 @@ public function testShouldSetupBroker()
339340
->expects($this->any())
340341
->method('createQueue')
341342
->willReturnCallback(function (string $name) {
342-
$destination = new StompDestination();
343+
$destination = new StompDestination(ExtensionType::RABBITMQ);
343344
$destination->setType(StompDestination::TYPE_QUEUE);
344345
$destination->setStompName($name);
345346

@@ -431,7 +432,7 @@ public function testSetupBrokerShouldCreateDelayExchangeIfEnabled()
431432
->expects($this->any())
432433
->method('createQueue')
433434
->willReturnCallback(function (string $name) {
434-
$destination = new StompDestination();
435+
$destination = new StompDestination(ExtensionType::RABBITMQ);
435436
$destination->setType(StompDestination::TYPE_QUEUE);
436437
$destination->setStompName($name);
437438

@@ -442,7 +443,7 @@ public function testSetupBrokerShouldCreateDelayExchangeIfEnabled()
442443
->expects($this->any())
443444
->method('createTopic')
444445
->willReturnCallback(function (string $name) {
445-
$destination = new StompDestination();
446+
$destination = new StompDestination(ExtensionType::RABBITMQ);
446447
$destination->setType(StompDestination::TYPE_TOPIC);
447448
$destination->setStompName($name);
448449

@@ -503,7 +504,7 @@ protected function createProducerMock(): InteropProducer
503504
*/
504505
protected function createQueue(string $name): InteropQueue
505506
{
506-
$destination = new StompDestination();
507+
$destination = new StompDestination(ExtensionType::RABBITMQ);
507508
$destination->setType(StompDestination::TYPE_QUEUE);
508509
$destination->setStompName($name);
509510

@@ -515,7 +516,7 @@ protected function createQueue(string $name): InteropQueue
515516
*/
516517
protected function createTopic(string $name): InteropTopic
517518
{
518-
$destination = new StompDestination();
519+
$destination = new StompDestination(ExtensionType::RABBITMQ);
519520
$destination->setType(StompDestination::TYPE_TOPIC);
520521
$destination->setStompName($name);
521522

‎pkg/enqueue/Tests/Client/Driver/StompDriverTest.php‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Enqueue\Client\Message;
1010
use Enqueue\Client\MessagePriority;
1111
use Enqueue\Client\RouteCollection;
12+
use Enqueue\Stomp\ExtensionType;
1213
use Enqueue\Stomp\StompContext;
1314
use Enqueue\Stomp\StompDestination;
1415
use Enqueue\Stomp\StompMessage;
@@ -127,7 +128,7 @@ protected function createProducerMock(): InteropProducer
127128
*/
128129
protected function createQueue(string $name): InteropQueue
129130
{
130-
$destination = new StompDestination();
131+
$destination = new StompDestination(ExtensionType::RABBITMQ);
131132
$destination->setType(StompDestination::TYPE_QUEUE);
132133
$destination->setStompName($name);
133134

@@ -139,7 +140,7 @@ protected function createQueue(string $name): InteropQueue
139140
*/
140141
protected function createTopic(string $name): InteropTopic
141142
{
142-
$destination = new StompDestination();
143+
$destination = new StompDestination(ExtensionType::RABBITMQ);
143144
$destination->setType(StompDestination::TYPE_TOPIC);
144145
$destination->setStompName($name);
145146

‎pkg/stomp/ExtensionType.php‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Stomp;
6+
7+
class ExtensionType
8+
{
9+
const ACTIVEMQ = 'activemq';
10+
const RABBITMQ = 'rabbitmq';
11+
const ARTEMIS = 'artemis';
12+
}

‎pkg/stomp/StompConnectionFactory.php‎

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313

1414
class StompConnectionFactory implements ConnectionFactory
1515
{
16-
const SCHEME_EXT_ACTIVEMQ = 'activemq';
17-
const SCHEME_EXT_RABBITMQ = 'rabbitmq';
16+
const SUPPORTED_SCHEMES = [
17+
ExtensionType::ACTIVEMQ,
18+
ExtensionType::RABBITMQ,
19+
ExtensionType::ARTEMIS,
20+
];
1821

1922
/**
2023
* @var array
@@ -71,15 +74,14 @@ public function __construct($config = 'stomp:')
7174
*/
7275
public function createContext(): Context
7376
{
74-
$useExchangePrefix = self::SCHEME_EXT_RABBITMQ === $this->config['target'] ? true : false;
75-
7677
if ($this->config['lazy']) {
77-
return new StompContext(function () {
78-
return $this->establishConnection();
79-
}, $useExchangePrefix);
78+
return new StompContext(
79+
function () { return $this->establishConnection(); },
80+
$this->config['target']
81+
);
8082
}
8183

82-
return new StompContext($this->establishConnection(), $useExchangePrefix);
84+
return new StompContext($this->establishConnection(), $this->config['target']);
8385
}
8486

8587
private function establishConnection(): BufferedStompClient
@@ -123,10 +125,11 @@ private function parseDsn(string $dsn): array
123125

124126
$schemeExtension = current($dsn->getSchemeExtensions());
125127
if (false === $schemeExtension) {
126-
$schemeExtension = self::SCHEME_EXT_RABBITMQ;
128+
$schemeExtension = ExtensionType::RABBITMQ;
127129
}
128-
if (self::SCHEME_EXT_ACTIVEMQ !== $schemeExtension && self::SCHEME_EXT_RABBITMQ !== $schemeExtension) {
129-
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is invalid. It must be one of "%s" or "%s".', $schemeExtension, self::SCHEME_EXT_ACTIVEMQ, self::SCHEME_EXT_RABBITMQ));
130+
131+
if (false === in_array($schemeExtension, self::SUPPORTED_SCHEMES, true)) {
132+
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is not supported. It must be one of %s.', $schemeExtension, implode(', ', self::SUPPORTED_SCHEMES)));
130133
}
131134

132135
return array_filter(array_replace($dsn->getQuery(), [
@@ -151,7 +154,7 @@ private function parseDsn(string $dsn): array
151154
private function defaultConfig(): array
152155
{
153156
return [
154-
'target' => self::SCHEME_EXT_RABBITMQ,
157+
'target' => ExtensionType::RABBITMQ,
155158
'host' => 'localhost',
156159
'port' => 61613,
157160
'login' => 'guest',

‎pkg/stomp/StompConsumer.php‎

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
namespace Enqueue\Stomp;
66

77
use Interop\Queue\Consumer;
8+
use Interop\Queue\Exception\Exception;
89
use Interop\Queue\Exception\InvalidMessageException;
910
use Interop\Queue\Message;
1011
use Interop\Queue\Queue;
1112
use Stomp\Client;
13+
use Stomp\Exception\ErrorFrameException;
1214
use Stomp\Transport\Frame;
1315

1416
class StompConsumer implements Consumer
@@ -96,16 +98,20 @@ public function receive(int $timeout = 0): ?Message
9698
{
9799
$this->subscribe();
98100

99-
if (0 === $timeout) {
100-
while (true) {
101-
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
101+
try {
102+
if (0 === $timeout) {
103+
while (true) {
104+
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
105+
return $this->convertMessage($message);
106+
}
107+
}
108+
} else {
109+
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
102110
return $this->convertMessage($message);
103111
}
104112
}
105-
} else {
106-
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
107-
return $this->convertMessage($message);
108-
}
113+
} catch (ErrorFrameException $e) {
114+
throw new Exception($e->getMessage()."\n".$e->getFrame()->getBody(), null, $e);
109115
}
110116

111117
return null;
@@ -143,10 +149,11 @@ public function reject(Message $message, bool $requeue = false): void
143149

144150
$nackFrame = $this->stomp->getProtocol()->getNackFrame($message->getFrame());
145151

146-
// rabbitmq STOMP protocol extension
147-
$nackFrame->addHeaders([
148-
'requeue' => $requeue ? 'true' : 'false',
149-
]);
152+
if (ExtensionType::RABBITMQ === $this->queue->getExtensionType()) {
153+
$nackFrame->addHeaders([
154+
'requeue' => $requeue ? 'true' : 'false',
155+
]);
156+
}
150157

151158
$this->stomp->sendFrame($nackFrame);
152159
}
@@ -168,13 +175,28 @@ private function subscribe(): void
168175
$this->ackMode
169176
);
170177

171-
// rabbitmq STOMP protocol extension
172178
$headers = $this->queue->getHeaders();
173-
$headers['prefetch-count'] = $this->prefetchCount;
174-
$headers = StompHeadersEncoder::encode($headers);
175179

176-
foreach ($headers as $key => $value) {
177-
$frame[$key] = $value;
180+
if (ExtensionType::RABBITMQ === $this->queue->getExtensionType()) {
181+
$headers['prefetch-count'] = $this->prefetchCount;
182+
$headers = StompHeadersEncoder::encode($headers);
183+
184+
foreach ($headers as $key => $value) {
185+
$frame[$key] = $value;
186+
}
187+
} elseif (ExtensionType::ARTEMIS === $this->queue->getExtensionType()) {
188+
$subscriptionName = $this->subscriptionId.'-'.$this->queue->getStompName();
189+
190+
$artemisHeaders = [];
191+
192+
$artemisHeaders['client-id'] = true ? $this->subscriptionId : null;
193+
$artemisHeaders['durable-subscription-name'] = true ? $subscriptionName : null;
194+
195+
$artemisHeaders = StompHeadersEncoder::encode(array_filter($artemisHeaders));
196+
197+
foreach ($artemisHeaders as $key => $value) {
198+
$frame[$key] = $value;
199+
}
178200
}
179201

180202
$this->stomp->sendFrame($frame);

‎pkg/stomp/StompContext.php‎

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ class StompContext implements Context
2323
*/
2424
private $stomp;
2525

26+
/**
27+
* @var string
28+
*/
29+
private $extensionType;
30+
2631
/**
2732
* @var bool
2833
*/
@@ -35,9 +40,8 @@ class StompContext implements Context
3540

3641
/**
3742
* @param BufferedStompClient|callable $stomp
38-
* @param bool $useExchangePrefix
3943
*/
40-
public function __construct($stomp, $useExchangePrefix = true)
44+
public function __construct($stomp, string$extensionType)
4145
{
4246
if ($stomp instanceof BufferedStompClient) {
4347
$this->stomp = $stomp;
@@ -47,7 +51,8 @@ public function __construct($stomp, $useExchangePrefix = true)
4751
throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.');
4852
}
4953

50-
$this->useExchangePrefix = $useExchangePrefix;
54+
$this->extensionType = $extensionType;
55+
$this->useExchangePrefix = ExtensionType::RABBITMQ === $extensionType;
5156
}
5257

5358
/**
@@ -64,7 +69,7 @@ public function createMessage(string $body = '', array $properties = [], array $
6469
public function createQueue(string $name): Queue
6570
{
6671
if (0 !== strpos($name, '/')) {
67-
$destination = new StompDestination();
72+
$destination = new StompDestination($this->extensionType);
6873
$destination->setType(StompDestination::TYPE_QUEUE);
6974
$destination->setStompName($name);
7075

@@ -91,7 +96,7 @@ public function createTemporaryQueue(): Queue
9196
public function createTopic(string $name): Topic
9297
{
9398
if (0 !== strpos($name, '/')) {
94-
$destination = new StompDestination();
99+
$destination = new StompDestination($this->extensionType);
95100
$destination->setType($this->useExchangePrefix ? StompDestination::TYPE_EXCHANGE : StompDestination::TYPE_TOPIC);
96101
$destination->setStompName($name);
97102

@@ -151,7 +156,7 @@ public function createDestination(string $destination): StompDestination
151156
$routingKey = $pieces[1];
152157
}
153158

154-
$destination = new StompDestination();
159+
$destination = new StompDestination($this->extensionType);
155160
$destination->setType($type);
156161
$destination->setStompName($name);
157162
$destination->setRoutingKey($routingKey);
@@ -199,10 +204,7 @@ public function getStomp(): BufferedStompClient
199204
if (false == $this->stomp) {
200205
$stomp = call_user_func($this->stompFactory);
201206
if (false == $stomp instanceof BufferedStompClient) {
202-
throw new \LogicException(sprintf(
203-
'The factory must return instance of BufferedStompClient. It returns %s',
204-
is_object($stomp) ? get_class($stomp) : gettype($stomp)
205-
));
207+
throw new \LogicException(sprintf('The factory must return instance of BufferedStompClient. It returns %s', is_object($stomp) ? get_class($stomp) : gettype($stomp)));
206208
}
207209

208210
$this->stomp = $stomp;

‎pkg/stomp/StompDestination.php‎

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,25 @@ class StompDestination implements Topic, Queue
3939
* @var array
4040
*/
4141
private $headers;
42+
/**
43+
* @var string
44+
*/
45+
private $extensionType;
4246

43-
public function __construct()
47+
public function __construct(string$extensionType)
4448
{
4549
$this->headers = [
4650
self::HEADER_DURABLE => false,
4751
self::HEADER_AUTO_DELETE => true,
4852
self::HEADER_EXCLUSIVE => false,
4953
];
54+
55+
$this->extensionType = $extensionType;
56+
}
57+
58+
public function getExtensionType(): string
59+
{
60+
return $this->extensionType;
5061
}
5162

5263
public function getStompName(): string
@@ -65,6 +76,10 @@ public function getQueueName(): string
6576
throw new \LogicException('Destination name is not set');
6677
}
6778

79+
if (ExtensionType::ARTEMIS === $this->extensionType) {
80+
return $this->getStompName();
81+
}
82+
6883
$name = '/'.$this->getType().'/'.$this->getStompName();
6984

7085
if ($this->getRoutingKey()) {

‎pkg/stomp/Tests/StompConsumerTest.php‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Stomp\Tests;
44

55
use Enqueue\Stomp\BufferedStompClient;
6+
use Enqueue\Stomp\ExtensionType;
67
use Enqueue\Stomp\StompConsumer;
78
use Enqueue\Stomp\StompDestination;
89
use Enqueue\Stomp\StompMessage;
@@ -557,7 +558,7 @@ private function createStompClientMock()
557558

558559
private function createDummyDestination(): StompDestination
559560
{
560-
$destination = new StompDestination();
561+
$destination = new StompDestination(ExtensionType::RABBITMQ);
561562
$destination->setStompName('aName');
562563
$destination->setType(StompDestination::TYPE_QUEUE);
563564

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /