Query's uitvoeren op Cosmos DB-gegevens met Spark

Voltooid

Nadat u een gekoppelde service hebt toegevoegd voor uw Azure Cosmos DB-database met analytische opslag, kunt u deze gebruiken om query's uit te voeren op de gegevens met behulp van een Spark-pool in uw Azure Synapse Analytics-werkruimte.

Analytische gegevens van Azure Cosmos DB laden in een dataframe

Voor de eerste verkenning of snelle analyse van gegevens uit een gekoppelde Azure Cosmos DB-service, is het vaak het eenvoudigst om gegevens uit een container in een dataframe te laden met behulp van een door Spark ondersteunde taal, zoals PySpark (een Spark-specifieke implementatie van Python) of Scala (een op Java gebaseerde taal die vaak wordt gebruikt in Spark).

De volgende PySpark-code kan bijvoorbeeld worden gebruikt om een dataframe met de naam df te laden uit de gegevens in de my-container die is verbonden met behulp van de gekoppelde my_linked_service-service en de eerste 10 rijen met gegevens weer te geven:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Stel dat de container mijn container wordt gebruikt om items op te slaan die vergelijkbaar zijn met het volgende voorbeeld:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

De uitvoer van de PySpark-code zou vergelijkbaar zijn met de volgende tabel:

_rid _ts productID productName id _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

De gegevens worden geladen uit de analytische opslag in de container, niet uit het operationele archief; ervoor zorgen dat er geen query-overhead op het operationele archief is. De velden in het analytische gegevensarchief bevatten de door de toepassing gedefinieerde velden (in dit geval productID en productName) en automatisch gemaakte metagegevensvelden.

Nadat u het dataframe hebt geladen, kunt u de systeemeigen methoden gebruiken om de gegevens te verkennen. Met de volgende code wordt bijvoorbeeld een nieuw dataframe gemaakt dat alleen de kolommen productID en productName bevat, gesorteerd op productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

De uitvoer van deze code ziet er ongeveer als volgt uit:

productID productName
125 Thingumy
123 Widget
124 Wotsit
... ...

Een dataframe schrijven naar een Cosmos DB-container

In de meeste HTAP-scenario's moet u de gekoppelde service gebruiken om gegevens in Spark te lezen vanuit de analytische opslag. U kunt echter de inhoud van een dataframe naar de container schrijven, zoals wordt weergegeven in het volgende voorbeeld:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Notitie

Het schrijven van een dataframe naar een container werkt het operationele archief bij en kan invloed hebben op de prestaties. De wijzigingen worden vervolgens gesynchroniseerd met de analytische opslag.

Spark SQL gebruiken om query's uit te voeren op analytische gegevens van Azure Cosmos DB

Spark SQL is een Spark-API die sql-taalsyntaxis en relationele databasesemantiek in een Spark-pool biedt. U kunt Spark SQL gebruiken om metagegevens te definiƫren voor tabellen waarop query's kunnen worden uitgevoerd met behulp van SQL.

Met de volgende code wordt bijvoorbeeld een tabel gemaakt met de naam Producten op basis van de hypothetische container die in de vorige voorbeelden wordt gebruikt:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Tip

Het %%sql trefwoord aan het begin van de code is een magie die de Spark-pool instrueert om de code uit te voeren als SQL in plaats van de standaardtaal (die meestal is ingesteld op PySpark).

Met deze methode kunt u een logische database maken in uw Spark-pool die u vervolgens kunt gebruiken om query's uit te voeren op de analytische gegevens in Azure Cosmos DB ter ondersteuning van gegevensanalyse en rapportageworkloads zonder dat dit van invloed is op het operationele archief in uw Azure Cosmos DB-account.