Freigeben über


Tutorial: COPY INTO mit Spark SQL

Von Databricks wird der Befehl COPY INTO für inkrementelles und Massenladen von Daten für Datenquellen, die mehrere tausend Dateien enthalten, empfohlen. Databricks empfiehlt, Autoloader für erweiterte Anwendungsfälle zu verwenden.

In diesem Tutorial verwenden Sie den Befehl COPY INTO, um Daten aus dem Cloudobjektspeicher in eine Tabelle in Ihrem Azure Databricks-Arbeitsbereich zu laden.

Anforderungen

  1. Ein Azure-Abonnement, ein Azure Databricks-Arbeitsbereich in diesem Abonnement und ein Cluster in diesem Arbeitsbereich. Eine Anleitung zum Erstellen dieser Komponenten finden Sie unter Schnellstart: Ausführen eines Spark-Auftrags im Azure Databricks-Arbeitsbereich über das Azure-Portal. Wenn Sie diese Schnellstart befolgen, müssen Sie die Anweisungen im Abschnitt Ausführen eines Spark SQL-Auftrags nicht ausführen.
  2. Ein universeller Cluster in Ihrem Arbeitsbereich, in dem Databricks Runtime 11.3 LTS oder höher ausgeführt wird. Informationen zum Erstellen eines Allzweck-Clusters finden Sie unter Computekonfigurationsreferenz.
  3. Vertrautheit mit der Benutzeroberfläche des Azure Databricks-Arbeitsbereichs. Informationen finden Sie unter Navigieren im Arbeitsbereich.
  4. Vertrautheit mit der Arbeit mit Databricks-Notebooks.
  5. Ein Speicherort, in den Sie Daten schreiben können. Diese Demo verwendet den DBFS-Stamm als Beispiel, doch von Databricks wird ein externer Speicherort empfohlen, der mit Unity Catalog konfiguriert wurde.

Schritt 1: Konfigurieren Ihrer Umgebung und Erstellen eines Daten-Generators

Dieses Tutorial setzt eine grundlegende Vertrautheit mit Azure Databricks und einer Standardkonfiguration des Arbeitsbereichs voraus. Wenn Sie den bereitgestellten Code nicht ausführen können, wenden Sie sich an Ihre*n Arbeitsbereichsadministrator*in, um sicherzustellen, dass Sie Zugriff auf Computeressourcen und einen Speicherort haben, in den Sie Daten schreiben können.

Beachten Sie, dass der bereitgestellte Code einen source-Parameter verwendet, um den Speicherort anzugeben, den Sie als Ihre COPY INTO-Datenquelle konfigurieren. Wie bereits beschrieben, verweist dieser Code auf einen Speicherort im DBFS-Stamm. Wenn Sie über Schreibberechtigungen für einen externen Objektspeicherort verfügen, ersetzen Sie den dbfs:/-Teil der Quellzeichenfolge durch den Pfad zu Ihrem Objektspeicher. Stellen Sie sicher, dass Sie damit nicht auf Produktionsdaten verweisen und das geschachtelte Verzeichnis /user/{username}/copy-into-demo beibehalten, um das Überschreiben oder Löschen vorhandener Daten zu vermeiden, da dieser Codeblock auch einen rekursiven Löschvorgang zum Zurücksetzen dieser Demo ausführt.

  1. Erstellen Sie ein neues SQL-Notebook und fügen Sie es an einen Cluster an, in dem Databricks Runtime 11.3 LTS oder höher ausgeführt wird.

  2. Kopieren Sie den folgenden Code, und führen Sie ihn aus, um den Speicherort und die Datenbank zurückzusetzen, die in diesem Tutorial verwendet werden:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Kopieren Sie den folgenden Code, und führen Sie ihn aus, um bestimmte Tabellen und Funktionen zu konfigurieren, die zum zufälligen Generieren von Daten verwendet werden:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Schritt 2: Schreiben der Stichprobendaten in den Cloud-Speicher

Das Schreiben in andere Datenformate als Delta Lake kommt in Azure Databricks selten vor. Der hier bereitgestellte Code schreibt in JSON und simuliert ein externes System, das Ergebnisse aus einem anderen System in den Objektspeicher abbilden kann.

  1. Kopieren Sie den folgenden Code, und führen Sie ihn aus, um einen Batch JSON-Rohdaten zu schreiben:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Schritt 3: Verwenden von „COPY INTO“ zum idempotenten Laden von JSON-Daten

Sie müssen eine Delta Lake-Zieltabelle erstellen, bevor Sie COPY INTO verwenden können. In Databricks Runtime 11.3 LTS und höher müssen Sie in Ihrer CREATE TABLE-Anweisung nur einen Tabellennamen angeben. Bei früheren Versionen von Databricks Runtime müssen Sie beim Erstellen einer leeren Tabelle ein Schema angeben.

  1. Kopieren Sie den folgenden Code, und führen Sie ihn aus, um Ihre Delta-Zieltabelle zu erstellen und Daten aus Ihrer Quelle zu laden:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Da diese Aktion idempotent ist, können Sie sie mehrmals ausführen, doch Daten werden nur einmal geladen.

Schritt 4: Anzeigen einer Vorschau des Inhalts Ihrer Tabelle

Sie können eine einfache SQL-Abfrage ausführen, um den Inhalt dieser Tabelle manuell zu überprüfen.

  1. Kopieren Sie den folgenden Code, und führen Sie ihn aus, um eine Vorschau Ihrer Tabelle anzuzeigen:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Schritt 5: Laden weiterer Daten und Anzeigen einer Vorschau der Ergebnisse

Sie können die Schritte 2–4 mehrmals wiederholen, um neue Batches zufälliger JSON-Rohdaten in Ihrer Quelle abzulegen, sie mit COPY INTO idempotent in Delta Lake zu laden und die Ergebnisse in einer Vorschau anzuzeigen. Führen Sie diese Schritte in einer anderen Reihenfolge oder mehrmals durch, um das Schreiben mehrerer Batches an Rohdaten zu simulieren, oder führen Sie COPY INTO wiederholt aus, ohne dass neue Daten eingetroffen sind.

Schritt 6: Bereinigungstutorial

Wenn Sie dieses Tutorial beendet haben, können Sie die zugehörigen Ressourcen bereinigen, falls Sie sie nicht behalten möchten.

  1. Kopieren Sie den folgenden Code, und führen Sie ihn aus, um die Datenbank und die Tabellen abzulegen und alle Daten zu entfernen:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Um Ihre Computeressource zu beenden, wechseln Sie zur Registerkarte Cluster, und beenden Sie Ihren Cluster.

Zusätzliche Ressourcen