데이터 프레임 수정 및 저장
Apache Spark는 데이터 프레임 개체를 데이터 작업을 위한 기본 구조로 제공합니다. 데이터 프레임을 사용하여 데이터를 쿼리 및 변환하고 결과를 데이터 레이크에 유지할 수 있습니다. 데이터 프레임에 데이터를 로드하려면 spark.read 함수를 사용하여 파일 형식, 경로, 선택적으로 읽을 데이터의 스키마를 지정합니다. 예를 들어 다음 코드는 orders 폴더의 모든 .csv 파일에서 order_details라는 데이터 프레임으로 데이터를 로드한 다음, 처음 5개의 레코드를 표시합니다.
order_details = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))
데이터 구조 변환
원본 데이터를 데이터 프레임에 로드한 후 데이터 프레임 개체의 메서드 및 Spark 함수를 사용하여 변환할 수 있습니다. 일반적인 데이터 프레임 작업은 다음과 같습니다.
- 행 및 열 필터링
- 열 이름 바꾸기
- 기존 열에서 파생되는 새 열 만들기
- null 또는 기타 값 바꾸기
다음 예제에서 코드는 split
함수를 사용하여 CustomerName 열의 값을 FirstName 및 LastName이라는 두 개의 새 열로 분리합니다. 그런 다음, drop
메서드를 사용하여 원래 CustomerName 열을 삭제합니다.
from pyspark.sql.functions import split, col
# Create the new FirstName and LastName fields
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")
display(transformed_df.limit(5))
Spark SQL 라이브러리의 모든 기능을 사용하여 행 필터링, 열 파생, 제거, 이름 바꾸기, 기타 필요한 데이터 수정을 적용하여 데이터를 변환할 수 있습니다.
변환된 데이터 저장
dataFrame이 필요한 구조로 구성되어 있으면 데이터 레이크에서 지원되는 형식으로 결과를 저장할 수 있습니다.
다음 코드 예제에서는 dataFrame을 데이터 레이크의 parquet 파일에 저장하여 동일한 이름의 기존 파일을 대체합니다.
transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")
참고
Parquet 형식은 일반적으로 분석 저장소에 대한 추가 분석 또는 수집에 사용할 데이터 파일에 유용합니다. Parquet은 대부분의 대규모 데이터 분석 시스템에서 지원하는 매우 효율적인 형식입니다. 실제로 데이터 변환 요구 사항은 단순히 데이터를 다른 형식(예: CSV)에서 Parquet으로 변환하는 것일 수 있습니다.