COPY INTO 사용하여 데이터 로드 시작
COPY INTO
SQL 명령을 사용하면 파일 위치에서 델타 테이블로 데이터를 로드할 수 있습니다. 이는 재시도 가능하며 멱등성인 작업입니다. 이미 로드된 원본 위치의 파일은 건너뜁니다.
COPY INTO
에서는 다음과 같은 기능을 제공합니다.
- S3, ADLS Gen2, ABFS, GCS 및 Unity 카탈로그 볼륨을 포함하여 클라우드 스토리지에서 쉽게 구성할 수 있는 파일 또는 디렉터리 필터입니다.
- CSV, JSON, XML, Avro, ORC, Parquet, 텍스트 및 이진 파일 등 여러 소스 파일 형식 지원
- 기본적으로 정확히 한 번(idempotent) 파일 처리
- 대상 테이블 스키마 유추, 매핑, 병합 및 진화
참고 항목
더 확장 가능하고 강력한 파일 수집 환경을 위해 Databricks는 SQL 사용자가 스트리밍 테이블을 활용하는 것이 좋습니다. Databricks SQL에서 스트리밍 테이블을 사용하여 데이터를 로드하는 방법에 대한 내용은 을 참조하세요.
Warning
COPY INTO
는 삭제 벡터에 대한 작업 영역 설정을 적용합니다. 사용하도록 설정하면 SQL 웨어하우스에서 COPY INTO
실행되거나 Databricks Runtime 14.0 이상을 실행하는 컴퓨팅이 실행될 때 대상 테이블에서 삭제 벡터가 활성화됩니다. 사용하도록 설정되면 삭제 벡터는 Databricks Runtime 11.3 LTS 이하의 테이블에 대한 쿼리를 차단합니다. 삭제 벡터란? 및 자동 사용 삭제 벡터를 참조하세요.
요구 사항
계정 관리자는 수집을 위한 데이터 액세스 구성의 단계를 따라 클라우드 개체 스토리지의 데이터에 대한 액세스를 구성해야 사용자가 데이터를 COPY INTO
로드할 수 있습니다.
예: 스키마 없는 Delta Lake 테이블에 데이터 로드
참고 항목
이 기능은 Databricks Runtime 11.3 LTS 이상에서 사용할 수 있습니다.
COPY_OPTIONS
에서 mergeSchema
을 true
로 설정하여 COPY INTO
명령을 통해 스키마가 나중에 유추되도록 빈 자리 표시자 델타 테이블을 만들 수 있습니다.
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
위의 SQL 문은 idempotent이며 델타 테이블에 정확히 한 번 데이터를 수집하도록 실행되도록 예약할 수 있습니다.
참고 항목
빈 델타 테이블은 COPY INTO
외부에서 사용할 수 없습니다.
INSERT INTO
및 MERGE INTO
스키마 없는 델타 테이블에 데이터를 쓰는 것은 지원되지 않습니다. 데이터가 COPY INTO
테이블에 삽입되면 테이블을 쿼리할 수 있게 됩니다.
을 참조하세요. COPY INTO대상 테이블을 만드십시오.
예: 스키마 설정 및 Delta Lake 테이블에 데이터 로드
다음 예제에서는 델타 테이블을 만든 다음
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
정리하려면 다음 코드를 실행하여 테이블을 삭제합니다.
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
메타데이터 파일 정리
VACUUM 실행하여 Databricks Runtime 15.2 이상에서 COPY INTO
만든 참조되지 않은 메타데이터 파일을 정리할 수 있습니다.
참조
- Databricks Runtime 7.x 이상: COPY INTO
추가 리소스
Unity 카탈로그 볼륨 또는 외부 위치COPY INTO 사용하여 데이터 로드
서비스 주체를 사용하여
으로 데이터 로드 동일한 델타 테이블에 대한 여러
작업의 예제를 비롯한 일반적인 사용 패턴은 사용하여 공통 데이터 로드 패턴을 참조하세요.