Ladda data med Delta Live Tables
Du kan läsa in data från alla datakällor som stöds av Apache Spark på Azure Databricks med hjälp av Delta Live Tables. Du kan definiera datauppsättningar (tabeller och vyer) i Delta Live Tables mot alla frågor som returnerar en Spark DataFrame, inklusive strömmande DataFrames och Pandas för Spark DataFrames. För datainmatningsuppgifter rekommenderar Databricks att du använder strömningstabeller för de flesta användningsfall. Strömmande tabeller är bra för att mata in data från molnobjektlagring med hjälp av Auto Loader eller från meddelandebussar som Kafka. Exemplen nedan visar några vanliga mönster.
Viktigt!
Alla datakällor har inte SQL-stöd. Du kan blanda SQL- och Python-notebook-filer i en Delta Live Tables-pipeline för att använda SQL för alla åtgärder utöver inmatning.
Mer information om hur du arbetar med bibliotek som inte är paketerade i Delta Live Tables som standard finns i Hantera Python-beroenden för Delta Live Tables-pipelines.
Läsa in filer från molnobjektlagring
Databricks rekommenderar att du använder Auto Loader med Delta Live Tables för de flesta datainmatningsuppgifter från molnobjektlagring. Auto Loader och Delta Live Tables är utformade för att stegvis och idempotent ladda in ständigt växande data när den anländer till molnlagring. I följande exempel används Auto Loader för att skapa datauppsättningar från CSV- och JSON-filer:
Kommentar
Om du vill läsa in filer med Auto Loader i en Unity Catalog-aktiverad pipeline måste du använda externa platser. Mer information om hur du använder Unity Catalog med Delta Live Tables finns i Använda Unity Catalog med dina Delta Live Tables-pipelines.
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
Se Vad är SQL-syntax för automatisk inläsning? och automatisk inläsning.
Varning
Om du använder Auto Loader med filaviseringar och kör en fullständig uppdatering för din pipeline eller strömningstabell måste du rensa dina resurser manuellt. Du kan använda CloudFilesResourceManager i en notebook-fil för att utföra rensning.
Läsa in data från en meddelandebuss
Du kan konfigurera Delta Live Tables-pipelines för att mata in data från meddelandebussar med strömmande tabeller. Databricks rekommenderar att du kombinerar strömmande tabeller med kontinuerlig körning och förbättrad autoskalning för att ge den mest effektiva inmatningen för inläsning med låg latens från meddelandebussar. Se Optimera klusteranvändningen av Delta Live Tables-pipelines med förbättrad automatisk skalning.
Följande kod konfigurerar till exempel en strömmande tabell för att mata in data från Kafka:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
Du kan skriva underordnade åtgärder i ren SQL för att utföra direktuppspelningstransformeringar på dessa data, som i följande exempel:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(kafka_raw)
WHERE ...
Ett exempel på hur du arbetar med Event Hubs finns i Använda Azure Event Hubs som en Delta Live Tables-datakälla.
Se Konfigurera strömmande datakällor.
Läsa in data från externa system
Delta Live Tables stöder inläsning av data från alla datakällor som stöds av Azure Databricks. Se Ansluta till datakällor. Du kan också läsa in externa data med Lakehouse Federation för datakällor som stöds. Eftersom Lakehouse Federation kräver Databricks Runtime 13.3 LTS eller senare, måste din pipeline konfigureras för att använda förhandsgranskningskanalen för att kunna använda Lakehouse Federation.
Vissa datakällor har inte motsvarande stöd i SQL. Om du inte kan använda Lakehouse Federation med någon av dessa datakällor kan du använda en Python-anteckningsbok för att mata in data från källan. Du kan lägga till Python- och SQL-källkod i samma Delta Live Tables-pipeline. I följande exempel deklareras en materialiserad vy för att få åtkomst till det aktuella tillståndet för data i en fjärransluten PostgreSQL-tabell:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Läsa in små eller statiska datauppsättningar från molnobjektlagring
Du kan läsa in små eller statiska datauppsättningar med apache Spark-inläsningssyntax. Delta Live Tables stöder alla filformat som stöds av Apache Spark på Azure Databricks. En fullständig lista finns i Alternativ för dataformat.
Följande exempel visar hur JSON läses in för att skapa Delta Live Tables-tabeller:
Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
Kommentar
SQL-konstruktionen SELECT * FROM format.`path`;
är gemensam för alla SQL-miljöer i Azure Databricks. Det är det rekommenderade mönstret för direkt filåtkomst med SQL med Delta Live Tables.
åtkomst till lagringsautentiseringsuppgifter på ett säkert sätt med hemligheter i en pipeline
Du kan använda Azure Databricks hemligheter för att lagra autentiseringsuppgifter som åtkomstnycklar eller lösenord. Om du vill konfigurera hemligheten i din pipeline använder du en Spark-egenskap i klusterkonfigurationen för pipelineinställningar. Se Konfigurera beräkningsresurser för en Delta Live Tables-pipeline.
I följande exempel används en hemlighet för att lagra en åtkomstnyckel som krävs för att läsa indata från ett Azure Data Lake Storage Gen2-lagringskonto (ADLS Gen2) med hjälp av Auto Loader. Du kan använda samma metod för att konfigurera alla hemligheter som krävs av din pipeline, till exempel AWS-nycklar för att komma åt S3 eller lösenordet till ett Apache Hive-metaarkiv.
Mer information om hur du arbetar med Azure Data Lake Storage Gen2 finns i Ansluta till Azure Data Lake Storage Gen2 och Blob Storage.
Kommentar
Du måste lägga till prefixet spark.hadoop.
i konfigurationsnyckeln spark_conf
som anger det hemliga värdet.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Replace
-
<storage-account-name>
med namnet på ADLS Gen2-lagringskontot. -
<scope-name>
med namnet på Azure Databricks-hemlighetsomfånget. -
<secret-name>
med namnet på nyckeln som innehåller åtkomstnyckeln för Azure-lagringskontot.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Replace
-
<container-name>
med namnet på containern för Azure-lagringskontot som lagrar indata. -
<storage-account-name>
med namnet på ADLS Gen2-lagringskontot. -
<path-to-input-dataset>
med sökvägen till indatauppsättningen.
Läsa in data från Azure Event Hubs
Azure Event Hubs är en dataströmningstjänst som tillhandahåller ett Apache Kafka-kompatibelt gränssnitt. Du kan använda Kafka-anslutningsappen för strukturerad direktuppspelning, som ingår i Delta Live Tables-körningen, för att läsa in meddelanden från Azure Event Hubs. Mer information om hur du läser in och bearbetar meddelanden från Azure Event Hubs finns i Använda Azure Event Hubs som en Delta Live Tables-datakälla.