Wykonywanie zapytań dotyczących danych usługi Cosmos DB za pomocą platformy Spark
Po dodaniu połączonej usługi dla bazy danych usługi Azure Cosmos DB z włączoną obsługą magazynu analitycznego możesz użyć jej do wykonywania zapytań dotyczących danych przy użyciu puli Spark w obszarze roboczym usługi Azure Synapse Analytics.
Ładowanie danych analitycznych usługi Azure Cosmos DB do ramki danych
W przypadku początkowej eksploracji lub szybkiej analizy danych z połączonej usługi Azure Cosmos DB często najłatwiej jest załadować dane z kontenera do ramki danych przy użyciu języka obsługiwanego przez platformę Spark, takiego jak PySpark (implementacja języka Python specyficzna dla platformy Spark) lub Scala (język oparty na języku Java często używany na platformie Spark).
Na przykład następujący kod PySpark może służyć do załadowania ramki danych o nazwie df z danych w kontenerze my-container połączonym przy użyciu połączonej usługi my_linked_service i wyświetlenia pierwszych 10 wierszy danych:
df = spark.read
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.load()
display(df.limit(10))
Załóżmy, że kontener my-container jest używany do przechowywania elementów podobnych do następującego przykładu:
{
"productID": 123,
"productName": "Widget",
"id": "7248f072-11c3-42b1-a368-...",
"_rid": "mjMaAL...==",
"_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
"_etag": "\"54004b09-0000-2300-...\"",
"_attachments": "attachments/",
"_ts": 1655414791
}
Dane wyjściowe z kodu PySpark będą podobne do poniższej tabeli:
_rid | _ts | productID | productName | identyfikator | _etag |
---|---|---|---|---|---|
mjMaAL...== | 1655414791 | 123 | Widget | 7248f072-11c3-42b1-a368-... | 54004b09-0000-2300-... |
mjMaAL...== | 1655414829 | 124 | Wotsit | dc33131c-65c7-421a-a0f7-... | 5400ca09-0000-2300-... |
mjMaAL...== | 1655414835 | 125 | Thingumy | ce22351d-78c7-428a-a1h5-... | 5400ca09-0000-2300-... |
... | ... | ... | ... | ... | ... |
Dane są ładowane z magazynu analitycznego w kontenerze, a nie z magazynu operacyjnego; zapewnienie, że w magazynie operacyjnym nie ma żadnych obciążeń związanych z wykonywaniem zapytań. Pola w magazynie danych analitycznych obejmują pola zdefiniowane przez aplikację (w tym przypadku productID i productName) i automatycznie utworzone pola metadanych.
Po załadowaniu ramki danych możesz użyć jej natywnych metod do eksplorowania danych. Na przykład poniższy kod tworzy nową ramkę danych zawierającą tylko kolumny productID i productName uporządkowane według productName:
products_df = df.select("productID", "productName").orderBy("productName")
display(products_df.limit(10))
Dane wyjściowe tego kodu będą wyglądać podobnie do tej tabeli:
productID | productName |
---|---|
125 | Thingumy |
123 | Widget |
124 | Wotsit |
... | ... |
Zapisywanie ramki danych w kontenerze usługi Cosmos DB
W większości scenariuszy HTAP należy użyć połączonej usługi do odczytywania danych do platformy Spark z magazynu analitycznego. Można jednak zapisać zawartość ramki danych w kontenerze, jak pokazano w poniższym przykładzie:
mydf.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
Uwaga
Zapisywanie ramki danych w kontenerze aktualizuje magazyn operacyjny i może mieć wpływ na jego wydajność. Zmiany są następnie synchronizowane z magazynem analitycznym.
Wykonywanie zapytań dotyczących danych analitycznych usługi Azure Cosmos DB przy użyciu usługi Spark SQL
Spark SQL to interfejs API platformy Spark, który zapewnia składnię języka SQL i semantyka relacyjnej bazy danych w puli platformy Spark. Możesz użyć usługi Spark SQL do zdefiniowania metadanych dla tabel, które mogą być odpytywane przy użyciu języka SQL.
Na przykład poniższy kod tworzy tabelę o nazwie Products na podstawie hipotetycznego kontenera używanego w poprzednich przykładach:
%%sql
-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;
USE mydb;
-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
spark.synapse.linkedService 'my_linked_service',
spark.cosmos.container 'my-container'
);
-- Query the table
SELECT productID, productName
FROM products;
Napiwek
Słowo %%sql
kluczowe na początku kodu to magia , która instruuje pulę Spark, aby uruchamiała kod jako SQL, a nie jako język domyślny (zazwyczaj ustawiony na PySpark).
Korzystając z tego podejścia, można utworzyć logiczną bazę danych w puli Spark, której następnie można użyć do wykonywania zapytań dotyczących danych analitycznych w usłudze Azure Cosmos DB w celu obsługi analizy danych i raportowania obciążeń bez wpływu na magazyn operacyjny na koncie usługi Azure Cosmos DB.