Tutorial: Executar um pipeline de análise do lakehouse de ponta a ponta
Este tutorial mostra como configurar um pipeline de análise de ponta a ponta para um lakehouse do Azure Databricks.
Importante
Este tutorial usa notebooks interativos para concluir tarefas ETL comuns no Python em clusters habilitados para o Catálogo do Unity. Se você não estiver usando o Catálogo do Unity, confira Executar sua primeira carga de trabalho de ETL no Azure Databricks.
Tarefas neste tutorial
Ao final deste artigo, você se sentirá confortável:
- Inicializar um cluster de cálculo habilitado para Catálogo do Unity.
- Criar um notebook do Databricks.
- Gravar e fazer a leitura d dados a partir de um local externo do Catálogo do Unity.
- Configurar a ingestão de dados incremental para uma tabela do Catálogo do Unity com o Carregador Automático.
- Executar células de notebook para processar, consultar e visualizar dados.
- Agendar um notebook como um trabalho do Databricks.
- Consultar tabelas do Catálogo do Unity do SQL do Databricks
O Azure Databricks fornece um conjunto de ferramentas prontas para produção que permitem aos profissionais de dados desenvolver e implantar rapidamente pipelines de ETL (extração, transformação e carregamento). O Catálogo do Unity permite que os administradores de dados configurem e protejam credenciais de armazenamento, locais externos e objetos de banco de dados para usuários em toda uma organização. O SQL do Databricks permite que os analistas executem consultas SQL nas mesmas tabelas usadas nas cargas de trabalho de ETL de produção, permitindo o business intelligence em tempo real em escala.
Use também o Delta Live Tables para criar pipelines de ETL. O Databricks criou o Delta Live Tables para reduzir a complexidade da criação, da implantação e da manutenção de pipelines de ETL de produção. Consulte Tutorial: como executar seu primeiro pipeline do Delta Live Tables.
Requisitos
Observação
Se você não tiver privilégios de controle de cluster, ainda poderá concluir a maioria das etapas abaixo, desde que tenha acesso a um cluster.
Etapa 1: criar um cluster
Para fazer a análise de dados exploratória e a engenharia de dados, crie um cluster para fornecer os recursos de computação necessários para executar comandos.
- Clique em
Computação na barra lateral.
- Clique em
Novo na barra lateral e selecione Cluster. Isso abre a página Novo cluster/computação.
- Especifique um nome exclusivo para o cluster.
- Selecione o botão de opção Nó Único.
- Selecione Usuário Único na lista suspensa do modo de acesso.
- Verifique se o endereço de email está visível no campo de Usuário Único.
- Selecione a versão de runtime do Databricks desejada, 11.1 ou superior para usar o Catálogo do Unity.
- Clique em Criar computação para criar o cluster.
Para saber mais sobre clusters do Databricks, confira Computação.
Etapa 2: criar um notebook do Databricks
Para criar um bloco de anotações em seu workspace, clique em Novo na barra lateral e clique em Notebook. Um notebook em branco é aberto no workspace.
Para saber mais sobre como criar e gerenciar notebooks, consulte Gerenciar notebooks.
Etapa 3: gravar e ler dados de um local externo gerenciado pelo Catálogo do Unity
O Databricks recomenda o uso do Carregador Automático para ingestão de dados incremental. O Carregador Automático detecta e processa automaticamente arquivos novos confirme eles chegam no armazenamento de objeto da nuvem.
Use o Catálogo do Unity para gerenciar o acesso seguro a locais externos. Usuários ou entidades de serviço com permissões READ FILES
em um local externo podem usar o Carregador Automático para ingerir dados.
Normalmente, os dados chegarão em um local externo devido a gravações de outros sistemas. Nesta demonstração, você pode simular a chegada de dados gravando arquivos JSON em um local externo.
Copie o código abaixo em uma célula do notebook. Substitua o valor da cadeia de caracteres por catalog
com o nome de um catálogo com as permissões CREATE CATALOG
e USE CATALOG
. Substitua o valor da cadeia de caracteres por external_location
com o caminho de um local externo pelas permissões READ FILES
, WRITE FILES
e CREATE EXTERNAL TABLE
.
Os locais externos podem ser definidos como um contêiner de armazenamento inteiro, mas geralmente apontam para um diretório aninhado em um contêiner.
O formato correto para um caminho de local externo é "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
A execução dessa célula deve imprimir uma linha em que se lê "12 bytes", imprimir a cadeia de caracteres "Olá, mundo!", e exibir todos os bancos de dados presentes no catálogo fornecido. Se você não conseguir que essa célula seja executada, confirme se você está em um workspace habilitado para o Catálogo do Unity e solicite permissões adequadas do administrador do workspace para concluir este tutorial.
O código Python abaixo usa seu endereço de email para criar um banco de dados exclusivo no catálogo fornecido e um local de armazenamento exclusivo no local externo fornecido. A execução dessa célula removerá todos os dados associados a este tutorial, permitindo que você execute este exemplo de maneira idempotente. Uma classe é definida e criará uma instância que você usará para simular lotes de dados que chegam de um sistema conectado ao seu local externo de origem.
Copie esse código para uma nova célula em seu notebook e execute-o para configurar seu ambiente.
Observação
As variáveis definidas nesse código devem permitir que você o execute com segurança sem risco de conflito com ativos de workspace existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código, por isso contate o administrador do workspace para solucionar problemas dessas restrições.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Agora você pode colocar um lote de dados copiando o código a seguir em uma célula e executando-o. Você pode executar manualmente essa célula até 60 vezes para disparar a chegada de novos dados.
RawData.land_batch()
Etapa 4: configurar o Carregador Automático para ingerir dados no Catálogo do Unity
O Databricks recomenda armazenar dados com o Delta Lake. O Delta Lake é uma camada de armazenamento código aberto que fornece transações ACID e habilita o data lakehouse. O Delta Lake é o formato padrão para tabelas criadas no Databricks.
A fim de configurar o Carregador Automático para ingerir dados em uma tabela do Catálogo do Unity, copie e cole o seguinte código em uma célula vazia em seu notebook:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Para saber mais sobre o Carregador Automático, confira O que é o Carregador Automático?.
Para saber mais sobre o fluxo estruturado com o Catálogo do Unity, consulte Usando o Catálogo do Unity com fluxo estruturado.
Etapa 5: processar e interagir com os dados
Os notebooks executam célula por célula lógica. Use estas etapas para executar a lógica em sua célula:
Para executar a célula concluída na etapa anterior, selecione a célula e pressione SHIFT+ENTER.
Para consultar a tabela que você acabou de criar, copie e cole o código a seguir em uma célula vazia e pressione SHIFT+ENTER para executar a célula.
df = spark.read.table(table)
Para visualizar os dados em seu DataFrame, copie e cole o código a seguir em uma célula vazia e pressione SHIFT+ENTER para executar a célula.
display(df)
Para saber mais sobre as opções interativas para visualização de dados, consulte Visualizações em notebooks de Databricks.
Etapa 6: agendar um trabalho
Você pode executar notebooks do Databricks como scripts de produção adicionando-os como uma tarefa em um trabalho do Databricks. Nesta etapa, você criará um novo trabalho que poderá disparar manualmente.
Para agendar seu notebook como uma tarefa:
- Clique em Agendamento no lado direito da barra de cabeçalho.
- Insira um nome exclusivo para o nome do trabalho.
- Clique em Manual.
- Na lista suspensa cluster, selecione o cluster que você criou na etapa 1.
- Clique em Criar.
- Na janela exibida, clique em Executar agora.
- Para consultar os resultados de execução de trabalho, clique no ícone
próximo ao timestamp de Última execução.
Para obter mais informações sobre trabalhos, consulte O que são trabalhos?.
Etapa 7: tabela de consulta do SQL do Databricks
Qualquer pessoa com a permissão USE CATALOG
no catálogo atual, a permissão USE SCHEMA
no esquema atual e a permissão SELECT
na tabela pode consultar o conteúdo da tabela de sua API do Databricks preferida.
Você precisa de acesso a um SQL Warehouse em execução para executar consultas no SQL do Databricks.
A tabela que você criou anteriormente neste tutorial tem o nome target_table
. Você pode consultá-lo usando o catálogo fornecido na primeira célula e o banco de dados com o patern e2e_lakehouse_<your-username>
. Você pode usar o Explorador de Catálogos para localizar os objetos de dados criados.
Integrações adicionais
Saiba mais sobre integrações e ferramentas para engenharia de dados com o Azure Databricks: