-
Notifications
You must be signed in to change notification settings - Fork 266
PHPLIB-322: Add resume logic to ChangeStream::rewind() #479
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,9 +73,6 @@ public function testNextResumesAfterCursorNotFound() | |
| $this->assertSameDocument($expectedResult, $changeStream->current()); | ||
| } | ||
|
|
||
| /** | ||
| * @todo test that rewind() also resumes once PHPLIB-322 is implemented | ||
| */ | ||
| public function testNextResumesAfterConnectionException() | ||
| { | ||
| /* In order to trigger a dropped connection, we'll use a new client with | ||
|
|
@@ -129,6 +126,56 @@ function(stdClass $command) use (&$commands) { | |
| $this->assertSame($expectedCommands, $commands); | ||
| } | ||
|
|
||
| public function testRewindResumesAfterConnectionException() | ||
| { | ||
| /* In order to trigger a dropped connection, we'll use a new client with | ||
| * a socket timeout that is less than the change stream's maxAwaitTimeMS | ||
| * option. */ | ||
| $manager = new Manager($this->getUri(), ['socketTimeoutMS' => 50]); | ||
| $primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY)); | ||
|
|
||
| $operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]); | ||
| $changeStream = $operation->execute($primaryServer); | ||
|
|
||
| $commands = []; | ||
|
|
||
| try { | ||
| (new CommandObserver)->observe( | ||
| function() use ($changeStream) { | ||
| $changeStream->rewind(); | ||
| }, | ||
| function(stdClass $command) use (&$commands) { | ||
| $commands[] = key((array) $command); | ||
| } | ||
| ); | ||
| $this->fail('ConnectionTimeoutException was not thrown'); | ||
| } catch (ConnectionTimeoutException $e) {} | ||
|
|
||
| $expectedCommands = [ | ||
| /* The initial aggregate command for change streams returns a cursor | ||
| * envelope with an empty initial batch, since there are no changes | ||
| * to report at the moment the change stream is created. Therefore, | ||
| * we expect a getMore to be issued when we first advance the change | ||
| * stream (with either rewind() or next()). */ | ||
| 'getMore', | ||
| /* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous | ||
| * getMore command encounters a client socket timeout and leaves the | ||
| * cursor open on the server. ChangeStream should catch this error | ||
| * and resume by issuing a new aggregate command. */ | ||
| 'aggregate', | ||
| /* When ChangeStream resumes, it overwrites its original cursor with | ||
| * the new cursor resulting from the last aggregate command. This | ||
| * removes the last reference to the old cursor, which causes the | ||
| * driver to kill it (via mongoc_cursor_destroy()). */ | ||
| 'killCursors', | ||
| /* Finally, ChangeStream will rewind the new cursor as the last step | ||
| * of the resume process. This results in one last getMore. */ | ||
| 'getMore', | ||
| ]; | ||
|
|
||
| $this->assertSame($expectedCommands, $commands); | ||
| } | ||
|
|
||
| public function testNoChangeAfterResumeBeforeInsert() | ||
| { | ||
| $this->insertDocument(['_id' => 1, 'x' => 'foo']); | ||
|
|
@@ -260,7 +307,6 @@ public function testInitialCursorIsNotClosed() | |
|
|
||
| /** | ||
| * @expectedException MongoDB\Exception\ResumeTokenException | ||
| * @todo test that rewind() also attempts to extract the resume token once PHPLIB-322 is implemented | ||
| */ | ||
| public function testNextCannotExtractResumeToken() | ||
| { | ||
|
|
@@ -269,13 +315,28 @@ public function testNextCannotExtractResumeToken() | |
| $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]); | ||
| $changeStream = $operation->execute($this->getPrimaryServer()); | ||
|
|
||
| $changeStream->rewind(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume this was removed because I'd suggest replacing with: Borrowed from the socket exception test. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Side thought: does this actually throw or does it only introduce a delay with its own There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, the rewind() doesn't throw because it doesn't attempt extraction. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This now makes for very amusing |
||
|
|
||
| /* Note: we intentionally do not start iteration with rewind() to ensure | ||
| * that we test extraction functionality within next(). */ | ||
| $this->insertDocument(['x' => 1]); | ||
|
|
||
| $changeStream->next(); | ||
| } | ||
|
|
||
| /** | ||
| * @expectedException MongoDB\Exception\ResumeTokenException | ||
| */ | ||
| public function testRewindCannotExtractResumeToken() | ||
| { | ||
| $pipeline = [['$project' => ['_id' => 0 ]]]; | ||
|
|
||
| $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]); | ||
| $changeStream = $operation->execute($this->getPrimaryServer()); | ||
|
|
||
| $this->insertDocument(['x' => 1]); | ||
|
|
||
| $changeStream->rewind(); | ||
| } | ||
|
|
||
| public function testMaxAwaitTimeMS() | ||
| { | ||
| /* On average, an acknowledged write takes about 20 ms to appear in a | ||
|
|
@@ -320,6 +381,52 @@ public function testMaxAwaitTimeMS() | |
| $this->assertTrue($changeStream->valid()); | ||
| } | ||
|
|
||
| public function testRewindResumesAfterCursorNotFound() | ||
| { | ||
| $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]); | ||
| $changeStream = $operation->execute($this->getPrimaryServer()); | ||
|
|
||
| $this->killChangeStreamCursor($changeStream); | ||
|
|
||
| $changeStream->rewind(); | ||
| $this->assertFalse($changeStream->valid()); | ||
| $this->assertNull($changeStream->current()); | ||
| } | ||
|
|
||
| public function testRewindExtractsResumeTokenAndNextResumes() | ||
| { | ||
| $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]); | ||
| $changeStream = $operation->execute($this->getPrimaryServer()); | ||
|
|
||
| $this->insertDocument(['_id' => 1, 'x' => 'foo']); | ||
| $this->insertDocument(['_id' => 2, 'x' => 'bar']); | ||
|
|
||
| $changeStream->rewind(); | ||
| $this->assertTrue($changeStream->valid()); | ||
| $expectedResult = [ | ||
| '_id' => $changeStream->current()->_id, | ||
| 'operationType' => 'insert', | ||
| 'fullDocument' => ['_id' => 1, 'x' => 'foo'], | ||
| 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], | ||
| 'documentKey' => ['_id' => 1], | ||
| ]; | ||
| $this->assertSameDocument($expectedResult, $changeStream->current()); | ||
|
|
||
| $this->killChangeStreamCursor($changeStream); | ||
|
|
||
| $changeStream->next(); | ||
| $this->assertTrue($changeStream->valid()); | ||
|
|
||
| $expectedResult = [ | ||
| '_id' => $changeStream->current()->_id, | ||
| 'operationType' => 'insert', | ||
| 'fullDocument' => ['_id' => 2, 'x' => 'bar'], | ||
| 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], | ||
| 'documentKey' => ['_id' => 2], | ||
| ]; | ||
| $this->assertSameDocument($expectedResult, $changeStream->current()); | ||
| } | ||
|
|
||
| private function insertDocument($document) | ||
| { | ||
| $insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document); | ||
|
|
||