Spark 데이터 프레임에서 데이터 사용

완료됨

기본적으로 Spark는 RDD(복원력 있는 분산 데이터 세트)라는 데이터 구조를 사용합니다. 하지만 RDD에서 직접 작동하는 코드를 작성할 수 있지만 Spark에서 정형 데이터를 사용하기 위해 가장 일반적으로 사용되는 데이터 구조는 Spark SQL 라이브러리의 일부로 제공되는 데이터 프레임입니다. Spark의 데이터 프레임은 유비쿼터스 Pandas Python 라이브러리의 데이터 프레임과 비슷하지만 Spark의 분산 처리 환경에서 작동하도록 최적화되었습니다.

참고

Spark SQL은 데이터 프레임 API 외에도 Java 및 Scala에서 지원되는 강력한 형식의 데이터 세트 API를 제공합니다. 본 모듈에서는 Dataframe API에 중점을 둡니다.

데이터 프레임에 데이터 로드

가상의 예제를 통해 데이터 프레임을 사용하여 데이터 작업을 수행할 수 있는 방법을 알아보겠습니다. 레이크하우스의 Files/data 폴더에 있는 products.csv라는 쉼표로 구분된 텍스트 파일에 다음 데이터가 있다고 가정합니다.

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

스키마 유추

Spark Notebook에서 다음 PySpark 코드를 사용하여 파일 데이터를 데이터 프레임에 로드하고 처음 10개 행을 표시할 수 있습니다.

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

처음 부분의 %%pyspark 줄을 매직이라고 하며 이 셀에 사용되는 언어가 PySpark임을 Spark에 알립니다. Notebook 인터페이스의 도구 모음에서 기본값으로 사용할 언어를 선택한 다음 매직을 사용하여 특정 셀에 대해 선택한 언어를 재정의할 수 있습니다. 예를 들어 제품 데이터 예제에 해당하는 Scala 코드는 다음과 같습니다.

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

매직 %%spark는 Scala를 지정하는 데 사용됩니다.

이러한 코드 샘플은 모두 다음과 같은 출력을 생성합니다.

ProductID ProductName 범주 ListPrice
771 Mountain-100 Silver, 38 산악용 자전거 3399.9900
772 Mountain-100 Silver, 42 산악용 자전거 3399.9900
773 Mountain-100 Silver, 44 산악용 자전거 3399.9900
... ... ... ...

명시적 스키마 지정

이전 예제에서 CSV 파일의 첫 번째 행에는 열 이름이 포함되어 있으며 Spark는 포함된 데이터에서 각 열의 데이터 형식을 유추할 수 있었습니다. 데이터에 대한 명시적 스키마를 지정할 수도 있습니다. 이 스키마는 다음 CSV 예제와 같이 열 이름이 데이터 파일에 포함되지 않은 경우에 유용합니다.

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

다음 PySpark 예제에서는 이 형식의 product-data.csv 파일에서 로드할 데이터 프레임에 대한 스키마를 지정하는 방법을 보여 줍니다.

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

이번에도 결과는 다음과 유사합니다.

ProductID ProductName 범주 ListPrice
771 Mountain-100 Silver, 38 산악용 자전거 3399.9900
772 Mountain-100 Silver, 42 산악용 자전거 3399.9900
773 Mountain-100 Silver, 44 산악용 자전거 3399.9900
... ... ... ...

명시적 스키마를 지정하면 성능도 향상됩니다.

데이터 프레임 필터링 및 그룹화

Dataframe 클래스의 메서드를 사용하여 포함된 데이터를 필터링, 정렬, 그룹화 또는 조작할 수 있습니다. 예를 들어 다음 코드 예제에서는 select 메서드를 사용하여 이전 예제의 제품 데이터를 포함하는 df 데이터 프레임에서 ProductIDListPrice 열을 검색합니다.

pricelist_df = df.select("ProductID", "ListPrice")

이 코드 예제의 결과는 다음과 같습니다.

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

대부분의 데이터 조작 메서드와 공통적으로 select는 새 데이터 프레임 개체를 반환합니다.

데이터 프레임에서 열의 하위 집합을 선택하는 것은 일반적인 작업이며 다음과 같은 짧은 구문을 사용하여 수행할 수도 있습니다.

pricelist_df = df["ProductID", "ListPrice"]

메서드를 함께 “연결”하여 변환된 데이터 프레임을 생성하는 일련의 조작을 수행할 수 있습니다. 예를 들어 이 예제 코드는 selectwhere 메서드를 연결하여 Mountain Bikes 또는 Road Bikes 카테고리의 ProductNameListPrice 열을 포함하는 새 데이터 프레임을 만듭니다.

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

이 코드 예제의 결과는 다음과 같습니다.

ProductName 범주 ListPrice
Mountain-100 Silver, 38 산악용 자전거 3399.9900
Road-750 Black, 52 도로용 자전거 539.9900
... ... ...

데이터를 그룹화하고 집계하려면 groupBy 메서드 및 집계 함수를 사용할 수 있습니다. 예를 들어 다음 PySpark 코드는 각 카테고리의 제품 수를 계산합니다.

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

이 코드 예제의 결과는 다음과 같습니다.

범주 개수
헤드세트 3
바퀴 14
산악용 자전거 32
... ...

데이터 프레임 저장

종종 Spark를 사용하여 원시 데이터를 변환하고 추가 분석 또는 다운스트림 처리를 위해 결과를 저장하려고 합니다. 다음 코드 예제에서는 dataFrame을 데이터 레이크의 parquet 파일에 저장하여 동일한 이름의 기존 파일을 대체합니다.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

참고

Parquet 형식은 일반적으로 분석 저장소에 대한 추가 분석 또는 수집에 사용할 데이터 파일에 유용합니다. Parquet은 대부분의 대규모 데이터 분석 시스템에서 지원하는 매우 효율적인 형식입니다. 실제로 데이터 변환 요구 사항은 단순히 데이터를 다른 형식(예: CSV)에서 Parquet으로 변환하는 것일 수 있습니다.

출력 파일 분할

분할은 Spark가 작업자 노드에서 성능을 극대화할 수 있도록 하는 최적화 기술입니다. 불필요한 디스크 IO를 제거하여 쿼리에서 데이터를 필터링할 때 성능을 더 많이 향상할 수 있습니다.

데이터 프레임을 분할된 파일 집합으로 저장하려면 데이터를 작성할 때 partitionBy 메서드를 사용합니다. 다음 예제에서는 bikes_df 데이터 프레임(산악용 자전거 및 도로용 자전거 범주에 대한 제품 데이터 포함)을 저장하고 범주별로 데이터를 분할합니다.

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

데이터 프레임을 분할할 때 생성된 폴더 이름에는 column=value 형식의 분할 열 이름과 값이 포함되므로 코드 예제에서는 다음 하위 폴더가 포함된 bike_data 폴더를 만듭니다.

  • Category=Mountain Bikes
  • Category=Road Bikes

각 하위 폴더에는 해당 범주에 대한 제품 데이터가 포함된 하나 이상의 parquet 파일이 포함됩니다.

참고

데이터를 여러 열로 분할하여 각 분할 키에 대한 폴더 계층 구조를 만들 수 있습니다. 예를 들어 폴더 계층 구조에 각 연도 값에 대한 폴더가 포함된 다음, 이 폴더에 각 월 값에 대한 하위 폴더가 포함되도록 판매 주문 데이터를 연도 및 월별로 분할할 수 있습니다.

분할된 데이터 로드

분할된 데이터를 데이터 프레임으로 읽을 때 분할된 필드의 명시적 값 또는 와일드카드를 지정하여 계층 구조 내의 폴더에서 데이터를 로드할 수 있습니다. 다음 예제에서는 Road Bikes 범주의 제품에 대한 데이터를 로드합니다.

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

참고

파일 경로에 지정된 분할 열은 결과 데이터 프레임에서 생략됩니다. 예제 쿼리에서 생성된 결과에는 Category 열이 포함되지 않습니다. 모든 행의 범주는 Road Bikes여야 합니다.