Opening change streams on 30 specified collections vs. watching a database with 100 collections #3179

Closed Answered by alcaeus
masterbater asked this question in General

Sorry to open a discussion here; the MongoDB driver repositories on GitHub don't have Discussions enabled. I've been scavenging the internet and I don't see any helpful guides.

What I do know is that watching 30 collections means there are 30 connections, while watching an entire database means 1 connection.

Hopefully the MongoDB PHP team could help enlighten me as to which is the better approach. Thanks.


There isn't any clear guidance, except for "it depends". If you find yourself constrained by lots of connections, it might be more beneficial to open a single change stream and filter for the events and collections you're looking for. On the other hand, if you find you're getting lots of events for all collections combined, to the point where it's starting to become a bottleneck, you'll want to create multiple change streams to handle them in parallel, using multiple connections to the server.
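As a sketch of the first option: a database-level change stream can be filtered server-side with a `$match` stage, so events for other collections never cross the wire. The pipeline below is a plain array (the collection names are made up for illustration); you would pass it to `MongoDB\Database::watch()` as shown in the trailing comment.

```php
<?php

// Hypothetical server-side filter for a database-level change stream.
// Only events for the named collections and operation types are sent
// back to the driver; everything else is discarded on the server.
$pipeline = [
    [
        '$match' => [
            'ns.coll' => ['$in' => ['users', 'hobbies']],
            'operationType' => ['$in' => ['insert', 'update', 'delete', 'replace']],
        ],
    ],
];

// Against a live deployment you would then open the stream like this:
// $changeStream = $client->selectDatabase('app')->watch($pipeline);
```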

Replies: 2 comments 2 replies


Sorry to open a discussion here; the MongoDB driver repositories on GitHub don't have Discussions enabled. I've been scavenging the internet and I don't see any helpful guides.

These questions are usually handled in the Developer Community Forums, especially as the question may also apply to other drivers.

What I do know is that watching 30 collections means there are 30 connections, while watching an entire database means 1 connection.

Hopefully the MongoDB PHP team could help enlighten me as to which is the better approach. Thanks.

Before going into detail, it's important to know a few things about how the PHP driver works:

  • It only implements single-threaded SDAM without a connection pool. Usually in your PHP script, you'd have a single client that maintains a connection to the MongoDB deployment (or multiple, depending on the topology)
  • When creating multiple clients, the underlying connection may be reused (i.e. if the connection string, URI options, and driver options are the same, and if connection reuse is not explicitly disabled). This works because we expect a single-threaded application to only use one client at a time, i.e. one operation concludes before a separate one starts. In that case, a single connection is sufficient
  • A single client may open multiple connections, depending on the topology being used. For example, when connecting to a replica set with three members, the driver will open a connection to each of the members, creating three outgoing connections. This is also true when not all of the replica set members have been listed in the connection string; the driver "discovers" the replica set topology and connects to each member. However, when connecting to a sharded cluster, the driver will open a connection only to the mongos instances listed in the connection string, i.e. it does not "discover" any other mongos instances that may be part of the deployment.
  • Change streams are implemented through cursors, which means that once a response (either to the original aggregate command or a subsequent getMore command for the resulting cursor) has been received, the connection is available to do other things.
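The topology behaviour in the third bullet can be illustrated with a couple of connection strings (host names are made up for illustration):

```
mongodb://host1:27017/?replicaSet=rs0
    replica set: host2 and host3 are discovered automatically,
    so the client opens a connection to all three members

mongodb://mongos1:27017,mongos2:27017
    sharded cluster: only the listed mongos instances are used;
    other mongos in the deployment are not discovered
```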

Let's take the following code from the Change Stream example:

$changeStream = $collection->watch();

$documents = [];
for ($i = 0; $i < 10; $i++) {
    $documents[] = ['x' => $i];
}
$collection->insertMany($documents);

$changeStream->rewind();

$startTime = time();

while (true) {
    if ($changeStream->valid()) {
        $event = $changeStream->current();
        assert(is_object($event));
        printf("%s\n", toJSON($event));
    }

    $changeStream->next();

    if (time() - $startTime > 3) {
        echo "Aborting after 3 seconds...\n";
        break;
    }
}

The return value of watch is a ChangeStream instance, a special iterator that wraps the cursor we receive from the server. This is necessary to track resume tokens, which are used to resume the change stream at the position where we stopped processing it in case the connection drops. The underlying cursor works the same as any other cursor: it receives a batch of documents to iterate, and when this batch is exhausted it will request another batch through the getMore command. The only slight difference is that a regular cursor will report an empty cursorId when returning an empty batch, indicating that the cursor is exhausted and closed, whereas a change stream will keep reporting a cursorId, thus indicating to the driver that additional getMore commands may produce more documents (even when none have been received in this batch).

While the example above immediately calls next even when valid returns false, thus triggering another getMore, an application may choose to pause or otherwise wait a certain amount of time before attempting to fetch the next document. During this time, the connection is available to be used for other purposes. Also, a call to next may not necessarily use the connection, for example if not all documents received in the last getMore have been iterated.

This brings us to connection counts: yes, if you want to have change streams open for 30 collections and iterate them in a truly parallel fashion, due to the single-threaded nature of PHP you'd have to have 30 PHP processes running, which in turn means that there are at least 30 connections open to your deployment (note that when connecting to a replica set, the driver opens a connection to each replica set member it has discovered). This isn't entirely necessary though - your script could also open the 30 change streams in the same process. Here's an example with two change streams, just quickly hacked together without testing:

$changeStreams = [
    $client->db->coll1->watch(),
    $client->db->coll2->watch(),
];

while (true) {
    if ($changeStreams[0]->valid()) {
        // Handle change stream 1
    }

    if ($changeStreams[1]->valid()) {
        // Handle change stream 2
    }

    // Advance both change streams
    $changeStreams[0]->next();
    $changeStreams[1]->next();

    // TODO: may want to stop at some point
}

The downside here is that if one of the change streams is very busy (i.e. generates lots of documents) and the other isn't, the less busy change stream will invoke a getMore on every iteration, potentially slowing down iteration of the busier change stream. However, as this is a theoretical example to show that it is indeed possible to handle multiple change streams in a single process, I'll leave fixing that issue as an exercise for the reader.
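One way that exercise could be approached, sketched here with plain `ArrayIterator`s standing in for the `ChangeStream` instances (no MongoDB connection involved, so only the scheduling logic is shown): drain everything a stream has already buffered before moving to the next one, so an idle stream costs at most one advance per round. Note this is an assumption-laden sketch; a real `ChangeStream::next()` may issue a getMore when its current batch is exhausted.

```php
<?php

// Hypothetical round-robin drain: for each stream, consume every document
// it can currently produce before moving on. ChangeStream implements
// Iterator, so plain ArrayIterators can stand in for it here.
function drainOnce(array $streams, callable $handle): void
{
    foreach ($streams as $name => $stream) {
        while ($stream->valid()) {
            $handle($name, $stream->current());
            $stream->next();
        }
        // With a real ChangeStream, you might call $stream->next() here
        // at most once per round, so an idle stream triggers a single
        // getMore instead of one per document of the busy stream.
    }
}

$streams = [
    'busy' => new ArrayIterator(['a', 'b', 'c']),
    'idle' => new ArrayIterator([]),
];

$seen = [];
drainOnce($streams, function (string $name, $doc) use (&$seen): void {
    $seen[] = "$name:$doc";
});
// $seen is now ['busy:a', 'busy:b', 'busy:c']
```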


Thanks for explaining this to me, @alcaeus.

To give you more of an idea: I have a Laravel command that is run indefinitely by Supervisor.

For each entry in a collection there are 4 different events. My plan is to add more collections to monitor and convert those events into Laravel events. Each collection will have its own pcntl_fork, with monitoring to restart it if there is a problem and a resume token to resume in case of downtime.

Now, if I add more entries to the collection map, based on my code it will open a new MongoDB Client. I was thinking that instead of watching individual collections, it might be better from a performance perspective to watch the whole database?

protected $collections = [
    'users' => [
        'insert' => \App\Events\UserInsertedEvent::class,
        // 'update' => \App\Events\UpdateEvent::class,
        // 'delete' => \App\Events\DeleteEvent::class,
        // 'replace' => \App\Events\ReplaceEvent::class,
    ],
    'hobbies' => [
        'insert' => HobbyInsertedEvent::class,
        // 'update' => \App\Events\UpdateEvent::class,
        // 'delete' => \App\Events\DeleteEvent::class,
        // 'replace' => \App\Events\ReplaceEvent::class,
    ],
    // Add more collections and their operation event classes here
];

/**
 * Execute the console command.
 */
public function handle()
{
    $client = $this->createMongoClient();
    $databaseName = config('database.connections.mongodb.database');

    foreach ($this->collections as $collectionName => $eventClasses) {
        // Fork process for each collection
        $this->forkProcess($client, $databaseName, $collectionName, $eventClasses);
    }

    // Monitor and restart processes
    $this->monitorProcesses();
}

/**
 * Create and configure a MongoDB client instance.
 */
private function createMongoClient(): Client
{
    return new Client(config('database.connections.mongodb.dsn'));
}

/**
 * Fork a process to handle change streams for a specific collection.
 */
private function forkProcess(Client $client, string $databaseName, string $collectionName, array $eventClasses)
{
    $pid = pcntl_fork();

    if ($pid == -1) {
        $this->error('Could not fork process.');

        return;
    } elseif ($pid == 0) {
        // Child process
        try {
            $this->processChangeStream($client, $databaseName, $collectionName, $eventClasses);
        } catch (\Exception $e) {
            $this->error('Error in process: '.$e->getMessage());
        }
        exit;
    } else {
        // Parent process
        $this->processes[$collectionName] = $pid;
    }
}
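As a small aside on the $collections map above: if the command were switched to a single database-level change stream, dispatch could be reduced to a lookup keyed by collection name and operationType. This sketch uses plain class-name strings in place of the real event classes (the helper name and classes are hypothetical):

```php
<?php

// Hypothetical resolver: given a map of collection => [operationType =>
// event class], return the event class for a change event, or null when
// the event is not one we listen for.
function resolveEventClass(array $map, string $collection, string $operationType): ?string
{
    return $map[$collection][$operationType] ?? null;
}

$map = [
    'users' => ['insert' => 'App\\Events\\UserInsertedEvent'],
    'hobbies' => ['insert' => 'App\\Events\\HobbyInsertedEvent'],
];

// resolveEventClass($map, 'users', 'insert') === 'App\Events\UserInsertedEvent'
// resolveEventClass($map, 'users', 'delete') === null (event ignored)
```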
Answer selected by masterbater
Thanks a lot for this answer @alcaeus and giving time to enlighten.
