Compartilhar via


Desenvolver código de pipeline com Python

O Delta Live Tables apresenta várias novas construções de código Python para definir exibições materializadas e tabelas de streaming em pipelines. O suporte do Python para o desenvolvimento de pipelines se baseia nas noções básicas do PySpark DataFrame e das APIs de Streaming Estruturado.

Para usuários que não estão familiarizados com Python e DataFrames, o Databricks recomenda usar a interface SQL. Consulte Desenvolver código de pipeline com SQL.

Para obter uma referência completa da sintaxe Python do Delta Live Tables, consulte Referência da linguagem Python do Delta Live Tables.

Noções básicas de Python para desenvolvimento de pipeline

O código Python que cria conjuntos de dados delta live tables deve retornar DataFrames.

Todas as APIs do Python do Delta Live Tables são implementadas no módulo dlt. O código de pipeline do Delta Live Tables implementado com o Python deve importar explicitamente o módulo dlt na parte superior dos notebooks e arquivos do Python.

O código Python específico do Delta Live Tables difere de outros tipos de código Python de uma maneira crítica: o código do pipeline do Python não chama diretamente as funções que executam a ingestão e a transformação de dados para criar conjuntos de dados do Delta Live Tables. Em vez disso, o Delta Live Tables interpreta as funções de decorador do dlt módulo em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados.

Importante

Para evitar um comportamento inesperado quando o pipeline é executado, não inclua código que possa ter efeitos colaterais em suas funções que definem conjuntos de dados. Para saber mais, confira a referência do Python.

Criar uma exibição materializada ou uma tabela de streaming com Python

O @dlt.table decorador informa ao Delta Live Tables para criar uma exibição materializada ou uma tabela de streaming com base nos resultados retornados por uma função. Os resultados de uma leitura em lote criam uma exibição materializada, enquanto os resultados de uma leitura de streaming criam uma tabela de streaming.

Por padrão, os nomes de tabela de exibição e streaming materializados são inferidos de nomes de função. O exemplo de código a seguir mostra a sintaxe básica para criar uma exibição materializada e uma tabela de streaming:

Observação

Ambas as funções fazem referência à mesma tabela no samples catálogo e usam a mesma função decoradora. Esses exemplos destacam que a única diferença na sintaxe básica para exibições materializadas e tabelas de streaming é usar spark.read versus spark.readStream.

Nem todas as fontes de dados dão suporte a leituras de streaming. Algumas fontes de dados sempre devem ser processadas com semântica de streaming.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Opcionalmente, você pode especificar o nome da tabela usando o name argumento no @dlt.table decorador. O exemplo a seguir demonstra esse padrão para uma exibição materializada e uma tabela de streaming:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Carregar dados do armazenamento de objetos

O Delta Live Tables dá suporte ao carregamento de dados de todos os formatos compatíveis com o Azure Databricks. Confira Opções de formato de arquivo.

Observação

Esses exemplos usam dados disponíveis no /databricks-datasets montado automaticamente em seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos de nuvem. Confira O que são os volumes do Catálogo do Unity?.

O Databricks recomenda usar o Carregador Automático e as tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos de nuvem. Confira O que é o Carregador Automático?.

O exemplo a seguir cria uma tabela de streaming de arquivos JSON usando o Carregador Automático:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

O exemplo a seguir usa a semântica em lote para ler um diretório JSON e criar uma exibição materializada:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Valide dados com expectativas

Você pode usar expectativas para definir e impor restrições de qualidade de dados. Confira Gerenciar a qualidade dos dados com o Delta Live Tables.

O código a seguir usa @dlt.expect_or_drop para definir uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Consultar exibições materializadas e tabelas de streaming definidas em seu pipeline

Use o LIVE esquema para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline.

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de streaming chamada orders que carrega dados JSON.
  • Uma exibição materializada chamada customers que carrega dados CSV.
  • Uma exibição materializada nomeada customer_orders que une orders registros dos conjuntos de dados e customers converte o carimbo de data/hora do pedido em uma data e seleciona os customer_idcampos , order_number, state, e order_date .
  • Uma exibição materializada chamada daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("LIVE.orders")
    .join(spark.read.table("LIVE.customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("LIVE.customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Criar tabelas em loop for

Você pode usar loops do Python for para criar várias tabelas programaticamente. Isso pode ser útil quando você tem muitas fontes de dados ou conjuntos de dados de destino que variam em apenas alguns parâmetros, resultando em menos código total para manter e menos redundância de código.

O for loop avalia a lógica em ordem serial, mas depois que o planejamento é concluído para os conjuntos de dados, o pipeline executa a lógica em paralelo.

Importante

Ao usar esse padrão para definir conjuntos de dados, certifique-se de que a lista de valores passados para o for loop seja sempre aditiva. Se um conjunto de dados definido anteriormente em um pipeline for omitido de uma execução de pipeline futura, esse conjunto de dados será descartado automaticamente do esquema de destino.

O exemplo a seguir cria cinco tabelas que filtram pedidos de clientes por região. Aqui, o nome da região é usado para definir o nome das exibições materializadas de destino e filtrar os dados de origem. As exibições temporárias são usadas para definir junções das tabelas de origem usadas na construção das exibições materializadas finais.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("LIVE.customer_orders")
    nation_region = spark.read.table("LIVE.nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Veja a seguir um exemplo do gráfico de fluxo de dados para esse pipeline:

Um gráfico de fluxo de dados de duas exibições que levam a cinco tabelas regionais.

Solução de problemas: for o loop cria muitas tabelas com os mesmos valores

O modelo de execução lento que os pipelines usam para avaliar o código Python requer que sua lógica faça referência direta a valores individuais quando a função decorada por @dlt.table() for invocada.

O exemplo a seguir demonstra duas abordagens corretas para definir tabelas com um for loop. Em ambos os exemplos, cada nome de tabela da tables lista é explicitamente referenciado na função decorada por @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

O exemplo a seguir não faz referência a valores corretamente. Este exemplo cria tabelas com nomes distintos, mas todas as tabelas carregam dados do último valor no for loop:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)