Επεξεργασία

Κοινή χρήση μέσω


Enable real-time sync of MongoDB Atlas data changes to Azure Synapse Analytics

Azure Synapse Analytics

Real-time analytics can help you make quick decisions and perform automated actions based on current insights. It can also help you deliver enhanced customer experiences. This solution describes how to keep Azure Synapse Analytics data pools in sync with operational data changes in MongoDB.

Architecture

The following diagram shows how to implement real-time sync from Atlas to Azure Synapse Analytics. This simple flow ensures that any changes that occur in the MongoDB Atlas collection are replicated to the default Azure Data Lake Storage repository in the Azure Synapse Analytics workspace. After the data is in Data Lake Storage, you can use Azure Synapse Analytics pipelines to push the data to dedicated SQL pools, Spark pools, or other solutions, depending on your analytics requirements.

Diagram that shows an architecture for implementing real-time sync from MongoDB Atlas to Azure Synapse Analytics.

Download a PowerPoint file of this architecture.

Dataflow

Real-time changes in the MongoDB Atlas operational data store (ODS) are captured and made available to Data Lake Storage in an Azure Synapse Analytics workspace for real-time analytics use cases, live reports, and dashboards.

  1. Data changes in the MongoDB Atlas operational/transactional datastore are captured by Atlas triggers.

  2. When an Atlas database trigger observes an event, it passes the change type and the document that's changed (full or delta) to an Atlas function.

  3. The Atlas function triggers an Azure function, passing the change event and a JSON document.

  4. Azure Functions uses the Azure Storage Files Data Lake client library to write the changed document to the configured Data Lake Storage in the Azure Synapse Analytics workspace.

  5. After the data is in Data Lake Storage, it can be sent to dedicated SQL pools, Spark pools, and other solutions. Alternatively, you can convert the data from JSON to Parquet or Delta formats by using Azure Synapse Analytics data flows or Copy pipelines to run additional BI reporting or AI / machine learning on the current data.

Components

  • MongoDB Atlas change streams enable you to notify applications of changes to a collection, database, or deployment cluster. Change streams give applications access to real-time data changes and enable them to immediately react to changes. This functionality is critical in use cases like IoT event tracking and financial data changes, where alarms need to be raised and responsive actions need to be taken immediately. Atlas triggers use change streams to monitor collections for changes and automatically invoke the associated Atlas function in response to the trigger event.
  • Atlas triggers respond to document inserts, updates, and deletes in a specific collection and can automatically invoke an Atlas function in response to the change event.
  • Atlas functions are serverless, server-side JavaScript code implementations that can perform actions based on the events that invoke an Atlas trigger. Combining Atlas triggers with Atlas functions simplifies the implementation of event-driven architectures.
  • Azure Functions is an event-driven, serverless compute platform that you can use to develop applications efficiently with the programming language of your choice. You can also use it to connect seamlessly with other Azure services. In this scenario, an Azure function captures a change event and uses it to write a blob containing the changed data into Data Lake Storage by using the Azure Storage Files Data Lake client library.
  • Data Lake Storage is the default storage solution in Azure Synapse Analytics. You can use serverless pools to query the data directly.
  • Pipelines and data flows in Azure Synapse Analytics can be used to push the blob that contains the MongoDB changed data to dedicated SQL pools or Spark pools for further analysis. Pipelines enable you to act on changed datasets in Data Lake Storage by using both storage event triggers and scheduled triggers to build solutions for both real-time and near real-time use cases. This integration accelerates downstream consumption of change datasets.

Diagram that shows how Azure Synapse Analytics pipelines can push data to pools.

Alternatives

This solution uses Atlas triggers to wrap the code for listening to Atlas change streams and triggering Azure Functions in response to the change event. It's therefore much easier to implement than the previously provided alternative solution. For that solution, you need to write code to listen to change streams in an Azure App Service web app.

Another alternative is to use the MongoDB Spark Connector to read MongoDB stream data and write it to Delta tables. The code is run continuously in a Spark Notebook that's part of a pipeline in Azure Synapse Analytics. For more information on implementing this solution, see Sync from Atlas to Azure Synapse Analytics using Spark streaming.

However, using Atlas triggers with Azure Functions provides a completely serverless solution. Because it's serverless, the solution provides robust scalability and cost optimization. Pricing is based on a pay-as-you-go cost model. You can save more money by using the Atlas function to combine a few change events before invoking the Azure Functions endpoint. This strategy can be useful in heavy-traffic scenarios.

Also, Microsoft Fabric unifies your data estate and makes it easier to run analytics and AI over the data, so you get insights quickly. Azure Synapse Analytics data engineering, data science, data warehousing, and real-time analytics in Fabric can now make better use of MongoDB data that's pushed to OneLake. You can use both Dataflow Gen2 and data pipeline connectors for Atlas to load Atlas data directly to OneLake. This no-code mechanism provides a powerful way to ingest data from Atlas to OneLake.

Diagram that shows how Microsoft Fabric pushes data to OneLake.

In Fabric, you can directly reference data that's pushed to Data Lake Storage by using OneLake shortcuts, without any extract, transform, load (ETL).

You can push the data to Power BI to create reports and visualizations for BI reporting.

Scenario details

MongoDB Atlas, the operational data layer of many enterprise applications, stores data from internal applications, customer-facing services, and third-party APIs from multiple channels. You can use the data pipelines in Azure Synapse Analytics to combine this data with relational data from other traditional applications and with unstructured data from sources like logs, object stores, and clickstreams.

Enterprises use MongoDB capabilities like Aggregations, analytical nodes, Atlas Search, Vector Search, Atlas Data Lake, Atlas SQL Interface, Data Federation, and Charts to enable application-driven intelligence. However, the transactional data in MongoDB is extracted, transformed, and loaded to Azure Synapse Analytics dedicated SQL pools or Spark pools for batch, AI / machine learning, and data-warehouse BI analytics and intelligence.

There are two scenarios for data movement between Atlas and Azure Synapse Analytics: batch integration and real-time sync.

Batch integration

You can use batch and micro-batch integration to move data from Atlas to Data Lake Storage in Azure Synapse Analytics. You can fetch the entire historical data at once or fetch incremental data based on filter criteria.

MongoDB on-premises instances and MongoDB Atlas can be integrated as a source or a sink resource in Azure Synapse Analytics. For information about the connectors, see Copy data from or to MongoDB or Copy data from or to MongoDB Atlas.

The source connector makes it convenient to run Azure Synapse Analytics on operational data that's stored in on-premises MongoDB or in Atlas. You can fetch data from Atlas by using the source connector and load the data to Data Lake Storage in Parquet, Avro, JSON, and text formats or as CSV blob storage. These files can then be transformed or joined with other files from other data sources in multi-database, multicloud, or hybrid cloud scenarios. This use case is common in enterprise data warehouse (EDW) and analytics-at-scale scenarios. You can also use the sink connector to store the results of the analytics back in Atlas. For more information about batch integration, see Analyze operational data on MongoDB Atlas using Azure Synapse Analytics.

Real-time sync

The architecture described in this article can help you implement real-time sync to keep your Azure Synapse Analytics storage current with the MongoDB operational data.

This solution is composed of two primary functions:

  • Capturing the changes in Atlas
  • Triggering the Azure function to propagate the changes to Azure Synapse Analytics

Capture the changes in Atlas

You can capture the changes by using an Atlas trigger, which you can configure in the Add Trigger UI or by using the Atlas App Services Admin API. Triggers listen for database changes caused by database events like inserts, updates, and deletes. Atlas triggers also trigger an Atlas function when a change event is detected. You can use the Add Trigger UI to add the function. You can also create an Atlas function and associate it as the trigger invocation endpoint by using the Atlas Admin API.

The following screenshot shows the form that you can use to create and edit an Atlas trigger. In the Trigger Source Details section, you specify the collection that the trigger watches for change events and the database events it watches for (insert, update, delete, and/or replace).

Screenshot that shows the form for creating an Atlas trigger.

The trigger can invoke an Atlas function in response to the event that it's enabled for. The following screenshot shows the simple JavaScript code, added as an Atlas function, to invoke in response to the database trigger. The Atlas function invokes an Azure function, passing it the metadata of the change event together with the document that was inserted, updated, deleted, or replaced, depending on what the trigger is enabled for.

Screenshot that shows JavaScript code added to the trigger.

Atlas function code

The Atlas function code triggers the Azure function that's associated with the Azure function endpoint by passing the entire changeEvent in the body of the request to the Azure function.

You need to replace the <Azure function URL endpoint> placeholder with the actual Azure function URL endpoint.

exports =  function(changeEvent) {

    // Invoke Azure function that inserts the change stream into Data Lake Storage.
    console.log(typeof fullDocument);
    const response =  context.http.post({
        url: "<Azure function URL endpoint>",
        body: changeEvent,
        encodeBodyAsJSON: true
    });
    return response;
};

Trigger the Azure function to propagate the changes to Azure Synapse Analytics

The Atlas function is coded to invoke an Azure function that writes the change document to Data Lake Storage in Azure Synapse Analytics. The Azure function uses the Azure Data Lake Storage client library for Python SDK to create an instance of the DataLakeServiceClient class that represents your storage account.

The Azure function uses a storage key for authentication. You can also use Microsoft Entra ID OAuth implementations. The storage_account_key and other attributes related to Dake Lake Storage are fetched from the configured OS environment variables. After the request body is decoded, the fullDocument (the entire inserted or updated document) is parsed from the request body and then written to Data Lake Storage by the Data Lake client functions append_data and flush_data.

For a delete operation, fullDocumentBeforeChange is used instead of fullDocument. fullDocument doesn't have any value in a delete operation, so the code fetches the document that was deleted, which is captured in fullDocumentBeforeChange. Note that fullDocumentBeforeChange is only populated when the Document Preimage setting is set to on, as shown in the previous screenshot.

import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a new request.')
    logging.info(req)
    storage_account_name = os.environ["storage_account_name"]
    storage_account_key = os.environ["storage_account_key"]
    storage_container = os.environ["storage_container"]
    storage_directory = os.environ["storage_directory"]
    storage_file_name = os.environ["storage_file_name"]
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
            "https", storage_account_name), credential=storage_account_key)
    json_data = req.get_body()
    logging.info(json_data)
    object_id = "test"
    try:
        json_string = json_data.decode("utf-8")
        json_object = json.loads(json_string)

        if json_object["operationType"] == "delete":
            object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
        else:
            object_id = json_object["fullDocument"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}

        logging.info(object_id)
        encoded_data = json.dumps(data)
    except Exception as e:
        logging.info("Exception occurred : "+ str(e))

    file_system_client = service_client.get_file_system_client(file_system=storage_container)
    directory_client = file_system_client.get_directory_client(storage_directory)
    file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
    file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
    file_client.flush_data(len(encoded_data))
    return func.HttpResponse(f"This HTTP triggered function executed successfully.")

So far, you've seen how the Atlas trigger captures any change that occurs and passes it to an Azure function via an Atlas function, and that the Azure function writes the change document as a new file in Data Lake Storage in the Azure Synapse Analytics workspace.

After the file is added to Data Lake Storage, you can set up a storage event trigger to trigger a pipeline that can then write the change document to a dedicated SQL pool or to a Spark pool table. The pipeline can use the Copy activity and transform the data by using a data flow. Alternatively, if your final target is a dedicated SQL pool, you can modify the Azure function to write directly to the dedicated SQL pool in Azure Synapse Analytics. For a SQL pool, get the ODBC connection string for the SQL pool connection. See Use Python to query a database for an example of Python code that you can use to query the SQL pool table by using the connection string. You can modify this code to use an Insert query to write to a dedicated SQL pool. There are configuration settings and roles that need to be assigned to enable the function to write to a dedicated SQL pool. Information about these settings and roles is outside the scope of this article.

If you want a near real-time solution and you don't need the data to be synchronized in real time, using scheduled pipeline runs might be a good option. You can set up scheduled triggers to trigger a pipeline with the Copy activity or a data flow, at a frequency that's at the near real-time frequency that your business can afford, to use the MongoDB connector to fetch the data from MongoDB that was inserted, updated, or deleted between the last scheduled run and the current run. The pipeline uses the MongoDB connector as source connector to fetch the delta data from MongoDB Atlas and push it to Data Lake Storage or Azure Synapse Analytics dedicated SQL pools, using these as sink connections. This solution uses a pull mechanism (as opposed to the main solution described in this article, which is a push mechanism) from MongoDB Atlas as changes occur in the MongoDB Atlas collection that the Atlas trigger is listening to.

Potential use cases

MongoDB and the Azure Synapse Analytics EDW and analytical services can serve numerous use cases:

Retail

  • Building intelligence into product bundling and product promotion
  • Implementing customer 360 and hyper-personalization
  • Predicting stock depletion and optimizing supply-chain orders
  • Implementing dynamic discount pricing and smart search in e-commerce

Banking and finance

  • Customizing customer financial services
  • Detecting and blocking fraudulent transactions

Telecommunications

  • Optimizing next-generation networks
  • Maximizing the value of edge networks

Automotive

  • Optimizing the parameterization of connected vehicles
  • Detecting anomalies in IoT communication in connected vehicles

Manufacturing

  • Providing predictive maintenance for machinery
  • Optimizing storage and inventory management

Considerations

These considerations implement the pillars of the Azure Well-Architected Framework, which is a set of guiding tenets that you can use to improve the quality of a workload. For more information, see Microsoft Azure Well-Architected Framework.

Security

Security provides assurances against deliberate attacks and the abuse of your valuable data and systems. For more information, see Overview of the security pillar.

Azure Functions is a serverless managed service, so the app resources and platform components are protected by enhanced security. However, we recommend that you use HTTPS protocol and the latest TLS versions. It's also a good practice to validate the input to ensure that it's a MongoDB change document. See Securing Azure Functions for security considerations for Azure Functions.

MongoDB Atlas is a managed database as a service, so MongoDB provides enhanced platform security. MongoDB provides multiple mechanisms to help ensure 360-degree security for stored data, including database access, network security, encryption at rest and in transit, and data sovereignty. See MongoDB Atlas Security for the MongoDB Atlas security whitepaper and other articles that can help you ensure that the data in MongoDB is secure throughout the data lifecycle.

Cost optimization

Cost optimization is about reducing unnecessary expenses and improving operational efficiencies. For more information, see Overview of the cost optimization pillar.

To estimate the cost of Azure products and configurations, use the Azure pricing calculator. Azure helps you avoid unnecessary costs by determining the correct number of resources to use, analyzing spending over time, and scaling to meet business needs without overspending. Azure Functions incur costs only when they're invoked. However, depending on the volume of changes in MongoDB Atlas, you can evaluate using a batching mechanism in the Atlas function to store changes in another temporary collection and trigger the Azure function only if the batch exceeds a certain limit.

For information about Atlas clusters, see 5 Ways to Reduce Costs With MongoDB Atlas and Cluster Configuration Costs. The MongoDB pricing page can help you understand pricing options for MongoDB Atlas clusters and other offerings of the MongoDB Atlas developer data platform. Atlas Data Federation can be deployed in Azure and supports Azure Blob Storage (in preview). If you're considering using batching to optimize costs, consider writing to Blob Storage instead of a MongoDB temporary collection.

Performance efficiency

Performance efficiency is the ability of your workload to scale to meet the demands placed on it by users in an efficient manner. For more information, see Performance efficiency pillar overview.

Atlas triggers and Azure Functions are time-tested for performance and scalability. See Performance and scale in Durable Functions (Azure Functions) to understand performance and scalability considerations for Azure Functions. See Scale On-Demand for some considerations for enhancing the performance of your MongoDB Atlas instances. See Best Practices Guide for MongoDB Performance for best practices for MongoDB Atlas configuration.

Conclusion

MongoDB Atlas seamlessly integrates with Azure Synapse Analytics, enabling Atlas customers to easily use Atlas as a source or a sink for Azure Synapse Analytics. This solution enables you to use MongoDB operational data in real-time from Azure Synapse Analytics for complex analytics and AI inference.

Deploy this scenario

Real-Time Sync from MongoDB Atlas to Azure Synapse Analytics

Contributors

This article is maintained by Microsoft. It was originally written by the following contributors.

Principal authors:

Other contributors:

To see non-public LinkedIn profiles, sign in to LinkedIn.

Next steps