Uruchamianie pierwszego obciążenia ETL w usłudze Azure Databricks
Dowiedz się, jak używać narzędzi gotowych do produkcji z usługi Azure Databricks do tworzenia i wdrażania pierwszych potoków wyodrębniania, przekształcania i ładowania (ETL) na potrzeby orkiestracji danych.
Na końcu tego artykułu poczujesz się komfortowo:
- Uruchamianie klastra obliczeniowego usługi Databricks ogólnego przeznaczenia.
- Tworzenie notesu usługi Databricks.
- Konfigurowanie pozyskiwania danych przyrostowych do usługi Delta Lake za pomocą modułu ładującego automatycznego.
- Wykonywanie komórek notesu w celu przetwarzania, wykonywania zapytań i podglądu danych.
- Planowanie notesu jako zadania usługi Databricks.
W tym samouczku są używane interaktywne notesy do wykonywania typowych zadań ETL w języku Python lub Scala.
Do tworzenia potoków ETL można również użyć tabel różnicowych na żywo. Usługa Databricks utworzyła tabele delta live, aby zmniejszyć złożoność tworzenia, wdrażania i obsługi produkcyjnych potoków ETL. Zobacz Samouczek: uruchamianie pierwszego potoku delty tabel na żywo.
Możesz również użyć dostawcy narzędzia Terraform usługi Databricks, aby utworzyć zasoby tego artykułu. Zobacz Tworzenie klastrów, notesów i zadań za pomocą programu Terraform.
Wymagania
- Zalogowano się do obszaru roboczego usługi Azure Databricks.
- Masz uprawnienia do tworzenia klastra.
Uwaga
Jeśli nie masz uprawnień kontroli klastra, możesz wykonać większość poniższych kroków, o ile masz dostęp do klastra.
Krok 1. Tworzenie klastra
Aby wykonać eksploracyjne analizy danych i inżynierii danych, utwórz klaster w celu udostępnienia zasobów obliczeniowych potrzebnych do wykonywania poleceń.
- Kliknij pozycję Obliczenia na pasku bocznym.
- Na stronie Obliczenia kliknij pozycję Utwórz klaster. Spowoduje to otwarcie strony Nowy klaster.
- Określ unikatową nazwę klastra, pozostaw pozostałe wartości w stanie domyślnym, a następnie kliknij pozycję Utwórz klaster.
Aby dowiedzieć się więcej o klastrach usługi Databricks, zobacz Obliczenia.
Krok 2. Tworzenie notesu usługi Databricks
Aby utworzyć notes w obszarze roboczym, kliknij pozycję Nowy na pasku bocznym, a następnie kliknij przycisk Notes. W obszarze roboczym zostanie otwarty pusty notes.
Aby dowiedzieć się więcej na temat tworzenia notesów i zarządzania nimi, zobacz Zarządzanie notesami.
Krok 3. Konfigurowanie automatycznego modułu ładującego w celu pozyskiwania danych do usługi Delta Lake
Usługa Databricks zaleca używanie automatycznego modułu ładującego do pozyskiwania danych przyrostowych. Automatycznie moduł ładujący automatycznie wykrywa i przetwarza nowe pliki w miarę ich przybycia do magazynu obiektów w chmurze.
Usługa Databricks zaleca przechowywanie danych za pomocą usługi Delta Lake. Usługa Delta Lake to warstwa magazynu typu open source, która zapewnia transakcje ACID i umożliwia magazyn typu data lakehouse. Usługa Delta Lake jest domyślnym formatem tabel utworzonych w usłudze Databricks.
Aby skonfigurować moduł automatycznego ładowania w celu pozyskiwania danych do tabeli usługi Delta Lake, skopiuj i wklej następujący kod do pustej komórki w notesie:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Uwaga
Zmienne zdefiniowane w tym kodzie powinny umożliwić bezpieczne wykonywanie go bez ryzyka konfliktu z istniejącymi elementami zawartości obszaru roboczego lub innymi użytkownikami. Ograniczone uprawnienia sieci lub magazynu będą zgłaszać błędy podczas wykonywania tego kodu; skontaktuj się z administratorem obszaru roboczego, aby rozwiązać te ograniczenia.
Aby dowiedzieć się więcej na temat automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?.
Krok 4. Przetwarzanie i interakcja z danymi
Notesy wykonują komórkę logiki po komórce. Aby wykonać logikę w komórce:
Aby uruchomić komórkę ukończoną w poprzednim kroku, wybierz komórkę i naciśnij SHIFT+ENTER.
Aby wykonać zapytanie dotyczące utworzonej tabeli, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij SHIFT+ENTER , aby uruchomić komórkę.
Python
df = spark.read.table(table_name)
Scala
val df = spark.read.table(table_name)
Aby wyświetlić podgląd danych w ramce danych, skopiuj i wklej następujący kod do pustej komórki, a następnie naciśnij SHIFT+ENTER , aby uruchomić komórkę.
Python
display(df)
Scala
display(df)
Aby dowiedzieć się więcej na temat interaktywnych opcji wizualizacji danych, zobacz Wizualizacje w notesach usługi Databricks.
Krok 5. Planowanie zadania
Notesy usługi Databricks można uruchamiać jako skrypty produkcyjne, dodając je jako zadanie w zadaniu usługi Databricks. W tym kroku utworzysz nowe zadanie, które można wyzwolić ręcznie.
Aby zaplanować notes jako zadanie:
- Kliknij pozycję Harmonogram po prawej stronie paska nagłówka.
- Wprowadź unikatową nazwę zadania.
- Kliknij pozycję Ręczne.
- Z listy rozwijanej Klaster wybierz klaster utworzony w kroku 1.
- Kliknij pozycję Utwórz.
- W wyświetlonym oknie kliknij pozycję Uruchom teraz.
- Aby wyświetlić wyniki uruchomienia zadania, kliknij ikonę obok znacznika czasu ostatniego uruchomienia .
Aby uzyskać więcej informacji na temat zadań, zobacz Co to są zadania usługi Databricks?.
Dodatkowe integracje
Dowiedz się więcej na temat integracji i narzędzi do inżynierii danych za pomocą usługi Azure Databricks: