Noções básicas do PySpark
Este artigo apresenta exemplos simples para ilustrar o uso do PySpark. Ele pressupõe que você entenda os conceitos fundamentais do Apache Spark e esteja executando comandos em um bloco de anotações do Azure Databricks conectado à computação. Você cria DataFrames usando dados de exemplo, executa transformações básicas, incluindo operações de linha e coluna nesses dados, combina vários DataFrames e agrega esses dados, visualiza esses dados e os salva em uma tabela ou arquivo.
Carregar dados
Alguns exemplos neste artigo usam dados de exemplo fornecidos pelo Databricks para demonstrar o uso de DataFrames para carregar, transformar e salvar dados. Se você quiser usar seus próprios dados que ainda não estão no Databricks, você pode carregá-los primeiro e criar um DataFrame a partir dele. Consulte Criar ou modificar uma tabela usando o upload de arquivos e Carregar arquivos para um volume do Catálogo Unity.
Sobre os dados de exemplo do Databricks
Databricks fornece dados de exemplo no samples
catálogo e no /databricks-datasets
diretório.
- Para acessar os dados de exemplo no
samples
catálogo, use o formatosamples.<schema-name>.<table-name>
. Este artigo usa tabelas nosamples.tpch
esquema, que contém dados de uma empresa fictícia. Acustomer
tabela contém informações sobre os clientes eorders
contém informações sobre os pedidos feitos por esses clientes. - Use
dbutils.fs.ls
para explorar dados no/databricks-datasets
. Use o Spark SQL ou DataFrames para consultar dados nesse local usando caminhos de arquivo. Para saber mais sobre os dados de exemplo fornecidos pelo Databricks, consulte Conjuntos de dados de exemplo.
Importar tipos de dados
Muitas operações do PySpark exigem que você use funções SQL ou interaja com tipos nativos do Spark. Você pode importar diretamente apenas as funções e os tipos necessários ou importar o módulo inteiro.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
Como algumas funções importadas podem substituir funções internas do Python, alguns usuários optam por importar esses módulos usando um alias. Os exemplos a seguir mostram um alias comum usado em exemplos de código do Apache Spark:
import pyspark.sql.types as T
import pyspark.sql.functions as F
Para obter uma lista abrangente de tipos de dados, consulte Tipos de dados do Spark.
Para obter uma lista abrangente das funções SQL do PySpark, consulte Funções do Spark.
Criar um DataFrame
Há várias maneiras de criar um DataFrame. Normalmente, você define um DataFrame em relação a uma fonte de dados, como uma tabela ou coleção de arquivos. Em seguida, conforme descrito na seção de conceitos fundamentais do Apache Spark, use uma ação, como display
, para acionar as transformações a serem executadas. O display
método produz DataFrames.
Criar um DataFrame com valores especificados
Para criar um DataFrame com valores especificados, use o createDataFrame
método, onde as linhas são expressas como uma lista de tuplas:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Observe na saída que os tipos de dados de colunas de df_children
são automaticamente inferidos. Como alternativa, você pode especificar os tipos adicionando um esquema. Os esquemas são definidos usando o StructType
que é composto por que especificam o nome, o tipo de StructFields
dados e um sinalizador booleano indicando se eles contêm um valor nulo ou não. Você deve importar tipos de dados do pyspark.sql.types
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Criar um DataFrame a partir de uma tabela no Unity Catalog
Para criar um DataFrame a partir de uma tabela no Unity Catalog, use o table
método que identifica a tabela usando o formato <catalog-name>.<schema-name>.<table-name>
. Clique em Catálogo na barra de navegação esquerda para usar o Catalog Explorer para navegar até a tabela. Clique nele e selecione Copiar caminho da tabela para inserir o caminho da tabela no bloco de anotações.
O exemplo a seguir carrega a tabela samples.tpch.customer
, mas você pode, alternativamente, fornecer o caminho para sua própria tabela.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Criar um DataFrame a partir de um arquivo carregado
Para criar um DataFrame a partir de um arquivo que você carregou para volumes do Catálogo Unity, use a read
propriedade. Esse método retorna um DataFrameReader
, que você pode usar para ler o formato apropriado. Clique na opção de catálogo na pequena barra lateral à esquerda e use o navegador de catálogo para localizar seu arquivo. Selecione-o e clique em Copiar caminho do arquivo de volume.
O exemplo abaixo lê a partir de um *.csv
arquivo, mas DataFrameReader
suporta o upload de arquivos em muitos outros formatos. Consulte Métodos DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Para obter mais informações sobre os volumes do Catálogo Unity, consulte O que são volumes do Catálogo Unity?.
Criar um DataFrame a partir de uma resposta JSON
Para criar um DataFrame a partir de uma carga útil de resposta JSON retornada por uma API REST, use o pacote Python requests
para consultar e analisar a resposta. Você deve importar o pacote para usá-lo. Este exemplo usa dados do banco de dados de aplicativos de medicamentos da Food and Drug Administration dos Estados Unidos.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Para obter informações sobre como trabalhar com JSON e outros dados semiestruturados no Databricks, consulte Modelar dados semiestruturados.
Selecionar um campo ou objeto JSON
Para selecionar um campo ou objeto específico do JSON convertido, use a []
notação. Por exemplo, para selecionar o products
campo que é uma matriz de produtos:
display(df_drugs.select(df_drugs["products"]))
Você também pode encadear chamadas de método para atravessar vários campos. Por exemplo, para produzir o nome da marca do primeiro produto em uma aplicação de medicamento:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Criar um DataFrame a partir de um arquivo
Para demonstrar a criação de um DataFrame a partir de um arquivo, este exemplo carrega dados CSV no /databricks-datasets
diretório.
Para navegar até os conjuntos de dados de exemplo, você pode usar os comandos do sistema de arquivos Databricks Utilties . O exemplo a seguir usa dbutils
para listar os conjuntos de dados disponíveis em /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
Como alternativa, você pode usar %fs
para acessar os comandos do sistema de arquivos da CLI do Databricks, conforme mostrado no exemplo a seguir:
%fs ls '/databricks-datasets'
Para criar um DataFrame a partir de um arquivo ou diretório de arquivos, especifique o load
caminho no método:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Transforme dados com DataFrames
DataFrames facilitam a transformação de dados usando métodos internos para classificar, filtrar e agregar dados. Muitas transformações não são especificadas como métodos em DataFrames, mas são fornecidas no spark.sql.functions
pacote. Consulte Databricks Spark SQL Functions.
- Operações de coluna
- Operações de linha
- Junte-se a DataFrames
- Dados agregados
- Encadeamento de chamadas
Operações de coluna
O Spark fornece muitas operações básicas de coluna:
Gorjeta
Para gerar a saída de todas as colunas em um DataFrame, use columns
, por exemplo df_customer.columns
, .
Selecionar colunas
Você pode selecionar colunas específicas usando select
e col
. A col
função está no pyspark.sql.functions
submódulo.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
Você também pode se referir a uma coluna usando expr
uma expressão definida como uma cadeia de caracteres:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
Você também pode usar selectExpr
, que aceita expressões SQL:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Para selecionar colunas usando um literal de cadeia de caracteres, faça o seguinte:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Para selecionar explicitamente uma coluna de um DataFrame específico, você pode usar o []
operador ou o .
operador. (O .
operador não pode ser usado para selecionar colunas que comecem com um número inteiro ou que contenham um espaço ou caractere especial.) Isso pode ser especialmente útil quando você está unindo DataFrames onde algumas colunas têm o mesmo nome.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Criar colunas
Para criar uma nova coluna, use o withColumn
método. O exemplo a seguir cria uma nova coluna que contém um valor booleano com base em se o saldo c_acctbal
da conta do cliente excede 1000
:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Mudar o nome das colunas
Para renomear uma coluna, use o withColumnRenamed
método, que aceita os nomes de coluna novos e existentes:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
O alias
método é especialmente útil quando você deseja renomear suas colunas como parte de agregações:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Tipos de coluna de transmissão
Em alguns casos, você pode querer alterar o tipo de dados para uma ou mais das colunas em seu DataFrame. Para fazer isso, use o cast
método para converter entre tipos de dados de coluna. O exemplo a seguir mostra como converter uma coluna de um inteiro para o tipo de cadeia de caracteres, usando o col
método para fazer referência a uma coluna:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Remover colunas
Para remover colunas, você pode omitir colunas durante uma seleção ou select(*) except
pode usar o drop
método:
df_customer_flag_renamed.drop("balance_flag_renamed")
Você também pode soltar várias colunas de uma só vez:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Operações de linha
O Spark fornece muitas operações básicas de linha:
- Filtrar linhas
- Remover linhas duplicadas
- Manipular valores nulos
- Acrescentar linhas
- Ordenar linhas
- Filtrar linhas
Filtrar linhas
Para filtrar linhas, use o filter
método ou where
em um DataFrame para retornar apenas determinadas linhas. Para identificar uma coluna para filtrar, use o col
método ou uma expressão que é avaliada como uma coluna.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Para filtrar em várias condições, use operadores lógicos. Por exemplo, &
e |
permitir que você e AND
OR
condições, respectivamente. O exemplo a seguir filtra linhas em que o c_nationkey
é igual a 20
e c_acctbal
é maior que 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Remover linhas duplicadas
Para eliminar a duplicação de linhas, use distinct
, que retorna apenas as linhas exclusivas.
df_unique = df_customer.distinct()
Manipular valores nulos
Para manipular valores nulos, solte linhas que contenham valores nulos usando o na.drop
método. Esse método permite especificar se você deseja soltar linhas contendo any
valores nulos ou all
valores nulos.
Para descartar quaisquer valores nulos, use um dos exemplos a seguir.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Se, em vez disso, você quiser filtrar apenas as linhas que contêm todos os valores nulos, use o seguinte:
df_customer_no_nulls = df_customer.na.drop("all")
Você pode aplicar isso a um subconjunto de colunas especificando isso, conforme mostrado abaixo:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Para preencher os valores em falta, utilize o fill
método. Você pode optar por aplicar isso a todas as colunas ou a um subconjunto de colunas. No exemplo abaixo, os saldos de conta que têm um valor nulo para o saldo c_acctbal
da conta são preenchidos com 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Para substituir cadeias de caracteres por outros valores, use o replace
método. No exemplo abaixo, todas as cadeias de endereços vazias são substituídas pela palavra UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Acrescentar linhas
Para acrescentar linhas, você precisa usar o union
método para criar um novo DataFrame. No exemplo a seguir, o DataFrame df_that_one_customer
criado anteriormente e df_filtered_customer
são combinados, que retorna um DataFrame com três clientes:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Nota
Você também pode combinar DataFrames gravando-os em uma tabela e, em seguida, anexando novas linhas. Para cargas de trabalho de produção, o processamento incremental de fontes de dados para uma tabela de destino pode reduzir drasticamente a latência e os custos de computação à medida que os dados crescem em tamanho. Consulte Ingerir dados em uma casa de lago Databricks.
Ordenar linhas
Importante
A classificação pode ser cara em escala, e se você armazenar dados classificados e recarregar os dados com o Spark, o pedido não é garantido. Certifique-se de que você é intencional em seu uso de classificação.
Para classificar linhas por uma ou mais colunas, use o sort
método ou orderBy
. Por padrão, esses métodos classificam em ordem crescente:
df_customer.orderBy(col("c_acctbal"))
Para filtrar em ordem decrescente, use desc
:
df_customer.sort(col("c_custkey").desc())
O exemplo a seguir mostra como classificar em duas colunas:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Para limitar o número de linhas a serem retornadas depois que o DataFrame for classificado, use o limit
método. O exemplo a seguir exibe apenas os 10
principais resultados:
display(df_sorted.limit(10))
Junte-se a DataFrames
Para unir dois ou mais DataFrames, use o join
método. Você pode especificar como gostaria que os DataFrames fossem unidos nos how
parâmetros (o tipo de junção) e on
(em quais colunas basear a junção). Os tipos comuns de junção incluem:
-
inner
: Este é o padrão de tipo de junção, que retorna um DataFrame que mantém apenas as linhas onde há uma correspondência para oon
parâmetro em todos os DataFrames. -
left
: Isso mantém todas as linhas do primeiro DataFrame especificado e apenas as linhas do segundo DataFrame especificado que têm uma correspondência com o primeiro. -
outer
: Uma junção externa mantém todas as linhas de ambos os DataFrames, independentemente da correspondência.
Para obter informações detalhadas sobre associações, consulte Trabalhar com associações no Azure Databricks. Para obter uma lista de junções suportadas no PySpark, consulte Associações DataFrame.
O exemplo a seguir retorna um único DataFrame onde cada linha do DataFrame é unida orders
com a linha correspondente do customers
DataFrame. Uma junção interna é usada, pois a expectativa é que cada pedido corresponda exatamente a um cliente.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Para unir em várias condições, use operadores booleanos como &
e |
para especificar AND
e OR
, respectivamente. O exemplo a seguir adiciona uma condição adicional, filtrando apenas para as linhas que têm o_totalprice
maior que 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Dados agregados
Para agregar dados em um DataFrame, semelhante a um GROUP BY
em SQL, use o groupBy
método para especificar colunas para agrupar e o agg
método para especificar agregações. Importar agregações comuns, incluindo avg
, sum
, max
, e min
de pyspark.sql.functions
. O exemplo a seguir mostra o saldo médio do cliente por segmento de mercado:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Algumas agregações são ações, o que significa que desencadeiam cálculos. Neste caso, você não precisa usar outras ações para produzir resultados.
Para contar linhas em um DataFrame, use o count
método:
df_customer.count()
Encadeamento de chamadas
Os métodos que transformam DataFrames retornam DataFrames, e o Spark não age em transformações até que as ações sejam chamadas. Esta avaliação preguiçosa significa que você pode encadear vários métodos para conveniência e legibilidade. O exemplo a seguir mostra como encadear a filtragem, a agregação e a ordenação:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Visualize seu DataFrame
Para visualizar um DataFrame em um bloco de anotações, clique no + sinal ao lado da tabela no canto superior esquerdo do DataFrame e selecione Visualização para adicionar um ou mais gráficos com base no seu DataFrame. Para obter detalhes sobre visualizações, consulte Visualizações em blocos de anotações Databricks.
display(df_order)
Para executar visualizações adicionais, o Databricks recomenda o uso da API pandas para Spark. O .pandas_api()
permite que você converta para a API pandas correspondente para um Spark DataFrame. Para obter mais informações, consulte API Pandas no Spark.
Guardar os dados
Depois de transformar seus dados, você pode salvá-los usando os DataFrameWriter
métodos. Uma lista completa desses métodos pode ser encontrada em DataFrameWriter. As seções a seguir mostram como salvar seu DataFrame como uma tabela e como uma coleção de arquivos de dados.
Salve seu DataFrame como uma tabela
Para salvar seu DataFrame como uma tabela no Unity Catalog, use o write.saveAsTable
método e especifique o caminho no formato <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Escreva seu DataFrame como CSV
Para gravar seu DataFrame no *.csv
formato, use o write.csv
método, especificando o formato e as opções. Por padrão, se existirem dados no caminho especificado, a operação de gravação falhará. Você pode especificar um dos seguintes modos para executar uma ação diferente:
-
overwrite
substitui todos os dados existentes no caminho de destino pelo conteúdo do DataFrame. -
append
acrescenta conteúdo do DataFrame aos dados no caminho de destino. -
ignore
Falha silenciosamente na gravação se existirem dados no caminho de destino.
O exemplo a seguir demonstra a substituição de dados com conteúdo DataFrame como arquivos CSV:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Próximos passos
Para aproveitar mais recursos do Spark no Databricks, consulte: