Använda Spark för att arbeta med datafiler
En av fördelarna med att använda Spark är att du kan skriva och köra kod på olika programmeringsspråk, så att du kan använda de programmeringskunskaper du redan har och använda det lämpligaste språket för en viss uppgift. Standardspråket i en ny Azure Databricks Spark-notebook-fil är PySpark – en Spark-optimerad version av Python, som ofta används av dataforskare och analytiker på grund av dess starka stöd för datamanipulering och visualisering. Dessutom kan du använda språk som Scala (ett Java-härlett språk som kan användas interaktivt) och SQL (en variant av det vanliga SQL-språket som ingår i Spark SQL-biblioteket för att arbeta med relationsdatastrukturer). Programvarutekniker kan också skapa kompilerade lösningar som körs på Spark med hjälp av ramverk som Java.
Utforska data med dataramar
Inbyggt använder Spark en datastruktur som kallas en elastisk distribuerad datamängd (RDD), men även om du kan skriva kod som fungerar direkt med RDD:er är den vanligaste datastrukturen för att arbeta med strukturerade data i Spark dataramen, som tillhandahålls som en del av Spark SQL-biblioteket. Dataramar i Spark liknar dem i det allestädes närvarande Pandas Python-biblioteket, men optimerade för att fungera i Sparks distribuerade bearbetningsmiljö.
Kommentar
Utöver Dataframe-API:et tillhandahåller Spark SQL ett starkt skrivet API för datauppsättningar som stöds i Java och Scala. Vi fokuserar på Dataframe-API:et i den här modulen.
Läsa in data i en dataram
Nu ska vi utforska ett hypotetiskt exempel för att se hur du kan använda en dataram för att arbeta med data. Anta att du har följande data i en kommaavgränsad textfil med namnet products.csv i datamappen i dbfs-lagringen (Databricks File System):
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
I en Spark-notebook-fil kan du använda följande PySpark-kod för att läsa in data i en dataram och visa de första 10 raderna:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Raden %pyspark
i början kallas för en magi och talar om för Spark att språket som används i den här cellen är PySpark. Här är motsvarande Scala-kod för produktdataexemplet:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
%spark
Magin används för att ange Scala.
Dricks
Du kan också välja det språk som du vill använda för varje cell i notebook-gränssnittet.
Båda exemplen som visades tidigare skulle generera utdata så här:
ProductID | ProductName | Kategori | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountainbikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Ange ett dataramschema
I föregående exempel innehöll den första raden i CSV-filen kolumnnamnen och Spark kunde härleda datatypen för varje kolumn från de data som den innehåller. Du kan också ange ett explicit schema för data, vilket är användbart när kolumnnamnen inte ingår i datafilen, som det här CSV-exemplet:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Följande PySpark-exempel visar hur du anger ett schema för dataramen som ska läsas in från en fil med namnet product-data.csv i det här formatet:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Resultatet skulle återigen likna:
ProductID | ProductName | Kategori | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountainbikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Filtrera och gruppera dataramar
Du kan använda metoderna i klassen Dataframe för att filtrera, sortera, gruppera och på annat sätt ändra de data som den innehåller. I följande kodexempel används till exempel metoden select för att hämta kolumnerna ProductName och ListPrice från df-dataramen som innehåller produktdata i föregående exempel:
pricelist_df = df.select("ProductID", "ListPrice")
Resultatet från det här kodexemplet skulle se ut ungefär så här:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Gemensamt med de flesta metoder för datamanipulering är att välja returnerar ett nytt dataramobjekt.
Dricks
Att välja en delmängd av kolumner från en dataram är en vanlig åtgärd, som också kan uppnås med hjälp av följande kortare syntax:
pricelist_df = df["ProductID", "ListPrice"]
Du kan "länka" metoder tillsammans för att utföra en serie manipulationer som resulterar i en transformerad dataram. Den här exempelkoden kedjar till exempel urval och var metoder för att skapa en ny dataram som innehåller kolumnerna ProductName och ListPrice för produkter med en kategori av Mountain Bikes eller Road Bikes:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Resultatet från det här kodexemplet skulle se ut ungefär så här:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Svart, 52 | 539.9900 |
... | ... |
Om du vill gruppera och aggregera data kan du använda groupBy-metoden och aggregerade funktioner. Följande PySpark-kod räknar till exempel antalet produkter för varje kategori:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Resultatet från det här kodexemplet skulle se ut ungefär så här:
Kategori | antal |
---|---|
Headset | 3 |
Hjul | 14 |
Mountainbikes | 32 |
... | ... |
Använda SQL-uttryck i Spark
Dataframe-API:et är en del av ett Spark-bibliotek med namnet Spark SQL, som gör det möjligt för dataanalytiker att använda SQL-uttryck för att fråga efter och manipulera data.
Skapa databasobjekt i Spark-katalogen
Spark-katalogen är ett metaarkiv för relationsdataobjekt som vyer och tabeller. Spark-körningen kan använda katalogen för att sömlöst integrera kod som skrivits på ett Spark-språk med SQL-uttryck som kan vara mer naturligt för vissa dataanalytiker eller utvecklare.
Ett av de enklaste sätten att göra data i en dataram tillgängliga för frågor i Spark-katalogen är att skapa en tillfällig vy, som du ser i följande kodexempel:
df.createOrReplaceTempView("products")
En vy är tillfällig, vilket innebär att den tas bort automatiskt i slutet av den aktuella sessionen. Du kan också skapa tabeller som finns kvar i katalogen för att definiera en databas som kan efterfrågas med Spark SQL.
Kommentar
Vi kommer inte att utforska Spark-katalogtabeller på djupet i den här modulen, men det är värt att ta sig tid att lyfta fram några viktiga punkter:
- Du kan skapa en tom tabell med hjälp
spark.catalog.createTable
av metoden . Tabeller är metadatastrukturer som lagrar sina underliggande data på lagringsplatsen som är associerad med katalogen. Om du tar bort en tabell tas även dess underliggande data bort. - Du kan spara en dataram som en tabell med hjälp av dess
saveAsTable
metod. - Du kan skapa en extern tabell med hjälp
spark.catalog.createExternalTable
av metoden . Externa tabeller definierar metadata i katalogen men hämtar sina underliggande data från en extern lagringsplats. vanligtvis en mapp i en datasjö. Om du tar bort en extern tabell tas inte underliggande data bort.
Använda Spark SQL API för att fråga efter data
Du kan använda Spark SQL API i kod som skrivits på valfritt språk för att fråga efter data i katalogen. Följande PySpark-kod använder till exempel en SQL-fråga för att returnera data från produktvyn som en dataram.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Resultatet från kodexemplet skulle se ut ungefär som i följande tabell:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Svart, 52 | 539.9900 |
... | ... |
Använda SQL-kod
I föregående exempel visades hur du använder Spark SQL API för att bädda in SQL-uttryck i Spark-kod. I en notebook-fil kan du också använda magin %sql
för att köra SQL-kod som frågar objekt i katalogen, så här:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
SQL-kodexemplet returnerar en resultatuppsättning som visas automatiskt i notebook-filen som en tabell, som den nedan:
Kategori | ProductCount |
---|---|
Bib-Shorts | 3 |
Cykelställ | 1 |
Cykelställ | 1 |
... | ... |