Analisar dados com o Spark
Um dos benefícios do uso do Spark é escrever e executar código em várias linguagens de programação, permitindo que você use as suas habilidades de programação e a linguagem mais apropriada para uma tarefa específica. A linguagem padrão em um novo notebook Spark do Azure Synapse Analytics é o PySpark, uma versão do Python otimizada para Spark, que é popularmente usada por analistas e cientistas de dados devido ao forte suporte à manipulação e visualização de dados. Além disso, você pode usar linguagens como Scala (uma linguagem derivada de Java que pode ser usada interativamente) e SQL (uma variante da linguagem SQL bastante usada incluída na biblioteca do Spark SQL para trabalhar com estruturas de dados relacionais). Os engenheiros de software também podem criar soluções compiladas que são executadas no Spark usando estruturas como Java e Microsoft .NET.
Explorando dados com dataframes
Nativamente, o Spark usa uma estrutura de dados chamada RDD (conjunto de dados distribuídos resilientes), mas, embora você possa escrever um código que funcione de maneira direta com os RDDs, a estrutura de dados mais usada para trabalhar com os dados estruturados no Spark é o dataframe, que é fornecido como parte da biblioteca do Spark SQL. Os dataframes no Spark são semelhantes aos da biblioteca Python onipresente Pandas, mas otimizados para funcionar no ambiente de processamento distribuído do Spark.
Observação
Além da API de Dataframe, o Spark SQL fornece uma API de Conjunto de Dados fortemente tipada com suporte para Java e Scala. Neste módulo, vamos no concentrar na API de Dataframe.
Carregando dados em um dataframe
Vamos explorar um exemplo hipotético para ver como você pode usar um dataframe para trabalhar usando dados. Suponha que você tenha os seguintes dados em um arquivo de texto delimitado por vírgulas chamado products.csv na conta de armazenamento primária de um workspace do Azure Synapse Analytics:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Em um notebook Spark, você pode usar o seguinte código PySpark para carregar os dados em um dataframe e exibir as primeiras dez linhas:
%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
format='csv',
header=True
)
display(df.limit(10))
A linha %%pyspark
no início é chamada magic e informa ao Spark que a linguagem usada nessa célula é PySpark. Você pode selecionar a linguagem que deseja usar como padrão na barra de ferramentas da interface do Notebook e, depois, usar o magic para substituir essa escolha por uma célula específica. Por exemplo, este é o código Scala equivalente do exemplo de dados de produtos:
%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))
O magic %%spark
é usada para especificar Scala.
Ambos os exemplos de código produziriam uma saída como esta:
ProductID | ProductName | Categoria | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountain bikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountain bikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountain bikes | 3399.9900 |
... | ... | ... | ... |
Especificando um esquema de dataframe
No exemplo anterior, a primeira linha do arquivo CSV continha os nomes das colunas, e o Spark conseguiu inferir o tipo de dados de cada coluna com base nos dados contidos nela. Você também pode especificar um esquema explícito para os dados, o que é útil quando os nomes de coluna não são incluídos no arquivo de dados, como este exemplo de CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
O seguinte exemplo do PySpark mostra como especificar um esquema para o dataframe a ser carregado de um arquivo chamado product-data.csv neste formato:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Os resultados seriam mais uma vez semelhantes a:
ProductID | ProductName | Categoria | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountain bikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountain bikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountain bikes | 3399.9900 |
... | ... | ... | ... |
Filtrando e agrupando dataframes
Você pode usar os métodos da classe Dataframe para filtrar, classificar, agrupar e manipular os dados contidos nela. Por exemplo, o seguinte exemplo de código usa o método select para recuperar as colunas ProductName e ListPrice do dataframe df que contém os dados do produto no exemplo anterior:
pricelist_df = df.select("ProductID", "ListPrice")
Os resultados desse exemplo de código seriam semelhantes a estes:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Em comum com a maioria dos métodos de manipulação de dados, select retorna um novo objeto de dataframe.
Dica
Selecionar um subconjunto de colunas de um dataframe é uma operação comum, que também pode ser realizada usando a seguinte sintaxe mais curta:
pricelist_df = df["ProductID", "ListPrice"]
Você pode "encadear" métodos para executar uma série de manipulações que resultam em um dataframe transformado. Por exemplo, este código de exemplo encadeia os métodos select e where para criar um dataframe contendo as colunas ProductName e ListPrice para produtos nas categorias Mountain Bikes ou Road Bikes:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Os resultados desse exemplo de código seriam semelhantes a estes:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539,9900 |
... | ... |
Para agrupar e agregar dados, você pode usar o método groupBy e as funções de agregação. Por exemplo, o seguinte código PySpark conta o número de produtos de cada categoria:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Os resultados desse exemplo de código seriam semelhantes a estes:
Categoria | count |
---|---|
Fones de ouvido | 3 |
Rodas | 14 |
Mountain bikes | 32 |
... | ... |
Usando expressões SQL no Spark
A API de Dataframe faz parte de uma biblioteca do Spark chamada Spark SQL, a qual permite que analistas de dados usem expressões SQL para consultar e manipular dados.
Criando objetos de banco de dados no catálogo do Spark
O catálogo do Spark é um metastore para objetos de dados relacionais, como exibições e tabelas. O runtime do Spark pode usar o catálogo para integrar perfeitamente o código escrito em qualquer linguagem com suporte do Spark a expressões SQL que podem ser mais naturais para alguns analistas de dados ou desenvolvedores.
Uma das maneiras mais simples de disponibilizar dados em um dataframe para consulta no catálogo do Spark é criar uma exibição temporária, conforme mostrado no seguinte código de exemplo:
df.createOrReplaceTempView("products")
Uma exibição é temporária, o que significa que ela é excluída automaticamente no final da sessão atual. Você também pode criar tabelas que persistem no catálogo para definir um banco de dados que possa ser consultado usando o Spark SQL.
Observação
Não exploraremos as tabelas de catálogo do Spark a fundo neste módulo, mas aproveitaremos para realçar alguns pontos-chave:
- Você pode criar uma tabela vazia usando o método
spark.catalog.createTable
. Tabelas são estruturas de metadados que armazenam dados subjacentes no local de armazenamento associado ao catálogo. Excluir uma tabela também excluirá os dados subjacentes. - Você pode salvar um dataframe como uma tabela usando o método
saveAsTable
. - Você pode criar uma tabela externa usando o método
spark.catalog.createExternalTable
. As tabelas externas definem os metadados no catálogo, mas obtêm os dados subjacentes de um local de armazenamento externo; normalmente, de uma pasta em um data lake. Excluir uma tabela externa não excluirá os dados subjacentes.
Usando a API do Spark SQL para consultar dados
Você pode usar a API do Spark SQL em código escrito em qualquer linguagem para consultar dados no catálogo. Por exemplo, o código PySpark a seguir usa uma consulta SQL para retornar dados da exibição products como um dataframe.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Os resultados do exemplo de código seriam semelhantes a estes:
ProductID | ProductName | ListPrice |
---|---|---|
38 | Mountain-100 Silver, 38 | 3399.9900 |
52 | Road-750 Black, 52 | 539,9900 |
... | ... | ... |
Usando SQL código
O exemplo anterior demonstrou como usar a API do Spark SQL para inserir expressões SQL no código Spark. Em um notebook, você também pode usar o magic %%sql
para executar um código SQL que consulta objetos no catálogo, conforme exemplificado abaixo:
%%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
O exemplo de código SQL retorna um conjunto de resultados exibido automaticamente no notebook como uma tabela, por exemplo:
Categoria | ProductCount |
---|---|
Bretelles | 3 |
Racks de bicicleta | 1 |
Suportes de bicicleta | 1 |
... | ... |