Trabalhar com dados em um dataframe do Spark

Concluído

Nativamente, o Spark usa uma estrutura de dados chamada RDD (conjunto de dados distribuídos resilientes), mas, embora você possa escrever um código que funcione de maneira direta com os RDDs, a estrutura de dados mais usada para trabalhar com os dados estruturados no Spark é o dataframe, que é fornecido como parte da biblioteca do Spark SQL. Os dataframes no Spark são semelhantes aos da biblioteca Python onipresente Pandas, mas otimizados para funcionar no ambiente de processamento distribuído do Spark.

Observação

Além da API de Dataframe, o Spark SQL fornece uma API de Conjunto de Dados fortemente tipada com suporte para Java e Scala. Neste módulo, vamos no concentrar na API de Dataframe.

Carregando dados em um dataframe

Vamos explorar um exemplo hipotético para ver como você pode usar um dataframe para trabalhar usando dados. Imagine que você tem os seguintes dados em um arquivo de texto delimitado por vírgulas chamado products.csv na pasta Arquivos/dados do lakehouse:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Inferir um esquema

Em um notebook do Spark, é possível usar o seguinte código PySpark para carregar os dados do arquivo em um dataframe e exibir as dez primeiras linhas:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

A linha %%pyspark no início é chamada magic e informa ao Spark que a linguagem usada nessa célula é PySpark. Você pode selecionar a linguagem que deseja usar como padrão na barra de ferramentas da interface do Notebook e, depois, usar o magic para substituir essa escolha por uma célula específica. Por exemplo, este é o código Scala equivalente do exemplo de dados de produtos:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

O magic %%spark é usada para especificar Scala.

Ambos os exemplos de código produziriam uma saída como esta:

ProductID ProductName Categoria ListPrice
771 Mountain-100 Silver, 38 Mountain bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain bikes 3399.9900
... ... ... ...

Especificar um esquema explícito

No exemplo anterior, a primeira linha do arquivo CSV continha os nomes das colunas, e o Spark conseguiu inferir o tipo de dados de cada coluna com base nos dados contidos nela. Você também pode especificar um esquema explícito para os dados, o que é útil quando os nomes de coluna não são incluídos no arquivo de dados, como este exemplo de CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

O seguinte exemplo do PySpark mostra como especificar um esquema para o dataframe a ser carregado de um arquivo chamado product-data.csv neste formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Os resultados seriam mais uma vez semelhantes a:

ProductID ProductName Categoria ListPrice
771 Mountain-100 Silver, 38 Mountain bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain bikes 3399.9900
... ... ... ...

Dica

A especificação de um esquema explícito também melhora o desempenho.

Filtrando e agrupando dataframes

Você pode usar os métodos da classe Dataframe para filtrar, classificar, agrupar e manipular os dados contidos nela. Por exemplo, o exemplo de código a seguir usa o método select para recuperar as colunas ProductID e ListPrice do dataframe df que contém dados do produto no exemplo anterior:

pricelist_df = df.select("ProductID", "ListPrice")

Os resultados desse exemplo de código seriam semelhantes a estes:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Em comum com a maioria dos métodos de manipulação de dados, select retorna um novo objeto de dataframe.

Dica

Selecionar um subconjunto de colunas de um dataframe é uma operação comum, que também pode ser realizada usando a seguinte sintaxe mais curta:

pricelist_df = df["ProductID", "ListPrice"]

Você pode "encadear" métodos para executar uma série de manipulações que resultam em um dataframe transformado. Por exemplo, este código de exemplo encadeia os métodos select e where para criar um dataframe contendo as colunas ProductName e ListPrice para produtos nas categorias Mountain Bikes ou Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Os resultados desse exemplo de código seriam semelhantes a estes:

ProductName Categoria ListPrice
Mountain-100 Silver, 38 Mountain bikes 3399.9900
Road-750 Black, 52 Bicicletas de estrada 539,9900
... ... ...

Para agrupar e agregar dados, você pode usar o método groupBy e as funções de agregação. Por exemplo, o seguinte código PySpark conta o número de produtos de cada categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Os resultados desse exemplo de código seriam semelhantes a estes:

Categoria count
Fones de ouvido 3
Rodas 14
Mountain bikes 32
... ...

Salvar um dataframe

Muitas vezes, você deseja usar o Spark para transformar dados brutos e salvar os resultados para análises posteriores ou processamento downstream. O exemplo de código a seguir salva o dataFrame em um arquivo Parquet no data lake, substituindo os arquivos existentes com o mesmo nome.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Observação

Normalmente, o formato Parquet é preferencial para os arquivos de dados que você usará para análise ou ingestão posterior em um repositório analítico. O Parquet é um formato muito eficiente que é compatível com a maioria dos sistemas de análise de dados em grande escala. Na verdade, às vezes, seu requisito de transformação de dados pode ser apenas converter dados de outro formato (como CSV) em Parquet.

Particionar o arquivo de saída

O particionamento é uma técnica de otimização que permite ao Spark maximizar o desempenho nos nós de trabalho. Mais ganhos de desempenho podem ser obtidos ao filtrar dados em consultas eliminando a E/S de disco desnecessária.

Para salvar um dataframe como um conjunto particionado de arquivos, use o método partitionBy ao gravar os dados. O seguinte exemplo salva o dataframe bikes_df (que contém os dados do produto para as categorias bicicletas de montanha e bicicletas de estrada) e particiona os dados por categoria:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

Os nomes de pasta gerados ao particionar um dataframe incluem o nome e o valor da coluna de particionamento em um formato column=value, portanto, o exemplo de código cria uma pasta chamada bike_data que contém as seguintes subpastas:

  • Category=Mountain Bikes
  • Category=Road Bikes

Cada subpasta contém um ou mais arquivos parquet com os dados do produto para a categoria apropriada.

Observação

Você pode particionar os dados por várias colunas, o que resulta em uma hierarquia de pastas para cada chave de particionamento. Por exemplo, é possível particionar dados de ordens de vendas por ano e por mês, para que a hierarquia de pastas inclua uma pasta para cada valor de ano, que contém uma subpasta para cada valor de mês.

Carregar dados particionados

Ao ler dados particionados em um dataframe, é possível carregar dados de qualquer pasta na hierarquia especificando valores explícitos ou curingas para os campos particionados. O seguinte exemplo carrega dados para produtos na categoria bicicletas de estrada:

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Observação

As colunas de particionamento especificadas no caminho do arquivo são omitidas no dataframe resultante. Os resultados produzidos pela consulta de exemplo não incluiriam uma coluna Categoria. A categoria para todas as linhas seria bicicletas de estrada.