Erste Schritte mit COPY INTO zum Laden von Daten
Mit dem SQL-Befehl COPY INTO
können Sie Daten aus einem Dateispeicherort in eine Delta-Tabelle laden. Hierbei handelt es sich um einen wiederholbaren und idempotenten Vorgang; Dateien am Quellspeicherort, die bereits geladen wurden, werden übersprungen.
COPY INTO
bietet die folgenden Funktionen:
- Einfache konfigurierbare Datei- oder Verzeichnisfilter aus dem Cloudspeicher, einschließlich S3, ADLS Gen2, ABFS, GCS und Unity Catalog-Volumes.
- Unterstützung für mehrere Quelldateiformate: CSV, JSON, XML, Avro, ORC, Parquet, Text und Binärdateien
- Standardmäßige Dateiverarbeitung genau einmal (idempotent)
- Zieltabellenschemarückschluss, Zuordnung, Zusammenführung und Entwicklung
Hinweis
Für eine skalierbarere und robustere Dateierfassung empfiehlt Databricks SQL-Benutzer*innen die Verwendung von Streamingtabellen. Weitere Informationen finden Sie unter Laden von Daten mithilfe von Streamingtabellen in Databricks SQL.
Warnung
COPY INTO
berücksichtigt die Arbeitsbereichseinstellung für Löschvektoren. Wenn diese Option aktiviert ist, werden Löschvektoren in der Zieltabelle aktiviert, wenn COPY INTO
in einem SQL-Warehouse oder einer Compute-Umgebung mit Databricks Runtime 14.0 oder höher ausgeführt wird. Nach der Aktivierung blockieren Löschvektoren Abfragen für eine Tabelle in Databricks Runtime 11.3 LTS und niedriger. Weitere Informationen finden Sie unter Was sind Löschvektoren? und Automatische Aktivierung von Löschvektoren.
Anforderungen
Ein Kontoadministrator muss die Schritte unter Konfigurieren des Datenzugriffs für die Erfassung ausführen, um den Zugriff auf Daten im Cloudobjektspeicher zu konfigurieren, bevor Benutzer Daten mit COPY INTO
laden können.
Beispiel: Laden von Daten in eine schemalose Delta Lake-Tabelle
Hinweis
Dieses Feature ist in Databricks Runtime 11.3 LTS und höher verfügbar.
Sie können leere Delta-Platzhaltertabellen erstellen, damit das Schema später während eines COPY INTO
-Befehls abgeleitet wird, indem Sie mergeSchema
in true
auf COPY_OPTIONS
festlegen:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Die oben genannte SQL-Anweisung ist idempotent und kann so geplant werden, dass Daten genau einmal in eine Delta-Tabelle aufgenommen werden.
Hinweis
Die leere Delta-Tabelle kann außerhalb von COPY INTO
nicht verwendet werden. INSERT INTO
und MERGE INTO
werden nicht unterstützt, Daten in schemalose Delta-Tabellen zu schreiben. Nachdem Daten mit COPY INTO
in die Tabelle eingefügt wurden, wird die Tabelle abfragbar.
Weitere Informationen finden Sie unter Erstellen von Zieltabellen für COPY INTO.
Beispiel: Festlegen des Schemas und Laden von Daten in eine Delta Lake-Tabelle
Das folgende Beispiel zeigt, wie Sie eine Delta-Tabelle erstellen und dann den SQL-Befehl COPY INTO
verwenden, um Beispieldaten aus Databricks-Datasets in die Tabelle zu laden. Sie können den Python-, R-, Scala- oder SQL-Beispielcode in einem Notebook ausführen, das an einen Azure Databricks-Cluster angefügt ist. Sie können den SQL-Code auch in einer Abfrage ausführen, die einem SQL-Warehouse in Databricks SQL zugeordnet ist.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Führen Sie zum Bereinigen den folgenden Code aus, der die Tabelle löscht:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Bereinigen von Metadatendateien
Sie können VACUUM ausführen, um nicht referenzierte Metadatendateien zu bereinigen, die von COPY INTO
in Databricks Runtime 15.2 und höher erstellt wurden.
Verweis
- Databricks Runtime 7.x und höher: COPY INTO
Zusätzliche Ressourcen
Laden von Daten mithilfe von COPY INTO mit einem Dienstprinzipal
Allgemeine Verwendungsmuster, einschließlich Beispiele für mehrere
COPY INTO
-Vorgänge für dieselbe Delta-Tabelle, finden Sie unter Allgemeine Datenlademuster mit COPY INTO.