Consultar dados do Cosmos DB com o Spark

Concluído

Depois de adicionar um serviço vinculado para seu banco de dados do Azure Cosmos DB habilitado para armazenamento analítico, você pode usá-lo para consultar os dados usando um pool do Spark em seu espaço de trabalho do Azure Synapse Analytics.

Carregando dados analíticos do Azure Cosmos DB em um quadro de dados

Para exploração inicial ou análise rápida de dados de um serviço vinculado do Azure Cosmos DB, geralmente é mais fácil carregar dados de um contêiner em um dataframe usando uma linguagem suportada pelo Spark, como PySpark (uma implementação específica do Python do Spark) ou Scala (uma linguagem baseada em Java frequentemente usada no Spark).

Por exemplo, o seguinte código PySpark pode ser usado para carregar um dataframe chamado df dos dados no contêiner my-container conectado ao uso do serviço vinculado my_linked_service e exibir as primeiras 10 linhas de dados:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Vamos supor que o contêiner my-container seja usado para armazenar itens semelhantes ao exemplo a seguir:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

A saída do código PySpark seria semelhante à tabela a seguir:

_rid _ts productID productName id _etag
mjMaAL...== 1655414791 123 Widget 7248F072-11C3-42B1-A368-... 54004B09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400CA09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400CA09-0000-2300-...
... ... ... ... ... ...

Os dados são carregados a partir do armazenamento analítico no contentor, não do armazém operacional; garantindo que não haja sobrecarga de consultas no repositório operacional. Os campos no armazenamento de dados analíticos incluem os campos definidos pelo aplicativo (neste caso , productID e productName) e campos de metadados criados automaticamente.

Depois de carregar o dataframe, você pode usar seus métodos nativos para explorar os dados. Por exemplo, o código a seguir cria um novo dataframe contendo apenas as colunas productID e productName , ordenadas pelo productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

A saída deste código seria semelhante a esta tabela:

productID productName
125 Thingumy
123 Widget
124 Wotsit
... ...

Gravando um dataframe em um contêiner do Cosmos DB

Na maioria dos cenários HTAP, você deve usar o serviço vinculado para ler dados no Spark a partir do repositório analítico. No entanto, você pode gravar o conteúdo de um dataframe no contêiner, conforme mostrado no exemplo a seguir:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Nota

Gravar um dataframe em um contêiner atualiza o armazenamento operacional e pode ter um impacto em seu desempenho. As alterações são então sincronizadas com o repositório analítico.

Usando o Spark SQL para consultar dados analíticos do Azure Cosmos DB

O Spark SQL é uma API do Spark que fornece sintaxe de linguagem SQL e semântica de banco de dados relacional em um pool do Spark. Você pode usar o Spark SQL para definir metadados para tabelas que podem ser consultadas usando SQL.

Por exemplo, o código a seguir cria uma tabela chamada Products com base no contêiner hipotético usado nos exemplos anteriores:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Gorjeta

A %%sql palavra-chave no início do código é uma mágica que instrui o pool do Spark a executar o código como SQL em vez da linguagem padrão (que geralmente é definida como PySpark).

Usando essa abordagem, você pode criar um banco de dados lógico em seu pool do Spark que pode ser usado para consultar os dados analíticos no Azure Cosmos DB para dar suporte à análise de dados e às cargas de trabalho de relatório sem afetar o armazenamento operacional em sua conta do Azure Cosmos DB.