Sdílet prostřednictvím


Dotazování Amazon Redshiftu pomocí Azure Databricks

Tabulky můžete číst a zapisovat z Amazon Redshiftu pomocí Azure Databricks.

Důležité

Konfigurace popsané v tomto článku jsou experimentální. Experimentální funkce jsou poskytovány tak, jak jsou, a Databricks je nepodporuje prostřednictvím technické podpory zákazníků. Pokud chcete získat plnou podporu federace dotazů, měli byste místo toho použít Lakehouse Federation, která uživatelům Azure Databricks umožňuje využívat syntaxi katalogu Unity a nástroje zásad správného řízení dat.

Zdroj dat Databricks Redshift využívá Amazon S3 k efektivnímu přenosu dat do Redshiftu a z Redshiftu a pomocí JDBC automaticky aktivuje příslušné COPY příkazy a UNLOAD příkazy v Redshiftu.

Poznámka:

Databricks Runtime 11.3 LTS a novější obsahuje ovladač Redshift JDBC přístupný pomocí klíčového redshift slova pro možnost formátu. Podívejte se na poznámky k verzi databricks Runtime verze a kompatibilitu pro verze ovladačů zahrnuté v jednotlivých modulech Databricks Runtime. Ovladače poskytované uživatelem se stále podporují a mají přednost před ovladačem JDBC v sadě.

V Databricks Runtime 10.4 LTS a níže je vyžadována ruční instalace ovladače Redshift JDBC a dotazy by měly použít ovladač (com.databricks.spark.redshift) pro formát. Viz instalace ovladače Redshift.

Využití

Následující příklady ukazují připojení pomocí ovladače Redshift. url Nahraďte hodnoty parametrů, pokud používáte ovladač PostgreSQL JDBC.

Po nakonfigurování přihlašovacích údajů AWS můžete zdroj dat použít s rozhraním API pro zdroj dat Spark v Pythonu, SQL, R nebo Scala.

Důležité

Externí umístění definovaná v katalogu Unity nejsou podporována jako tempdir umístění.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Čtení dat pomocí SQL v Databricks Runtime 10.4 LTS a níže:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Čtení dat pomocí SQL v Databricks Runtime 11.3 LTS a vyšší:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Zápis dat pomocí SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

Rozhraní SQL API podporuje pouze vytváření nových tabulek, nikoli přepsání ani připojení.

R

Čtení dat pomocí R v Databricks Runtime 10.4 LTS a níže:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Čtení dat pomocí R v Databricks Runtime 11.3 LTS a novějších:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Doporučení pro práci s Redshiftem

Provádění dotazů může extrahovat velké objemy dat do S3. Pokud plánujete provádět několik dotazů na stejná data v Redshiftu, doporučuje Databricks uložit extrahovaná data pomocí Delta Lake.

Konfigurace

Ověřování ve S3 a Redshiftu

Zdroj dat zahrnuje několik síťových připojení, která jsou znázorněna v následujícím diagramu:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

Zdroj dat čte a zapisuje data do S3 při přenosu dat do/z Redshiftu. V důsledku toho vyžaduje přihlašovací údaje AWS s přístupem ke čtení a zápisu do kontejneru S3 (zadané pomocí konfiguračního parametru tempdir ).

Poznámka:

Zdroj dat nevyčistí dočasné soubory, které vytvoří v S3. Proto doporučujeme použít vyhrazený dočasný kontejner S3 s konfigurací životního cyklu objektu, abyste zajistili, že se dočasné soubory po zadané době vypršení platnosti automaticky odstraní. Informace o šifrování těchto souborů najdete v části Šifrování v tomto dokumentu. Jako umístění nelze použít externí umístění definované v katalogutempdir Unity.

Následující části popisují možnosti konfigurace ověřování jednotlivých připojení:

Ovladač Sparku do Redshiftu

Ovladač Spark se připojí k Redshiftu přes JDBC pomocí uživatelského jména a hesla. Redshift nepodporuje použití rolí IAM k ověření tohoto připojení. Ve výchozím nastavení toto připojení používá šifrování SSL; Další podrobnosti najdete v tématu Šifrování.

Spark do S3

S3 funguje jako zprostředkovatel pro ukládání hromadných dat při čtení nebo zápisu do Redshiftu. Spark se připojí k S3 pomocí rozhraní Systému souborů Hadoop i přímo pomocí klienta sady Amazon Java SDK S3.

Poznámka:

Připojení DBFS nemůžete použít ke konfiguraci přístupu k S3 for Redshift.

  • Nastavení klíčů v souboru Hadoop Conf: Klíče AWS můžete zadat pomocí vlastností konfigurace Systému Hadoop. Pokud vaše tempdir konfigurace odkazuje na s3a:// systém souborů, můžete nastavit fs.s3a.access.key a fs.s3a.secret.key vlastnosti v konfiguračním souboru Hadoop XML nebo volat sc.hadoopConfiguration.set() konfiguraci globální konfigurace Hadoop Sparku. Pokud používáte s3n:// systém souborů, můžete zadat starší konfigurační klíče, jak je znázorněno v následujícím příkladu.

    Scala

    Pokud například používáte s3a systém souborů, přidejte:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Pro starší s3n systém souborů přidejte:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    Následující příkaz spoléhá na některé interní funkce Sparku, ale měl by fungovat se všemi verzemi PySpark a v budoucnu se pravděpodobně nezmění:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift do S3

forward_spark_s3_credentials Nastavte možnost true automatického předávání přihlašovacích údajů klíče AWS, které Spark používá pro připojení k S3 přes JDBC k Redshiftu. Dotaz JDBC tyto přihlašovací údaje vloží, takže proto Databricks důrazně doporučuje povolit šifrování SSL připojení JDBC.

Šifrování

  • Zabezpečení JDBC: Pokud nejsou v adrese URL JDBC k dispozici žádná nastavení související s SSL, zdroj dat ve výchozím nastavení povolí šifrování SSL a také ověří, že server Redshift je důvěryhodný (to znamená sslmode=verify-full). Proto se certifikát serveru automaticky stáhne ze serverů Amazon při prvním použití. V případě selhání se jako záložní soubor certifikátu použije předem připravený soubor certifikátu. To platí pro ovladače Redshift i PostgreSQL JDBC.

    V případě, že dojde k problémům s touto funkcí nebo chcete jednoduše zakázat SSL, můžete zavolat .option("autoenablessl", "false") na svůj DataFrameReader nebo DataFrameWriter.

    Pokud chcete zadat vlastní nastavení související s protokolem SSL, můžete postupovat podle pokynů v dokumentaci redshiftu: Použití certifikátů SSL a serveru v Javě a možnosti konfigurace ovladače JDBC Všechny možnosti související s protokolem SSL, které jsou součástí JDBC url používané se zdrojem dat, mají přednost (to znamená, že automatická konfigurace se neaktivuje).

  • Šifrování dat UNLOAD uložených v S3 (data uložená při čtení z Redshiftu):: Podle dokumentace Redshiftu o uvolnění dat do S3 funkce UNLOAD automaticky šifruje datové soubory pomocí šifrování na straně serveru Amazon S3 (SSE-S3).

    Redshift také podporuje šifrování na straně klienta s vlastním klíčem (viz: Uvolnění šifrovaných datových souborů), ale zdroj dat nemá možnost zadat požadovaný symetrický klíč.

  • Šifrování dat COPY uložených v S3 (data uložená při zápisu do Redshiftu): Podle dokumentace Redshiftu o načítání šifrovaných datových souborů z AmazonU S3:

Pomocí příkazu můžete COPY načíst datové soubory nahrané do AmazonU S3 pomocí šifrování na straně serveru pomocí šifrovacích klíčů spravovaných službou AWS (SSE-S3 nebo SSE-KMS), šifrování na straně klienta nebo obojího. COPY nepodporuje šifrování na straně serveru Amazon S3 pomocí klíče dodaného zákazníkem (SSE-C).

Parametry

Mapování parametrů nebo možnosti uvedené ve Spark SQL podporují následující nastavení:

Parametr Požaduje se Výchozí Popis
dbtable Ano, pokud není zadaný dotaz. Nic Tabulka, ze které se má vytvořit nebo číst v Redshiftu. Tento parametr se vyžaduje při ukládání dat zpět do Redshiftu.
query Ano, pokud není zadána dbtable. Nic Dotaz, ze který se má číst v Redshiftu.
user No Nic Uživatelské jméno Redshiftu. Je nutné použít společně s možností hesla. Dá se použít pouze v případě, že uživatel a heslo nejsou předány v adrese URL, předání obou způsobí chybu. Tento parametr použijte, pokud uživatelské jméno obsahuje speciální znaky, které musí být uchycené.
Heslo No Nic Heslo Redshiftu. Musí se používat společně s user možností. Lze použít pouze v případě, že uživatel a heslo nejsou předány v adrese URL; předáním obou výsledků dojde k chybě. Tento parametr použijte, pokud heslo obsahuje speciální znaky, které musí být uchycené.
url Ano Nic Adresa URL JDBC ve formátu
jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>

subprotocol může být postgresql nebo redshift, v závislosti na tom, který ovladač JDBC jste načetli. Jeden ovladač kompatibilní s Redshiftem musí být na cestě ke třídě a musí odpovídat této adrese URL. host a port měl by odkazovat na hlavní uzel Redshiftu, takže skupiny zabezpečení a/nebo VPC musí být nakonfigurované tak, aby povolovaly přístup z aplikace ovladače. database identifikuje název user databáze Redshift a password jsou přihlašovací údaje pro přístup k databázi, která musí být vložena do této adresy URL pro JDBC a váš uživatelský účet by měl mít potřebná oprávnění pro odkazovanou tabulku.
search_path No Nic Nastavte cestu hledání schématu v Redshiftu. Nastaví se pomocí SET search_path to příkazu. Měl by to být čárkami oddělený seznam názvů schémat, ve které se mají hledat tabulky. Viz dokumentace ke službě Redshift search_path.
aws_iam_role Pouze pokud k autorizaci používáte role IAM. Nic Plně zadané ARN operace IAM Redshift COPY/UNLOAD role připojené ke clusteru Redshift, arn:aws:iam::123456789000:role/<redshift-iam-role>například .
forward_spark_s3_credentials No false Pokud truezdroj dat automaticky zjistí přihlašovací údaje, které Spark používá pro připojení k S3, a předá tyto přihlašovací údaje redshiftu přes JDBC. Tyto přihlašovací údaje se odesílají jako součást dotazu JDBC, takže při použití této možnosti důrazně doporučujeme povolit šifrování SSL připojení JDBC.
temporary_aws_access_key_id No Nic Přístupový klíč AWS musí mít oprávnění k zápisu do kontejneru S3.
temporary_aws_secret_access_key No Nic Tajný přístupový klíč AWS odpovídající zadanému přístupovém klíči.
temporary_aws_session_token No Nic Token relace AWS odpovídající zadanému přístupovém klíči.
tempdir Ano Nic Zapisovatelné umístění v AmazonU S3, které se použije pro uvolněná data při čtení a načtení dat Avro při psaní do Redshiftu. Pokud jako součást běžného kanálu ETL používáte zdroj dat Redshift pro Spark, může být užitečné nastavit zásady životního cyklu v kontejneru a použít je jako dočasné umístění pro tato data.

Externí umístění definovaná v katalogu Unity nemůžete použít jako tempdir umístění.
jdbcdriver No Určuje subprotokol adresy URL JDBC. Název třídy ovladače JDBC, který se má použít. Tato třída musí být v cestě k třídě. Ve většině případů by nemělo být nutné tuto možnost zadat, protože název příslušné třídy ovladače by měl být automaticky určen subprotokolem adresy URL JDBC.
diststyle No EVEN Styl distribuce Redshift, který se má použít při vytváření tabulky. Může to být jeden z EVENnebo KEYALL (viz dokumentace redshiftu). Při použití KEYmusíte také nastavit distribuční klíč s možností distkey.
distkey Ne, pokud nepoužíváte DISTSTYLE KEY Nic Název sloupce v tabulce, který se má použít jako distribuční klíč při vytváření tabulky.
sortkeyspec No Nic Úplná definice klíče řazení Redshift. Příkladem může být:

- SORTKEY(my_sort_column)
- COMPOUND SORTKEY(sort_col_1, sort_col_2)
- INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (zastaralé) No true Nastavením této zastaralé možnosti způsobí false , že cílová tabulka operace přepsání okamžitě na začátku zápisu způsobí, že operace přepsání nebude atomická a sníží dostupnost cílové tabulky. To může snížit požadavky na dočasné místo na disku pro přepsání.

Vzhledem k tomu, že nastavení usestagingtable=false operace riskuje ztrátu nebo nedostupnost dat, je ve prospěch nutnosti ručního vyřazení cílové tabulky.
description No Nic Popis tabulky. Nastaví se pomocí příkazu SQL COMMENT a měl by se zobrazit ve většině nástrojů pro dotazy. Podívejte se také na description metadata pro nastavení popisů jednotlivých sloupců.
preactions No Nic Oddělený ; seznam příkazů SQL, které se mají spustit před načtením COPY příkazu. Před načtením nových dat může být užitečné mít tady spuštěné některé DELETE příkazy nebo podobné příkazy. Pokud příkaz obsahuje %s, název tabulky se před provedením naformátuje (pro případ, že používáte pracovní tabulku).

Upozorňujeme, že pokud tyto příkazy selžou, považuje se za chybu a vyvolá se výjimka. Pokud používáte pracovní tabulku, změny se vrátí a záložní tabulka se obnoví, pokud se nezdaří předběžné akce.
postactions No Nic Oddělený ; seznam příkazů SQL, které se mají spustit po úspěšném COPY načtení dat. Při načítání nových dat může být užitečné mít některé GRANT příkazy nebo podobné spuštění. Pokud příkaz obsahuje %s, název tabulky se před provedením naformátuje (pro případ, že používáte pracovní tabulku).

Upozorňujeme, že pokud tyto příkazy selžou, považuje se za chybu a vyvolá se výjimka. Pokud používáte pracovní tabulku, změny se vrátí a záložní tabulka se obnoví, pokud se akce po nezdaří.
extracopyoptions No Nic Seznam dalších možností pro připojení k příkazu Redshift při načítání dat, například nebo (další možnosti najdete v dokumentaci Redshift).

Vzhledem k tomu, že jsou tyto možnosti připojeny na konec COPY příkazu, lze použít pouze možnosti, které mají smysl na konci příkazu, ale které by měly zahrnovat většinu možných případů použití.
tempformat No AVRO Formát, ve kterém se při zápisu do Redshiftu uloží dočasné soubory v S3. Výchozí hodnota je AVRO; ostatní povolené hodnoty jsou CSV a CSV GZIP pro CSV a gzipped CSV, v uvedeném pořadí.

Redshift je při načítání souborů CSV výrazně rychlejší než při načítání souborů Avro, takže použití tohoto tempformatu může při zápisu do Redshiftu zvýšit výkon.
csvnullstring No @NULL@ Hodnota String, která se má zapisovat pro hodnoty null při použití tempformat sdíleného svazku clusteru. Mělo by to být hodnota, která se ve vašich skutečných datech nezobrazuje.
csvseparator No , Oddělovač, který se má použít při zápisu dočasných souborů s dočasným formátem nastaveným na CSV nebo CSV GZIP. Musí to být platný znak ASCII, například "," nebo "\|".
csvignoreleadingwhitespace No true Při nastavení na hodnotu true odebere počáteční prázdné znaky z hodnot při zápisu, pokud je tempformat nastavena na CSV nebo CSV GZIP. Jinak se zachovají prázdné znaky.
csvignoretrailingwhitespace No true Při nastavení na hodnotu true odebere koncové prázdné znaky z hodnot během zápisu, pokud je tempformat nastavena na CSV nebo CSV GZIP. Jinak se prázdné znaky zachovají.
infer_timestamp_ntz_type No false Pokud truejsou hodnoty typu Redshift TIMESTAMP interpretovány jako TimestampNTZType (časové razítko bez časového pásma) během čtení. Jinak se všechna časová razítka interpretují TimestampType bez ohledu na typ v podkladové tabulce Redshift.

Další možnosti konfigurace

Konfigurace maximální velikosti řetězců sloupců

Při vytváření tabulek Redshift je výchozím chováním vytvořit TEXT sloupce pro řetězcové sloupce. Redshift ukládá TEXT sloupce jako VARCHAR(256), takže tyto sloupce mají maximální velikost 256 znaků (zdroj).

Pokud chcete podporovat větší sloupce, můžete pomocí maxlength pole metadat sloupců určit maximální délku jednotlivých sloupců řetězců. To je také užitečné pro implementaci optimalizace výkonu úspory místa deklarací sloupců s menší maximální délkou než výchozí.

Poznámka:

Vzhledem k omezením ve Sparku nepodporují rozhraní API jazyka SQL a R úpravy metadat sloupců.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Tady je příklad aktualizace polí metadat více sloupců pomocí rozhraní Scala API Sparku:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Nastavení vlastního typu sloupce

Pokud potřebujete typ sloupce nastavit ručně, můžete použít redshift_type metadata sloupce. Pokud například chcete přepsat Spark SQL Schema -> Redshift SQL shodu typů, abyste přiřadili typ sloupce definovaný uživatelem, můžete udělat toto:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Konfigurace kódování sloupců

Při vytváření tabulky použijte encoding pole metadat sloupců k určení kódování komprese pro každý sloupec (v dokumentaci Amazonu najdete dostupné kódování).

Nastavení popisů sloupců

Redshift umožňuje, aby sloupce měly připojené popisy, které by se měly zobrazit ve většině nástrojů dotazů (pomocí COMMENT příkazu). Pole metadat sloupce můžete nastavit description tak, aby určilo popis jednotlivých sloupců.

Odeslání dotazu do Redshiftu

Optimalizátor Sparku nasdílí následující operátory do Redshiftu:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

V Project rámci a Filterpodporuje následující výrazy:

  • Většina logických operátorů logiky
  • Porovnání
  • Základní aritmetické operace
  • Přetypování čísel a řetězců
  • Většina řetězcových funkcí
  • Skalární poddotazy, pokud je lze zcela snížit do Redshiftu.

Poznámka:

Tento posun nepodporuje výrazy pracující s kalendářními daty a časovými razítky.

V rámci Aggregationtéto funkce podporuje následující agregační funkce:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

v kombinaci s doložkou DISTINCT , pokud je to možné.

V rámci Jointéto funkce podporuje následující typy spojení:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Poddotazy, které jsou přepsány Join optimalizátorem, např. WHERE EXISTSWHERE NOT EXISTS

Poznámka:

Funkce Pushdown pro připojení nepodporuje FULL OUTER JOIN.

Nasdílení změn může být nejužitevější v dotazech s LIMIT. Dotaz, jako by mohl trvat velmi dlouho, protože SELECT * FROM large_redshift_table LIMIT 10 celá tabulka by jako průběžný výsledek nejprve byla unLOADed na S3. S odsazením se LIMIT spustí v Redshiftu. V dotazech s agregacemi pomáhá přesun agregace dolů do Redshiftu také snížit množství dat, která je potřeba přenést.

Ve výchozím nastavení je povolený odsazení dotazu do Redshiftu. Lze ho zakázat nastavením spark.databricks.redshift.pushdown na false. I když je spark zakázaný, stále odesílá filtry dolů a provádí odstranění sloupců do Redshiftu.

Instalace ovladače Redshift

Zdroj dat Redshift také vyžaduje ovladač JDBC kompatibilní s Redshiftem. Vzhledem k tomu, že Redshift je založený na databázovém systému PostgreSQL, můžete použít ovladač PostgreSQL JDBC, který je součástí modulu Databricks Runtime, nebo ovladač Amazonu doporučeného ovladače Redshift JDBC. K použití ovladače PostgreSQL JDBC není nutná žádná instalace. Verze ovladače PostgreSQL JDBC, která je součástí každé verze Databricks Runtime, je uvedená v poznámkách k verzi Databricks Runtime.

Ruční instalace ovladače Redshift JDBC:

  1. Stáhněte si ovladač z Amazonu.
  2. Nahrajte ovladač do pracovního prostoru Azure Databricks. Viz knihovny.
  3. Nainstalujte knihovnu do clusteru.

Poznámka:

Databricks doporučuje používat nejnovější verzi ovladače Redshift JDBC. Verze ovladače Redshift JDBC nižší než 1.2.41 mají následující omezení:

  • Verze 1.2.16 ovladače vrací prázdná data při použití where klauzule v dotazu SQL.
  • Verze ovladače nižší než 1.2.41 mohou vrátit neplatné výsledky, protože hodnota null ve sloupci je nesprávně hlášena jako "Not Nullable" místo "Neznámý".

Transakční záruky

Tato část popisuje transakční záruky zdroje dat Redshift pro Spark.

Obecné pozadí vlastností Redshift a S3

Obecné informace o transakčních zárukách redshiftu najdete v kapitole Správa operací souběžného zápisu v dokumentaci k Redshiftu. V maticovém prostředí poskytuje Redshift serializovatelnou izolaci podle dokumentace pro příkaz Redshift BEGIN :

[ačkoli] můžete použít kteroukoli ze čtyř úrovní izolace transakcí, Amazon Redshift zpracuje všechny úrovně izolace jako serializovatelné.

Podle dokumentace Redshiftu:

Amazon Redshift podporuje výchozí chování automatického potvrzení , ve kterém každý samostatně spouštěný příkaz SQL potvrdí jednotlivě.

Jednotlivé příkazy jako COPY a UNLOAD jsou atomické a transakční, i když explicitní BEGIN a END měly by být nezbytné pouze k vynucení atomicity více příkazů nebo dotazů.

Při čtení z redshiftu a zápisu do redshiftu zdroj dat čte a zapisuje data do S3. Spark i Redshift vytvářejí dělený výstup a ukládají ho do více souborů v S3. Podle dokumentace k modelu konzistence dat Amazon S3 jsou operace výpisu kontejnerů S3 nakonec konzistentní, takže soubory musí jít na zvláštní délky, aby se zabránilo chybějícím nebo neúplným datům kvůli tomuto zdroji konečné konzistence.

Záruky zdroje dat Redshift pro Spark

Připojení k existující tabulce

Při vkládání řádků do Redshiftu používá zdroj dat příkaz COPY a určuje manifesty, které chrání před určitými nakonec konzistentními operacemi S3. V důsledku toho spark-redshift mají připojení k existujícím tabulkám stejné atomické a transakční vlastnosti jako běžné příkazy Redshift COPY .

Vytvoření nové tabulky (SaveMode.CreateIfNotExists)

Vytvoření nové tabulky je dvoustupňový proces, který se skládá z CREATE TABLE příkazu, za nímž následuje příkaz COPY , který připojí počáteční sadu řádků. Obě operace se provádějí ve stejné transakci.

Přepsání existující tabulky

Ve výchozím nastavení používá zdroj dat transakce k provádění přepsání, které jsou implementovány odstraněním cílové tabulky, vytvořením nové prázdné tabulky a připojením řádků k ní.

Pokud je zastaralé usestagingtable nastavení nastavené na false, zdroj dat před připojením řádků k nové tabulce potvrdí DELETE TABLE příkaz, obětuje atomicitu operace přepsání, ale snižuje množství přípravného prostoru, které Redshift potřebuje během přepsání.

Dotaz na tabulku Redshift

Dotazy používají příkaz Redshift UNLOAD k provedení dotazu a uložení výsledků do S3 a použití manifestů k ochranu před určitými nakonec konzistentními operacemi S3. V důsledku toho by dotazy ze zdroje dat Redshift pro Spark měly mít stejné vlastnosti konzistence jako běžné dotazy Redshift.

Běžné problémy a řešení

KontejnerY S3 a cluster Redshift jsou v různých oblastech AWS.

Ve výchozím nastavení kopie S3 <–> Redshift nefungují, pokud jsou kontejnerY S3 a cluster Redshift v různých oblastech AWS.

Pokud se pokusíte přečíst tabulku Redshift, když je kontejner S3 v jiné oblasti, může se zobrazit chyba, například:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Podobně může pokus o zápis do Redshiftu pomocí kbelíku S3 v jiné oblasti způsobit následující chybu:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Zápisy: Příkaz Redshift COPY podporuje explicitní specifikaci oblasti kbelíku S3, takže v těchto případech můžete zápisy do Redshiftu správně fungovat přidáním region 'the-region-name' do extracopyoptions nastavení. Například s kbelíkem v oblasti USA – východ (Virginia) a rozhraní Scala API použijte:

    .option("extracopyoptions", "region 'us-east-1'")
    

    Můžete také použít awsregion nastavení:

    .option("awsregion", "us-east-1")
    
  • Čtení: Příkaz UNLOAD Redshift podporuje také explicitní specifikaci oblasti kbelíku S3. Čtení můžete správně nastavit tak, že do awsregion nastavení přidáte oblast:

    .option("awsregion", "us-east-1")
    

Chyba ověřování při použití hesla se speciálními znaky v adrese URL JDBC

Pokud jako součást adresy URL JDBC zadáte uživatelské jméno a heslo a heslo obsahuje speciální znaky, například ;, ?nebo &, může se zobrazit následující výjimka:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Příčinou jsou speciální znaky v uživatelském jménu nebo hesle, které ovladač JDBC správně neunikne. Nezapomeňte zadat uživatelské jméno a heslo pomocí odpovídajících možností user datového rámce a password. Další informace najdete v tématu Parametry.

Dlouhotrvající dotaz Sparku se po neomezenou dobu zablokuje, i když se provede odpovídající operace Redshiftu.

Pokud čtete nebo zapisujete velké objemy dat z a do Redshiftu, může dotaz Sparku přestat reagovat, i když stránka monitorování AWS Redshift ukazuje, že odpovídající LOAD operace nebo UNLOAD operace byla dokončena a že cluster je nečinný. Příčinou je vypršení časového limitu mezi Redshiftem a Sparkem. Abyste tomu předešli, ujistěte se, že tcpKeepAlive je povolený příznak JDBC a TCPKeepAliveMinutes je nastavený na nízkou hodnotu (například 1).

Další informace najdete v tématu Amazon Redshift JDBC Driver Configuration.

Časové razítko sémantikou časového pásma

Při čtení dat se redshift TIMESTAMP i TIMESTAMPTZ datové typy mapují na Spark TimestampTypea hodnota se převede na koordinovaný univerzální čas (UTC) a uloží se jako časové razítko UTC. U Redshiftu TIMESTAMPse místní časové pásmo předpokládá, protože hodnota neobsahuje žádné informace o časovém pásmu. Při zápisu dat do tabulky Redshift se Spark TimestampType mapuje na datový typ Redshift TIMESTAMP .

Průvodce migrací

Zdroj dat teď vyžaduje, abyste explicitně nastavili forward_spark_s3_credentials před předáním přihlašovacích údajů SparkU S3 do Redshiftu. Tato změna nemá žádný vliv, pokud používáte aws_iam_role mechanismy ověřování.temporary_aws_* Pokud jste se ale spoléhali na staré výchozí chování, musíte teď explicitně nastavit forward_spark_s3_credentials , abyste true mohli pokračovat v používání předchozího mechanismu ověřování Redshift na S3. Diskuzi o třech ověřovacích mechanismech a jejich kompromisech zabezpečení najdete v části Ověřování ve S3 a Redshiftu tohoto dokumentu.