MongoDB Change Streams and Go


Change streams allow you to subscribe to real-time updates in your MongoDB collections and databases. With the MongoDB Go Driver, you can tap into these streams and build reactive applications that respond to data changes in MongoDB instantly. You can build features like real-time notifications and collaborative apps or kick off different workflows based on changes to your data.

In this tutorial, we’ll take a look at how you can work with MongoDB change streams when building Go applications. We’ll use the native MongoDB Go Driver and MongoDB Atlas to showcase various use cases that rely on change streams. 


Prerequisites

For this application, I’ll be using:

Existing knowledge of MongoDB and the Go programming language is required.

Setting Up MongoDB Change Streams in Go

The first thing we’ll do is set up a basic Go application that connects to our MongoDB Atlas cluster and opens a database-wide change stream, which fires an event any time a change is made in our database. The code will look like the following:

package main

import (
    "context"
    "fmt"
    "log"

    "go.mongodb.org/mongo-driver/v2/bson"
    "go.mongodb.org/mongo-driver/v2/mongo"
    "go.mongodb.org/mongo-driver/v2/mongo/options"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
  
    // Connect to MongoDB
    client, err := mongo.Connect(options.Client().ApplyURI("YOUR-MONGODB-URI"))
    if err != nil {
        log.Fatal(err)
    }
    defer client.Disconnect(ctx)

    // Select the database
    db := client.Database("stockTrader")

    // Start a basic change stream on our stockTrader database
    cs, err := db.Watch(ctx, mongo.Pipeline{})
    if err != nil {
        log.Fatal(err)
    }

    defer cs.Close(ctx)

    log.Println("Started watching changes...")

    for cs.Next(ctx) {
        var changeDoc bson.D
        if err := cs.Decode(&changeDoc); err != nil {
            log.Printf("Error decoding change stream document: %v", err)
            continue
        }

        // Find the fullDocument element in the bson.D slice
        for _, elem := range changeDoc {
            if elem.Key == "fullDocument" {
                fmt.Println(elem.Value)
                break
            }
        }
    }
}

This is our basic change stream implementation, which updates us whenever any data is changed in the stockTrader database. For now, it just prints the full document to the terminal. After you start up the application, go into your MongoDB Atlas Data Explorer or MongoDB Compass and add some data. Because this stream is opened without the fullDocument option, update events won’t carry a fullDocument field; we’ll turn that on in the next section. For inserts, you’ll see an output like this in the console:

Output

Alternatively, I could scope down the change stream to only a particular collection with:

// Select the database and collection
collection := client.Database("stockTrader").Collection("trades")

// Start a basic change stream (no filtering)
cs, err := collection.Watch(ctx, mongo.Pipeline{})

In this case, I would only be watching for changes on the trades collection, not the entire stockTrader database. I could take it a step further and watch my entire cluster and all databases with:

// Watch the entire MongoDB cluster
cs, err := client.Watch(ctx, mongo.Pipeline{})

I would advise against watching an entire cluster, though. There are use cases where it may be done, but typically, you’d want your change streams to be focused and optimized for specific use cases.

Filtering Change Streams

Your database, and even an individual collection, is going to generate a lot of data. A stock trading application like the hypothetical one we’re building in this tutorial could generate thousands of records every second. Subscribing to all of these events would generate too much noise to be of any practical use. This is where filtering helps: we can set up filtering on our change streams to only alert us when certain criteria are met.

We can use MongoDB’s aggregation pipelines to filter out irrelevant data and only surface the data we really care about. To see an example of that, let’s update our existing code so that the change stream only fires when a trade involves one of a few specific stocks, the trade is a sell, and the quantity of stock in that trade is at least 50,000.

Let’s update our code to the following:

package main

import (
    "context"
    "fmt"
    "log"

    "go.mongodb.org/mongo-driver/v2/bson"
    "go.mongodb.org/mongo-driver/v2/mongo"
    "go.mongodb.org/mongo-driver/v2/mongo/options"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    client, err := mongo.Connect(options.Client().ApplyURI("YOUR-MONGODB-URI"))
    if err != nil {
        log.Fatal(err)
    }

    defer client.Disconnect(ctx)

    db := client.Database("stockTrader")
    collection := db.Collection("trades")

    // Start a change stream with filtering for specific attributes. Specifically a quantity greater than or equal to 50,000, the stock symbol AAPL, MSFT, or GOOG, and the action of sell
    cs, err := collection.Watch(ctx, mongo.Pipeline{
        bson.D{
            {"$match", bson.D{
                {"fullDocument.quantity", bson.D{{"$gte", 50000}}},
                {"fullDocument.stock", bson.D{{"$in", bson.A{"AAPL", "MSFT", "GOOG"}}}},
                {"fullDocument.action", "sell"},
            }},
        },
    }, options.ChangeStream().SetFullDocument(options.UpdateLookup))

    if err != nil {
        log.Fatal(err)
    } 
    defer cs.Close(ctx)

    log.Println("Started watching changes...")

    for cs.Next(ctx) {
        var changeDoc bson.D
        if err := cs.Decode(&changeDoc); err != nil {
            log.Printf("Error decoding change stream document: %v", err)
            continue
        }

        // Find the fullDocument element in the bson.D slice
        for _, elem := range changeDoc {
            if elem.Key == "fullDocument" {
                fmt.Println(elem.Value)
                break
            }
        }
    }
}

We have now updated our change stream to trigger only on specific conditions. Specifically, this change stream will trigger only if all of the following criteria are met:

  • The stock quantity of the trade is at least 50,000.
  • The stock is AAPL, MSFT, or GOOG.
  • The action of the trade is a sell.

You may use this as an indicator that the major blue-chip tech stocks are selling off, for example. 
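
If you want to sanity-check which trades should get through before wiring anything up to a live cluster, the three conditions can be restated as a plain Go predicate. This is only a sketch that mirrors the `$gte`, `$in`, and equality checks from the pipeline; the `Trade` struct and the `matchesFilter` name are my own and not part of the driver.

```go
package main

import "fmt"

// Trade is a hypothetical struct mirroring the fields used in the $match stage.
type Trade struct {
	Stock    string
	Action   string
	Quantity int
}

// watched lists the symbols from the $in clause.
var watched = map[string]bool{"AAPL": true, "MSFT": true, "GOOG": true}

// matchesFilter reports whether a trade satisfies all three conditions.
func matchesFilter(t Trade) bool {
	return t.Quantity >= 50000 && watched[t.Stock] && t.Action == "sell"
}

func main() {
	trades := []Trade{
		{Stock: "MSFT", Action: "sell", Quantity: 75000},
		{Stock: "MSFT", Action: "buy", Quantity: 75000},
		{Stock: "TSLA", Action: "sell", Quantity: 75000},
		{Stock: "AAPL", Action: "sell", Quantity: 49999},
	}
	for _, t := range trades {
		fmt.Printf("%s %s %d -> %v\n", t.Stock, t.Action, t.Quantity, matchesFilter(t))
	}
}
```

Only the first trade in this list passes; each of the others fails exactly one condition, which makes it easy to see what each clause contributes.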

Suppose we now add the following document to our database. If you want to try it yourself, here it is:

{
    "_id":{"$oid":"68029db0c9034b66740d7406"}, 
    "action": "sell",
    "stock": "MSFT",
    "quantity": 75000
}

We’d see the change stream fire off, and we could act on this data:

Data

We can do all sorts of filtering to really fine-tune when the change stream fires off an alert. Here are a couple of additional examples:

// Using $expr for complex conditions
        bson.D{
            {"$match", bson.D{
                {"$expr", bson.D{
                    {"$gte", bson.A{
                        bson.D{{"$multiply", bson.A{"$fullDocument.price", "$fullDocument.quantity"}}},
                        1000000, // Only show trades worth at least $1M
                    }},
                }},
            }},
        },

// Pattern matching with $regex
        bson.D{
            {"$match", bson.D{
                {"fullDocument.traderName", bson.D{
                    {"$regex", "^Smith"}, // Traders whose names start with Smith
                    {"$options", "i"},    // Case insensitive
                }},
            }},
        },

// Logical OR conditions
        bson.D{
            {"$match", bson.D{
                {"$or", bson.A{
                    bson.D{{"fullDocument.sector", "Technology"}}, // Trade is in the technology sector
                    bson.D{ // or price of the stock is greater than or equal to 500 and quantity greater than or equal to 10000
                        {"fullDocument.price", bson.D{{"$gte", 500}}},
                        {"fullDocument.quantity", bson.D{{"$gte", 10000}}},
                    },
                }},
            }},
        },
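
To reason about what each of these extra filters lets through, I find it helpful to write the same conditions as ordinary Go functions. The sketch below is an approximation for local experimentation only: the `Trade` struct and function names are made up, and Go's `regexp` package uses RE2 syntax rather than the engine MongoDB uses, although a simple anchored pattern like this one should behave the same in both.

```go
package main

import (
	"fmt"
	"regexp"
)

// Trade is a hypothetical struct with the fields the filters reference.
type Trade struct {
	TraderName string
	Sector     string
	Price      float64
	Quantity   int
}

// smithPrefix mirrors {"$regex": "^Smith", "$options": "i"}.
var smithPrefix = regexp.MustCompile(`(?i)^Smith`)

// worthAtLeastOneMillion mirrors the $expr filter: price * quantity >= 1,000,000.
func worthAtLeastOneMillion(t Trade) bool {
	return t.Price*float64(t.Quantity) >= 1000000
}

// traderIsSmith mirrors the $regex filter.
func traderIsSmith(t Trade) bool {
	return smithPrefix.MatchString(t.TraderName)
}

// techOrLargeExpensive mirrors the $or filter.
func techOrLargeExpensive(t Trade) bool {
	return t.Sector == "Technology" || (t.Price >= 500 && t.Quantity >= 10000)
}

func main() {
	t := Trade{TraderName: "smithson", Sector: "Energy", Price: 600, Quantity: 12000}
	fmt.Println(worthAtLeastOneMillion(t), traderIsSmith(t), techOrLargeExpensive(t))
}
```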

Handling Change Stream Interruptions

Change streams work great as long as the application is running, but what happens when the application crashes? On restarting the app, change streams resume listening from the current moment, meaning any changes that happened during the downtime may be lost. 

If you want to build in fault tolerance and receive the notifications for changes that occurred while your app was not running, MongoDB provides a resume token. Every change event carries one, and it marks that event’s position in the stream, so a stream opened with a saved token picks up right after the last event you processed.

Let’s implement the resume token functionality into our app. I will also simplify our change stream aggregation to only look at the number of shares in a trade.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "go.mongodb.org/mongo-driver/v2/bson"
    "go.mongodb.org/mongo-driver/v2/mongo"
    "go.mongodb.org/mongo-driver/v2/mongo/options"
)

// Constants for resume token storage
const (
    metadataCollectionName = "changeStreamMetadata"
    tokenDocumentID        = "tradesChangeStreamToken"
)

// saveResumeToken saves the resume token to the metadata collection
func saveResumeToken(ctx context.Context, db *mongo.Database, token bson.Raw) error {
    metadataColl := db.Collection(metadataCollectionName)

    // Log token for debugging
    log.Printf("Saving resume token: %v", token)

    // Create the document to store
    doc := bson.D{
        {Key: "_id", Value: tokenDocumentID},
        {Key: "tokenBinary", Value: token},
        {Key: "updatedAt", Value: time.Now()},
    }

    // Use upsert to create or update the document
    opts := options.UpdateOne().SetUpsert(true)
    _, err := metadataColl.UpdateOne(
        ctx,
        bson.D{{Key: "_id", Value: tokenDocumentID}},
        bson.D{{Key: "$set", Value: doc}},
        opts,
    )
  
    return err
}

// loadResumeToken loads the resume token from the metadata collection
func loadResumeToken(ctx context.Context, db *mongo.Database) (bson.Raw, error) {
    metadataColl := db.Collection(metadataCollectionName)

    var result struct {
        ID          string    `bson:"_id"`
        TokenBinary bson.Raw  `bson:"tokenBinary"`
        UpdatedAt   time.Time `bson:"updatedAt"`
    }

    err := metadataColl.FindOne(ctx, bson.D{{Key: "_id", Value: tokenDocumentID}}).Decode(&result)
    if err != nil {
        if err == mongo.ErrNoDocuments {
            log.Println("No resume token found in database")
            return nil, nil
        }

        return nil, err
    }

    if len(result.TokenBinary) == 0 {
        log.Println("Resume token is empty")
        return nil, nil
    }

    log.Printf("Loaded resume token: %v, updated at: %v", result.TokenBinary, result.UpdatedAt)
    return result.TokenBinary, nil
}



func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Connect to MongoDB
    client, err := mongo.Connect(options.Client().ApplyURI("YOUR-MONGODB-URI"))
    if err != nil {
        log.Fatal(err)
    }

    defer client.Disconnect(ctx)

    // Select the database and collection
    db := client.Database("stockTrader")
    collection := db.Collection("trades")

    // Load resume token if it exists
    resumeToken, err := loadResumeToken(ctx, db)
    if err != nil {
        log.Printf("Error loading resume token: %v", err)
    }

    // Configure change stream options
    streamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)
    if len(resumeToken) > 0 {
        log.Println("Resuming change stream from saved token")
        streamOptions.SetResumeAfter(resumeToken)
    } else {
        log.Println("Starting new change stream")
    }

    // Start a change stream with filtering for specific trades
    cs, err := collection.Watch(ctx, mongo.Pipeline{
        bson.D{
            {Key: "$match", Value: bson.D{
                {Key: "fullDocument.quantity", Value: bson.D{{Key: "$gte", Value: 50000}}},
            }},
        },
    }, streamOptions)

    if err != nil {
        log.Fatal(err)
    }

    defer cs.Close(ctx)

    log.Println("Started watching changes...")

    // Create a function to check and print if we have a valid resume token
    checkResumeToken := func() {
        rt := cs.ResumeToken()
        if len(rt) > 0 {
            log.Printf("Current resume token: %v", rt)
        } else {
            log.Println("No resume token available yet")
        }
    }

    // Check token before we start processing events
    checkResumeToken()
    for cs.Next(ctx) {
        var changeDoc bson.D
        if err := cs.Decode(&changeDoc); err != nil {
            log.Printf("Error decoding change stream document: %v", err)
            continue
        }

        // Find the fullDocument element in the bson.D slice
        for _, elem := range changeDoc {
            if elem.Key == "fullDocument" {
                fmt.Println(elem.Value)
                break
            }
        }

        // Save the resume token after processing each event
        rt := cs.ResumeToken()
        if len(rt) > 0 {
            if err := saveResumeToken(ctx, db, rt); err != nil {
                log.Printf("Error saving resume token: %v", err)
            }
        } else {
            log.Println("Warning: No resume token available to save")
        }
    }

    // Check if the stream was closed due to an error
    if err := cs.Err(); err != nil {
        log.Printf("Change stream error: %v", err)
        log.Println("Application will exit. On next start, it will resume from the last saved token.")
    }
}

With resume token functionality implemented, we no longer have to worry about missing change stream events if our application crashes, provided it restarts while the saved token’s position is still within the oplog. A good way to test this is to start the application and confirm it’s working. Insert a document into your trades collection:

```
{
  "_id":{"$oid":"6814eb8c3f593c417814d5ce"},
  "quantity": 100000
}
```

You should see that the event is captured by the change stream and displayed in the terminal.

Event is captured by the change stream

Then, close your application, and while the application is not running, add a few more documents to your trades collection that should trigger the change stream. Start the application up again, and you’ll see all of the items you added once the app is running again.

All of the items added

The way this works is that every time we process a change stream event, we update the tradesChangeStreamToken document in the changeStreamMetadata collection with the latest resume token. When the app is restarted, we start our change stream from that token and process any events that we missed while the application was not running, continuing to update the tokenBinary and updatedAt fields as new events arrive.
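
If the save-then-resume flow feels abstract, here is a toy model of it that runs without a database. It is not the driver's API: the `event` and `consumer` types and the integer tokens are stand-ins I made up so the idea fits in a few lines, with an integer playing the role of the resume token.

```go
package main

import "fmt"

// event is a toy stand-in for a change event; token plays the role of the resume token.
type event struct {
	token   int
	payload string
}

// consumer remembers the token of the last event it finished processing.
type consumer struct {
	savedToken int
	seen       []string
}

// process handles events in order and saves the token after each one,
// the same order of operations the tutorial's loop uses.
func (c *consumer) process(events []event) {
	for _, e := range events {
		c.seen = append(c.seen, e.payload)
		c.savedToken = e.token
	}
}

// eventsAfter returns the events that follow the given token in the history.
func eventsAfter(history []event, token int) []event {
	for i, e := range history {
		if e.token == token {
			return history[i+1:]
		}
	}
	return nil
}

func main() {
	history := []event{{1, "t1"}, {2, "t2"}}
	c := &consumer{}
	c.process(history) // the app is running and sees t1 and t2

	// The app is down while three more trades are written.
	history = append(history, event{3, "t3"}, event{4, "t4"}, event{5, "t5"})

	// On restart, resume right after the saved token.
	c.process(eventsAfter(history, c.savedToken))
	fmt.Println(c.seen, c.savedToken)
}
```

Because the token is saved only after an event is handled, a crash between handling and saving would replay that one event on restart, so handlers are safest when they tolerate seeing an event twice.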

I chose to store the resume token information in my MongoDB database, but you can also store this token as a file in your application, a different database, or wherever you see fit.

MongoDB database
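
As a rough illustration of the file-based option, here is a minimal sketch using only the standard library. The function names and the `trades.token` file name are my own choices. Since `bson.Raw` is a byte slice, I'm assuming the token from `cs.ResumeToken()` would be passed in as `[]byte` and converted back to `bson.Raw` when loading.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// saveTokenFile writes the token to a temporary file and renames it into place,
// so a crash in the middle of a write should not leave a partial token behind.
func saveTokenFile(path string, token []byte) error {
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, token, 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// loadTokenFile returns the stored token, or nil if no token file exists yet.
func loadTokenFile(path string) ([]byte, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, os.ErrNotExist) {
		return nil, nil
	}
	return data, err
}

// roundTrip saves and reloads a token in a throwaway directory (demo helper).
func roundTrip(token []byte) ([]byte, error) {
	dir, err := os.MkdirTemp("", "resume-token")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "trades.token")
	if missing, err := loadTokenFile(path); err != nil || missing != nil {
		return nil, fmt.Errorf("expected no token before first save, got %v (err %v)", missing, err)
	}
	if err := saveTokenFile(path, token); err != nil {
		return nil, err
	}
	return loadTokenFile(path)
}

func main() {
	got, err := roundTrip([]byte{0x82, 0x01, 0x02})
	fmt.Println(got, err)
}
```

A local file is simple, but it ties the token to one machine, so storing it in MongoDB as I did above is usually the better fit when the app can be rescheduled onto a different host.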

Additionally, if we look at our console now and insert a document that triggers a change stream, we’ll see our resume token loaded as well as updated after an event:

Resume token is loaded

Working With Large Documents and Change Streams

When working with large documents, you may run into an issue where the size of a change stream event exceeds 16 MiB. As of MongoDB 6.0.9, the $changeStreamSplitLargeEvent aggregation stage can split these oversized events into fragments so that you can still process them.
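
To get a feel for when an event crosses that limit, consider an update event that carries both the document as it was before the change (the pre-image) and as it is after (the post-image). The quick arithmetic below uses the roughly 8 MiB field from the example later in this section. The extra 200 bytes for the remaining fields is an illustrative guess, and the check ignores the event's other fields, so treat it as a lower bound.

```go
package main

import "fmt"

const (
	mib          = 1024 * 1024
	maxEventSize = 16 * mib // size limit that a single change event must fit within
)

// exceedsLimit reports whether an event carrying a pre-image and a post-image
// of the given sizes (in bytes) would be larger than the limit.
func exceedsLimit(preImage, postImage int) bool {
	return preImage+postImage > maxEventSize
}

func main() {
	doc := 8*mib + 200 // ~8 MiB string field plus a few small fields (assumed)

	fmt.Println(exceedsLimit(0, doc))   // insert event: post-image only
	fmt.Println(exceedsLimit(doc, doc)) // update event: pre-image and post-image
}
```

The insert stays under the limit on its own, while the update with both images does not, which is exactly the situation the splitting stage is meant for.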

Although you are unlikely to run into this, I want to show you a complete example of how you can handle this with the MongoDB Go Driver. To do this, I will update the existing application to do a few things to create this scenario of working with data that exceeds 16 MiB. 
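
Before getting into the driver code, it may help to see the reassembly idea in isolation. Each fragment of a split event carries a `splitEvent` field with `fragment` and `of` counters plus a subset of the original event's fields, and the fragments of one event arrive one after another. The sketch below models that with plain maps. The `fragment` and `collector` types are hypothetical simplifications, not driver types.

```go
package main

import "fmt"

// fragment is a simplified view of one split event: Index and Of correspond to
// splitEvent.fragment and splitEvent.of, and Fields holds the top-level event
// fields that this fragment carries.
type fragment struct {
	Index  int
	Of     int
	Fields map[string]any
}

// collector buffers the fragments of the event currently being received.
type collector struct {
	pending []fragment
}

// add buffers f and, once the final fragment arrives, returns the merged
// event fields and true. Until then it returns nil and false.
func (c *collector) add(f fragment) (map[string]any, bool) {
	c.pending = append(c.pending, f)
	if f.Index < f.Of {
		return nil, false
	}
	merged := make(map[string]any)
	for _, p := range c.pending {
		for k, v := range p.Fields {
			merged[k] = v
		}
	}
	c.pending = nil // ready for the next split event
	return merged, true
}

func main() {
	c := &collector{}
	frags := []fragment{
		{Index: 1, Of: 2, Fields: map[string]any{"operationType": "update", "fullDocument": "<post-image>"}},
		{Index: 2, Of: 2, Fields: map[string]any{"fullDocumentBeforeChange": "<pre-image>"}},
	}
	for _, f := range frags {
		if event, done := c.add(f); done {
			fmt.Println(len(event), "fields reassembled")
		}
	}
}
```

Buffering in a slice works here because fragments are delivered in order; if you resume a stream partway through a split event, you would also want to discard a partial buffer rather than merge it.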

First, we’ll create a script that continuously adds data to the database. It alternates between one small document and one large document that is roughly 8 MiB in size, and periodically performs an update operation on one of the large documents. Next, we’ll update our collection to enable it to retrieve a document both pre- and post-update, so that when we do the update, we’ll get back two documents that together exceed the 16 MiB limit. Finally, we’ll implement our $changeStreamSplitLargeEvent logic to split apart the larger events and still process them. The code looks like this:

package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "strings"
    "time"

    "go.mongodb.org/mongo-driver/v2/bson"
    "go.mongodb.org/mongo-driver/v2/mongo"
    "go.mongodb.org/mongo-driver/v2/mongo/options"
)

const (
    metadataCollectionName = "changeStreamMetadata"
    tokenDocumentID        = "tradesChangeStreamToken"
)

func saveResumeToken(ctx context.Context, db *mongo.Database, token bson.Raw) error {
    metadataColl := db.Collection(metadataCollectionName)
    log.Printf("Saving resume token: %v", token)

    doc := bson.D{
        {Key: "_id", Value: tokenDocumentID},
        {Key: "tokenBinary", Value: token},
        {Key: "updatedAt", Value: time.Now()},
    }

    opts := options.UpdateOne().SetUpsert(true)
    _, err := metadataColl.UpdateOne(
        ctx,
        bson.D{{Key: "_id", Value: tokenDocumentID}},
        bson.D{{Key: "$set", Value: doc}},
        opts,
    )

    return err
}

func loadResumeToken(ctx context.Context, db *mongo.Database) (bson.Raw, error) {
    metadataColl := db.Collection(metadataCollectionName)

    var result struct {
        ID          string    `bson:"_id"`
        TokenBinary bson.Raw  `bson:"tokenBinary"`
        UpdatedAt   time.Time `bson:"updatedAt"`
    }

    err := metadataColl.FindOne(ctx, bson.D{{Key: "_id", Value: tokenDocumentID}}).Decode(&result)
    if err != nil {
        if err == mongo.ErrNoDocuments {
            log.Println("No resume token found in database")
            return nil, nil
        }

        return nil, err
    }

    if len(result.TokenBinary) == 0 {
        log.Println("Resume token is empty")
        return nil, nil
    }

    log.Printf("Loaded resume token: %v, updated at: %v", result.TokenBinary, result.UpdatedAt)

    return result.TokenBinary, nil
}

// generateLargeDocument creates a document that's large enough to trigger splitting
func generateLargeDocument() bson.D {
    return bson.D{
        {Key: "_id", Value: bson.NewObjectID()},
        {Key: "stock", Value: "AAPL"},
        {Key: "quantity", Value: 100000},
        {Key: "price", Value: 175.50},
        {Key: "action", Value: "buy"},
        {Key: "trader", Value: "AutomatedTrader"},
        {Key: "timestamp", Value: time.Now()},
        {Key: "isLargeDocument", Value: true},
        // Using ~8MB for the field, which will become >16MB when both pre and post images are included
        {Key: "largeField", Value: strings.Repeat("a", 8*1024*1024)},
    }
}

// generateSmallDocument creates a document that's small enough to not need splitting
func generateSmallDocument() bson.D {
    return bson.D{
        {Key: "_id", Value: bson.NewObjectID()},
        {Key: "stock", Value: randomStock()},
        {Key: "quantity", Value: 60000},
        {Key: "price", Value: randomPrice(100, 1000)},
        {Key: "action", Value: "sell"},
        {Key: "trader", Value: "SmallTrader"},
        {Key: "timestamp", Value: time.Now()},
        {Key: "isLargeDocument", Value: false},
        {Key: "notes", Value: "This is a small document that doesn't need splitting"},
    }
}

// randomStock returns a random stock ticker
func randomStock() string {
    stocks := []string{"AAPL", "MSFT", "GOOG", "AMZN", "META", "TSLA"}
    return stocks[rand.Intn(len(stocks))]
}

func randomPrice(min, max float64) float64 {
    return min + rand.Float64()*(max-min)
}

// insertTestDocuments inserts a large document and a small document every 5 seconds
// and also updates the large document to trigger change stream events with pre/post images
func insertTestDocuments(ctx context.Context, client *mongo.Client, collection *mongo.Collection) {
    log.Println("Starting document insertion process...")

    ticker := time.NewTicker(5 * time.Second)

    defer ticker.Stop()

    // Track if we should insert a large or small document next
    insertLarge := true
    count := 0

    // Store IDs of large documents for later updates
    largeDocumentIDs := []bson.ObjectID{}

    log.Println("Document insertion service started successfully")

    // Handle this synchronously for debugging
    for {
        select {
        case <-ctx.Done():
            log.Println("Stopping test document insertion due to context cancellation")
            return
        case <-ticker.C:
            count++
            // Every 3rd operation, update a previously inserted large document
            // This will trigger change events with full pre/post images
            if len(largeDocumentIDs) > 0 && count%3 == 0 {
                // Pick a random large document to update
                targetID := largeDocumentIDs[rand.Intn(len(largeDocumentIDs))]

                // Update its large field with a different repeated character
                // This will generate a large update with both pre-image and post-image
                letter := string(rune('b' + (count % 20))) // Cycle through different letters

                update := bson.D{
                    {Key: "$set", Value: bson.D{
                        {Key: "largeField", Value: strings.Repeat(letter, 8*1024*1024)},
                        {Key: "lastUpdated", Value: time.Now()},
                    }},
                }

                log.Printf("Updating large document %s with %s-filled field...", targetID.Hex(), letter)

                updateCtx, updateCancel := context.WithTimeout(ctx, 60*time.Second)
                _, err := collection.UpdateOne(updateCtx, bson.D{{Key: "_id", Value: targetID}}, update)
                updateCancel()

                if err != nil {
                    log.Printf("ERROR: Failed to update large document: %v", err)
                } else {
                    log.Printf("Successfully updated document with large field")
                }
                continue
            }
            log.Printf("Inserting document #%d...", count)

            var doc bson.D
            var docID bson.ObjectID

            if insertLarge {
                doc = generateLargeDocument()
                for _, elem := range doc {
                    if elem.Key == "_id" {
                        docID = elem.Value.(bson.ObjectID)
                        break
                    }
                }
            } else {
                doc = generateSmallDocument()
            }
          
            // Create a timeout context for the insertion
            insertCtx, insertCancel := context.WithTimeout(ctx, 60*time.Second)

            // Insert the document
            result, err := collection.InsertOne(insertCtx, doc)
            insertCancel()

            if err != nil {
                log.Printf("ERROR: Failed to insert document: %v", err)
            } else {
                log.Printf("Inserted document with ID: %v", result.InsertedID)

                // If this was a large document, save its ID for later updates
                if insertLarge {
                    largeDocumentIDs = append(largeDocumentIDs, docID)
                    log.Printf("Added large document ID to update list (total: %d)", len(largeDocumentIDs))

                    // Limit the number of tracked documents to avoid memory growth
                    if len(largeDocumentIDs) > 10 {
                        largeDocumentIDs = largeDocumentIDs[len(largeDocumentIDs)-10:]
                    }
                }
            }

            // Toggle between large and small documents
            insertLarge = !insertLarge
        }
    }
}

// enablePrePostImages enables pre and post images for change streams on the collection
func enablePrePostImages(ctx context.Context, db *mongo.Database, collection *mongo.Collection) {
    // Run the collMod command to enable changeStreamPreAndPostImages
    command := bson.D{
        {Key: "collMod", Value: collection.Name()},
        {Key: "changeStreamPreAndPostImages", Value: bson.D{
            {Key: "enabled", Value: true},
        }},
    }

    var result bson.D
    err := db.RunCommand(ctx, command).Decode(&result)
    if err != nil {
        log.Printf("Warning: Failed to enable changeStreamPreAndPostImages: %v", err)
        log.Println("Large document splitting may not work without pre/post images")
    } else {
        log.Printf("Successfully enabled changeStreamPreAndPostImages for %s", collection.Name())
    }
}

// watchCollectionWithRetry sets up a change stream with retry logic for token errors
func watchCollectionWithRetry(ctx context.Context, db *mongo.Database, collection *mongo.Collection, useResumeToken bool) error {

    // Try first with the resume token, and if that fails, try without
    err := setupAndWatchChangeStream(ctx, db, collection, useResumeToken)

    if err != nil {
        // Check for specific InvalidResumeToken error
        if strings.Contains(err.Error(), "InvalidResumeToken") ||
            strings.Contains(err.Error(), "resume") {
            log.Printf("Resume token error detected: %v", err)
            log.Println("Clearing invalid resume token and starting a new stream...")

            // Try to delete the invalid token
            metadataColl := db.Collection(metadataCollectionName)
            _, deleteErr := metadataColl.DeleteOne(ctx, bson.D{{Key: "_id", Value: tokenDocumentID}})
            if deleteErr != nil {
                log.Printf("Warning: Failed to delete invalid token: %v", deleteErr)
            }

            // Retry without using the resume token
            retryErr := setupAndWatchChangeStream(ctx, db, collection, false)
            if retryErr != nil {
                log.Fatalf("Failed to set up change stream after retry: %v", retryErr)
            }
        } else {
            log.Fatalf("Failed to set up change stream: %v", err)
        }
    }
    return nil
}



// setupAndWatchChangeStream creates and processes a change stream
func setupAndWatchChangeStream(ctx context.Context, db *mongo.Database, collection *mongo.Collection, useResumeToken bool) error {
    var resumeToken bson.Raw
    var err error

    // Configure change stream options to request full documents and pre-images
    streamOptions := options.ChangeStream().
        SetFullDocument(options.UpdateLookup).
        SetFullDocumentBeforeChange(options.WhenAvailable)

    if useResumeToken {
        // Only try to load and use the resume token if requested
        resumeToken, err = loadResumeToken(ctx, db)
        if err != nil {
            log.Printf("Error loading resume token: %v", err)
        }

        if len(resumeToken) > 0 {
            log.Println("Resuming change stream from saved token")
            streamOptions.SetResumeAfter(resumeToken)
        } else {
            log.Println("No valid resume token found, starting new change stream")
        }
    } else {
        log.Println("Starting new change stream without resume token")
    }

    log.Println("Setting up change stream to watch for trades with quantity >= 50000...")
    log.Println("Full document and pre-image are requested (helps generate events > 16MB)")

    // Start a change stream with filtering for specific trades
    // Using $changeStreamSplitLargeEvent to handle large documents automatically
    cs, err := collection.Watch(ctx, mongo.Pipeline{
        bson.D{
            {Key: "$match", Value: bson.D{
                {Key: "fullDocument.quantity", Value: bson.D{{Key: "$gte", Value: 50000}}},
            }},
        },
        bson.D{
            {Key: "$changeStreamSplitLargeEvent", Value: bson.D{}},
        },
    }, streamOptions)

    if err != nil {
        return err
    }

    defer cs.Close(ctx)

    log.Println("Change stream successfully established - watching for events...")

    // Create a function to check and print if we have a valid resume token
    checkResumeToken := func() {
        rt := cs.ResumeToken()
        if len(rt) > 0 {
            log.Printf("Current resume token: %v", rt)
        } else {
            log.Println("No resume token available yet")
        }
    }

    // Check token before we start processing events
    checkResumeToken()

    // Keep track of fragments for large events
    eventFragments := make(map[string][]bson.Raw)

    for cs.Next(ctx) {
        log.Println("=== NEW CHANGE EVENT DETECTED ===")
        checkResumeToken()

        var changeDoc bson.Raw
        if err := cs.Decode(&changeDoc); err != nil {
            log.Printf("Error decoding change stream document: %v", err)
            continue
        }

        // Check for invalidate events
        opTypeVal := changeDoc.Lookup("operationType")
        opType := ""

        if opTypeVal.Type == bson.TypeString {
            opType = opTypeVal.StringValue()
        }

        if opType == "invalidate" {
            log.Println("CHANGE EVENT: Received invalidate event - this stream cannot be resumed")
            // We could clear the resume token here, but we'll handle it during restart
            break
        }

        // Check if this is a split event fragment
        splitEventVal := changeDoc.Lookup("splitEvent")
        isSplitEvent := !splitEventVal.IsZero()

        if isSplitEvent {
            // Extract values from the split event
            var splitEvent bson.Raw
            _ = splitEventVal.Unmarshal(&splitEvent)

            fragmentVal := splitEvent.Lookup("fragment")
            ofVal := splitEvent.Lookup("of")

            var fragment, totalFragments int32

            if fragmentVal.Type == bson.TypeInt32 {
                fragment = fragmentVal.Int32()
            }

            if ofVal.Type == bson.TypeInt32 {
                totalFragments = ofVal.Int32()
            }

            // Each fragment carries its own resume token, so the token cannot
            // be used to group fragments. Fragments of one event arrive
            // consecutively, so a single key for the in-flight event is enough.
            const eventID = "inFlight"

            log.Printf("CHANGE EVENT: Received fragment %d of %d", fragment, totalFragments)

            // Store fragment in our map
            eventFragments[eventID] = append(eventFragments[eventID], changeDoc)

            // If this is the last fragment, we can process the complete event
            if fragment == totalFragments {
                log.Printf("CHANGE EVENT: Received all %d fragments for this event", totalFragments)
                // Here you could combine fragments if needed
                // But for now we'll just process each fragment as is
                delete(eventFragments, eventID) // Release the buffered fragments
            }

            // Process the fragment (each fragment has some fields of the entire event)
            processChangeFragment(changeDoc)

        } else {
            // This is a normal (non-split) event
            log.Println("CHANGE EVENT: Received non-split event")
            processChangeEvent(changeDoc)
        }

        // Save the resume token after processing each event
        rt := cs.ResumeToken()
        if len(rt) > 0 {
            if err := saveResumeToken(ctx, db, rt); err != nil {
                log.Printf("Error saving resume token: %v", err)
            }
        } else {
            log.Println("Warning: No resume token available to save")
        }

        log.Println("=== CHANGE EVENT PROCESSING COMPLETE ===")
    }

    // Check if the stream was closed due to an error
    if err := cs.Err(); err != nil {
        log.Printf("Change stream error: %v", err)
        return err
    }

    log.Println("Change stream closed normally")
    return nil
}



// processChangeEvent processes a non-split change event
func processChangeEvent(changeDoc bson.Raw) {
    // Get operation type
    operationVal := changeDoc.Lookup("operationType")
    operation := ""
    if operationVal.Type == bson.TypeString {
        operation = operationVal.StringValue()
    }

    // Get full document
    fullDocVal := changeDoc.Lookup("fullDocument")
    if !fullDocVal.IsZero() {
        var fullDoc bson.Raw
        if err := fullDocVal.Unmarshal(&fullDoc); err == nil {
            log.Printf("CHANGE EVENT: %s operation detected", operation)

            // Extract document ID
            idVal := fullDoc.Lookup("_id")
            docID := "unknown"

            if !idVal.IsZero() {
                docID = fmt.Sprintf("%v", idVal)
            }

            log.Printf("CHANGE EVENT: Document ID: %v", docID)

            // Extract key fields if they exist
            stockVal := fullDoc.Lookup("stock")

            if stockVal.Type == bson.TypeString {
                log.Printf("CHANGE EVENT: Stock: %s", stockVal.StringValue())
            }

            quantityVal := fullDoc.Lookup("quantity")

            if quantityVal.Type == bson.TypeInt32 {
                log.Printf("CHANGE EVENT: Quantity: %d", quantityVal.Int32())
            } else if quantityVal.Type == bson.TypeInt64 {
                log.Printf("CHANGE EVENT: Quantity: %d", quantityVal.Int64())
            } else if quantityVal.Type == bson.TypeDouble {
                log.Printf("CHANGE EVENT: Quantity: %.0f", quantityVal.Double())
            }

            actionVal := fullDoc.Lookup("action")

            if actionVal.Type == bson.TypeString {
                log.Printf("CHANGE EVENT: Action: %s", actionVal.StringValue())
            }
        }
    } else {
        log.Printf("CHANGE EVENT: Received but fullDocument not available or not in expected format")
    }
}

// processChangeFragment processes a fragment of a split change event
func processChangeFragment(fragment bson.Raw) {

    // Extract splitEvent information
    splitEventVal := fragment.Lookup("splitEvent")

    if !splitEventVal.IsZero() {
        var splitEvent bson.Raw
        if err := splitEventVal.Unmarshal(&splitEvent); err == nil {
            fragmentVal := splitEvent.Lookup("fragment")
            ofVal := splitEvent.Lookup("of")

            var fragmentNum, totalFragments int32

            if fragmentVal.Type == bson.TypeInt32 {
                fragmentNum = fragmentVal.Int32()
            }

            if ofVal.Type == bson.TypeInt32 {
                totalFragments = ofVal.Int32()
            }

            log.Printf("PROCESSING FRAGMENT: %d of %d", fragmentNum, totalFragments)

            // Process specific fields that might be in this fragment
            fullDocVal := fragment.Lookup("fullDocument")

            if !fullDocVal.IsZero() {
                var fullDoc bson.Raw
                if err := fullDocVal.Unmarshal(&fullDoc); err == nil {
                    log.Printf("FRAGMENT HAS fullDocument")

                    stockVal := fullDoc.Lookup("stock")

                    if stockVal.Type == bson.TypeString {
                        log.Printf("FRAGMENT DATA - Stock: %s", stockVal.StringValue())
                    }

                    quantityVal := fullDoc.Lookup("quantity")

                    if quantityVal.Type == bson.TypeInt32 {
                        log.Printf("FRAGMENT DATA - Quantity: %d", quantityVal.Int32())
                    } else if quantityVal.Type == bson.TypeInt64 {
                        log.Printf("FRAGMENT DATA - Quantity: %d", quantityVal.Int64())
                    } else if quantityVal.Type == bson.TypeDouble {
                        log.Printf("FRAGMENT DATA - Quantity: %.0f", quantityVal.Double())
                    }
                }
            }

            fullDocBeforeVal := fragment.Lookup("fullDocumentBeforeChange")
            if !fullDocBeforeVal.IsZero() {
                log.Printf("FRAGMENT HAS fullDocumentBeforeChange")
            }

            updateDescVal := fragment.Lookup("updateDescription")

            if !updateDescVal.IsZero() {
                log.Printf("FRAGMENT HAS updateDescription")
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    client, err := mongo.Connect(options.Client().ApplyURI("YOUR-MONGODB-URI"))
    if err != nil {
        log.Fatal(err)
    }

    defer client.Disconnect(ctx)

    db := client.Database("stockTrader")
    collection := db.Collection("trades")

    // Enable change stream pre and post images for the collection
    enablePrePostImages(ctx, db, collection)

    // Start the test document insertion in a separate goroutine
    go insertTestDocuments(ctx, client, collection)

    // Watch the collection with proper error handling
    watchCollectionWithRetry(ctx, db, collection, true)
}

I know this is a lot of new code, but I’ve tried my best to comment on the important elements to give you a clear understanding of what is going on. I encourage you to copy this code and try it yourself to get a better feel for how it works. Here is how the app works now:

  • Every five seconds, a new document will be inserted into the trades collection.
  • The document inserted will be either a small document or a large document that is a little over 8 MiB in size.
  • Next, we’ll do an update operation on the large document. The resulting change event carries both the pre- and post-image of the document, which together exceed the 16 MiB limit for a single change event.
  • But thanks to the $changeStreamSplitLargeEvent stage in our pipeline and the accompanying functions for processing its fragments, we’ll handle it just fine.

The output should look something like this:

2025/04/18 14:00:11 === NEW CHANGE EVENT DETECTED ===
2025/04/18 14:00:11 CHANGE EVENT: Received non-split event
2025/04/18 14:00:11 CHANGE EVENT: insert operation detected
2025/04/18 14:00:11 CHANGE EVENT: Document ID: ObjectID("6802af48e9705da6ae3f8be7")
2025/04/18 14:00:11 CHANGE EVENT: Fields count: 9
2025/04/18 14:00:11 CHANGE EVENT: Stock: AAPL
2025/04/18 14:00:11 CHANGE EVENT: Quantity: 100000
2025/04/18 14:00:11 CHANGE EVENT: Action: buy
2025/04/18 14:00:11 Saving resume token: {"_data": "826802AF4B000000012B042C0100296E5A10042C59376A5D254D23BCA7C647A2C1CECB463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F696400646802AF48E9705DA6AE3F8BE7000004"}
2025/04/18 14:00:11 === CHANGE EVENT PROCESSING COMPLETE ===
2025/04/18 14:00:11 === NEW CHANGE EVENT DETECTED ===
2025/04/18 14:00:11 Current resume token: {"_data": "826802AF4B000000032B0429296E1404"}
2025/04/18 14:00:11 CHANGE EVENT: Received non-split event
2025/04/18 14:00:11 CHANGE EVENT: insert operation detected
2025/04/18 14:00:11 CHANGE EVENT: Document ID: ObjectID("6802af4be9705da6ae3f8be8")
2025/04/18 14:00:11 CHANGE EVENT: Fields count: 9
2025/04/18 14:00:11 CHANGE EVENT: Stock: MSFT
2025/04/18 14:00:11 CHANGE EVENT: Quantity: 60000
2025/04/18 14:00:11 CHANGE EVENT: Action: sell
2025/04/18 14:00:11 Saving resume token: {"_data": "826802AF4B000000032B0429296E1404"}
2025/04/18 14:00:11 === CHANGE EVENT PROCESSING COMPLETE ===
2025/04/18 14:00:16 Updating large document 6802af37e9705da6ae3f8be5 with h-filled field...
2025/04/18 14:00:17 Successfully updated document with large field
2025/04/18 14:00:18 === NEW CHANGE EVENT DETECTED ===
2025/04/18 14:00:18 Current resume token: {"_data": "826802AF51000000012B042C0100296E5A10042C59376A5D254D23BCA7C647A2C1CECB463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F696400646802AF37E9705DA6AE3F8BE500002904"}
2025/04/18 14:00:18 CHANGE EVENT: Received fragment 1 of 2
2025/04/18 14:00:18 PROCESSING FRAGMENT: 1 of 2
2025/04/18 14:00:18 FRAGMENT CONTAINS FIELDS: [fullDocument _id splitEvent documentKey ns wallTime clusterTime operationType updateDescription]
2025/04/18 14:00:18 FRAGMENT HAS fullDocument with 10 fields
2025/04/18 14:00:18 FRAGMENT DATA - Stock: AAPL
2025/04/18 14:00:18 FRAGMENT DATA - Quantity: 100000
2025/04/18 14:00:18 FRAGMENT HAS updateDescription with 3 fields
2025/04/18 14:00:18 Saving resume token: {"_data": "826802AF51000000012B042C0100296E5A10042C59376A5D254D23BCA7C647A2C1CECB463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F696400646802AF37E9705DA6AE3F8BE500002904"}
2025/04/18 14:00:18 === CHANGE EVENT PROCESSING COMPLETE ===
2025/04/18 14:00:19 === NEW CHANGE EVENT DETECTED ===
2025/04/18 14:00:19 Current resume token: {"_data": "826802AF52000000012B0429296E1404"}
2025/04/18 14:00:19 CHANGE EVENT: Received fragment 2 of 2
2025/04/18 14:00:19 CHANGE EVENT: Received all 2 fragments for this event
2025/04/18 14:00:19 PROCESSING FRAGMENT: 2 of 2
2025/04/18 14:00:19 FRAGMENT CONTAINS FIELDS: [_id splitEvent fullDocumentBeforeChange]
2025/04/18 14:00:19 FRAGMENT HAS fullDocumentBeforeChange with 10 fields
2025/04/18 14:00:19 Saving resume token: {"_data": "826802AF52000000012B0429296E1404"}
2025/04/18 14:00:19 === CHANGE EVENT PROCESSING COMPLETE ===
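
The handler above logs each fragment on its own. If you need the whole event in one place, the fragments can be stitched back together. Here is a minimal sketch of that idea, assuming each fragment has already been decoded into a plain Go map. The Fragment type and the mergeFragments function are illustrative names of my own, not part of the MongoDB Go Driver:

```go
package main

import "fmt"

// Fragment is a hypothetical stand-in for one decoded change stream fragment.
type Fragment map[string]any

// mergeFragments combines the fragments of one split event into a single map.
// It returns an error if the set is empty, incomplete, or out of order.
func mergeFragments(fragments []Fragment) (Fragment, error) {
	if len(fragments) == 0 {
		return nil, fmt.Errorf("no fragments to merge")
	}
	merged := Fragment{}
	for i, frag := range fragments {
		split, ok := frag["splitEvent"].(map[string]any)
		if !ok {
			return nil, fmt.Errorf("fragment %d has no splitEvent field", i+1)
		}
		num, _ := split["fragment"].(int32)
		of, _ := split["of"].(int32)
		if int(num) != i+1 || int(of) != len(fragments) {
			return nil, fmt.Errorf("expected fragment %d of %d, got %d of %d",
				i+1, len(fragments), num, of)
		}
		for key, value := range frag {
			// splitEvent is bookkeeping and _id is the per-fragment resume token.
			if key == "splitEvent" || key == "_id" {
				continue
			}
			merged[key] = value
		}
	}
	// Keep the last fragment's resume token so a resume starts after the whole event.
	merged["_id"] = fragments[len(fragments)-1]["_id"]
	return merged, nil
}
```

You would call mergeFragments once the last fragment has arrived (when fragment equals of) and then hand the merged result to the same logic that handles non-split events.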

Now, your change stream implementation is tailored to your specific use case, fault-tolerant thanks to resume tokens, and able to handle change events that exceed the 16 MiB limit.

Best practices and performance considerations

Change streams unlock a ton of potential and use cases for your MongoDB data. To wrap up this tutorial, let’s cover a few best practices and good things to know about change streams and how to implement them for the best outcome.

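Pipeline efficiency: Keep your aggregation pipelines as lean as possible. The pipeline runs on the server against every change event, so filter with $match as early as you can and return only the fields your handler actually reads.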

Resume tokens: Always capture and persist the resume token after processing events to ensure you can resume accurately.

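Error handling: Implement robust error handling, including reconnect logic for when the change stream is interrupted or invalidated. After an invalidate event (for example, when the watched collection is dropped), a new stream has to be opened with startAfter rather than resumeAfter.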

Resource management: Close change streams appropriately to avoid resource leaks.

Check out the Change Streams Production Recommendations guide from MongoDB for more tips.

Conclusion

In this tutorial, we learned how to work with MongoDB change streams using the MongoDB Go Driver to build an application that reacts to our MongoDB data as it changes. But this is just the beginning. With change streams, you can do so much more. One awesome use case is using change streams to build fully collaborative, real-time applications. Or, in the case of our fictional Stock Trader application, we could expand it to detect fraudulent trades or great buying opportunities in real time.

Learn more about MongoDB change streams and join our community to share how you are using them in your applications. I hope you enjoyed this tutorial, and happy hacking!

