Copiar dados do Azure Cosmos DB para um pool de SQL dedicado usando o Apache Spark
O Link do Azure Synapse para Azure Cosmos DB permite que os usuários executem análises quase em tempo real nos dados operacionais no Azure Cosmos DB. No entanto, há ocasiões em que alguns dados precisam ser agregados e enriquecidos para atender aos usuários do data warehouse. A coleta e a exportação de dados do Link do Azure Synapse pode ser feita com apenas algumas células em um notebook.
Pré-requisitos
- Provisionar um workspace do Synapse com:
- Provisionar uma conta do Azure Cosmos DB com um contêiner HTAP com dados
- Conectar o contêiner HTAP do Azure Cosmos DB ao workspace
- Ter a configuração correta para importar dados em um pool de SQL dedicado do Spark
Etapas
Neste tutorial, você se conectará ao repositório analítico para que não haja impacto sobre o repositório transacional (ele não consumirá nenhuma Unidade de Solicitação). Percorreremos as seguintes etapas:
- Leia o contêiner HTAP do Azure Cosmos DB em um dataframe do Spark
- Agregar os resultados em um novo dataframe
- Ingerir os dados em um pool de SQL dedicado
Dados
Nesse exemplo, usamos um contêiner HTAP chamado RetailSales. Ele faz parte de um serviço vinculado chamado ConnectedData e tem o seguinte esquema:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
Agregaremos as vendas (quantity, revenue – preço x quantidade) por productCode e weekStarting para fins de relatório. Por fim, exportaremos esses dados para uma tabela do pool de SQL dedicado chamada dbo.productsales
.
Configurar um notebook do Spark
Crie um notebook do Spark com o Scala tendo o Spark (Scala) como a linguagem principal. Usaremos a configuração padrão do notebook para a sessão.
Ler os dados no Spark
Leia o contêiner HTAP do Azure Cosmos DB com Spark em um dataframe na primeira célula.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Agregar os resultados em um novo dataframe
Na segunda célula, executaremos a transformação e as agregações necessárias para o novo dataframe antes de carregá-lo em um banco de dados do pool de SQL dedicado.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Carregar os resultados em um pool de SQL dedicado
Na terceira célula, carregaremos os dados em um pool de SQL dedicado. Ele criará automaticamente uma tabela externa temporária, uma fonte de dados externa e um formato de arquivo externo que será excluído quando o trabalho for concluído.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Consultar os resultados com o SQL
Consulte o resultado usando uma consulta SQL simples, como o seguinte script SQL:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Sua consulta apresentará os seguintes resultados em um modo de gráfico: