Udostępnij za pośrednictwem


Podstawy PySpark

W tym artykule przedstawiono proste przykłady ilustrujące użycie programu PySpark. Przyjęto założenie, że rozumiesz podstawowe pojęcia dotyczące platformy Apache Spark i uruchamiasz polecenia w notesie usługi Azure Databricks połączonym z obliczeniami. Ramki danych można tworzyć przy użyciu przykładowych danych, wykonywać podstawowe przekształcenia, w tym operacje na wierszach i kolumnach na tych danych, łączyć wiele ramek danych i agregować te dane, wizualizować te dane, a następnie zapisywać je w tabeli lub pliku.

Przekazywanie danych

W niektórych przykładach w tym artykule użyto przykładowych danych dostarczonych przez usługę Databricks, aby zademonstrować ładowanie, przekształcanie i zapisywanie danych przy użyciu ramek danych. Jeśli chcesz użyć własnych danych, które nie są jeszcze w usłudze Databricks, możesz go najpierw przekazać i utworzyć na jego podstawie ramkę danych. Zobacz Tworzenie lub modyfikowanie tabeli przy użyciu przekazywania plików i przekazywania plików do woluminu wykazu aparatu Unity.

Informacje o przykładowych danych usługi Databricks

Usługa Databricks udostępnia przykładowe dane w samples katalogu i w /databricks-datasets katalogu.

  • Aby uzyskać dostęp do przykładowych danych w wykazie samples , użyj formatu samples.<schema-name>.<table-name>. W tym artykule użyto tabel w schemacie samples.tpch , które zawierają dane z fikcyjnej firmy. Tabela customer zawiera informacje o klientach i orders zawiera informacje o zamówieniach złożonych przez tych klientów.
  • Użyj dbutils.fs.ls polecenia , aby eksplorować dane w programie /databricks-datasets. Użyj języka Spark SQL lub ramek danych do wykonywania zapytań dotyczących danych w tej lokalizacji przy użyciu ścieżek plików. Aby dowiedzieć się więcej o przykładowych danych dostarczanych przez usługę Databricks, zobacz Przykładowe zestawy danych.

Importowanie typów danych

Wiele operacji PySpark wymaga używania funkcji SQL lub interakcji z natywnymi typami platformy Spark. Możesz bezpośrednio zaimportować tylko te funkcje i typy, których potrzebujesz, lub zaimportować cały moduł.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Ponieważ niektóre zaimportowane funkcje mogą zastąpić wbudowane funkcje języka Python, niektórzy użytkownicy wybierają importowanie tych modułów przy użyciu aliasu. W poniższych przykładach pokazano wspólny alias używany w przykładach kodu platformy Apache Spark:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Aby uzyskać pełną listę typów danych, zobacz Typy danych platformy Spark.

Aby uzyskać kompleksową listę funkcji SQL PySpark, zobacz Funkcje platformy Spark.

Tworzenie ramki danych

Istnieje kilka sposobów tworzenia ramki danych. Zazwyczaj ramka danych jest definiowana względem źródła danych, takiego jak tabela lub kolekcja plików. Następnie zgodnie z opisem w sekcji Podstawowe pojęcia dotyczące platformy Apache Spark użyj akcji, takiej jak display, aby wyzwolić przekształcenia do wykonania. Metoda display zwraca ramki danych.

Tworzenie ramki danych z określonymi wartościami

Aby utworzyć ramkę danych z określonymi wartościami, użyj createDataFrame metody , w której wiersze są wyrażane jako lista krotki:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Zwróć uwagę, że w danych wyjściowych typy danych kolumn funkcji df_children są automatycznie wnioskowane. Możesz też określić typy, dodając schemat. Schematy są definiowane przy użyciu StructType elementu, który składa się z StructFields tego, który określa nazwę, typ danych i flagę logiczną wskazującą, czy zawierają wartość null, czy nie. Musisz zaimportować typy danych z pyspark.sql.typesprogramu .

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Tworzenie ramki danych na podstawie tabeli w wykazie aparatu Unity

Aby utworzyć ramkę danych na podstawie tabeli w wykazie aparatu Unity, użyj table metody identyfikującej tabelę przy użyciu formatu <catalog-name>.<schema-name>.<table-name>. Kliknij pozycję Wykaz na lewym pasku nawigacyjnym, aby użyć Eksploratora wykazu, aby przejść do tabeli. Kliknij ją, a następnie wybierz pozycję Kopiuj ścieżkę tabeli, aby wstawić ścieżkę tabeli do notesu.

Poniższy przykład ładuje tabelę samples.tpch.customer, ale możesz też podać ścieżkę do własnej tabeli.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Tworzenie ramki danych na podstawie przekazanego pliku

Aby utworzyć ramkę danych na podstawie pliku przekazanego do woluminów wykazu aparatu Unity, użyj read właściwości . Ta metoda zwraca DataFrameReaderelement , którego następnie można użyć do odczytania odpowiedniego formatu. Kliknij opcję wykazu na małym pasku bocznym po lewej stronie i znajdź plik za pomocą przeglądarki katalogu. Wybierz ją, a następnie kliknij pozycję Kopiuj ścieżkę pliku woluminu.

Poniższy przykład odczytuje z *.csv pliku, ale DataFrameReader obsługuje przekazywanie plików w wielu innych formatach. Zobacz Metody DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Aby uzyskać więcej informacji na temat woluminów wykazu aparatu Unity, zobacz Co to są woluminy wykazu aparatu Unity?.

Tworzenie ramki danych na podstawie odpowiedzi JSON

Aby utworzyć ramkę danych na podstawie ładunku odpowiedzi JSON zwróconego przez interfejs API REST, użyj pakietu języka Python requests , aby wykonać zapytanie i przeanalizować odpowiedź. Należy zaimportować pakiet, aby go użyć. W tym przykładzie użyto danych z bazy danych aplikacji leków Stany Zjednoczone Food and Drug Administration.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Aby uzyskać informacje na temat pracy z danymi JSON i innymi częściowo ustrukturyzowanymi danymi w usłudze Databricks, zobacz Modelowanie częściowo ustrukturyzowanych danych.

Wybieranie pola lub obiektu JSON

Aby wybrać określone pole lub obiekt z przekonwertowanego kodu JSON, użyj [] notacji. Aby na przykład wybrać products pole, które jest tablicą produktów:

display(df_drugs.select(df_drugs["products"]))

Można również połączyć ze sobą wywołania metod, aby przejść przez wiele pól. Na przykład, aby wyświetlić nazwę marki pierwszego produktu w aplikacji narkotykowej:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Tworzenie ramki danych na podstawie pliku

Aby zademonstrować tworzenie ramki danych na podstawie pliku, w tym przykładzie /databricks-datasets dane CSV są ładowane w katalogu.

Aby przejść do przykładowych zestawów danych, możesz użyć poleceń systemu plików Databricks Utilties . W poniższym przykładzie użyto dbutils polecenia , aby wyświetlić listę zestawów danych dostępnych w programie /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Alternatywnie możesz użyć %fs polecenia systemu plików interfejsu wiersza polecenia usługi Databricks, jak pokazano w poniższym przykładzie:

%fs ls '/databricks-datasets'

Aby utworzyć ramkę danych na podstawie pliku lub katalogu plików, określ ścieżkę w metodzie load :

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Przekształcanie danych za pomocą ramek danych

Ramki danych ułatwiają przekształcanie danych przy użyciu wbudowanych metod sortowania, filtrowania i agregowania danych. Wiele przekształceń nie jest określonych jako metody w ramkach danych, ale zamiast tego są udostępniane w pakiecie spark.sql.functions . Zobacz Funkcje Spark SQL usługi Databricks.

Operacje na kolumnach

Platforma Spark udostępnia wiele podstawowych operacji na kolumnach:

Napiwek

Aby wyświetlić wszystkie kolumny w ramce danych, użyj polecenia columns, na przykład df_customer.columns.

Wybierz kolumny

Możesz wybrać określone kolumny przy użyciu elementów select i col. Funkcja col znajduje się w pyspark.sql.functions podmodule.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Możesz również odwołać się do kolumny, przy użyciu expr której wyrażenie jest zdefiniowane jako ciąg:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Można również użyć metody selectExpr, która akceptuje wyrażenia SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Aby wybrać kolumny przy użyciu literału ciągu, wykonaj następujące czynności:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Aby jawnie wybrać kolumnę z określonej ramki danych, możesz użyć [] operatora lub . operatora. (Operator nie może służyć do wybierania . kolumn rozpoczynających się od liczby całkowitej lub tych, które zawierają spację lub znak specjalny). Może to być szczególnie przydatne w przypadku dołączania ramek danych, w których niektóre kolumny mają taką samą nazwę.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Tworzenie kolumn

Aby utworzyć nową kolumnę, użyj withColumn metody . W poniższym przykładzie zostanie utworzona nowa kolumna zawierająca wartość logiczną na podstawie tego, czy saldo c_acctbal konta klienta przekracza wartość 1000:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Zmienianie nazw kolumn

Aby zmienić nazwę kolumny, użyj withColumnRenamed metody , która akceptuje istniejące i nowe nazwy kolumn:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

Metoda jest szczególnie przydatna alias , gdy chcesz zmienić nazwy kolumn w ramach agregacji:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Rzutowanie typów kolumn

W niektórych przypadkach możesz zmienić typ danych dla co najmniej jednej kolumny w ramce danych. W tym celu użyj cast metody , aby przekonwertować między typami danych kolumn. W poniższym przykładzie pokazano, jak przekonwertować kolumnę z liczby całkowitej na typ ciągu przy użyciu col metody w celu odwołania się do kolumny:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Usuwanie kolumn

Aby usunąć kolumny, możesz pominąć kolumny podczas wybierania lub select(*) except użyć drop metody :

df_customer_flag_renamed.drop("balance_flag_renamed")

Jednocześnie można usunąć wiele kolumn:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Operacje na wierszach

Platforma Spark udostępnia wiele podstawowych operacji wierszy:

Filtruj wiersze

Aby filtrować wiersze, użyj filter metody or where w ramce danych, aby zwrócić tylko niektóre wiersze. Aby zidentyfikować kolumnę do filtrowania, użyj col metody lub wyrażenia, które daje w wyniku kolumnę.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Aby filtrować według wielu warunków, użyj operatorów logicznych. Można na przykład & odpowiednio włączyć AND OR i | warunki. Poniższy przykład filtruje wiersze, w których wartość c_nationkey jest równa 20 i c_acctbal jest większa niż 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Usuwanie zduplikowanych wierszy

Aby usunąć zduplikowane wiersze, użyj polecenia distinct, który zwraca tylko unikatowe wiersze.

df_unique = df_customer.distinct()

Obsługa wartości null

Aby obsłużyć wartości null, usuń wiersze zawierające wartości null przy użyciu na.drop metody . Ta metoda umożliwia określenie, czy chcesz usunąć wiersze zawierające any wartości null lub all wartości null.

Aby usunąć wszystkie wartości null, użyj jednego z poniższych przykładów.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Jeśli zamiast tego chcesz odfiltrować tylko wiersze zawierające wszystkie wartości null, użyj następujących elementów:

df_customer_no_nulls = df_customer.na.drop("all")

Można to zastosować dla podzbioru kolumn, określając to, jak pokazano poniżej:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Aby wypełnić brakujące wartości, użyj fill metody . Można to zastosować do wszystkich kolumn lub podzestawu kolumn. W poniższym przykładzie salda kont, które mają wartość null dla salda c_acctbal konta, są wypełnione wartością 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Aby zastąpić ciągi innymi wartościami, użyj replace metody . W poniższym przykładzie wszystkie puste ciągi adresowe są zastępowane wyrazem UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Dołączanie wierszy

Aby dołączyć wiersze, należy użyć union metody w celu utworzenia nowej ramki danych. W poniższym przykładzie ramka danych utworzona wcześniej i df_filtered_customer połączona, która zwraca ramkę df_that_one_customer danych z trzema klientami:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Uwaga

Możesz również połączyć ramki danych, zapisując je w tabeli, a następnie dołączając nowe wiersze. W przypadku obciążeń produkcyjnych przyrostowe przetwarzanie źródeł danych w tabeli docelowej może znacząco zmniejszyć opóźnienia i koszty obliczeń w miarę wzrostu rozmiaru danych. Zobacz Pozyskiwanie danych do magazynu lakehouse usługi Databricks.

Sortowanie wierszy

Ważne

Sortowanie może być kosztowne na dużą skalę, a w przypadku przechowywania posortowanych danych i ponownego załadowania danych za pomocą platformy Spark kolejność nie jest gwarantowana. Upewnij się, że używasz sortowania celowo.

Aby posortować wiersze według co najmniej jednej kolumny, użyj sort metody lub orderBy . Domyślnie te metody są sortowane w kolejności rosnącej:

df_customer.orderBy(col("c_acctbal"))

Aby filtrować w kolejności malejącej, użyj polecenia desc:

df_customer.sort(col("c_custkey").desc())

W poniższym przykładzie pokazano, jak sortować dwie kolumny:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Aby ograniczyć liczbę wierszy do zwrócenia po posortowania ramki danych, użyj limit metody . W poniższym przykładzie wyświetlane są tylko najlepsze 10 wyniki:

display(df_sorted.limit(10))

Dołączanie ramek danych

Aby dołączyć co najmniej dwie ramki danych, użyj join metody . Możesz określić, jak chcesz, aby ramki danych zostały połączone w parametrach how (typ sprzężenia) i on (na których kolumnach mają być oparte sprzężenie). Typowe typy sprzężenia obejmują:

  • inner: Jest to wartość domyślna typu sprzężenia, która zwraca ramkę danych, która przechowuje tylko wiersze, w których występuje dopasowanie parametru on w ramkach danych.
  • left: Przechowuje wszystkie wiersze pierwszej określonej ramki danych i tylko wiersze z drugiej określonej ramki danych, które mają dopasowanie do pierwszego.
  • outer: Sprzężenia zewnętrzne przechowuje wszystkie wiersze z obu ramek danych niezależnie od dopasowania.

Aby uzyskać szczegółowe informacje na temat sprzężeń, zobacz Praca z przyłączeniami w usłudze Azure Databricks. Aby uzyskać listę sprzężeń obsługiwanych w programie PySpark, zobacz DataFrame joins (Sprzężenia ramki danych).

Poniższy przykład zwraca pojedynczą ramkę danych, w której każdy wiersz orders ramki danych jest połączony z odpowiednim wierszem customers z ramki danych. Jest używane sprzężenie wewnętrzne, ponieważ oczekuje się, że każde zamówienie odpowiada dokładnie jednemu klientowi.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Aby sprzężć w wielu warunkach, użyj operatorów logicznych, takich jak & i | , odpowiednio do określenia AND i OR. W poniższym przykładzie dodano dodatkowy warunek, filtrując tylko wiersze o o_totalprice wartości większej niż 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Agregowanie danych

Aby agregować dane w ramce danych, podobnie jak GROUP BY w języku SQL, użyj groupBy metody , aby określić kolumny do grupowania i agg metodę określania agregacji. Zaimportuj typowe agregacje, w tym avg, sum, maxi min z pyspark.sql.functions. W poniższym przykładzie przedstawiono średnią równowagę klientów według segmentu rynku:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Niektóre agregacje to akcje, co oznacza, że wyzwalają obliczenia. W takim przypadku nie trzeba używać innych akcji do wyprowadzania wyników.

Aby zliczyć wiersze w ramce danych, użyj count metody :

df_customer.count()

Łączenie łańcuchów wywołań

Metody przekształcające ramki danych zwracają ramki danych, a platforma Spark nie wykonuje działań na przekształceniach do momentu wywołania akcji. Ta leniwa ocena oznacza, że można połączyć wiele metod dla wygody i czytelności. W poniższym przykładzie pokazano, jak połączyć filtrowanie, agregację i porządkowanie:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Wizualizowanie ramki danych

Aby wizualizować ramkę danych w notesie, kliknij + znak obok tabeli w lewym górnym rogu ramki danych, a następnie wybierz pozycję Wizualizacja , aby dodać co najmniej jeden wykres na podstawie ramki danych. Aby uzyskać szczegółowe informacje na temat wizualizacji, zobacz Wizualizacje w notesach usługi Databricks.

display(df_order)

Aby wykonać dodatkowe wizualizacje, usługa Databricks zaleca korzystanie z interfejsu API biblioteki pandas dla platformy Spark. Element .pandas_api() umożliwia rzutowanie do odpowiedniego interfejsu API biblioteki pandas dla ramki danych platformy Spark. Aby uzyskać więcej informacji, zobacz Interfejs API biblioteki Pandas na platformie Spark.

Zapisywanie danych

Po przekształceniu danych można je zapisać przy użyciu DataFrameWriter metod . Pełną listę tych metod można znaleźć w narzędziu DataFrameWriter. W poniższych sekcjach pokazano, jak zapisać ramkę danych jako tabelę i jako kolekcję plików danych.

Zapisywanie ramki danych jako tabeli

Aby zapisać ramkę danych jako tabelę w katalogu aparatu Unity, użyj write.saveAsTable metody i określ ścieżkę w formacie <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Zapisywanie ramki danych jako pliku CSV

Aby zapisać ramkę danych w celu *.csv formatowania, użyj write.csv metody , określając format i opcje. Domyślnie, jeśli dane istnieją na określonej ścieżce, operacja zapisu kończy się niepowodzeniem. Możesz określić jeden z następujących trybów, aby wykonać inną akcję:

  • overwrite zastępuje wszystkie istniejące dane w ścieżce docelowej zawartością ramki danych.
  • append Dołącza zawartość ramki danych do danych w ścieżce docelowej.
  • ignore dyskretnie kończy się niepowodzeniem zapisu, jeśli dane istnieją w ścieżce docelowej.

W poniższym przykładzie pokazano zastępowanie danych z zawartością ramki danych jako plikami CSV:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Następne kroki

Aby wykorzystać więcej możliwości platformy Spark w usłudze Databricks, zobacz: