Partilhar via


Desenvolva código de pipeline com Python

Delta Live Tables introduz várias novas construções de código Python para definir visualizações materializadas e tabelas de streaming em pipelines. O suporte Python para o desenvolvimento de pipelines baseia-se nos conceitos básicos do PySpark DataFrame e APIs de Streaming Estruturado.

Para usuários não familiarizados com Python e DataFrames, o Databricks recomenda o uso da interface SQL. Consulte Desenvolver código de pipeline com SQL.

Para obter uma referência completa da sintaxe Python do Delta Live Tables, consulte Referência da linguagem Python do Delta Live Tables.

Noções básicas de Python para desenvolvimento de pipeline

O código Python que cria conjuntos de dados Delta Live Tables deve retornar DataFrames.

Todas as APIs Python do Delta Live Tables são implementadas no dlt módulo. Seu código de pipeline Delta Live Tables implementado com Python deve importar explicitamente o dlt módulo na parte superior dos blocos de anotações e arquivos Python.

O código Python específico do Delta Live Tables difere de outros tipos de código Python de uma maneira crítica: o código de pipeline do Python não chama diretamente as funções que executam a ingestão e a transformação de dados para criar conjuntos de dados do Delta Live Tables. Em vez disso, o Delta Live Tables interpreta as funções do decorador do dlt módulo em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados.

Importante

Para evitar um comportamento inesperado quando o pipeline é executado, não inclua código que possa ter efeitos colaterais em suas funções que definem conjuntos de dados. Para saber mais, consulte a referência do Python.

Crie uma visualização materializada ou uma tabela de streaming com Python

O @dlt.table decorador diz à Delta Live Tables para criar uma vista materializada ou tabela de streaming com base nos resultados retornados por uma função. Os resultados de uma leitura em lote criam uma exibição materializada, enquanto os resultados de uma leitura de streaming criam uma tabela de streaming.

Por padrão, os nomes de tabela de exibição materializada e streaming são inferidos a partir de nomes de funções. O exemplo de código a seguir mostra a sintaxe básica para criar uma exibição materializada e uma tabela de streaming:

Nota

Ambas as funções fazem referência à samples mesma tabela no catálogo e usam a mesma função de decorador. Esses exemplos destacam que a única diferença na sintaxe básica para exibições materializadas e tabelas de streaming é usar spark.read versus spark.readStream.

Nem todas as fontes de dados suportam leituras de streaming. Algumas fontes de dados devem ser sempre processadas com semântica de streaming.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Opcionalmente, você pode especificar o nome da tabela usando o name argumento no @dlt.table decorador. O exemplo a seguir demonstra esse padrão para uma exibição materializada e uma tabela de streaming:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Carregar dados do armazenamento de objetos

O Delta Live Tables dá suporte ao carregamento de dados de todos os formatos suportados pelo Azure Databricks. Consulte Opções de formato de dados.

Nota

Estes exemplos usam dados disponíveis sob o /databricks-datasets montado automaticamente em seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos em nuvem. Consulte O que são volumes do Catálogo Unity?.

O Databricks recomenda o uso do Auto Loader e de tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos na nuvem. Consulte O que é Auto Loader?.

O exemplo a seguir cria uma tabela de streaming a partir de arquivos JSON usando o Auto Loader:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

O exemplo a seguir usa semântica em lote para ler um diretório JSON e criar uma exibição materializada:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Valide dados com expectativas

Você pode usar as expectativas para definir e impor restrições de qualidade de dados. Consulte Gerenciar a qualidade dos dados com o Delta Live Tables.

O código a seguir usa @dlt.expect_or_drop para definir uma expectativa chamada valid_data que descarta registros que são nulos durante a ingestão de dados:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Consultar exibições materializadas e tabelas de streaming definidas em seu pipeline

Use o LIVE esquema para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline.

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de streaming chamada orders que carrega dados JSON.
  • Uma vista materializada denominada customers que carrega dados CSV.
  • Um modo de exibição materializado chamado customer_orders que une registros dos conjuntos de dados e customers , converte o carimbo de orders data/hora do pedido em uma data e seleciona os customer_idcampos , order_number, statee order_date .
  • Uma visão materializada nomeada daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("LIVE.orders")
    .join(spark.read.table("LIVE.customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("LIVE.customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Criar tabelas em um for loop

Você pode usar loops Python for para criar várias tabelas programaticamente. Isso pode ser útil quando você tem muitas fontes de dados ou conjuntos de dados de destino que variam em apenas alguns parâmetros, resultando em menos código total para manter e menos redundância de código.

O for loop avalia a lógica em ordem serial, mas uma vez que o planejamento é concluído para os conjuntos de dados, o pipeline executa a lógica em paralelo.

Importante

Ao usar esse padrão para definir conjuntos de dados, certifique-se de que a lista de valores passados para o for loop seja sempre aditiva. Se um conjunto de dados definido anteriormente em um pipeline for omitido de uma execução de pipeline futura, esse conjunto de dados será descartado automaticamente do esquema de destino.

O exemplo a seguir cria cinco tabelas que filtram pedidos de clientes por região. Aqui, o nome da região é usado para definir o nome das exibições materializadas de destino e filtrar os dados de origem. As visualizações temporárias são usadas para definir junções das tabelas de origem usadas na construção das exibições materializadas finais.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("LIVE.customer_orders")
    nation_region = spark.read.table("LIVE.nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Segue-se um exemplo do gráfico de fluxo de dados para este pipeline:

Um gráfico de fluxo de dados de duas visualizações que levam a cinco tabelas regionais.

Solução de problemas: for o loop cria muitas tabelas com os mesmos valores

O modelo de execução lento que os pipelines usam para avaliar o código Python requer que sua lógica faça referência direta a valores individuais quando a função decorada por @dlt.table() é invocada.

O exemplo a seguir demonstra duas abordagens corretas para definir tabelas com um for loop. Em ambos os exemplos, cada nome de tabela da tables lista é explicitamente referenciado dentro da função decorada por @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

O exemplo a seguir não faz referência a valores corretamente. Este exemplo cria tabelas com nomes distintos, mas todas as tabelas carregam dados do último valor no for loop:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)