Dotazování Amazon Redshiftu pomocí Azure Databricks
Tabulky můžete číst a zapisovat z Amazon Redshiftu pomocí Azure Databricks.
Důležité
Konfigurace popsané v tomto článku jsou experimentální. Experimentální funkce jsou poskytovány tak, jak jsou, a Databricks je nepodporuje prostřednictvím technické podpory zákazníků. Pokud chcete získat plnou podporu federace dotazů, měli byste místo toho použít Lakehouse Federation, která uživatelům Azure Databricks umožňuje využívat syntaxi katalogu Unity a nástroje zásad správného řízení dat.
Zdroj dat Databricks Redshift využívá Amazon S3 k efektivnímu přenosu dat do Redshiftu a z Redshiftu a pomocí JDBC automaticky aktivuje příslušné COPY
příkazy a UNLOAD
příkazy v Redshiftu.
Poznámka:
Databricks Runtime 11.3 LTS a novější obsahuje ovladač Redshift JDBC přístupný pomocí klíčového redshift
slova pro možnost formátu. Podívejte se na poznámky k verzi databricks Runtime verze a kompatibilitu pro verze ovladačů zahrnuté v jednotlivých modulech Databricks Runtime. Ovladače poskytované uživatelem se stále podporují a mají přednost před ovladačem JDBC v sadě.
V Databricks Runtime 10.4 LTS a níže je vyžadována ruční instalace ovladače Redshift JDBC a dotazy by měly použít ovladač (com.databricks.spark.redshift
) pro formát. Viz instalace ovladače Redshift.
Využití
Následující příklady ukazují připojení pomocí ovladače Redshift.
url
Nahraďte hodnoty parametrů, pokud používáte ovladač PostgreSQL JDBC.
Po nakonfigurování přihlašovacích údajů AWS můžete zdroj dat použít s rozhraním API pro zdroj dat Spark v Pythonu, SQL, R nebo Scala.
Důležité
Externí umístění definovaná v katalogu Unity nejsou podporována jako tempdir
umístění.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Čtení dat pomocí SQL v Databricks Runtime 10.4 LTS a níže:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Čtení dat pomocí SQL v Databricks Runtime 11.3 LTS a vyšší:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Zápis dat pomocí SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
Rozhraní SQL API podporuje pouze vytváření nových tabulek, nikoli přepsání ani připojení.
R
Čtení dat pomocí R v Databricks Runtime 10.4 LTS a níže:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Čtení dat pomocí R v Databricks Runtime 11.3 LTS a novějších:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Doporučení pro práci s Redshiftem
Provádění dotazů může extrahovat velké objemy dat do S3. Pokud plánujete provádět několik dotazů na stejná data v Redshiftu, doporučuje Databricks uložit extrahovaná data pomocí Delta Lake.
Konfigurace
Ověřování ve S3 a Redshiftu
Zdroj dat zahrnuje několik síťových připojení, která jsou znázorněna v následujícím diagramu:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
Zdroj dat čte a zapisuje data do S3 při přenosu dat do/z Redshiftu. V důsledku toho vyžaduje přihlašovací údaje AWS s přístupem ke čtení a zápisu do kontejneru S3 (zadané pomocí konfiguračního parametru tempdir
).
Poznámka:
Zdroj dat nevyčistí dočasné soubory, které vytvoří v S3. Proto doporučujeme použít vyhrazený dočasný kontejner S3 s konfigurací životního cyklu objektu, abyste zajistili, že se dočasné soubory po zadané době vypršení platnosti automaticky odstraní. Informace o šifrování těchto souborů najdete v části Šifrování v tomto dokumentu. Jako umístění nelze použít externí umístění definované v katalogutempdir
Unity.
Následující části popisují možnosti konfigurace ověřování jednotlivých připojení:
Ovladač Sparku do Redshiftu
Ovladač Spark se připojí k Redshiftu přes JDBC pomocí uživatelského jména a hesla. Redshift nepodporuje použití rolí IAM k ověření tohoto připojení. Ve výchozím nastavení toto připojení používá šifrování SSL; Další podrobnosti najdete v tématu Šifrování.
Spark do S3
S3 funguje jako zprostředkovatel pro ukládání hromadných dat při čtení nebo zápisu do Redshiftu. Spark se připojí k S3 pomocí rozhraní Systému souborů Hadoop i přímo pomocí klienta sady Amazon Java SDK S3.
Poznámka:
Připojení DBFS nemůžete použít ke konfiguraci přístupu k S3 for Redshift.
Nastavení klíčů v souboru Hadoop Conf: Klíče AWS můžete zadat pomocí vlastností konfigurace Systému Hadoop. Pokud vaše
tempdir
konfigurace odkazuje nas3a://
systém souborů, můžete nastavitfs.s3a.access.key
afs.s3a.secret.key
vlastnosti v konfiguračním souboru Hadoop XML nebo volatsc.hadoopConfiguration.set()
konfiguraci globální konfigurace Hadoop Sparku. Pokud používátes3n://
systém souborů, můžete zadat starší konfigurační klíče, jak je znázorněno v následujícím příkladu.Scala
Pokud například používáte
s3a
systém souborů, přidejte:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
Pro starší
s3n
systém souborů přidejte:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
Následující příkaz spoléhá na některé interní funkce Sparku, ale měl by fungovat se všemi verzemi PySpark a v budoucnu se pravděpodobně nezmění:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift do S3
forward_spark_s3_credentials
Nastavte možnost true
automatického předávání přihlašovacích údajů klíče AWS, které Spark používá pro připojení k S3 přes JDBC k Redshiftu. Dotaz JDBC tyto přihlašovací údaje vloží, takže proto Databricks důrazně doporučuje povolit šifrování SSL připojení JDBC.
Šifrování
Zabezpečení JDBC: Pokud nejsou v adrese URL JDBC k dispozici žádná nastavení související s SSL, zdroj dat ve výchozím nastavení povolí šifrování SSL a také ověří, že server Redshift je důvěryhodný (to znamená
sslmode=verify-full
). Proto se certifikát serveru automaticky stáhne ze serverů Amazon při prvním použití. V případě selhání se jako záložní soubor certifikátu použije předem připravený soubor certifikátu. To platí pro ovladače Redshift i PostgreSQL JDBC.V případě, že dojde k problémům s touto funkcí nebo chcete jednoduše zakázat SSL, můžete zavolat
.option("autoenablessl", "false")
na svůjDataFrameReader
neboDataFrameWriter
.Pokud chcete zadat vlastní nastavení související s protokolem SSL, můžete postupovat podle pokynů v dokumentaci redshiftu: Použití certifikátů SSL a serveru v Javě a možnosti konfigurace ovladače JDBC Všechny možnosti související s protokolem SSL, které jsou součástí JDBC
url
používané se zdrojem dat, mají přednost (to znamená, že automatická konfigurace se neaktivuje).Šifrování dat UNLOAD uložených v S3 (data uložená při čtení z Redshiftu):: Podle dokumentace Redshiftu o uvolnění dat do S3 funkce UNLOAD automaticky šifruje datové soubory pomocí šifrování na straně serveru Amazon S3 (SSE-S3).
Redshift také podporuje šifrování na straně klienta s vlastním klíčem (viz: Uvolnění šifrovaných datových souborů), ale zdroj dat nemá možnost zadat požadovaný symetrický klíč.
Šifrování dat COPY uložených v S3 (data uložená při zápisu do Redshiftu): Podle dokumentace Redshiftu o načítání šifrovaných datových souborů z AmazonU S3:
Pomocí příkazu můžete COPY
načíst datové soubory nahrané do AmazonU S3 pomocí šifrování na straně serveru pomocí šifrovacích klíčů spravovaných službou AWS (SSE-S3 nebo SSE-KMS), šifrování na straně klienta nebo obojího. COPY nepodporuje šifrování na straně serveru Amazon S3 pomocí klíče dodaného zákazníkem (SSE-C).
Parametry
Mapování parametrů nebo možnosti uvedené ve Spark SQL podporují následující nastavení:
Parametr | Požaduje se | Výchozí | Popis |
---|---|---|---|
dbtable | Ano, pokud není zadaný dotaz. | Nic | Tabulka, ze které se má vytvořit nebo číst v Redshiftu. Tento parametr se vyžaduje při ukládání dat zpět do Redshiftu. |
query | Ano, pokud není zadána dbtable. | Nic | Dotaz, ze který se má číst v Redshiftu. |
user | No | Nic | Uživatelské jméno Redshiftu. Je nutné použít společně s možností hesla. Dá se použít pouze v případě, že uživatel a heslo nejsou předány v adrese URL, předání obou způsobí chybu. Tento parametr použijte, pokud uživatelské jméno obsahuje speciální znaky, které musí být uchycené. |
Heslo | No | Nic | Heslo Redshiftu. Musí se používat společně s user možností. Lze použít pouze v případě, že uživatel a heslo nejsou předány v adrese URL; předáním obou výsledků dojde k chybě. Tento parametr použijte, pokud heslo obsahuje speciální znaky, které musí být uchycené. |
url | Ano | Nic | Adresa URL JDBC ve formátujdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password> subprotocol může být postgresql nebo redshift , v závislosti na tom, který ovladač JDBC jste načetli. Jeden ovladač kompatibilní s Redshiftem musí být na cestě ke třídě a musí odpovídat této adrese URL.
host a port měl by odkazovat na hlavní uzel Redshiftu, takže skupiny zabezpečení a/nebo VPC musí být nakonfigurované tak, aby povolovaly přístup z aplikace ovladače.
database identifikuje název user databáze Redshift a password jsou přihlašovací údaje pro přístup k databázi, která musí být vložena do této adresy URL pro JDBC a váš uživatelský účet by měl mít potřebná oprávnění pro odkazovanou tabulku. |
search_path | No | Nic | Nastavte cestu hledání schématu v Redshiftu. Nastaví se pomocí SET search_path to příkazu. Měl by to být čárkami oddělený seznam názvů schémat, ve které se mají hledat tabulky. Viz dokumentace ke službě Redshift search_path. |
aws_iam_role | Pouze pokud k autorizaci používáte role IAM. | Nic | Plně zadané ARN operace IAM Redshift COPY/UNLOAD role připojené ke clusteru Redshift, arn:aws:iam::123456789000:role/<redshift-iam-role> například . |
forward_spark_s3_credentials | No | false |
Pokud true zdroj dat automaticky zjistí přihlašovací údaje, které Spark používá pro připojení k S3, a předá tyto přihlašovací údaje redshiftu přes JDBC. Tyto přihlašovací údaje se odesílají jako součást dotazu JDBC, takže při použití této možnosti důrazně doporučujeme povolit šifrování SSL připojení JDBC. |
temporary_aws_access_key_id | No | Nic | Přístupový klíč AWS musí mít oprávnění k zápisu do kontejneru S3. |
temporary_aws_secret_access_key | No | Nic | Tajný přístupový klíč AWS odpovídající zadanému přístupovém klíči. |
temporary_aws_session_token | No | Nic | Token relace AWS odpovídající zadanému přístupovém klíči. |
tempdir | Ano | Nic | Zapisovatelné umístění v AmazonU S3, které se použije pro uvolněná data při čtení a načtení dat Avro při psaní do Redshiftu. Pokud jako součást běžného kanálu ETL používáte zdroj dat Redshift pro Spark, může být užitečné nastavit zásady životního cyklu v kontejneru a použít je jako dočasné umístění pro tato data. Externí umístění definovaná v katalogu Unity nemůžete použít jako tempdir umístění. |
jdbcdriver | No | Určuje subprotokol adresy URL JDBC. | Název třídy ovladače JDBC, který se má použít. Tato třída musí být v cestě k třídě. Ve většině případů by nemělo být nutné tuto možnost zadat, protože název příslušné třídy ovladače by měl být automaticky určen subprotokolem adresy URL JDBC. |
diststyle | No | EVEN |
Styl distribuce Redshift, který se má použít při vytváření tabulky. Může to být jeden z EVEN nebo KEY ALL (viz dokumentace redshiftu). Při použití KEY musíte také nastavit distribuční klíč s možností distkey. |
distkey | Ne, pokud nepoužíváte DISTSTYLE KEY |
Nic | Název sloupce v tabulce, který se má použít jako distribuční klíč při vytváření tabulky. |
sortkeyspec | No | Nic | Úplná definice klíče řazení Redshift. Příkladem může být: - SORTKEY(my_sort_column) - COMPOUND SORTKEY(sort_col_1, sort_col_2) - INTERLEAVED SORTKEY(sort_col_1, sort_col_2) |
usestagingtable (zastaralé) | No | true |
Nastavením této zastaralé možnosti způsobí false , že cílová tabulka operace přepsání okamžitě na začátku zápisu způsobí, že operace přepsání nebude atomická a sníží dostupnost cílové tabulky. To může snížit požadavky na dočasné místo na disku pro přepsání.Vzhledem k tomu, že nastavení usestagingtable=false operace riskuje ztrátu nebo nedostupnost dat, je ve prospěch nutnosti ručního vyřazení cílové tabulky. |
description | No | Nic | Popis tabulky. Nastaví se pomocí příkazu SQL COMMENT a měl by se zobrazit ve většině nástrojů pro dotazy. Podívejte se také na description metadata pro nastavení popisů jednotlivých sloupců. |
preactions | No | Nic | Oddělený ; seznam příkazů SQL, které se mají spustit před načtením COPY příkazu. Před načtením nových dat může být užitečné mít tady spuštěné některé DELETE příkazy nebo podobné příkazy. Pokud příkaz obsahuje %s , název tabulky se před provedením naformátuje (pro případ, že používáte pracovní tabulku).Upozorňujeme, že pokud tyto příkazy selžou, považuje se za chybu a vyvolá se výjimka. Pokud používáte pracovní tabulku, změny se vrátí a záložní tabulka se obnoví, pokud se nezdaří předběžné akce. |
postactions | No | Nic | Oddělený ; seznam příkazů SQL, které se mají spustit po úspěšném COPY načtení dat. Při načítání nových dat může být užitečné mít některé GRANT příkazy nebo podobné spuštění. Pokud příkaz obsahuje %s , název tabulky se před provedením naformátuje (pro případ, že používáte pracovní tabulku).Upozorňujeme, že pokud tyto příkazy selžou, považuje se za chybu a vyvolá se výjimka. Pokud používáte pracovní tabulku, změny se vrátí a záložní tabulka se obnoví, pokud se akce po nezdaří. |
extracopyoptions | No | Nic | Seznam dalších možností pro připojení k příkazu Redshift Vzhledem k tomu, že jsou tyto možnosti připojeny na konec COPY příkazu, lze použít pouze možnosti, které mají smysl na konci příkazu, ale které by měly zahrnovat většinu možných případů použití. |
tempformat | No | AVRO |
Formát, ve kterém se při zápisu do Redshiftu uloží dočasné soubory v S3. Výchozí hodnota je AVRO ; ostatní povolené hodnoty jsou CSV a CSV GZIP pro CSV a gzipped CSV, v uvedeném pořadí.Redshift je při načítání souborů CSV výrazně rychlejší než při načítání souborů Avro, takže použití tohoto tempformatu může při zápisu do Redshiftu zvýšit výkon. |
csvnullstring | No | @NULL@ |
Hodnota String, která se má zapisovat pro hodnoty null při použití tempformat sdíleného svazku clusteru. Mělo by to být hodnota, která se ve vašich skutečných datech nezobrazuje. |
csvseparator | No | , |
Oddělovač, který se má použít při zápisu dočasných souborů s dočasným formátem nastaveným na CSV nebo CSV GZIP . Musí to být platný znak ASCII, například ", " nebo "\| ". |
csvignoreleadingwhitespace | No | true |
Při nastavení na hodnotu true odebere počáteční prázdné znaky z hodnot při zápisu, pokud je tempformat nastavena na CSV nebo CSV GZIP . Jinak se zachovají prázdné znaky. |
csvignoretrailingwhitespace | No | true |
Při nastavení na hodnotu true odebere koncové prázdné znaky z hodnot během zápisu, pokud je tempformat nastavena na CSV nebo CSV GZIP . Jinak se prázdné znaky zachovají. |
infer_timestamp_ntz_type | No | false |
Pokud true jsou hodnoty typu Redshift TIMESTAMP interpretovány jako TimestampNTZType (časové razítko bez časového pásma) během čtení. Jinak se všechna časová razítka interpretují TimestampType bez ohledu na typ v podkladové tabulce Redshift. |
Další možnosti konfigurace
Konfigurace maximální velikosti řetězců sloupců
Při vytváření tabulek Redshift je výchozím chováním vytvořit TEXT
sloupce pro řetězcové sloupce. Redshift ukládá TEXT
sloupce jako VARCHAR(256)
, takže tyto sloupce mají maximální velikost 256 znaků (zdroj).
Pokud chcete podporovat větší sloupce, můžete pomocí maxlength
pole metadat sloupců určit maximální délku jednotlivých sloupců řetězců. To je také užitečné pro implementaci optimalizace výkonu úspory místa deklarací sloupců s menší maximální délkou než výchozí.
Poznámka:
Vzhledem k omezením ve Sparku nepodporují rozhraní API jazyka SQL a R úpravy metadat sloupců.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
Tady je příklad aktualizace polí metadat více sloupců pomocí rozhraní Scala API Sparku:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Nastavení vlastního typu sloupce
Pokud potřebujete typ sloupce nastavit ručně, můžete použít redshift_type
metadata sloupce. Pokud například chcete přepsat Spark SQL Schema -> Redshift SQL
shodu typů, abyste přiřadili typ sloupce definovaný uživatelem, můžete udělat toto:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Konfigurace kódování sloupců
Při vytváření tabulky použijte encoding
pole metadat sloupců k určení kódování komprese pro každý sloupec (v dokumentaci Amazonu najdete dostupné kódování).
Nastavení popisů sloupců
Redshift umožňuje, aby sloupce měly připojené popisy, které by se měly zobrazit ve většině nástrojů dotazů (pomocí COMMENT
příkazu). Pole metadat sloupce můžete nastavit description
tak, aby určilo popis jednotlivých sloupců.
Odeslání dotazu do Redshiftu
Optimalizátor Sparku nasdílí následující operátory do Redshiftu:
Filter
Project
Sort
Limit
Aggregation
Join
V Project
rámci a Filter
podporuje následující výrazy:
- Většina logických operátorů logiky
- Porovnání
- Základní aritmetické operace
- Přetypování čísel a řetězců
- Většina řetězcových funkcí
- Skalární poddotazy, pokud je lze zcela snížit do Redshiftu.
Poznámka:
Tento posun nepodporuje výrazy pracující s kalendářními daty a časovými razítky.
V rámci Aggregation
této funkce podporuje následující agregační funkce:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
v kombinaci s doložkou DISTINCT
, pokud je to možné.
V rámci Join
této funkce podporuje následující typy spojení:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- Poddotazy, které jsou přepsány
Join
optimalizátorem, např.WHERE EXISTS
WHERE NOT EXISTS
Poznámka:
Funkce Pushdown pro připojení nepodporuje FULL OUTER JOIN
.
Nasdílení změn může být nejužitevější v dotazech s LIMIT
. Dotaz, jako by mohl trvat velmi dlouho, protože SELECT * FROM large_redshift_table LIMIT 10
celá tabulka by jako průběžný výsledek nejprve byla unLOADed na S3. S odsazením se LIMIT
spustí v Redshiftu. V dotazech s agregacemi pomáhá přesun agregace dolů do Redshiftu také snížit množství dat, která je potřeba přenést.
Ve výchozím nastavení je povolený odsazení dotazu do Redshiftu. Lze ho zakázat nastavením spark.databricks.redshift.pushdown
na false
. I když je spark zakázaný, stále odesílá filtry dolů a provádí odstranění sloupců do Redshiftu.
Instalace ovladače Redshift
Zdroj dat Redshift také vyžaduje ovladač JDBC kompatibilní s Redshiftem. Vzhledem k tomu, že Redshift je založený na databázovém systému PostgreSQL, můžete použít ovladač PostgreSQL JDBC, který je součástí modulu Databricks Runtime, nebo ovladač Amazonu doporučeného ovladače Redshift JDBC. K použití ovladače PostgreSQL JDBC není nutná žádná instalace. Verze ovladače PostgreSQL JDBC, která je součástí každé verze Databricks Runtime, je uvedená v poznámkách k verzi Databricks Runtime.
Ruční instalace ovladače Redshift JDBC:
- Stáhněte si ovladač z Amazonu.
- Nahrajte ovladač do pracovního prostoru Azure Databricks. Viz knihovny.
- Nainstalujte knihovnu do clusteru.
Poznámka:
Databricks doporučuje používat nejnovější verzi ovladače Redshift JDBC. Verze ovladače Redshift JDBC nižší než 1.2.41 mají následující omezení:
- Verze 1.2.16 ovladače vrací prázdná data při použití
where
klauzule v dotazu SQL. - Verze ovladače nižší než 1.2.41 mohou vrátit neplatné výsledky, protože hodnota null ve sloupci je nesprávně hlášena jako "Not Nullable" místo "Neznámý".
Transakční záruky
Tato část popisuje transakční záruky zdroje dat Redshift pro Spark.
Obecné pozadí vlastností Redshift a S3
Obecné informace o transakčních zárukách redshiftu najdete v kapitole Správa operací souběžného zápisu v dokumentaci k Redshiftu. V maticovém prostředí poskytuje Redshift serializovatelnou izolaci podle dokumentace pro příkaz Redshift BEGIN :
[ačkoli] můžete použít kteroukoli ze čtyř úrovní izolace transakcí, Amazon Redshift zpracuje všechny úrovně izolace jako serializovatelné.
Podle dokumentace Redshiftu:
Amazon Redshift podporuje výchozí chování automatického potvrzení , ve kterém každý samostatně spouštěný příkaz SQL potvrdí jednotlivě.
Jednotlivé příkazy jako COPY
a UNLOAD
jsou atomické a transakční, i když explicitní BEGIN
a END
měly by být nezbytné pouze k vynucení atomicity více příkazů nebo dotazů.
Při čtení z redshiftu a zápisu do redshiftu zdroj dat čte a zapisuje data do S3. Spark i Redshift vytvářejí dělený výstup a ukládají ho do více souborů v S3. Podle dokumentace k modelu konzistence dat Amazon S3 jsou operace výpisu kontejnerů S3 nakonec konzistentní, takže soubory musí jít na zvláštní délky, aby se zabránilo chybějícím nebo neúplným datům kvůli tomuto zdroji konečné konzistence.
Záruky zdroje dat Redshift pro Spark
Připojení k existující tabulce
Při vkládání řádků do Redshiftu používá zdroj dat příkaz COPY a určuje manifesty, které chrání před určitými nakonec konzistentními operacemi S3. V důsledku toho spark-redshift
mají připojení k existujícím tabulkám stejné atomické a transakční vlastnosti jako běžné příkazy Redshift COPY
.
Vytvoření nové tabulky (SaveMode.CreateIfNotExists
)
Vytvoření nové tabulky je dvoustupňový proces, který se skládá z CREATE TABLE
příkazu, za nímž následuje příkaz COPY , který připojí počáteční sadu řádků. Obě operace se provádějí ve stejné transakci.
Přepsání existující tabulky
Ve výchozím nastavení používá zdroj dat transakce k provádění přepsání, které jsou implementovány odstraněním cílové tabulky, vytvořením nové prázdné tabulky a připojením řádků k ní.
Pokud je zastaralé usestagingtable
nastavení nastavené na false
, zdroj dat před připojením řádků k nové tabulce potvrdí DELETE TABLE
příkaz, obětuje atomicitu operace přepsání, ale snižuje množství přípravného prostoru, které Redshift potřebuje během přepsání.
Dotaz na tabulku Redshift
Dotazy používají příkaz Redshift UNLOAD k provedení dotazu a uložení výsledků do S3 a použití manifestů k ochranu před určitými nakonec konzistentními operacemi S3. V důsledku toho by dotazy ze zdroje dat Redshift pro Spark měly mít stejné vlastnosti konzistence jako běžné dotazy Redshift.
Běžné problémy a řešení
KontejnerY S3 a cluster Redshift jsou v různých oblastech AWS.
Ve výchozím nastavení kopie S3 <–> Redshift nefungují, pokud jsou kontejnerY S3 a cluster Redshift v různých oblastech AWS.
Pokud se pokusíte přečíst tabulku Redshift, když je kontejner S3 v jiné oblasti, může se zobrazit chyba, například:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Podobně může pokus o zápis do Redshiftu pomocí kbelíku S3 v jiné oblasti způsobit následující chybu:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Zápisy: Příkaz Redshift COPY podporuje explicitní specifikaci oblasti kbelíku S3, takže v těchto případech můžete zápisy do Redshiftu správně fungovat přidáním
region 'the-region-name'
doextracopyoptions
nastavení. Například s kbelíkem v oblasti USA – východ (Virginia) a rozhraní Scala API použijte:.option("extracopyoptions", "region 'us-east-1'")
Můžete také použít
awsregion
nastavení:.option("awsregion", "us-east-1")
Čtení: Příkaz UNLOAD Redshift podporuje také explicitní specifikaci oblasti kbelíku S3. Čtení můžete správně nastavit tak, že do
awsregion
nastavení přidáte oblast:.option("awsregion", "us-east-1")
Chyba ověřování při použití hesla se speciálními znaky v adrese URL JDBC
Pokud jako součást adresy URL JDBC zadáte uživatelské jméno a heslo a heslo obsahuje speciální znaky, například ;
, ?
nebo &
, může se zobrazit následující výjimka:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Příčinou jsou speciální znaky v uživatelském jménu nebo hesle, které ovladač JDBC správně neunikne. Nezapomeňte zadat uživatelské jméno a heslo pomocí odpovídajících možností user
datového rámce a password
. Další informace najdete v tématu Parametry.
Dlouhotrvající dotaz Sparku se po neomezenou dobu zablokuje, i když se provede odpovídající operace Redshiftu.
Pokud čtete nebo zapisujete velké objemy dat z a do Redshiftu, může dotaz Sparku přestat reagovat, i když stránka monitorování AWS Redshift ukazuje, že odpovídající LOAD
operace nebo UNLOAD
operace byla dokončena a že cluster je nečinný. Příčinou je vypršení časového limitu mezi Redshiftem a Sparkem. Abyste tomu předešli, ujistěte se, že tcpKeepAlive
je povolený příznak JDBC a TCPKeepAliveMinutes
je nastavený na nízkou hodnotu (například 1).
Další informace najdete v tématu Amazon Redshift JDBC Driver Configuration.
Časové razítko sémantikou časového pásma
Při čtení dat se redshift TIMESTAMP
i TIMESTAMPTZ
datové typy mapují na Spark TimestampType
a hodnota se převede na koordinovaný univerzální čas (UTC) a uloží se jako časové razítko UTC. U Redshiftu TIMESTAMP
se místní časové pásmo předpokládá, protože hodnota neobsahuje žádné informace o časovém pásmu. Při zápisu dat do tabulky Redshift se Spark TimestampType
mapuje na datový typ Redshift TIMESTAMP
.
Průvodce migrací
Zdroj dat teď vyžaduje, abyste explicitně nastavili forward_spark_s3_credentials
před předáním přihlašovacích údajů SparkU S3 do Redshiftu. Tato změna nemá žádný vliv, pokud používáte aws_iam_role
mechanismy ověřování.temporary_aws_*
Pokud jste se ale spoléhali na staré výchozí chování, musíte teď explicitně nastavit forward_spark_s3_credentials
, abyste true
mohli pokračovat v používání předchozího mechanismu ověřování Redshift na S3. Diskuzi o třech ověřovacích mechanismech a jejich kompromisech zabezpečení najdete v části Ověřování ve S3 a Redshiftu tohoto dokumentu.