Sdílet prostřednictvím


Návod: COPY INTO pomocí Spark SQL

Databricks doporučuje použít příkaz COPY INTO pro přírůstkové a hromadné načítání dat pro zdroje dat, které obsahují tisíce souborů. Databricks doporučuje používat Auto Loader pro pokročilé případy použití.

V tomto kurzu použijete příkaz COPY INTO k načtení dat z cloudového úložiště objektů do tabulky v pracovním prostoru Azure Databricks.

Požadavky

  1. Předplatné Azure, pracovní prostor Azure Databricks v daném předplatném a cluster v daném pracovním prostoru. Pokud je chcete vytvořit, projděte si rychlý start: Spuštění úlohy Sparku v pracovním prostoru Azure Databricks pomocí webu Azure Portal. Pokud postupujete podle tohoto rychlého startu, nemusíte postupovat podle pokynů v části Spuštění úlohy Spark SQL.
  2. Univerzální cluster ve vašem pracovním prostoru se spuštěným Databricks Runtime 11.3 LTS nebo novějším. Informace o vytvoření univerzálního clusteru najdete v tématu Referenční informace o konfiguraci výpočetních prostředků.
  3. Znalost uživatelského rozhraní pracovního prostoru Azure Databricks Viz Navigace v pracovním prostoru.
  4. Zkušenost s prací v poznámkových blocích Databricks .
  5. Umístění, do které můžete zapisovat data; tato ukázka používá jako příklad kořen DBFS, ale Databricks doporučuje externí umístění úložiště nakonfigurované pomocí katalogu Unity.

Krok 1. Konfigurace prostředí a vytvoření generátoru dat

V tomto kurzu se předpokládá základní znalost Azure Databricks a výchozí konfigurace pracovního prostoru. Pokud zadaný kód nemůžete spustit, obraťte se na správce pracovního prostoru a ujistěte se, že máte přístup k výpočetním prostředkům a umístění, do kterého můžete zapisovat data.

Všimněte si, že zadaný kód používá parametr source k určení umístění, které nakonfigurujete jako zdroj dat COPY INTO. Jak je uvedeno, tento kód odkazuje na umístění v kořenovém adresáři DBFS. Pokud máte oprávnění k zápisu do externího umístění úložiště objektů, nahraďte část řetězce zdroje dbfs:/ cestou k úložišti objektů. Vzhledem k tomu, že tento blok kódu také rekurzivním odstraněním resetuje tuto demonstraci, ujistěte se, že neukazujete na produkční data a že ponecháte vnořený adresář /user/{username}/copy-into-demo, abyste se vyhnuli přepsání ani odstranění existujících dat.

  1. Vytvořte nový poznámkový blok SQL a připojte ho ke clusteru, který běží na Databricks Runtime 11.3 LTS nebo novějším.

  2. Zkopírujte a spusťte následující kód pro resetování umístění úložiště a databáze použité v tomto kurzu:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Zkopírujte a spusťte následující kód a nakonfigurujte některé tabulky a funkce, které se použijí k náhodnému vygenerování dat:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Krok 2: Zápis ukázkových dat do cloudového úložiště

Zápis do jiných formátů dat než Delta Lake je v Azure Databricks vzácný. Zde uvedený kód zapisuje do FORMÁTU JSON a simuluje externí systém, který by mohl vypsat výsledky z jiného systému do úložiště objektů.

  1. Zkopírujte a spusťte následující kód pro zápis dávky nezpracovaných dat JSON:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Krok 3: Načtení idempotentních dat JSON pomocí COPY INTO

Než budete moct použít COPY INTO, musíte vytvořit cílovou tabulku Delta Lake. Ve verzi Databricks Runtime 11.3 LTS a vyšší nemusíte v příkazu CREATE TABLE zadávat nic jiného než název tabulky. Pro předchozí verze Databricks Runtime musíte při vytváření prázdné tabulky zadat schéma.

  1. Zkopírujte a spusťte následující kód, který vytvoří cílovou tabulku Delta a načte data ze zdroje:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Vzhledem k tomu, že tato akce je idempotentní, můžete ji spustit několikrát, ale data se načtou jenom jednou.

Krok 4: Zobrazení náhledu obsahu tabulky

Můžete spustit jednoduchý dotaz SQL a ručně zkontrolovat obsah této tabulky.

  1. Zkopírujte a spusťte následující kód, abyste si mohli zobrazit náhled tabulky:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Krok 5: Načtení dalších dat a náhledu výsledků

Kroky 2–4 můžete opakovaně spouštět, abyste do svého zdroje uložili nové dávky náhodných nezpracovaných dat JSON, které idempotentně načtete do Delta Lake pomocí COPY INTO, a zobrazit si tak výsledky. Zkuste tyto kroky provést mimo pořadí nebo několikrát, abyste simulovali zapisování více dávek nezpracovaných dat, nebo znovu spusťte COPY INTO opakovaně bez příchodu nových dat.

Krok 6: Vyčištění školení

Až budete s tímto kurzem hotovi, můžete související prostředky vyčistit, pokud je už nechcete zachovat.

  1. Zkopírujte a spusťte následující kód, který odstraní databázi, tabulky a odebere všechna data:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Pokud chcete výpočetní prostředek zastavit, přejděte na kartu Clustery a ukončete svůj cluster.

Další zdroje informací