Trabalhar com dados em um dataframe do Spark

Concluído

Nativamente, o Spark usa uma estrutura de dados chamada RDD (conjunto de dados distribuído resiliente), mas embora você possa escrever código que funcione diretamente com RDDs, a estrutura de dados mais usada para trabalhar com dados estruturados no Spark é o dataframe, que é fornecido como parte da biblioteca SQL do Spark. Os dataframes no Spark são semelhantes aos da onipresente biblioteca Pandas Python, mas otimizados para funcionar no ambiente de processamento distribuído do Spark.

Nota

Além da API Dataframe, o Spark SQL fornece uma API de conjunto de dados fortemente tipada que é suportada em Java e Scala. Vamos nos concentrar na API Dataframe neste módulo.

Carregando dados em um dataframe

Vamos explorar um exemplo hipotético para ver como você pode usar um dataframe para trabalhar com dados. Suponha que você tenha os seguintes dados em um arquivo de texto delimitado por vírgulas chamado products.csv na pasta Arquivos/dados em sua casa do lago:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Inferindo um esquema

Em um bloco de anotações do Spark, você pode usar o seguinte código PySpark para carregar os dados do arquivo em um dataframe e exibir as primeiras 10 linhas:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

A %%pyspark linha no início é chamada de magia, e diz a Spark que a linguagem usada nesta célula é PySpark. Você pode selecionar o idioma que deseja usar como padrão na barra de ferramentas da interface do Bloco de Anotações e, em seguida, usar uma mágica para substituir essa opção para uma célula específica. Por exemplo, aqui está o código Scala equivalente para o exemplo de dados de produtos:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

A magia %%spark é usada para especificar Scala.

Ambos os exemplos de código produziriam uma saída como esta:

ProductID ProductName Categoria PreçoListado
771 Montanha 100 Prateado, 38 Mountain Bikes 3399.9900
772 Montanha 100 Prateado, 42 Mountain Bikes 3399.9900
773 Montanha 100 Prateado, 44 Mountain Bikes 3399.9900
... ... ... ...

Especificando um esquema explícito

No exemplo anterior, a primeira linha do arquivo CSV continha os nomes das colunas, e o Spark era capaz de inferir o tipo de dados de cada coluna a partir dos dados que ela contém. Você também pode especificar um esquema explícito para os dados, que é útil quando os nomes das colunas não são incluídos no arquivo de dados, como este exemplo CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

O exemplo PySpark a seguir mostra como especificar um esquema para o dataframe a ser carregado de um arquivo chamado product-data.csv neste formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Os resultados seriam, mais uma vez, semelhantes a:

ProductID ProductName Categoria PreçoListado
771 Montanha 100 Prateado, 38 Mountain Bikes 3399.9900
772 Montanha 100 Prateado, 42 Mountain Bikes 3399.9900
773 Montanha 100 Prateado, 44 Mountain Bikes 3399.9900
... ... ... ...

Gorjeta

Especificar um esquema explícito também melhora o desempenho!

Filtragem e agrupamento de dataframes

Você pode usar os métodos da classe Dataframe para filtrar, classificar, agrupar e manipular os dados que ela contém. Por exemplo, o exemplo de código a seguir usa o método select para recuperar as colunas ProductID e ListPrice do dataframe df que contém dados do produto no exemplo anterior:

pricelist_df = df.select("ProductID", "ListPrice")

Os resultados deste exemplo de código seriam mais ou menos assim:

ProductID PreçoListado
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Em comum com a maioria dos métodos de manipulação de dados, select retorna um novo objeto dataframe.

Gorjeta

Selecionar um subconjunto de colunas de um dataframe é uma operação comum, que também pode ser obtida usando a seguinte sintaxe mais curta:

pricelist_df = df["ProductID", "ListPrice"]

Você pode "encadear" métodos juntos para executar uma série de manipulações que resultam em um dataframe transformado. Por exemplo, este código de exemplo encadeia os métodos select e where para criar um novo dataframe contendo as colunas ProductName e ListPrice para produtos com uma categoria de Mountain Bikes ou Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Os resultados deste exemplo de código seriam mais ou menos assim:

ProductName Categoria PreçoListado
Montanha 100 Prateado, 38 Mountain Bikes 3399.9900
Road-750 Preta, 52 Road Bikes 539.9900
... ... ...

Para agrupar e agregar dados, você pode usar o método groupBy e agregar funções. Por exemplo, o seguinte código PySpark conta o número de produtos para cada categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Os resultados deste exemplo de código seriam mais ou menos assim:

Categoria contagem
Auriculares 3
Pneus 14
Mountain Bikes 32
... ...

Salvando um dataframe

Muitas vezes, você desejará usar o Spark para transformar dados brutos e salvar os resultados para análise adicional ou processamento downstream. O exemplo de código a seguir salva o dataFrame em um arquivo parquet no data lake, substituindo qualquer arquivo existente com o mesmo nome.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Nota

O formato Parquet é normalmente preferido para arquivos de dados que você usará para análise adicional ou ingestão em um repositório analítico. O Parquet é um formato muito eficiente que é suportado pela maioria dos sistemas de análise de dados de grande escala. Na verdade, às vezes seu requisito de transformação de dados pode ser simplesmente converter dados de outro formato (como CSV) para Parquet!

Particionando o arquivo de saída

O particionamento é uma técnica de otimização que permite ao Spark maximizar o desempenho entre os nós de trabalho. Mais ganhos de desempenho podem ser obtidos ao filtrar dados em consultas, eliminando E/S de disco desnecessárias.

Para salvar um dataframe como um conjunto particionado de arquivos, use o método partitionBy ao gravar os dados. O exemplo a seguir salva o dataframe bikes_df (que contém os dados do produto para as categorias mountain bikes e bicicletas de estrada) e particiona os dados por categoria:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

Os nomes de pasta gerados ao particionar um dataframe incluem o nome e o valor da coluna de particionamento em um formato column=value, portanto, o exemplo de código cria uma pasta chamada bike_data que contém as seguintes subpastas:

  • Categoria=Bicicletas de montanha
  • Categoria=Bicicletas de Estrada

Cada subpasta contém um ou mais arquivos de parquet com os dados do produto para a categoria apropriada.

Nota

Você pode particionar os dados por várias colunas, o que resulta em uma hierarquia de pastas para cada chave de particionamento. Por exemplo, você pode particionar dados de ordem de venda por ano e mês, para que a hierarquia de pastas inclua uma pasta para cada valor de ano, que, por sua vez, contém uma subpasta para cada valor de mês.

Carregar dados particionados

Ao ler dados particionados em um dataframe, você pode carregar dados de qualquer pasta dentro da hierarquia especificando valores explícitos ou curingas para os campos particionados. O exemplo a seguir carrega dados para produtos na categoria Road Bikes :

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Nota

As colunas de particionamento especificadas no caminho do arquivo são omitidas no dataframe resultante. Os resultados produzidos pela consulta de exemplo não incluiriam uma coluna Categoria - a categoria para todas as linhas seria Road Bikes.