Analýzy v reálném čase vám můžou pomoct při rychlém rozhodování a provádění automatizovaných akcí na základě aktuálních přehledů. Může vám také pomoct poskytovat vylepšené prostředí pro zákazníky. Toto řešení popisuje, jak udržovat fondy dat Azure Synapse Analytics synchronizované se změnami provozních dat v MongoDB.
Architektura
Následující diagram ukazuje, jak implementovat synchronizaci v reálném čase z Atlasu do Azure Synapse Analytics. Tento jednoduchý tok zajišťuje, že všechny změny, ke kterým dochází v kolekci MongoDB Atlas, se replikují do výchozího úložiště Azure Data Lake Storage v pracovním prostoru Azure Synapse Analytics. Jakmile jsou data ve službě Data Lake Storage, můžete pomocí kanálů Azure Synapse Analytics odesílat data do vyhrazených fondů SQL, fondů Spark nebo jiných řešení v závislosti na požadavcích analýzy.
Stáhněte si soubor PowerPointu této architektury.
Tok dat
Změny v provozním úložišti dat MongoDB Atlas v reálném čase jsou zachyceny a zpřístupněny službě Data Lake Storage v pracovním prostoru Azure Synapse Analytics pro případy použití analýz v reálném čase, živé sestavy a řídicí panely.
Změny dat v provozním/transakčním úložišti dat MongoDB Atlas jsou zachyceny triggery Atlasu.
Když trigger databáze Atlas zaznamená událost, předá typ změny a dokument, který se změnil (plný nebo rozdílový) do funkce Atlas.
Funkce Atlas aktivuje funkci Azure a předá událost změny a dokument JSON.
Azure Functions používá klientskou knihovnu Azure Storage Files Data Lake k zápisu změněného dokumentu do nakonfigurovaného úložiště Data Lake Storage v pracovním prostoru Azure Synapse Analytics.
Jakmile jsou data ve službě Data Lake Storage, můžete je odeslat do vyhrazených fondů SQL, fondů Sparku a dalších řešení. Alternativně můžete převést data z FORMÁTU JSON na formáty Parquet nebo Delta pomocí toků dat Azure Synapse Analytics nebo kopírování kanálů pro spuštění dalších generování sestav BI nebo umělé inteligence nebo strojového učení na aktuálních datech.
Komponenty
- Streamy změn MongoDB Atlas umožňují upozorňovat aplikace na změny kolekce, databáze nebo clusteru nasazení. Streamy změn poskytují aplikacím přístup ke změnám dat v reálném čase a umožňují jim okamžitě reagovat na změny. Tato funkce je důležitá v případech použití, jako je sledování událostí IoT a změny finančních dat, kdy je potřeba okamžitě provést alarmy a reagovat. Triggery Atlas používají ke sledování kolekcí změny datové proudy a automaticky vyvolat přidruženou funkci Atlas v reakci na událost triggeru.
- Triggery Atlas reagují na vložení, aktualizace a odstranění dokumentu v konkrétní kolekci a mohou automaticky vyvolat funkci Atlas v reakci na událost změny.
- Funkce Atlas jsou bezserverové implementace javascriptového kódu na straně serveru, které můžou provádět akce na základě událostí, které vyvolávají trigger Atlas. Kombinování triggerů Atlas s funkcemi Atlas zjednodušuje implementaci architektur řízených událostmi.
- Azure Functions je bezserverová výpočetní platforma založená na událostech, kterou můžete použít k efektivnímu vývoji aplikací pomocí programovacího jazyka podle vašeho výběru. Můžete ho také použít k bezproblémovému propojení s dalšími službami Azure. V tomto scénáři funkce Azure zachytí událost změny a použije ji k zápisu objektu blob obsahujícího změněná data do Data Lake Storage pomocí klientské knihovny Azure Storage Files Data Lake.
- Data Lake Storage je výchozí řešení úložiště ve službě Azure Synapse Analytics. Bezserverové fondy můžete použít k přímému dotazování dat.
- Kanály a toky dat ve službě Azure Synapse Analytics se dají použít k nasdílení objektu blob, který obsahuje změněná data MongoDB do vyhrazených fondů SQL nebo fondů Spark pro další analýzu. Kanály umožňují pracovat se změněnými datovými sadami ve službě Data Lake Storage pomocí triggerů událostí úložiště i plánovaných triggerů k vytváření řešení pro případy použití v reálném čase i téměř v reálném čase. Tato integrace urychluje příjem datových sad změn.
Alternativy
Toto řešení používá triggery Atlas k zabalení kódu pro naslouchání datovým proudům změn Atlas a aktivaci azure Functions v reakci na událost změny. Proto je mnohem jednodušší implementovat než dříve poskytnuté alternativní řešení. Pro toto řešení je potřeba napsat kód pro naslouchání změnám datových proudů ve webové aplikaci Aplikace Azure Service.
Další alternativou je použití konektoru MongoDB Spark ke čtení dat streamu MongoDB a jejich zápisu do tabulek Delta. Kód se spouští nepřetržitě v poznámkovém bloku Sparku, který je součástí kanálu ve službě Azure Synapse Analytics. Další informace o implementaci tohoto řešení najdete v tématu Synchronizace z Atlasu do Azure Synapse Analytics pomocí streamování Sparku.
Použití triggerů Atlas se službou Azure Functions ale poskytuje zcela bezserverové řešení. Vzhledem k tomu, že je bezserverová, poskytuje řešení robustní škálovatelnost a optimalizaci nákladů. Ceny jsou založené na modelu nákladů s průběžným platbou. Pokud chcete před vyvoláním koncového bodu Azure Functions zkombinovat několik událostí změn, můžete ušetřit více peněz. Tato strategie může být užitečná ve scénářích s velkým provozem.
Microsoft Fabric také sjednocuje vaše datová aktiva a usnadňuje spouštění analýz a umělé inteligence nad daty, takže rychle získáte přehledy. Analýzy dat Azure Synapse Analytics, datové vědy, datové sklady a analýzy v reálném čase v Prostředcích infrastruktury teď můžou lépe využívat data MongoDB, která se odesílala do OneLake. Dataflow Gen2 a konektory datového kanálu můžete použít k načtení dat Atlasu přímo do OneLake. Tento mechanismus bez kódu poskytuje účinný způsob, jak ingestovat data z Atlasu do OneLake.
V prostředcích infrastruktury můžete přímo odkazovat na data, která se odsílají do Data Lake Storage, pomocí zkratek OneLake bez extrakce, transformace, načítání (ETL).
Data můžete odeslat do Power BI, abyste mohli vytvářet sestavy a vizualizace pro vytváření sestav BI.
Podrobnosti scénáře
MongoDB Atlas, provozní datová vrstva mnoha podnikových aplikací, ukládá data z interních aplikací, služeb orientovaných na zákazníky a rozhraní API třetích stran z více kanálů. Datové kanály v Azure Synapse Analytics můžete použít ke kombinování těchto dat s relačními daty z jiných tradičních aplikací a s nestrukturovanými daty ze zdrojů, jako jsou protokoly, úložiště objektů a clickstreamy.
Podniky používají funkce MongoDB, jako jsou Agregace, analytické uzly, Atlas Search, Vector Search, Atlas Data Lake, Atlas SQL Interface, Federace dat a grafy, které umožňují inteligentní funkce řízené aplikacemi. Transakční data v MongoDB se ale extrahují, transformují a načítají do vyhrazených fondů SQL služby Azure Synapse Analytics nebo fondů Spark pro dávkové, AI / strojové učení a analýzy a inteligentní funkce bi datového skladu.
Existují dva scénáře pro přesun dat mezi Atlasem a Azure Synapse Analytics: dávková integrace a synchronizace v reálném čase.
Dávková integrace
K přesunu dat z Atlasu do Data Lake Storage ve službě Azure Synapse Analytics můžete použít dávkovou a mikrodávkovou integraci. Můžete načíst celá historická data najednou nebo načíst přírůstková data na základě kritérií filtru.
Místní instance MongoDB a MongoDB Atlas je možné integrovat jako zdroj nebo prostředek jímky ve službě Azure Synapse Analytics. Informace o konektorech najdete v tématu Kopírování dat z mongoDB nebo kopírování dat z nebo do MongoDB Atlas.
Zdrojový konektor usnadňuje spouštění Azure Synapse Analytics na provozních datech uložených v místním MongoDB nebo v Atlasu. Data z Atlasu můžete načíst pomocí zdrojového konektoru a načíst data do Data Lake Storage v Parquet, Avro, JSON a textových formátech nebo jako úložiště objektů blob CSV. Tyto soubory je pak možné transformovat nebo připojit k jiným souborům z jiných zdrojů dat ve scénářích s více databázemi, multicloudem nebo hybridním cloudem. Tento případ použití je běžný ve scénářích podnikového datového skladu (EDW) a analýz ve velkém měřítku. Konektor jímky můžete také použít k uložení výsledků analýzy zpět v Atlasu. Další informace o dávkové integraci najdete v tématu Analýza provozních dat v MongoDB Atlas pomocí Azure Synapse Analytics.
Synchronizace v reálném čase
Architektura popsaná v tomto článku vám může pomoct implementovat synchronizaci v reálném čase, aby vaše úložiště Azure Synapse Analytics bylo aktuální s provozními daty MongoDB.
Toto řešení se skládá ze dvou primárních funkcí:
- Zachycení změn v Atlasu
- Aktivace funkce Azure za účelem rozšíření změn ve službě Azure Synapse Analytics
Zachycení změn v atlasu
Změny můžete zachytit pomocí triggeru Atlas, který můžete nakonfigurovat v uživatelském rozhraní pro přidání triggeru nebo pomocí rozhraní API pro správu služby Atlas App Services. Triggery naslouchají změnám databáze způsobeným událostmi databáze, jako jsou vložení, aktualizace a odstranění. Triggery Atlasu také aktivují funkci Atlas při zjištění události změny. K přidání funkce můžete použít uživatelské rozhraní pro přidání triggeru . Můžete také vytvořit funkci Atlas a přidružit ji jako koncový bod vyvolání triggeru pomocí rozhraní API pro správu Atlas.
Následující snímek obrazovky ukazuje formulář, který můžete použít k vytvoření a úpravě triggeru Atlas. V části Podrobnosti o zdroji triggeru zadáte kolekci, na kterou trigger sleduje události změn, a události databáze, pro které sleduje (vložení, aktualizace, odstranění a/nebo nahrazení).
Trigger může vyvolat funkci Atlas v reakci na událost, pro kterou je povolená. Následující snímek obrazovky ukazuje jednoduchý javascriptový kód přidaný jako funkci Atlas, který se má vyvolat v reakci na trigger databáze. Funkce Atlas vyvolá funkci Azure, která předá metadata události změny společně s dokumentem vloženým, aktualizovaným, odstraněným nebo nahrazeným v závislosti na tom, k čemu je trigger povolený.
Kód funkce Atlas
Kód funkce Atlas aktivuje funkci Azure, která je přidružená ke koncovému bodu funkce Azure předáním celého changeEvent
textu požadavku do funkce Azure.
Zástupný symbol musíte nahradit <Azure function URL endpoint>
skutečným koncovým bodem adresy URL funkce Azure.
exports = function(changeEvent) {
// Invoke Azure function that inserts the change stream into Data Lake Storage.
console.log(typeof fullDocument);
const response = context.http.post({
url: "<Azure function URL endpoint>",
body: changeEvent,
encodeBodyAsJSON: true
});
return response;
};
Aktivace funkce Azure pro šíření změn do Azure Synapse Analytics
Funkce Atlas je kódovaná pro vyvolání funkce Azure, která zapisuje dokument změn do Data Lake Storage ve službě Azure Synapse Analytics. Funkce Azure používá klientskou knihovnu Azure Data Lake Storage pro Python SDK k vytvoření instance DataLakeServiceClient
třídy, která představuje váš účet úložiště.
Funkce Azure používá k ověřování klíč úložiště. Můžete také použít implementace Microsoft Entra ID OAuth. Z storage_account_key
nakonfigurovaných proměnných prostředí operačního systému se načítají atributy související se službou Dake Lake Storage. Po dekódování fullDocument
textu požadavku se (celý vložený nebo aktualizovaný dokument) parsuje z textu požadavku a potom se zapíše do Data Lake Storage pomocí funkcí append_data
klienta Data Lake a flush_data
.
Pro operaci fullDocumentBeforeChange
odstranění se používá místo fullDocument
.
fullDocument
nemá žádnou hodnotu v operaci odstranění, takže kód načte dokument, který byl odstraněn, který je zachycen v fullDocumentBeforeChange
. Všimněte si, že se vyplní jenom v případě, že fullDocumentBeforeChange
je nastavení předběžného nastavení dokumentu nastavené na zapnuté, jak je znázorněno na předchozím snímku obrazovky.
import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a new request.')
logging.info(req)
storage_account_name = os.environ["storage_account_name"]
storage_account_key = os.environ["storage_account_key"]
storage_container = os.environ["storage_container"]
storage_directory = os.environ["storage_directory"]
storage_file_name = os.environ["storage_file_name"]
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
"https", storage_account_name), credential=storage_account_key)
json_data = req.get_body()
logging.info(json_data)
object_id = "test"
try:
json_string = json_data.decode("utf-8")
json_object = json.loads(json_string)
if json_object["operationType"] == "delete":
object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
else:
object_id = json_object["fullDocument"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}
logging.info(object_id)
encoded_data = json.dumps(data)
except Exception as e:
logging.info("Exception occurred : "+ str(e))
file_system_client = service_client.get_file_system_client(file_system=storage_container)
directory_client = file_system_client.get_directory_client(storage_directory)
file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
file_client.flush_data(len(encoded_data))
return func.HttpResponse(f"This HTTP triggered function executed successfully.")
Zatím jste viděli, jak trigger Atlas zachycuje všechny změny, ke kterým dojde, a předává ji funkci Azure prostřednictvím funkce Atlas a že funkce Azure zapisuje dokument změn jako nový soubor ve službě Data Lake Storage v pracovním prostoru Azure Synapse Analytics.
Po přidání souboru do Data Lake Storage můžete nastavit trigger události úložiště, který aktivuje kanál, který pak může zapsat dokument změn do vyhrazeného fondu SQL nebo do tabulky fondu Sparku. Kanál může používat aktivita Copy a transformovat data pomocí toku dat. Pokud je konečným cílem vyhrazený fond SQL, můžete funkci Azure upravit tak, aby se zapsal přímo do vyhrazeného fondu SQL ve službě Azure Synapse Analytics. Pro fond SQL získejte připojovací řetězec ODBC pro připojení fondu SQL. Viz Použití Pythonu k dotazování databáze pro příklad kódu Pythonu, který můžete použít k dotazování tabulky fondu SQL pomocí připojovací řetězec. Tento kód můžete upravit tak, aby k zápisu do vyhrazeného fondu SQL použil dotaz Insert. Existují konfigurační nastavení a role, které je potřeba přiřadit, aby funkce mohla zapisovat do vyhrazeného fondu SQL. Informace o těchto nastaveních a rolích jsou mimo rozsah tohoto článku.
Pokud chcete řešení téměř v reálném čase a nepotřebujete, aby se data synchronizovala v reálném čase, může být vhodné použít naplánované spuštění kanálu. Naplánované triggery můžete nastavit tak, aby aktivovaly kanál s aktivita Copy nebo tokem dat s frekvencí téměř v reálném čase, kterou si vaše firma může dovolit, a použít konektor MongoDB k načtení dat z MongoDB, která byla vložena, aktualizována nebo odstraněna mezi posledním naplánovaným spuštěním a aktuálním spuštěním. Kanál používá konektor MongoDB jako zdrojový konektor k načtení rozdílových dat z MongoDB Atlas a jejich odeslání do vyhrazených fondů SQL Data Lake Storage nebo Azure Synapse Analytics, které používají jako připojení jímky. Toto řešení používá mechanismus vyžádání změn (na rozdíl od hlavního řešení popsaného v tomto článku, což je mechanismus nabízení) z MongoDB Atlas, protože změny probíhají v kolekci MongoDB Atlas, kterou trigger Atlas naslouchá.
Potenciální případy použití
MongoDB a EDW a analytické služby Azure Synapse Analytics můžou sloužit mnoha případům použití:
Retail
- Sestavování inteligentních informací do sdružování produktů a propagace produktů
- Implementace zákazníka 360 a hyper-personalizace
- Předpověď vyčerpání zásob a optimalizace objednávek dodavatelského řetězce
- Implementace dynamických cen slev a inteligentního vyhledávání v elektronickém obchodování
Bankovní a finanční sektor
- Přizpůsobení zákaznických finančních služeb
- Zjišťování a blokování podvodných transakcí
Telekomunikace
- Optimalizace sítí nové generace
- Maximalizace hodnoty hraničních sítí
Automobilový průmysl
- Optimalizace parametrizace připojených vozidel
- Detekce anomálií v komunikaci IoT v připojených vozidlech
Výroba
- Zajištění prediktivní údržby strojů
- Optimalizace správy úložiště a inventáře
Důležité informace
Tyto aspekty implementují pilíře dobře architektuře Azure, což je sada hlavních principů, které můžete použít ke zlepšení kvality úlohy. Další informace naleznete v tématu Microsoft Azure Well-Architected Framework.
Zabezpečení
Zabezpečení poskytuje záruky proti záměrným útokům a zneužití cenných dat a systémů. Další informace najdete v tématu Přehled pilíře zabezpečení.
Azure Functions je bezserverová spravovaná služba, takže prostředky aplikací a komponenty platformy jsou chráněné rozšířeným zabezpečením. Doporučujeme ale používat protokol HTTPS a nejnovější verze protokolu TLS. Je také vhodné ověřit vstup, aby se zajistilo, že se jedná o dokument změn MongoDB. Informace o zabezpečení služby Azure Functions najdete v tématu Zabezpečení služby Azure Functions.
MongoDB Atlas je spravovaná databáze jako služba, takže MongoDB poskytuje rozšířené zabezpečení platformy. MongoDB poskytuje několik mechanismů, které pomáhají zajistit 360stupňové zabezpečení uložených dat, včetně přístupu k databázi, zabezpečení sítě, šifrování neaktivních uložených dat a přenášených dat a suverenity dat. Podívejte se na dokument white paper o zabezpečení MongoDB Atlas pro MongoDB Atlas a další články, které vám pomůžou zajistit zabezpečení dat v MongoDB v průběhu životního cyklu dat.
Optimalizace nákladů
Optimalizace nákladů se týká snížení zbytečných výdajů a zlepšení efektivity provozu. Další informace najdete v tématu Přehled pilíře optimalizace nákladů.
Pokud chcete odhadnout náklady na produkty a konfigurace Azure, použijte cenovou kalkulačku Azure. Azure pomáhá vyhnout se zbytečným nákladům tím, že určí správný počet prostředků, které se mají použít, analyzuje výdaje v průběhu času a škáluje se tak, aby vyhovovaly obchodním potřebám bez nadměrného využití. Azure Functions se účtují náklady pouze v případech, kdy jsou vyvolány. V závislosti na objemu změn v MongoDB Atlas však můžete vyhodnotit použití dávkového mechanismu ve funkci Atlas k uložení změn v jiné dočasné kolekci a aktivaci funkce Azure pouze v případě, že dávka překročí určitý limit.
Informace o clusterech Atlas naleznete v tématu 5 Způsoby snížení nákladů pomocí MongoDB Atlas a nákladů na konfiguraci clusteru. Stránka s cenami MongoDB vám pomůže pochopit cenové možnosti pro clustery MongoDB Atlas a další nabídky vývojářské platformy MongoDB Atlas. Federace dat Atlas je možné nasadit v Azure a podporuje službu Azure Blob Storage (ve verzi Preview). Pokud zvažujete použití dávkování k optimalizaci nákladů, zvažte místo dočasné kolekce MongoDB zápis do služby Blob Storage.
Efektivita výkonu
Efektivita výkonu je schopnost úlohy škálovat se tak, aby efektivním způsobem splňovala požadavky, které na ni kladou uživatelé. Další informace najdete v tématu Přehled pilíře efektivity výkonu.
Triggery Atlas a Azure Functions jsou časově testovány na výkon a škálovatelnost. Informace o výkonu a škálovatelnosti azure Functions (Azure Functions) najdete v tématu Výkon a škálování, abyste porozuměli aspektům výkonu a škálovatelnosti pro Azure Functions. Informace o zvýšení výkonu instancí MongoDB Atlas najdete v tématu Škálování na vyžádání . Informace o osvědčených postupech pro konfiguraci MongoDB Atlas najdete v průvodci osvědčenými postupy pro výkon MongoDB .
Závěr
MongoDB Atlas se bezproblémově integruje se službou Azure Synapse Analytics a umožňuje zákazníkům atlas snadno používat jako zdroj nebo jímku pro Azure Synapse Analytics. Toto řešení umožňuje používat provozní data MongoDB v reálném čase z Azure Synapse Analytics pro komplexní analýzy a odvozování umělé inteligence.
Nasazení tohoto scénáře
Synchronizace v reálném čase z MongoDB Atlas do Azure Synapse Analytics
Přispěvatelé
Tento článek spravuje Microsoft. Původně byla napsána následujícími přispěvateli.
Hlavní autoři:
- Diana Annie Jenosh | Vedoucí architekt řešení – tým partnerů MongoDB
- Venkatesh Shanbag| Vedoucí architekt řešení – tým partnerů MongoDB
Další přispěvatelé:
- Sunil Sabat | Hlavní programový manažer – tým ADF
- Wee Hyong Tok | Hlavní ředitel PM – tým ADF
Pokud chcete zobrazit neveřejné profily LinkedIn, přihlaste se na LinkedIn.