Dela via


Självstudie: COPY INTO med Spark SQL

Databricks rekommenderar att du använder kommandot COPY INTO för inkrementell och massinläsning av datakällor som innehåller tusentals filer. Databricks rekommenderar att du använder Auto Loader för avancerade användningsfall.

I den här självstudien använder du kommandot COPY INTO för att läsa in data från molnobjektlagring till en table på din Azure Databricks-arbetsyta.

Krav

  1. En Azure-prenumeration, en Azure Databricks-arbetsyta i den prenumerationen och ett kluster på den arbetsytan. Information om hur du skapar dessa finns i Snabbstart: Kör ett Spark-jobb på Azure Databricks-arbetsytan med azure-portalen. Om du följer den här snabbstarten behöver du inte följa anvisningarna i avsnittet Kör ett Spark SQL-jobb.
  2. Ett mångsidigt kluster på arbetsytan som kör Databricks Runtime 11.3 LTS eller nyare. Information om hur du skapar ett kluster för alla syften finns i Referens för beräkningskonfiguration.
  3. Kunskaper om användargränssnittet för Azure Databricks-arbetsytan. Se Navigera på arbetsytan.
  4. Kunskaper om att arbeta med Databricks-anteckningsböcker.
  5. En plats som du kan skriva data till. Den här demonstrationen använder DBFS-roten som exempel, men Databricks rekommenderar en extern lagringsplats som konfigurerats med Unity Catalog.

Steg 1. Konfigurera din miljö och skapa en datagenerator

Den här självstudien förutsätter grundläggande kunskaper om Azure Databricks och en standardkonfiguration av arbetsytor. Om du inte kan köra den angivna koden kontaktar du arbetsyteadministratören för att se till att du har åtkomst till beräkningsresurser och en plats där du kan skriva data.

Observera att den angivna koden använder en source parameter för att ange den plats som du ska konfigurera som din COPY INTO datakälla. Som det är skrivet pekar den här koden på en plats på DBFS-roten. Om du har skrivbehörighet på en lagringsplats för externa objekt ersätter du den dbfs:/ delen av källsträngen med sökvägen till objektlagringen. Eftersom det här kodblocket också utför en rekursiv borttagning för reset i denna demonstration, ska du se till att du inte använder detta på produktionsdata och att du behåller den undantagna katalogen /user/{username}/copy-into-demo för att undvika att skriva över eller ta bort befintliga data.

  1. Skapa en ny SQL Notebook- och koppla den till ett kluster som kör Databricks Runtime 11.3 LTS eller senare.

  2. Kopiera och kör följande kod för att reset lagringsplatsen och databasen som används i den här självstudien:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Kopiera och kör följande kod för att konfigurera några tables och funktioner som ska användas för att slumpmässigt använda generate data.

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Steg 2: Skriva exempeldata till molnlagring

Det är ovanligt att skriva till andra dataformat än Delta Lake i Azure Databricks. Koden som anges här skriver till JSON och simulerar ett externt system som kan dumpa resultat från ett annat system till objektlagring.

  1. Kopiera och kör följande kod för att skriva en batch med råa JSON-data:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Steg 3: Använd COPY INTO för att läsa in JSON-data utan att resultatet förändras vid upprepade inläsningar.

Du måste skapa ett Delta Lake-mål table innan du kan använda COPY INTO. I Databricks Runtime 11.3 LTS och senare behöver du inte ange något annat än ett table namn i din CREATE TABLE-instruktion. För tidigare versioner av Databricks Runtime måste du ange en schema när du skapar en tom table.

  1. Kopiera och kör följande kod för att skapa din måldeltatabell table och läsa in data från källan:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Eftersom den här åtgärden är idempotent kan du köra den flera gånger, men data läses bara in en gång.

Steg 4: Förhandsgranska innehållet i din table

Du kan köra en enkel SQL-fråga för att manuellt granska innehållet i den här table.

  1. Kopiera och kör följande kod för att förhandsgranska din table:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Steg 5: Läs in mer data och förhandsgranska resultat

Du kan köra om steg 2–4 många gånger för att införskaffa nya batchar av slumpmässig rå JSON-data i källan, läsa in dem på ett förutsägbart sätt till Delta Lake med COPY INTOoch förhandsgranska resultaten. Prova att köra de här stegen i oordning eller flera gånger för att simulera flera batcher av rådata som skrivs, eller att köra COPY INTO flera gånger utan att ny data having har anlänt.

Steg 6: Rensa handledning

När du är klar med den här handledningen kan du rensa de associerade resurserna om du inte längre vill behålla dem.

  1. Kopiera och kör följande kod för att ta bort databasen samt alla tablesoch remove data.

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Om du vill stoppa beräkningsresursen går du till fliken kluster och Avsluta klustret.

Ytterligare resurser