Partager via


Tutoriel : COPY INTO avec Spark SQL

Databricks vous recommande d’utiliser la commande COPY INTO pour charger des données en bloc et de façon incrémentielle pour les sources de données contenant des milliers de fichiers. Databricks recommande d’utiliser le chargeur automatique pour les cas d’usage avancés.

Dans ce didacticiel, vous allez utiliser la commande COPY INTO pour charger des données contenues dans un stockage d’objets cloud dans une table de votre espace de travail Azure Databricks.

Spécifications

  1. Un abonnement Azure, un espace de travail Azure Databricks dans de cet abonnement et un cluster au sein de cet espace de travail. Pour créer ceux-ci, consultez Démarrage rapide : Exécuter un travail Spark sur Azure Databricks Workspace à l’aide du portail Azure Si vous suivez ce de démarrage rapide, vous n’avez pas besoin de suivre les instructions fournies dans la section Exécuter un travail Spark SQL.
  2. Un cluster polyvalent dans votre espace de travail exécutant Databricks Runtime 11.3 LTS ou version ultérieure. Pour créer un cluster polyvalent, consultez Informations de référence sur la configuration de calcul.
  3. Bonne connaissance de l’interface utilisateur de l’espace de travail Azure Databricks. Consultez Naviguer dans l’espace de travail.
  4. Connaissance de l’utilisation des notebooks Databricks.
  5. Un emplacement dans lequel vous pouvez écrire des données ; cette démo utilise la racine DBFS comme exemple, mais Databricks recommande un emplacement de stockage externe configuré avec le catalogue Unity.

Étape 1. Configurer votre environnement et créer un générateur de données

Ce didacticiel suppose une connaissance de base d’Azure Databricks et une configuration d’espace de travail par défaut. Si vous ne parvenez pas à exécuter le code fourni, contactez l’administrateur de votre espace de travail pour vous assurer que vous avez accès aux ressources de calcul et à un emplacement dans lequel vous pouvez écrire des données.

Notez que le code fourni utilise un paramètre source pour spécifier l’emplacement que vous allez configurer comme source de données COPY INTO. Tel qu’il est écrit, ce code pointe vers un emplacement sur la racine DBFS. Si vous disposez d’autorisations d’écriture sur un emplacement de stockage d’objets externe, remplacez la partie dbfs:/ de la chaîne source par le chemin d’accès à votre stockage d’objets. Comme ce bloc de code effectue également une suppression récursive pour réinitialiser cette démo, veillez à ne pas pointer sur les données de production et à conserver le répertoire imbriqué /user/{username}/copy-into-demo pour éviter de remplacer ou de supprimer des données existantes.

  1. Créez un notebook SQL et attachez-le à un cluster exécutant Databricks Runtime 11.3 LTS ou version ultérieure.

  2. Copiez et exécutez le code suivant pour réinitialiser l’emplacement de stockage et la base de données utilisés dans ce didacticiel :

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copiez et exécutez le code suivant pour configurer certaines tables et fonctions qui seront utilisées pour générer des données de façon aléatoire :

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Étape 2 : Télécharger l’exemple de données vers le stockage cloud

L’écriture dans des formats de données autres que Delta Lake est rare sur Azure Databricks. Le code fourni ici écrit en JSON, simulant un système externe qui peut vider les résultats d’un autre système dans le stockage d’objets.

  1. Copiez et exécutez le code suivant pour écrire un lot de données JSON brutes :

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Étape 3 : Utiliser COPY INTO pour charger des données JSON de façon idempotente

Vous devez créer une table Delta Lake cible avant de pouvoir utiliser COPY INTO. Dans Databricks Runtime 11.3 LTS et versions ultérieures, vous n’avez besoin de fournir rien d’autre qu’un nom de table dans votre instruction CREATE TABLE. Pour les versions précédentes de Databricks Runtime, vous devez fournir un schéma lors de la création d’une table vide.

  1. Copiez et exécutez le code suivant pour créer votre table Delta cible et charger des données à partir de votre source :

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Étant donné que cette action est idempotente, vous pouvez l’exécuter plusieurs fois, mais les données ne seront chargées qu’une seule fois.

Étape 4 : Afficher un aperçu du contenu de votre table

Vous pouvez exécuter une requête SQL simple pour examiner manuellement le contenu de cette table.

  1. Copiez et exécutez le code suivant pour afficher un aperçu de votre table :

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Étape 5 : Charger d’autres données et prévisualiser les résultats

Vous pouvez réexécuter les étapes 2 à 4 plusieurs fois pour enrichir votre source de nouveaux lots de données JSON brutes aléatoires, les charger de manière idempotente sur Delta Lake avec COPY INTO, et afficher un aperçu des résultats. Essayez d’exécuter ces étapes dans le désordre ou plusieurs fois pour simuler plusieurs lots de données brutes en cours d’écriture, ou d’exécuter COPY INTO plusieurs fois sans que de nouvelles données n’arrivent.

Étape 6 : Nettoyer le didacticiel

Une fois que vous avez terminé ce didacticiel, vous pouvez nettoyer les ressources associées si vous ne souhaitez plus les conserver.

  1. Copiez et exécutez le code suivant pour supprimer la base de données, les tables et supprimer toutes les données associées :

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Pour arrêter votre ressource de calcul, accédez à l’onglet Clusters et cliquez sur Terminer pour mettre fin à votre cluster.

Ressources supplémentaires