Delen via


Zelfstudie: COPY INTO met Spark SQL

Databricks raadt u aan de COPY INTO opdracht te gebruiken voor incrementeel en bulksgewijs laden van gegevensbronnen die duizenden bestanden bevatten. Databricks raadt u aan AutoLoader te gebruiken voor geavanceerde gebruiksvoorbeelden.

In deze zelfstudie gebruikt u de opdracht COPY INTO om gegevens uit cloudobjectopslag te laden in een table in uw Azure Databricks-werkruimte.

Eisen

  1. Een Azure-abonnement, een Azure Databricks-werkruimte in dat abonnement en een cluster in die werkruimte. Zie Quickstart: Om deze te maken, een Spark-taak uitvoeren in Azure Databricks Workspace met behulp van Azure Portal. Als u deze quickstart volgt, hoeft u de instructies niet te volgen in de sectie Een Spark SQL-taak uitvoeren.
  2. Een all-purpose cluster in uw werkruimte met Databricks Runtime 11.3 LTS of hoger. Zie Referentiemateriaal voor compute-configuratieom een cluster voor alle doeleinden te maken.
  3. Bekendheid met de gebruikersinterface van de Azure Databricks-werkruimte. Zie Navigeren in de werkruimte.
  4. Bekendheid met het werken met Databricks-notebooks.
  5. Een locatie waarnaar u gegevens kunt schrijven; in deze demo wordt de DBFS-hoofdmap als voorbeeld gebruikt, maar Databricks raadt een externe opslaglocatie aan die is geconfigureerd met Unity Catalog.

Stap 1. Uw omgeving configureren en een gegevensgenerator maken

In deze zelfstudie wordt ervan uitgegaan dat u basiskennis hebt van Azure Databricks en een standaardconfiguratie voor werkruimten. Als u de opgegeven code niet kunt uitvoeren, neemt u contact op met uw werkruimtebeheerder om ervoor te zorgen dat u toegang hebt tot rekenresources en een locatie waarnaar u gegevens kunt schrijven.

Houd er rekening mee dat voor de opgegeven code een source parameter wordt gebruikt om de locatie op te geven die u configureert als uw COPY INTO gegevensbron. Zoals geschreven, verwijst deze code naar een locatie in de DBFS-hoofdmap. Als u schrijfmachtigingen hebt voor een opslaglocatie voor externe objecten, vervangt u het dbfs:/ gedeelte van de brontekenreeks door het pad naar de objectopslag. Omdat dit codeblok tijdens deze demo ook recursief verwijdert naar reset, moet u ervoor zorgen dat het niet naar productiegegevens wijst en dat u de geneste directory /user/{username}/copy-into-demo behoudt om te voorkomen dat bestaande gegevens worden overschreven of verwijderd.

  1. maak een nieuw SQL-notebook en deze koppelen aan een cluster waarop Databricks Runtime 11.3 LTS of hoger wordt uitgevoerd.

  2. Kopieer en voer de volgende code uit om de opslaglocatie en de database die in deze handleiding worden gebruikt te reset:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Kopieer en voer de volgende code uit om een aantal tables en functies te configureren die worden gebruikt om willekeurig gegevens te generate:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Stap 2: De voorbeeldgegevens naar cloudopslag schrijven

Schrijven naar andere gegevensindelingen dan Delta Lake is zeldzaam in Azure Databricks. De code die hier wordt opgegeven, schrijft naar JSON en simuleert een extern systeem dat mogelijk resultaten van een ander systeem in objectopslag dumpt.

  1. Kopieer en voer de volgende code uit om een batch onbewerkte JSON-gegevens te schrijven:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Stap 3: Gebruik COPY INTO om JSON-gegevens idempotent te laden

U moet een Delta Lake-doel voor table maken voordat u COPY INTOkunt gebruiken. In Databricks Runtime 11.3 LTS en hoger hoeft u niets anders op te geven dan een table naam in uw CREATE TABLE-instructie. Voor eerdere versies van Databricks Runtime moet u een schema opgeven tijdens het aanmaken van een lege table.

  1. Kopieer en voer de volgende code uit om uw doel-Delta-table te maken en gegevens uit uw bron te laden:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Omdat deze actie idempotent is, kunt u deze meerdere keren uitvoeren, maar gegevens worden slechts eenmaal geladen.

Stap 4: Een voorbeeld van de inhoud van uw table

U kunt een eenvoudige SQL-query uitvoeren om de inhoud van deze tablehandmatig te controleren.

  1. Kopieer en voer de volgende code uit om een voorbeeld van uw tablete bekijken:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Stap 5: Meer gegevens laden en voorbeeldresultaten bekijken

U kunt stappen 2-4 vaak opnieuw uitvoeren om nieuwe batches met willekeurige onbewerkte JSON-gegevens in uw bron te landen, ze idempotent laden in Delta Lake met COPY INTOen een voorbeeld van de resultaten bekijken. Probeer deze stappen buiten volgorde of meerdere keren uit te voeren om te simuleren dat er meerdere batches onbewerkte gegevens worden beschreven, of om COPY INTO meerdere keren uit te voeren zonder dat er nieuwe gegevens having aangekomen zijn.

Stap 6: Opschonen van de handleiding

Wanneer u klaar bent met deze zelfstudie, kunt u de bijbehorende resources opschonen als u ze niet meer wilt behouden.

  1. Kopieer en voer de volgende code uit om de database, tablesen remove alle gegevens te verwijderen:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Als u de rekenresource wilt stoppen, gaat u naar het tabblad Clusters en beƫindig uw cluster.

Aanvullende informatiebronnen