Návod: COPY INTO pomocí Spark SQL
Databricks doporučuje použít příkaz COPY INTO pro přírůstkové a hromadné načítání dat pro zdroje dat, které obsahují tisíce souborů. Databricks doporučuje používat Auto Loader pro pokročilé případy použití.
V tomto kurzu použijete příkaz COPY INTO
k načtení dat z cloudového úložiště objektů do tabulky v pracovním prostoru Azure Databricks.
Požadavky
- Předplatné Azure, pracovní prostor Azure Databricks v daném předplatném a cluster v daném pracovním prostoru. Pokud je chcete vytvořit, projděte si rychlý start: Spuštění úlohy Sparku v pracovním prostoru Azure Databricks pomocí webu Azure Portal. Pokud postupujete podle tohoto rychlého startu, nemusíte postupovat podle pokynů v části Spuštění úlohy Spark SQL.
- Univerzální cluster ve vašem pracovním prostoru se spuštěným Databricks Runtime 11.3 LTS nebo novějším. Informace o vytvoření univerzálního clusteru najdete v tématu Referenční informace o konfiguraci výpočetních prostředků.
- Znalost uživatelského rozhraní pracovního prostoru Azure Databricks Viz Navigace v pracovním prostoru.
- Zkušenost s prací v poznámkových blocích Databricks .
- Umístění, do které můžete zapisovat data; tato ukázka používá jako příklad kořen DBFS, ale Databricks doporučuje externí umístění úložiště nakonfigurované pomocí katalogu Unity.
Krok 1. Konfigurace prostředí a vytvoření generátoru dat
V tomto kurzu se předpokládá základní znalost Azure Databricks a výchozí konfigurace pracovního prostoru. Pokud zadaný kód nemůžete spustit, obraťte se na správce pracovního prostoru a ujistěte se, že máte přístup k výpočetním prostředkům a umístění, do kterého můžete zapisovat data.
Všimněte si, že zadaný kód používá parametr source
k určení umístění, které nakonfigurujete jako zdroj dat COPY INTO
. Jak je uvedeno, tento kód odkazuje na umístění v kořenovém adresáři DBFS. Pokud máte oprávnění k zápisu do externího umístění úložiště objektů, nahraďte část řetězce zdroje dbfs:/
cestou k úložišti objektů. Vzhledem k tomu, že tento blok kódu také rekurzivním odstraněním resetuje tuto demonstraci, ujistěte se, že neukazujete na produkční data a že ponecháte vnořený adresář /user/{username}/copy-into-demo
, abyste se vyhnuli přepsání ani odstranění existujících dat.
Vytvořte nový poznámkový blok SQL a připojte ho ke clusteru, který běží na Databricks Runtime 11.3 LTS nebo novějším.
Zkopírujte a spusťte následující kód pro resetování umístění úložiště a databáze použité v tomto kurzu:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Zkopírujte a spusťte následující kód a nakonfigurujte některé tabulky a funkce, které se použijí k náhodnému vygenerování dat:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Krok 2: Zápis ukázkových dat do cloudového úložiště
Zápis do jiných formátů dat než Delta Lake je v Azure Databricks vzácný. Zde uvedený kód zapisuje do FORMÁTU JSON a simuluje externí systém, který by mohl vypsat výsledky z jiného systému do úložiště objektů.
Zkopírujte a spusťte následující kód pro zápis dávky nezpracovaných dat JSON:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Krok 3: Načtení idempotentních dat JSON pomocí COPY INTO
Než budete moct použít COPY INTO
, musíte vytvořit cílovou tabulku Delta Lake. Ve verzi Databricks Runtime 11.3 LTS a vyšší nemusíte v příkazu CREATE TABLE
zadávat nic jiného než název tabulky. Pro předchozí verze Databricks Runtime musíte při vytváření prázdné tabulky zadat schéma.
Zkopírujte a spusťte následující kód, který vytvoří cílovou tabulku Delta a načte data ze zdroje:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Vzhledem k tomu, že tato akce je idempotentní, můžete ji spustit několikrát, ale data se načtou jenom jednou.
Krok 4: Zobrazení náhledu obsahu tabulky
Můžete spustit jednoduchý dotaz SQL a ručně zkontrolovat obsah této tabulky.
Zkopírujte a spusťte následující kód, abyste si mohli zobrazit náhled tabulky:
-- Review updated table SELECT * FROM user_ping_target
Krok 5: Načtení dalších dat a náhledu výsledků
Kroky 2–4 můžete opakovaně spouštět, abyste do svého zdroje uložili nové dávky náhodných nezpracovaných dat JSON, které idempotentně načtete do Delta Lake pomocí COPY INTO
, a zobrazit si tak výsledky. Zkuste tyto kroky provést mimo pořadí nebo několikrát, abyste simulovali zapisování více dávek nezpracovaných dat, nebo znovu spusťte COPY INTO
opakovaně bez příchodu nových dat.
Krok 6: Vyčištění školení
Až budete s tímto kurzem hotovi, můžete související prostředky vyčistit, pokud je už nechcete zachovat.
Zkopírujte a spusťte následující kód, který odstraní databázi, tabulky a odebere všechna data:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Pokud chcete výpočetní prostředek zastavit, přejděte na kartu Clustery a ukončete svůj cluster.
Další zdroje informací
- Referenční článek COPY INTO