Condividi tramite


Eseguire query su Amazon Redshift con Azure Databricks

È possibile leggere e scrivere tabelle da Amazon Redshift con Azure Databricks.

Importante

Le configurazioni descritte in questo articolo sono sperimentali. Le caratteristiche sperimentali vengono fornite come sono e non sono supportate da Databricks tramite il supporto tecnico del cliente. Per ottenere il supporto completo della federazione delle query, è consigliabile usare invece Lakehouse Federation, che consente agli utenti di Azure Databricks di sfruttare la sintassi di Unity Catalog e gli strumenti di governance dei dati.

L'origine dati Databricks Redshift usa Amazon S3 per trasferire in modo efficiente i dati in Redshift e usa JDBC per attivare automaticamente i comandi e UNLOAD appropriati COPY in Redshift.

Nota

In Databricks Runtime 11.3 LTS e versioni successive Databricks Runtime include il driver JDBC Redshift, accessibile usando la redshift parola chiave per l'opzione di formato. Vedere Le versioni delle note sulla versione di Databricks Runtime e la compatibilità per le versioni dei driver incluse in ogni databricks Runtime. I driver forniti dall'utente sono ancora supportati e hanno la precedenza sul driver JDBC in bundle.

In Databricks Runtime 10.4 LTS e versioni successive è necessaria l'installazione manuale del driver JDBC di Redshift e le query devono usare il driver (com.databricks.spark.redshift) per il formato. Vedere Installazione del driver Redshift.

Utilizzo

Gli esempi seguenti illustrano la connessione con il driver Redshift. Sostituire i valori dei url parametri se si usa il driver JDBC PostgreSQL.

Dopo aver configurato le credenziali AWS, è possibile usare l'origine dati con l'API origine dati Spark in Python, SQL, R o Scala.

Importante

Le posizioni esterne definite nel catalogo Unity non sono supportate come posizioni tempdir.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Leggere i dati usando SQL in Databricks Runtime 10.4 LTS e versioni successive:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Leggere i dati usando SQL in Databricks Runtime 11.3 LTS e versioni successive:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Scrivere dati con SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

L'API SQL supporta solo la creazione di nuove tabelle e non la sovrascrittura o l'aggiunta.

R

Leggere i dati usando R in Databricks Runtime 10.4 LTS e versioni successive:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Leggere i dati usando R in Databricks Runtime 11.3 LTS e versioni successive:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Consigli per l'uso di Redshift

L'esecuzione di query può estrarre grandi quantità di dati in S3. Se si prevede di eseguire diverse query sugli stessi dati in Redshift, Databricks consiglia di salvare i dati estratti usando Delta Lake.

Impostazione

Autenticazione a S3 e Redshift

L'origine dati include diverse connessioni di rete, illustrate nel diagramma seguente:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

L'origine dati legge e scrive i dati in S3 durante il trasferimento dei dati da e verso Redshift. Di conseguenza, richiede credenziali AWS con accesso in lettura e scrittura a un bucket S3 (specificato usando il tempdir parametro di configurazione).

Nota

L'origine dati non pulisce i file temporanei creati in S3. Di conseguenza, è consigliabile usare un bucket S3 temporaneo dedicato con una configurazione del ciclo di vita degli oggetti per assicurarsi che i file temporanei vengano eliminati automaticamente dopo un periodo di scadenza specificato. Per informazioni su come crittografare questi file, vedere la sezione Crittografia di questo documento. Non è possibile usare un percorso esterno definito nel catalogo Unity come posizione tempdir.

Le sezioni seguenti descrivono le opzioni di configurazione dell'autenticazione di ogni connessione:

Driver Spark a Redshift

Il driver Spark si connette a Redshift tramite JDBC usando un nome utente e una password. Redshift non supporta l'uso dei ruoli IAM per autenticare questa connessione. Per impostazione predefinita, questa connessione usa la crittografia SSL; per altri dettagli, vedere Crittografia.

Da Spark a S3

S3 funge da intermediario per archiviare i dati in blocco durante la lettura o la scrittura in Redshift. Spark si connette a S3 usando entrambe le interfacce FileSystem di Hadoop e direttamente usando il client S3 di Amazon Java SDK.

Nota

Non è possibile usare i montaggi DBFS per configurare l'accesso a S3 per Redshift.

  • Impostare le chiavi in Hadoop conf: è possibile specificare le chiavi AWS usando le proprietà di configurazione di Hadoop. Se la tempdir configurazione punta a un s3a:// file system, è possibile impostare le fs.s3a.access.key proprietà e fs.s3a.secret.key in un file di configurazione XML hadoop o chiamare sc.hadoopConfiguration.set() per configurare la configurazione hadoop globale di Spark. Se si usa un s3n:// file system, è possibile specificare le chiavi di configurazione legacy, come illustrato nell'esempio seguente.

    Scala

    Ad esempio, se si usa il s3a file system, aggiungere:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Per il file system legacy s3n , aggiungere:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    Il comando seguente si basa su alcuni elementi interni di Spark, ma dovrebbe funzionare con tutte le versioni di PySpark ed è improbabile che cambi in futuro:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Da Redshift a S3

Impostare l'opzione forward_spark_s3_credentials su true per inoltrare automaticamente le credenziali della chiave AWS usate da Spark per connettersi a S3 tramite JDBC a Redshift. La query JDBC incorpora queste credenziali, pertanto Databricks consiglia vivamente di abilitare la crittografia SSL della connessione JDBC.

Codifica

  • Protezione di JDBC: a meno che non siano presenti impostazioni correlate a SSL nell'URL JDBC, l'origine dati per impostazione predefinita abilita la crittografia SSL e verifica anche che il server Redshift sia attendibile ,ovvero sslmode=verify-full. A tale scopo, un certificato server viene scaricato automaticamente dai server Amazon la prima volta che è necessario. In caso di errore, come fallback viene usato un file di certificato pre-bundle. Questo vale sia per i driver Redshift che per i driver JDBC postgreSQL.

    Nel caso in cui si verifichino problemi con questa funzionalità o si vuole semplicemente disabilitare SSL, è possibile chiamare .option("autoenablessl", "false") su DataFrameReader o DataFrameWriter.

    Se si desidera specificare impostazioni personalizzate correlate a SSL, è possibile seguire le istruzioni nella documentazione di Redshift: Uso dei certificati SSL e server in Java e JDBC Driver Configuration Options Any SSL-related options present in the JDBC url used with the data source take precedence (ovvero la configurazione automatica non verrà attivata).

  • Crittografia dei dati UNLOAD archiviati in S3 (dati archiviati durante la lettura da Redshift): secondo la documentazione di Redshift sullo scaricamento dei dati in S3, "UNLOAD crittografa automaticamente i file di dati usando la crittografia lato server di Amazon S3 (SSE-S3)."

    Redshift supporta anche la crittografia lato client con una chiave personalizzata (vedere: Scaricamento di file di dati crittografati), ma l'origine dati non è in grado di specificare la chiave simmetrica richiesta.

  • Crittografia dei dati COPY archiviati in S3 (dati archiviati durante la scrittura in Redshift): secondo la documentazione di Redshift sul caricamento di file di dati crittografati da Amazon S3:

È possibile usare il COPY comando per caricare i file di dati caricati in Amazon S3 usando la crittografia lato server con chiavi di crittografia gestite da AWS (SSE-S3 o SSE-KMS), crittografia lato client o entrambe. COPY non supporta la crittografia lato server Amazon S3 con una chiave fornita dal cliente (SSE-C).

Parametri

La mappa dei parametri o OPTIONS fornita in Spark SQL supporta le impostazioni seguenti:

Parametro Richiesto Default Descrizione
dbtable Sì, a meno che non venga specificata la query. None Tabella da cui creare o leggere in Redshift. Questo parametro è obbligatorio quando si salvano i dati in Redshift.
query Sì, a meno che non sia specificata la tabella dbtable. None Query da cui leggere in Redshift.
utente No None Nome utente di Redshift. Deve essere utilizzato in combinazione con l'opzione password. Può essere usato solo se l'utente e la password non vengono passati nell'URL, passando entrambi genereranno un errore. Usare questo parametro quando il nome utente contiene caratteri speciali di cui è necessario eseguire l'escape.
password No None Password di Redshift. Deve essere utilizzato in combinazione con l'opzione user. Può essere usato solo se l'utente e la password non vengono passati nell'URL; il passaggio di entrambi genererà un errore. Usare questo parametro quando la password contiene caratteri speciali di cui è necessario eseguire l'escape.
URL. None URL JDBC, del formato
jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>

subprotocol può essere postgresql o redshift, a seconda del driver JDBC caricato. Un driver compatibile con Redshift deve trovarsi nel classpath e corrispondere a questo URL. host e port deve puntare al nodo master Redshift, quindi i gruppi di sicurezza e/o VPC devono essere configurati per consentire l'accesso dall'applicazione driver.
database identifica un nome user di database Redshift e password sono credenziali per accedere al database, che deve essere incorporato in questo URL per JDBC e l'account utente deve disporre dei privilegi necessari per la tabella a cui si fa riferimento.
search_path No None Impostare il percorso di ricerca dello schema in Redshift. Verrà impostato usando il SET search_path to comando . Deve essere un elenco delimitato da virgole di nomi di schema in cui cercare le tabelle. Vedere la documentazione di Redshift di search_path.
aws_iam_role Solo se si usano ruoli IAM per autorizzare. None ARN completamente specificato del ruolo delle operazioni IAM Redshift COPY/UNLOAD collegate al cluster Redshift, arn:aws:iam::123456789000:role/<redshift-iam-role>ad esempio .
forward_spark_s3_credentials No false Se true, l'origine dati individua automaticamente le credenziali usate da Spark per connettersi a S3 e inoltra tali credenziali a Redshift tramite JDBC. Queste credenziali vengono inviate come parte della query JDBC, pertanto è consigliabile abilitare la crittografia SSL della connessione JDBC quando si usa questa opzione.
temporary_aws_access_key_id No None La chiave di accesso AWS deve disporre delle autorizzazioni di scrittura per il bucket S3.
temporary_aws_secret_access_key No None Chiave di accesso privata AWS corrispondente alla chiave di accesso fornita.
temporary_aws_session_token No None Token di sessione AWS corrispondente alla chiave di accesso fornita.
tempdir None Una posizione scrivibile in Amazon S3, da usare per i dati scaricati durante la lettura e l'avro dei dati da caricare in Redshift durante la scrittura. Se si usa l'origine dati Redshift per Spark come parte di una normale pipeline ETL, può essere utile impostare criteri relativi al ciclo di vita in un bucket e usarla come posizione temporanea per questi dati.

Non è possibile usare percorsi esterni definiti in Unity Catalog come tempdir posizioni.
jdbcdriver No Determinato dal sottoprotocolo dell'URL JDBC. Nome della classe del driver JDBC da usare. Questa classe deve trovarsi nel classpath. Nella maggior parte dei casi, non deve essere necessario specificare questa opzione, perché il nome della classe driver appropriato deve essere determinato automaticamente dal sottoprotocolo dell'URL JDBC.
diststyle No EVEN Stile di distribuzione Redshift da utilizzare durante la creazione di una tabella. Può essere uno di EVENo KEY ALL (vedere la documentazione di Redshift). Quando si usa KEY, è necessario impostare anche una chiave di distribuzione con l'opzione distkey .
distkey No, a meno che non si usi DISTSTYLE KEY None Nome di una colonna nella tabella da utilizzare come chiave di distribuzione durante la creazione di una tabella.
sortkeyspec No None Definizione completa della chiave di ordinamento redshift. Alcuni esempi:

- SORTKEY(my_sort_column)
- COMPOUND SORTKEY(sort_col_1, sort_col_2)
- INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (deprecato) No true Se si imposta questa opzione deprecata su false , la tabella di destinazione di un'operazione di sovrascrittura viene eliminata immediatamente all'inizio della scrittura, rendendo l'operazione di sovrascrittura non atomica e riducendo la disponibilità della tabella di destinazione. Ciò può ridurre i requisiti di spazio su disco temporanei per le sovrascrizioni.

Poiché l'impostazione usestagingtable=false dell'operazione comporta la perdita o la mancata disponibilità dei dati, è deprecata a favore della richiesta di eliminare manualmente la tabella di destinazione.
description No None Descrizione della tabella. Verrà impostato usando il comando SQL COMMENT e dovrebbe essere visualizzato nella maggior parte degli strumenti di query. Vedere anche i description metadati per impostare le descrizioni sulle singole colonne.
preactions No None Elenco ; separato di comandi SQL da eseguire prima del caricamento COPY del comando. Potrebbe essere utile avere alcuni DELETE comandi o un'esecuzione simile qui prima di caricare nuovi dati. Se il comando contiene %s, il nome della tabella viene formattato in prima dell'esecuzione (nel caso in cui si usi una tabella di staging).

Si avvisa che se questi comandi hanno esito negativo, viene considerato come un errore e viene generata un'eccezione. Se si usa una tabella di staging, le modifiche vengono ripristinate e la tabella di backup ripristinata in caso di esito negativo delle azioni preliminari.
postactions No None Elenco ; separato di comandi SQL da eseguire dopo un esito positivo COPY durante il caricamento dei dati. Può essere utile avere alcuni GRANT comandi o un'esecuzione simile in questo caso durante il caricamento di nuovi dati. Se il comando contiene %s, il nome della tabella viene formattato in prima dell'esecuzione (nel caso in cui si usi una tabella di staging).

Si avvisa che se questi comandi hanno esito negativo, viene considerato come un errore e viene generata un'eccezione. Se si usa una tabella di staging, le modifiche vengono ripristinate e la tabella di backup ripristinata in caso di esito negativo delle azioni post.If using a staging table, the changes are reverted and the backup table restored if post actions fail.
extracopyoptions No None Elenco di opzioni aggiuntive da aggiungere al comando Redshift COPY durante il caricamento dei dati, ad esempio
TRUNCATECOLUMNS oppure MAXERROR n (vedere la documentazione di Redshift per altre opzioni).

Poiché queste opzioni vengono aggiunte alla fine del COPY comando, è possibile usare solo le opzioni che hanno senso alla fine del comando, ma che devono coprire i casi d'uso più possibili.
tempformat No AVRO Formato in cui salvare i file temporanei in S3 durante la scrittura in Redshift. Il valore predefinito è
AVRO; gli altri valori consentiti sono CSV e CSV GZIP rispettivamente per CSV e csv con gzipped.

Redshift è notevolmente più veloce quando si caricano file CSV rispetto al caricamento di file Avro, quindi l'uso di tempformat può offrire un notevole miglioramento delle prestazioni durante la scrittura in Redshift.
csvnullstring No @NULL@ Valore String da scrivere per i valori Null quando si usa il formato tempformat CSV. Deve trattarsi di un valore che non viene visualizzato nei dati effettivi.
csvseparator No , Separatore da utilizzare per la scrittura di file temporanei con tempformat impostato su CSV o
CSV GZIP. Deve essere un carattere ASCII valido, ad esempio "," o "\|".
csdrivereleadingwhitespace No true Se impostato su true, rimuove gli spazi vuoti iniziali dai valori durante le scritture quando
tempformat è impostato su CSV o CSV GZIP. In caso contrario, gli spazi vuoti vengono mantenuti.
csdriveretrailingwhitespace No true Se impostato su true, rimuove gli spazi vuoti finali dai valori durante le scritture quando
tempformat è impostato su CSV o CSV GZIP. In caso contrario, lo spazio vuoto viene mantenuto.
infer_timestamp_ntz_type No false Se true, i valori di tipo Redshift TIMESTAMP vengono interpretati come TimestampNTZType (timestamp senza fuso orario) durante le letture. In caso contrario, tutti i timestamp vengono interpretati come TimestampType indipendentemente dal tipo nella tabella Redshift sottostante.

Opzioni di configurazione aggiuntive

Configurazione delle dimensioni massime delle colonne stringa

Quando si creano tabelle Redshift, il comportamento predefinito consiste nel creare TEXT colonne per le colonne stringa. Redshift archivia TEXT le colonne come VARCHAR(256), pertanto queste colonne hanno una dimensione massima di 256 caratteri (origine).

Per supportare colonne di dimensioni maggiori, è possibile usare il maxlength campo dei metadati della colonna per specificare la lunghezza massima delle singole colonne stringa. Ciò è utile anche per implementare ottimizzazioni delle prestazioni di risparmio di spazio dichiarando colonne con una lunghezza massima inferiore rispetto all'impostazione predefinita.

Nota

A causa delle limitazioni in Spark, le API del linguaggio SQL e R non supportano la modifica dei metadati delle colonne.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Di seguito è riportato un esempio di aggiornamento dei campi di metadati di più colonne usando l'API Scala di Spark:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Impostare un tipo di colonna personalizzato

Se è necessario impostare manualmente un tipo di colonna, è possibile usare i metadati della redshift_type colonna. Ad esempio, se si desidera eseguire l'override del Spark SQL Schema -> Redshift SQL matcher di tipo per assegnare un tipo di colonna definito dall'utente, è possibile eseguire le operazioni seguenti:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Configurare la codifica delle colonne

Quando si crea una tabella, usare il encoding campo dei metadati della colonna per specificare una codifica di compressione per ogni colonna (vedere La documentazione di Amazon per le codifiche disponibili).

Impostazione delle descrizioni sulle colonne

Redshift consente alle colonne di avere descrizioni associate che devono essere visualizzate nella maggior parte degli strumenti di query (usando il COMMENT comando ). È possibile impostare il description campo dei metadati della colonna per specificare una descrizione per le singole colonne.

Eseguire il push delle query in Redshift

Spark Optimizer esegue il push degli operatori seguenti in Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

All'interno di Project e Filtersupporta le espressioni seguenti:

  • Maggior parte degli operatori di logica booleani
  • Confronti
  • Operazioni aritmetiche di base
  • Cast numerici e stringa
  • Maggior parte delle funzioni stringa
  • Sottoquery scalari, se possono essere spostate interamente in Redshift.

Nota

Questo pushdown non supporta le espressioni che operano su date e timestamp.

In Aggregationsupporta le funzioni di aggregazione seguenti:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

combinato con la DISTINCT clausola , ove applicabile.

All'interno Joindi supporta i tipi di join seguenti:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Sottoquery riscritte da Join Optimizer, ad esempio WHERE EXISTS, WHERE NOT EXISTS

Nota

Il pushdown di join non supporta FULL OUTER JOIN.

Il pushdown potrebbe essere più utile nelle query con LIMIT. Una query come SELECT * FROM large_redshift_table LIMIT 10 potrebbe richiedere molto tempo, perché l'intera tabella sarebbe prima UNLOADed su S3 come risultato intermedio. Con il pushdown, viene LIMIT eseguito in Redshift. Nelle query con aggregazioni, il push dell'aggregazione in Redshift consente anche di ridurre la quantità di dati da trasferire.

Il push delle query in Redshift è abilitato per impostazione predefinita. Può essere disabilitato impostando spark.databricks.redshift.pushdown su false. Anche se disabilitato, Spark esegue comunque il push dei filtri ed esegue l'eliminazione delle colonne in Redshift.

Installazione del driver Redshift

L'origine dati Redshift richiede anche un driver JDBC compatibile con Redshift. Poiché Redshift si basa sul sistema di database PostgreSQL, è possibile usare il driver JDBC PostgreSQL incluso in Databricks Runtime o il driver JDBC di Redshift consigliato da Amazon. Non è necessaria alcuna installazione per usare il driver JDBC PostgreSQL. La versione del driver JDBC PostgreSQL inclusa in ogni versione di Databricks Runtime è elencata nelle note sulla versione di Databricks Runtime.

Per installare manualmente il driver JDBC di Redshift:

  1. Scaricare il driver da Amazon.
  2. Caricare il driver nell'area di lavoro di Azure Databricks. Vedere Librerie.
  3. Installare la libreria nel cluster.

Nota

Databricks consiglia di usare la versione più recente del driver JDBC di Redshift. Le versioni del driver JDBC redshift inferiori alla 1.2.41 presentano le limitazioni seguenti:

  • La versione 1.2.16 del driver restituisce dati vuoti quando si usa una where clausola in una query SQL.
  • Le versioni del driver precedenti alla 1.2.41 possono restituire risultati non validi perché il supporto dei valori Null di una colonna viene erroneamente segnalato come "Non nullable" anziché "Sconosciuto".

Garanzie transazionali

Questa sezione descrive le garanzie transazionali dell'origine dati Redshift per Spark.

Informazioni generali sulle proprietà redshift e S3

Per informazioni generali sulle garanzie transazionali di Redshift, vedere il capitolo Managing Concurrent Write Operations (Gestione delle operazioni di scrittura simultanee) nella documentazione di Redshift. In breve, Redshift fornisce l'isolamento serializzabile in base alla documentazione per il comando Redshift BEGIN :

[anche se] è possibile usare uno dei quattro livelli di isolamento delle transazioni, Amazon Redshift elabora tutti i livelli di isolamento come serializzabili.

Secondo la documentazione di Redshift:

Amazon Redshift supporta un comportamento di commit automatico predefinito in cui ogni comando SQL eseguito separatamente esegue il commit singolarmente.

Pertanto, i singoli comandi come COPY e UNLOAD sono atomici e transazionali, mentre espliciti BEGIN e END devono essere necessari solo per applicare l'atomicità di più comandi o query.

Durante la lettura e la scrittura in Redshift, l'origine dati legge e scrive i dati in S3. Sia Spark che Redshift producono output partizionato e lo archiviano in più file in S3. Secondo la documentazione di Amazon S3 Data Consistency Model , le operazioni di presentazione dei bucket S3 sono alla fine coerenti, quindi i file devono passare a lunghezze speciali per evitare dati mancanti o incompleti a causa di questa origine di coerenza finale.

Garanzie dell'origine dati Redshift per Spark

Accodare a una tabella esistente

Quando si inseriscono righe in Redshift, l'origine dati usa il comando COPY e specifica i manifesti per proteggersi da determinate operazioni S3 coerenti alla fine. Di conseguenza, spark-redshift le aggiunte alle tabelle esistenti hanno le stesse proprietà atomiche e transazionali dei normali comandi redshift COPY .

Creare una nuova tabella (SaveMode.CreateIfNotExists)

La creazione di una nuova tabella è un processo in due passaggi, costituito da un CREATE TABLE comando seguito da un comando COPY per aggiungere il set iniziale di righe. Entrambe le operazioni vengono eseguite nella stessa transazione.

Sovrascrivere una tabella esistente

Per impostazione predefinita, l'origine dati usa transazioni per eseguire sovrascrizioni, implementate eliminando la tabella di destinazione, creando una nuova tabella vuota e accodando le righe.

Se l'impostazione deprecata usestagingtable è impostata su false, l'origine dati esegue il commit del DELETE TABLE comando prima di accodare le righe alla nuova tabella, sacrificando l'atomicità dell'operazione di sovrascrittura, ma riducendo la quantità di spazio di staging necessario da Redshift durante la sovrascrittura.

Eseguire query sulla tabella Redshift

Le query usano il comando REDSHIFT UNLOAD per eseguire una query e salvare i risultati in S3 e usare manifesti per proteggersi da determinate operazioni S3 coerenti . Di conseguenza, le query dall'origine dati Redshift per Spark devono avere le stesse proprietà di coerenza delle normali query Redshift.

Problemi e soluzioni comuni

Il bucket S3 e il cluster Redshift si trovano in aree AWS diverse

Per impostazione predefinita, le copie S3 <-> Redshift non funzionano se il bucket S3 e il cluster Redshift si trovano in aree AWS diverse.

Se si tenta di leggere una tabella Redshift quando il bucket S3 si trova in un'area diversa, potrebbe essere visualizzato un errore, ad esempio:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Analogamente, il tentativo di scrivere in Redshift usando un bucket S3 in un'area diversa può causare l'errore seguente:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Scritture: il comando Redshift COPY supporta la specifica esplicita dell'area del bucket S3, in modo da poter eseguire operazioni di scrittura in Redshift correttamente in questi casi aggiungendo region 'the-region-name' all'impostazione extracopyoptions . Ad esempio, con un bucket nell'area Stati Uniti orientali (Virginia) e nell'API Scala, usare:

    .option("extracopyoptions", "region 'us-east-1'")
    

    In alternativa, è possibile usare l'impostazione awsregion :

    .option("awsregion", "us-east-1")
    
  • Letture: il comando UNLOAD di Redshift supporta anche specifiche esplicite dell'area del bucket S3. È possibile fare in modo che le letture funzionino correttamente aggiungendo l'area all'impostazione awsregion :

    .option("awsregion", "us-east-1")
    

Errore di autenticazione quando si usa una password con caratteri speciali nell'URL JDBC

Se si specificano il nome utente e la password come parte dell'URL JDBC e la password contiene caratteri speciali, ;ad esempio , ?o &, è possibile che venga visualizzata l'eccezione seguente:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Ciò è causato da caratteri speciali nel nome utente o nella password non preceduti correttamente dall'escape del driver JDBC. Assicurarsi di specificare il nome utente e la password usando le opzioni user del dataframe corrispondenti e password. Per altre informazioni, vedere Parametri.

La query Spark con esecuzione prolungata si blocca a tempo indefinito anche se l'operazione Redshift corrispondente viene eseguita

Se si stanno leggendo o scrivendo grandi quantità di dati da e a Redshift, la query Spark potrebbe bloccarsi a tempo indefinito, anche se la pagina di monitoraggio di AWS Redshift indica che l'operazione o UNLOAD corrispondente LOAD è stata completata e che il cluster è inattiva. Ciò è causato dal timeout della connessione tra Redshift e Spark. Per evitare questo problema, assicurarsi che il tcpKeepAlive flag JDBC sia abilitato e TCPKeepAliveMinutes sia impostato su un valore basso ,ad esempio 1.

Per altre informazioni, vedere Amazon Redshift JDBC Driver Configuration.For additional information, see Amazon Redshift JDBC Driver Configuration.

Timestamp con semantica del fuso orario

Durante la lettura dei dati, sia Redshift TIMESTAMP TIMESTAMPTZ che i tipi di dati vengono mappati a Spark TimestampTypee un valore viene convertito in utc (Coordinated Universal Time) e viene archiviato come timestamp UTC. Per redshift TIMESTAMP, si presuppone che il fuso orario locale non contenga informazioni sul fuso orario. Quando si scrivono dati in una tabella Redshift, viene eseguito il mapping di spark TimestampType al tipo di dati Redshift TIMESTAMP .

Guida alla migrazione

L'origine dati richiede ora di impostare forward_spark_s3_credentials in modo esplicito prima che le credenziali di Spark S3 vengano inoltrate a Redshift. Questa modifica non ha alcun impatto se si usano i aws_iam_role meccanismi di autenticazione o temporary_aws_* . Tuttavia, se si è basato sul comportamento predefinito precedente, è ora necessario impostare forward_spark_s3_credentials in modo esplicito su true per continuare a usare il precedente meccanismo di autenticazione da Redshift a S3. Per una descrizione dei tre meccanismi di autenticazione e dei relativi compromessi di sicurezza, vedere la sezione Autenticazione a S3 e Redshift di questo documento.