Samouczek: Azure Data Lake Storage, Azure Databricks i Spark
W tym samouczku pokazano, jak połączyć klaster usługi Azure Databricks z danymi przechowywanymi na koncie usługi Azure Storage z włączoną usługą Azure Data Lake Storage. Takie połączenie umożliwia natywne wykonywanie w klastrze zapytań i analiz dotyczących tych danych.
Ten samouczek obejmuje następujące kroki:
- Pozyskiwanie danych bez struktury na koncie magazynu
- Uruchamianie analiz dotyczących danych w magazynie obiektów blob
Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.
Wymagania wstępne
Tworzenie konta magazynu z hierarchiczną przestrzenią nazw (Azure Data Lake Storage)
Zobacz Tworzenie konta magazynu do użycia z usługą Azure Data Lake Storage.
Upewnij się, że Twoje konto użytkownika ma przypisaną rolę Współautor danych obiektu blob magazynu.
Zainstaluj narzędzie AzCopy w wersji 10. Zobacz Transferowanie danych za pomocą narzędzia AzCopy w wersji 10
Utwórz jednostkę usługi, utwórz klucz tajny klienta, a następnie przyznaj jednostce usługi dostęp do konta magazynu.
Zobacz Samouczek: nawiązywanie połączenia z usługą Azure Data Lake Storage (kroki od 1 do 3). Po wykonaniu tych kroków wklej wartości identyfikatora dzierżawy, identyfikatora aplikacji i klucza tajnego klienta do pliku tekstowego. Będziesz ich używać w dalszej części tego samouczka.
Tworzenie obszaru roboczego, klastra i notesu usługi Azure Databricks
Tworzenie obszaru roboczego usługi Azure Databricks. Zobacz Tworzenie obszaru roboczego usługi Azure Databricks.
Tworzenie klastra. Zobacz Tworzenie klastra.
Utwórz notes. Zobacz Tworzenie notesu. Wybierz język Python jako domyślny język notesu.
Pozostaw otwarty notes. Należy go użyć w poniższych sekcjach.
Pobieranie danych lotów
W tym samouczku używane są dane dotyczące lotów o wydajności w czasie dla stycznia 2016 r. z Biura Statystyki Transportu, aby zademonstrować sposób wykonywania operacji ETL. Aby ukończyć samouczek, musisz pobrać te dane.
Pobierz plik On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Ten plik zawiera dane lotu.
Rozpakuj zawartość pliku zip i zanotuj nazwę pliku oraz jego ścieżkę. Te informacje będą potrzebne w późniejszym kroku.
Jeśli chcesz dowiedzieć się więcej o informacjach przechwyconych w danych dotyczących wydajności raportowania czasu, możesz zobaczyć opisy pól w witrynie internetowej Bureau of Transportation Statistics.
Pozyskiwanie danych
W tej sekcji przekażesz dane lotów .csv do konta usługi Azure Data Lake Storage, a następnie zainstalujesz konto magazynu w klastrze usługi Databricks. Na koniec użyjesz usługi Databricks do odczytywania .csv danych lotu i zapisywania ich z powrotem do magazynu w formacie Apache parquet.
Przekazywanie danych lotu do konta magazynu
Użyj narzędzia AzCopy, aby skopiować plik .csv na konto usługi Azure Data Lake Storage. Polecenie służy azcopy make
do tworzenia kontenera na koncie magazynu. Następnie użyjesz polecenia , azcopy copy
aby skopiować właśnie pobrane dane csv do katalogu w tym kontenerze.
W poniższych krokach należy wprowadzić nazwy kontenera, który chcesz utworzyć, oraz katalog i obiekt blob, do którego chcesz przekazać dane lotu do kontenera. Sugerowane nazwy można użyć w każdym kroku lub określić własne konwencje nazewnictwa kontenerów, katalogów i obiektów blob.
Otwórz okno wiersza polecenia i wprowadź następujące polecenie, aby zalogować się do usługi Azure Active Directory w celu uzyskania dostępu do konta magazynu.
azcopy login
Postępuj zgodnie z instrukcjami wyświetlanymi w oknie wiersza polecenia, aby uwierzytelnić konto użytkownika.
Aby utworzyć kontener na koncie magazynu do przechowywania danych lotu, wprowadź następujące polecenie:
azcopy make "https://<storage-account-name>.dfs.core.windows.net/<container-name>"
Zastąp wartość symbolu zastępczego
<storage-account-name>
nazwą konta magazynu.<container-name>
Zastąp symbol zastępczy nazwą kontenera, który chcesz utworzyć, aby przechowywać dane csv, na przykład flight-data-container.
Aby przekazać (skopiować) dane csv na konto magazynu, wprowadź następujące polecenie.
azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
Zastąp wartość symbolu
<csv-folder-path>
zastępczego ścieżką do pliku .csv .Zastąp wartość symbolu zastępczego
<storage-account-name>
nazwą konta magazynu.<container-name>
Zastąp symbol zastępczy nazwą kontenera na koncie magazynu.<directory-name>
Zastąp symbol zastępczy nazwą katalogu do przechowywania danych w kontenerze, na przykład jan2016.
Instalowanie konta magazynu w klastrze usługi Databricks
W tej sekcji zainstalujesz magazyn obiektów w chmurze usługi Azure Data Lake Storage w systemie plików usługi Databricks (DBFS). Używasz wcześniej utworzonej jednostki usługi Azure AD do uwierzytelniania przy użyciu konta magazynu. Aby uzyskać więcej informacji, zobacz Instalowanie magazynu obiektów w chmurze w usłudze Azure Databricks.
Dołącz notes do klastra.
W utworzonym wcześniej notesie wybierz przycisk Połącz w prawym górnym rogu paska narzędzi notesu. Ten przycisk otwiera selektor obliczeniowy. (Jeśli notes został już połączony z klastrem, nazwa tego klastra jest wyświetlana w tekście przycisku zamiast Połącz).
W menu rozwijanym klastra wybierz utworzony wcześniej klaster.
Zwróć uwagę, że tekst w selektorze klastra zmienia się na rozpoczęcie. Przed kontynuowaniem poczekaj na zakończenie uruchamiania klastra i zaczekaj na wyświetlenie nazwy klastra w przycisku.
Skopiuj i wklej następujący blok kodu do pierwszej komórki, ale jeszcze nie uruchamiaj kodu.
configs = {"fs.azure.account.auth.type": "OAuth", "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", "fs.azure.account.oauth2.client.id": "<appId>", "fs.azure.account.oauth2.client.secret": "<clientSecret>", "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token", "fs.azure.createRemoteFileSystemDuringInitialization": "true"} dbutils.fs.mount( source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>", mount_point = "/mnt/flightdata", extra_configs = configs)
W tym bloku kodu:
W
configs
pliku zastąp<appId>
wartości zastępcze ,<clientSecret>
i<tenantId>
identyfikatorem aplikacji, kluczem tajnym klienta i identyfikatorem dzierżawy skopiowanymi podczas tworzenia jednostki usługi w wymaganiach wstępnych.W identyfikatorze
source
URI zastąp<storage-account-name>
wartości zastępcze ,<container-name>
i<directory-name>
nazwą konta magazynu usługi Azure Data Lake Storage oraz nazwą kontenera i katalogu określonego podczas przekazywania danych lotu do konta magazynu.Uwaga
Identyfikator schematu w identyfikatorze URI
abfss
informuje usługę Databricks o użyciu sterownika systemu plików obiektów blob platformy Azure z protokołem Transport Layer Security (TLS). Aby dowiedzieć się więcej na temat identyfikatora URI, zobacz Use the Azure Data Lake Storage URI (Korzystanie z identyfikatora URI usługi Azure Data Lake Storage).
Przed kontynuowaniem upewnij się, że klaster został ukończony.
Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.
Kontener i katalog, w którym przekazano dane lotu na koncie magazynu, jest teraz dostępny w notesie za pośrednictwem punktu instalacji /mnt/flightdata.
Konwertowanie formatu CSV na format Parquet za pomocą notesu usługi Databricks
Teraz, gdy dane lotu csv są dostępne za pośrednictwem punktu instalacji systemu plików DBFS, możesz użyć ramki danych platformy Apache Spark, aby załadować je do obszaru roboczego i zapisać je z powrotem w formacie Apache parquet do magazynu obiektów usługi Azure Data Lake Storage.
Ramka danych platformy Spark to dwuwymiarowa struktura danych z kolumnami potencjalnie różnych typów. Za pomocą ramki danych można łatwo odczytywać i zapisywać dane w różnych obsługiwanych formatach. Za pomocą ramki danych można ładować dane z magazynu obiektów w chmurze i wykonywać na nim analizy i przekształcenia wewnątrz klastra obliczeniowego bez wpływu na dane bazowe w magazynie obiektów w chmurze. Aby dowiedzieć się więcej, zobacz Praca z ramkami danych PySpark w usłudze Azure Databricks.
Apache parquet to format pliku kolumnowego z optymalizacjami, które przyspieszają zapytania. Jest to bardziej wydajny format pliku niż CSV lub JSON. Aby dowiedzieć się więcej, zobacz Parquet Files.
W notesie dodaj nową komórkę i wklej do niego następujący kod.
# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/*.csv")
# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")
Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.
Przed przejściem do następnej sekcji upewnij się, że wszystkie dane parquet zostały zapisane, a komunikat "Done" (Gotowe) pojawia się w danych wyjściowych.
Eksplorowanie danych
W tej sekcji użyjesz narzędzia systemu plików usługi Databricks do eksplorowania magazynu obiektów usługi Azure Data Lake Storage przy użyciu punktu instalacji systemu plików DBFS utworzonego w poprzedniej sekcji.
W nowej komórce wklej następujący kod, aby uzyskać listę plików w punkcie instalacji. Pierwsze polecenie zwraca listę plików i katalogów. Drugie polecenie wyświetla dane wyjściowe w formacie tabelarycznym, aby ułatwić odczytywanie.
dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))
Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.
Zwróć uwagę, że katalog parquet pojawia się na liście. Dane lotów .csv zostały zapisane w formacie parquet w katalogu parquet/flights w poprzedniej sekcji. Aby wyświetlić listę plików w katalogu parquet/flights , wklej następujący kod do nowej komórki i uruchom go:
display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))
Aby utworzyć nowy plik i wyświetlić go, wklej następujący kod do nowej komórki i uruchom go:
dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))
Ponieważ w tym samouczku nie potrzebujesz pliku 1.txt , możesz wkleić następujący kod do komórki i uruchomić go, aby rekursywnie usunąć element mydirectory. Parametr True
wskazuje cykliczne usuwanie.
dbutils.fs.rm("/mnt/flightdata/mydirectory", True)
Dla wygody możesz użyć polecenia pomocy, aby dowiedzieć się więcej o innych poleceniach.
dbutils.fs.help("rm")
Korzystając z tych przykładów kodu, zapoznaliśmy się z hierarchicznym charakterem systemu plików HDFS przy użyciu danych przechowywanych na koncie magazynu z włączoną usługą Azure Data Lake Storage.
Wykonywanie zapytań na danych
Następnie możesz rozpocząć wykonywanie zapytań dotyczących danych przekazanych na swoje konto magazynu. Wprowadź każdy z poniższych bloków kodu w nowej komórce i naciśnij SHIFT + ENTER , aby uruchomić skrypt języka Python.
Ramki danych zapewniają bogaty zestaw funkcji (wybieranie kolumn, filtrowanie, sprzężenie, agregowanie), które umożliwiają efektywne rozwiązywanie typowych problemów z analizą danych.
Aby załadować ramkę danych z wcześniej zapisanych danych lotu parquet i zapoznać się z niektórymi obsługiwanymi funkcjami, wprowadź ten skrypt w nowej komórce i uruchom go.
# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")
# Print the schema of the dataframe
flight_df.printSchema()
# Print the flight database size
print("Number of flights in the database: ", flight_df.count())
# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)
# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)
# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)
Wprowadź ten skrypt w nowej komórce, aby uruchomić kilka podstawowych zapytań analizy względem danych. Możesz uruchomić cały skrypt (SHIFT + ENTER), wyróżnić każde zapytanie i uruchomić je oddzielnie przy użyciu CTRL + SHIFT + ENTER lub wprowadzić każde zapytanie w osobnej komórce i uruchomić je tam.
# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')
# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())
# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()
# List out all the airports in Texas
airports_in_texas = spark.sql(
"SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)
# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
"SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)
# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
"SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()
# List airlines by the highest percentage of delayed flights. A delayed flight is one with a departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
"CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
"CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
"SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()
Podsumowanie
W tym samouczku zostały wykonane następujące czynności:
Utworzono zasoby platformy Azure, w tym konto magazynu usługi Azure Data Lake Storage i jednostkę usługi Azure AD oraz przypisano uprawnienia dostępu do konta magazynu.
Utworzono obszar roboczy, notes i klaster obliczeniowy usługi Azure Databricks.
Użyto narzędzia AzCopy do przekazywania danych nieustrukturyzowanych .csv lotów do konta magazynu usługi Azure Data Lake Storage.
Używane funkcje narzędzi systemu plików usługi Databricks do instalowania konta magazynu usługi Azure Data Lake Storage i eksplorowania jego hierarchicznego systemu plików.
Użyto ramek danych platformy Apache Spark do przekształcania danych .csv lotów do formatu Apache parquet i przechowywania ich z powrotem na koncie magazynu usługi Azure Data Lake Storage.
Używane ramki danych do eksplorowania danych lotu i wykonywania prostego zapytania.
Usługa Apache Spark SQL umożliwia wykonywanie zapytań dotyczących danych lotów dotyczących łącznej liczby lotów dla każdej linii lotniczej w styczniu 2016 r., lotnisk w Teksasie, linii lotniczych, które latają z Teksasu, średniego opóźnienia przylotu w minutach dla każdej linii lotniczej na szczeblu krajowym oraz procent lotów każdej linii lotniczej, które opóźniły loty lub przyloty.
Czyszczenie zasobów
Jeśli chcesz zachować notes i wrócić do niego później, warto zamknąć (zakończyć) klaster, aby uniknąć naliczania opłat. Aby zakończyć działanie klastra, wybierz go w selektorze obliczeniowym znajdującym się w prawym górnym rogu paska narzędzi notesu, wybierz pozycję Zakończ z menu i potwierdź wybór. (Domyślnie klaster zostanie automatycznie zakończony po 120 minutach braku aktywności).
Jeśli chcesz usunąć poszczególne zasoby obszaru roboczego, takie jak notesy i klastry, możesz to zrobić na lewym pasku bocznym obszaru roboczego. Aby uzyskać szczegółowe instrukcje, zobacz Usuwanie klastra lub Usuwanie notesu.
Gdy grupa zasobów i wszystkie pokrewne zasoby nie będą już potrzebne, usuń je. Aby to zrobić w witrynie Azure Portal, wybierz grupę zasobów dla konta magazynu i obszaru roboczego, a następnie wybierz pozycję Usuń.