Udostępnij za pośrednictwem


Ładowanie danych przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL

Databricks rekomenduje używanie tabel przesyłania strumieniowego do pozyskiwania danych przy użyciu Databricks SQL. tabeli przesyłania strumieniowego to tabela zarejestrowana w Katalogu Unity z dodatkową obsługą do przesyłania strumieniowego lub przy przetwarzania danych przyrostowych. Pipeline Delta Live Tables jest tworzony automatycznie dla każdej tabeli przesyłania strumieniowego. Tabele przesyłania strumieniowego można używać do przyrostowego ładowania danych z magazynu obiektów platformy Kafka i chmury.

W tym artykule pokazano, jak wykorzystywać tabele strumieniowe do ładowania danych z magazynu obiektów w chmurze, skonfigurowanego jako wolumin Unity Catalog (zalecane) lub zewnętrzne miejsce przechowywania danych.

Uwaga

Aby dowiedzieć się, jak używać tabel usługi Delta Lake jako źródeł przesyłania strumieniowego i ujścia, zobacz artykuł Delta table streaming reads and writes.

Ważne

Tabele przesyłania strumieniowego utworzone w usłudze Databricks SQL są obsługiwane przez bezserwerowy potok Delta Live Tables. Aby korzystać z tej funkcji, obszar roboczy musi obsługiwać potoki bezserwerowe.

Przed rozpoczęciem

Przed rozpoczęciem należy spełnić następujące wymagania.

Wymagania dotyczące obszaru roboczego:

Wymagania dotyczące obliczeń:

Należy użyć jednej z następujących opcji:

  • Usługa SQL Warehouse korzystająca z kanału Current .

  • Środowisko obliczeniowe z trybem dostępu współdzielonego w środowisku Databricks Runtime 13.3 LTS lub nowszym.

  • Środowisko obliczeniowe z trybem dostępu pojedynczego użytkownika w środowisku Databricks Runtime 15.4 LTS lub nowszym.

    W środowisku Databricks Runtime 15.3 i poniżej nie można używać obliczeń pojedynczego użytkownika do wykonywania zapytań dotyczących tabel przesyłania strumieniowego należących do innych użytkowników. Możesz użyć obliczeń dla pojedynczego użytkownika w wersji Databricks Runtime 15.3 lub starszych tylko wtedy, gdy jesteś właścicielem tabeli strumieniowej. Twórcą tabeli jest właściciel.

    Środowisko Databricks Runtime 15.4 LTS i nowsze obsługują zapytania dotyczące tabel generowanych przez tabele delta live w obliczeniach pojedynczego użytkownika, niezależnie od własności tabeli. Aby skorzystać z filtrowania danych dostępnego w środowisku Databricks Runtime 15.4 LTS lub nowszym, należy upewnić się, że obszar roboczy jest włączony dla bezserwerowych obliczeniowych, ponieważ funkcja filtrowania danych, która obsługuje tabele generowane przez tabele delta live, działa na bezserwerowych obliczeniach. Opłaty za zasoby obliczeniowe bezserwerowe mogą być naliczane w przypadku korzystania z obliczeń pojedynczego użytkownika do uruchamiania operacji filtrowania danych. Zobacz Szczegółowa kontrola dostępu w obliczeniach pojedynczego użytkownika.

Wymagania dotyczące uprawnień:

Inne wymagania:

  • Ścieżka do danych źródłowych.

    Przykład ścieżki woluminu: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Przykład ścieżki lokalizacji zewnętrznej: abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Uwaga

    W tym artykule założono, że dane, które chcesz załadować, znajdują się w lokalizacji magazynowej w chmurze, która odpowiada woluminowi lub lokalizacji zewnętrznej Unity Catalog, do której masz dostęp.

Odnajdywanie i wyświetlanie podglądu danych źródłowych

  1. Na pasku bocznym obszaru roboczego kliknij pozycję Zapytania, a następnie kliknij pozycję Utwórz zapytanie.

  2. W edytorze zapytań wybierz magazyn SQL, który używa kanału Current z listy rozwijanej.

  3. Wklej następujący kod do edytora, podstawiając wartości w nawiasach kątowych (<>) w celu uzyskania informacji identyfikujących dane źródłowe, a następnie kliknij przycisk Uruchom.

    Uwaga

    Błędy wnioskowania schematu mogą wystąpić podczas uruchamiania funkcji tabeli read_files z wartościami, jeśli wartości domyślne funkcji nie mogą przetworzyć danych. Na przykład może być konieczne skonfigurowanie trybu wielowierszowego dla wielowierszowych plików CSV lub JSON. Aby uzyskać listę opcji analizatora, zobacz read_files funkcji wartości tabeli.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Ładowanie danych do tabeli przesyłania strumieniowego

Aby stworzyć tabelę strumieniową na podstawie danych w magazynie obiektów w chmurze, wklej poniższe w edytorze zapytań, a następnie kliknij przycisk Uruchom:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Ustaw kanał środowiska uruchomieniowego

Tabele przesyłania strumieniowego utworzone przy użyciu magazynów SQL są automatycznie odświeżane przy użyciu potoku Delta Live Tables. Potoki usługi Delta Live Tables domyślnie używają środowiska uruchomieniowego w kanale current. Aby dowiedzieć się o procesie wydania, zobacz uwagi dotyczące wersji Delta Live Tables oraz proces aktualizacji wersji.

Usługa Databricks zaleca używanie kanału current dla obciążeń produkcyjnych. Nowe funkcje są po raz pierwszy udostępniane w preview kanale. Możesz ustawić potok na kanał Delta Live Tables w wersji zapoznawczej, określając preview jako właściwość tabeli, aby przetestować nowe funkcje. Tę właściwość można określić podczas tworzenia tabeli lub po utworzeniu tabeli przy użyciu instrukcji ALTER.

W poniższym przykładzie kodu pokazano, jak ustawić kanał na podgląd w instrukcji CREATE:

CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
  *
FROM
  range(5)

Odśwież tabelę przesyłania strumieniowego przy użyciu potoku DLT

W tej sekcji opisano wzorce odświeżania tabeli przesyłania strumieniowego z najnowszymi danymi dostępnymi ze źródeł zdefiniowanych w zapytaniu.

Kiedy CREATE lub REFRESH tabeli przesyłania strumieniowego, procesy aktualizacji zachodzą przy użyciu potoku Delta Live Tables bezserwerowych. Każda zdefiniowana tabela przesyłania strumieniowego ma skojarzony potok Delta Live Tables.

Po uruchomieniu polecenia REFRESH zostanie zwrócony link do potoku DLT. Aby sprawdzić stan odświeżania, możesz użyć linku do pipeline'u DLT.

Uwaga

Tylko właściciel tabeli może odświeżyć tabelę przesyłania strumieniowego, aby pobrać najnowsze dane. Użytkownik tworzący tabelę jest właścicielem i nie można zmienić właściciela. Może być konieczne odświeżenie tabeli przesyłania strumieniowego przed użyciem zapytań podróży w czasie.

Zobacz Co to jest Delta Live Tables?.

Tylko pozyskiwanie nowych danych

Domyślnie funkcja read_files odczytuje wszystkie istniejące dane w katalogu źródłowym podczas tworzenia tabeli, a następnie przetwarza nowo przybywające rekordy przy każdym odświeżeniu.

Aby uniknąć pozyskiwania danych, które już istnieją w katalogu źródłowym w momencie tworzenia tabeli, ustaw opcję includeExistingFiles na false. Oznacza to, że tylko dane, które docierają do katalogu po utworzeniu tabeli, są przetwarzane. Na przykład:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

W pełni odśwież tabelę przesyłania strumieniowego

Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne odświeżanie obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.

Na przykład:

REFRESH STREAMING TABLE my_bronze_table FULL

Zaplanuj tabelę transmisji strumieniowej do automatycznego odświeżania

Aby skonfigurować tabelę przesyłania strumieniowego w celu automatycznego odświeżania na podstawie zdefiniowanego harmonogramu, wklej następujące polecenie w edytorze zapytań, a następnie kliknij przycisk Uruchom:

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Na przykład, aby zobaczyć zapytania dotyczące harmonogramu odświeżania, sprawdź ALTER STREAMING TABLE.

Śledzenie stanu odświeżania

Stan odświeżania tabeli przesyłania strumieniowego można wyświetlić, wyświetlając potok, który zarządza tabelą przesyłania strumieniowego w interfejsie użytkownika tabel delta Live Tables lub wyświetlając Odśwież informacje zwrócone przez polecenie DESCRIBE EXTENDED dla tabeli przesyłania strumieniowego.

DESCRIBE EXTENDED <table-name>

Pozyskiwanie strumieniowe z platformy Kafka

Aby zapoznać się z przykładem pozyskiwania przesyłania strumieniowego z platformy Kafka, zobacz read_kafka.

Udzielanie użytkownikom dostępu do tabeli strumieniowej

Aby przyznać użytkownikom uprawnienia SELECT w tabeli przesyłania strumieniowego, aby mogli wykonywać względem niej zapytania, wklej następujące polecenie w edytorze zapytań, a następnie kliknij przycisk Uruchom:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Aby uzyskać więcej informacji na temat udzielania uprawnień do zabezpieczanych obiektów wykazu aparatu Unity, zobacz uprawnienia wykazu aparatu Unity i zabezpieczane obiekty.

Monitorowanie przebiegów przy użyciu historii zapytań

Możesz użyć strony historii zapytań, aby uzyskać dostęp do szczegółów zapytań i profilów zapytań, które mogą pomóc w identyfikowaniu niewydajnych zapytań i wąskich gardeł w potoku Delta Live Tables, który jest używany do zarządzania aktualizacjami tabel przesyłanych strumieniowo. Aby zapoznać się z omówieniem rodzaju informacji dostępnych w historiach zapytań i profilach zapytań, zobacz Historia zapytań i Profil zapytania.

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej. Administratorzy obszaru roboczego mogą włączyć tę funkcję na stronie Podglądy . Zobacz Zarządzanie wersjami zapoznawcza usługi Azure Databricks.

Wszystkie instrukcje związane z tabelami strumieniowymi są wyświetlane w historii zapytań. Możesz użyć filtru listy rozwijanej instrukcja, aby wybrać dowolne polecenie i sprawdzić powiązane zapytania. Po wszystkich poleceniach CREATE następuje polecenie REFRESH, które jest wykonywane asynchronicznie w pipeline'u Delta Live Tables. Instrukcje REFRESH zwykle zawierają szczegółowe plany zapytań, które zapewniają wgląd w optymalizację wydajności.

Aby uzyskać dostęp do REFRESH instrukcji w interfejsie użytkownika historii zapytań, wykonaj następujące kroki:

  1. Kliknij Ikona historii na lewym pasku bocznym, aby otworzyć interfejs użytkownika historii zapytań.
  2. Zaznacz pole wyboru REFRESH z filtru rozwijanego oświadczenia.
  3. Kliknij nazwę instrukcji zapytania, aby wyświetlić szczegóły podsumowania, takie jak czas trwania zapytania i zagregowane metryki.
  4. Kliknij pozycję Zobacz profil zapytania, aby otworzyć profil zapytania. Aby uzyskać szczegółowe informacje na temat nawigowania w profilu zapytania, zobacz Profil zapytania.
  5. Opcjonalnie możesz użyć linków w sekcji Źródło zapytania, aby otworzyć powiązane zapytanie lub potok.

Możesz również uzyskać dostęp do szczegółów zapytania przy użyciu linków w edytorze SQL lub notesu dołączonego do usługi SQL Warehouse.

Uwaga

Tabela przesyłania strumieniowego musi być skonfigurowana do uruchamiania przy użyciu kanału w wersji zapoznawczej. Zobacz Ustaw kanał środowiska uruchomieniowego.

Dodatkowe zasoby