Zelfstudie: COPY INTO met Spark SQL
Databricks raadt u aan de COPY INTO opdracht te gebruiken voor incrementeel en bulksgewijs laden van gegevensbronnen die duizenden bestanden bevatten. Databricks raadt u aan AutoLoader te gebruiken voor geavanceerde gebruiksvoorbeelden.
In deze zelfstudie gebruikt u de opdracht COPY INTO
om gegevens uit cloudobjectopslag te laden in een table in uw Azure Databricks-werkruimte.
Eisen
- Een Azure-abonnement, een Azure Databricks-werkruimte in dat abonnement en een cluster in die werkruimte. Zie Quickstart: Om deze te maken, een Spark-taak uitvoeren in Azure Databricks Workspace met behulp van Azure Portal. Als u deze quickstart volgt, hoeft u de instructies niet te volgen in de sectie Een Spark SQL-taak uitvoeren.
- Een all-purpose cluster in uw werkruimte met Databricks Runtime 11.3 LTS of hoger. Zie Referentiemateriaal voor compute-configuratieom een cluster voor alle doeleinden te maken.
- Bekendheid met de gebruikersinterface van de Azure Databricks-werkruimte. Zie Navigeren in de werkruimte.
- Bekendheid met het werken met Databricks-notebooks.
- Een locatie waarnaar u gegevens kunt schrijven; in deze demo wordt de DBFS-hoofdmap als voorbeeld gebruikt, maar Databricks raadt een externe opslaglocatie aan die is geconfigureerd met Unity Catalog.
Stap 1. Uw omgeving configureren en een gegevensgenerator maken
In deze zelfstudie wordt ervan uitgegaan dat u basiskennis hebt van Azure Databricks en een standaardconfiguratie voor werkruimten. Als u de opgegeven code niet kunt uitvoeren, neemt u contact op met uw werkruimtebeheerder om ervoor te zorgen dat u toegang hebt tot rekenresources en een locatie waarnaar u gegevens kunt schrijven.
Houd er rekening mee dat voor de opgegeven code een source
parameter wordt gebruikt om de locatie op te geven die u configureert als uw COPY INTO
gegevensbron. Zoals geschreven, verwijst deze code naar een locatie in de DBFS-hoofdmap. Als u schrijfmachtigingen hebt voor een opslaglocatie voor externe objecten, vervangt u het dbfs:/
gedeelte van de brontekenreeks door het pad naar de objectopslag. Omdat dit codeblok tijdens deze demo ook recursief verwijdert naar reset, moet u ervoor zorgen dat het niet naar productiegegevens wijst en dat u de geneste directory /user/{username}/copy-into-demo
behoudt om te voorkomen dat bestaande gegevens worden overschreven of verwijderd.
maak een nieuw SQL-notebook en deze koppelen aan een cluster waarop Databricks Runtime 11.3 LTS of hoger wordt uitgevoerd.
Kopieer en voer de volgende code uit om de opslaglocatie en de database die in deze handleiding worden gebruikt te reset:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Kopieer en voer de volgende code uit om een aantal tables en functies te configureren die worden gebruikt om willekeurig gegevens te generate:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Stap 2: De voorbeeldgegevens naar cloudopslag schrijven
Schrijven naar andere gegevensindelingen dan Delta Lake is zeldzaam in Azure Databricks. De code die hier wordt opgegeven, schrijft naar JSON en simuleert een extern systeem dat mogelijk resultaten van een ander systeem in objectopslag dumpt.
Kopieer en voer de volgende code uit om een batch onbewerkte JSON-gegevens te schrijven:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Stap 3: Gebruik COPY INTO om JSON-gegevens idempotent te laden
U moet een Delta Lake-doel voor table maken voordat u COPY INTO
kunt gebruiken. In Databricks Runtime 11.3 LTS en hoger hoeft u niets anders op te geven dan een table naam in uw CREATE TABLE
-instructie. Voor eerdere versies van Databricks Runtime moet u een schema opgeven tijdens het aanmaken van een lege table.
Kopieer en voer de volgende code uit om uw doel-Delta-table te maken en gegevens uit uw bron te laden:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Omdat deze actie idempotent is, kunt u deze meerdere keren uitvoeren, maar gegevens worden slechts eenmaal geladen.
Stap 4: Een voorbeeld van de inhoud van uw table
U kunt een eenvoudige SQL-query uitvoeren om de inhoud van deze tablehandmatig te controleren.
Kopieer en voer de volgende code uit om een voorbeeld van uw tablete bekijken:
-- Review updated table SELECT * FROM user_ping_target
Stap 5: Meer gegevens laden en voorbeeldresultaten bekijken
U kunt stappen 2-4 vaak opnieuw uitvoeren om nieuwe batches met willekeurige onbewerkte JSON-gegevens in uw bron te landen, ze idempotent laden in Delta Lake met COPY INTO
en een voorbeeld van de resultaten bekijken. Probeer deze stappen buiten volgorde of meerdere keren uit te voeren om te simuleren dat er meerdere batches onbewerkte gegevens worden beschreven, of om COPY INTO
meerdere keren uit te voeren zonder dat er nieuwe gegevens having aangekomen zijn.
Stap 6: Opschonen van de handleiding
Wanneer u klaar bent met deze zelfstudie, kunt u de bijbehorende resources opschonen als u ze niet meer wilt behouden.
Kopieer en voer de volgende code uit om de database, tablesen remove alle gegevens te verwijderen:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Als u de rekenresource wilt stoppen, gaat u naar het tabblad Clusters en beƫindig uw cluster.
Aanvullende informatiebronnen
- Het COPY INTO referentieartikel