Uruchamianie potoku delta live tables w przepływie pracy
Potok delta Live Tables można uruchomić w ramach przepływu pracy przetwarzania danych z zadaniami usługi Databricks, przepływem powietrza Apache Airflow lub usługą Azure Data Factory.
Stanowiska
W zadaniu usługi Databricks można organizować wiele zadań w celu zaimplementowania przepływu pracy przetwarzania danych. Aby uwzględnić potok delta live tables w zadaniu, użyj zadania Potok podczas tworzenia zadania. Zobacz Zadanie potoku Delta Live Tables dla zadań.
Przepływ powietrza Apache
Apache Airflow to rozwiązanie typu open source do zarządzania przepływami pracy danych i planowania ich. Przepływ powietrza reprezentuje przepływy pracy zgodnie z grafami acyklicznymi (DAG) operacji. Przepływ pracy definiuje się w pliku w języku Python, a funkcja Airflow zarządza planowaniem i wykonywaniem. Aby uzyskać informacje na temat instalowania i używania rozwiązania Airflow z usługą Azure Databricks, zobacz Orchestrate Azure Databricks jobs with Apache Airflow (Organizowanie zadań usługi Azure Databricks za pomocą platformy Apache Airflow).
Aby uruchomić potok Delta Live Tables w ramach przepływu pracy przepływu pracy przepływu pracy, użyj modułu DatabricksSubmitRunOperator.
Wymagania
Do korzystania z funkcji Delta Live Tables wymagane są następujące elementy:
- Airflow w wersji 2.1.0 lub nowszej.
- Pakiet dostawcy usługi Databricks w wersji 2.1.0 lub nowszej.
Przykład
W poniższym przykładzie utworzono grupę DAG przepływu powietrza, która wyzwala aktualizację potoku delta live tables o identyfikatorze 8279d543-063c-4d63-9926-dae38e35ce8b
:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Zastąp CONNECTION_ID
ciąg identyfikatorem połączenia Airflow z obszarem roboczym.
Zapisz ten przykład w airflow/dags
katalogu i użyj interfejsu użytkownika przepływu powietrza, aby wyświetlić i wyzwolić grupę DAG. Użyj interfejsu użytkownika delty tabel na żywo, aby wyświetlić szczegóły aktualizacji potoku.
Azure Data Factory
Uwaga
Tabele delta live i usługa Azure Data Factory zawierają opcje konfigurowania liczby ponownych prób w przypadku wystąpienia awarii. Jeśli wartości ponawiania są konfigurowane w potoku Delta Live Tables dla i oraz w działaniu usługi Azure Data Factory, które wywołuje potok, liczba ponownych prób to wartość ponawiania w usłudze Azure Data Factory pomnożona przez wartość ponawiania w Delta Live Tables.
Na przykład, jeśli aktualizacja potoku zakończy się niepowodzeniem, Delta Live Tables ponawia próbę aktualizacji do pięciu razy domyślnie. Jeśli ponawianie próby w usłudze Azure Data Factory jest ustawione na trzy, a potok Delta Live Tables, który domyślnie wykonuje pięć ponownych prób, Twój niepowodzenie potoku Delta Live Tables może być ponownie wykonany nawet piętnaście razy. Aby uniknąć nadmiernej liczby ponownych prób w przypadku niepowodzenia aktualizacji potoku, usługa Databricks zaleca ograniczenie liczby ponownych prób podczas konfigurowania potoku Delta Live Tables lub działania usługi Azure Data Factory, które wywołuje ten potok.
Aby zmienić ustawienia ponawiania dla potoku Delta Live Tables, użyj ustawienia pipelines.numUpdateRetryAttempts
podczas konfigurowania potoku.
Azure Data Factory to oparta na chmurze usługa ETL, która umożliwia organizowanie przepływów pracy integracji i przekształcania danych. Usługa Azure Data Factory bezpośrednio obsługuje uruchamianie zadań usługi Azure Databricks w przepływie pracy, w tym notesów, zadań JAR i skryptów języka Python. Potok można również uwzględnić w przepływie pracy, wywołując interfejs API delta Live Tables z działania internetowego usługi Azure Data Factory. Aby na przykład wyzwolić aktualizację potoku z usługi Azure Data Factory:
Utwórz fabrykę danych lub otwórz istniejącą fabrykę danych.
Po zakończeniu tworzenia otwórz stronę fabryki danych i kliknij kafelek Otwórz program Azure Data Factory Studio . Zostanie wyświetlony interfejs użytkownika usługi Azure Data Factory.
Utwórz nowy potok usługi Azure Data Factory, wybierając pozycję Potok z menu rozwijanego Nowy w interfejsie użytkownika narzędzia Azure Data Factory Studio.
W przyborniku Działania rozwiń pozycję Ogólne i przeciągnij działanie internetowe na kanwę potoku. Kliknij kartę Ustawienia i wprowadź następujące wartości:
Uwaga
Najlepszym rozwiązaniem w zakresie zabezpieczeń w przypadku uwierzytelniania za pomocą zautomatyzowanych narzędzi, systemów, skryptów i aplikacji usługa Databricks zaleca używanie osobistych tokenów dostępu należących do jednostek usługi zamiast użytkowników obszaru roboczego. Aby utworzyć tokeny dla jednostek usługi, zobacz Zarządzanie tokenami dla jednostki usługi.
Adres URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Zastąp element
<get-workspace-instance>
.Zastąp
<pipeline-id>
element identyfikatorem potoku.Metoda: wybierz pozycję POST z menu rozwijanego.
Nagłówki: kliknij pozycję + Nowy. W polu tekstowym Nazwa wprowadź .
Authorization
W polu tekstowym Wartość wprowadź .Bearer <personal-access-token>
Zastąp
<personal-access-token>
element osobistym tokenem dostępu usługi Azure Databricks.Treść: Aby przekazać dodatkowe parametry żądania, wprowadź dokument JSON zawierający parametry. Aby na przykład uruchomić aktualizację i ponownie przetworzyć wszystkie dane dla potoku:
{"full_refresh": "true"}
. Jeśli nie ma żadnych dodatkowych parametrów żądania, wprowadź puste nawiasy klamrowe ({}
).
Aby przetestować działanie internetowe, kliknij pozycję Debuguj na pasku narzędzi potoku w interfejsie użytkownika usługi Data Factory. Dane wyjściowe i stan przebiegu, w tym błędy, są wyświetlane na karcie Dane wyjściowe potoku usługi Azure Data Factory. Użyj interfejsu użytkownika delty tabel na żywo, aby wyświetlić szczegóły aktualizacji potoku.
Napiwek
Typowym wymaganiem dotyczącym przepływu pracy jest uruchomienie zadania po zakończeniu poprzedniego zadania. Ponieważ żądanie delta Live Tables updates
jest asynchroniczne — żądanie jest zwracane po rozpoczęciu aktualizacji, ale przed ukończeniem aktualizacji — zadania w potoku usługi Azure Data Factory z zależnością od aktualizacji tabel delta Live Tables muszą czekać na ukończenie aktualizacji. Opcja oczekiwania na ukończenie aktualizacji polega na dodaniu działania Until po działaniu sieci Web, które wyzwala aktualizację delta live tables. W działaniu Until:
- Dodaj działanie Wait (Oczekiwanie), aby poczekać skonfigurowaną liczbę sekund na ukończenie aktualizacji.
- Dodaj akcję internetową po działaniu 'Wait', która używa żądania szczegółowego dotyczącego aktualizacji Delta Live Tables, aby uzyskać stan aktualizacji. Pole
state
w odpowiedzi zwraca bieżący stan aktualizacji, w tym jeśli został ukończony. - Użyj wartości
state
pola, aby ustawić warunek zakończenia dla działania Until. Możesz również użyć działania Ustaw zmienną, aby dodać zmienną potoku nastate
podstawie wartości i użyć tej zmiennej dla warunku zakończenia.