Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 272c2bc

Browse files
authored
Merge pull request #1099 from atrauzzi/fix/dead-non-consumer-connections
Fix - Add automatic reconnect support for STOMP producers
2 parents b7d5f9e + b46a945 commit 272c2bc

File tree

5 files changed

+47
-16
lines changed

5 files changed

+47
-16
lines changed

‎.gitignore‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ bin/jp.php
99
bin/php-parse
1010
bin/google-cloud-batch
1111
bin/thruway
12+
bin/phpstan.phar
13+
bin/var-dump-server
14+
bin/yaml-lint
1215
vendor
1316
var
1417
.php_cs
1518
.php_cs.cache
16-
composer.lock
19+
composer.lock

‎pkg/stomp/StompConnectionFactory.php‎

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ public function __construct($config = 'stomp:')
7474
*/
7575
public function createContext(): Context
7676
{
77-
if ($this->config['lazy']) {
78-
returnnewStompContext(
79-
function () { return$this->establishConnection(); },
80-
$this->config['target']
81-
);
82-
}
77+
$stomp = $this->config['lazy']
78+
? function () { return$this->establishConnection(); }
79+
: $this->establishConnection();
80+
81+
$target = $this->config['target'];
82+
$detectTransientConnections = (bool) $this->config['detect_transient_connections'];
8383

84-
return new StompContext($this->establishConnection(), $this->config['target']);
84+
return new StompContext($stomp, $target, $detectTransientConnections);
8585
}
8686

8787
private function establishConnection(): BufferedStompClient
@@ -169,6 +169,7 @@ private function defaultConfig(): array
169169
'read_timeout' => 60,
170170
'send_heartbeat' => 0,
171171
'receive_heartbeat' => 0,
172+
'detect_transient_connections' => false,
172173
];
173174
}
174175
}

‎pkg/stomp/StompContext.php‎

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,15 @@ class StompContext implements Context
3838
*/
3939
private $stompFactory;
4040

41+
/**
42+
* @var bool
43+
*/
44+
private $transient;
45+
4146
/**
4247
* @param BufferedStompClient|callable $stomp
4348
*/
44-
public function __construct($stomp, string $extensionType)
49+
public function __construct($stomp, string $extensionType, bool$detectTransientConnections = false)
4550
{
4651
if ($stomp instanceof BufferedStompClient) {
4752
$this->stomp = $stomp;
@@ -53,6 +58,7 @@ public function __construct($stomp, string $extensionType)
5358

5459
$this->extensionType = $extensionType;
5560
$this->useExchangePrefix = ExtensionType::RABBITMQ === $extensionType;
61+
$this->transient = $detectTransientConnections;
5662
}
5763

5864
/**
@@ -173,6 +179,8 @@ public function createConsumer(Destination $destination): Consumer
173179
{
174180
InvalidDestinationException::assertDestinationInstanceOf($destination, StompDestination::class);
175181

182+
$this->transient = false;
183+
176184
return new StompConsumer($this->getStomp(), $destination);
177185
}
178186

@@ -181,6 +189,10 @@ public function createConsumer(Destination $destination): Consumer
181189
*/
182190
public function createProducer(): Producer
183191
{
192+
if ($this->transient && $this->stomp) {
193+
$this->stomp->disconnect();
194+
}
195+
184196
return new StompProducer($this->getStomp());
185197
}
186198

@@ -202,14 +214,20 @@ public function purgeQueue(Queue $queue): void
202214
public function getStomp(): BufferedStompClient
203215
{
204216
if (false == $this->stomp) {
205-
$stomp = call_user_func($this->stompFactory);
206-
if (false == $stomp instanceof BufferedStompClient) {
207-
throw new \LogicException(sprintf('The factory must return instance of BufferedStompClient. It returns %s', is_object($stomp) ? get_class($stomp) : gettype($stomp)));
208-
}
209-
210-
$this->stomp = $stomp;
217+
$this->stomp = $this->createStomp();
211218
}
212219

213220
return $this->stomp;
214221
}
222+
223+
private function createStomp(): BufferedStompClient
224+
{
225+
$stomp = call_user_func($this->stompFactory);
226+
227+
if (false == $stomp instanceof BufferedStompClient) {
228+
throw new \LogicException(sprintf('The factory must return instance of BufferedStompClient. It returns %s', is_object($stomp) ? get_class($stomp) : gettype($stomp)));
229+
}
230+
231+
return $stomp;
232+
}
215233
}

‎pkg/stomp/Tests/StompConnectionFactoryConfigTest.php‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public static function provideConfigs()
7070
'read_timeout' => 60,
7171
'send_heartbeat' => 0,
7272
'receive_heartbeat' => 0,
73+
'detect_transient_connections' => false,
7374
],
7475
];
7576

@@ -91,6 +92,7 @@ public static function provideConfigs()
9192
'read_timeout' => 60,
9293
'send_heartbeat' => 0,
9394
'receive_heartbeat' => 0,
95+
'detect_transient_connections' => false,
9496
],
9597
];
9698

@@ -112,6 +114,7 @@ public static function provideConfigs()
112114
'read_timeout' => 60,
113115
'send_heartbeat' => 0,
114116
'receive_heartbeat' => 0,
117+
'detect_transient_connections' => false,
115118
],
116119
];
117120

@@ -134,6 +137,7 @@ public static function provideConfigs()
134137
'read_timeout' => 60,
135138
'send_heartbeat' => 0,
136139
'receive_heartbeat' => 0,
140+
'detect_transient_connections' => false,
137141
],
138142
];
139143

@@ -156,6 +160,7 @@ public static function provideConfigs()
156160
'read_timeout' => 60,
157161
'send_heartbeat' => 0,
158162
'receive_heartbeat' => 0,
163+
'detect_transient_connections' => false,
159164
],
160165
];
161166

@@ -178,6 +183,7 @@ public static function provideConfigs()
178183
'read_timeout' => 60,
179184
'send_heartbeat' => 0,
180185
'receive_heartbeat' => 0,
186+
'detect_transient_connections' => false,
181187
],
182188
];
183189

@@ -201,6 +207,7 @@ public static function provideConfigs()
201207
'read_timeout' => 60,
202208
'send_heartbeat' => 0,
203209
'receive_heartbeat' => 0,
210+
'detect_transient_connections' => false,
204211
],
205212
];
206213

@@ -222,6 +229,7 @@ public static function provideConfigs()
222229
'read_timeout' => 60,
223230
'send_heartbeat' => 0,
224231
'receive_heartbeat' => 0,
232+
'detect_transient_connections' => false,
225233
],
226234
];
227235

@@ -244,6 +252,7 @@ public static function provideConfigs()
244252
'read_timeout' => 60,
245253
'send_heartbeat' => 0,
246254
'receive_heartbeat' => 0,
255+
'detect_transient_connections' => false,
247256
],
248257
];
249258
}

‎pkg/stomp/Tests/StompContextTest.php‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public function testShouldCloseConnections()
144144
{
145145
$client = $this->createStompClientMock();
146146
$client
147-
->expects($this->once())
147+
->expects($this->atLeastOnce())
148148
->method('disconnect')
149149
;
150150

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /