Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Fix - Add automatic reconnect support for STOMP producers #1099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
makasim merged 11 commits into php-enqueue:master from atrauzzi:fix/dead-non-consumer-connections
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
11 commits
Select commit Hold shift + click to select a range
b09cecb
Add automatic reconnect support for STOMP producers.
atrauzzi Sep 29, 2020
bfd08b1
Not sure why lock got ignored...
atrauzzi Sep 29, 2020
b4afec2
Do we not want this?
atrauzzi Sep 30, 2020
2a0545c
Fix for stomp-php - Use one connection for each message.
atrauzzi Sep 30, 2020
a439f00
Unfortunately, can't use this as it will break actual long-lived conn...
atrauzzi Sep 30, 2020
cd3c765
Allow connections that have no consumer to be considered 'transient'.
atrauzzi Oct 1, 2020
905b3e8
Revert StompProducer
atrauzzi Oct 1, 2020
327ca4d
Amend test.
atrauzzi Oct 1, 2020
36d4e01
Allow the detection of transient connections to be configured.
atrauzzi Oct 8, 2020
2e1d06d
Add default configuration for transient connection detection.
atrauzzi Oct 8, 2020
b46a945
Update configuration for tests.
atrauzzi Oct 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ bin/jp.php
bin/php-parse
bin/google-cloud-batch
bin/thruway
bin/phpstan.phar
bin/var-dump-server
bin/yaml-lint
vendor
var
.php_cs
.php_cs.cache
composer.lock
composer.lock
15 changes: 8 additions & 7 deletions pkg/stomp/StompConnectionFactory.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public function __construct($config = 'stomp:')
*/
public function createContext(): Context
{
if ($this->config['lazy']) {
return new StompContext(
function () { return $this->establishConnection(); },
$this->config['target']
);
}
$stomp = $this->config['lazy']
? function () { return $this->establishConnection(); }
: $this->establishConnection();
Copy link
Contributor Author

@atrauzzi atrauzzi Oct 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally I indent lines like 79, but the code style suggested this, apologies!


$target = $this->config['target'];
$detectTransientConnections = (bool) $this->config['detect_transient_connections'];

return new StompContext($this->establishConnection(), $this->config['target']);
return new StompContext($stomp, $target, $detectTransientConnections);
}

private function establishConnection(): BufferedStompClient
Expand Down Expand Up @@ -169,6 +169,7 @@ private function defaultConfig(): array
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
];
}
}
32 changes: 25 additions & 7 deletions pkg/stomp/StompContext.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ class StompContext implements Context
*/
private $stompFactory;

/**
* @var bool
*/
private $transient;

/**
* @param BufferedStompClient|callable $stomp
*/
public function __construct($stomp, string $extensionType)
public function __construct($stomp, string $extensionType, bool $detectTransientConnections = false)
Copy link
Contributor Author

@atrauzzi atrauzzi Oct 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where we default to "off".

{
if ($stomp instanceof BufferedStompClient) {
$this->stomp = $stomp;
Expand All @@ -53,6 +58,7 @@ public function __construct($stomp, string $extensionType)

$this->extensionType = $extensionType;
$this->useExchangePrefix = ExtensionType::RABBITMQ === $extensionType;
$this->transient = $detectTransientConnections;
}

/**
Expand Down Expand Up @@ -173,6 +179,8 @@ public function createConsumer(Destination $destination): Consumer
{
InvalidDestinationException::assertDestinationInstanceOf($destination, StompDestination::class);

$this->transient = false;
Copy link
Contributor Author

@atrauzzi atrauzzi Oct 1, 2020
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as the first consumer is created against a context, we convert to long-lived and disable all transient functionality so that queues with heartbeats don't lose their connections while heartbeating.


return new StompConsumer($this->getStomp(), $destination);
}

Expand All @@ -181,6 +189,10 @@ public function createConsumer(Destination $destination): Consumer
*/
public function createProducer(): Producer
{
if ($this->transient && $this->stomp) {
$this->stomp->disconnect();
}

return new StompProducer($this->getStomp());
}

Expand All @@ -202,14 +214,20 @@ public function purgeQueue(Queue $queue): void
public function getStomp(): BufferedStompClient
{
if (false == $this->stomp) {
$stomp = call_user_func($this->stompFactory);
if (false == $stomp instanceof BufferedStompClient) {
throw new \LogicException(sprintf('The factory must return instance of BufferedStompClient. It returns %s', is_object($stomp) ? get_class($stomp) : gettype($stomp)));
}

$this->stomp = $stomp;
$this->stomp = $this->createStomp();
}

return $this->stomp;
}

private function createStomp(): BufferedStompClient
{
$stomp = call_user_func($this->stompFactory);

if (false == $stomp instanceof BufferedStompClient) {
throw new \LogicException(sprintf('The factory must return instance of BufferedStompClient. It returns %s', is_object($stomp) ? get_class($stomp) : gettype($stomp)));
}

return $stomp;
}
Comment on lines +223 to +232
Copy link
Contributor Author

@atrauzzi atrauzzi Oct 1, 2020
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hope you don't mind, I moved this out. 😄

}
9 changes: 9 additions & 0 deletions pkg/stomp/Tests/StompConnectionFactoryConfigTest.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -91,6 +92,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -112,6 +114,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -134,6 +137,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -156,6 +160,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -178,6 +183,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -201,6 +207,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -222,6 +229,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];

Expand All @@ -244,6 +252,7 @@ public static function provideConfigs()
'read_timeout' => 60,
'send_heartbeat' => 0,
'receive_heartbeat' => 0,
'detect_transient_connections' => false,
],
];
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stomp/Tests/StompContextTest.php
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public function testShouldCloseConnections()
{
$client = $this->createStompClientMock();
$client
->expects($this->once())
->expects($this->atLeastOnce())
->method('disconnect')
;

Expand Down

AltStyle によって変換されたページ (->オリジナル) /