Samouczek: uruchamianie potoku analitycznego typu lakehouse od końca do końca
W tym samouczku dowiesz się, jak skonfigurować kompleksowy potok analityczny dla Azure Databricks Lakehouse.
Ważne
W tym samouczku używa się interaktywnych notesów do wykonywania typowych zadań ETL w języku Python w klastrach z włączonym Unity Catalog. Jeśli nie używasz Unity Catalog, zobacz Uruchom swoje pierwsze obciążenie ETL w usłudze Azure Databricks.
Zadania w tym samouczku
Na końcu tego artykułu poczujesz się komfortowo:
- uruchamianie klastra obliczeniowego z włączonym katalogiem Unity.
- Tworzenie notesu Databricks.
- Zapisywanie i odczytywanie danych z zewnętrznej lokalizacji Unity Catalog.
- Konfigurowanie przyrostowego ładowania danych do tabeli Unity Catalog za pomocą Auto Loader.
- Wykonywanie komórek notesu w celu przetwarzania, wykonywania zapytań i podglądu danych.
- Planowanie notesu jako zadania Databricks.
- Wykonywanie zapytań z tabel katalogu Unity za pomocą Databricks SQL
Usługa Azure Databricks udostępnia zestaw narzędzi gotowych do produkcji, które umożliwiają specjalistom ds. danych szybkie opracowywanie i wdrażanie potoków wyodrębniania, przekształcania i ładowania (ETL). Katalog Unity umożliwia stewardom danych konfigurowanie i zabezpieczanie poświadczeń magazynowania, lokalizacji zewnętrznych i obiektów bazy danych dla użytkowników w całej organizacji. Usługa Databricks SQL umożliwia analitykom uruchamianie zapytań SQL względem tych samych tabel używanych w produkcyjnych obciążeniach ETL, co umożliwia analizę biznesową w czasie rzeczywistym na dużą skalę.
Możesz również użyć biblioteki DLT do kompilowania potoków ETL. Databricks stworzył DLT, aby zmniejszyć złożoność tworzenia, wdrażania i utrzymania produkcyjnych potoków ETL. Zobacz : Samouczek: Uruchom swój pierwszy potok DLT.
Wymagania
Uwaga
Jeśli nie masz uprawnień kontroli klastra, możesz wykonać większość poniższych kroków, o ile masz dostęp do klastra.
Krok 1. Tworzenie klastra
Aby wykonać eksploracyjne analizy danych i inżynierii danych, utwórz klaster w celu udostępnienia zasobów obliczeniowych potrzebnych do wykonywania poleceń.
- Kliknij
Komputer na pasku bocznym.
- Kliknij pozycję
Nowy na pasku bocznym, a następnie wybierz pozycję Cluster. Spowoduje to otwarcie strony Nowy klaster/Compute.
- Określ unikatową nazwę klastra.
- W sekcji Wydajność wybierz przycisk radiowy pojedynczy węzeł.
- W obszarze Advancedprzełącz ustawienie trybu dostępu na Ręczne, a następnie wybierz tryb dedykowany.
- W Pojedynczy użytkownik lub grupa wybierz swoją nazwę użytkownika.
- Wybierz żądaną wersję środowiska Databricks, 11.1 lub nowszą, aby użyć Unity Catalogu.
- Kliknij pozycję Utwórz zasoby obliczeniowe , aby utworzyć klaster.
Aby dowiedzieć się więcej o klastrach usługi Databricks, zobacz Obliczenia.
Krok 2. Tworzenie notesu usługi Databricks
Aby utworzyć notes w obszarze roboczym, kliknij pozycję Nowy na pasku bocznym, a następnie kliknij przycisk Notes. W obszarze roboczym otwiera się pusty notes.
Aby dowiedzieć się więcej na temat tworzenia notesów i zarządzania nimi, zobacz Zarządzanie notesami.
Krok 3: Zapisywanie i odczytywanie danych z zewnętrznej lokalizacji zarządzanej przez Unity Catalog
Databricks zaleca używanie Auto Loader do przyrostowego pozyskiwania danych. Automatyczny moduł ładujący wykrywa i przetwarza nowe pliki, gdy pojawiają się w chmurowym magazynie obiektów.
Użyj Unity Catalog, aby zarządzać bezpiecznym dostępem do lokalizacji zewnętrznych. Użytkownicy lub podmioty usługi z uprawnieniami READ FILES
do lokalizacji zewnętrznej mogą używać Auto Loader do pozyskiwania danych.
Zwykle dane docierają do lokalizacji zewnętrznej w wyniku zapisów z innych systemów. W tym pokazie można symulować przybycie danych, zapisując pliki JSON w lokalizacji zewnętrznej.
Skopiuj poniższy kod do komórki notesu. Zastąp wartość ciągu catalog
nazwą katalogu z uprawnieniami CREATE CATALOG
i USE CATALOG
. Zastąp wartość ciągu external_location
ścieżką do zewnętrznej lokalizacji z uprawnieniami READ FILES
, WRITE FILES
i CREATE EXTERNAL TABLE
.
Lokalizacje zewnętrzne można zdefiniować jako cały pojemnik magazynowy, ale często wskazują na katalog zagnieżdżony w kontenerze.
Poprawny format ścieżki lokalizacji zewnętrznej to "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
Uruchomienie tej komórki powinno wyświetlić wiersz z napisem "12 bajtów", wydrukować ciąg "Hello world!" i wyświetlić wszystkie bazy danych obecne w podanym katalogu. Jeśli nie możesz uruchomić tej komórki, upewnij się, że jesteś w obszarze roboczym z włączonym Unity Catalog i poproś o odpowiednie uprawnienia od administratora obszaru roboczego, żeby zakończyć ten samouczek.
Poniższy kod w języku Python używa twojego adresu e-mail do utworzenia unikatowej bazy danych w podanym wykazie i unikatowej lokalizacji przechowywania w podanej lokalizacji zewnętrznej. Wykonanie tej komórki spowoduje usunięcie wszystkich danych powiązanych z tym samouczkiem, co pozwoli na wielokrotne wykonanie tego przykładu w sposób niezmienny. Klasa jest zdefiniowana i utworzona w celu symulacji partii danych przychodzących z połączonego systemu do zewnętrznej lokalizacji źródłowej.
Skopiuj ten kod do nowej komórki w notesie i wykonaj go w celu skonfigurowania środowiska.
Uwaga
Zmienne zdefiniowane w tym kodzie powinny umożliwić bezpieczne wykonywanie go bez ryzyka konfliktu z istniejącymi elementami zawartości obszaru roboczego lub innymi użytkownikami. Ograniczone uprawnienia sieci lub magazynu spowodują błędy podczas wykonywania tego kodu; skontaktuj się z administratorem obszaru roboczego, aby rozwiązać te problemy.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Możesz teraz załadować partię danych, kopiując następujący kod do komórki i wykonując go. Możesz ręcznie wykonać tę komórkę maksymalnie 60 razy, aby uruchomić pojawienie się nowych danych.
RawData.land_batch()
Krok 4. Skonfiguruj Auto Loader, aby pozyskiwać dane do Unity Catalog
Usługa Databricks zaleca przechowywanie danych za pomocą usługi Delta Lake. Delta Lake to otwartoźródłowa warstwa przechowywania, która zapewnia transakcje ACID i umożliwia funkcjonowanie magazynu typu data lakehouse. Usługa Delta Lake jest domyślnym formatem tabel utworzonych w usłudze Databricks.
Aby skonfigurować Auto Loader do wczytywania danych do tabeli Unity Catalog, skopiuj i wklej następujący kod do pustej komórki w notatniku:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Aby dowiedzieć się więcej na temat automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?.
Aby dowiedzieć się więcej na temat Strukturalnego strumieniowania i Unity Catalog, zobacz Using Unity Catalog with Structured Streaming.
Krok 5. Przetwarzanie danych i interakcja z nimi
Notatniki wykonują logikę krok po kroku, przetwarzając każdą komórkę z osobna. Wykonaj te kroki, aby uruchomić logikę w komórce.
Aby uruchomić komórkę ukończoną w poprzednim kroku, wybierz komórkę i naciśnij SHIFT+ENTER.
Aby wykonać zapytanie dotyczące utworzonej tabeli, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij shift+ENTER, aby uruchomić komórkę.
df = spark.read.table(table)
Aby wyświetlić podgląd danych w ramce danych, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij SHIFT+ENTER , aby uruchomić komórkę.
display(df)
Aby dowiedzieć się więcej na temat interaktywnych opcji wizualizacji danych, zobacz Wizualizacje w notesach usługi Databricks.
Krok 6. Planowanie zadania
Notesy usługi Databricks można uruchamiać jako skrypty produkcyjne, dodając je jako zadanie w zadaniu usługi Databricks. W tym kroku utworzysz nowe zadanie, które można wyzwolić ręcznie.
Aby zaplanować notatnik jako zadanie:
- Kliknij pozycję Harmonogram po prawej stronie paska nagłówka.
- Wprowadź unikatową nazwę zadania.
- Kliknij pozycję Ręczne.
- Z listy rozwijanej Klaster wybierz klaster utworzony w kroku 1.
- Kliknij pozycję Utwórz.
- W wyświetlonym oknie kliknij pozycję Uruchom teraz.
- Aby wyświetlić wyniki uruchomienia zadania, kliknij ikonę
obok znacznika czasu ostatniego uruchomienia .
Aby uzyskać więcej informacji na temat zadań, zobacz Co to są zadania?.
Krok 7. Wykonywanie zapytań dotyczących tabeli z usługi Databricks SQL
Każda osoba mająca uprawnienia USE CATALOG
w bieżącym katalogu, uprawnienie USE SCHEMA
w bieżącym schemacie i uprawnienia SELECT
do tabeli może wykonywać zapytania o zawartość tabeli za pomocą swojego preferowanego interfejsu API Databricks.
Do wykonywania zapytań w usłudze Databricks SQL SQL jest potrzebny dostęp do uruchomionego magazynu SQL.
Tabela utworzona wcześniej w tym samouczku ma nazwę target_table
. Możesz użyć katalogu podanego w pierwszej komórce oraz bazy danych z wzorem e2e_lakehouse_<your-username>
do wykonania zapytania. Aby znaleźć utworzone obiekty danych, możesz użyć Catalog Explorer.
Dodatkowe integracje
Dowiedz się więcej na temat integracji i narzędzi do inżynierii danych za pomocą usługi Azure Databricks: