Eseguire query su Amazon Redshift con Azure Databricks
È possibile leggere e scrivere tabelle da Amazon Redshift con Azure Databricks.
Importante
Le configurazioni descritte in questo articolo sono sperimentali. Le caratteristiche sperimentali vengono fornite come sono e non sono supportate da Databricks tramite il supporto tecnico del cliente. Per ottenere il supporto completo della federazione delle query, è consigliabile usare invece Lakehouse Federation, che consente agli utenti di Azure Databricks di sfruttare la sintassi di Unity Catalog e gli strumenti di governance dei dati.
L'origine dati Databricks Redshift usa Amazon S3 per trasferire in modo efficiente i dati in Redshift e usa JDBC per attivare automaticamente i comandi e UNLOAD
appropriati COPY
in Redshift.
Nota
In Databricks Runtime 11.3 LTS e versioni successive Databricks Runtime include il driver JDBC Redshift, accessibile usando la redshift
parola chiave per l'opzione di formato. Vedere Le versioni delle note sulla versione di Databricks Runtime e la compatibilità per le versioni dei driver incluse in ogni databricks Runtime. I driver forniti dall'utente sono ancora supportati e hanno la precedenza sul driver JDBC in bundle.
In Databricks Runtime 10.4 LTS e versioni successive è necessaria l'installazione manuale del driver JDBC di Redshift e le query devono usare il driver (com.databricks.spark.redshift
) per il formato. Vedere Installazione del driver Redshift.
Utilizzo
Gli esempi seguenti illustrano la connessione con il driver Redshift. Sostituire i valori dei url
parametri se si usa il driver JDBC PostgreSQL.
Dopo aver configurato le credenziali AWS, è possibile usare l'origine dati con l'API origine dati Spark in Python, SQL, R o Scala.
Importante
Le posizioni esterne definite nel catalogo Unity non sono supportate come posizioni tempdir
.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Leggere i dati usando SQL in Databricks Runtime 10.4 LTS e versioni successive:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Leggere i dati usando SQL in Databricks Runtime 11.3 LTS e versioni successive:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Scrivere dati con SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
L'API SQL supporta solo la creazione di nuove tabelle e non la sovrascrittura o l'aggiunta.
R
Leggere i dati usando R in Databricks Runtime 10.4 LTS e versioni successive:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Leggere i dati usando R in Databricks Runtime 11.3 LTS e versioni successive:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Consigli per l'uso di Redshift
L'esecuzione di query può estrarre grandi quantità di dati in S3. Se si prevede di eseguire diverse query sugli stessi dati in Redshift, Databricks consiglia di salvare i dati estratti usando Delta Lake.
Impostazione
Autenticazione a S3 e Redshift
L'origine dati include diverse connessioni di rete, illustrate nel diagramma seguente:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
L'origine dati legge e scrive i dati in S3 durante il trasferimento dei dati da e verso Redshift. Di conseguenza, richiede credenziali AWS con accesso in lettura e scrittura a un bucket S3 (specificato usando il tempdir
parametro di configurazione).
Nota
L'origine dati non pulisce i file temporanei creati in S3. Di conseguenza, è consigliabile usare un bucket S3 temporaneo dedicato con una configurazione del ciclo di vita degli oggetti per assicurarsi che i file temporanei vengano eliminati automaticamente dopo un periodo di scadenza specificato. Per informazioni su come crittografare questi file, vedere la sezione Crittografia di questo documento. Non è possibile usare un percorso esterno definito nel catalogo Unity come posizione tempdir
.
Le sezioni seguenti descrivono le opzioni di configurazione dell'autenticazione di ogni connessione:
Driver Spark a Redshift
Il driver Spark si connette a Redshift tramite JDBC usando un nome utente e una password. Redshift non supporta l'uso dei ruoli IAM per autenticare questa connessione. Per impostazione predefinita, questa connessione usa la crittografia SSL; per altri dettagli, vedere Crittografia.
Da Spark a S3
S3 funge da intermediario per archiviare i dati in blocco durante la lettura o la scrittura in Redshift. Spark si connette a S3 usando entrambe le interfacce FileSystem di Hadoop e direttamente usando il client S3 di Amazon Java SDK.
Nota
Non è possibile usare i montaggi DBFS per configurare l'accesso a S3 per Redshift.
Impostare le chiavi in Hadoop conf: è possibile specificare le chiavi AWS usando le proprietà di configurazione di Hadoop. Se la
tempdir
configurazione punta a uns3a://
file system, è possibile impostare lefs.s3a.access.key
proprietà efs.s3a.secret.key
in un file di configurazione XML hadoop o chiamaresc.hadoopConfiguration.set()
per configurare la configurazione hadoop globale di Spark. Se si usa uns3n://
file system, è possibile specificare le chiavi di configurazione legacy, come illustrato nell'esempio seguente.Scala
Ad esempio, se si usa il
s3a
file system, aggiungere:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
Per il file system legacy
s3n
, aggiungere:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
Il comando seguente si basa su alcuni elementi interni di Spark, ma dovrebbe funzionare con tutte le versioni di PySpark ed è improbabile che cambi in futuro:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Da Redshift a S3
Impostare l'opzione forward_spark_s3_credentials
su true
per inoltrare automaticamente le credenziali della chiave AWS usate da Spark per connettersi a S3 tramite JDBC a Redshift. La query JDBC incorpora queste credenziali, pertanto Databricks consiglia vivamente di abilitare la crittografia SSL della connessione JDBC.
Codifica
Protezione di JDBC: a meno che non siano presenti impostazioni correlate a SSL nell'URL JDBC, l'origine dati per impostazione predefinita abilita la crittografia SSL e verifica anche che il server Redshift sia attendibile ,ovvero
sslmode=verify-full
. A tale scopo, un certificato server viene scaricato automaticamente dai server Amazon la prima volta che è necessario. In caso di errore, come fallback viene usato un file di certificato pre-bundle. Questo vale sia per i driver Redshift che per i driver JDBC postgreSQL.Nel caso in cui si verifichino problemi con questa funzionalità o si vuole semplicemente disabilitare SSL, è possibile chiamare
.option("autoenablessl", "false")
suDataFrameReader
oDataFrameWriter
.Se si desidera specificare impostazioni personalizzate correlate a SSL, è possibile seguire le istruzioni nella documentazione di Redshift: Uso dei certificati SSL e server in Java e JDBC Driver Configuration Options Any SSL-related options present in the JDBC
url
used with the data source take precedence (ovvero la configurazione automatica non verrà attivata).Crittografia dei dati UNLOAD archiviati in S3 (dati archiviati durante la lettura da Redshift): secondo la documentazione di Redshift sullo scaricamento dei dati in S3, "UNLOAD crittografa automaticamente i file di dati usando la crittografia lato server di Amazon S3 (SSE-S3)."
Redshift supporta anche la crittografia lato client con una chiave personalizzata (vedere: Scaricamento di file di dati crittografati), ma l'origine dati non è in grado di specificare la chiave simmetrica richiesta.
Crittografia dei dati COPY archiviati in S3 (dati archiviati durante la scrittura in Redshift): secondo la documentazione di Redshift sul caricamento di file di dati crittografati da Amazon S3:
È possibile usare il COPY
comando per caricare i file di dati caricati in Amazon S3 usando la crittografia lato server con chiavi di crittografia gestite da AWS (SSE-S3 o SSE-KMS), crittografia lato client o entrambe. COPY non supporta la crittografia lato server Amazon S3 con una chiave fornita dal cliente (SSE-C).
Parametri
La mappa dei parametri o OPTIONS fornita in Spark SQL supporta le impostazioni seguenti:
Parametro | Richiesto | Default | Descrizione |
---|---|---|---|
dbtable | Sì, a meno che non venga specificata la query. | None | Tabella da cui creare o leggere in Redshift. Questo parametro è obbligatorio quando si salvano i dati in Redshift. |
query | Sì, a meno che non sia specificata la tabella dbtable. | None | Query da cui leggere in Redshift. |
utente | No | None | Nome utente di Redshift. Deve essere utilizzato in combinazione con l'opzione password. Può essere usato solo se l'utente e la password non vengono passati nell'URL, passando entrambi genereranno un errore. Usare questo parametro quando il nome utente contiene caratteri speciali di cui è necessario eseguire l'escape. |
password | No | None | Password di Redshift. Deve essere utilizzato in combinazione con l'opzione user . Può essere usato solo se l'utente e la password non vengono passati nell'URL; il passaggio di entrambi genererà un errore. Usare questo parametro quando la password contiene caratteri speciali di cui è necessario eseguire l'escape. |
URL. | Sì | None | URL JDBC, del formatojdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password> subprotocol può essere postgresql o redshift , a seconda del driver JDBC caricato. Un driver compatibile con Redshift deve trovarsi nel classpath e corrispondere a questo URL. host e port deve puntare al nodo master Redshift, quindi i gruppi di sicurezza e/o VPC devono essere configurati per consentire l'accesso dall'applicazione driver.database identifica un nome user di database Redshift e password sono credenziali per accedere al database, che deve essere incorporato in questo URL per JDBC e l'account utente deve disporre dei privilegi necessari per la tabella a cui si fa riferimento. |
search_path | No | None | Impostare il percorso di ricerca dello schema in Redshift. Verrà impostato usando il SET search_path to comando . Deve essere un elenco delimitato da virgole di nomi di schema in cui cercare le tabelle. Vedere la documentazione di Redshift di search_path. |
aws_iam_role | Solo se si usano ruoli IAM per autorizzare. | None | ARN completamente specificato del ruolo delle operazioni IAM Redshift COPY/UNLOAD collegate al cluster Redshift, arn:aws:iam::123456789000:role/<redshift-iam-role> ad esempio . |
forward_spark_s3_credentials | No | false |
Se true , l'origine dati individua automaticamente le credenziali usate da Spark per connettersi a S3 e inoltra tali credenziali a Redshift tramite JDBC. Queste credenziali vengono inviate come parte della query JDBC, pertanto è consigliabile abilitare la crittografia SSL della connessione JDBC quando si usa questa opzione. |
temporary_aws_access_key_id | No | None | La chiave di accesso AWS deve disporre delle autorizzazioni di scrittura per il bucket S3. |
temporary_aws_secret_access_key | No | None | Chiave di accesso privata AWS corrispondente alla chiave di accesso fornita. |
temporary_aws_session_token | No | None | Token di sessione AWS corrispondente alla chiave di accesso fornita. |
tempdir | Sì | None | Una posizione scrivibile in Amazon S3, da usare per i dati scaricati durante la lettura e l'avro dei dati da caricare in Redshift durante la scrittura. Se si usa l'origine dati Redshift per Spark come parte di una normale pipeline ETL, può essere utile impostare criteri relativi al ciclo di vita in un bucket e usarla come posizione temporanea per questi dati. Non è possibile usare percorsi esterni definiti in Unity Catalog come tempdir posizioni. |
jdbcdriver | No | Determinato dal sottoprotocolo dell'URL JDBC. | Nome della classe del driver JDBC da usare. Questa classe deve trovarsi nel classpath. Nella maggior parte dei casi, non deve essere necessario specificare questa opzione, perché il nome della classe driver appropriato deve essere determinato automaticamente dal sottoprotocolo dell'URL JDBC. |
diststyle | No | EVEN |
Stile di distribuzione Redshift da utilizzare durante la creazione di una tabella. Può essere uno di EVEN o KEY ALL (vedere la documentazione di Redshift). Quando si usa KEY , è necessario impostare anche una chiave di distribuzione con l'opzione distkey . |
distkey | No, a meno che non si usi DISTSTYLE KEY |
None | Nome di una colonna nella tabella da utilizzare come chiave di distribuzione durante la creazione di una tabella. |
sortkeyspec | No | None | Definizione completa della chiave di ordinamento redshift. Alcuni esempi: - SORTKEY(my_sort_column) - COMPOUND SORTKEY(sort_col_1, sort_col_2) - INTERLEAVED SORTKEY(sort_col_1, sort_col_2) |
usestagingtable (deprecato) | No | true |
Se si imposta questa opzione deprecata su false , la tabella di destinazione di un'operazione di sovrascrittura viene eliminata immediatamente all'inizio della scrittura, rendendo l'operazione di sovrascrittura non atomica e riducendo la disponibilità della tabella di destinazione. Ciò può ridurre i requisiti di spazio su disco temporanei per le sovrascrizioni.Poiché l'impostazione usestagingtable=false dell'operazione comporta la perdita o la mancata disponibilità dei dati, è deprecata a favore della richiesta di eliminare manualmente la tabella di destinazione. |
description | No | None | Descrizione della tabella. Verrà impostato usando il comando SQL COMMENT e dovrebbe essere visualizzato nella maggior parte degli strumenti di query. Vedere anche i description metadati per impostare le descrizioni sulle singole colonne. |
preactions | No | None | Elenco ; separato di comandi SQL da eseguire prima del caricamento COPY del comando. Potrebbe essere utile avere alcuni DELETE comandi o un'esecuzione simile qui prima di caricare nuovi dati. Se il comando contiene %s , il nome della tabella viene formattato in prima dell'esecuzione (nel caso in cui si usi una tabella di staging).Si avvisa che se questi comandi hanno esito negativo, viene considerato come un errore e viene generata un'eccezione. Se si usa una tabella di staging, le modifiche vengono ripristinate e la tabella di backup ripristinata in caso di esito negativo delle azioni preliminari. |
postactions | No | None | Elenco ; separato di comandi SQL da eseguire dopo un esito positivo COPY durante il caricamento dei dati. Può essere utile avere alcuni GRANT comandi o un'esecuzione simile in questo caso durante il caricamento di nuovi dati. Se il comando contiene %s , il nome della tabella viene formattato in prima dell'esecuzione (nel caso in cui si usi una tabella di staging).Si avvisa che se questi comandi hanno esito negativo, viene considerato come un errore e viene generata un'eccezione. Se si usa una tabella di staging, le modifiche vengono ripristinate e la tabella di backup ripristinata in caso di esito negativo delle azioni post.If using a staging table, the changes are reverted and the backup table restored if post actions fail. |
extracopyoptions | No | None | Elenco di opzioni aggiuntive da aggiungere al comando Redshift COPY durante il caricamento dei dati, ad esempioTRUNCATECOLUMNS oppure MAXERROR n (vedere la documentazione di Redshift per altre opzioni).Poiché queste opzioni vengono aggiunte alla fine del COPY comando, è possibile usare solo le opzioni che hanno senso alla fine del comando, ma che devono coprire i casi d'uso più possibili. |
tempformat | No | AVRO |
Formato in cui salvare i file temporanei in S3 durante la scrittura in Redshift. Il valore predefinito èAVRO ; gli altri valori consentiti sono CSV e CSV GZIP rispettivamente per CSV e csv con gzipped.Redshift è notevolmente più veloce quando si caricano file CSV rispetto al caricamento di file Avro, quindi l'uso di tempformat può offrire un notevole miglioramento delle prestazioni durante la scrittura in Redshift. |
csvnullstring | No | @NULL@ |
Valore String da scrivere per i valori Null quando si usa il formato tempformat CSV. Deve trattarsi di un valore che non viene visualizzato nei dati effettivi. |
csvseparator | No | , |
Separatore da utilizzare per la scrittura di file temporanei con tempformat impostato su CSV oCSV GZIP . Deve essere un carattere ASCII valido, ad esempio ", " o "\| ". |
csdrivereleadingwhitespace | No | true |
Se impostato su true, rimuove gli spazi vuoti iniziali dai valori durante le scritture quandotempformat è impostato su CSV o CSV GZIP . In caso contrario, gli spazi vuoti vengono mantenuti. |
csdriveretrailingwhitespace | No | true |
Se impostato su true, rimuove gli spazi vuoti finali dai valori durante le scritture quandotempformat è impostato su CSV o CSV GZIP . In caso contrario, lo spazio vuoto viene mantenuto. |
infer_timestamp_ntz_type | No | false |
Se true , i valori di tipo Redshift TIMESTAMP vengono interpretati come TimestampNTZType (timestamp senza fuso orario) durante le letture. In caso contrario, tutti i timestamp vengono interpretati come TimestampType indipendentemente dal tipo nella tabella Redshift sottostante. |
Opzioni di configurazione aggiuntive
Configurazione delle dimensioni massime delle colonne stringa
Quando si creano tabelle Redshift, il comportamento predefinito consiste nel creare TEXT
colonne per le colonne stringa. Redshift archivia TEXT
le colonne come VARCHAR(256)
, pertanto queste colonne hanno una dimensione massima di 256 caratteri (origine).
Per supportare colonne di dimensioni maggiori, è possibile usare il maxlength
campo dei metadati della colonna per specificare la lunghezza massima delle singole colonne stringa. Ciò è utile anche per implementare ottimizzazioni delle prestazioni di risparmio di spazio dichiarando colonne con una lunghezza massima inferiore rispetto all'impostazione predefinita.
Nota
A causa delle limitazioni in Spark, le API del linguaggio SQL e R non supportano la modifica dei metadati delle colonne.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
Di seguito è riportato un esempio di aggiornamento dei campi di metadati di più colonne usando l'API Scala di Spark:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Impostare un tipo di colonna personalizzato
Se è necessario impostare manualmente un tipo di colonna, è possibile usare i metadati della redshift_type
colonna. Ad esempio, se si desidera eseguire l'override del Spark SQL Schema -> Redshift SQL
matcher di tipo per assegnare un tipo di colonna definito dall'utente, è possibile eseguire le operazioni seguenti:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Configurare la codifica delle colonne
Quando si crea una tabella, usare il encoding
campo dei metadati della colonna per specificare una codifica di compressione per ogni colonna (vedere La documentazione di Amazon per le codifiche disponibili).
Impostazione delle descrizioni sulle colonne
Redshift consente alle colonne di avere descrizioni associate che devono essere visualizzate nella maggior parte degli strumenti di query (usando il COMMENT
comando ). È possibile impostare il description
campo dei metadati della colonna per specificare una descrizione per le singole colonne.
Eseguire il push delle query in Redshift
Spark Optimizer esegue il push degli operatori seguenti in Redshift:
Filter
Project
Sort
Limit
Aggregation
Join
All'interno di Project
e Filter
supporta le espressioni seguenti:
- Maggior parte degli operatori di logica booleani
- Confronti
- Operazioni aritmetiche di base
- Cast numerici e stringa
- Maggior parte delle funzioni stringa
- Sottoquery scalari, se possono essere spostate interamente in Redshift.
Nota
Questo pushdown non supporta le espressioni che operano su date e timestamp.
In Aggregation
supporta le funzioni di aggregazione seguenti:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
combinato con la DISTINCT
clausola , ove applicabile.
All'interno Join
di supporta i tipi di join seguenti:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- Sottoquery riscritte da
Join
Optimizer, ad esempioWHERE EXISTS
,WHERE NOT EXISTS
Nota
Il pushdown di join non supporta FULL OUTER JOIN
.
Il pushdown potrebbe essere più utile nelle query con LIMIT
. Una query come SELECT * FROM large_redshift_table LIMIT 10
potrebbe richiedere molto tempo, perché l'intera tabella sarebbe prima UNLOADed su S3 come risultato intermedio. Con il pushdown, viene LIMIT
eseguito in Redshift. Nelle query con aggregazioni, il push dell'aggregazione in Redshift consente anche di ridurre la quantità di dati da trasferire.
Il push delle query in Redshift è abilitato per impostazione predefinita. Può essere disabilitato impostando spark.databricks.redshift.pushdown
su false
. Anche se disabilitato, Spark esegue comunque il push dei filtri ed esegue l'eliminazione delle colonne in Redshift.
Installazione del driver Redshift
L'origine dati Redshift richiede anche un driver JDBC compatibile con Redshift. Poiché Redshift si basa sul sistema di database PostgreSQL, è possibile usare il driver JDBC PostgreSQL incluso in Databricks Runtime o il driver JDBC di Redshift consigliato da Amazon. Non è necessaria alcuna installazione per usare il driver JDBC PostgreSQL. La versione del driver JDBC PostgreSQL inclusa in ogni versione di Databricks Runtime è elencata nelle note sulla versione di Databricks Runtime.
Per installare manualmente il driver JDBC di Redshift:
- Scaricare il driver da Amazon.
- Caricare il driver nell'area di lavoro di Azure Databricks. Vedere Librerie.
- Installare la libreria nel cluster.
Nota
Databricks consiglia di usare la versione più recente del driver JDBC di Redshift. Le versioni del driver JDBC redshift inferiori alla 1.2.41 presentano le limitazioni seguenti:
- La versione 1.2.16 del driver restituisce dati vuoti quando si usa una
where
clausola in una query SQL. - Le versioni del driver precedenti alla 1.2.41 possono restituire risultati non validi perché il supporto dei valori Null di una colonna viene erroneamente segnalato come "Non nullable" anziché "Sconosciuto".
Garanzie transazionali
Questa sezione descrive le garanzie transazionali dell'origine dati Redshift per Spark.
Informazioni generali sulle proprietà redshift e S3
Per informazioni generali sulle garanzie transazionali di Redshift, vedere il capitolo Managing Concurrent Write Operations (Gestione delle operazioni di scrittura simultanee) nella documentazione di Redshift. In breve, Redshift fornisce l'isolamento serializzabile in base alla documentazione per il comando Redshift BEGIN :
[anche se] è possibile usare uno dei quattro livelli di isolamento delle transazioni, Amazon Redshift elabora tutti i livelli di isolamento come serializzabili.
Secondo la documentazione di Redshift:
Amazon Redshift supporta un comportamento di commit automatico predefinito in cui ogni comando SQL eseguito separatamente esegue il commit singolarmente.
Pertanto, i singoli comandi come COPY
e UNLOAD
sono atomici e transazionali, mentre espliciti BEGIN
e END
devono essere necessari solo per applicare l'atomicità di più comandi o query.
Durante la lettura e la scrittura in Redshift, l'origine dati legge e scrive i dati in S3. Sia Spark che Redshift producono output partizionato e lo archiviano in più file in S3. Secondo la documentazione di Amazon S3 Data Consistency Model , le operazioni di presentazione dei bucket S3 sono alla fine coerenti, quindi i file devono passare a lunghezze speciali per evitare dati mancanti o incompleti a causa di questa origine di coerenza finale.
Garanzie dell'origine dati Redshift per Spark
Accodare a una tabella esistente
Quando si inseriscono righe in Redshift, l'origine dati usa il comando COPY e specifica i manifesti per proteggersi da determinate operazioni S3 coerenti alla fine. Di conseguenza, spark-redshift
le aggiunte alle tabelle esistenti hanno le stesse proprietà atomiche e transazionali dei normali comandi redshift COPY
.
Creare una nuova tabella (SaveMode.CreateIfNotExists
)
La creazione di una nuova tabella è un processo in due passaggi, costituito da un CREATE TABLE
comando seguito da un comando COPY per aggiungere il set iniziale di righe. Entrambe le operazioni vengono eseguite nella stessa transazione.
Sovrascrivere una tabella esistente
Per impostazione predefinita, l'origine dati usa transazioni per eseguire sovrascrizioni, implementate eliminando la tabella di destinazione, creando una nuova tabella vuota e accodando le righe.
Se l'impostazione deprecata usestagingtable
è impostata su false
, l'origine dati esegue il commit del DELETE TABLE
comando prima di accodare le righe alla nuova tabella, sacrificando l'atomicità dell'operazione di sovrascrittura, ma riducendo la quantità di spazio di staging necessario da Redshift durante la sovrascrittura.
Eseguire query sulla tabella Redshift
Le query usano il comando REDSHIFT UNLOAD per eseguire una query e salvare i risultati in S3 e usare manifesti per proteggersi da determinate operazioni S3 coerenti . Di conseguenza, le query dall'origine dati Redshift per Spark devono avere le stesse proprietà di coerenza delle normali query Redshift.
Problemi e soluzioni comuni
Il bucket S3 e il cluster Redshift si trovano in aree AWS diverse
Per impostazione predefinita, le copie S3 <-> Redshift non funzionano se il bucket S3 e il cluster Redshift si trovano in aree AWS diverse.
Se si tenta di leggere una tabella Redshift quando il bucket S3 si trova in un'area diversa, potrebbe essere visualizzato un errore, ad esempio:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Analogamente, il tentativo di scrivere in Redshift usando un bucket S3 in un'area diversa può causare l'errore seguente:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Scritture: il comando Redshift COPY supporta la specifica esplicita dell'area del bucket S3, in modo da poter eseguire operazioni di scrittura in Redshift correttamente in questi casi aggiungendo
region 'the-region-name'
all'impostazioneextracopyoptions
. Ad esempio, con un bucket nell'area Stati Uniti orientali (Virginia) e nell'API Scala, usare:.option("extracopyoptions", "region 'us-east-1'")
In alternativa, è possibile usare l'impostazione
awsregion
:.option("awsregion", "us-east-1")
Letture: il comando UNLOAD di Redshift supporta anche specifiche esplicite dell'area del bucket S3. È possibile fare in modo che le letture funzionino correttamente aggiungendo l'area all'impostazione
awsregion
:.option("awsregion", "us-east-1")
Errore di autenticazione quando si usa una password con caratteri speciali nell'URL JDBC
Se si specificano il nome utente e la password come parte dell'URL JDBC e la password contiene caratteri speciali, ;
ad esempio , ?
o &
, è possibile che venga visualizzata l'eccezione seguente:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Ciò è causato da caratteri speciali nel nome utente o nella password non preceduti correttamente dall'escape del driver JDBC. Assicurarsi di specificare il nome utente e la password usando le opzioni user
del dataframe corrispondenti e password
. Per altre informazioni, vedere Parametri.
La query Spark con esecuzione prolungata si blocca a tempo indefinito anche se l'operazione Redshift corrispondente viene eseguita
Se si stanno leggendo o scrivendo grandi quantità di dati da e a Redshift, la query Spark potrebbe bloccarsi a tempo indefinito, anche se la pagina di monitoraggio di AWS Redshift indica che l'operazione o UNLOAD
corrispondente LOAD
è stata completata e che il cluster è inattiva. Ciò è causato dal timeout della connessione tra Redshift e Spark. Per evitare questo problema, assicurarsi che il tcpKeepAlive
flag JDBC sia abilitato e TCPKeepAliveMinutes
sia impostato su un valore basso ,ad esempio 1.
Per altre informazioni, vedere Amazon Redshift JDBC Driver Configuration.For additional information, see Amazon Redshift JDBC Driver Configuration.
Timestamp con semantica del fuso orario
Durante la lettura dei dati, sia Redshift TIMESTAMP
TIMESTAMPTZ
che i tipi di dati vengono mappati a Spark TimestampType
e un valore viene convertito in utc (Coordinated Universal Time) e viene archiviato come timestamp UTC. Per redshift TIMESTAMP
, si presuppone che il fuso orario locale non contenga informazioni sul fuso orario. Quando si scrivono dati in una tabella Redshift, viene eseguito il mapping di spark TimestampType
al tipo di dati Redshift TIMESTAMP
.
Guida alla migrazione
L'origine dati richiede ora di impostare forward_spark_s3_credentials
in modo esplicito prima che le credenziali di Spark S3 vengano inoltrate a Redshift. Questa modifica non ha alcun impatto se si usano i aws_iam_role
meccanismi di autenticazione o temporary_aws_*
. Tuttavia, se si è basato sul comportamento predefinito precedente, è ora necessario impostare forward_spark_s3_credentials
in modo esplicito su true
per continuare a usare il precedente meccanismo di autenticazione da Redshift a S3. Per una descrizione dei tre meccanismi di autenticazione e dei relativi compromessi di sicurezza, vedere la sezione Autenticazione a S3 e Redshift di questo documento.