Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 01a9aef

Browse files
committed
add files
0 parents commit 01a9aef

File tree

6 files changed

+314
-0
lines changed

6 files changed

+314
-0
lines changed

‎PhpFpmConnectionFactory.php‎

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use Enqueue\Dsn\Dsn;
5+
use hollodotme\FastCGI\Client;
6+
use hollodotme\FastCGI\SocketConnections\NetworkSocket;
7+
use hollodotme\FastCGI\SocketConnections\UnixDomainSocket;
8+
use Interop\Queue\ConnectionFactory;
9+
use Interop\Queue\Context;
10+
11+
class PhpFpmConnectionFactory implements ConnectionFactory
12+
{
13+
/**
14+
* @var string
15+
*/
16+
private $dsn;
17+
18+
public function __construct(string $dsn)
19+
{
20+
$this->dsn = $dsn;
21+
}
22+
23+
public function createContext(): Context
24+
{
25+
$dsn = new Dsn($this->dsn);
26+
if ('unix' == $dsn->getSchemeProtocol()) {
27+
$socket = new UnixDomainSocket($dsn->getPath());
28+
} else if ('tcp' == $dsn->getSchemeProtocol()) {
29+
$socket = new NetworkSocket($dsn->getHost(), $dsn->getPort());
30+
} else {
31+
throw new \LogicException('Protocol is not supported');
32+
}
33+
34+
return new PhpFpmContext(new Client($socket));
35+
}
36+
}

‎PhpFpmConsumer.php‎

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use Interop\Queue\Consumer;
5+
use Interop\Queue\Message;
6+
use Interop\Queue\Queue;
7+
8+
class PhpFpmConsumer implements Consumer
9+
{
10+
/**
11+
* @var PhpFpmDestination
12+
*/
13+
private $destination;
14+
15+
public function __construct(PhpFpmDestination $destination)
16+
{
17+
$this->destination = $destination;
18+
}
19+
20+
public function getQueue(): Queue
21+
{
22+
return $this->destination;
23+
}
24+
25+
public function receive(int $timeout = 0): ?Message
26+
{
27+
return $this->receiveNoWait();
28+
}
29+
30+
public function receiveNoWait(): ?Message
31+
{
32+
if ($_POST['message']) {
33+
return PhpFpmMessage::jsonUnserialize($_POST['message']);
34+
}
35+
36+
return null;
37+
}
38+
39+
public function acknowledge(Message $message): void
40+
{
41+
}
42+
43+
public function reject(Message $message, bool $requeue = false): void
44+
{
45+
}
46+
}

‎PhpFpmContext.php‎

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use hollodotme\FastCGI\Client;
5+
use Interop\Queue\ConnectionFactory;
6+
use Interop\Queue\Consumer;
7+
use Interop\Queue\Context;
8+
use Interop\Queue\Destination;
9+
use Interop\Queue\Exception\PurgeQueueNotSupportedException;
10+
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
11+
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
12+
use Interop\Queue\Message;
13+
use Interop\Queue\Producer;
14+
use Interop\Queue\Queue;
15+
use Interop\Queue\SubscriptionConsumer;
16+
use Interop\Queue\Topic;
17+
18+
class PhpFpmContext implements Context
19+
{
20+
/**
21+
* @var Client
22+
*/
23+
private $cgiClient;
24+
25+
public function __construct(Client $cgiClient)
26+
{
27+
$this->cgiClient = $cgiClient;
28+
}
29+
30+
public function createMessage(string $body = '', array $properties = [], array $headers = []): Message
31+
{
32+
return new PhpFpmMessage($body, $properties, $headers);
33+
}
34+
35+
public function createTopic(string $topicName): Topic
36+
{
37+
return new PhpFpmDestination($topicName);
38+
}
39+
40+
public function createQueue(string $queueName): Queue
41+
{
42+
return new PhpFpmDestination($queueName);
43+
}
44+
45+
public function createProducer(): Producer
46+
{
47+
return new PhpFpmProducer($this->cgiClient);
48+
}
49+
50+
public function createConsumer(Destination $destination): Consumer
51+
{
52+
return new PhpFpmConsumer($destination);
53+
}
54+
55+
public function close(): void
56+
{
57+
}
58+
59+
public function createTemporaryQueue(): Queue
60+
{
61+
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
62+
}
63+
64+
public function createSubscriptionConsumer(): SubscriptionConsumer
65+
{
66+
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
67+
}
68+
69+
public function purgeQueue(Queue $queue): void
70+
{
71+
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
72+
}
73+
}

‎PhpFpmDestination.php‎

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use Interop\Queue\ConnectionFactory;
5+
use Interop\Queue\Context;
6+
use Interop\Queue\Queue;
7+
use Interop\Queue\Topic;
8+
9+
class PhpFpmDestination implements Queue, Topic
10+
{
11+
/**
12+
* @var string
13+
*/
14+
private $name;
15+
16+
public function __construct(string $name)
17+
{
18+
$this->name = $name;
19+
}
20+
21+
public function getName(): string
22+
{
23+
return $this->name;
24+
}
25+
26+
public function getQueueName(): string
27+
{
28+
return $this->getName();
29+
}
30+
31+
public function getTopicName(): string
32+
{
33+
return $this->getName();
34+
}
35+
}

‎PhpFpmMessage.php‎

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use Interop\Queue\Message;
5+
use Interop\Queue\MessageTrait;
6+
7+
class PhpFpmMessage implements Message, \JsonSerializable
8+
{
9+
public function __construct(string $body = '', array $properties = [], array $headers = [])
10+
{
11+
$this->body = $body;
12+
$this->properties = $properties;
13+
$this->headers = $headers;
14+
15+
$this->redelivered = false;
16+
}
17+
18+
use MessageTrait;
19+
20+
public function jsonSerialize(): array
21+
{
22+
return [
23+
'body' => $this->getBody(),
24+
'properties' => $this->getProperties(),
25+
'headers' => $this->getHeaders(),
26+
];
27+
}
28+
29+
public static function jsonUnserialize(string $json): self
30+
{
31+
$data = json_decode($json, true);
32+
if (JSON_ERROR_NONE !== json_last_error()) {
33+
throw new \InvalidArgumentException(sprintf(
34+
'The malformed json given. Error %s and message %s',
35+
json_last_error(),
36+
json_last_error_msg()
37+
));
38+
}
39+
40+
return new self($data['body'], $data['properties'], $data['headers']);
41+
}
42+
}

‎PhpFpmProducer.php‎

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use hollodotme\FastCGI\Client;
5+
use hollodotme\FastCGI\Requests\PostRequest;
6+
use Interop\Queue\Destination;
7+
use Interop\Queue\Exception\DeliveryDelayNotSupportedException;
8+
use Interop\Queue\Exception\Exception;
9+
use Interop\Queue\Exception\PriorityNotSupportedException;
10+
use Interop\Queue\Exception\TimeToLiveNotSupportedException;
11+
use Interop\Queue\Message;
12+
use Interop\Queue\Producer;
13+
14+
class PhpFpmProducer implements Producer
15+
{
16+
/**
17+
* @var Client
18+
*/
19+
private $cgiClient;
20+
21+
public function __construct(Client $cgiClient)
22+
{
23+
$this->cgiClient = $cgiClient;
24+
}
25+
26+
/**
27+
* @param PhpFpmDestination $destination
28+
* @param PhpFpmMessage $message
29+
*/
30+
public function send(Destination $destination, Message $message): void
31+
{
32+
$request = new PostRequest($destination->getName(), http_build_query(['message' => json_encode($message)]));
33+
34+
try {
35+
$this->cgiClient->sendAsyncRequest($request);
36+
} catch (\Exception $e) {
37+
throw new Exception($e->getMessage(), $e->getCode(), $e);
38+
}
39+
}
40+
41+
public function setDeliveryDelay(int $deliveryDelay = null): Producer
42+
{
43+
if (null !== $deliveryDelay) {
44+
throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
45+
}
46+
47+
return $this;
48+
}
49+
50+
public function getDeliveryDelay(): ?int
51+
{
52+
return null;
53+
}
54+
55+
public function setPriority(int $priority = null): Producer
56+
{
57+
if (null !== $priority) {
58+
throw PriorityNotSupportedException::providerDoestNotSupportIt();
59+
}
60+
61+
return $this;
62+
}
63+
64+
public function getPriority(): ?int
65+
{
66+
return null;
67+
}
68+
69+
public function setTimeToLive(int $timeToLive = null): Producer
70+
{
71+
if (null !== $timeToLive) {
72+
throw TimeToLiveNotSupportedException::providerDoestNotSupportIt();
73+
}
74+
75+
return $this;
76+
}
77+
78+
public function getTimeToLive(): ?int
79+
{
80+
return null;
81+
}
82+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /