Eseguire una query nei dati di Cosmos DB con Spark

Completato

Dopo aver aggiunto un servizio collegato per il database di Azure Cosmos DB abilitato per l'archivio analitico, è possibile usarlo per eseguire query sui dati usando un pool Spark nell'area di lavoro Azure Synapse Analytics.

Caricamento di dati analitici di Azure Cosmos DB in un dataframe

Per l'esplorazione iniziale o l'analisi rapida dei dati da un servizio collegato di Azure Cosmos DB, è spesso più semplice caricare i dati da un contenitore in un dataframe usando un linguaggio supportato da Spark come PySpark (implementazione di Python specifica di Spark) o Scala (un linguaggio basato su Java usato spesso in Spark).

Ad esempio, il codice PySpark seguente può essere usato per caricare un dataframe denominato df dai dati nel contenitore my-container connesso all'uso del servizio collegato my_linked_service e visualizzare le prime 10 righe di dati:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Si supponga che il contenitore my-container venga usato per archiviare elementi simili all'esempio seguente:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

L'output del codice PySpark sarà simile alla tabella seguente:

_rid _ts productID productName ID _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

I dati vengono caricati nel contenitore dall'archivio analitico, non dall'archivio operativo, garantendo in questo modo che non siano presenti sovraccarichi di query nell'archivio operativo. I campi nell'archivio dati analitici includono i campi definiti dall'applicazione (in questo caso productID e productName) e i campi metadati creati automaticamente.

Dopo aver caricato il dataframe, è possibile usare i metodi nativi per esplorare i dati. Ad esempio, il codice seguente crea un nuovo dataframe contenente solo le colonne productID e productName ordinate per productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

L'output di questo codice sarà simile alla tabella seguente:

productID productName
125 Thingumy
123 Widget
124 Wotsit
... ...

Scrittura di un dataframe in un contenitore di Cosmos DB

Nella maggior parte degli scenari HTAP è consigliabile usare il servizio collegato per leggere i dati in Spark dall'archivio analitico. Tuttavia, è possibile scrivere il contenuto di un dataframe nel contenitore, come illustrato nell'esempio seguente:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Nota

La scrittura di un dataframe in un contenitore aggiorna l'archivio operativo e può avere un impatto sulle prestazioni. Le modifiche vengono quindi sincronizzate con l'archivio analitico.

Uso di Spark SQL per eseguire query sui dati analitici di Azure Cosmos DB

Spark SQL è un'API Spark che offre la sintassi del linguaggio SQL e la semantica del database relazionale in un pool Spark. È possibile usare Spark SQL per definire i metadati per le tabelle su cui è possibile eseguire query usando SQL.

Ad esempio, il codice seguente crea una tabella denominata Products in base al contenitore ipotetico usato negli esempi precedenti:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Suggerimento

La parola chiave %%sql all'inizio del codice è un magic che indica al pool Spark di eseguire il codice come SQL anziché nel linguaggio predefinito (che in genere è impostato su PySpark).

Usando questo approccio, è possibile creare un database logico nel pool Spark che è quindi possibile usare per eseguire query sui dati analitici in Azure Cosmos DB per supportare l'analisi dei dati e i carichi di lavoro di creazione di report senza influire sull'archivio operativo nell'account di Azure Cosmos DB.