Editar

Partilhar via


Habilite a sincronização em tempo real das alterações de dados do MongoDB Atlas para o Azure Synapse Analytics

Azure Synapse Analytics

A análise em tempo real pode ajudá-lo a tomar decisões rápidas e executar ações automatizadas com base nos insights atuais. Também pode ajudá-lo a oferecer experiências aprimoradas ao cliente. Esta solução descreve como manter os pools de dados do Azure Synapse Analytics sincronizados com as alterações de dados operacionais no MongoDB.

Arquitetura

O diagrama a seguir mostra como implementar a sincronização em tempo real do Atlas para o Azure Synapse Analytics. Esse fluxo simples garante que todas as alterações que ocorram na coleção do MongoDB Atlas sejam replicadas para o repositório padrão do Azure Data Lake Storage no espaço de trabalho do Azure Synapse Analytics. Depois que os dados estiverem no Armazenamento Data Lake, você poderá usar os pipelines do Azure Synapse Analytics para enviar os dados por push para pools SQL dedicados, pools do Spark ou outras soluções, dependendo dos seus requisitos de análise.

Diagrama que mostra uma arquitetura para implementar a sincronização em tempo real do MongoDB Atlas para o Azure Synapse Analytics.

Transfira um ficheiro PowerPoint desta arquitetura.

Fluxo de dados

As alterações em tempo real no armazenamento de dados operacionais (ODS) do MongoDB Atlas são capturadas e disponibilizadas para o Data Lake Storage em um espaço de trabalho do Azure Synapse Analytics para casos de uso de análise em tempo real, relatórios ao vivo e painéis.

  1. As alterações de dados no armazenamento de dados operacional/transacional do MongoDB Atlas são capturadas pelos gatilhos do Atlas.

  2. Quando um gatilho de banco de dados Atlas observa um evento, ele passa o tipo de alteração e o documento alterado (completo ou delta) para uma função Atlas.

  3. A função Atlas dispara uma função do Azure, passando o evento change e um documento JSON.

  4. O Azure Functions usa a biblioteca de cliente Data Lake dos Arquivos de Armazenamento do Azure para gravar o documento alterado no Armazenamento Data Lake configurado no espaço de trabalho do Azure Synapse Analytics.

  5. Depois que os dados estiverem no Armazenamento Data Lake, eles poderão ser enviados para pools SQL dedicados, pools do Spark e outras soluções. Como alternativa, você pode converter os dados dos formatos JSON para Parquet ou Delta usando fluxos de dados do Azure Synapse Analytics ou pipelines de cópia para executar relatórios de BI adicionais ou IA/aprendizado de máquina nos dados atuais.

Componentes

  • Os fluxos de alterações do MongoDB Atlas permitem que você notifique os aplicativos sobre alterações em uma coleção, banco de dados ou cluster de implantação. Os fluxos de alterações dão aos aplicativos acesso a alterações de dados em tempo real e permitem que eles reajam imediatamente às alterações. Essa funcionalidade é crítica em casos de uso, como rastreamento de eventos de IoT e alterações de dados financeiros, onde alarmes precisam ser disparados e ações responsivas precisam ser tomadas imediatamente. Os gatilhos do Atlas usam fluxos de alterações para monitorar coleções em busca de alterações e invocam automaticamente a função Atlas associada em resposta ao evento de gatilho.
  • Os gatilhos do Atlas respondem a inserções, atualizações e exclusões de documentos em uma coleção específica e podem invocar automaticamente uma função do Atlas em resposta ao evento de alteração.
  • As funções do Atlas são implementações de código JavaScript do lado do servidor sem servidor que podem executar ações com base nos eventos que invocam um gatilho do Atlas. A combinação de gatilhos do Atlas com funções do Atlas simplifica a implementação de arquiteturas orientadas a eventos.
  • O Azure Functions é uma plataforma de computação sem servidor orientada a eventos que pode utilizar para desenvolver aplicações de forma eficiente com a linguagem de programação da sua escolha. Você também pode usá-lo para se conectar diretamente com outros serviços do Azure. Nesse cenário, uma função do Azure captura um evento de alteração e o usa para gravar um blob contendo os dados alterados no Armazenamento Data Lake usando a biblioteca de cliente Data Lake dos Arquivos de Armazenamento do Azure.
  • O Armazenamento Data Lake é a solução de armazenamento padrão no Azure Synapse Analytics. Você pode usar pools sem servidor para consultar os dados diretamente.
  • Os pipelines e fluxos de dados no Azure Synapse Analytics podem ser usados para enviar por push o blob que contém os dados alterados do MongoDB para pools SQL dedicados ou pools do Spark para análise posterior. Os pipelines permitem que você atue em conjuntos de dados alterados no Armazenamento Data Lake usando gatilhos de eventos de armazenamento e gatilhos agendados para criar soluções para casos de uso em tempo real e quase em tempo real. Essa integração acelera o consumo downstream de conjuntos de dados de alteração.

Diagrama que mostra como os pipelines do Azure Synapse Analytics podem enviar dados por push para pools.

Alternativas

Esta solução usa gatilhos do Atlas para encapsular o código para ouvir fluxos de alteração do Atlas e acionar o Azure Functions em resposta ao evento de alteração. É, portanto, muito mais fácil de implementar do que a solução alternativa fornecida anteriormente. Para essa solução, você precisa escrever código para ouvir fluxos de alterações em um aplicativo Web do Serviço de Aplicativo do Azure.

Outra alternativa é usar o MongoDB Spark Connector para ler dados de fluxo do MongoDB e gravá-los em tabelas Delta. O código é executado continuamente em um Bloco de Anotações Spark que faz parte de um pipeline no Azure Synapse Analytics. Para obter mais informações sobre como implementar essa solução, consulte Sincronizar do Atlas com o Azure Synapse Analytics usando o streaming do Spark.

No entanto, usar gatilhos do Atlas com o Azure Functions fornece uma solução completamente sem servidor. Por ser sem servidor, a solução oferece escalabilidade robusta e otimização de custos. O preço é baseado em um modelo de custo pré-pago. Você pode economizar mais dinheiro usando a função Atlas para combinar alguns eventos de alteração antes de invocar o ponto de extremidade do Azure Functions. Essa estratégia pode ser útil em cenários de tráfego intenso.

Além disso, o Microsoft Fabric unifica seu patrimônio de dados e facilita a execução de análises e IA sobre os dados, para que você obtenha insights rapidamente. A engenharia de dados, a ciência de dados, o armazenamento de dados e a análise em tempo real do Azure Synapse Analytics no Fabric agora podem fazer melhor uso dos dados do MongoDB enviados para o OneLake. Você pode usar os conectores Dataflow Gen2 e pipeline de dados para o Atlas para carregar dados do Atlas diretamente no OneLake. Esse mecanismo sem código fornece uma maneira poderosa de ingerir dados do Atlas para o OneLake.

Diagrama que mostra como o Microsoft Fabric envia dados para o OneLake.

No Fabric, você pode fazer referência direta a dados enviados por push para o Armazenamento Data Lake usando atalhos do OneLake, sem qualquer extração, transformação, carga (ETL).

Você pode enviar os dados por push para o Power BI para criar relatórios e visualizações para relatórios de BI.

Detalhes do cenário

O MongoDB Atlas, a camada de dados operacionais de muitos aplicativos corporativos, armazena dados de aplicativos internos, serviços voltados para o cliente e APIs de terceiros de vários canais. Você pode usar os pipelines de dados no Azure Synapse Analytics para combinar esses dados com dados relacionais de outros aplicativos tradicionais e com dados não estruturados de fontes como logs, repositórios de objetos e clickstreams.

As empresas usam recursos do MongoDB como Agregações, nós analíticos, Atlas Search, Vetor Search, Atlas Data Lake, Atlas SQL Interface, Data Federation e Charts para habilitar a inteligência orientada a aplicativos. No entanto, os dados transacionais no MongoDB são extraídos, transformados e carregados para pools SQL dedicados do Azure Synapse Analytics ou pools do Spark para análise e inteligência de BI em lote, IA/aprendizado de máquina e data warehouse.

Há dois cenários para a movimentação de dados entre o Atlas e o Azure Synapse Analytics: integração em lote e sincronização em tempo real.

Integração em lote

Você pode usar a integração em lote e microlote para mover dados do Atlas para o Armazenamento Data Lake no Azure Synapse Analytics. Você pode buscar todos os dados históricos de uma só vez ou buscar dados incrementais com base em critérios de filtro.

As instâncias locais do MongoDB e o MongoDB Atlas podem ser integrados como uma fonte ou um recurso de coletor no Azure Synapse Analytics. Para obter informações sobre os conectores, consulte Copiar dados de ou para o MongoDB ou Copiar dados de ou para o MongoDB Atlas.

O conector de origem torna conveniente executar o Azure Synapse Analytics em dados operacionais armazenados no MongoDB local ou no Atlas. Você pode buscar dados do Atlas usando o conector de origem e carregar os dados para o Armazenamento Data Lake nos formatos Parquet, Avro, JSON e texto ou como armazenamento de blob CSV. Esses arquivos podem ser transformados ou unidos a outros arquivos de outras fontes de dados em cenários de vários bancos de dados, multinuvem ou nuvem híbrida. Esse caso de uso é comum em cenários de EDW (Enterprise Data Warehouse) e análise em escala. Você também pode usar o conector do coletor para armazenar os resultados da análise no Atlas. Para obter mais informações sobre integração em lote, consulte Analisar dados operacionais no MongoDB Atlas usando o Azure Synapse Analytics.

Sincronização em tempo real

A arquitetura descrita neste artigo pode ajudá-lo a implementar a sincronização em tempo real para manter seu armazenamento do Azure Synapse Analytics atualizado com os dados operacionais do MongoDB.

Esta solução é composta por duas funções principais:

  • Capturando as alterações no Atlas
  • Acionando a função do Azure para propagar as alterações no Azure Synapse Analytics

Capturar as alterações no Atlas

Você pode capturar as alterações usando um gatilho do Atlas, que pode ser configurado na interface do usuário Adicionar gatilho ou usando a API de administração dos Serviços de Aplicativo do Atlas. Os gatilhos escutam alterações no banco de dados causadas por eventos do banco de dados, como inserções, atualizações e exclusões. Os gatilhos do Atlas também acionam uma função do Atlas quando um evento de alteração é detetado. Você pode usar a interface do usuário Add Trigger para adicionar a função. Você também pode criar uma função do Atlas e associá-la como o ponto de extremidade de invocação de gatilho usando a API de administração do Atlas.

A captura de tela a seguir mostra o formulário que você pode usar para criar e editar um gatilho do Atlas. Na seção Detalhes da Origem do Gatilho , especifique a coleção que o gatilho observa para eventos de alteração e os eventos de banco de dados que ele observa (inserir, atualizar, excluir e/ou substituir).

Captura de tela que mostra o formulário para criar um gatilho do Atlas.

O gatilho pode invocar uma função do Atlas em resposta ao evento para o qual está habilitada. A captura de tela a seguir mostra o código JavaScript simples, adicionado como uma função do Atlas, para invocar em resposta ao gatilho do banco de dados. A função Atlas invoca uma função do Azure, passando-lhe os metadados do evento de alteração juntamente com o documento que foi inserido, atualizado, excluído ou substituído, dependendo do que o gatilho está habilitado.

Captura de tela que mostra o código JavaScript adicionado ao gatilho.

Código da função Atlas

O código da função Atlas aciona a função do Azure associada ao ponto de extremidade da função do Azure passando o todo changeEvent no corpo da solicitação para a função do Azure.

Você precisa substituir o espaço reservado <Azure function URL endpoint> pelo ponto de extremidade de URL da função do Azure real.

exports =  function(changeEvent) {

    // Invoke Azure function that inserts the change stream into Data Lake Storage.
    console.log(typeof fullDocument);
    const response =  context.http.post({
        url: "<Azure function URL endpoint>",
        body: changeEvent,
        encodeBodyAsJSON: true
    });
    return response;
};

Acionar a função do Azure para propagar as alterações no Azure Synapse Analytics

A função Atlas é codificada para invocar uma função do Azure que grava o documento de alteração no Armazenamento Data Lake no Azure Synapse Analytics. A função do Azure usa a biblioteca de cliente do Armazenamento do Azure Data Lake para Python SDK para criar uma instância da DataLakeServiceClient classe que representa sua conta de armazenamento.

A função do Azure usa uma chave de armazenamento para autenticação. Você também pode usar implementações OAuth do Microsoft Entra ID. Os storage_account_key e outros atributos relacionados ao Dake Lake Storage são obtidos das variáveis de ambiente do sistema operacional configuradas. Depois que o corpo da solicitação é decodificado, o (todo o fullDocument documento inserido ou atualizado) é analisado do corpo da solicitação e, em seguida, gravado no Armazenamento do Data Lake pelas funções append_data do cliente Data Lake e flush_data.

Para uma operação de exclusão, fullDocumentBeforeChange é usado em vez de fullDocument. fullDocument não tem nenhum valor em uma operação de exclusão, então o código busca o documento que foi excluído, que é capturado em fullDocumentBeforeChange. Observe que fullDocumentBeforeChange só é preenchido quando a configuração Pré-imagem do documento está definida como ativada, conforme mostrado na captura de tela anterior.

import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a new request.')
    logging.info(req)
    storage_account_name = os.environ["storage_account_name"]
    storage_account_key = os.environ["storage_account_key"]
    storage_container = os.environ["storage_container"]
    storage_directory = os.environ["storage_directory"]
    storage_file_name = os.environ["storage_file_name"]
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
            "https", storage_account_name), credential=storage_account_key)
    json_data = req.get_body()
    logging.info(json_data)
    object_id = "test"
    try:
        json_string = json_data.decode("utf-8")
        json_object = json.loads(json_string)

        if json_object["operationType"] == "delete":
            object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
        else:
            object_id = json_object["fullDocument"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}

        logging.info(object_id)
        encoded_data = json.dumps(data)
    except Exception as e:
        logging.info("Exception occurred : "+ str(e))

    file_system_client = service_client.get_file_system_client(file_system=storage_container)
    directory_client = file_system_client.get_directory_client(storage_directory)
    file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
    file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
    file_client.flush_data(len(encoded_data))
    return func.HttpResponse(f"This HTTP triggered function executed successfully.")

Até agora, você viu como o gatilho do Atlas captura qualquer alteração que ocorre e a passa para uma função do Azure por meio de uma função do Atlas, e que a função do Azure grava o documento de alteração como um novo arquivo no Armazenamento Data Lake no espaço de trabalho do Azure Synapse Analytics.

Depois que o arquivo é adicionado ao Armazenamento Data Lake, você pode configurar um gatilho de evento de armazenamento para disparar um pipeline que pode gravar o documento de alteração em um pool SQL dedicado ou em uma tabela de pool do Spark. O pipeline pode usar a atividade Copiar e transformar os dados usando um fluxo de dados. Como alternativa, se o destino final for um pool SQL dedicado, você poderá modificar a função do Azure para gravar diretamente no pool SQL dedicado no Azure Synapse Analytics. Para um pool SQL, obtenha a cadeia de conexão ODBC para a conexão do pool SQL. Consulte Usar Python para consultar um banco de dados para obter um exemplo de código Python que você pode usar para consultar a tabela de pool SQL usando a cadeia de conexão. Você pode modificar esse código para usar uma consulta Inserir para gravar em um pool SQL dedicado. Há definições de configuração e funções que precisam ser atribuídas para permitir que a função grave em um pool SQL dedicado. As informações sobre essas configurações e funções estão fora do escopo deste artigo.

Se você quiser uma solução quase em tempo real e não precisar que os dados sejam sincronizados em tempo real, usar execuções de pipeline agendadas pode ser uma boa opção. Você pode configurar gatilhos agendados para acionar um pipeline com a atividade de cópia ou um fluxo de dados, em uma frequência que esteja na frequência quase em tempo real que sua empresa pode pagar, para usar o conector MongoDB para buscar os dados do MongoDB que foram inseridos, atualizados ou excluídos entre a última execução agendada e a execução atual. O pipeline usa o conector MongoDB como conector de origem para buscar os dados delta do MongoDB Atlas e enviá-los para o Data Lake Storage ou pools SQL dedicados do Azure Synapse Analytics, usando-os como conexões de coletor. Essa solução usa um mecanismo de pull (em oposição à solução principal descrita neste artigo, que é um mecanismo de push) do MongoDB Atlas à medida que ocorrem alterações na coleção do MongoDB Atlas que o gatilho do Atlas está escutando.

Potenciais casos de utilização

O MongoDB e o Azure Synapse Analytics EDW e os serviços analíticos podem servir vários casos de uso:

Retail

  • Desenvolver informações sobre a agregação e a promoção de produtos
  • Implementação do Customer 360 e hiperpersonalização
  • Previsão do esgotamento de estoque e otimização de pedidos da cadeia de suprimentos
  • Implementação de preços de desconto dinâmicos e pesquisa inteligente no e-commerce

Banca e finanças

  • Personalização dos serviços financeiros ao cliente
  • Detetar e bloquear transações fraudulentas

Telecomunicações

  • Otimização de redes de última geração
  • Maximizando o valor das redes de borda

Automóvel

  • Otimizar a parametrização de veículos conectados
  • Deteção de anomalias na comunicação da Internet das coisas em veículos conectados

Manufacturing

  • Fornecimento de manutenção preditiva para máquinas
  • Otimizando o gerenciamento de armazenamento e estoque

Considerações

Essas considerações implementam os pilares do Azure Well-Architected Framework, que é um conjunto de princípios orientadores que você pode usar para melhorar a qualidade de uma carga de trabalho. Para obter mais informações, consulte Microsoft Azure Well-Architected Framework.

Segurança

A segurança oferece garantias contra ataques deliberados e o abuso de seus valiosos dados e sistemas. Para obter mais informações, consulte Visão geral do pilar de segurança.

O Azure Functions é um serviço gerenciado sem servidor, portanto, os recursos do aplicativo e os componentes da plataforma são protegidos por segurança aprimorada. No entanto, recomendamos que você use o protocolo HTTPS e as versões mais recentes do TLS. Também é uma boa prática validar a entrada para garantir que seja um documento de alteração do MongoDB. Consulte Protegendo o Azure Functions para obter considerações de segurança para o Azure Functions.

O MongoDB Atlas é um banco de dados gerenciado como um serviço, portanto, o MongoDB oferece segurança de plataforma aprimorada. O MongoDB fornece vários mecanismos para ajudar a garantir a segurança de 360 graus para os dados armazenados, incluindo acesso ao banco de dados, segurança de rede, criptografia em repouso e em trânsito e soberania de dados. Consulte MongoDB Atlas Security para obter o whitepaper de segurança do MongoDB Atlas e outros artigos que podem ajudá-lo a garantir que os dados no MongoDB estejam seguros durante todo o ciclo de vida dos dados.

Otimização de custos

A otimização de custos consiste em reduzir despesas desnecessárias e melhorar a eficiência operacional. Para obter mais informações, consulte Visão geral do pilar de otimização de custos.

Para estimar o custo dos produtos e configurações do Azure, use a calculadora de preços do Azure. O Azure ajuda você a evitar custos desnecessários, determinando o número correto de recursos a serem usados, analisando os gastos ao longo do tempo e dimensionando para atender às necessidades de negócios sem gastos excessivos. O Azure Functions incorre em custos apenas quando são invocados. No entanto, dependendo do volume de alterações no MongoDB Atlas, você pode avaliar usando um mecanismo de processamento em lote na função Atlas para armazenar alterações em outra coleção temporária e acionar a função do Azure somente se o lote exceder um determinado limite.

Para obter informações sobre clusters Atlas, consulte 5 maneiras de reduzir custos com o MongoDB Atlas e custos de configuração de cluster. A página de preços do MongoDB pode ajudá-lo a entender as opções de preços para clusters do MongoDB Atlas e outras ofertas da plataforma de dados do desenvolvedor do MongoDB Atlas. A Federação de Dados do Atlas pode ser implantada no Azure e dá suporte ao Armazenamento de Blobs do Azure (em visualização). Se você estiver considerando usar o processamento em lote para otimizar custos, considere gravar no Armazenamento de Blobs em vez de uma coleção temporária do MongoDB.

Eficiência de desempenho

Eficiência de desempenho é a capacidade da sua carga de trabalho para dimensionar para satisfazer as exigências que os utilizadores lhe colocam de forma eficiente. Para obter mais informações, consulte Visão geral do pilar de eficiência de desempenho.

Os gatilhos do Atlas e o Azure Functions são testados pelo tempo para desempenho e escalabilidade. Consulte Desempenho e dimensionamento em Funções Duráveis (Azure Functions) para entender as considerações de desempenho e escalabilidade do Azure Functions. Consulte Dimensionamento sob demanda para obter algumas considerações sobre como melhorar o desempenho de suas instâncias do MongoDB Atlas. Consulte Guia de práticas recomendadas para o desempenho do MongoDB para obter as práticas recomendadas para a configuração do MongoDB Atlas.

Conclusão

O MongoDB Atlas integra-se perfeitamente com o Azure Synapse Analytics, permitindo que os clientes do Atlas usem facilmente o Atlas como fonte ou coletor para o Azure Synapse Analytics. Esta solução permite que você use dados operacionais do MongoDB em tempo real do Azure Synapse Analytics para análises complexas e inferência de IA.

Implementar este cenário

Sincronização em tempo real do MongoDB Atlas para o Azure Synapse Analytics

Contribuidores

Este artigo é mantido pela Microsoft. Foi originalmente escrito pelos seguintes contribuidores.

Principais autores:

  • Diana Annie Jenosh - Brasil | Arquiteto de Soluções Sênior - equipe MongoDB Partners
  • Venkatesh Shanbag - Brasil | Arquiteto de Soluções Sênior - equipe MongoDB Partners

Outros contribuidores:

Para ver perfis não públicos do LinkedIn, inicie sessão no LinkedIn.

Próximos passos