Delen via


Interactie met Azure Cosmos DB met Apache Spark 3 in Azure Synapse Link

In dit artikel leert u hoe u kunt communiceren met Azure Cosmos DB met behulp van Synapse Apache Spark 3. Met de volledige ondersteuning voor Scala, Python, SparkSQL en C# is Synapse Apache Spark 3 centraal in analyse-, data engineering-, data science- en gegevensverkenningsscenario's in Azure Synapse Link voor Azure Cosmos DB.

De volgende mogelijkheden worden ondersteund bij interactie met Azure Cosmos DB:

  • Met Synapse Apache Spark 3 kunt u gegevens analyseren in uw Azure Cosmos DB-containers die zijn ingeschakeld met Azure Synapse Link in bijna realtime, zonder dat dit van invloed is op de prestaties van uw transactionele workloads. De volgende twee opties zijn beschikbaar om een query uit te voeren op de analytische opslag van Azure Cosmos DB vanuit Spark:
    • Laden naar Spark DataFrame
    • Spark-tabel maken
  • Met Synapse Apache Spark kunt u ook gegevens opnemen in Azure Cosmos DB. Het is belangrijk te weten dat gegevens altijd worden opgenomen in Azure Cosmos DB-containers via de transactionele opslag. Wanneer Synapse Link is ingeschakeld, worden nieuwe toevoegingen, updates en verwijderingen automatisch gesynchroniseerd met de analytische opslag.
  • Synapse Apache Spark biedt ook ondersteuning voor Spark Structured Streaming met Azure Cosmos DB als een bron en een sink.

In de volgende secties wordt stapsgewijs uitgelegd hoe u de syntaxis van de bovenstaande mogelijkheden kunt volgen. U kunt ook de Learn-module bekijken over het uitvoeren van query's op Azure Cosmos DB met Apache Spark voor Azure Synapse Analytics. Bewegingen in Azure Synapse Analytics-werkruimte zijn ontworpen om een eenvoudige out-of-the-box-ervaring te bieden om aan de slag te gaan. Gebaren worden weergegeven wanneer u met de rechtermuisknop op een Azure Cosmos DB-container klikt op het tabblad Gegevens van de Synapse-werkruimte. Met bewegingen kunt u snel code genereren en deze aanpassen aan uw behoeften. Bewegingen zijn ook ideaal om met één klik gegevens te ontdekken.

Belangrijk

U moet rekening houden met enkele beperkingen in het analytische schema die kunnen leiden tot onverwacht gedrag bij het laden van gegevens. Als voorbeeld zijn slechts de eerste 1000 eigenschappen van het transactionele schema beschikbaar in het analytische schema, eigenschappen met spaties zijn niet beschikbaar, enzovoort. Als u onverwachte resultaten ondervindt, controleert u de beperkingen van het schema voor analytische opslag voor meer informatie.

Analytische opslag van Azure Cosmos DB

Voordat u meer informatie krijgt over de twee mogelijke opties om een query uit te brengen op Azure Cosmos DB-analytische opslag, het laden van Spark DataFrame en het maken van een Spark-tabel, is het een goed idee om de opties in de ervaring te verkennen zodat u de optie kunt kiezen die geschikt is voor uw behoeften.

Het verschil in ervaring is om te bepalen of onderliggende gegevenswijzigingen in de Azure Cosmos DB-container automatisch moeten worden doorgevoerd in de analyse die in Spark wordt uitgevoerd. Wanneer een Spark DataFrame wordt geregistreerd of een Spark-tabel wordt gemaakt op basis van de analytische opslag van een container, worden de metagegevens van de huidige momentopname van de gegevens in het analytische archief naar Spark opgehaald voor efficiënte pushdown van de volgende analyse. Het is belangrijk te weten dat, omdat Spark een lui evaluatiebeleid volgt, tenzij een actie wordt aangeroepen op de Spark-DataFrame of een SparkSQL-query wordt uitgevoerd op basis van de Spark-tabel, de werkelijke gegevens niet worden opgehaald uit de analytische opslag van de onderliggende container.

In het geval van het laden naar Spark DataFrame, worden de opgehaalde metagegevens in de cache opgeslagen tijdens de levensduur van de Spark-sessie. Daarom worden de volgende acties die worden uitgevoerd op de DataFrame geëvalueerd op basis van de momentopname van de analytische opslag op het moment dat DataFrame wordt gemaakt.

Daarentegen, in het geval van het maken van een Spark-tabel, worden de metagegevens van de status van de analytische opslag niet in de cache opgeslagen in Spark en worden opnieuw geladen op elke SparkSQL-query-uitvoering in de Spark-tabel.

Daarom kunt u kiezen tussen het laden van Spark DataFrame en het maken van een Spark-tabel op basis van de vraag of u wilt dat uw Spark-analyse wordt geëvalueerd voor respectievelijk een vaste momentopname van de analytische opslag of met de meest recente momentopname van de analytische opslag.

Notitie

Als u query's wilt uitvoeren op Azure Cosmos DB voor MongoDB-accounts, vindt u meer informatie over de schemaweergave van volledige betrouwbaarheid in de analytische opslag en de uitgebreide eigenschapsnamen die moeten worden gebruikt.

Notitie

Houd er rekening mee dat alle options onderstaande opdrachten hoofdlettergevoelig zijn.

Laden naar Spark DataFrame

In dit voorbeeld maakt u een Spark-DataFrame dat verwijst naar de Azure Cosmos DB-analytische opslag. U kunt vervolgens extra analyses uitvoeren door Spark-acties aan te roepen voor het DataFrame. Deze bewerking heeft geen invloed op de transactionele opslag.

De syntaxis van Python is als volgt:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

De equivalente syntaxis in Scala is de volgende:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Spark-tabel maken

In dit voorbeeld maakt u een Spark-DataFrame dat verwijst naar de Azure Cosmos DB-analytische opslag. U kunt vervolgens extra analyses uitvoeren door Spark-acties aan te roepen voor het DataFrame. Deze bewerking heeft geen invloed op het transactionele archief en maakt geen gegevensverplaatsing. Als u besluit deze Spark-tabel te verwijderen, worden de onderliggende Azure Cosmos DB-container en de bijbehorende analytische opslag niet beïnvloed.

Dit scenario is handig voor hergebruik van Spark-tabellen via hulpprogramma's van derden en om de gegevens voor de uitvoering toegankelijk te maken.

De syntaxis voor het maken van een Spark-tabel is als volgt:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Notitie

Als u scenario's hebt waarin het schema van de onderliggende Azure Cosmos DB-container na verloop van tijd verandert; en als u wilt dat het bijgewerkte schema automatisch wordt weergegeven in de query's voor de Spark-tabel, kunt u dit doen door de optie true in te spark.cosmos.autoSchemaMerge stellen in de Spark-tabelopties.

Spark DataFrame naar Azure Cosmos DB-container schrijven

In dit voorbeeld schrijft u een Spark DataFrame naar een Azure Cosmos DB-container. Deze bewerking heeft invloed op de prestaties van transactionele workloads en verbruikt aanvraageenheden die zijn ingericht op de Azure Cosmos DB-container of de gedeelde database.

De syntaxis van Python is als volgt:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

De equivalente syntaxis in Scala is de volgende:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

Streaming-dataframe laden vanuit een container

Met deze beweging gebruikt u de mogelijkheid van Spark-streaming om gegevens uit een container in een dataframe te laden. De gegevens worden opgeslagen in het primaire Data Lake-account (en bestandssysteem) dat u hebt gekoppeld aan de werkruimte.

Notitie

Als u op zoek bent naar externe bibliotheken in Synapse Apache Spark, vindt u hier meer informatie. Als u bijvoorbeeld een Spark DataFrame wilt opnemen in een container van Azure Cosmos DB voor MongoDB, kunt u hier gebruikmaken van de MongoDB-connector voor Spark.

Streaming-DataFrame laden vanuit een Azure Cosmos DB-container

In dit voorbeeld gebruikt u de structured streaming-mogelijkheid van Spark voor het laden van gegevens uit een Azure Cosmos DB-container naar een Spark-streaming DataFrame met behulp van de wijzigingsfunctie voor feeds in Azure Cosmos DB. De controlepuntgegevens die worden gebruikt door Spark, worden opgeslagen in het primaire datalake-account (en bestandssysteem) dat u hebt verbonden met de werkruimte.

De syntaxis van Python is als volgt:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

De equivalente syntaxis in Scala is de volgende:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Streaming-DataFrame schrijven naar een Azure Cosmos DB-container

In dit voorbeeld schrijft u een streaming-DataFrame naar een Azure Cosmos DB-container. Deze bewerking heeft invloed op de prestaties van transactionele workloads en verbruikt aanvraageenheden die zijn ingericht op de Azure Cosmos DB-container of gedeelde database. Als de map /localWriteCheckpointFolder niet is gemaakt (in het onderstaande voorbeeld), gebeurt dit automatisch.

De syntaxis van Python is als volgt:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

De equivalente syntaxis in Scala is de volgende:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

Volgende stappen