Sdílet prostřednictvím


Běžné vzory načítání dat

Automatický zavaděč zjednodušuje řadu běžných úloh příjmu dat. Tato rychlá referenční příručka obsahuje příklady pro několik oblíbených vzorů.

Filtrování adresářů nebo souborů pomocí vzorů globu

Vzory globu lze použít pro filtrování adresářů a souborů, pokud jsou v cestě k dispozici.

Vzor Popis
? Odpovídá jakémukoli jednomu znaku.
* Odpovídá nule nebo více znaků
[abc] Odpovídá jednomu znaku ze znakové sady {a,b,c}.
[a-z] Odpovídá jednomu znaku z rozsahu znaků {a... z}.
[^a] Odpovídá jednomu znaku, který není ze znakové sady nebo rozsahu {a}. Všimněte si, že ^ znak musí nastat okamžitě napravo od levé závorky.
{ab,cd} Odpovídá řetězci ze sady řetězců {ab, cd}.
{ab,c{de, fh}} Odpovídá řetězci ze sady řetězců {ab, cde, cfh}.

path Použijte k poskytování vzorů předpon, například:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Důležité

Musíte použít možnost pathGlobFilter explicitního poskytování vzorů přípon. Jediný path filtr předpony.

Pokud například chcete parsovat jenom png soubory v adresáři, který obsahuje soubory s různými příponami, můžete:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Poznámka:

Výchozí chování automatického zavaděče se liší od výchozího chování jiných zdrojů souborů Spark. Přidejte .option("cloudFiles.useStrictGlobber", "true") do čtení, abyste použili globbing, který odpovídá výchozímu chování Sparku vůči zdrojům souborů. Další informace o globbingu najdete v následující tabulce:

Vzor Cesta k souboru Výchozí globber Striktní globber
/a/b /a/b/c/file.txt Ano Ano
/a/b /a/b_dir/c/file.txt Ne Ne
/a/b /a/b.txt Ne Ne
/a/b/ /a/b.txt Ne Ne
/a/*/c/ /a/b/c/file.txt Ano Ano
/a/*/c/ /a/b/c/d/file.txt Ano Ano
/a/*/c/ /a/b/x/y/c/file.txt Ano Ne
/a/*/c /a/b/c_file.txt Ano Ne
/a/*/c/ /a/b/c_file.txt Ano Ne
/a/*/c/ /a/*/cookie/file.txt Ano Ne
/a/b* /a/b.txt Ano Ano
/a/b* /a/b/file.txt Ano Ano
/a/{0.txt,1.txt} /a/0.txt Ano Ano
/a/*/{0.txt,1.txt} /a/0.txt Ne Ne
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Ano Ano

Povolení snadného ETL

Jednoduchý způsob, jak dostat data do Delta Lake bez ztráty dat, je použít následující vzor a povolit odvozování schématu pomocí Auto Loaderu. Databricks doporučuje spustit následující kód v úloze Azure Databricks, aby automaticky restartoval datový proud, když se změní schéma zdrojových dat. Ve výchozím nastavení je schéma odvozeno jako typy řetězců, všechny chyby analýzy (pokud vše zůstane jako řetězec) budou směřovat do _rescued_dataa všechny nové sloupce způsobí selhání datového proudu a vývoj schématu.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Prevence ztráty dat ve dobře strukturovaných datech

Když znáte schéma, ale chcete vědět, kdykoli obdržíte neočekávaná data, databricks doporučuje používat rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Pokud chcete, aby stream přestal zpracovávat, pokud se zavádí nové pole, které neodpovídá vašemu schématu, můžete přidat:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Povolení flexibilních částečně strukturovaných datových kanálů

Když dostáváte data od dodavatele, který do informací, které poskytují, zavádí nové sloupce, nemusíte vědět přesně, kdy to dělají, nebo možná nebudete mít šířku pásma pro aktualizaci datového kanálu. Nyní můžete využít vývoj schématu k restartování streamu a nechat Auto Loader aktualizovat odvozené schéma. Můžete také využít schemaHints některá pole bez schématu, která může dodavatel poskytnout.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Transformace vnořených dat JSON

Jelikož automatický zavaděč odvozuje sloupce JSON nejvyšší úrovně jako řetězce, mohou zůstat vnořené objekty JSON, které vyžadují další transformace. Pomocí částečně strukturovaných rozhraní API pro přístup k datům můžete dále transformovat složitý obsah JSON.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Odvození vnořených dat JSON

Pokud máte vnořená data, můžete použít možnost cloudFiles.inferColumnTypes k odvození vnořené struktury dat a dalších typů sloupců.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Načtení souborů CSV bez hlaviček

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Vynucení schématu u souborů CSV s hlavičkami

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Ingestování obrázků nebo binárních dat do Delta Lake pro ML

Jakmile jsou data uložená v Delta Lake, můžete na datech spustit distribuované odvozování. Viz Provádění distribuovaných odvozování pomocí funkce UDF pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Syntaxe automatického zavaděče pro DLT

Delta Live Tables poskytuje mírně upravenou syntaxi Pythonu pro Auto Loader a přidává podporu SQL pro Auto Loader.

Následující příklady používají automatický zavaděč k vytvoření datových sad ze souborů CSV a JSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

U automatického zavaděče můžete použít podporované možnosti formátování. map() Pomocí funkce můžete metodě předat možnostiread_files(). Možnosti jsou páry klíč-hodnota, kde klíče a hodnoty jsou řetězce. Následující popis syntaxe pro práci s automatickým zavaděčem v SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Následující příklad načte data ze souborů CSV oddělených tabulátory s hlavičkou:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

K ručnímu zadání formátu můžete použít schema; Je nutné zadat schema pro formáty, které nepodporují odvození schématu:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM read_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Poznámka:

Delta Live Tables automaticky konfiguruje a spravuje adresáře schématu a kontrolních bodů při použití Auto Loader ke čtení souborů. Pokud však ručně nakonfigurujete některý z těchto adresářů, provedení úplné aktualizace nemá vliv na obsah nakonfigurovaných adresářů. Databricks doporučuje používat automaticky nakonfigurované adresáře, aby se zabránilo neočekávaným vedlejším účinkům během zpracování.