Tutorial: COPY INTO com o Spark SQL
O Databricks recomenda que você use o comando COPY INTO para carregamento de dados incrementais e em massa para fontes de dados que contêm milhares de arquivos. O Databricks recomenda que você use o Carregador Automático para casos de uso avançados.
Neste tutorial, você usa o comando COPY INTO
para carregar dados do armazenamento de objetos em nuvem em uma tabela no workspace do Azure Databricks.
Requisitos
- Uma assinatura do Azure, um workspace do Azure Databricks nessa assinatura e um cluster nesse espaço de trabalho. Para criar esses dois, confira Início Rápido: executar um trabalho do Spark no Workspace do Azure Databricks usando o portal do Azure. Se você seguir este guia de início rápido, não precisa seguir as instruções na seção Executar um trabalho do Spark SQL.
- Um cluster para todos os fins em seu espaço de trabalho executando o Databricks Runtime 11.3 LTS ou superior. Para criar um cluster para todas as finalidades, consulte Referência de configuração de computação.
- Familiaridade com a interface do usuário do workspace do Azure Databricks. Consulte Navegar no workspace.
- Familiaridade ao trabalhar com notebooks do Databricks.
- Um local em que você pode gravar dados; essa demonstração usa a raiz DBFS como exemplo, entretanto, o Databricks recomenda um local de armazenamento externo configurado com o Catálogo do Unity.
Etapa 1. Configurar seu ambiente e criar um gerador de dados
Este tutorial pressupõe uma familiaridade básica com o Azure Databricks e uma configuração de espaço de trabalho padrão. Se não for possível executar o código fornecido, entre em contato com o administrador do espaço de trabalho para garantir que você tenha acesso aos recursos de computação e a um local para o qual você pode gravar dados.
Observe que o código fornecido usa um parâmetro source
para especificar o local que você configurará como sua fonte de dados COPY INTO
. Conforme escrito, esse código aponta para um local na raiz DBFS. Se você tiver permissões de gravação em um local de armazenamento de objeto externo, substitua a parte dbfs:/
da cadeia de caracteres de origem pelo caminho para o armazenamento de objetos. Como esse bloco de código também faz uma exclusão recursiva para redefinir essa demonstração, certifique-se de que você não aponte isso para os dados de produção e mantenha o diretório /user/{username}/copy-into-demo
aninhado para evitar substituir ou excluir dados existentes.
Criar um novo notebook SQL e anexá-lo a um cluster que esteja executando o Databricks Runtime 11.3 LTS ou superior.
Copie e execute o seguinte código para redefinir o local de armazenamento e o banco de dados usados neste tutorial:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Copie e execute o seguinte código para configurar algumas tabelas e funções que serão usadas para gerar dados aleatoriamente:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Etapa 2: gravar os dados de exemplo no armazenamento em nuvem
Gravar em formatos de dados diferentes do Delta Lake é raro no Azure Databricks. O código fornecido aqui grava no JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.
Copie e execute o seguinte código para gravar um lote de dados JSON brutos:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Etapa 3: usar o COPY INTO para carregar dados JSON de forma idempotente
Você deve criar uma tabela Delta Lake de destino antes de poder usar COPY INTO
. No Databricks Runtime 11.3 LTS e superior, não é necessário fornecer nada além de um nome de tabela na instrução CREATE TABLE
. Para versões anteriores do Databricks Runtime, você deve fornecer um esquema ao criar uma tabela vazia.
Copie e execute o seguinte código para criar a tabela Delta de destino e carregar dados de sua origem:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Como essa ação é idempotente, você pode executá-la várias vezes, mas os dados serão carregados apenas uma vez.
Etapa 4: visualizar o conteúdo da tabela
Você pode executar uma consulta de SQL simples para examinar manualmente o conteúdo desta tabela.
Copie e execute o seguinte código para visualizar sua tabela:
-- Review updated table SELECT * FROM user_ping_target
Etapa 5: carregar mais dados e visualizar resultados
Você pode executar novamente as etapas de 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los de forma idempotente no Delta Lake com COPY INTO
e visualizar os resultados. Tente executar essas etapas fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executados COPY INTO
várias vezes sem que novos dados tenham chegado.
Etapa 6: tutorial de limpeza
Ao concluir este tutorial, você poderá limpar os recursos associados, caso não os queira mais.
Copie e execute o seguinte código para remover o banco de dados, tabelas e todos os dados:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Para interromper o recurso de computação, acesse a guia Clusters e Encerre seu cluster.
Recursos adicionais
- O artigo de referência do COPY INTO