Wykonywanie zapytań dotyczących danych usługi Cosmos DB za pomocą platformy Spark

Ukończone

Po dodaniu połączonej usługi dla bazy danych usługi Azure Cosmos DB z włączoną obsługą magazynu analitycznego możesz użyć jej do wykonywania zapytań dotyczących danych przy użyciu puli Spark w obszarze roboczym usługi Azure Synapse Analytics.

Ładowanie danych analitycznych usługi Azure Cosmos DB do ramki danych

W przypadku początkowej eksploracji lub szybkiej analizy danych z połączonej usługi Azure Cosmos DB często najłatwiej jest załadować dane z kontenera do ramki danych przy użyciu języka obsługiwanego przez platformę Spark, takiego jak PySpark (implementacja języka Python specyficzna dla platformy Spark) lub Scala (język oparty na języku Java często używany na platformie Spark).

Na przykład następujący kod PySpark może służyć do załadowania ramki danych o nazwie df z danych w kontenerze my-container połączonym przy użyciu połączonej usługi my_linked_service i wyświetlenia pierwszych 10 wierszy danych:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Załóżmy, że kontener my-container jest używany do przechowywania elementów podobnych do następującego przykładu:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

Dane wyjściowe z kodu PySpark będą podobne do poniższej tabeli:

_rid _ts productID productName identyfikator _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

Dane są ładowane z magazynu analitycznego w kontenerze, a nie z magazynu operacyjnego; zapewnienie, że w magazynie operacyjnym nie ma żadnych obciążeń związanych z wykonywaniem zapytań. Pola w magazynie danych analitycznych obejmują pola zdefiniowane przez aplikację (w tym przypadku productID i productName) i automatycznie utworzone pola metadanych.

Po załadowaniu ramki danych możesz użyć jej natywnych metod do eksplorowania danych. Na przykład poniższy kod tworzy nową ramkę danych zawierającą tylko kolumny productID i productName uporządkowane według productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

Dane wyjściowe tego kodu będą wyglądać podobnie do tej tabeli:

productID productName
125 Thingumy
123 Widget
124 Wotsit
... ...

Zapisywanie ramki danych w kontenerze usługi Cosmos DB

W większości scenariuszy HTAP należy użyć połączonej usługi do odczytywania danych do platformy Spark z magazynu analitycznego. Można jednak zapisać zawartość ramki danych w kontenerze, jak pokazano w poniższym przykładzie:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Uwaga

Zapisywanie ramki danych w kontenerze aktualizuje magazyn operacyjny i może mieć wpływ na jego wydajność. Zmiany są następnie synchronizowane z magazynem analitycznym.

Wykonywanie zapytań dotyczących danych analitycznych usługi Azure Cosmos DB przy użyciu usługi Spark SQL

Spark SQL to interfejs API platformy Spark, który zapewnia składnię języka SQL i semantyka relacyjnej bazy danych w puli platformy Spark. Możesz użyć usługi Spark SQL do zdefiniowania metadanych dla tabel, które mogą być odpytywane przy użyciu języka SQL.

Na przykład poniższy kod tworzy tabelę o nazwie Products na podstawie hipotetycznego kontenera używanego w poprzednich przykładach:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Napiwek

Słowo %%sql kluczowe na początku kodu to magia , która instruuje pulę Spark, aby uruchamiała kod jako SQL, a nie jako język domyślny (zazwyczaj ustawiony na PySpark).

Korzystając z tego podejścia, można utworzyć logiczną bazę danych w puli Spark, której następnie można użyć do wykonywania zapytań dotyczących danych analitycznych w usłudze Azure Cosmos DB w celu obsługi analizy danych i raportowania obciążeń bez wpływu na magazyn operacyjny na koncie usługi Azure Cosmos DB.