Interroger des données Cosmos DB avec Spark
Une fois que vous avez ajouté un service lié pour votre magasin analytique Azure Cosmos DB, vous pouvez l’utiliser pour interroger les données à l’aide d’un pool Spark dans votre espace de travail Azure Synapse Analytics.
Chargement de données analytiques Azure Cosmos DB dans un dataframe
Pour l’exploration initiale ou l’analyse rapide des données à partir d’un service lié Azure Cosmos DB, il est souvent plus facile de charger des données à partir d’un conteneur dans un dataframe à l’aide d’un langage pris en charge par Spark, comme PySpark (implémentation spécifique à Python) ou Scala (langage Java souvent utilisé sur Spark).
Par exemple, le code PySpark suivant peut être utilisé pour charger un dataframe nommé df à partir des données du conteneur my-container connecté à l’aide du service lié my_linked_service et afficher les 10 premières lignes de données :
df = spark.read
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.load()
display(df.limit(10))
Supposons que le conteneur my-container est utilisé pour stocker des éléments similaires à l’exemple suivant :
{
"productID": 123,
"productName": "Widget",
"id": "7248f072-11c3-42b1-a368-...",
"_rid": "mjMaAL...==",
"_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
"_etag": "\"54004b09-0000-2300-...\"",
"_attachments": "attachments/",
"_ts": 1655414791
}
La sortie du code PySpark serait similaire au tableau suivant :
_rid | _ts | productID | ProductName | id | _etag |
---|---|---|---|---|---|
mjMaAL...== | 1655414791 | 123 | Widget | 7248f072-11c3-42b1-a368-... | 54004b09-0000-2300-... |
mjMaAL...== | 1655414829 | 124 | Wotsit | dc33131c-65c7-421a-a0f7-... | 5400ca09-0000-2300-... |
mjMaAL...== | 1655414835 | 125 | Thingumy | ce22351d-78c7-428a-a1h5-... | 5400ca09-0000-2300-... |
... | ... | ... | ... | ... | ... |
Les données sont chargées à partir du magasin analytique dans le conteneur, et non à partir du magasin opérationnel, garantissant qu’il n’y a pas de surcharge d’interrogation sur le magasin opérationnel. Les champs du magasin de données analytique incluent les champs définis par l’application (dans ce cas productID et productName) et les champs de métadonnées créés automatiquement.
Après avoir chargé le dataframe, vous pouvez utiliser ses méthodes natives pour explorer les données. Par exemple, le code suivant crée un dataframe contenant uniquement les colonnes productID et productName, classées par productName :
products_df = df.select("productID", "productName").orderBy("productName")
display(products_df.limit(10))
La sortie de ce code ressemble à ce tableau :
productID | ProductName |
---|---|
125 | Thingumy |
123 | Widget |
124 | Wotsit |
... | ... |
Écriture d’un dataframe dans un conteneur Cosmos DB
Dans la plupart des scénarios HTAP, vous devez utiliser le service lié pour lire des données dans Spark à partir du magasin analytique. Toutefois, vous pouvez écrire le contenu d’un dataframe dans le conteneur, comme illustré dans l’exemple suivant :
mydf.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
Notes
L’écriture d’un dataframe dans un conteneur met à jour le magasin opérationnel et peut avoir un impact sur ses performances. Les modifications sont ensuite synchronisées avec le magasin analytique.
Utilisation de Spark SQL pour interroger des données analytiques Azure Cosmos DB
Spark SQL est une API Spark qui fournit une syntaxe de langage et une sémantique de base de données relationnelle SQL dans un pool Spark. Vous pouvez utiliser Spark SQL pour définir des métadonnées pour les tables qui peuvent être interrogées à l’aide de SQL.
Par exemple, le code suivant crée une table nommée Products sur la base du conteneur hypothétique utilisé dans les exemples précédents :
%%sql
-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;
USE mydb;
-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
spark.synapse.linkedService 'my_linked_service',
spark.cosmos.container 'my-container'
);
-- Query the table
SELECT productID, productName
FROM products;
Conseil
Le mot clé %%sql
au début du code est une valeur magique qui donne l’instruction au pool Spark d’exécuter le code en SQL plutôt que dans le langage par défaut (qui est généralement défini comme PySpark).
À l’aide de cette approche, vous pouvez créer une base de données logique dans votre pool Spark que vous pouvez ensuite utiliser pour interroger les données analytiques dans Azure Cosmos DB pour prendre en charge les charges de travail d’analyse et de création de rapports de données sans avoir d’impact sur le magasin opérationnel dans votre compte Azure Cosmos DB.