Interroger des données Cosmos DB avec Spark

Effectué

Une fois que vous avez ajouté un service lié pour votre magasin analytique Azure Cosmos DB, vous pouvez l’utiliser pour interroger les données à l’aide d’un pool Spark dans votre espace de travail Azure Synapse Analytics.

Chargement de données analytiques Azure Cosmos DB dans un dataframe

Pour l’exploration initiale ou l’analyse rapide des données à partir d’un service lié Azure Cosmos DB, il est souvent plus facile de charger des données à partir d’un conteneur dans un dataframe à l’aide d’un langage pris en charge par Spark, comme PySpark (implémentation spécifique à Python) ou Scala (langage Java souvent utilisé sur Spark).

Par exemple, le code PySpark suivant peut être utilisé pour charger un dataframe nommé df à partir des données du conteneur my-container connecté à l’aide du service lié my_linked_service et afficher les 10 premières lignes de données :

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Supposons que le conteneur my-container est utilisé pour stocker des éléments similaires à l’exemple suivant :

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

La sortie du code PySpark serait similaire au tableau suivant :

_rid _ts productID ProductName id _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

Les données sont chargées à partir du magasin analytique dans le conteneur, et non à partir du magasin opérationnel, garantissant qu’il n’y a pas de surcharge d’interrogation sur le magasin opérationnel. Les champs du magasin de données analytique incluent les champs définis par l’application (dans ce cas productID et productName) et les champs de métadonnées créés automatiquement.

Après avoir chargé le dataframe, vous pouvez utiliser ses méthodes natives pour explorer les données. Par exemple, le code suivant crée un dataframe contenant uniquement les colonnes productID et productName, classées par productName :

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

La sortie de ce code ressemble à ce tableau :

productID ProductName
125 Thingumy
123 Widget
124 Wotsit
... ...

Écriture d’un dataframe dans un conteneur Cosmos DB

Dans la plupart des scénarios HTAP, vous devez utiliser le service lié pour lire des données dans Spark à partir du magasin analytique. Toutefois, vous pouvez écrire le contenu d’un dataframe dans le conteneur, comme illustré dans l’exemple suivant :

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Notes

L’écriture d’un dataframe dans un conteneur met à jour le magasin opérationnel et peut avoir un impact sur ses performances. Les modifications sont ensuite synchronisées avec le magasin analytique.

Utilisation de Spark SQL pour interroger des données analytiques Azure Cosmos DB

Spark SQL est une API Spark qui fournit une syntaxe de langage et une sémantique de base de données relationnelle SQL dans un pool Spark. Vous pouvez utiliser Spark SQL pour définir des métadonnées pour les tables qui peuvent être interrogées à l’aide de SQL.

Par exemple, le code suivant crée une table nommée Products sur la base du conteneur hypothétique utilisé dans les exemples précédents :

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Conseil

Le mot clé %%sql au début du code est une valeur magique qui donne l’instruction au pool Spark d’exécuter le code en SQL plutôt que dans le langage par défaut (qui est généralement défini comme PySpark).

À l’aide de cette approche, vous pouvez créer une base de données logique dans votre pool Spark que vous pouvez ensuite utiliser pour interroger les données analytiques dans Azure Cosmos DB pour prendre en charge les charges de travail d’analyse et de création de rapports de données sans avoir d’impact sur le magasin opérationnel dans votre compte Azure Cosmos DB.