A análise em tempo real pode ajudá-lo a tomar decisões rápidas e executar ações automatizadas com base em insights atuais. Ele também pode ajudá-lo a oferecer experiências aprimoradas ao cliente. Esta solução descreve como manter os pools de dados do Azure Synapse Analytics sincronizados com as alterações de dados operacionais no MongoDB.
Arquitetura
O diagrama a seguir mostra como implementar a sincronização em tempo real do Azure Synapse Analytics. Esse fluxo simples garante que todas as alterações que ocorrerem na coleção Atlas do MongoDB sejam replicadas para o repositório padrão do Armazenamento do Azure Data Lake no espaço de trabalho do Azure Synapse Analytics. Depois que os dados estiverem no Armazenamento Data Lake, você poderá usar os pipelines do Azure Synapse Analytics para enviar os dados para pools SQL dedicados, pools do Spark ou outras soluções, dependendo de seus requisitos de análise.
Baixe um arquivo do PowerPoint dessa arquitetura.
Fluxo de dados
As alterações em tempo real no armazenamento de dados operacionais (ODS) do MongoDB Atlas são capturadas e disponibilizadas para o Armazenamento Data Lake em um espaço de trabalho do Azure Synapse Analytics para casos de uso de análise em tempo real, relatórios dinâmicos e painéis.
As alterações de dados no armazenamento de dados operacional/transacional do MongoDB Atlas são capturadas pelos gatilhos do Atlas.
Quando um gatilho de banco de dados do Atlas observa um evento, ele passa o tipo de alteração e o documento alterado (completo ou delta) para uma função do Atlas.
A função Atlas dispara uma função do Azure, passando o evento change e um documento JSON.
O Azure Functions usa a biblioteca de cliente do Data Lake de Arquivos de Armazenamento do Azure para gravar o documento alterado no Armazenamento Data Lake configurado no espaço de trabalho do Azure Synapse Analytics.
Depois que os dados estiverem no Armazenamento Data Lake, eles poderão ser enviados para pools SQL dedicados, pools do Spark e outras soluções. Como alternativa, você pode converter os dados de JSON para os formatos Parquet ou Delta usando fluxos de dados do Azure Synapse Analytics ou pipelines de cópia para executar relatórios de BI adicionais ou IA/aprendizado de máquina nos dados atuais.
Componentes
- Os fluxos de alterações do MongoDB Atlas permitem que você notifique os aplicativos sobre alterações em uma coleção, banco de dados ou cluster de implantação. Os fluxos de alteração dão aos aplicativos acesso a alterações de dados em tempo real e permitem que eles reajam imediatamente às alterações. Essa funcionalidade é crítica em casos de uso como rastreamento de eventos de IoT e alterações de dados financeiros, onde alarmes precisam ser acionados e ações responsivas precisam ser tomadas imediatamente. Os gatilhos do Atlas usam fluxos de alterações para monitorar as coleções em busca de alterações e invocam automaticamente a função Atlas associada em resposta ao evento de disparo.
- Os gatilhos do Atlas respondem a inserções, atualizações e exclusões de documentos em uma coleção específica e podem invocar automaticamente uma função do Atlas em resposta ao evento de alteração.
- As funções do Atlas são implementações de código JavaScript do lado do servidor sem servidor que podem executar ações com base nos eventos que invocam um gatilho do Atlas. A combinação de gatilhos do Atlas com funções do Atlas simplifica a implementação de arquiteturas orientadas a eventos.
- O Azure Functions é uma plataforma de computação sem servidor orientada a eventos que você pode usar para desenvolver aplicativos de forma eficiente com a linguagem de programação de sua escolha. Você também pode usá-lo para se conectar perfeitamente com outros serviços do Azure. Nesse cenário, uma função do Azure captura um evento de alteração e o usa para gravar um blob contendo os dados alterados no Armazenamento do Data Lake usando a biblioteca de cliente do Data Lake dos Arquivos de Armazenamento do Azure.
- O Armazenamento Data Lake é a solução de armazenamento padrão no Azure Synapse Analytics. Você pode usar pools sem servidor para consultar os dados diretamente.
- Os pipelines e fluxos de dados no Azure Synapse Analytics podem ser usados para enviar por push o blob que contém os dados alterados do MongoDB para pools SQL dedicados ou pools do Spark para análise adicional. Os pipelines permitem que você aja em conjuntos de dados alterados no Data Lake Storage usando gatilhos de eventos de armazenamento e gatilhos agendados para criar soluções para casos de uso em tempo real e quase em tempo real. Essa integração acelera o consumo downstream de conjuntos de dados de alteração.
Alternativas
Essa solução usa gatilhos do Atlas para encapsular o código para ouvir fluxos de alteração do Atlas e acionar o Azure Functions em resposta ao evento de alteração. Portanto, é muito mais fácil de implementar do que a solução alternativa fornecida anteriormente. Para essa solução, você precisa escrever código para ouvir fluxos de alteração em um aplicativo web dos Serviços de Aplicativos do Azure.
Outra alternativa é usar o MongoDB Spark Connector para ler dados de transmissão do MongoDB e gravá-los em tabelas Delta. O código é executado continuamente em um Notebook do Spark que faz parte de um pipeline no Azure Synapse Analytics. Para obter mais informações sobre como implementar essa solução, consulte Sincronizar a partir do Atlas com o Azure Synapse Analytics usando streaming do Spark.
No entanto, o uso de gatilhos do Atlas com o Azure Functions fornece uma solução completamente sem servidor. Por não ter servidor, a solução oferece escalabilidade robusta e otimização de custos. O preço é baseado em um modelo de custo pré-pago. Você pode economizar mais dinheiro usando a função Atlas para combinar alguns eventos de alteração antes de invocar o ponto de extremidade do Azure Functions. Essa estratégia pode ser útil em cenários de tráfego pesado.
Além disso, o Microsoft Fabric unifica seu estado de dados e facilita a execução de análises e IA sobre os dados, para que você obtenha insights rapidamente. A engenharia de dados, a ciência de dados, o data warehouse e a análise em tempo real do Azure Synapse Analytics no Fabric agora podem fazer melhor uso dos dados do MongoDB enviados por push para o OneLake. Você pode usar os conectores Dataflow Gen2 e de pipeline de dados do Atlas para carregar dados do Atlas diretamente no OneLake. Este mecanismo sem código fornece uma maneira poderosa de ingerir dados do Atlas para o OneLake.
No Fabric, você pode fazer referência direta aos dados enviados por push para o Armazenamento Data Lake usando atalhos do OneLake, sem qualquer ETL.
Você pode enviar os dados por push para o Power BI para criar relatórios e visualizações para relatórios de BI.
Detalhes do cenário
O MongoDB Atlas, a camada de dados operacionais de muitos aplicativos corporativos, armazena dados de aplicativos internos, serviços voltados para o cliente e APIs de terceiros de vários canais. Você pode usar os pipelines de dados no Azure Synapse Analytics para combinar esses dados com dados relacionais de outros aplicativos tradicionais e com dados não estruturados de fontes como logs, repositórios de objetos e fluxos de cliques.
As empresas usam recursos do MongoDB como Agregações, nós analíticos, Pesquisa Atlas, busca em vetores, Atlas Data Lake, Interface SQL do Atlas, Federação de Dados e Gráficos para habilitar a inteligência orientada a aplicativos. No entanto, os dados transacionais no MongoDB são extraídos, transformados e carregados em pools SQL dedicados do Azure Synapse Analytics ou pools Spark para análise e inteligência de BI em lote, IA/aprendizado de máquina e data warehouse.
Há dois cenários para a movimentação de dados entre o Atlas e o Azure Synapse Analytics: integração em lote e sincronização em tempo real.
Integração do lote
Você pode usar a integração de lote e microlote para mover dados do Atlas para o Data Lake Storage no Azure Synapse Analytics. Você pode buscar todos os dados históricos de uma só vez ou buscar dados incrementais com base em critérios de filtro.
As instâncias locais do MongoDB e o Atlas do MongoDB podem ser integrados como uma fonte ou um recurso de coletor no Azure Synapse Analytics. Para obter informações sobre os conectores, consulte Copiar dados de ou para o MongoDB ou Copiar dados de ou para o MongoDB Atlas.
O conector de origem torna conveniente executar o Azure Synapse Analytics com base nos dados operacionais armazenados no MongoDB local ou no Atlas. Você pode buscar dados do Atlas usando o conector de origem e carregar os dados no Data Lake Storage em Parquet, Avro, JSON e formatos de texto ou como armazenamento de blobs CSV. Esses arquivos podem então ser transformados ou unidos a outros arquivos de outras fontes de dados em cenários de vários bancos de dados, várias núvens ou nuvem híbrida. Esse caso de uso é comum em cenários de EDW (Enterprise Data Warehouse) e análise em escala. Você também pode usar o conector do coletor para armazenar os resultados da análise no Atlas. Para obter mais informações sobre integração em lote, consulte Analisar dados operacionais no MongoDB Atlas usando o Azure Synapse Analytics.
Sincronização em tempo real
A arquitetura descrita neste artigo pode ajudá-lo a implementar a sincronização em tempo real para manter seu armazenamento do Azure Synapse Analytics atualizado com os dados operacionais do MongoDB.
Esta solução é composta por duas funções principais:
- Capturando as alterações no Atlas
- Acionando a função do Azure para propagar as alterações no Azure Synapse Analytics
Capturar as alterações no Atlas
Você pode capturar as alterações usando um gatilho do Atlas, que pode ser configurado na interface do usuário Adicionar gatilho ou usando a API de administração do Atlas App Services. Os gatilhos escutam as alterações no banco de dados causadas por eventos de banco de dados, como inserções, atualizações e exclusões. Os gatilhos do Atlas também acionam uma função do Atlas quando um evento de alteração é detectado. Você pode usar a interface de usuário Add Trigger para adicionar a função. Você também pode criar uma função do Atlas e associá-la como o ponto de extremidade de invocação do gatilho usando a API de Administração do Atlas.
A captura de tela a seguir mostra o formulário que você pode usar para criar e editar um gatilho do Atlas. Na seção Trigger Source Details, você especifica a coleção que o gatilho observa para eventos de alteração e os eventos de banco de dados que ele observa (inserir, atualizar, excluir e/ou substituir).
O gatilho pode invocar uma função Atlas em resposta ao evento para o qual está habilitado. A captura de tela a seguir mostra o código JavaScript simples, adicionado como uma função Atlas, a ser invocado em resposta ao gatilho do banco de dados. A função Atlas invoca uma função do Azure, passando-lhe os metadados do evento de alteração junto com o documento que foi inserido, atualizado, excluído ou substituído, dependendo do gatilho habilitado.
Código de função Atlas
O código da função Atlas aciona a função do Azure associada ao ponto de extremidade da função do Azure passando todo changeEvent
o corpo da solicitação para a função do Azure.
Você precisa substituir o <Azure function URL endpoint>
espaço reservado pelo ponto de extremidade de URL da função real do Azure.
exports = function(changeEvent) {
// Invoke Azure function that inserts the change stream into Data Lake Storage.
console.log(typeof fullDocument);
const response = context.http.post({
url: "<Azure function URL endpoint>",
body: changeEvent,
encodeBodyAsJSON: true
});
return response;
};
Acionar a função do Azure para propagar as alterações no Azure Synapse Analytics
A função Atlas é codificada para invocar uma função do Azure que grava o documento de alteração no Data Lake Storage no Azure Synapse Analytics. A função do Azure usa a biblioteca de cliente do Azure Data Lake Storag para o SDK do Python para criar uma instância da DataLakeServiceClient
classe que representa sua conta de armazenamento.
A função do Azure usa uma chave de armazenamento para autenticação. Você também pode usar implementações OAuth do Microsoft Entra ID. O storage_account_key
e outros atributos relacionados ao Dake Lake Storage são buscados das variáveis de ambiente do sistema operacional configuradas. Depois que o corpo da solicitação é decodificado, o fullDocument
(o documento inteiro inserido ou atualizado) é analisado do corpo da solicitação e, em seguida, gravado no Data Lake Storage pelas funções append_data
de cliente do Data Lake e flush_data
.
Para uma operação de exclusão, fullDocumentBeforeChange
é usado em vez de fullDocument
.
fullDocument
não tem nenhum valor em uma operação de exclusão, portanto, o código busca o documento que foi excluído, que é capturado no fullDocumentBeforeChange
. Observe que fullDocumentBeforeChange
só é preenchida quando a configuração Pré-imagem do documento está definida como ativada, conforme mostrado na captura de tela anterior.
import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a new request.')
logging.info(req)
storage_account_name = os.environ["storage_account_name"]
storage_account_key = os.environ["storage_account_key"]
storage_container = os.environ["storage_container"]
storage_directory = os.environ["storage_directory"]
storage_file_name = os.environ["storage_file_name"]
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
"https", storage_account_name), credential=storage_account_key)
json_data = req.get_body()
logging.info(json_data)
object_id = "test"
try:
json_string = json_data.decode("utf-8")
json_object = json.loads(json_string)
if json_object["operationType"] == "delete":
object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
else:
object_id = json_object["fullDocument"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}
logging.info(object_id)
encoded_data = json.dumps(data)
except Exception as e:
logging.info("Exception occurred : "+ str(e))
file_system_client = service_client.get_file_system_client(file_system=storage_container)
directory_client = file_system_client.get_directory_client(storage_directory)
file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
file_client.flush_data(len(encoded_data))
return func.HttpResponse(f"This HTTP triggered function executed successfully.")
Até agora, você viu como o gatilho do Atlas captura qualquer alteração que ocorre e a passa para uma função do Azure por meio de uma função do Atlas e que a função do Azure grava o documento de alteração como um novo arquivo no Data Lake Storage no espaço de trabalho do Azure Synapse Analytics.
Depois que o arquivo for adicionado ao Data Lake Storage, você poderá configurar um gatilho de evento de armazenamento para disparar um pipeline que poderá gravar o documento de alteração em um pool SQL dedicado ou em uma tabela de pool do Spark. O pipeline pode usar a atividade Copiar e transformar os dados usando um fluxo de dados. Como alternativa, se o seu destino final for um pool de SQL dedicado, você poderá modificar a função do Azure para gravar diretamente no pool de SQL dedicado no Azure Synapse Analytics. Para um pool de SQL, obtenha a cadeia de conexão ODBC para a conexão do pool de SQL. Consulte Usar Python para consultar um banco de dados para obter um exemplo de código Python que você pode usar para consultar a tabela de pool de SQL usando a cadeia de conexão. Você pode modificar esse código para usar uma consulta Insert para gravar em um pool de SQL dedicado. Há definições de configuração e funções que precisam ser atribuídas para permitir que a função seja gravada em um pool de SQL dedicado. As informações sobre essas configurações e funções estão fora do escopo deste artigo.
Se você deseja uma solução quase em tempo real e não precisa que os dados sejam sincronizados em tempo real, usar execuções de pipeline agendadas pode ser uma boa opção. Você pode configurar gatilhos agendados para acionar um pipeline com a atividade Copiar ou um fluxo de dados, em uma frequência que esteja na frequência quase em tempo real que sua empresa pode pagar, para usar o conector do MongoDB para buscar os dados do MongoDB que foram inseridos, atualizados ou excluídos entre a última execução agendada e a execução atual. O pipeline usa o conector MongoDB como conector de origem para buscar os dados delta do MongoDB Atlas e enviá-los por push para o Armazenamento Data Lake ou pools de SQL dedicados do Azure Synapse Analytics, usando-os como conexões de coletor. Essa solução usa um mecanismo pull (em oposição à solução principal descrita neste artigo, que é um mecanismo de push) do MongoDB Atlas à medida que ocorrem alterações na coleção do MongoDB Atlas que o gatilho do Atlas está escutando.
Possíveis casos de uso
O MongoDB e o EDW e os serviços analíticos do Azure Synapse Analytics podem servir a vários casos de uso:
Retail
- Criando inteligência no agrupamento de produtos e na promoção de produtos
- Implementação do cliente 360 e hiperpersonalização
- Previsão de esgotamento dos estoques e otimização dos pedidos da cadeia de suprimentos
- Implementação de preços de desconto dinâmicos e pesquisa inteligente no comércio eletrônico
Serviços bancários e financeiros
- Personalizando serviços financeiros do cliente
- Detecção e bloqueio de transações fraudulentas
Telecomunicações
- Otimizando redes de última geração
- Maximizando o valor das redes de borda
Automotivo
- Otimização da parametrização de veículos conectados
- Detectando anomalias na comunicação de IoT em veículos conectados
Produção
- Fornecendo manutenção preditiva para máquinas
- Otimizando o gerenciamento de armazenamento e inventário
Considerações
Essas considerações implementam os pilares do Azure Well-Architected Framework, um conjunto de princípios orientadores que você pode usar para aprimorar a qualidade de uma carga de trabalho. Para obter mais informações, confira Microsoft Azure Well-Architected Framework.
Segurança
A segurança fornece garantias contra ataques deliberados e o abuso de seus dados e sistemas valiosos. Para saber mais, confira Visão geral do pilar de segurança.
O Azure Functions é um serviço gerenciado sem servidor, portanto, os recursos do aplicativo e os componentes da plataforma são protegidos por segurança aprimorada. No entanto, recomendamos que você use o protocolo HTTPS e as versões TLS mais recentes. Também é uma boa prática validar a entrada para garantir que seja um documento de alteração do MongoDB. Consulte Protegendo o Azure Functions para obter considerações de segurança para o Azure Functions.
O MongoDB Atlas é um banco de dados gerenciado como um serviço, portanto, o MongoDB fornece segurança de plataforma aprimorada. O MongoDB fornece vários mecanismos para ajudar a garantir a segurança de 360 graus para dados armazenados, incluindo acesso ao banco de dados, segurança de rede, criptografia em repouso e em trânsito e soberania de dados. Consulte o whitepaper de segurança do MongoDB Atlas Security para obter o whitepaper de segurança do MongoDB Atlas e outros artigos que podem ajudá-lo a garantir que os dados no MongoDB estejam seguros durante todo o ciclo de vida dos dados.
Otimização de custo
A otimização de custos consiste em reduzir despesas desnecessárias e melhorar a eficiência operacional. Para obter mais informações, confira Visão geral do pilar de otimização de custo.
Para estimar o custo de produtos e configurações do Azure, use a calculadora de preços do Azure. O Azure ajuda você a evitar custos desnecessários determinando o número correto de recursos para usar, analisando os gastos ao longo do tempo e dimensionando para atender às necessidades de negócios sem gastos excessivos. O Azure Functions incorre em custos somente quando ele é invocado. No entanto, dependendo do volume de alterações no MongoDB Atlas, você pode avaliar o uso de um mecanismo de lote na função Atlas para armazenar alterações em outra coleção temporária e acionar a função do Azure somente se o lote exceder um determinado limite.
Para obter informações sobre clusters do Atlas, consulte 5 maneiras de reduzir custos com o MongoDB Atlas e os custos de configuração de cluster. A página de preços do MongoDB pode ajudá-lo a entender as opções de preços para clusters do MongoDB Atlas e outras ofertas da plataforma de dados do desenvolvedor MongoDB Atlas. A Federação de Dados do Atlas pode ser implantada no Azure e oferece suporte ao Armazenamento de Blobs do Azure (em visualização). Se você estiver considerando usar o envio em lote para otimizar os custos, considere gravar no Armazenamento de Blobs em vez de uma coleção temporária do MongoDB.
Eficiência de desempenho
A eficiência do desempenho é a capacidade de dimensionar sua carga de trabalho para atender às demandas colocadas por usuários de maneira eficiente. Para obter mais informações, consulte Visão geral do pilar de eficiência de desempenho.
Os gatilhos do Atlas e o Azure Functions são testados pelo tempo para desempenho e escalabilidade. Consulte Desempenho e dimensionamento em Funções duráveis (Azure Functions) para entender as considerações de desempenho e escalabilidade do Azure Functions. Consulte Dimensionar sob demanda para obter algumas considerações para melhorar o desempenho de suas instâncias do MongoDB Atlas. Consulte o Guia de Práticas Recomendadas para Desempenho do MongoDB para obter as práticas recomendadas para a configuração do MongoDB Atlas.
Conclusão
O MongoDB Atlas integra-se perfeitamente com o Azure Synapse Analytics, permitindo que os clientes do Atlas usem facilmente o Atlas como uma origem ou um coletor para o Azure Synapse Analytics. Essa solução permite que você use dados operacionais do MongoDB em tempo real do Azure Synapse Analytics para análises complexas e inferência de IA.
Implantar este cenário
Sincronização em tempo real do MongoDB Atlas para o Azure Synapse Analytics
Colaboradores
Esse artigo é mantido pela Microsoft. Ele foi originalmente escrito pelos colaboradores a seguir.
Principais autores:
- Diana Annie Jenosh | Arquiteta de Soluções Sênior - equipe MongoDB Partners
- Venkatesh Shanbag | Arquiteto de Soluções Sênior - equipe MongoDB Partners
Outros colaboradores:
- Sunil Sabat | Gerenciador de Programas Principal - equipe ADF
- Wee Hyong Tok | Diretor Principal da equipe PM - ADF
Para ver perfis não públicos do LinkedIn, entre no LinkedIn.