Dela via


Självstudie: Läsa in och transformera data med Apache Spark DataFrames

Den här självstudien visar hur du läser in och transformerar data med apache Spark Python (PySpark) DataFrame API, Apache Spark Scala DataFrame API och SparkR SparkDataFrame API i Azure Databricks.

I slutet av den här självstudien kommer du att förstå vad en DataFrame är och känna till följande uppgifter:

Python

Se även Apache Spark PySpark API-referens.

Scala

Se även Apache Spark Scala API-referens.

R

Se även Apache SparkR API-referens.

Vad är en DataFrame?

En DataFrame är en tvådimensionell etiketterad datastruktur med columns av potentiellt olika typer. Du kan tänka dig en DataFrame som ett kalkylblad, en SQL-tableeller en ordlista med serieobjekt. Apache Spark DataFrames ger en omfattande set funktioner (selectcolumns, filter, join, aggregering) som gör att du kan lösa vanliga dataanalysproblem effektivt.

Apache Spark DataFrames är en abstraktion som bygger på Resilient Distributed Datasets (RDD). Spark DataFrames och Spark SQL använder en enhetlig planerings- och optimeringsmotor så att du kan get nästan identiska prestanda för alla språk som stöds på Azure Databricks (Python, SQL, Scala och R).

Krav

För att slutföra följande självstudie måste du uppfylla följande krav:

  • Om du vill använda exemplen i den här självstudien måste din arbetsyta ha Unity Catalog aktiverat.

  • Exemplen i den här handledningen använder en Unity Catalog-volym för att lagra exempeldata. Om du vill använda dessa exempel skapar du en volym och använder volymens catalog, schemaoch volymnamn för att set den volymsökväg som används i exemplen.

  • Du måste ha följande behörigheter i Unity Catalog:

    • READ VOLUME och WRITE VOLUME, eller ALL PRIVILEGES för volymen som används för den här självstudien.
    • USE SCHEMA eller ALL PRIVILEGES för den schema som används för den här handledningen.
    • USE CATALOG eller ALL PRIVILEGES för de catalog som används i denna handledning.

    Om du vill set dessa behörigheter, kontakta din Databricks-administratör eller Unity Catalog behörigheter och skyddsbara objekt.

Dricks

En slutförd notebook-fil för den här artikeln finns i Notebook-filer för DataFrame-självstudier.

Steg 1: Definiera variabler och läsa in CSV-fil

Det här steget definierar variabler för användning i denna tutorial och laddar sedan en CSV-fil innehållande babynamnsdata från health.data.ny.gov in i din Unity Catalog volym.

  1. Öppna en ny anteckningsbok genom att Ny ikon klicka på ikonen. Information om hur du navigerar i Notebook-filer för Azure Databricks finns i Databricks Notebook-gränssnitt och -kontroller.

  2. Kopiera och klistra in följande kod i den nya tomma notebook-cellen. Ersätt <catalog-name>, <schema-name>och <volume-name> med catalog, schemaoch volymnamnen för en Unity Catalog-volym. Ersätt <table_name> med ett table valfritt namn. Senare i den här handledningen kommer du att läsa in babynamndata i denna table.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Tryck Shift+Enter för att köra cellen och skapa en ny tom cell.

  4. Kopiera och klistra in följande kod i den nya tomma notebook-cellen. Den här koden kopierar rows.csv-filen från health.data.ny.gov till unity-Catalog-volymen med hjälp av kommandot Databricks dbutuils.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Steg 2: Skapa en dataram

Det här steget skapar en DataFrame med namnet df1 med testdata och visar sedan dess innehåll.

  1. Kopiera och klistra in följande kod i den nya tomma notebook-cellen. Den här koden skapar DataFrame med testdata och visar sedan innehållet och schema i DataFrame.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Steg 3: Läsa in data i en DataFrame från CSV-fil

Det här steget skapar en dataram med namnet df_csv från CSV-filen som du tidigare läste in i unity-Catalog volymen. Se spark.read.csv.

  1. Kopiera och klistra in följande kod i den nya tomma notebook-cellen. Den här koden läser in babynamnsdata i DataFrame df_csv från CSV-filen och visar sedan innehållet i DataFrame.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Du kan läsa in data från många filformat som stöds.

Steg 4: Visa och interagera med din DataFrame

Visa och interagera med dina babynamn DataFrames med hjälp av följande metoder.

Lär dig hur du visar schema för en Apache Spark DataFrame. Apache Spark använder termen schema för att referera till namn och datatyper för columns i DataFrame.

Kommentar

Azure Databricks använder också termen schema för att beskriva en samling tables som är registrerade i en catalog.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden visar schema för dina DataFrames med metoden .printSchema() för att visa scheman för de två dataramarna – för att förbereda för att förena de två dataramarna.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Byt namn på column i DataFrame

Lär dig hur du byter namn på en column i en DataFrame.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden byter namn på en column i df1_csv DataFrame så att den matchar respektive column i df1 DataFrame. Den här koden använder Apache Spark-metoden withColumnRenamed() .

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Kombinera DataFrames

Lär dig hur du skapar en ny DataFrame som lägger till raderna i en DataFrame till en annan.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder Apache Spark-metoden union() för att kombinera innehållet i din första DataFrame df med DataFrame df_csv som innehåller babynamndata som lästs in från CSV-filen.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Filtrera rader i en dataram

Identifiera de mest populära babynamnen i dina data set genom att filtrera rader med hjälp av Apache Spark-.filter() eller .where() metoder. Använd filtrering för att select en delmängd rader för att returnera eller ändra i en DataFrame. Det finns ingen skillnad i prestanda eller syntax, som du ser i följande exempel.

Använda metoden .filter()

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder Apache Spark-metoden .filter() för att visa dessa rader i DataFrame med ett antal på mer än 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Använda .where() metod

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder Apache Spark-metoden .where() för att visa dessa rader i DataFrame med ett antal på mer än 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Select columns från en DataFrame och ordning efter frekvens

Lär dig om hur man använder metoden select() för att ange vilken frekvens av babynamn som columns i DataFrame ska returnera. Använd Apache Spark orderby och desc funktioner för att ordna resultatet.

Modulen pyspark.sql för Apache Spark ger stöd för SQL-funktioner. Bland de här funktionerna som vi använder i den här självstudien finns funktionerna Apache Spark orderBy(), desc()och expr() . Du aktiverar användningen av dessa funktioner genom att importera dem till sessionen efter behov.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden importerar desc() funktionen och använder sedan Apache Spark-metoden select() och Apache Spark orderBy() och desc() funktioner för att visa de vanligaste namnen och deras antal i fallande ordning.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Skapa en delmängd av DataFrame

Lär dig hur du skapar en delmängd av DataFrame från en befintlig DataFrame.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder Apache Spark-metoden filter för att skapa en ny DataFrame som begränsar data efter år, antal och kön. Apache Spark-metoden select() används för att limitcolumns. Den använder också Apache Spark orderBy() och desc() funktioner för att sortera den nya DataFrame efter antal.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Steg 5: Spara dataramen

Lär dig hur du sparar en DataFrame. Du kan antingen spara dataramen i en table eller skriva DataFrame till en fil eller flera filer.

Spara dataramen i en table

Azure Databricks använder Delta Lake-formatet för alla tables som standard. Om du vill spara dataramen måste du ha CREATEtable behörigheter för catalog och schema.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden sparar innehållet i DataFrame till en table med hjälp av variabeln som du definierade i början av den här självstudien.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

De flesta Apache Spark-program fungerar på stora datamängder och på ett distribuerat sätt. Apache Spark skriver ut en katalog med filer i stället för en enda fil. Delta Lake delar upp Parquet-mapparna och filerna. Många datasystem kan läsa dessa kataloger med filer. Azure Databricks rekommenderar att du använder tables över filsökvägar för de flesta program.

Spara DataFrame till JSON-filer

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden sparar DataFrame till en katalog med JSON-filer.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Läsa DataFrame från en JSON-fil

Lär dig hur du använder Apache Spark-metoden spark.read.format() för att läsa JSON-data från en katalog till en DataFrame.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden visar JSON-filerna som du sparade i föregående exempel.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Ytterligare uppgifter: Köra SQL-frågor i PySpark, Scala och R

Apache Spark DataFrames innehåller följande alternativ för att kombinera SQL med PySpark, Scala och R. Du kan köra följande kod i samma notebook-fil som du skapade för den här självstudien.

Ange en column som en SQL-fråga

Lär dig hur du använder Apache Spark-metoden selectExpr() . Det här är en variant av metoden select() som accepterar SQL-uttryck och returnerar en uppdaterad DataFrame. Med den här metoden kan du använda ett SQL-uttryck, till exempel upper.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder metoden Apache Spark selectExpr() och SQL upper-uttrycket för att konvertera en sträng column till versaler (och byta namn på column).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Använd expr() för att använda SQL-syntax för en column

Lär dig hur du importerar och använder funktionen Apache Spark expr() för att använda SQL-syntax överallt där en column skulle anges.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden importerar funktionen expr() och använder sedan Apache Spark-funktionen expr() samt SQL-uttrycket lower för att konvertera en sträng column till gemener (och byta namn på column).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

Köra en godtycklig SQL-fråga med hjälp av funktionen spark.sql()

Lär dig hur du använder Apache Spark-funktionen spark.sql() för att köra godtyckliga SQL-frågor.

  1. Kopiera och klistra in följande kod i en tom notebook-cell. Den här koden använder funktionen Apache Spark spark.sql() för att köra frågor mot en SQL-table med sql-syntax.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Tryck Shift+Enter för att köra cellen och flytta sedan till nästa cell.

DataFrame-självstudier – notebook-filer

Följande notebook-filer innehåller exempelfrågor från den här självstudien.

Python

Självstudie om DataFrames med Python

Get anteckningsbok

Scala

Självstudie om DataFrames med Scala

Get anteckningsbok

R

Självstudie om DataFrames med R

Get anteckningsbok

Ytterligare resurser