Interakce se službou Azure Cosmos DB pomocí Apache Sparku 3 ve službě Azure Synapse Link
V tomto článku se dozvíte, jak pracovat se službou Azure Cosmos DB pomocí Synapse Apache Spark 3. Díky plné podpoře jazyka Scala, Pythonu, SparkSQL a C# je Synapse Apache Spark 3 ústředním cílem analýzy, přípravy dat, datových věd a scénářů zkoumání dat ve službě Azure Synapse Link pro Azure Cosmos DB.
Při interakci se službou Azure Cosmos DB se podporují následující možnosti:
- Synapse Apache Spark 3 umožňuje analyzovat data v kontejnerech Azure Cosmos DB, které jsou povoleny s Azure Synapse Linkem téměř v reálném čase, aniž by to mělo vliv na výkon transakčních úloh. Pro dotazování analytického úložiště Azure Cosmos DB ze Sparku jsou k dispozici následující dvě možnosti:
- Načtení do datového rámce Spark
- Vytvoření tabulky Spark
- Synapse Apache Spark také umožňuje ingestovat data do služby Azure Cosmos DB. Je důležité si uvědomit, že data se vždy ingestují do kontejnerů Azure Cosmos DB prostřednictvím transakčního úložiště. Když je synapse Link povolený, všechny nové vložení, aktualizace a odstranění se pak automaticky synchronizují do analytického úložiště.
- Synapse Apache Spark také podporuje strukturované streamování Sparku se službou Azure Cosmos DB jako zdrojem a jímkou.
Následující části vás provedou syntaxí výše uvedených funkcí. Můžete si také prohlédnout modul Learn o dotazování služby Azure Cosmos DB pomocí Apache Sparku pro Azure Synapse Analytics. Gesta v pracovním prostoru Azure Synapse Analytics jsou navržená tak, aby poskytovala snadné integrované prostředí, které vám umožní začít. Gesta se zobrazí, když kliknete pravým tlačítkem na kontejner Azure Cosmos DB na kartě Data v pracovním prostoru Synapse. Pomocí gest můžete rychle vygenerovat kód a přizpůsobit ho vašim potřebám. Gesta jsou také ideální ke zjišťování dat jedním kliknutím.
Důležité
Měli byste vědět o některých omezeních analytického schématu, která by mohla vést k neočekávanému chování při operacích načítání dat. Například v analytickém schématu jsou k dispozici pouze prvních 1000 vlastností z transakčního schématu, vlastnosti s mezerami nejsou k dispozici atd. Pokud dochází k neočekávaným výsledkům, podívejte se na omezení schématu analytického úložiště, kde najdete další podrobnosti.
Dotazování analytického úložiště Azure Cosmos DB
Než se seznámíte se dvěma možnými možnostmi dotazování analytického úložiště Azure Cosmos DB, načtení do datového rámce Sparku a vytvoření tabulky Spark, je vhodné prozkoumat rozdíly v prostředí, abyste mohli zvolit možnost, která bude vyhovovat vašim potřebám.
Rozdíl v zkušenostech spočívá v tom, jestli by se základní změny dat v kontejneru Azure Cosmos DB měly automaticky promítat do analýzy provedené ve Sparku. Při registraci datového rámce Sparku nebo vytvoření tabulky Sparku v analytickém úložišti kontejneru se metadata kolem aktuálního snímku dat v analytickém úložišti načítají do Sparku za účelem efektivního nasdílení následných analýz. Je důležité si uvědomit, že vzhledem k tomu, že Spark se řídí opožděnými zásadami vyhodnocení, pokud není vyvolána akce v datovém rámci Sparku nebo v dotazu SparkSQL, se skutečná data nenačítají z analytického úložiště podkladového kontejneru.
Při načítání do datového rámce Sparku jsou načtená metadata uložená v mezipaměti po celou dobu relace Sparku. To znamená, že k vyhodnocení následných akcí prováděných s datovým rámcem se použije snímek analytického úložiště v okamžiku vytvoření datového rámce.
V případě vytvoření tabulky Sparku se metadata o stavu analytického úložiště neukládají do mezipaměti Sparku, ale znovu se načtou při každém spuštění dotazu SparkSQL do tabulky Sparku.
Můžete si tedy vybrat, jestli načtete datový rámec Sparku nebo vytvoříte tabulku Sparku podle toho, jestli chcete k vyhodnocení analýzy Sparku použít pevně daný snímek analytického úložiště nebo jeho nejnovější snímek.
Poznámka:
Pokud chcete dotazovat účty Azure Cosmos DB pro MongoDB, přečtěte si další informace o úplné reprezentaci schématu věrnosti v analytickém úložišti a o rozšířených názvech vlastností, které se mají použít.
Poznámka:
Upozorňujeme, že options
v následujících příkazech se rozlišují malá a velká písmena.
Načtení do datového rámce Spark
V tomto příkladu vytvoříte datový rámec Sparku, který odkazuje na analytické úložiště Azure Cosmos DB. Pak můžete provést další analýzu vyvoláním akcí Sparku proti datovému rámci. Tato operace nemá vliv na transakční úložiště.
Syntaxe v Pythonu by byla následující:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
df = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.load()
Ekvivalentní syntaxe v jazyce Scala by byla následující:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
load()
Vytvoření tabulky Spark
V tomto příkladu vytvoříte tabulku Sparku, která odkazuje na analytické úložiště Azure Cosmos DB. Pak můžete provést další analýzu vyvoláním dotazů SparkSQL na tabulku. Tato operace nemá vliv na transakční úložiště ani nedochází k žádnému přesunu dat. Pokud se rozhodnete odstranit tuto tabulku Sparku, nebude ovlivněn základní kontejner Azure Cosmos DB a odpovídající analytické úložiště.
Tento scénář je vhodný pro opakované použití tabulek Sparku prostřednictvím nástrojů třetích stran a zajištění přístupnosti podkladových dat za běhu.
Syntaxe pro vytvoření tabulky Sparku je následující:
%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options
create table call_center using cosmos.olap options (
spark.synapse.linkedService '<enter linked service name>',
spark.cosmos.container '<enter container name>'
)
Poznámka:
Pokud máte scénáře, ve kterých se schéma základního kontejneru Azure Cosmos DB mění v průběhu času; a pokud chcete, aby se aktualizované schéma automaticky odráželo v dotazech na tabulku Spark, můžete toho dosáhnout nastavením spark.cosmos.autoSchemaMerge
možnosti true
v možnostech tabulky Spark.
Zápis datového rámce Sparku do kontejneru Azure Cosmos DB
V tomto příkladu napíšete datový rámec Sparku do kontejneru Azure Cosmos DB. Tato operace ovlivní výkon transakčních úloh a spotřebuje jednotky žádostí zřízené v kontejneru Azure Cosmos DB nebo sdílené databázi.
Syntaxe v Pythonu by byla následující:
# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
YOURDATAFRAME.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.mode('append')\
.save()
Ekvivalentní syntaxe v jazyce Scala by byla následující:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
import org.apache.spark.sql.SaveMode
df.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
mode(SaveMode.Append).
save()
Načtení streamovaného datového rámce z kontejneru
V tomto gestu použijete funkci streamování Sparku k načtení dat z kontejneru do datového rámce. Data budou uložená v primárním účtu Data Lake (a systému souborů), který jste připojili k pracovnímu prostoru.
Poznámka:
Pokud chcete odkazovat na externí knihovny ve Službě Synapse Apache Spark, přečtěte si další informace tady. Pokud například chcete ingestovat datový rámec Sparku do kontejneru služby Azure Cosmos DB pro MongoDB, můžete využít konektor MongoDB pro Spark tady.
Načtení streamovaného datového rámce z kontejneru Azure Cosmos DB
V tomto příkladu použijete funkci strukturovaného streamování Sparku k načtení dat z kontejneru Azure Cosmos DB do streamovaného datového rámce Sparku pomocí funkcí kanálu změn ve službě Azure Cosmos DB. Data kontrolního bodu používaná Sparkem budou uložená v primárním účtu Data Lake (a systému souborů), který jste připojili k pracovnímu prostoru.
Syntaxe v Pythonu by byla následující:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
dfStream = spark.readStream\
.format("cosmos.oltp.changeFeed")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.changeFeed.startFrom", "Beginning")\
.option("spark.cosmos.changeFeed.mode", "Incremental")\
.load()
Ekvivalentní syntaxe v jazyce Scala by byla následující:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val dfStream = spark.readStream.
format("cosmos.oltp.changeFeed").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.changeFeed.startFrom", "Beginning").
option("spark.cosmos.changeFeed.mode", "Incremental").
load()
Zápis streamovaného datového rámce do kontejneru Azure Cosmos DB
V tomto příkladu napíšete datový rámec streamování do kontejneru Azure Cosmos DB. Tato operace ovlivní výkon transakčních úloh a spotřebuje jednotky žádostí zřízené v kontejneru nebo sdílené databázi Azure Cosmos DB. Pokud složka /localWriteCheckpointFolder není vytvořená (v následujícím příkladu), automaticky se vytvoří.
Syntaxe v Pythonu by byla následující:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
streamQuery = dfStream\
.writeStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("checkpointLocation", "/tmp/myRunId/")\
.outputMode("append")\
.start()
streamQuery.awaitTermination()
Ekvivalentní syntaxe v jazyce Scala by byla následující:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val query = dfStream.
writeStream.
format("cosmos.oltp").
outputMode("append").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("checkpointLocation", "/tmp/myRunId/").
start()
query.awaitTermination()