자습서: Apache Spark DataFrames를 사용하여 데이터 변환 및 정리
자습서를 통해 Azure Databricks에서 Apache Spark Python(PySpark) DataFrame API, Apache Spark Scala DataFrame API 또는 SparkR SparkDataFrame API를 사용하여 데이터를 로드하고 변환하는 방법을 알아봅니다.
이 자습서를 마치면 DataFrame이 무엇인지 이해하고 다음 작업에 익숙해집니다.
Python
- 변수 정의 및 공용 데이터를 Unity 카탈로그 볼륨에 복사
- Python으로 DataFrame 만들기
- CSV 파일에서 DataFrame으로 데이터 로드
- DataFrame으로 보기 및 상호 작용
- DataFrame 저장
- PySpark에서 SQL 쿼리 실행
Apache Spark PySpark API 참조도 참조하세요.
Scala
- 변수 정의 및 공용 데이터를 Unity 카탈로그 볼륨에 복사
- Scala로 DataFrame 만들기
- CSV 파일에서 DataFrame으로 데이터 로드
- DataFrame으로 데이터 보기 및 상호 작용
- DataFrame 저장
- Apache Spark에서 SQL 쿼리 실행
Apache Spark Scala API 참조도 확인하세요.
R
- 변수 정의 및 공용 데이터를 Unity 카탈로그 볼륨에 복사
- SparkR SparkDataFrames 만들기
- CSV 파일에서 DataFrame으로 데이터 로드
- DataFrame으로 보기 및 상호 작용
- DataFrame 저장
- Spark에서 SQL 쿼리 실행
Apache SparkR API 참조도 확인하세요.
데이터 프레임이란 무엇인가요?
DataFrame은 잠재적으로 서로 다른 형식의 열이 있는 2차원 레이블이 지정된 데이터 구조입니다. DataFrame은 스프레드시트, SQL 테이블 또는 계열 개체 사전이라고 생각하면 됩니다. Apache Spark DataFrames는 일반적인 데이터 분석 문제를 효율적으로 해결할 수 있는 다양한 함수 집합(열 선택, 필터, 조인, 집계)를 제공합니다.
Apache Spark DataFrames는 복원력 있는 분산 데이터 세트(RDD)를 기반으로 빌드된 추상화입니다. Spark DataFrames 및 Spark SQL은 통합 계획 및 최적화 엔진을 사용하므로 Azure Databricks에서 지원되는 모든 언어(Python, SQL, Scala 및 R)에서 거의 동일한 성능을 가져올 수 있습니다.
요구 사항
이 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.
이 자습서의 예제를 사용하려면 작업 영역에 Unity 카탈로그가 활성화되어 있어야 합니다.
이 자습서의 예제에서는 Unity 카탈로그 볼륨 을 사용하여 샘플 데이터를 저장합니다. 이러한 예제를 사용하려면 볼륨을 만들고 해당 볼륨의 카탈로그, 스키마 및 볼륨 이름을 사용하여 예제에서 사용하는 볼륨 경로를 설정합니다.
Unity 카탈로그에 다음 권한이 있어야 합니다.
- 이 자습서에 사용되는 볼륨의 경우
READ VOLUME
및WRITE VOLUME
또는ALL PRIVILEGES
. - 이 자습서에 사용되는 스키마의 경우
USE SCHEMA
또는ALL PRIVILEGES
. - 이 자습서에 사용되는 카탈로그의 경우
USE CATALOG
또는ALL PRIVILEGES
.
이러한 권한을 설정하려면 Databricks 관리자 또는 Unity 카탈로그 권한 및 보안 개체를 참조하세요.
- 이 자습서에 사용되는 볼륨의 경우
팁
이 문서의 완성된 Notebook은 DataFrame 자습서 Notebook을 참조하세요.
1단계: 변수 정의 및 CSV 파일 로드
이 단계에서는 이 자습서에서 사용할 변수를 정의한 다음, health.data.ny.gov로부터 아기 이름 데이터가 포함된 CSV 파일을 Unity 카탈로그 볼륨으로 로드합니다.
아이콘을 클릭하여 새 Notebook을 엽니다. Azure Databricks Notebook을 탐색하는 방법을 알아보려면 Databricks Notebook 인터페이스 및 컨트롤을 참조하세요.
다음 코드를 복사하고 새로운 빈 Notebook 셀에 붙여넣습니다.
<catalog-name>
,<schema-name>
,<volume-name>
을 Unity 카탈로그 볼륨의 카탈로그, 스키마 및 볼륨 이름으로 바꿉니다.<table_name>
을 원하는 테이블 이름으로 바꿉니다. 이 자습서의 뒷부분에서 이 테이블에 아기 이름 데이터를 로드합니다.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
셀을 실행하고 새 빈 셀을 만들려면
Shift+Enter
를 누릅니다.다음 코드를 복사하고 새로운 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Databricks dbutuils 명령을 사용하여 health.data.ny.gov의
rows.csv
파일을 Unity 카탈로그 볼륨으로 복사합니다.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
2단계: 기본 데이터 프레임 만들기
이 단계에서는 테스트 데이터를 사용하여 df1
로 명명된 DataFrame을 만든 다음 해당 내용을 표시합니다.
다음 코드를 복사하고 새로운 빈 Notebook 셀에 붙여넣습니다. 이 코드는 테스트 데이터를 사용하여 DataFrame을 만든 다음 DataFrame의 콘텐츠와 스키마를 표시합니다.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
3단계: CSV 파일에서 DataFrame으로 데이터 로드
이 단계에서는 이전에 Unity 카탈로그 볼륨에 로드한 CSV 파일에서 df_csv
로 명명된 DataFrame을 만듭니다. spark.read.csv를 참조하세요.
다음 코드를 복사하고 새로운 빈 Notebook 셀에 붙여넣습니다. 이 코드는 CSV 파일에서 DataFrame
df_csv
에 아기 이름 데이터를 로드한 다음 DataFrame의 내용을 표시합니다.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
지원되는 다양한 파일 형식에서 데이터를 로드할 수 있습니다.
4단계: DataFrame로 데이터를 보고 상호 작용합니다
다음 방법을 사용하여 아기 이름 DataFrames를 보고 상호 작용합니다.
DataFrame 스키마 인쇄
Apache Spark DataFrame의 스키마를 표시하는 방법을 알아봅니다. Apache Spark는 스키마라는 용어를 사용하여 DataFrame에 있는 열의 이름과 데이터 형식을 나타냅니다.
참고 항목
Azure Databricks는 스키마라는 용어를 사용하여 카탈로그에 등록된 테이블 컬렉션을 설명하기도 합니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 두 DataFrame의 스키마를 보는
.printSchema()
메서드를 사용하여 DataFrames의 스키마를 보여 줍니다. 두 DataFrame을 통합할 준비를 합니다.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
DataFrame에서 열 이름 바꾸기
DataFrame에서 열 이름을 바꾸는 방법을 알아봅니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는
df1_csv
DataFrame의 열 이름을df1
DataFrame의 해당 열과 일치하도록 바꿉니다. 이 코드는 Apache SparkwithColumnRenamed()
메서드를 사용합니다.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
DataFrames 결합
한 DataFrame의 행을 다른 DataFrame에 추가하는 새 DataFrame을 만드는 방법을 알아봅니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark
union()
메서드를 사용하여 첫 번째df
DataFrame의 콘텐츠를 CSV 파일에서 로드된 아기 이름 데이터가 포함된df_csv
DataFrame과 결합합니다.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
DataFrame의 행 필터링
Apache Spark .filter()
또는 .where()
메서드를 사용하여 행을 필터링하여 데이터 집합에서 가장 인기 있는 아기 이름을 검색합니다. DataFrame에서 반환하거나 수정할 행의 하위 집합을 선택하려면 필터링을 사용합니다. 다음 예에서 볼 수 있듯이 성능이나 구문에는 차이가 없습니다.
.filter() 메서드 사용하기
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark
.filter()
메서드를 사용하여 DataFrame에 50개 이상의 행을 표시합니다.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
.where() 메서드 사용
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark
.where()
메서드를 사용하여 DataFrame에 50개 이상의 행을 표시합니다.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
DataFrame에서 열 선택 및 빈도별 순서 지정
반환할 DataFrame의 열을 지정하는 select()
메서드의 아기 이름 빈도에 대해 알아봅니다. Apache Spark orderby
및 desc
함수를 사용하여 결과를 정렬합니다.
Apache Spark용 pyspark.sql 모듈은 SQL 함수를 지원합니다. 이 자습서에서 사용하는 이러한 함수 중에는 Apache Spark orderBy()
, desc()
및 expr()
함수가 있습니다. 필요에 따라 이러한 함수를 세션으로 가져와 이러한 함수를 사용하도록 설정합니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는
desc()
함수를 가져온 다음 Apache Sparkselect()
메서드와 Apache SparkorderBy()
및desc()
함수를 사용하여 가장 일반적인 이름과 개수를 내림차순으로 표시합니다.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
새 하위 집합 DataFrame 만들기
기존 DataFrame에서 하위 집합 DataFrame을 만드는 방법을 알아봅니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark
filter
메서드를 사용하여 연도, 개수 및 성별별로 데이터를 제한하는 새 DataFrame을 만듭니다. Apache Sparkselect()
메서드를 사용하여 열을 제한합니다. 또한 Apache SparkorderBy()
및desc()
함수를 사용하여 새 DataFrame을 개수별로 정렬합니다.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
5단계: DataFrame 저장
DataFrame을 저장하는 방법을 알아봅니다. DataFrame을 테이블에 저장하거나 데이터 프레임을 파일 또는 여러 파일에 쓸 수 있습니다.
테이블에 기능 데이터 저장
Azure Databricks는 기본적으로 모든 테이블에 대해 Delta Lake 형식을 사용합니다. DataFrame을 저장하려면 카탈로그 및 스키마에 대한 CREATE
테이블 권한이 있어야 합니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 이 자습서의 시작 부분에 정의한 변수를 사용하여 DataFrame의 내용을 테이블에 저장합니다.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
대부분의 Apache Spark 애플리케이션은 대규모 데이터 집합 및 분산 방식으로 작동합니다. Apache Spark는 단일 파일이 아닌 파일 디렉터리를 작성합니다. Delta Lake는 Parquet 폴더와 파일을 분할합니다. 많은 데이터 시스템이 이러한 파일 디렉터리를 읽도록 구성되어 있습니다. Azure Databricks의 권장 사항에 따라 대부분의 애플리케이션에 파일 경로를 통해 테이블을 사용하는 것이 좋습니다.
JSON 파일에 DataFrame 저장
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 DataFrame을 JSON 파일의 디렉터리에 저장합니다.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
JSON 파일에서 DataFrame 읽기
Apache Spark spark.read.format()
메서드를 사용하여 디렉터리에서 DataFrame으로 JSON 데이터를 읽는 방법을 알아봅니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 이전 예제에서 저장한 JSON 파일을 표시합니다.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
추가 작업: PySpark, Scala 및 R에서 SQL 쿼리 실행
Apache Spark DataFrames는 SQL을 PySpark, Scala 및 R과 결합하는 다음 옵션을 제공합니다. 이 자습서에서 만든 것과 동일한 Notebook에서 다음 코드를 실행할 수 있습니다.
열을 SQL 쿼리로 지정
Apache Spark selectExpr()
메서드를 사용하는 방법을 알아봅니다. SQL 식을 수락하고 업데이트된 select()
DataFrame을 반환하는 메서드의 변형입니다. 이 메서드를 사용하면 upper
같은 SQL 식을 사용할 수 있습니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark
selectExpr()
메서드와 SQLupper
식을 사용하여 문자열 열을 대문자로 변환하고 열 이름을 바꿉니다.Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
열에 SQL 구문을 사용하는 데 expr()
사용
Apache Spark expr()
함수를 가져오고 이를 사용하여 열을 지정하는 모든 위치에서 SQL 구문을 사용하는 방법을 알아봅니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는
expr()
함수를 가져온 다음 Apache Sparkexpr()
함수와 SQLlower
식을 사용하여 문자열 열을 소문자(및 열 이름 바꾸기)로 변환합니다.Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
spark.sql() 함수를 사용하여 임의 SQL 쿼리 실행
Apache Spark spark.sql()
함수를 사용하여 임의의 SQL 쿼리를 실행하는 방법을 알아봅니다.
다음 코드를 복사한 후 빈 Notebook 셀에 붙여넣습니다. 이 코드는 Apache Spark
spark.sql()
함수를 사용하여 SQL 구문을 사용하여 SQL 테이블을 쿼리합니다.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Shift+Enter
키를 눌러 셀을 실행하고 다음 셀로 이동합니다.
DataFrame 자습서 Notebooks
다음 Notebook에는 이 자습서의 예제 쿼리가 포함되어 있습니다.