Self-note: Resume / recover a MongoDB change stream
Introduction
- Sometimes the replicator needs to be restarted
- We cannot afford to lose even one or two entries in the time series: that would throw the statistics off and, in exceptional cases, lose the max or min value
Resume & recover
- Check the things collection and compare it with the things table in the RDB
- Flatten the things collection
- Delete the existing collection in the RDB
- Write the flattened things list to the RDB
- Do the same with labels
- Resume MongoDB operations from the last recorded entry using the change stream’s resume mechanism
https://docs.mongodb.com/manual/changeStreams/#watch-collection-database-deployment
Change streams are resumable by specifying a resumeAfter token when opening the cursor. For the resumeAfter token, use the _id value of the change stream event document. Passing the _id value to the change stream attempts to resume notifications starting after the specified operation.
In the documentation's example, resumeToken contains the change stream notification id. resumeAfter takes a parameter that must resolve to a resume token. Passing the resumeToken to the resumeAfter modifier directs the change stream to attempt to resume notifications starting after the operation specified.
If the featureCompatibilityVersion (fcv) is set to "4.0" or greater, newly opened change streams return a hex-encoded string for the resume token data, i.e. the _id._data value. This change allows for the ability to compare and sort the resume tokens. If the fcv is 3.6, newly opened change streams return a BinData for the resume token data.
IMPORTANT: The fcv value at the time of the cursor’s opening determines the resume token data type. That is, modifying the fcv does not affect the resume tokens of change streams already opened before the fcv change.
Regardless of the fcv value, a 4.0 replica set or a sharded cluster can resume a change stream using either the BinData or string resume token. As such, a 4.0 deployment can use a resume token from a change stream opened on a collection from a 3.6 deployment.
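Since fcv 4.0+ tokens carry a hex string in _id._data, two tokens can be ordered with a plain string comparison. The following is a minimal sketch under assumptions: both tokens come from streams opened under fcv 4.0 or later, and both use the same hex casing. The helper names compareResumeTokens and latestToken are hypothetical (they are not driver APIs), and any token values you see used with them are made up.

```javascript
// Hypothetical helpers; not part of the MongoDB driver.
// Assumes fcv >= 4.0, where token._data is a hex-encoded string.
function compareResumeTokens(a, b) {
  if (typeof a._data !== 'string' || typeof b._data !== 'string') {
    throw new TypeError('expected hex-string resume tokens (fcv 4.0+)');
  }
  if (a._data === b._data) return 0;
  return a._data < b._data ? -1 : 1;
}

// Returns the token that sorts last; expects a non-empty array.
function latestToken(tokens) {
  return tokens.reduce((best, t) => (compareResumeTokens(t, best) > 0 ? t : best));
}
```

One possible use is refusing to overwrite a saved token with an older one. BinData tokens from fcv 3.6 are rejected here rather than compared.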
Implementation
Structure of a Mongo change event
{
  _id : { <BSON Object> },
  "operationType" : "<operation>",
  "fullDocument" : { <document> },
  "ns" : {
    "db" : "<database>",
    "coll" : "<collection>"
  },
  "documentKey" : { "_id" : <ObjectId> },
  "updateDescription" : {
    "updatedFields" : { <document> },
    "removedFields" : [ "<field>", ... ]
  },
  "clusterTime" : <Timestamp>,
  "txnNumber" : <NumberLong>,
  "lsid" : {
    "id" : <UUID>,
    "uid" : <BinData>
  }
}
Pay attention to _id
Metadata related to the operation. Use this document as a resumeToken for the resumeAfter parameter when resuming a change stream.
This field is a BSON document, so it
- Can’t be saved with JSON.stringify
- Can’t be cast to ObjectID (wrong format, different from the MongoDB documentation)
Solution: Include bson and serialize the token as binary
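const fs = require('fs');
// With bson 4.x and later, serialize/deserialize are top-level exports.
// Older 1.x releases need an instance instead: new BSON().
const bson = require('bson');

// ID_FILE, lastId and logger are defined elsewhere in the replicator.

// Serialize the latest resume token (lastId) to BSON and write it to ID_FILE.
function saveId(cb) {
  if (!lastId) {
    return cb && cb(null); // nothing to save yet
  }
  const lastIdBuffer = bson.serialize(lastId);
  fs.writeFile(ID_FILE, lastIdBuffer, (err) => {
    if (err) {
      logger.error('[saveId]', err);
    }
    return cb && cb(err);
  });
}

// Read ID_FILE and deserialize it back into a resume token object.
function loadId(cb) {
  fs.readFile(ID_FILE, (err, data) => {
    let token;
    if (!err && data) {
      token = bson.deserialize(data);
    }
    return cb && cb(err, token);
  });
}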
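A plain fs.writeFile can leave a truncated file behind if the replicator is killed mid-write, which is exactly the restart case this note is about. One way to reduce that risk, sketched here as an assumption and not as what the replicator actually does, is to write to a temporary file and rename it over the target; on POSIX filesystems a rename within the same directory replaces the file atomically. The helper name writeFileAtomicSync and the demo path are hypothetical, and the sketch uses synchronous calls for brevity and does not fsync.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: write to a temp file next to the target,
// then rename it over the target so readers never see a partial file.
function writeFileAtomicSync(file, buffer) {
  const tmp = `${file}.${process.pid}.tmp`;
  fs.writeFileSync(tmp, buffer);
  fs.renameSync(tmp, file);
}

// Demo with a made-up path and stand-in bytes
// (these five bytes are the BSON encoding of an empty document).
const demoFile = path.join(os.tmpdir(), 'resume-token-demo.bson');
writeFileAtomicSync(demoFile, Buffer.from([0x05, 0x00, 0x00, 0x00, 0x00]));
const roundTrip = fs.readFileSync(demoFile);
```

In saveId, the buffer returned by bson.serialize(lastId) would take the place of the stand-in bytes.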
- Every second, write the latest _id to disk
- Reload the _id object from disk on startup
- Use it to resume the change stream:
const pipeline = [
  {
    $match: { 'ns.db': config.MONGO_DB_NAME },
  },
];
// Only pass resumeAfter when a token was loaded from disk (the first start has none).
const changeStreamOptions = {};
if (resumeToken) {
  changeStreamOptions.resumeAfter = resumeToken;
}
const changeStream = db.watch(pipeline, changeStreamOptions);
changeStream.on('change', (change) => {
  …
});
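The "write the latest _id every second" step above could be wired up roughly as follows. This is a sketch with hypothetical names (createTokenTracker, startPeriodicSave); the save argument stands in for a function like saveId, and nothing here touches MongoDB itself. It skips the disk write when no new event has arrived since the last tick.

```javascript
// Hypothetical names throughout; `save` stands in for a function like saveId.
function createTokenTracker() {
  let lastId = null;
  let dirty = false;
  return {
    // Call from the 'change' handler with each change event.
    record(change) {
      if (change && change._id) {
        lastId = change._id;
        dirty = true;
      }
    },
    // Returns the latest token once per update, or null if nothing changed.
    takeIfDirty() {
      if (!dirty) return null;
      dirty = false;
      return lastId;
    },
  };
}

function startPeriodicSave(tracker, save, intervalMs = 1000) {
  const flush = () => {
    const token = tracker.takeIfDirty();
    if (token) save(token);
  };
  const timer = setInterval(flush, intervalMs);
  return {
    flush,
    stop() {
      clearInterval(timer);
      flush(); // persist whatever arrived since the last tick
    },
  };
}
```

It is probably safest to call record only after the change has been written to the RDB; otherwise a crash between the save and the write could make the stream resume past an event that was never applied.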
Reference
- db.watch documentation https://docs.mongodb.com/manual/reference/method/db.watch/#db.watch
db.watch(pipeline, options)
New in version 4.0: Requires featureCompatibilityVersion (fCV) set to "4.0" or greater. For more information on fCV, see setFeatureCompatibilityVersion.
Opens a change stream cursor for a database to report on all its non-system collections.
Parameters:
pipeline: A sequence of one or more aggregation stages. See Aggregation for complete documentation on the aggregation framework.
options: Optional. Additional options that modify the behavior of db.watch(). You must pass an empty array [] to the pipeline parameter if you are not specifying a pipeline but are passing the options document.
The options document can contain the following fields and values:
resumeAfter: Optional. Directs db.watch() to attempt resuming notifications starting after the operation specified in the resume token. Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after. resumeAfter is mutually exclusive with startAtOperationTime.
fullDocument: Optional. By default, db.watch() returns the delta of those fields modified by an update operation, instead of the entire updated document. Set fullDocument to "updateLookup" to direct db.watch() to look up the most current majority-committed version of the updated document. db.watch() returns a fullDocument field with the document lookup in addition to the updateDescription delta.
batchSize: Optional. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. Has the same functionality as cursor.batchSize().
maxAwaitTimeMS: Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.
startAtOperationTime: Optional. The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. To check the time range of the oplog, see rs.printReplicationInfo(). startAtOperationTime is mutually exclusive with resumeAfter.
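The options rules in the excerpt above (resumeAfter and startAtOperationTime are mutually exclusive, everything is optional) can be enforced before the stream is opened. A minimal sketch, assuming a hypothetical helper named buildWatchOptions that is not part of the driver; it only assembles a plain object and throws early on the conflicting combination.

```javascript
// Hypothetical helper; builds the options document passed to db.watch().
function buildWatchOptions({ resumeToken, startAtOperationTime, fullDocument } = {}) {
  if (resumeToken && startAtOperationTime) {
    throw new Error('resumeAfter and startAtOperationTime are mutually exclusive');
  }
  const options = {};
  if (resumeToken) options.resumeAfter = resumeToken;
  if (startAtOperationTime) options.startAtOperationTime = startAtOperationTime;
  if (fullDocument) options.fullDocument = fullDocument;
  return options;
}

// Intended use (not executed here, since it needs a live database handle):
//   const changeStream = db.watch([], buildWatchOptions({ resumeToken }));
```

On a first start with no saved token the helper returns an empty object, so the stream simply opens from the current point.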
- ObjectID documentation, how to serialize/deserialize it in Node code
https://docs.mongodb.com/manual/reference/method/ObjectId/
To generate a new ObjectId, use ObjectId() with no argument.
To generate a new ObjectId from a unique hexadecimal string, pass that string to ObjectId().
Access the str attribute of an ObjectId() object to get its hexadecimal string.
- Object ID: How to include
https://mongodb.github.io/node-mongodb-native/api-bson-generated/objectid.html
Create a new ObjectID instance
class ObjectID()
Arguments: id (string): Can be a 24 byte hex string, 12 byte binary string or a Number.
Returns: object instance of ObjectID.
Return the ObjectID id as a 24 byte hex string representation
toHexString()
Returns: string, the 24 byte hex string representation.
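The argument rules above also suggest why a resume token cannot be cast to ObjectID: its data does not have the 24-character hex shape. A small stdlib-only sketch of that shape check follows; the helper name isObjectIdHex is hypothetical, the sample resume-token string is made up, and this is not the example from the linked page.

```javascript
// Hypothetical check: does this string have the 24-character hex shape
// that ObjectID accepts?
function isObjectIdHex(value) {
  return typeof value === 'string' && /^[0-9a-fA-F]{24}$/.test(value);
}

// 12 raw bytes always render as 24 hex characters.
const twelveBytes = Buffer.alloc(12, 0xab);
const hexFromBytes = twelveBytes.toString('hex');

// A made-up resume-token-like string: hex, but far longer than 24 characters.
const fakeTokenData = '825B0001'.repeat(6);
const tokenFitsObjectId = isObjectIdHex(fakeTokenData);
```

Here hexFromBytes passes the check while fakeTokenData does not, which matches the observation earlier in this note that the token has to be stored as BSON instead.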
Examples
Generate a 24 character hex string representation of the ObjectID