Get začala používat COPY INTO na načtení dat
Příkaz COPY INTO
SQL umožňuje načíst data z umístění souboru do Delta table. Jedná se o re-triable a idempotentní operace; soubory ve zdrojovém umístění, které již byly načteny, se přeskočí.
COPY INTO
nabízí následující možnosti:
- Snadno konfigurovatelné filtry souborů nebo adresářů z cloudového úložiště, včetně S3, ADLS Gen2, ABFS, GCS a Unity Catalogvolumes.
- Podpora více zdrojových formátů souborů: CSV, JSON, XML, Avro, ORC, Parquet, text a binární soubory
- Zpracování souboru přesně jednou (idempotentní) ve výchozím nastavení
- Zaměřte se na odvozování, mapování, slučování a vývoj pro tableschema.
Poznámka:
Pro škálovatelnější a robustnější zážitek při příjmu souborů Databricks doporučuje, aby uživatelé SQL využívali streamingové techniky tables. Viz Načtení dat pomocí streamování tables v Databricks SQL.
Upozorňující
COPY INTO
respektuje nastavení pracovního prostoru pro vektory odstranění. Pokud je tato možnost povolená, jsou v cílovém table povolené vektory odstranění, když COPY INTO
běží na SQL Warehouse nebo ve výpočetním prostředí Databricks Runtime 14.0 nebo novějším. Po povolení vektory odstranění blokují dotazy na table v Databricks Runtime 11.3 LTS a starších verzích. Podívejte se , co jsou vektory odstranění? a vektory automatického povolení odstranění.
Požadavky
Správce účtu musí postupovat podle kroků v části Konfigurace přístupu k datům pro příjem dat a nakonfigurovat přístup k datům v cloudovém úložišti objektů, aby uživatelé mohli načítat data pomocí COPY INTO
.
Příklad: Načtení dat do schemaless Delta Lake table
Poznámka:
Tato funkce je dostupná ve verzi Databricks Runtime 11.3 LTS a vyšší.
Prázdný zástupný symbol Delta tables můžete vytvořit tak, aby schema byl později odvozen během příkazu COPY INTO
nastavením mergeSchema
na true
v COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Výše uvedený příkaz SQL je idempotentní a lze ho naplánovat tak, aby data byla přesně jednou vložena do Delta table.
Poznámka:
Delta table je prázdná a nelze ji použít mimo COPY INTO
.
INSERT INTO
a MERGE INTO
nejsou podporovány pro zápis dat do tablesDelta bez schématu . Po vložení dat do table s COPY INTO
se table stane dotazovatelným.
Viz Vytvořte cíl tables pro COPY INTO.
Příklad: Setschema a načtení dat do Delta Lake table
Následující příklad ukazuje, jak vytvořit table Delta a pak pomocí příkazu COPY INTO
SQL načíst ukázková data z datových sad Databricks do table. Ukázkový kód Pythonu, R, Scaly nebo SQL můžete spustit z poznámkového bloku připojeného ke clusteru Azure Databricks. Kód SQL můžete také spustit z dotazu přidruženého k SQL Warehouse v Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Pokud chcete vyčistit, spusťte následující kód, který odstraní table:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Vyčištění souborů metadat
Spuštěním VACUUM můžete vyčistit neodkazované soubory metadat vytvořené COPY INTO
v Databricks Runtime 15.2 a vyšší.
Reference
- Databricks Runtime 7.x a novější: COPY INTO
Další materiály
Načtení dat pomocí COPY INTO v Unity Catalogvolumes nebo v externích umístěních
Běžné vzory použití, včetně příkladů více operací
COPY INTO
se stejným rozlišovacím Delta table, naleznete v části Běžné vzory načítání dat pomocí COPY INTO.