Editar

Compartir vía


Habilitación de la sincronización en tiempo real de los cambios de datos de MongoDB Atlas en Azure Synapse Analytics

Azure Synapse Analytics

El análisis en tiempo real puede ayudarle a tomar decisiones rápidamente y a realizar acciones automatizadas en función de la información actual. También puede ayudarle a ofrecer experiencias mejoradas para el cliente. Esta solución describe cómo mantener los grupos de datos de Azure Synapse Analytics sincronizados con los cambios de datos operativos en MongoDB.

Architecture

Este diagrama se muestra cómo implementar la sincronización en tiempo real de Atlas a Azure Synapse Analytics. Este flujo sencillo garantiza que los cambios que se produzcan en la colección de MongoDB Atlas se repliquen en el repositorio predeterminado de Azure Data Lake Storage en el área de trabajo de Azure Synapse Analytics. Una vez que los datos estén en Data Lake Storage, podrá usar canalizaciones de Azure Synapse Analytics para insertar los datos en grupos de SQL dedicados, grupos de Spark u otras soluciones, en función de los requisitos de análisis.

Diagrama que muestra una arquitectura para implementar la sincronización en tiempo real desde MongoDB Atlas a Azure Synapse Analytics.

Descargue un archivo de PowerPoint de esta arquitectura.

Flujo de datos

Los cambios en tiempo real en el almacén de datos operativos (ODS) de MongoDB Atlas se capturan y se ponen a disposición de Data Lake Storage en un área de trabajo de Azure Synapse Analytics para casos de uso de análisis en tiempo real, informes activos y paneles.

  1. Los desencadenadores Atlas capturan los cambios de datos en el almacén de datos operativo o transaccional de MongoDB Atlas.

  2. Cuando un desencadenador de base de datos Atlas observa un evento, pasa el tipo de cambio y el documento que ha cambiado (completo o delta) a una función Atlas.

  3. La función Atlas desencadena una función de Azure, pasando el evento de cambio y un documento JSON.

  4. Azure Functions usa la biblioteca cliente de Azure Storage Files Data Lake para escribir el documento modificado en el almacenamiento de Data Lake configurado en el área de trabajo de Azure Synapse Analytics.

  5. Una vez que los datos estén en Data Lake Storage, se podrán enviar a grupos de SQL dedicados, grupos de Spark y otras soluciones. Como alternativa, puede convertir los datos de JSON a formatos Parquet o Delta mediante flujos de datos de Azure Synapse Analytics o canalizaciones Copy para ejecutar informes de BI adicionales, o IA o aprendizaje automático en los datos actuales.

Componentes

  • Los flujos de cambios de MongoDB Atlas le permiten notificar a las aplicaciones los cambios en una colección, base de datos o clúster de implementación. Los flujos de cambio proporcionan a las aplicaciones acceso a los cambios de datos en tiempo real y les permiten reaccionar inmediatamente a estos. Esta funcionalidad es fundamental en casos de uso como el seguimiento de eventos de IoT y los cambios en los datos financieros, donde es necesario alertarse y tomar medidas con capacidad de respuesta inmediata. Los desencadenadores Atlas usan flujos de cambios para supervisar las colecciones de cambios e invocar automáticamente la función Atlas asociada en respuesta al evento desencadenador.
  • Los desencadenadores Atlas responden a inserciones, actualizaciones y eliminaciones de documentos en una colección específica y pueden invocar automáticamente una función Atlas en respuesta al evento de cambio.
  • Las funciones Atlas son implementaciones de código JavaScript sin servidor que pueden realizar acciones basadas en los eventos que invocan un desencadenador Atlas. La combinación de desencadenadores Atlas con funciones Atlas simplifica la implementación de arquitecturas controladas por eventos.
  • Azure Functions es una plataforma de proceso sin servidor controlada por eventos que puede utilizar para desarrollar aplicaciones de forma eficaz con el lenguaje de programación que prefiera. También puede usarla para conectarse sin problemas con otros servicios de Azure. En ese caso, una función de Azure captura un evento de cambio y lo usa para escribir un blob que contiene los datos modificados en Data Lake Storage mediante la biblioteca cliente de Azure Storage Files Data Lake.
  • Data Lake Storage es la solución de almacenamiento predeterminada en Azure Synapse Analytics. Puede usar grupos sin servidor para consultar directamente los datos.
  • Las canalizaciones y los flujos de datos en Azure Synapse Analytics se pueden usar para insertar el blob que contiene los datos modificados de MongoDB en grupos de SQL dedicados o grupos de Spark para un análisis posterior. Las canalizaciones permiten actuar en conjuntos de datos modificados en Data Lake Storage mediante desencadenadores de eventos de almacenamiento y desencadenadores programados para crear soluciones para casos de uso tanto en tiempo real como casi en tiempo real. Esta integración acelera el consumo descendente de conjuntos de datos de cambios.

Diagrama que muestra cómo las canalizaciones de Azure Synapse Analytics pueden insertar datos en grupos.

Alternativas

Esta solución usa desencadenadores de Atlas para ajustar el código para escuchar flujos de cambio Atlas y desencadenar Azure Functions en respuesta al evento de cambio. Por lo tanto, es mucho más fácil implementar que la solución alternativa proporcionada anteriormente. Para esa solución, debe escribir código para escuchar las secuencias de cambios en una aplicación web de Azure App Service.

Otra alternativa es usar el MongoDB Spark Connector para leer los datos del flujo de MongoDB y escribirlos en Tablas delta. El código se ejecuta continuamente en un Spark Notebook que forma parte de una canalización en Azure Synapse Analytics. Para saber más sobre la implementación de esta solución, consulte Sincronización entre Atlas y Azure Synapse Analytics mediante Spark streaming.

Sin embargo, el uso de desencadenadores Atlas con Azure Functions proporciona una solución completamente sin servidor. Ya que es sin servidor, la solución proporciona una sólida escalabilidad y optimización de costos. Los precios se basan en un modelo de costo de pago por uso. Puede ahorrar más dinero usando la función Atlas para combinar algunos eventos de cambio antes de invocar el punto de conexión de Azure Functions. Esta estrategia puede ser útil en casos de tráfico intensivo.

Además, Microsoft Fabric unifica su patrimonio de datos y facilita la ejecución de análisis e IA a través de los datos, por lo que obtendrá información rápidamente. La ingeniería de datos, la ciencia de datos, el almacenamiento de datos y el análisis en tiempo real de Azure Synapse Analytics en Fabric ahora pueden usar mejor los datos de MongoDB que se insertan en OneLake. Puede usar tanto Dataflow Gen2 como conectores de canalización de datos para Atlas para cargar datos de Atlas directamente en OneLake. Este mecanismo sin código proporciona una forma eficaz de ingerir datos de Atlas a OneLake.

Diagrama que muestra cómo Microsoft Fabric inserta datos en OneLake.

En Fabric puede hacer referencia directamente a los datos que se insertan en Data Lake Storage mediante accesos directos de OneLake, sin ningún proceso de extracción, transformación y carga (ETL).

Puede insertar los datos en Power BI para crear informes y visualizaciones para informes de BI.

Detalles del escenario

MongoDB Atlas, la capa de datos operativa de muchas aplicaciones empresariales, almacena datos de aplicaciones internas, servicios orientados al cliente y las API de terceros de varios canales. Puede utilizar las canalizaciones de datos en Azure Synapse Analytics para combinar estos datos con datos relacionales de otras aplicaciones tradicionales y con datos no estructurados de orígenes como registros, almacenes de objetos y secuencias de clic.

Las empresas usan funcionalidades de MongoDB como agregaciones, nodos analíticos, Atlas Search, Vector Search, Atlas Data Lake, Atlas SQL Interface, Data Federation y Charts para habilitar la inteligencia controlada por aplicaciones. Sin embargo, los datos transaccionales de MongoDB se extraen, transforman y cargan en grupos de SQL dedicados de Azure Synapse Analytics o grupos de Spark para lotes, IA o aprendizaje automático y análisis e inteligencia de BI de almacenamiento de datos.

Hay dos situaciones para el movimiento de datos entre Atlas y Azure Synapse Analytics: integración por lotes y sincronización en tiempo real.

Integración de lotes

Puede utilizar la integración por lotes y microprocesos para mover datos de Atlas a Data Lake Storage en Azure Synapse Analytics. Puede capturar los datos cronológicos completos a la vez o capturar datos incrementales en función de los criterios de filtro.

Las instancias locales de MongoDB y MongoDB Atlas pueden integrarse como un recurso de origen o receptor en Azure Synapse Analytics. Para más información sobre los conectores, consulte Copia de datos desde/a MongoDB o Copia de datos desde/a MongoDB Atlas.

El conector de origen facilita la ejecución de Azure Synapse Analytics en los datos operativos almacenados en el entorno local de MongoDB o en Atlas. Puede capturar datos de Atlas mediante el conector de origen y cargar los datos en Data Lake Storage en Parquet, Avro, JSON y formatos de texto, o como almacenamiento de blobs CSV. Estos archivos se pueden transformar o combinar con otros archivos de otros orígenes de datos en casos de varias bases de datos, multinube o nube híbrida. Este caso de uso es común en casos de almacenamiento de datos empresariales (EDW) y análisis a escala. También puede usar el conector receptor para almacenar los resultados del análisis de vuelta en Atlas. Para saber más sobre la integración por lotes, consulte Análisis de datos operativos en MongoDB Atlas con Azure Synapse Analytics.

Sincronización en tiempo real

La arquitectura descrita en este artículo puede ayudarle a implementar la sincronización en tiempo real para mantener el almacenamiento de Azure Synapse Analytics actualizado con los datos operativos de MongoDB.

Esta solución se compone de dos funciones principales:

  • Capturar los cambios en Atlas
  • Desencadenar la función de Azure para propagar los cambios a Azure Synapse Analytics

Captura de los cambios en Atlas

Puede capturar los cambios mediante un desencadenador Atlas, que puede configurar en la UI Agregar desencadenador o mediante la API de administración de Atlas App Services. Los desencadenadores escuchan los cambios de base de datos causados por eventos de base de datos como inserciones, actualizaciones y eliminaciones. Los desencadenadores Atlas también desencadenan una función Atlas cuando se detecta un evento de cambio. Puede usar la UI Agregar desencadenador para agregar la función. También puede crear una función Atlas y asociarla como punto de conexión de invocación de desencadenador mediante Atlas Admin API.

En esta captura de pantalla se muestra el formulario que puede usar para crear y editar un desencadenador Atlas. En la sección Detalles del origen del desencadenador, especifique la colección que el desencadenador supervisa para eventos de cambio y los eventos de la base de datos que supervisa (insertar, actualizar, eliminar y/o reemplazar).

Captura de pantalla que muestra el formulario para crear un desencadenador de Atlas.

El desencadenador puede invocar una función Atlas en respuesta al evento para el que está habilitado. En esta captura de pantalla se muestra el código JavaScript simple, agregado como una función Atlas, para invocar en respuesta al desencadenador de base de datos. La función Atlas invoca una función de Azure, pasando los metadatos del evento de cambio junto con el documento que se insertó, actualizó, eliminó o reemplazó, en función de para que está habilitado el desencadenador.

Captura de pantalla que muestra el código JavaScript agregado al desencadenador.

Código de función Atlas

El código de función atlas desencadena la función de Azure asociada al punto de conexión de la función de Azure pasando changeEvent por completo al cuerpo de la solicitud a la función de Azure.

Debe reemplazar el marcador de posición<Azure function URL endpoint> por el punto de conexión de la URL de la función de Azure.

exports =  function(changeEvent) {

    // Invoke Azure function that inserts the change stream into Data Lake Storage.
    console.log(typeof fullDocument);
    const response =  context.http.post({
        url: "<Azure function URL endpoint>",
        body: changeEvent,
        encodeBodyAsJSON: true
    });
    return response;
};

Desencadenar la función de Azure para propagar los cambios a Azure Synapse Analytics

La función Atlas está programada para invocar una función de Azure que escribe el documento de cambio en Data Lake Storage en Azure Synapse Analytics. La función de Azure usa el SDK Biblioteca cliente de Azure Data Lake Storage para Python para crear una instancia de la clase DataLakeServiceClient que representa la cuenta de almacenamiento.

La función de Azure usa una clave de almacenamiento para la autenticación. También puede usar implementaciones de OAuth de Microsoft Entra ID. Los atributos storage_account_key y otros atributos relacionados con Dake Lake Storage se capturan de las variables de entorno del SO configuradas. Después de descodificar el cuerpo de la solicitud, el objeto fullDocument (todo el documento insertado o actualizado) se analiza desde el cuerpo de la solicitud y luego se escribe en Data Lake Storage mediante las funciones de cliente de Data Lake append_data y flush_data.

Para una operación de eliminación, se usa fullDocumentBeforeChange en lugar de fullDocument. fullDocument no tiene ningún valor en una operación de eliminación, por lo que el código captura el documento que se eliminó, capturado en fullDocumentBeforeChange. Tenga en cuenta que fullDocumentBeforeChange solo se rellena cuando la Preimagen del documento está activada, como se muestra en la captura de pantalla anterior.

import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a new request.')
    logging.info(req)
    storage_account_name = os.environ["storage_account_name"]
    storage_account_key = os.environ["storage_account_key"]
    storage_container = os.environ["storage_container"]
    storage_directory = os.environ["storage_directory"]
    storage_file_name = os.environ["storage_file_name"]
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
            "https", storage_account_name), credential=storage_account_key)
    json_data = req.get_body()
    logging.info(json_data)
    object_id = "test"
    try:
        json_string = json_data.decode("utf-8")
        json_object = json.loads(json_string)

        if json_object["operationType"] == "delete":
            object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
        else:
            object_id = json_object["fullDocument"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}

        logging.info(object_id)
        encoded_data = json.dumps(data)
    except Exception as e:
        logging.info("Exception occurred : "+ str(e))

    file_system_client = service_client.get_file_system_client(file_system=storage_container)
    directory_client = file_system_client.get_directory_client(storage_directory)
    file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
    file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
    file_client.flush_data(len(encoded_data))
    return func.HttpResponse(f"This HTTP triggered function executed successfully.")

Hasta ahora, ha visto cómo el desencadenador de Atlas captura cualquier cambio que se produce y lo pasa a una función de Azure a través de una función Atlas y que la función de Azure escribe el documento de cambio como un nuevo archivo en Data Lake Storage en el área de trabajo de Azure Synapse Analytics.

Después de agregar el archivo a Data Lake Storage, puede configurar un desencadenador de eventos de almacenamiento para desencadenar una canalización que luego pueda escribir el documento de cambio en un grupo de SQL dedicado o en una tabla de grupos de Spark. La canalización puede usar la Actividad de copia y transformar los datos mediante un flujo de datos. Como alternativa, si el destino final es un grupo de SQL dedicado, puede modificar la función de Azure para escribir directamente en el grupo de SQL en Azure Synapse Analytics. Para un grupo de SQL, obtenga la cadena de conexión ODBC para la conexión del grupo de SQL. Consulte Uso de Python para consultar una base de datos para obtener un ejemplo de código de Python que puede usar para consultar la tabla del grupo de SQL mediante la cadena de conexión. Puede modificar este código para usar una consulta de inserción para escribir en un grupo de SQL dedicado. Hay opciones de configuración y roles que deben asignarse para permitir que la función escriba en un grupo de SQL dedicado. La información sobre estas opciones de configuración y roles no pertenece al ámbito de este artículo.

Si desea una solución casi en tiempo real y no necesita que los datos se sincronicen en tiempo real, el uso de ejecuciones de canalización programadas podría ser una buena opción. Puede programar desencadenadores para desencadenar una canalización con la actividad de copia o un flujo de datos, con una frecuencia que se encuentra en la de casi en tiempo real que su empresa puede permitir, para usar el conector de MongoDB para capturar los datos de MongoDB que se insertaron, actualizaron o eliminaron entre la última ejecución programada y la ejecución actual. La canalización usa el conector de MongoDB como conector de origen para capturar los datos diferenciales de MongoDB Atlas e insertarlos en los grupos de SQL dedicados de Data Lake Storage o Azure Synapse Analytics, con estos como conexiones receptoras. Esta solución usa un mecanismo de extracción (en lugar de la solución principal descrita en este artículo, que es un mecanismo de inserción) de MongoDB Atlas a medida que se producen cambios en la colección Atlas de MongoDB a la que escucha el desencadenador Atlas.

Posibles casos de uso

MongoDB y los servicios analíticos y EDW de Azure Synapse Analytics pueden atender numerosos casos de uso:

Minoristas

  • Creación de inteligencia en la agrupación y promoción de productos
  • Implementación del cliente 360 e hiperpersonalización
  • Predicción del agotamiento de existencias y optimización de los pedidos de la cadena de suministro
  • Implementación de precios de descuento dinámicos y búsqueda inteligente en comercio electrónico

Banca y finanzas

  • Personalización de servicios financieros de clientes
  • Detección y bloqueo de transacciones fraudulentas

Telecomunicaciones

  • Optimización de redes de próxima generación
  • Maximización del valor de las redes perimetrales

Automoción

  • Optimización de la parametrización de vehículos conectados
  • Detección de anomalías en la comunicación de IoT en vehículos conectados

Fabricación

  • Proporcionar mantenimiento predictivo para maquinaria
  • Optimización de la administración de almacenamiento e inventario

Consideraciones

Estas consideraciones implementan los pilares del Azure Well-Architected Framework, que es un conjunto de principios rectores que puede utilizar para mejorar la calidad de una carga de trabajo. Para más información, consulte Marco de buena arquitectura de Microsoft Azure.

Seguridad

La seguridad proporciona garantías contra ataques deliberados y el abuso de datos y sistemas valiosos. Para más información, consulte Introducción al pilar de seguridad.

Azure Functions es un servicio administrado sin servidor, por lo que los recursos de la aplicación y los componentes de la plataforma están protegidos por una seguridad mejorada. No obstante, se recomienda usar el protocolo HTTPS y las versiones más recientes de TLS. También se recomienda validar la entrada para asegurarse de que es un documento de cambio de MongoDB. Consulte Protección de Azure Functions para conocer las consideraciones de seguridad de Azure Functions.

MongoDB Atlas es una base de datos administrada como servicio, por lo que MongoDB proporciona una seguridad de plataforma mejorada. MongoDB proporciona varios mecanismos para garantizar la seguridad de 360 grados de los datos almacenados, incluido el acceso a bases de datos, la seguridad de red, el cifrado en reposo y en tránsito, y la soberanía de los datos. Consulte las notas del producto Seguridad de MongoDB Atlas para MongoDB Atlas y otros artículos que pueden ayudarle a asegurarse de que los datos de MongoDB están protegidos durante todo el ciclo de vida de los datos.

Optimización de costos

La optimización de costos trata de reducir los gastos innecesarios y mejorar las eficiencias operativas. Para más información, vea Información general del pilar de optimización de costos.

Para calcular el costo de los productos y las configuraciones de Azure, use la calculadora de precios de Azure. Azure le ayuda a evitar costos innecesarios al permitir la identificación del número correcto de recursos, el análisis de los gastos a lo largo del tiempo y el escalado para satisfacer las necesidades empresariales sin gastos excesivos. Las funciones Azure solo incurren en costes cuando se invocan. Sin embargo, en función del volumen de cambios en MongoDB Atlas, puede evaluarlo con un mecanismo de procesamiento por lotes en la función Atlas para almacenar los cambios en otra colección temporal y desencadenar la función de Azure, solo si el lote supera un límite determinado.

Para obtener información sobre los clústeres de Atlas, consulte 5 formas de reducir los costos con MongoDB Atlas y Costos de configuración de clústeres. La Página de precios de MongoDB puede ayudarle a comprender las opciones de precios de los clústeres de MongoDB Atlas y otras ofertas de la plataforma de datos para desarrolladores de MongoDB Atlas. Atlas Data Federation puede implementarse en Azure y es compatible con Azure Blob Storage (en versión preliminar). Si está pensando en usar el procesamiento por lotes para optimizar los costos, considere la posibilidad de escribir a Blob Storage en lugar de una colección temporal de MongoDB.

Eficiencia del rendimiento

La eficiencia del rendimiento es la capacidad de la carga de trabajo para escalar con el fin de satisfacer de manera eficiente las demandas que los usuarios hayan ejercido sobre ella. Para obtener más información, vea Resumen del pilar de eficiencia del rendimiento.

Los desencadenadores Atlas y las funciones Azure son de eficacia probada para mejorar el rendimiento y la escalabilidad. Consulte Rendimiento y escalado en Durable Functions (Azure Functions) para comprender las consideraciones de rendimiento y escalabilidad de Azure Functions. Consulte Escalado a petición para conocer algunas consideraciones para mejorar el rendimiento de las instancias de MongoDB Atlas. Consulte La Guía de procedimientos recomendados para el rendimiento de MongoDB para conocer los procedimientos recomendados para la configuración de MongoDB Atlas.

Conclusión

MongoDB Atlas se integra perfectamente con Azure Synapse Analytics, lo que permite a los clientes de Atlas usarlo fácilmente como origen o receptor para Azure Synapse Analytics. Esta solución le permite usar datos operativos de MongoDB en tiempo real desde Azure Synapse Analytics para la inferencia de análisis complejos e IA.

Implementación de este escenario

Sincronización en tiempo real desde MongoDB Atlas a Azure Synapse Analytics

Colaboradores

Microsoft mantiene este artículo. Originalmente lo escribieron los siguientes colaboradores.

Creadores de entidad de seguridad:

Otros colaboradores:

Para ver los perfiles no públicos de LinkedIn, inicie sesión en LinkedIn.

Pasos siguientes