Aan de slag met COPY INTO om gegevens te laden
Met de COPY INTO
SQL-opdracht kunt u gegevens van een bestandslocatie laden in een Delta-tabel. Dit is een re-triable en idempotente bewerking; bestanden in de bronlocatie die al zijn geladen, worden overgeslagen.
COPY INTO
biedt de volgende mogelijkheden:
- Eenvoudig configureerbare bestands- of mapfilters uit cloudopslag, waaronder S3-, ADLS Gen2-, ABFS-, GCS- en Unity Catalog-volumes.
- Ondersteuning voor meerdere bronbestandsindelingen: CSV-, JSON-, XML-, Avro-, ORC-, Parquet-, tekst- en binaire bestanden
- Exactly-once (idempotent) bestandsverwerking standaard
- Schemadeductie van doeltabellen, toewijzing, samenvoegen en evolutie
Notitie
Voor een meer schaalbare en robuuste ervaring voor bestandsopname raadt Databricks aan dat SQL-gebruikers gebruikmaken van streamingtabellen. Zie Gegevens laden met behulp van streamingtabellen in Databricks SQL.
Waarschuwing
COPY INTO
respecteert de werkruimte-instelling voor verwijderingsvectoren. Indien ingeschakeld, worden verwijderingsvectoren ingeschakeld in de doeltabel wanneer ze COPY INTO
worden uitgevoerd op een SQL Warehouse of rekenproces waarop Databricks Runtime 14.0 of hoger wordt uitgevoerd. Als dit is ingeschakeld, blokkeren verwijderingsvectoren query's voor een tabel in Databricks Runtime 11.3 LTS en hieronder. Zie Wat zijn verwijderingsvectoren? en verwijdervectoren automatisch inschakelen.
Vereisten
Een accountbeheerder moet de stappen volgen in Gegevenstoegang configureren voor opname om de toegang tot gegevens in de opslag van cloudobjecten te configureren voordat gebruikers gegevens kunnen laden met behulp van COPY INTO
.
Voorbeeld: Gegevens laden in een schemaloze Delta Lake-tabel
Notitie
Deze functie is beschikbaar in Databricks Runtime 11.3 LTS en hoger.
U kunt lege tijdelijke aanduidingen voor Delta-tabellen maken, zodat het schema later wordt afgeleid tijdens een COPY INTO
opdracht door in te stellen mergeSchema
op true
COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
De bovenstaande SQL-instructie is idempotent en kan worden gepland om gegevens exact één keer op te nemen in een Delta-tabel.
Notitie
De lege Delta-tabel is niet bruikbaar buiten COPY INTO
. INSERT INTO
en MERGE INTO
worden niet ondersteund voor het schrijven van gegevens in schemaloze Delta-tabellen. Nadat gegevens in de tabel zijn ingevoegd, COPY INTO
kan de tabel query's uitvoeren.
Zie Doeltabellen maken voor COPY INTO.
Voorbeeld: Schema instellen en gegevens laden in een Delta Lake-tabel
In het volgende voorbeeld ziet u hoe u een Delta-tabel maakt en vervolgens de COPY INTO
SQL-opdracht gebruikt om voorbeeldgegevens uit Databricks-gegevenssets in de tabel te laden. U kunt de voorbeeldcode Python, R, Scala of SQL uitvoeren vanuit een notebook dat is gekoppeld aan een Azure Databricks-cluster. U kunt de SQL-code ook uitvoeren vanuit een query die is gekoppeld aan een SQL-warehouse in Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Voer de volgende code uit om de tabel op te schonen:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Metagegevensbestanden opschonen
U kunt VACUUM uitvoeren om niet-deductie van metagegevensbestanden op te schonen die zijn gemaakt in COPY INTO
Databricks Runtime 15.2 en hoger.
Verwijzing
- Databricks Runtime 7.x en hoger: COPY INTO
Aanvullende bronnen
Gegevens laden met COPY INTO met Unity Catalog-volumes of externe locaties
Zie Algemene patronen voor het laden van gegevens met behulp van COPY INTO voor veelvoorkomende gebruikspatronen, waaronder voorbeelden van meerdere
COPY INTO
bewerkingen in dezelfde Delta-tabel.