@@ -78,7 +78,7 @@ function closeChangeStream(timeInMs = 60000, changeStream) {
78
78
async function monitorListingsUsingEventEmitter ( client , timeInMs = 60000 , pipeline = [ ] ) {
79
79
const collection = client . db ( "sample_airbnb" ) . collection ( "listingsAndReviews" ) ;
80
80
81
- // See http ://bit.ly/Node_watch for the watch() docs
81
+ // See https ://mongodb.github.io/node-mongodb-native/3.6/api/Collection.html#watch for the watch() docs
82
82
const changeStream = collection . watch ( pipeline ) ;
83
83
84
84
// ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
@@ -102,18 +102,25 @@ async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeli
102
102
async function monitorListingsUsingHasNext ( client , timeInMs = 60000 , pipeline = [ ] ) {
103
103
const collection = client . db ( "sample_airbnb" ) . collection ( "listingsAndReviews" ) ;
104
104
105
- // See http ://bit.ly/Node_watch for the watch() docs
105
+ // See https ://mongodb.github.io/node-mongodb-native/3.6/api/Collection.html#watch for the watch() docs
106
106
const changeStream = collection . watch ( pipeline ) ;
107
107
108
108
// Set a timer that will close the change stream after the given amount of time
109
109
// Function execution will continue because we are not using "await" here
110
110
closeChangeStream ( timeInMs , changeStream ) ;
111
111
112
112
// We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
113
- // If the change stream is closed, hasNext() will return false so the while loop will exit.
114
- // See http://bit.ly/Node_ChangeStream for the ChangeStream docs.
115
- while ( await changeStream . hasNext ( ) ) {
116
- console . log ( await changeStream . next ( ) ) ;
113
+ // See https://mongodb.github.io/node-mongodb-native/3.6/api/ChangeStream.html for the ChangeStream docs.
114
+ try {
115
+ while ( await changeStream . hasNext ( ) ) {
116
+ console . log ( await changeStream . next ( ) ) ;
117
+ }
118
+ } catch ( error ) {
119
+ if ( changeStream . isClosed ( ) ) {
120
+ console . log ( "The change stream is closed. Will not wait on any more changes." )
121
+ } else {
122
+ throw error ;
123
+ }
117
124
}
118
125
}
119
126
@@ -127,11 +134,11 @@ async function monitorListingsUsingHasNext(client, timeInMs = 60000, pipeline =
127
134
async function monitorListingsUsingStreamAPI ( client , timeInMs = 60000 , pipeline = [ ] ) {
128
135
const collection = client . db ( 'sample_airbnb' ) . collection ( 'listingsAndReviews' ) ;
129
136
130
- // See http ://bit.ly/Node_watch for the watch() docs
137
+ // See https ://mongodb.github.io/node-mongodb-native/3.6/api/Collection.html#watch for the watch() docs
131
138
const changeStream = collection . watch ( pipeline ) ;
132
139
133
- // See http ://bit.ly/Node_pipe for the pipe() docs
134
- changeStream . pipe (
140
+ // See https ://mongodb.github.io/node-mongodb-native/3.6/api/ChangeStream.html#pipe for the pipe() docs
141
+ changeStream . stream ( ) . pipe (
135
142
new stream . Writable ( {
136
143
objectMode : true ,
137
144
write : function ( doc , _ , cb ) {
0 commit comments