Transformar dados com o SQL

Concluído

A biblioteca SparkSQL, que fornece a estrutura do dataframe, também permite que você use o SQL como uma forma de trabalhar com os dados. Com essa abordagem, você pode consultar e transformar dados em dataframes usando consultas SQL e persistir os resultados como tabelas.

Observação

As tabelas são abstrações de metadados em arquivos. Os dados não são armazenados em uma tabela relacional, mas a tabela fornece uma camada relacional em arquivos no data lake.

Definir tabelas e exibições

As definições de tabela do Spark são armazenadas no metastore, uma camada de metadados que encapsula abstrações relacionais em arquivos. As tabelas externas são tabelas relacionais no metastore que referenciam arquivos em um local do data lake especificado por você. Acesse esses dados consultando a tabela ou lendo os arquivos diretamente do data lake.

Observação

As tabelas externas têm "associação flexível" aos arquivos subjacentes, e a exclusão da tabela não exclui os arquivos. Isso permite que você use o Spark para fazer o trabalho pesado da transformação e, depois, persistir os dados no lake. Depois de fazer isso, você poderá remover a tabela, e os processos downstream poderão acessar essas estruturas otimizadas. Defina também tabelas gerenciadas, para as quais os arquivos de dados subjacentes são armazenados em um local de armazenamento gerenciado internamente associado ao metastore.metastore. As tabelas gerenciadas têm "associação rígida" em relação aos arquivos, e a remoção de uma tabela gerenciada exclui os arquivos associados.

O exemplo de código a seguir salva um dataframe (carregado de arquivos CSV) como uma tabela externa chamada sales_orders. Os arquivos são armazenados na pasta /sales_orders_table no data lake.

order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')

Usar o SQL para consultar e transformar os dados

Depois de definir uma tabela, você poderá usar o SQL para consultar e transformar os dados. O código a seguir cria duas colunas derivadas chamadas Year e Month e cria uma tabela transformed_orders com as novas colunas derivadas adicionadas.

# Create derived columns
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")

# Save the results
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')

Os arquivos de dados da nova tabela são armazenados em uma hierarquia de pastas com o formato Year=*NNNN* / Month=*N*, com cada pasta contendo um arquivo Parquet para os pedidos correspondentes por ano e mês.

Consultar o metastore

Como essa nova tabela foi criada no metastore, você pode usar o SQL para consultá-la diretamente com a chave magic %%sql na primeira linha para indicar que a sintaxe SQL será usada conforme mostrado no seguinte script:

%%sql

SELECT * FROM transformed_orders
WHERE Year = 2021
    AND Month = 1

Remover tabelas

Ao trabalhar com tabelas externas, você pode usar o comando DROP para excluir as definições de tabela do metastore sem afetar os arquivos do data lake. Essa abordagem permite que você limpe o metastore depois de usar o SQL para transformar os dados, disponibilizando os arquivos de dados transformados para processos de análise e ingestão de dados downstream.

%%sql

DROP TABLE transformed_orders;
DROP TABLE sales_orders;