Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 041e735

Browse files
committed
Add change streams
Add pipeline to changestream Add StreamAPI monitoring Add closeChangeStream function Update documentation
1 parent 7f3cead commit 041e735

File tree

2 files changed

+277
-0
lines changed

2 files changed

+277
-0
lines changed

‎changeStreams.js

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
const { MongoClient } = require('mongodb');
2+
const stream = require('stream');
3+
4+
async function main() {
5+
/**
6+
* Connection URI. Update <username>, <password>, and <your-cluster-url> to reflect your cluster.
7+
* See http://bit.ly/NodeDocs_lauren for more details
8+
*/
9+
const uri = "mongodb+srv://<username>:<password>@<your-cluster-url>/test?retryWrites=true&w=majority";
10+
11+
/**
12+
* The Mongo Client you will use to interact with your database
13+
* See bit.ly/Node_MongoClient for more details
14+
*/
15+
const client = new MongoClient(uri);
16+
17+
try {
18+
// Connect to the MongoDB cluster
19+
await client.connect();
20+
21+
// Make the appropriate DB calls
22+
23+
/**
24+
* An aggregation pipeline that matches on new listings in the country of Australia and the Sydney market
25+
*/
26+
const pipeline = [
27+
{
28+
'$match': {
29+
'operationType': 'insert',
30+
'fullDocument.address.country': 'Australia',
31+
'fullDocument.address.market': 'Sydney'
32+
}
33+
}
34+
];
35+
36+
// This script contains three ways to monitor new listings in the listingsAndReviews collection.
37+
// Comment in the monitoring function you'd like to use.
38+
39+
// OPTION ONE: Monitor new listings using EventEmitter's on() function.
40+
// await monitorListingsUsingEventEmitter(client, 30000, pipeline);
41+
42+
// OPTION TWO: Monitor new listings using ChangeStream's hasNext() function
43+
// await monitorListingsUsingHasNext(client, 30000, pipeline);
44+
45+
// OPTION THREE: Monitor new listings using the Stream API
46+
// await monitorListingsUsingStreamAPI(client, 30000, pipeline);
47+
48+
} finally {
49+
// Close the connection to the MongoDB cluster
50+
await client.close();
51+
}
52+
}
53+
54+
main().catch(console.error);
55+
56+
/**
57+
* Close the given change stream after the given amount of time
58+
* @param {*} timeInMs The amount of time in ms to monitor listings
59+
* @param {*} changeStream The open change stream that should be closed
60+
*/
61+
function closeChangeStream(timeInMs = 60000, changeStream) {
62+
return new Promise((resolve) => {
63+
setTimeout(() => {
64+
console.log("Closing the change stream");
65+
changeStream.close();
66+
resolve();
67+
}, timeInMs)
68+
})
69+
};
70+
71+
/**
72+
* Monitor listings in the listingsAndReviews collections for changes
73+
* This function uses the on() function from the EventEmitter class to monitor changes
74+
* @param {MongoClient} client A MongoClient that is connected to a cluster with the sample_airbnb database
75+
* @param {Number} timeInMs The amount of time in ms to monitor listings
76+
* @param {Object} pipeline An aggregation pipeline that determines which change events should be output to the console
77+
*/
78+
async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeline = []) {
79+
const collection = client.db("sample_airbnb").collection("listingsAndReviews");
80+
81+
// See http://bit.ly/Node_watch for the watch() docs
82+
const changeStream = collection.watch(pipeline);
83+
84+
// ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
85+
// We can use EventEmitter's on() to add a listener function that will be called whenever a change occurs in the change stream.
86+
// See https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_emitter_on_eventname_listener for the on() docs.
87+
changeStream.on('change', (next) => {
88+
console.log(next);
89+
});
90+
91+
// Wait the given amount of time and then close the change stream
92+
await closeChangeStream(timeInMs, changeStream);
93+
}
94+
95+
/**
96+
* Monitor listings in the listingsAndReviews collections for changes
97+
* This function uses the hasNext() function from the MongoDB Node.js Driver's ChangeStream class to monitor changes
98+
* @param {MongoClient} client A MongoClient that is connected to a cluster with the sample_airbnb database
99+
* @param {Number} timeInMs The amount of time in ms to monitor listings
100+
* @param {Object} pipeline An aggregation pipeline that determines which change events should be output to the console
101+
*/
102+
async function monitorListingsUsingHasNext(client, timeInMs = 60000, pipeline = []) {
103+
const collection = client.db("sample_airbnb").collection("listingsAndReviews");
104+
105+
// See http://bit.ly/Node_watch for the watch() docs
106+
const changeStream = collection.watch(pipeline);
107+
108+
// Set a timer that will close the change stream after the given amount of time
109+
// Function execution will continue because we are not using "await" here
110+
closeChangeStream(timeInMs, changeStream);
111+
112+
// We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
113+
// If the change stream is closed, hasNext() will return false so the while loop will exit.
114+
// See http://bit.ly/Node_ChangeStream for the ChangeStream docs.
115+
while (await changeStream.hasNext()) {
116+
console.log(await changeStream.next());
117+
}
118+
}
119+
120+
/**
121+
* Monitor listings in the listingsAndReviews collection for changes
122+
* This function uses the Stream API (https://nodejs.org/api/stream.html) to monitor changes
123+
* @param {MongoClient} client A MongoClient that is connected to a cluster with the sample_airbnb database
124+
* @param {Number} timeInMs The amount of time in ms to monitor listings
125+
* @param {Object} pipeline An aggregation pipeline that determines which change events should be output to the console
126+
*/
127+
async function monitorListingsUsingStreamAPI(client, timeInMs = 60000, pipeline = []) {
128+
const collection = client.db('sample_airbnb').collection('listingsAndReviews');
129+
130+
// See http://bit.ly/Node_watch for the watch() docs
131+
const changeStream = collection.watch(pipeline);
132+
133+
// See http://bit.ly/Node_pipe for the pipe() docs
134+
changeStream.pipe(
135+
new stream.Writable({
136+
objectMode: true,
137+
write: function (doc, _, cb) {
138+
console.log(doc);
139+
cb();
140+
}
141+
})
142+
);
143+
144+
// Wait the given amount of time and then close the change stream
145+
await closeChangeStream(timeInMs, changeStream);
146+
}

‎changeStreamsTestData.js

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* This script can be used to create, update, and delete sample data.
3+
* This script is especially helpful when testing change streams.
4+
*/
5+
const { MongoClient } = require('mongodb');
6+
7+
async function main() {
8+
/**
9+
* Connection URI. Update <username>, <password>, and <your-cluster-url> to reflect your cluster.
10+
* See http://bit.ly/NodeDocs_lauren for more details
11+
*/
12+
const uri = "mongodb+srv://<username>:<password>@<your-cluster-url>/test?retryWrites=true&w=majority";
13+
14+
/**
15+
* The Mongo Client you will use to interact with your database
16+
* See bit.ly/Node_MongoClient for more details
17+
*/
18+
const client = new MongoClient(uri);
19+
20+
try {
21+
// Connect to the MongoDB cluster
22+
await client.connect();
23+
24+
// Make the appropriate DB calls
25+
const operaHouseViews = await createListing(client, {
26+
name: "Opera House Views",
27+
summary: "Beautiful apartment with views of the iconic Sydney Opera House",
28+
property_type: "Apartment",
29+
bedrooms: 1,
30+
bathrooms: 1,
31+
beds: 1,
32+
address: {
33+
market: "Sydney",
34+
country: "Australia"
35+
}
36+
});
37+
38+
const privateRoomInLondon = await createListing(client, {
39+
name: "Private room in London",
40+
property_type: "Apartment",
41+
bedrooms: 1,
42+
bathroom: 1
43+
});
44+
45+
const beautifulBeachHouse = await createListing(client, {
46+
name: "Beautiful Beach House",
47+
summary: "Enjoy relaxed beach living in this house with a private beach",
48+
bedrooms: 4,
49+
bathrooms: 2.5,
50+
beds: 7,
51+
last_review: new Date()
52+
});
53+
54+
await updateListing(client, operaHouseViews, { beds: 2 });
55+
56+
await updateListing(client, beautifulBeachHouse, {
57+
address: {
58+
market: "Sydney",
59+
country: "Australia"
60+
}
61+
});
62+
63+
const italianVilla = await createListing(client, {
64+
name: "Italian Villa",
65+
property_type: "Entire home/apt",
66+
bedrooms: 6,
67+
bathrooms: 4,
68+
address: {
69+
market: "Cinque Terre",
70+
country: "Italy"
71+
}
72+
});
73+
74+
const sydneyHarbourHome = await createListing(client, {
75+
name: "Sydney Harbour Home",
76+
bedrooms: 4,
77+
bathrooms: 2.5,
78+
address: {
79+
market: "Sydney",
80+
country: "Australia"
81+
}
82+
});
83+
84+
await deleteListing(client, sydneyHarbourHome);
85+
86+
} finally {
87+
// Close the connection to the MongoDB cluster
88+
await client.close();
89+
}
90+
}
91+
92+
main().catch(console.error);
93+
94+
/**
95+
* Create a new Airbnb listing
96+
* @param {MongoClient} client A MongoClient that is connected to a cluster with the sample_airbnb database
97+
* @param {Object} newListing The new listing to be added
98+
* @returns {String} The id of the new listing
99+
*/
100+
async function createListing(client, newListing) {
101+
// See http://bit.ly/Node_InsertOne for the insertOne() docs
102+
const result = await client.db("sample_airbnb").collection("listingsAndReviews").insertOne(newListing);
103+
console.log(`New listing created with the following id: ${result.insertedId}`);
104+
return result.insertedId;
105+
}
106+
107+
/**
108+
* Update an Airbnb listing
109+
* @param {MongoClient} client A MongoClient that is connected to a cluster with the sample_airbnb database
110+
* @param {String} listingId The id of the listing you want to update
111+
* @param {object} updatedListing An object containing all of the properties to be updated for the given listing
112+
*/
113+
async function updateListing(client, listingId, updatedListing) {
114+
// See http://bit.ly/Node_updateOne for the updateOne() docs
115+
const result = await client.db("sample_airbnb").collection("listingsAndReviews").updateOne({ _id: listingId }, { $set: updatedListing });
116+
117+
console.log(`${result.matchedCount} document(s) matched the query criteria.`);
118+
console.log(`${result.modifiedCount} document(s) was/were updated.`);
119+
}
120+
121+
/**
122+
* Delete an Airbnb listing
123+
* @param {MongoClient} client A MongoClient that is connected to a cluster with the sample_airbnb database
124+
* @param {String} listingId The id of the listing you want to delete
125+
*/
126+
async function deleteListing(client, listingId) {
127+
// See http://bit.ly/Node_deleteOne for the deleteOne() docs
128+
const result = await client.db("sample_airbnb").collection("listingsAndReviews").deleteOne({ _id: listingId });
129+
130+
console.log(`${result.deletedCount} document(s) was/were deleted.`);
131+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /