Exercício - Integrar um bloco de anotações no Azure Synapse Pipelines
Nesta unidade, você cria um bloco de anotações do Azure Synapse Spark para analisar e transformar dados carregados por um fluxo de dados de mapeamento e armazenar os dados em um data lake. Você cria uma célula de parâmetro que aceita um parâmetro de cadeia de caracteres que define o nome da pasta para os dados que o bloco de anotações grava no data lake.
Em seguida, adicione esse bloco de anotações a um pipeline Synapse e passe o ID de execução de pipeline exclusivo para o parâmetro do bloco de anotações para que você possa correlacionar posteriormente a execução do pipeline com os dados salvos pela atividade do bloco de anotações.
Finalmente, use o hub Monitor no Synapse Studio para monitorar a execução do pipeline, obter a ID de execução e, em seguida, localizar os arquivos correspondentes armazenados no data lake.
Sobre o Apache Spark e notebooks
O Apache Spark é uma estrutura de processamento paralelo que suporta processamento na memória para aumentar o desempenho de aplicativos analíticos de big data. O Apache Spark no Azure Synapse Analytics é uma das implementações da Microsoft do Apache Spark na cloud.
Um notebook Apache Spark no Synapse Studio é uma interface web para você criar arquivos que contêm código ao vivo, visualizações e texto narrativo. Os blocos de notas são um bom local para validar ideias e utilizar experiências rápidas para obter informações dos seus dados. Os notebooks também são amplamente utilizados na preparação de dados, visualização de dados, aprendizado de máquina e outros cenários de Big Data.
Criar um bloco de anotações Synapse Spark
Suponha que você criou um fluxo de dados de mapeamento no Synapse Analytics para processar, unir e importar dados de perfil de usuário. Agora, você quer encontrar os cinco principais produtos para cada usuário, com base em quais são os preferidos e a principal escolha, e ter o maior número de compras nos últimos 12 meses. Em seguida, você deseja calcular os cinco principais produtos em geral.
Neste exercício, você cria um bloco de anotações Synapse Spark para fazer esses cálculos.
Abra o Synapse Analytics Studio (https://web.azuresynapse.net/) e vá para o hub de dados .
Selecione a guia Vinculado (1) e expanda a conta de armazenamento do data lake principal (2) abaixo do Azure Data Lake Storage Gen2. Selecione o contêiner wwi-02 (3) e abra a pasta top-products (4). Clique com o botão direito do mouse em qualquer arquivo Parquet (5), selecione o item de menu Novo bloco de anotações (6) e, em seguida, selecione Carregar para DataFrame (7). Se não vir a pasta, selecione
Refresh
.Verifique se o bloco de anotações está conectado ao pool do Spark.
Substitua o nome do arquivo Parquet por
*.parquet
(1) para selecionar todos os arquivos Parquet natop-products
pasta. Por exemplo, o caminho deve ser semelhante a:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Selecione Executar tudo na barra de ferramentas do bloco de anotações para executar o bloco de anotações.
Nota
Na primeira vez que você executa um bloco de anotações em um pool do Spark, o Synapse cria uma nova sessão. Isto pode demorar aproximadamente 3 a 5 minutos.
Nota
Para executar apenas a célula, passe o mouse sobre ela e selecione o ícone Executar célula à esquerda da célula ou selecione a célula e digite Ctrl+Enter.
Crie uma nova célula abaixo selecionando o + botão e selecionando o item Célula de código. O botão + está localizado abaixo da célula do bloco de anotações à esquerda. Como alternativa, você também pode expandir o menu + Célula na barra de ferramentas Bloco de Anotações e selecionar o item Célula de código.
Execute o seguinte comando na nova célula para preencher um novo dataframe chamado
topPurchases
, crie uma nova exibição temporária chamadatop_purchases
e mostre as primeiras 100 linhas:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
O resultado deve ser algo semelhante ao seguinte:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Execute o seguinte comando em uma nova célula para criar um novo modo de exibição temporário usando SQL:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Nota
Não há saída para esta consulta.
A consulta usa o
top_purchases
modo de exibição temporário como fonte e aplica umrow_number() over
método para aplicar um número de linha para os registros de cada usuário ondeItemsPurchasedLast12Months
é maior. Awhere
cláusula filtra os resultados para que só recuperemos até cinco produtos onde ambosIsTopProduct
eIsPreferredProduct
estão definidos como true. Isso nos dá os cinco principais produtos mais comprados para cada usuário, onde esses produtos também são identificados como seus produtos favoritos, de acordo com seu perfil de usuário armazenado no Azure Cosmos DB.Execute o seguinte comando em uma nova célula para criar e exibir um novo DataFrame que armazena
top_5_products
os resultados do modo de exibição temporário criado na célula anterior:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Você verá uma saída semelhante à seguinte, que exibe os cinco principais produtos preferidos por usuário:
Calcule os cinco principais produtos em geral, com base naqueles que são preferidos pelos clientes e comprados mais. Para fazer isso, execute o seguinte comando em uma nova célula:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
Nesta célula, agrupamos os cinco principais produtos preferidos por ID de produto, somamos o total de itens comprados nos últimos 12 meses, classificamos esse valor em ordem decrescente e retornamos os cinco principais resultados. A saída deve ser semelhante ao seguinte:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Criar uma célula de parâmetro
Os pipelines do Azure Synapse procuram a célula de parâmetros e tratam essa célula como padrão para os parâmetros passados no momento da execução. O mecanismo de execução adicionará uma nova célula abaixo da célula de parâmetros com parâmetros de entrada para substituir os valores padrão. Quando uma célula de parâmetros não é designada, a célula injetada será inserida na parte superior do bloco de notas.
Vamos executar este caderno a partir de um pipeline. Queremos passar um parâmetro que define um valor de
runId
variável que será usado para nomear o arquivo Parquet. Execute o seguinte comando em uma nova célula:import uuid # Generate random GUID runId = uuid.uuid4()
Estamos usando a biblioteca que vem com o
uuid
Spark para gerar um GUID aleatório. Queremos substituir arunId
variável por um parâmetro passado pelo pipeline. Para fazer isso, precisamos alternar isso como uma célula de parâmetro.Selecione as reticências de ações (...) no canto superior direito da célula (1) e, em seguida, selecione Alternar célula de parâmetro (2).
Depois de alternar essa opção, você verá a tag Parâmetros na célula.
Cole o código a seguir em uma nova célula para usar a
runId
variável como o nome do arquivo Parquet no/top5-products/
caminho na conta principal do data lake. SubstituaYOUR_DATALAKE_NAME
no caminho pelo nome da sua conta principal do data lake. Para encontrar isso, role até a célula 1 na parte superior da página (1). Copie a conta de armazenamento do data lake do caminho (2). Cole este valor como um substituto paraYOUR_DATALAKE_NAME
no caminho (3) dentro da nova célula e, em seguida, execute o comando na célula.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Verifique se o arquivo foi gravado no data lake. Vá para o hub de dados e selecione a guia Vinculado (1). Expanda a conta de armazenamento principal do data lake e selecione o contêiner wwi-02 (2). Vá para a pasta top5-products (3). Você deve ver uma pasta para o arquivo Parquet no diretório com um GUID como o nome do arquivo (4).
O método de gravação Parquet no dataframe na célula Notebook criou esse diretório porque ele não existia anteriormente.
Adicionar o Bloco de Anotações a um pipeline Synapse
Voltando ao Fluxo de Dados de Mapeamento que descrevemos no início do exercício, suponha que você queira executar este bloco de anotações depois que o Fluxo de Dados for executado como parte do seu processo de orquestração. Para fazer isso, adicione este bloco de anotações a um pipeline como uma nova atividade do Bloco de Anotações.
Regresse ao bloco de notas. Selecione Propriedades (1) no canto superior direito do bloco de notas e, em seguida, introduza
Calculate Top 5 Products
o Nome (2).Selecione Adicionar ao pipeline (1) no canto superior direito do bloco de anotações e, em seguida, selecione Pipeline existente (2).
Selecione o pipeline Gravar dados de perfil de usuário no ASA (1) e, em seguida, selecione Adicionar *(2).
O Synapse Studio adiciona a atividade Notebook ao pipeline. Reorganize a atividade do Bloco de Anotações para que ela fique à direita da atividade Fluxo de dados. Selecione a atividade Fluxo de dados e arraste uma caixa verde Conexão de pipeline de atividade de sucesso para a atividade Bloco de Anotações.
A seta Atividade de sucesso instrui o pipeline a executar a atividade Bloco de Anotações depois que a atividade Fluxo de dados for executada com êxito.
Selecione a atividade Bloco de notas (1) e, em seguida, selecione o separador Definições (2), expanda Parâmetros básicos (3) e, em seguida, selecione + Novo (4). Insira
runId
o campo Nome (5). Selecione String para o tipo (6). Para o Valor, selecione Adicionar conteúdo dinâmico (7).Selecione ID de execução do pipeline em Variáveis do sistema (1). Isso aumenta
@pipeline().RunId
a caixa de conteúdo dinâmico (2). Selecione Concluir (3) para fechar a caixa de diálogo.O valor Pipeline run ID é um GUID exclusivo atribuído a cada execução de pipeline. Usaremos esse valor para o nome do arquivo Parquet passando esse valor como o
runId
parâmetro Notebook. Podemos então examinar o histórico de execução do pipeline e encontrar o arquivo Parquet específico criado para cada execução do pipeline.Selecione Publicar tudo e, em seguida, Publicar para salvar as alterações.
Após a conclusão da publicação, selecione Adicionar gatilho (1) e, em seguida , Acionar agora (2) para executar o pipeline atualizado.
Selecione OK para executar o gatilho.
Monitorizar a execução do pipeline.
O hub Monitor permite monitorar atividades atuais e históricas para SQL, Apache Spark e Pipelines.
Vá para o hub Monitor .
Selecione Pipeline runs (1) e aguarde até que a execução do pipeline seja concluída com êxito (2). Pode ser necessário atualizar (3) a exibição.
Selecione o nome do pipeline para exibir as execuções de atividade do pipeline.
Observe a atividade de fluxo de dados e a nova atividade do Bloco de Anotações (1). Anote o valor de ID de execução do pipeline (2). Vamos comparar isso com o nome do arquivo Parquet gerado pelo notebook. Selecione o nome do Bloco de Anotações Calcular os 5 principais produtos para visualizar seus detalhes (3).
Aqui, vemos os detalhes da execução do Notebook. Você pode selecionar Reprodução (1) para assistir a uma reprodução do progresso através dos trabalhos (2). Na parte inferior, você pode visualizar os Diagnósticos e Logs com diferentes opções de filtro (3). À direita, podemos ver os detalhes da execução, como a duração, Livy ID, detalhes da piscina Spark e assim por diante. Selecione o link Exibir detalhes em um trabalho para exibir seus detalhes (5).
A interface do usuário do aplicativo Spark é aberta em uma nova guia onde podemos ver os detalhes do palco. Expanda a visualização do DAG para exibir os detalhes do palco.
Volte para o hub de dados .
Selecione a guia Vinculado (1) e, em seguida, selecione o contêiner wwi-02 (2) na conta de armazenamento do data lake principal, vá para a pasta top5-products (3) e verifique se existe uma pasta para o arquivo Parquet cujo nome corresponde ao ID de execução do Pipeline.
Como você pode ver, temos um arquivo cujo nome corresponde ao ID de execução do Pipeline que observamos anteriormente:
Esses valores correspondem porque passamos o ID de execução do Pipeline para o
runId
parâmetro na atividade do Bloco de Anotações.