
Change Streams

Real-time data change notifications.

What are Change Streams?

Change streams let you watch for real-time changes in your database. Instead of polling the database repeatedly, you get notified when something happens. It's like setting up a doorbell instead of constantly checking if someone's at the door.

Change streams are built on MongoDB's replication mechanism, the oplog. They report inserts, updates, replaces, and deletes, as well as collection-level events such as drops and renames. Events arrive in the order in which they were applied.

They work on replica sets and sharded clusters, not on standalone servers. Change streams are resumable: every event carries a token, so after a disconnection you can continue from the last event you processed, provided that event is still in the oplog.
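To make the event types concrete, here is a minimal sketch of a function that summarises a change event according to its operationType. The fields it reads (operationType, fullDocument, documentKey, updateDescription, ns) are the ones MongoDB puts on change events; describeChange and the sample event are hypothetical and exist only for illustration.

```javascript
// Summarise a change event according to its operationType.
// describeChange is a hypothetical helper, not part of the driver.
function describeChange(change) {
  switch (change.operationType) {
    case "insert":
      // Inserts carry the new document in fullDocument.
      return `insert ${JSON.stringify(change.fullDocument)}`;
    case "update": {
      // Updates carry a delta by default, not the whole document.
      const fields = Object.keys(change.updateDescription.updatedFields);
      return `update ${JSON.stringify(change.documentKey)} fields=${fields.join(",")}`;
    }
    case "delete":
      // Deletes only identify the removed document.
      return `delete ${JSON.stringify(change.documentKey)}`;
    case "drop":
      // Collection drops identify the namespace that went away.
      return `drop ${change.ns.db}.${change.ns.coll}`;
    default:
      return `other ${change.operationType}`;
  }
}

// Hand-written sample event, shaped like an update notification.
const sample = {
  operationType: "update",
  documentKey: { _id: 1 },
  updateDescription: { updatedFields: { status: "shipped" }, removedFields: [] },
};
console.log(describeChange(sample)); // prints: update {"_id":1} fields=status
```

Note that delete events contain only the document key, so a consumer that needs the deleted content has to keep its own copy.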

Watching for Changes

Opening a change stream is simple. You call watch() on a collection, on a database, or on the client to cover the whole deployment. The stream then emits an event whenever a change occurs in that scope.

You can filter changes by passing an aggregation pipeline to watch(). A $match stage can select events by operation type, by the content of the full document, or by specific fields. This lets you focus on exactly the changes you care about.
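As a rough sketch of such a filter, the helper below builds a one-stage pipeline from a list of operation types and optional field conditions. buildChangeFilter is a hypothetical name; the only MongoDB-specific parts are the $match stage, the $in operator, and the fact that document fields sit under fullDocument on the event.

```javascript
// Build an aggregation pipeline for watch() that keeps only the given
// operation types and, optionally, documents matching field conditions.
// buildChangeFilter is a hypothetical helper.
function buildChangeFilter(operationTypes, fieldConditions = {}) {
  const match = { operationType: { $in: operationTypes } };
  for (const [field, value] of Object.entries(fieldConditions)) {
    // Document fields live under fullDocument on the change event.
    match[`fullDocument.${field}`] = value;
  }
  return [{ $match: match }];
}

const pipeline = buildChangeFilter(["insert", "update"], { status: "new" });
console.log(JSON.stringify(pipeline));

// Usage with the Node.js driver, assuming `db` is a connected Db handle:
//   db.collection("orders").watch(pipeline, { fullDocument: "updateLookup" })
```

The fullDocument: "updateLookup" option matters here: without it, update events carry only the changed fields, so a condition on fullDocument would never match them.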

Change streams are push-based, not pull-based. This means your application doesn't waste resources checking for updates that don't exist.

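// Node.js driver; assumes `db` is a connected Db handle on a replica set.
const changeStream = db.collection("orders").watch()

changeStream.on("change", (change) => {
  console.log("Operation: " + change.operationType)
  // fullDocument is present on inserts; updates need { fullDocument: "updateLookup" }.
  console.log("Document: " + JSON.stringify(change.fullDocument))
  console.log("Time: " + change.clusterTime)
})

// This insert produces a change event with operationType "insert".
db.collection("orders").insertOne({
  orderId: "ORD-001",
  total: 59.99,
  status: "new"
})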

Resume Tokens

Resume tokens let you pick up where you left off if your application restarts or disconnects. Each change event includes a resume token that you can save and use later.

Think of it like a bookmark in a book. If you stop reading, you don't start from the beginning next time. You open to your bookmark and continue.

Save your resume token regularly, and pass it back when you reopen the stream. A resume only works while the token's event is still in the oplog, which has a limited size, so a stream that stays offline for too long can no longer be resumed.

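// Node.js driver; assumes `db` is a connected Db handle and
// processChange is your own handler.
const fs = require("fs")

let lastToken = null

const changeStream = db.collection("inventory").watch()

changeStream.on("change", (change) => {
  console.log("Change detected: " + change.operationType)
  processChange(change)
  // The event's _id is its resume token.
  lastToken = change._id
})

// Persist the latest token on shutdown so the next run can resume from it.
process.on("SIGTERM", () => {
  if (lastToken) {
    fs.writeFileSync("resume-token.json", JSON.stringify(lastToken))
  }
  changeStream.close()
})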
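The other half of the bookmark is reading the token back and handing it to watch(). The sketch below shows one way to do that. saveToken, loadToken, and watchOptions are hypothetical helpers, the token value is made up, and the code assumes the token survives a JSON round trip; resumeAfter is the driver's watch() option for resuming.

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

// Hypothetical helpers for persisting a resume token between runs.
function saveToken(file, token) {
  fs.writeFileSync(file, JSON.stringify(token));
}

function loadToken(file) {
  try {
    return JSON.parse(fs.readFileSync(file, "utf8"));
  } catch (err) {
    if (err.code === "ENOENT") return null; // first run: nothing saved yet
    throw err;
  }
}

// Options for watch(): resume after the saved token if there is one,
// otherwise start from the current point in time.
function watchOptions(file) {
  const token = loadToken(file);
  return token ? { resumeAfter: token } : {};
}

// Demo against a temporary file; the token below is made up.
const tokenFile = path.join(os.tmpdir(), `resume-token-demo-${process.pid}.json`);
fs.rmSync(tokenFile, { force: true });
console.log(watchOptions(tokenFile)); // no token saved yet
saveToken(tokenFile, { _data: "made-up-token" });
console.log(watchOptions(tokenFile)); // now carries resumeAfter
fs.rmSync(tokenFile, { force: true });
```

In an application you would call something like db.collection("inventory").watch([], watchOptions("resume-token.json")) at startup and saveToken(...) after each event has been processed, so a crash loses at most the event in flight. If the saved token has already aged out of the oplog, opening the stream fails and the application needs a fallback such as a full resync.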

Use Cases

Change streams are perfect for real-time applications. Chat applications, live dashboards, notification systems, and data synchronization all benefit from watching changes as they happen.

They're also great for event-driven architectures. When a document changes, you can trigger downstream processes like sending emails, updating caches, or syncing with external systems.
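One possible shape for this kind of fan-out is a small dispatcher that routes each event to the handlers registered for its operation type. Everything in this sketch (createDispatcher, the handlers, the sample event) is hypothetical; in a real application dispatch would be called from the stream's "change" callback.

```javascript
// Route each change event to the handlers registered for its operation type.
// createDispatcher and the handlers below are hypothetical.
function createDispatcher() {
  const handlers = new Map();
  return {
    on(operationType, handler) {
      if (!handlers.has(operationType)) handlers.set(operationType, []);
      handlers.get(operationType).push(handler);
    },
    async dispatch(change) {
      const results = [];
      for (const handler of handlers.get(change.operationType) ?? []) {
        try {
          results.push(await handler(change));
        } catch (err) {
          // One failing side effect should not block the others.
          results.push(err);
        }
      }
      return results;
    },
  };
}

const dispatcher = createDispatcher();
// Stand-ins for real side effects such as sending mail or updating a cache.
dispatcher.on("insert", (c) => `email:${c.fullDocument.orderId}`);
dispatcher.on("insert", (c) => `cache:${c.fullDocument.orderId}`);

dispatcher
  .dispatch({ operationType: "insert", fullDocument: { orderId: "ORD-001" } })
  .then((results) => console.log(results));
```

Handlers run one after another here so that side effects keep the order of the stream; running them in parallel is a reasonable alternative when they are independent.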

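Another common use case is auditing. Change streams record what changed and when, which gives you a change history without extra application logic. They do not record who made the change, so if you need that, store a field such as the acting user in the document as part of each write.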
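As an illustration, the sketch below flattens a change event into an audit record. The event supplies the operation, namespace, document key, changed fields, and cluster time. The user is an assumption: the code reads a hypothetical updatedBy field that the application would have to write into the document itself. toAuditRecord and the sample event are likewise hypothetical.

```javascript
// Turn a change event into a flat audit record.
// toAuditRecord and the updatedBy field are hypothetical: the application
// is assumed to write updatedBy into the document on every change.
function toAuditRecord(change) {
  const updated = change.updateDescription?.updatedFields ?? {};
  return {
    operation: change.operationType,
    collection: `${change.ns.db}.${change.ns.coll}`,
    documentId: change.documentKey._id,
    changedFields: Object.keys(updated).filter((f) => f !== "updatedBy"),
    who: updated.updatedBy ?? change.fullDocument?.updatedBy ?? null,
    when: change.clusterTime, // timestamp of the operation in the oplog
  };
}

const record = toAuditRecord({
  operationType: "update",
  ns: { db: "shop", coll: "orders" },
  documentKey: { _id: 42 },
  clusterTime: { t: 1700000000, i: 1 }, // plain-object stand-in for a BSON Timestamp
  updateDescription: { updatedFields: { status: "shipped", updatedBy: "alice" } },
});
console.log(record);
```

Records like this could be written to a separate collection, ideally together with the event's resume token so the audit writer can restart without gaps.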

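// Only inserts of tickets whose status is "urgent" reach the handler.
const pipeline = [
  { $match: { operationType: "insert", "fullDocument.status": "urgent" } }
]

// Node.js driver; assumes `db` is a connected Db handle.
const urgentStream = db.collection("supportTickets").watch(pipeline)

urgentStream.on("change", (change) => {
  const ticket = change.fullDocument
  // sendSlackNotification and createIncidentTicket are application
  // functions defined elsewhere.
  sendSlackNotification(`New urgent ticket: ${ticket.title}`)
  createIncidentTicket(ticket)
})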
