Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Add first pass for Apache ActiveMQ Artemis support #1091

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
makasim merged 6 commits into php-enqueue:master from atrauzzi:feature/stomp-artemis-support
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions pkg/enqueue/Tests/Client/Driver/RabbitMqStompDriverTest.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Enqueue\Client\MessagePriority;
use Enqueue\Client\Route;
use Enqueue\Client\RouteCollection;
use Enqueue\Stomp\ExtensionType;
use Enqueue\Stomp\StompContext;
use Enqueue\Stomp\StompDestination;
use Enqueue\Stomp\StompMessage;
Expand Down Expand Up @@ -47,7 +48,7 @@ public function testShouldBeSubClassOfStompDriver()

public function testShouldCreateAndReturnStompQueueInstance()
{
$expectedQueue = new StompDestination();
$expectedQueue = new StompDestination(ExtensionType::RABBITMQ);

$context = $this->createContextMock();
$context
Expand Down Expand Up @@ -185,10 +186,10 @@ public function testShouldInitDeliveryDelayIfDelayPropertyOnSendToProcessor()

public function shouldSendMessageToDelayExchangeIfDelaySet()
{
$queue = new StompDestination();
$queue = new StompDestination(ExtensionType::RABBITMQ);
$queue->setStompName('queueName');

$delayTopic = new StompDestination();
$delayTopic = new StompDestination(ExtensionType::RABBITMQ);
$delayTopic->setStompName('delayTopic');

$transportMessage = new StompMessage();
Expand Down Expand Up @@ -339,7 +340,7 @@ public function testShouldSetupBroker()
->expects($this->any())
->method('createQueue')
->willReturnCallback(function (string $name) {
$destination = new StompDestination();
$destination = new StompDestination(ExtensionType::RABBITMQ);
$destination->setType(StompDestination::TYPE_QUEUE);
$destination->setStompName($name);

Expand Down Expand Up @@ -431,7 +432,7 @@ public function testSetupBrokerShouldCreateDelayExchangeIfEnabled()
->expects($this->any())
->method('createQueue')
->willReturnCallback(function (string $name) {
$destination = new StompDestination();
$destination = new StompDestination(ExtensionType::RABBITMQ);
$destination->setType(StompDestination::TYPE_QUEUE);
$destination->setStompName($name);

Expand All @@ -442,7 +443,7 @@ public function testSetupBrokerShouldCreateDelayExchangeIfEnabled()
->expects($this->any())
->method('createTopic')
->willReturnCallback(function (string $name) {
$destination = new StompDestination();
$destination = new StompDestination(ExtensionType::RABBITMQ);
$destination->setType(StompDestination::TYPE_TOPIC);
$destination->setStompName($name);

Expand Down Expand Up @@ -503,7 +504,7 @@ protected function createProducerMock(): InteropProducer
*/
protected function createQueue(string $name): InteropQueue
{
$destination = new StompDestination();
$destination = new StompDestination(ExtensionType::RABBITMQ);
$destination->setType(StompDestination::TYPE_QUEUE);
$destination->setStompName($name);

Expand All @@ -515,7 +516,7 @@ protected function createQueue(string $name): InteropQueue
*/
protected function createTopic(string $name): InteropTopic
{
$destination = new StompDestination();
$destination = new StompDestination(ExtensionType::RABBITMQ);
$destination->setType(StompDestination::TYPE_TOPIC);
$destination->setStompName($name);

Expand Down
5 changes: 3 additions & 2 deletions pkg/enqueue/Tests/Client/Driver/StompDriverTest.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Enqueue\Client\Message;
use Enqueue\Client\MessagePriority;
use Enqueue\Client\RouteCollection;
use Enqueue\Stomp\ExtensionType;
use Enqueue\Stomp\StompContext;
use Enqueue\Stomp\StompDestination;
use Enqueue\Stomp\StompMessage;
Expand Down Expand Up @@ -127,7 +128,7 @@ protected function createProducerMock(): InteropProducer
*/
protected function createQueue(string $name): InteropQueue
{
$destination = new StompDestination();
$destination = new StompDestination(ExtensionType::RABBITMQ);
$destination->setType(StompDestination::TYPE_QUEUE);
$destination->setStompName($name);

Expand All @@ -139,7 +140,7 @@ protected function createQueue(string $name): InteropQueue
*/
protected function createTopic(string $name): InteropTopic
{
$destination = new StompDestination();
$destination = new StompDestination(ExtensionType::RABBITMQ);
$destination->setType(StompDestination::TYPE_TOPIC);
$destination->setStompName($name);

Expand Down
12 changes: 12 additions & 0 deletions pkg/stomp/ExtensionType.php
View file Open in desktop
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Enqueue\Stomp;

class ExtensionType
{
const ACTIVEMQ = 'activemq';
const RABBITMQ = 'rabbitmq';
const ARTEMIS = 'artemis';
}
27 changes: 15 additions & 12 deletions pkg/stomp/StompConnectionFactory.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@

class StompConnectionFactory implements ConnectionFactory
{
const SCHEME_EXT_ACTIVEMQ = 'activemq';
const SCHEME_EXT_RABBITMQ = 'rabbitmq';
const SUPPORTED_SCHEMES = [
ExtensionType::ACTIVEMQ,
ExtensionType::RABBITMQ,
ExtensionType::ARTEMIS,
];

/**
* @var array
Expand Down Expand Up @@ -71,15 +74,14 @@ public function __construct($config = 'stomp:')
*/
public function createContext(): Context
{
$useExchangePrefix = self::SCHEME_EXT_RABBITMQ === $this->config['target'] ? true : false;

if ($this->config['lazy']) {
return new StompContext(function () {
return $this->establishConnection();
}, $useExchangePrefix);
return new StompContext(
function () { return $this->establishConnection(); },
$this->config['target']
);
}

return new StompContext($this->establishConnection(), $useExchangePrefix);
return new StompContext($this->establishConnection(), $this->config['target']);
}

private function establishConnection(): BufferedStompClient
Expand Down Expand Up @@ -123,10 +125,11 @@ private function parseDsn(string $dsn): array

$schemeExtension = current($dsn->getSchemeExtensions());
if (false === $schemeExtension) {
$schemeExtension = self::SCHEME_EXT_RABBITMQ;
$schemeExtension = ExtensionType::RABBITMQ;
}
if (self::SCHEME_EXT_ACTIVEMQ !== $schemeExtension && self::SCHEME_EXT_RABBITMQ !== $schemeExtension) {
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is invalid. It must be one of "%s" or "%s".', $schemeExtension, self::SCHEME_EXT_ACTIVEMQ, self::SCHEME_EXT_RABBITMQ));

if (false === in_array($schemeExtension, self::SUPPORTED_SCHEMES, true)) {
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is not supported. It must be one of %s.', $schemeExtension, implode(', ', self::SUPPORTED_SCHEMES)));
}

return array_filter(array_replace($dsn->getQuery(), [
Expand All @@ -151,7 +154,7 @@ private function parseDsn(string $dsn): array
private function defaultConfig(): array
{
return [
'target' => self::SCHEME_EXT_RABBITMQ,
'target' => ExtensionType::RABBITMQ,
'host' => 'localhost',
'port' => 61613,
'login' => 'guest',
Expand Down
54 changes: 38 additions & 16 deletions pkg/stomp/StompConsumer.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
namespace Enqueue\Stomp;

use Interop\Queue\Consumer;
use Interop\Queue\Exception\Exception;
use Interop\Queue\Exception\InvalidMessageException;
use Interop\Queue\Message;
use Interop\Queue\Queue;
use Stomp\Client;
use Stomp\Exception\ErrorFrameException;
use Stomp\Transport\Frame;

class StompConsumer implements Consumer
Expand Down Expand Up @@ -96,16 +98,20 @@ public function receive(int $timeout = 0): ?Message
{
$this->subscribe();

if (0 === $timeout) {
while (true) {
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
try {
if (0 === $timeout) {
while (true) {
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
return $this->convertMessage($message);
}
}
} else {
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
return $this->convertMessage($message);
}
}
} else {
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
return $this->convertMessage($message);
}
} catch (ErrorFrameException $e) {
throw new Exception($e->getMessage()."\n".$e->getFrame()->getBody(), null, $e);
}

return null;
Expand Down Expand Up @@ -143,10 +149,11 @@ public function reject(Message $message, bool $requeue = false): void

$nackFrame = $this->stomp->getProtocol()->getNackFrame($message->getFrame());

// rabbitmq STOMP protocol extension
$nackFrame->addHeaders([
'requeue' => $requeue ? 'true' : 'false',
]);
if (ExtensionType::RABBITMQ === $this->queue->getExtensionType()) {
$nackFrame->addHeaders([
'requeue' => $requeue ? 'true' : 'false',
]);
}

$this->stomp->sendFrame($nackFrame);
}
Expand All @@ -168,13 +175,28 @@ private function subscribe(): void
$this->ackMode
);

// rabbitmq STOMP protocol extension
$headers = $this->queue->getHeaders();
$headers['prefetch-count'] = $this->prefetchCount;
$headers = StompHeadersEncoder::encode($headers);

foreach ($headers as $key => $value) {
$frame[$key] = $value;
if (ExtensionType::RABBITMQ === $this->queue->getExtensionType()) {
$headers['prefetch-count'] = $this->prefetchCount;
$headers = StompHeadersEncoder::encode($headers);

foreach ($headers as $key => $value) {
$frame[$key] = $value;
}
} elseif (ExtensionType::ARTEMIS === $this->queue->getExtensionType()) {
$subscriptionName = $this->subscriptionId.'-'.$this->queue->getStompName();

$artemisHeaders = [];

$artemisHeaders['client-id'] = true ? $this->subscriptionId : null;
$artemisHeaders['durable-subscription-name'] = true ? $subscriptionName : null;

$artemisHeaders = StompHeadersEncoder::encode(array_filter($artemisHeaders));

foreach ($artemisHeaders as $key => $value) {
$frame[$key] = $value;
}
}

$this->stomp->sendFrame($frame);
Expand Down
22 changes: 12 additions & 10 deletions pkg/stomp/StompContext.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class StompContext implements Context
*/
private $stomp;

/**
* @var string
*/
private $extensionType;

/**
* @var bool
*/
Expand All @@ -35,9 +40,8 @@ class StompContext implements Context

/**
* @param BufferedStompClient|callable $stomp
* @param bool $useExchangePrefix
*/
public function __construct($stomp, $useExchangePrefix = true)
public function __construct($stomp, string $extensionType)
{
if ($stomp instanceof BufferedStompClient) {
$this->stomp = $stomp;
Expand All @@ -47,7 +51,8 @@ public function __construct($stomp, $useExchangePrefix = true)
throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.');
}

$this->useExchangePrefix = $useExchangePrefix;
$this->extensionType = $extensionType;
$this->useExchangePrefix = ExtensionType::RABBITMQ === $extensionType;
}

/**
Expand All @@ -64,7 +69,7 @@ public function createMessage(string $body = '', array $properties = [], array $
public function createQueue(string $name): Queue
{
if (0 !== strpos($name, '/')) {
$destination = new StompDestination();
$destination = new StompDestination($this->extensionType);
$destination->setType(StompDestination::TYPE_QUEUE);
$destination->setStompName($name);

Expand All @@ -91,7 +96,7 @@ public function createTemporaryQueue(): Queue
public function createTopic(string $name): Topic
{
if (0 !== strpos($name, '/')) {
$destination = new StompDestination();
$destination = new StompDestination($this->extensionType);
$destination->setType($this->useExchangePrefix ? StompDestination::TYPE_EXCHANGE : StompDestination::TYPE_TOPIC);
$destination->setStompName($name);

Expand Down Expand Up @@ -151,7 +156,7 @@ public function createDestination(string $destination): StompDestination
$routingKey = $pieces[1];
}

$destination = new StompDestination();
$destination = new StompDestination($this->extensionType);
$destination->setType($type);
$destination->setStompName($name);
$destination->setRoutingKey($routingKey);
Expand Down Expand Up @@ -199,10 +204,7 @@ public function getStomp(): BufferedStompClient
if (false == $this->stomp) {
$stomp = call_user_func($this->stompFactory);
if (false == $stomp instanceof BufferedStompClient) {
throw new \LogicException(sprintf(
'The factory must return instance of BufferedStompClient. It returns %s',
is_object($stomp) ? get_class($stomp) : gettype($stomp)
));
throw new \LogicException(sprintf('The factory must return instance of BufferedStompClient. It returns %s', is_object($stomp) ? get_class($stomp) : gettype($stomp)));
}

$this->stomp = $stomp;
Expand Down
17 changes: 16 additions & 1 deletion pkg/stomp/StompDestination.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,25 @@ class StompDestination implements Topic, Queue
* @var array
*/
private $headers;
/**
* @var string
*/
private $extensionType;

public function __construct()
public function __construct(string $extensionType)
{
$this->headers = [
self::HEADER_DURABLE => false,
self::HEADER_AUTO_DELETE => true,
self::HEADER_EXCLUSIVE => false,
];

$this->extensionType = $extensionType;
}

public function getExtensionType(): string
{
return $this->extensionType;
}

public function getStompName(): string
Expand All @@ -65,6 +76,10 @@ public function getQueueName(): string
throw new \LogicException('Destination name is not set');
}

if (ExtensionType::ARTEMIS === $this->extensionType) {
return $this->getStompName();
}

$name = '/'.$this->getType().'/'.$this->getStompName();

if ($this->getRoutingKey()) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/stomp/Tests/StompConsumerTest.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Enqueue\Stomp\Tests;

use Enqueue\Stomp\BufferedStompClient;
use Enqueue\Stomp\ExtensionType;
use Enqueue\Stomp\StompConsumer;
use Enqueue\Stomp\StompDestination;
use Enqueue\Stomp\StompMessage;
Expand Down Expand Up @@ -557,7 +558,7 @@ private function createStompClientMock()

private function createDummyDestination(): StompDestination
{
$destination = new StompDestination();
$destination = new StompDestination(ExtensionType::RABBITMQ);
$destination->setStompName('aName');
$destination->setType(StompDestination::TYPE_QUEUE);

Expand Down
Loading

AltStyle によって変換されたページ (->オリジナル) /