Udostępnij za pośrednictwem


Samouczek: COPY INTO z usługą Spark SQL

Usługa Databricks zaleca użycie polecenia COPY INTO do przyrostowego i zbiorczego ładowania danych dla źródeł danych zawierających tysiące plików. Usługa Databricks zaleca używanie Auto Loader dla zaawansowanych zastosowań.

W tym samouczku użyjesz polecenia COPY INTO, aby załadować dane z magazynu obiektów w chmurze do tabeli w obszarze roboczym usługi Azure Databricks.

Wymagania

  1. Subskrypcja platformy Azure, obszar roboczy usługi Azure Databricks w tej subskrypcji i klaster w tym obszarze roboczym. Aby je utworzyć, zobacz Szybki start: uruchamianie zadania Spark w obszarze roboczym Azure Databricks przy użyciu Witryny Azure Portal. Jeśli wykonasz czynności opisane w tym przewodniku Szybki start, nie musisz postępować zgodnie z instrukcjami w sekcji Uruchamianie zadania Spark SQL.
  2. Klaster ogólnego przeznaczenia w obszarze roboczym z uruchomionym środowiskiem Databricks Runtime 11.3 LTS lub nowszym. Aby utworzyć klaster uniwersalny, zobacz Dokumentacja konfiguracji obliczeń.
  3. Znajomość interfejsu użytkownika obszaru roboczego usługi Azure Databricks. Zobacz Nawigowanie po obszarze roboczym.
  4. Znajomość pracy z notesami usługi Databricks.
  5. Lokalizacja, do której można zapisywać dane; w tym pokazie użyto katalogu głównego DBFS jako przykładu, ale usługa Databricks zaleca użycie zewnętrznej lokalizacji magazynowej skonfigurowanej za pomocą Unity Catalog.

Krok 1. Konfigurowanie środowiska i tworzenie generatora danych

W tym samouczku założono podstawową znajomość usługi Azure Databricks i domyślnej konfiguracji obszaru roboczego. Jeśli nie możesz uruchomić podanego kodu, skontaktuj się z administratorem obszaru roboczego, aby upewnić się, że masz dostęp do zasobów obliczeniowych i lokalizacji, do której można zapisywać dane.

Należy pamiętać, że podany kod używa parametru source w celu określenia lokalizacji, którą skonfigurujesz jako źródło danych COPY INTO. Zgodnie z zapisem ten kod wskazuje lokalizację w katalogu głównym systemu plików DBFS. Jeśli masz uprawnienia do zapisu w lokalizacji magazynu obiektów zewnętrznych, zastąp fragment dbfs:/ ciągu źródłowego ścieżką do magazynu obiektów. Ponieważ ta sekcja kodu wykonuje również rekursywne usuwanie w celu zresetowania tego pokazu, upewnij się, że nie wskazujesz tego na dane produkcyjne oraz że zachowasz zagnieżdżony katalog /user/{username}/copy-into-demo, aby nie zastąpić ani nie usunąć istniejących danych.

  1. Utwórz nowy notatnik SQL i dołącz go do klastra pracującego w środowisku Databricks Runtime 11.3 LTS lub nowszym.

  2. Skopiuj i uruchom następujący kod, aby zresetować lokalizację przechowywania i bazę danych używaną w tym samouczku.

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Skopiuj i uruchom następujący kod, aby skonfigurować niektóre tabele i funkcje, które będą używane do losowego generowania danych:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Krok 2: Zapisać przykładowe dane w przechowywaniu danych w chmurze

Zapisywanie w formatach danych innych niż usługa Delta Lake jest rzadkie w usłudze Azure Databricks. Podany tutaj kod zapisuje w formacie JSON, symulując system zewnętrzny, który może zrzucić wyniki z innego systemu do magazynu obiektów.

  1. Skopiuj i uruchom następujący kod, aby napisać partię nieprzetworzonych danych JSON:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Krok 3: Ładuj dane JSON idempotentnie przy użyciu COPY INTO

Przed użyciem COPY INTOnależy utworzyć docelową tabelę Delta Lake. W środowisku Databricks Runtime 11.3 LTS i nowszym nie trzeba podawać żadnych elementów innych niż nazwa tabeli w instrukcji CREATE TABLE. W przypadku poprzednich wersji środowiska Databricks Runtime należy podać schemat podczas tworzenia pustej tabeli.

  1. Skopiuj i uruchom następujący kod, aby utworzyć docelową tabelę delty i załadować dane ze źródła:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Ponieważ ta akcja jest idempotentna, można ją uruchomić wiele razy, ale dane będą ładowane tylko raz.

Krok 4. Podgląd zawartości tabeli

Możesz uruchomić proste zapytanie SQL, aby ręcznie przejrzeć zawartość tej tabeli.

  1. Skopiuj i wykonaj następujący kod, aby wyświetlić podgląd tabeli:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Krok 5. Ładowanie większej ilości danych i podgląd wyników

Możesz ponownie uruchomić kroki 2–4 wiele razy, aby wylądować nowe partie losowych nieprzetworzonych danych JSON w źródle, idempotentnie załadować je do usługi Delta Lake przy użyciu COPY INTOi wyświetlić podgląd wyników. Spróbuj uruchomić te kroki poza kolejnością lub wiele razy, aby zasymulować wiele partii nieprzetworzonych danych zapisywanych lub wykonywanych COPY INTO wiele razy bez przybycia nowych danych.

Krok 6. Samouczek dotyczący czyszczenia

Po ukończeniu tego samouczka możesz wyczyścić skojarzone zasoby, jeśli nie chcesz ich przechowywać.

  1. Skopiuj i uruchom następujący kod, aby usunąć bazę danych, tabele i usunąć wszystkie dane:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Aby zatrzymać zasób obliczeniowy, przejdź do zakładki Klastry i zakończ swój klaster, wybierając opcję Zakończ.

Dodatkowe zasoby