Delen via


Zelfstudie: Gegevens laden en transformeren met Apache Spark DataFrames

In deze zelfstudie leert u hoe u gegevens laadt en transformeert met behulp van de DataFrame-API van Apache Spark (PySpark), de Apache Spark Scala DataFrame-API en de SparkR SparkDataFrame-API in Azure Databricks.

Aan het einde van deze zelfstudie begrijpt u wat een DataFrame is en vertrouwd bent met de volgende taken:

Python

Zie ook naslaginformatie over de Apache Spark PySpark-API.

Scala

Zie ook de Naslaginformatie over de Scala-API van Apache Spark.

R

Zie ook naslaginformatie over de Apache SparkR-API.

Wat is een DataFrame?

Een DataFrame is een tweedimensionale gelabelde gegevensstructuur met columns van mogelijk verschillende typen. U kunt een DataFrame beschouwen als een spreadsheet, een SQL-tableof een woordenlijst met reeksobjecten. Apache Spark DataFrames bieden een uitgebreide set van functies (selectcolumns, filter, join, aggregaties) waarmee u veelvoorkomende problemen met gegevensanalyse efficiënt kunt oplossen.

Apache Spark DataFrames zijn een abstractie die is gebouwd op RDD's (Resilient Distributed Datasets). Spark DataFrames en Spark SQL maken gebruik van een geïntegreerde plannings- en optimalisatie-engine, zodat u bijna identieke prestaties kunt get in alle ondersteunde talen in Azure Databricks (Python, SQL, Scala en R).

Vereisten

Als u de volgende zelfstudie wilt voltooien, moet u voldoen aan de volgende vereisten:

  • Als u de voorbeelden in deze handleiding wilt gebruiken, moet uw werkruimte Unity Catalog ingeschakeld zijn.

  • In de voorbeelden in deze zelfstudie wordt een Unity-Catalog-volume gebruikt om voorbeeldgegevens op te slaan. Als u deze voorbeelden wilt gebruiken, maakt u een volume en gebruikt u de catalogvan dat volume, schemaen volumenamen om het volumepad te set dat door de voorbeelden wordt gebruikt.

  • U moet de volgende machtigingen hebben in Unity Catalog:

    • READ VOLUME en WRITE VOLUME, of ALL PRIVILEGES voor het volume dat voor deze zelfstudie wordt gebruikt.
    • USE SCHEMA of ALL PRIVILEGES voor de schema die voor deze tutorial wordt gebruikt.
    • USE CATALOG of ALL PRIVILEGES voor de catalog die voor deze instructie wordt gebruikt.

    Als u deze machtigingen wilt set, raadpleegt u uw Databricks-beheerder of Unity-Catalog-bevoegdheden en beveiligbare objecten.

Tip

Zie DataFrame-zelfstudienotebooks voor een voltooid notebook voor dit artikel.

Stap 1: Variabelen definiëren en CSV-bestand laden

Deze stap definieert variabelen voor gebruik in deze zelfstudie en laadt vervolgens een CSV-bestand met babynaamgegevens van health.data.ny.gov in uw Unity Catalog volume.

  1. Open een nieuw notitieblok door op het Nieuw pictogram pictogram te klikken. Zie de Interface en besturingselementen voor Databricks-notebooks voor meer informatie over het navigeren in Azure Databricks-notebooks.

  2. Kopieer en plak de volgende code in de nieuwe lege notebookcel. Vervang <catalog-name>, <schema-name>en <volume-name> door de catalog, schemaen volumenamen voor een Unity-Catalog volume. Vervang <table_name> door een table naam van uw keuze. Later in deze handleiding zult u gegevens van babynamen in deze table laden.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Druk Shift+Enter om de cel uit te voeren en een nieuwe lege cel te maken.

  4. Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code kopieert u het rows.csv-bestand van health.data.ny.gov naar uw Unity Catalog volume met behulp van de opdracht Databricks dbutuils.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Stap 2: Een DataFrame maken

Met deze stap maakt u een DataFrame met de naam df1 testgegevens en wordt vervolgens de inhoud ervan weergegeven.

  1. Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code maakt u het DataFrame met testgegevens en geeft u vervolgens de inhoud en de schema van het DataFrame weer.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Stap 3: Gegevens in een DataFrame laden vanuit een CSV-bestand

Met deze stap maakt u een DataFrame met de naam df_csv van het CSV-bestand dat u eerder in uw Unity Catalog volume hebt geladen. Zie spark.read.csv.

  1. Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code worden babynaamgegevens vanuit het CSV-bestand in DataFrame df_csv geladen en wordt vervolgens de inhoud van het DataFrame weergegeven.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

U kunt gegevens laden uit een groot aantal ondersteunde bestandsindelingen.

Stap 4: Uw DataFrame weergeven en ermee werken

Bekijk en communiceer met uw babynamen DataFrames met behulp van de volgende methoden.

Meer informatie over het weergeven van de schema van een Apache Spark DataFrame. Apache Spark gebruikt de term schema om te verwijzen naar de namen en gegevenstypen van de columns in het DataFrame.

Notitie

Azure Databricks gebruikt ook de term schema om een verzameling tables te beschrijven die is geregistreerd bij een catalog.

  1. Kopieer en plak de volgende code in een lege notebookcel. Deze code toont de schema van uw DataFrames met de .printSchema()-methode om de schema's van de twee DataFrames weer te geven en voor te bereiden om de twee DataFrames te combineren.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Naam van column wijzigen in het DataFrame

Meer informatie over het wijzigen van de naam van een column in een DataFrame.

  1. Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de naam van een column in het df1_csv DataFrame aangepast aan de respectieve column in het df1 DataFrame. Deze code maakt gebruik van de Apache Spark-methode withColumnRenamed() .

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

DataFrames combineren

Leer hoe u een nieuw DataFrame maakt waarmee de rijen van het ene DataFrame aan het andere worden toegevoegd.

  1. Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode union() om de inhoud van uw eerste DataFrame df te combineren met DataFrame df_csv met de babynamen die zijn geladen vanuit het CSV-bestand.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Rijen filteren in een DataFrame

Ontdek de populairste babynamen in uw gegevens set door rijen te filteren met behulp van de Apache Spark-.filter() of .where() methoden. Filter gebruiken om een subset van rijen te select om een DataFrame terug te geven of te wijzigen. Er is geen verschil in prestaties of syntaxis, zoals te zien is in de volgende voorbeelden.

Methode .filter() gebruiken

  1. Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode .filter() om deze rijen in het DataFrame weer te geven met een telling van meer dan 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Gebruik de methodewhere()

  1. Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode .where() om deze rijen in het DataFrame weer te geven met een telling van meer dan 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Select columns van een DataFrame en volgorde per frequentie

Ontdek welke babynaamfrequentie met de select()-methode moet worden opgegeven om de columns van het DataFrame te retourneren. Gebruik Apache Spark orderby en desc functies om de resultaten te ordenen.

De pyspark.sql-module voor Apache Spark biedt ondersteuning voor SQL-functies. Een van deze functies die we in deze zelfstudie gebruiken, zijn de Apache Spark orderBy()en desc()expr() functies. U schakelt het gebruik van deze functies in door ze indien nodig in uw sessie te importeren.

  1. Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de desc() functie geïmporteerd en vervolgens de Apache Spark-methode select() en Apache Spark orderBy() en desc() functies gebruikt om de meest voorkomende namen en hun aantallen in aflopende volgorde weer te geven.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Een subset DataFrame maken

Meer informatie over het maken van een subset DataFrame op basis van een bestaand DataFrame.

  1. Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode filter om een nieuw DataFrame te maken dat de gegevens per jaar, aantal en geslacht beperkt. De Apache Spark-methode select() wordt gebruikt om de columnste limit. Het maakt ook gebruik van Apache Spark orderBy() en desc() functies om het nieuwe DataFrame te sorteren op aantal.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Stap 5: Het DataFrame opslaan

Meer informatie over het opslaan van een DataFrame. U kunt uw DataFrame opslaan in een table of het DataFrame naar een bestand of meerdere bestanden schrijven.

Sla het DataFrame op in een table

Azure Databricks maakt standaard gebruik van de Delta Lake-indeling voor alle tables. Als u uw DataFrame wilt opslaan, moet u CREATEtable bevoegdheden hebben voor de catalog en schema.

  1. Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de inhoud van het DataFrame opgeslagen in een table met behulp van de variabele die u aan het begin van deze zelfstudie hebt gedefinieerd.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

De meeste Apache Spark-toepassingen werken op grote gegevenssets en op gedistribueerde wijze. Apache Spark schrijft een map met bestanden uit in plaats van één bestand. Delta Lake splitst de Parquet-mappen en -bestanden. Veel gegevenssystemen kunnen deze mappen met bestanden lezen. Azure Databricks raadt het gebruik van tables aan via bestandspaden voor de meeste toepassingen.

Het DataFrame opslaan in JSON-bestanden

  1. Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt het DataFrame opgeslagen in een map met JSON-bestanden.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Het DataFrame lezen uit een JSON-bestand

Meer informatie over het gebruik van de Apache Spark-methode spark.read.format() om JSON-gegevens uit een map te lezen in een DataFrame.

  1. Kopieer en plak de volgende code in een lege notebookcel. Met deze code worden de JSON-bestanden weergegeven die u in het vorige voorbeeld hebt opgeslagen.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Aanvullende taken: SQL-query's uitvoeren in PySpark, Scala en R

Apache Spark DataFrames bieden de volgende opties om SQL te combineren met PySpark, Scala en R. U kunt de volgende code uitvoeren in hetzelfde notebook dat u voor deze zelfstudie hebt gemaakt.

Een column opgeven als een SQL-query

Meer informatie over het gebruik van de Apache Spark-methode selectExpr() . Dit is een variant van de select() methode die SQL-expressies accepteert en een bijgewerkt DataFrame retourneert. Met deze methode kunt u een SQL-expressie gebruiken, zoals upper.

  1. Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode selectExpr() en de SQL upper-expressie om een tekenreeks column te converteren naar hoofdletters (en de naam van de columnte wijzigen).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Gebruik expr() om sql-syntaxis te gebruiken voor een column

Meer informatie over het importeren en gebruiken van de Apache Spark-expr()-functie voor het gebruik van SQL-syntaxis overal waar een column wordt opgegeven.

  1. Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de expr()-functie geïmporteerd en wordt vervolgens de Apache Spark-expr()-functie en de SQL-lower-expressie gebruikt om een tekenreeks column te converteren naar kleine letters (en de naam van de columnte wijzigen).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Een willekeurige SQL-query uitvoeren met behulp van de functie spark.sql()

Meer informatie over het gebruik van de Apache Spark-functie spark.sql() om willekeurige SQL-query's uit te voeren.

  1. Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-spark.sql()-functie om een query uit te voeren op een SQL-table met behulp van SQL-syntaxis.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

Zelfstudienotebooks voor DataFrame

De volgende notebooks bevatten de voorbeeldenquery's uit deze zelfstudie.

Python

DataFrames-zelfstudie met Python

Get notebook

Scala

Zelfstudie over DataFrames met behulp van Scala

Get notitieboek

R

Zelfstudie over DataFrames met R

Get notitieboek

Aanvullende bronnen