Podstawy PySpark
W tym artykule przedstawiono proste przykłady ilustrujące użycie programu PySpark. Przyjęto założenie, że rozumiesz podstawowe pojęcia dotyczące platformy Apache Spark i uruchamiasz polecenia w notesie usługi Azure Databricks połączonym z obliczeniami. Ramki danych można tworzyć przy użyciu przykładowych danych, wykonywać podstawowe przekształcenia, w tym operacje na wierszach i kolumnach na tych danych, łączyć wiele ramek danych i agregować te dane, wizualizować te dane, a następnie zapisywać je w tabeli lub pliku.
Przekazywanie danych
W niektórych przykładach w tym artykule użyto przykładowych danych dostarczonych przez usługę Databricks, aby zademonstrować ładowanie, przekształcanie i zapisywanie danych przy użyciu ramek danych. Jeśli chcesz użyć własnych danych, które nie są jeszcze w usłudze Databricks, możesz go najpierw przekazać i utworzyć na jego podstawie ramkę danych. Zobacz Tworzenie lub modyfikowanie tabeli przy użyciu przekazywania plików i przekazywania plików do woluminu wykazu aparatu Unity.
Informacje o przykładowych danych usługi Databricks
Usługa Databricks udostępnia przykładowe dane w samples
katalogu i w /databricks-datasets
katalogu.
- Aby uzyskać dostęp do przykładowych danych w wykazie
samples
, użyj formatusamples.<schema-name>.<table-name>
. W tym artykule użyto tabel w schemaciesamples.tpch
, które zawierają dane z fikcyjnej firmy. Tabelacustomer
zawiera informacje o klientach iorders
zawiera informacje o zamówieniach złożonych przez tych klientów. - Użyj
dbutils.fs.ls
polecenia , aby eksplorować dane w programie/databricks-datasets
. Użyj języka Spark SQL lub ramek danych do wykonywania zapytań dotyczących danych w tej lokalizacji przy użyciu ścieżek plików. Aby dowiedzieć się więcej o przykładowych danych dostarczanych przez usługę Databricks, zobacz Przykładowe zestawy danych.
Importowanie typów danych
Wiele operacji PySpark wymaga używania funkcji SQL lub interakcji z natywnymi typami platformy Spark. Możesz bezpośrednio zaimportować tylko te funkcje i typy, których potrzebujesz, lub zaimportować cały moduł.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
Ponieważ niektóre zaimportowane funkcje mogą zastąpić wbudowane funkcje języka Python, niektórzy użytkownicy wybierają importowanie tych modułów przy użyciu aliasu. W poniższych przykładach pokazano wspólny alias używany w przykładach kodu platformy Apache Spark:
import pyspark.sql.types as T
import pyspark.sql.functions as F
Aby uzyskać pełną listę typów danych, zobacz Typy danych platformy Spark.
Aby uzyskać kompleksową listę funkcji SQL PySpark, zobacz Funkcje platformy Spark.
Tworzenie ramki danych
Istnieje kilka sposobów tworzenia ramki danych. Zazwyczaj ramka danych jest definiowana względem źródła danych, takiego jak tabela lub kolekcja plików. Następnie zgodnie z opisem w sekcji Podstawowe pojęcia dotyczące platformy Apache Spark użyj akcji, takiej jak display
, aby wyzwolić przekształcenia do wykonania. Metoda display
zwraca ramki danych.
Tworzenie ramki danych z określonymi wartościami
Aby utworzyć ramkę danych z określonymi wartościami, użyj createDataFrame
metody , w której wiersze są wyrażane jako lista krotki:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Zwróć uwagę, że w danych wyjściowych typy danych kolumn funkcji df_children
są automatycznie wnioskowane. Możesz też określić typy, dodając schemat. Schematy są definiowane przy użyciu StructType
elementu, który składa się z StructFields
tego, który określa nazwę, typ danych i flagę logiczną wskazującą, czy zawierają wartość null, czy nie. Musisz zaimportować typy danych z pyspark.sql.types
programu .
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Tworzenie ramki danych na podstawie tabeli w wykazie aparatu Unity
Aby utworzyć ramkę danych na podstawie tabeli w wykazie aparatu Unity, użyj table
metody identyfikującej tabelę przy użyciu formatu <catalog-name>.<schema-name>.<table-name>
. Kliknij pozycję Wykaz na lewym pasku nawigacyjnym, aby użyć Eksploratora wykazu, aby przejść do tabeli. Kliknij ją, a następnie wybierz pozycję Kopiuj ścieżkę tabeli, aby wstawić ścieżkę tabeli do notesu.
Poniższy przykład ładuje tabelę samples.tpch.customer
, ale możesz też podać ścieżkę do własnej tabeli.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Tworzenie ramki danych na podstawie przekazanego pliku
Aby utworzyć ramkę danych na podstawie pliku przekazanego do woluminów wykazu aparatu Unity, użyj read
właściwości . Ta metoda zwraca DataFrameReader
element , którego następnie można użyć do odczytania odpowiedniego formatu. Kliknij opcję wykazu na małym pasku bocznym po lewej stronie i znajdź plik za pomocą przeglądarki katalogu. Wybierz ją, a następnie kliknij pozycję Kopiuj ścieżkę pliku woluminu.
Poniższy przykład odczytuje z *.csv
pliku, ale DataFrameReader
obsługuje przekazywanie plików w wielu innych formatach. Zobacz Metody DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Aby uzyskać więcej informacji na temat woluminów wykazu aparatu Unity, zobacz Co to są woluminy wykazu aparatu Unity?.
Tworzenie ramki danych na podstawie odpowiedzi JSON
Aby utworzyć ramkę danych na podstawie ładunku odpowiedzi JSON zwróconego przez interfejs API REST, użyj pakietu języka Python requests
, aby wykonać zapytanie i przeanalizować odpowiedź. Należy zaimportować pakiet, aby go użyć. W tym przykładzie użyto danych z bazy danych aplikacji leków Stany Zjednoczone Food and Drug Administration.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Aby uzyskać informacje na temat pracy z danymi JSON i innymi częściowo ustrukturyzowanymi danymi w usłudze Databricks, zobacz Modelowanie częściowo ustrukturyzowanych danych.
Wybieranie pola lub obiektu JSON
Aby wybrać określone pole lub obiekt z przekonwertowanego kodu JSON, użyj []
notacji. Aby na przykład wybrać products
pole, które jest tablicą produktów:
display(df_drugs.select(df_drugs["products"]))
Można również połączyć ze sobą wywołania metod, aby przejść przez wiele pól. Na przykład, aby wyświetlić nazwę marki pierwszego produktu w aplikacji narkotykowej:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Tworzenie ramki danych na podstawie pliku
Aby zademonstrować tworzenie ramki danych na podstawie pliku, w tym przykładzie /databricks-datasets
dane CSV są ładowane w katalogu.
Aby przejść do przykładowych zestawów danych, możesz użyć poleceń systemu plików Databricks Utilties . W poniższym przykładzie użyto dbutils
polecenia , aby wyświetlić listę zestawów danych dostępnych w programie /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
Alternatywnie możesz użyć %fs
polecenia systemu plików interfejsu wiersza polecenia usługi Databricks, jak pokazano w poniższym przykładzie:
%fs ls '/databricks-datasets'
Aby utworzyć ramkę danych na podstawie pliku lub katalogu plików, określ ścieżkę w metodzie load
:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Przekształcanie danych za pomocą ramek danych
Ramki danych ułatwiają przekształcanie danych przy użyciu wbudowanych metod sortowania, filtrowania i agregowania danych. Wiele przekształceń nie jest określonych jako metody w ramkach danych, ale zamiast tego są udostępniane w pakiecie spark.sql.functions
. Zobacz Funkcje Spark SQL usługi Databricks.
- Operacje na kolumnach
- Operacje na wierszach
- Dołączanie ramek danych
- Agregowanie danych
- Łączenie łańcuchów wywołań
Operacje na kolumnach
Platforma Spark udostępnia wiele podstawowych operacji na kolumnach:
Napiwek
Aby wyświetlić wszystkie kolumny w ramce danych, użyj polecenia columns
, na przykład df_customer.columns
.
Wybierz kolumny
Możesz wybrać określone kolumny przy użyciu elementów select
i col
. Funkcja col
znajduje się w pyspark.sql.functions
podmodule.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
Możesz również odwołać się do kolumny, przy użyciu expr
której wyrażenie jest zdefiniowane jako ciąg:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
Można również użyć metody selectExpr
, która akceptuje wyrażenia SQL:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Aby wybrać kolumny przy użyciu literału ciągu, wykonaj następujące czynności:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Aby jawnie wybrać kolumnę z określonej ramki danych, możesz użyć []
operatora lub .
operatora. (Operator nie może służyć do wybierania .
kolumn rozpoczynających się od liczby całkowitej lub tych, które zawierają spację lub znak specjalny). Może to być szczególnie przydatne w przypadku dołączania ramek danych, w których niektóre kolumny mają taką samą nazwę.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Tworzenie kolumn
Aby utworzyć nową kolumnę, użyj withColumn
metody . W poniższym przykładzie zostanie utworzona nowa kolumna zawierająca wartość logiczną na podstawie tego, czy saldo c_acctbal
konta klienta przekracza wartość 1000
:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Zmienianie nazw kolumn
Aby zmienić nazwę kolumny, użyj withColumnRenamed
metody , która akceptuje istniejące i nowe nazwy kolumn:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
Metoda jest szczególnie przydatna alias
, gdy chcesz zmienić nazwy kolumn w ramach agregacji:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Rzutowanie typów kolumn
W niektórych przypadkach możesz zmienić typ danych dla co najmniej jednej kolumny w ramce danych. W tym celu użyj cast
metody , aby przekonwertować między typami danych kolumn. W poniższym przykładzie pokazano, jak przekonwertować kolumnę z liczby całkowitej na typ ciągu przy użyciu col
metody w celu odwołania się do kolumny:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Usuwanie kolumn
Aby usunąć kolumny, możesz pominąć kolumny podczas wybierania lub select(*) except
użyć drop
metody :
df_customer_flag_renamed.drop("balance_flag_renamed")
Jednocześnie można usunąć wiele kolumn:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Operacje na wierszach
Platforma Spark udostępnia wiele podstawowych operacji wierszy:
- Filtrowanie wierszy
- Usuwanie zduplikowanych wierszy
- Obsługa wartości null
- Dołączanie wierszy
- Sortowanie wierszy
- Filtrowanie wierszy
Filtruj wiersze
Aby filtrować wiersze, użyj filter
metody or where
w ramce danych, aby zwrócić tylko niektóre wiersze. Aby zidentyfikować kolumnę do filtrowania, użyj col
metody lub wyrażenia, które daje w wyniku kolumnę.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Aby filtrować według wielu warunków, użyj operatorów logicznych. Można na przykład &
odpowiednio włączyć AND
OR
i |
warunki. Poniższy przykład filtruje wiersze, w których wartość c_nationkey
jest równa 20
i c_acctbal
jest większa niż 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Usuwanie zduplikowanych wierszy
Aby usunąć zduplikowane wiersze, użyj polecenia distinct
, który zwraca tylko unikatowe wiersze.
df_unique = df_customer.distinct()
Obsługa wartości null
Aby obsłużyć wartości null, usuń wiersze zawierające wartości null przy użyciu na.drop
metody . Ta metoda umożliwia określenie, czy chcesz usunąć wiersze zawierające any
wartości null lub all
wartości null.
Aby usunąć wszystkie wartości null, użyj jednego z poniższych przykładów.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Jeśli zamiast tego chcesz odfiltrować tylko wiersze zawierające wszystkie wartości null, użyj następujących elementów:
df_customer_no_nulls = df_customer.na.drop("all")
Można to zastosować dla podzbioru kolumn, określając to, jak pokazano poniżej:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Aby wypełnić brakujące wartości, użyj fill
metody . Można to zastosować do wszystkich kolumn lub podzestawu kolumn. W poniższym przykładzie salda kont, które mają wartość null dla salda c_acctbal
konta, są wypełnione wartością 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Aby zastąpić ciągi innymi wartościami, użyj replace
metody . W poniższym przykładzie wszystkie puste ciągi adresowe są zastępowane wyrazem UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Dołączanie wierszy
Aby dołączyć wiersze, należy użyć union
metody w celu utworzenia nowej ramki danych. W poniższym przykładzie ramka danych utworzona wcześniej i df_filtered_customer
połączona, która zwraca ramkę df_that_one_customer
danych z trzema klientami:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Uwaga
Możesz również połączyć ramki danych, zapisując je w tabeli, a następnie dołączając nowe wiersze. W przypadku obciążeń produkcyjnych przyrostowe przetwarzanie źródeł danych w tabeli docelowej może znacząco zmniejszyć opóźnienia i koszty obliczeń w miarę wzrostu rozmiaru danych. Zobacz Pozyskiwanie danych do magazynu lakehouse usługi Databricks.
Sortowanie wierszy
Ważne
Sortowanie może być kosztowne na dużą skalę, a w przypadku przechowywania posortowanych danych i ponownego załadowania danych za pomocą platformy Spark kolejność nie jest gwarantowana. Upewnij się, że używasz sortowania celowo.
Aby posortować wiersze według co najmniej jednej kolumny, użyj sort
metody lub orderBy
. Domyślnie te metody są sortowane w kolejności rosnącej:
df_customer.orderBy(col("c_acctbal"))
Aby filtrować w kolejności malejącej, użyj polecenia desc
:
df_customer.sort(col("c_custkey").desc())
W poniższym przykładzie pokazano, jak sortować dwie kolumny:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Aby ograniczyć liczbę wierszy do zwrócenia po posortowania ramki danych, użyj limit
metody . W poniższym przykładzie wyświetlane są tylko najlepsze 10
wyniki:
display(df_sorted.limit(10))
Dołączanie ramek danych
Aby dołączyć co najmniej dwie ramki danych, użyj join
metody . Możesz określić, jak chcesz, aby ramki danych zostały połączone w parametrach how
(typ sprzężenia) i on
(na których kolumnach mają być oparte sprzężenie). Typowe typy sprzężenia obejmują:
inner
: Jest to wartość domyślna typu sprzężenia, która zwraca ramkę danych, która przechowuje tylko wiersze, w których występuje dopasowanie parametruon
w ramkach danych.left
: Przechowuje wszystkie wiersze pierwszej określonej ramki danych i tylko wiersze z drugiej określonej ramki danych, które mają dopasowanie do pierwszego.outer
: Sprzężenia zewnętrzne przechowuje wszystkie wiersze z obu ramek danych niezależnie od dopasowania.
Aby uzyskać szczegółowe informacje na temat sprzężeń, zobacz Praca z przyłączeniami w usłudze Azure Databricks. Aby uzyskać listę sprzężeń obsługiwanych w programie PySpark, zobacz DataFrame joins (Sprzężenia ramki danych).
Poniższy przykład zwraca pojedynczą ramkę danych, w której każdy wiersz orders
ramki danych jest połączony z odpowiednim wierszem customers
z ramki danych. Jest używane sprzężenie wewnętrzne, ponieważ oczekuje się, że każde zamówienie odpowiada dokładnie jednemu klientowi.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Aby sprzężć w wielu warunkach, użyj operatorów logicznych, takich jak &
i |
, odpowiednio do określenia AND
i OR
. W poniższym przykładzie dodano dodatkowy warunek, filtrując tylko wiersze o o_totalprice
wartości większej niż 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Agregowanie danych
Aby agregować dane w ramce danych, podobnie jak GROUP BY
w języku SQL, użyj groupBy
metody , aby określić kolumny do grupowania i agg
metodę określania agregacji. Zaimportuj typowe agregacje, w tym avg
, sum
, max
i min
z pyspark.sql.functions
. W poniższym przykładzie przedstawiono średnią równowagę klientów według segmentu rynku:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Niektóre agregacje to akcje, co oznacza, że wyzwalają obliczenia. W takim przypadku nie trzeba używać innych akcji do wyprowadzania wyników.
Aby zliczyć wiersze w ramce danych, użyj count
metody :
df_customer.count()
Łączenie łańcuchów wywołań
Metody przekształcające ramki danych zwracają ramki danych, a platforma Spark nie wykonuje działań na przekształceniach do momentu wywołania akcji. Ta leniwa ocena oznacza, że można połączyć wiele metod dla wygody i czytelności. W poniższym przykładzie pokazano, jak połączyć filtrowanie, agregację i porządkowanie:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Wizualizowanie ramki danych
Aby wizualizować ramkę danych w notesie, kliknij + znak obok tabeli w lewym górnym rogu ramki danych, a następnie wybierz pozycję Wizualizacja , aby dodać co najmniej jeden wykres na podstawie ramki danych. Aby uzyskać szczegółowe informacje na temat wizualizacji, zobacz Wizualizacje w notesach usługi Databricks.
display(df_order)
Aby wykonać dodatkowe wizualizacje, usługa Databricks zaleca korzystanie z interfejsu API biblioteki pandas dla platformy Spark. Element .pandas_api()
umożliwia rzutowanie do odpowiedniego interfejsu API biblioteki pandas dla ramki danych platformy Spark. Aby uzyskać więcej informacji, zobacz Interfejs API biblioteki Pandas na platformie Spark.
Zapisywanie danych
Po przekształceniu danych można je zapisać przy użyciu DataFrameWriter
metod . Pełną listę tych metod można znaleźć w narzędziu DataFrameWriter. W poniższych sekcjach pokazano, jak zapisać ramkę danych jako tabelę i jako kolekcję plików danych.
Zapisywanie ramki danych jako tabeli
Aby zapisać ramkę danych jako tabelę w katalogu aparatu Unity, użyj write.saveAsTable
metody i określ ścieżkę w formacie <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Zapisywanie ramki danych jako pliku CSV
Aby zapisać ramkę danych w celu *.csv
formatowania, użyj write.csv
metody , określając format i opcje. Domyślnie, jeśli dane istnieją na określonej ścieżce, operacja zapisu kończy się niepowodzeniem. Możesz określić jeden z następujących trybów, aby wykonać inną akcję:
overwrite
zastępuje wszystkie istniejące dane w ścieżce docelowej zawartością ramki danych.append
Dołącza zawartość ramki danych do danych w ścieżce docelowej.ignore
dyskretnie kończy się niepowodzeniem zapisu, jeśli dane istnieją w ścieżce docelowej.
W poniższym przykładzie pokazano zastępowanie danych z zawartością ramki danych jako plikami CSV:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Następne kroki
Aby wykorzystać więcej możliwości platformy Spark w usłudze Databricks, zobacz: