Tutoriel : COPY INTO avec Spark SQL
Databricks vous recommande d’utiliser la commande COPY INTO pour charger des données en bloc et de façon incrémentielle pour les sources de données contenant des milliers de fichiers. Databricks recommande d’utiliser le chargeur automatique pour les cas d’usage avancés.
Dans ce didacticiel, vous allez utiliser la commande COPY INTO
pour charger des données contenues dans un stockage d’objets cloud dans une table de votre espace de travail Azure Databricks.
Spécifications
- Un abonnement Azure, un espace de travail Azure Databricks dans de cet abonnement et un cluster au sein de cet espace de travail. Pour créer ceux-ci, consultez Démarrage rapide : Exécuter un travail Spark sur Azure Databricks Workspace à l’aide du portail Azure Si vous suivez ce de démarrage rapide, vous n’avez pas besoin de suivre les instructions fournies dans la section Exécuter un travail Spark SQL.
- Un cluster polyvalent dans votre espace de travail exécutant Databricks Runtime 11.3 LTS ou version ultérieure. Pour créer un cluster polyvalent, consultez Informations de référence sur la configuration de calcul.
- Bonne connaissance de l’interface utilisateur de l’espace de travail Azure Databricks. Consultez Naviguer dans l’espace de travail.
- Connaissance de l’utilisation des notebooks Databricks.
- Un emplacement dans lequel vous pouvez écrire des données ; cette démo utilise la racine DBFS comme exemple, mais Databricks recommande un emplacement de stockage externe configuré avec le catalogue Unity.
Étape 1. Configurer votre environnement et créer un générateur de données
Ce didacticiel suppose une connaissance de base d’Azure Databricks et une configuration d’espace de travail par défaut. Si vous ne parvenez pas à exécuter le code fourni, contactez l’administrateur de votre espace de travail pour vous assurer que vous avez accès aux ressources de calcul et à un emplacement dans lequel vous pouvez écrire des données.
Notez que le code fourni utilise un paramètre source
pour spécifier l’emplacement que vous allez configurer comme source de données COPY INTO
. Tel qu’il est écrit, ce code pointe vers un emplacement sur la racine DBFS. Si vous disposez d’autorisations d’écriture sur un emplacement de stockage d’objets externe, remplacez la partie dbfs:/
de la chaîne source par le chemin d’accès à votre stockage d’objets. Comme ce bloc de code effectue également une suppression récursive pour réinitialiser cette démo, veillez à ne pas pointer sur les données de production et à conserver le répertoire imbriqué /user/{username}/copy-into-demo
pour éviter de remplacer ou de supprimer des données existantes.
Créez un notebook SQL et attachez-le à un cluster exécutant Databricks Runtime 11.3 LTS ou version ultérieure.
Copiez et exécutez le code suivant pour réinitialiser l’emplacement de stockage et la base de données utilisés dans ce didacticiel :
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Copiez et exécutez le code suivant pour configurer certaines tables et fonctions qui seront utilisées pour générer des données de façon aléatoire :
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Étape 2 : Télécharger l’exemple de données vers le stockage cloud
L’écriture dans des formats de données autres que Delta Lake est rare sur Azure Databricks. Le code fourni ici écrit en JSON, simulant un système externe qui peut vider les résultats d’un autre système dans le stockage d’objets.
Copiez et exécutez le code suivant pour écrire un lot de données JSON brutes :
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Étape 3 : Utiliser COPY INTO pour charger des données JSON de façon idempotente
Vous devez créer une table Delta Lake cible avant de pouvoir utiliser COPY INTO
. Dans Databricks Runtime 11.3 LTS et versions ultérieures, vous n’avez besoin de fournir rien d’autre qu’un nom de table dans votre instruction CREATE TABLE
. Pour les versions précédentes de Databricks Runtime, vous devez fournir un schéma lors de la création d’une table vide.
Copiez et exécutez le code suivant pour créer votre table Delta cible et charger des données à partir de votre source :
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Étant donné que cette action est idempotente, vous pouvez l’exécuter plusieurs fois, mais les données ne seront chargées qu’une seule fois.
Étape 4 : Afficher un aperçu du contenu de votre table
Vous pouvez exécuter une requête SQL simple pour examiner manuellement le contenu de cette table.
Copiez et exécutez le code suivant pour afficher un aperçu de votre table :
-- Review updated table SELECT * FROM user_ping_target
Étape 5 : Charger d’autres données et prévisualiser les résultats
Vous pouvez réexécuter les étapes 2 à 4 plusieurs fois pour enrichir votre source de nouveaux lots de données JSON brutes aléatoires, les charger de manière idempotente sur Delta Lake avec COPY INTO
, et afficher un aperçu des résultats. Essayez d’exécuter ces étapes dans le désordre ou plusieurs fois pour simuler plusieurs lots de données brutes en cours d’écriture, ou d’exécuter COPY INTO
plusieurs fois sans que de nouvelles données n’arrivent.
Étape 6 : Nettoyer le didacticiel
Une fois que vous avez terminé ce didacticiel, vous pouvez nettoyer les ressources associées si vous ne souhaitez plus les conserver.
Copiez et exécutez le code suivant pour supprimer la base de données, les tables et supprimer toutes les données associées :
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Pour arrêter votre ressource de calcul, accédez à l’onglet Clusters et cliquez sur Terminer pour mettre fin à votre cluster.
Ressources supplémentaires
- Article de référence pour COPY INTO