Interroger Amazon Redshift avec Azure Databricks
Vous pouvez lire et écrire des tableaux à partir d’Amazon Redshift avec Azure Databricks.
Important
Les configurations décrites dans cet article sont Expérimentales. Les fonctionnalités expérimentales sont fournies en l’état et ne sont pas prises en charge par Databricks via le support technique client. Pour bénéficier d’une prise en charge complète de la fédération de requêtes, vous devez plutôt utiliser Lakehouse Federation, qui permet à vos utilisateurs Azure Databricks de profiter de la syntaxe Unity Catalog et des outils de gouvernance des données.
La source de données Databricks Redshift utilise Amazon S3 pour transférer efficacement des données depuis et vers Redshift, et utilise Java Database Connectivity (JDBC) pour déclencher automatiquement les commandes Redshift appropriées COPY
et UNLOAD
.
Remarque
Dans Databricks Runtime 11.3 LTS et versions ultérieures, Databricks Runtime inclut le pilote JDBC Redshift, accessible à l’aide du mot clé redshift
comme option de format. Consultez les versions des notes de publication et la compatibilité de Databricks Runtime pour les versions de pilote incluses dans chaque Runtime Databricks. Les pilotes fournis par l’utilisateur sont toujours pris en charge et sont prioritaires sur le pack pilote JDBC.
Dans Databricks Runtime 10.4 LTS et versions antérieures, l’installation manuelle du pilote JDBC Redshift est requise, et pour les requêtes il convient d’utiliser le pilote (com.databricks.spark.redshift
) comme format. Consultez Installation du pilote Redshift.
Utilisation
Les exemples suivants illustrent la connexion avec le pilote Redshift. Remplacez les valeurs de paramètre url
si vous utilisez le pilote JDBC PostgreSQL.
Une fois que vous avez configuré vos informations d’identification AWS, vous pouvez utiliser la source de données avec l’API de source de données Spark dans Python, SQL, R ou Scala.
Important
Les emplacements externes définis dans Unity Catalog ne sont pas pris en charge en tant qu’emplacements tempdir
.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Lire des données à l’aide de SQL sur Databricks Runtime 10.4 LTS et versions antérieures :
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Lire des données à l’aide de SQL sur Databricks Runtime 11.3 LTS et versions ultérieures :
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Écrire les données à l’aide de SQL :
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
L’API SQL prend uniquement en charge la création de nouveaux tableaux et non l’écriture ou l’ajout.
R
Lire des données à l’aide de R sur Databricks Runtime 10.4 LTS et versions antérieures :
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Lire des données à l’aide de R sur Databricks Runtime 11.3 LTS et versions ultérieures :
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Suggestions pour travailler avec Redshift
L’exécution des requêtes peut extraire de grandes quantités de données dans S3. Si vous envisagez d’effectuer plusieurs requêtes sur les mêmes données dans Redshift, Databricks recommande d’enregistrer les données extraites à l’aide de Delta Lake.
Configuration
Authentification auprès de S3 et de Redshift
La source de données implique plusieurs connexions réseau, illustrées dans le diagramme suivant :
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
La source de données lit et écrit des données dans S3 lors du transfert de données vers/depuis Redshift. Par conséquent, elle nécessite des informations d’identification AWS avec un accès en lecture et en écriture à un compartiment S3 (spécifié à l’aide du paramètre de configuration tempdir
).
Remarque
La source de données ne nettoie pas les fichiers temporaires qu’elle crée dans S3. Par conséquent, nous vous recommandons d’utiliser un compartiment S3 temporaire dédié avec une configuration de cycle de vie d’objet pour vous assurer que les fichiers temporaires sont automatiquement supprimés après une période d’expiration spécifiée. Pour plus d’informations sur le chiffrement de ces fichiers, consultez la section Chiffrement de ce document. Vous ne pouvez pas utiliser un emplacement externe défini dans Unity Catalog comme emplacement tempdir
.
Les sections suivantes décrivent les options de configuration d’authentification pour chaque type de connexion.
Pilote Spark vers Redshift
Le pilote Spark se connecte à Redshift via JDBC à l’aide d’un nom d’utilisateur et d’un mot de passe. Redshift ne prend pas en charge l’utilisation de rôles IAM pour authentifier cette connexion. Par défaut, cette connexion utilise le chiffrement SSL ; pour plus d’informations, consultez Chiffrement.
Spark vers S3
S3 fait office d’intermédiaire pour stocker des données de masse lors de la lecture ou de l’écriture dans Redshift. Spark se connecte à S3 à la fois à l’aide des interfaces Hadoop FileSystem, et directement à l’aide du client S3 du Kit de développement logiciel (SDK) Amazon Java.
Remarque
Vous ne pouvez pas utiliser de montages DBFS pour configurer l’accès à S3 pour Redshift.
Définir des clés dans la configuration Hadoop : vous pouvez spécifier des clés AWS à l’aide des propriétés de configuration Hadoop. Si votre configuration
tempdir
pointe vers un système de fichierss3a://
, vous pouvez définir les propriétésfs.s3a.access.key
etfs.s3a.secret.key
dans un fichier de configuration XML Hadoop ou appelersc.hadoopConfiguration.set()
pour configurer la configuration Hadoop globale de Spark. Si vous utilisez un système de fichierss3n://
, vous pouvez fournir les clés de configuration héritées, comme illustré dans l’exemple suivant.Scala
Par exemple, si vous utilisez le système de fichiers
s3a
, ajoutez :sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
Pour le système de fichiers hérité
s3n
, ajoutez :sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
La commande suivante s’appuie sur des éléments Spark internes, mais elle fonctionne en principe avec toutes les versions de PySpark. De plus, il est peu probable qu’elle change à l’avenir :
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift vers S3
Définissez l’option forward_spark_s3_credentials
sur true
pour transférer automatiquement les informations d’identification de clé AWS que Spark utilise pour se connecter à S3 via JDBC vers Redshift. La requête JDBC incorpore ces informations d’identification. Par conséquent, Databricks recommande vivement d’activer le chiffrement SSL de la connexion JDBC.
Chiffrement
Sécurisation de JDBC : sauf si des paramètres liés au protocole SSL sont présents dans l’URL JDBC, la source de données par défaut active le chiffrement SSL et vérifie également la fiabilité du serveur Redshift (autrement dit,
sslmode=verify-full
). Pour cela, un certificat de serveur est téléchargé automatiquement à partir des serveurs Amazon la première fois que cela est nécessaire. En cas d’échec, un fichier de certificat pré-groupé est utilisé comme moyen de secours. Cela est valable à la fois pour les pilotes Redshift et PostgreSQL JDBC.En cas de problèmes avec cette fonctionnalité, ou si vous souhaitez simplement désactiver SSL, vous pouvez appeler
.option("autoenablessl", "false")
sur votreDataFrameReader
ouDataFrameWriter
.Si vous souhaitez spécifier des paramètres personnalisés liés au protocole SSL, vous pouvez suivre les instructions de la documentation Redshift : Utilisation de certificats SSL et de certificats de serveur sur Java et les options de configuration du pilote JDBC Toutes les options liées au protocole SSL présentes dans l’
url
JDBC utilisées avec la source de données sont prioritaires (autrement dit, la configuration automatique ne se déclenche pas).Chiffrement des données UNLOAD stockées dans S3 (données stockées lors de la lecture à partir de Redshift) : selon la documentation Redshift sur le déchargement des données vers S3, « UNLOAD chiffre automatiquement les fichiers de données à l’aide du chiffrement côté serveur Amazon S3 (SSE-S3) ».
Redshift prend également en charge le chiffrement côté client avec une clé personnalisée (voir : Déchargement de fichiers de données chiffrés), mais la source de données ne peut pas spécifier la clé symétrique requise.
Chiffrement des données COPY stockées dans S3 (données stockées lors de l’écriture dans Redshift) : selon la documentation Redshift sur le chargement de fichiers de données chiffrés à partir d’Amazon S3 :
Vous pouvez utiliser la commande COPY
pour charger des fichiers de données qui ont été chargés sur Amazon S3 par un chiffrement côté serveur avec clés de chiffrement gérées par AWS (SSE-S3 ou SSE-KMS), un chiffrement côté client, ou les deux. COPY ne prend pas en charge le chiffrement côté serveur avec une clé fournie par le client (SSE-C).
Paramètres
Le mappage de paramètres ou OPTIONS fourni dans Spark SQL prend en charge les paramètres suivants :
Paramètre | Obligatoire | Default | Description |
---|---|---|---|
dbtable | Oui, sauf si la requête est spécifiée. | Aucun(e) | Table à partir de laquelle créer ou lire dans Redshift. Ce paramètre est obligatoire pour la réécriture des données dans Redshift. |
query | Oui, sauf si dbtable est spécifié. | Aucun(e) | Requête à lire dans Redshift. |
utilisateur | Non | Aucun(e) | Nom d’utilisateur Redshift. Doit être utilisé en tandem avec l’option mot de passe. Ne peut être utilisé que si l’utilisateur et le mot de passe ne sont pas transmis dans l’URL, car si les deux sont transmis, une erreur se produit. Utilisez ce paramètre lorsque le nom d’utilisateur contient des caractères spéciaux qui doivent être échappés. |
mot de passe | Non | Aucun(e) | Mot de passe Redshift. Doit être utilisé en tandem avec l’option user . Ne peut être utilisé que si l’utilisateur et le mot de passe ne sont pas transmis dans l’URL, car la transmission des deux entraîne une erreur. Utilisez ce paramètre lorsque le mot de passe contient des caractères spéciaux qui doivent être échappés. |
url | Oui | Aucun(e) | URL JDBC, au formatjdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password> subprotocol peut être postgresql ou redshift , selon le gestionnaire JDBC que vous avez chargé. Un gestionnaire compatible Redshift doit se trouver sur le classpath et correspondre à cette URL. host et port doivent pointer vers le nœud principal de Redshift, de sorte que les groupes de sécurité et/ou le VPC doivent être configurés pour permettre l’accès à partir de votre application gestionnaire. database identifie un nom de base de données Redshift user et password qui sont des identifiants pour accéder à la base de données, à incorporer dans cette URL pour JDBC, et votre compte d’utilisateur doit disposer de privilèges nécessaires pour la table référencée. |
search_path | Non | Aucun(e) | Définissez le chemin d’accès de recherche de schéma dans Redshift. Est défini à l’aide de la commande SET search_path to . Doit être une liste séparée par des virgules des noms de schémas dans laquelle rechercher des tables. Consultez Documentation redshift de search_path. |
aws_iam_role | Uniquement si vous utilisez des rôles IAM pour les autorisations. | Aucun(e) | ARN entièrement spécifié du rôle d’opérations IAM Redshift COPIER/DÉCHARGER attaché au cluster Redshift, par exemple arn:aws:iam::123456789000:role/<redshift-iam-role> . |
forward_spark_s3_credentials | Non | false |
Si true , la source de données découvre automatiquement les identifiants que Spark utilise pour se connecter à S3 et transmet ces identifiants à Redshift via JDBC. Ces identifiants sont envoyés dans le cadre de la requête JDBC. Il est donc vivement recommandé d’activer le chiffrement SSL de la connexion JDBC lors de l’utilisation de cette option. |
temporary_aws_access_key_id | Non | Aucun(e) | La clé d’accès AWS doit disposer d’autorisations d’écriture dans le compartiment S3. |
temporary_aws_secret_access_key | Non | Aucun(e) | Clé d’accès secrète AWS correspondant à la clé d’accès fournie. |
temporary_aws_session_token | Non | Aucun(e) | Jeton de session AWS correspondant à la clé d’accès fournie. |
tempdir | Oui | Aucun(e) | Emplacement accessible en écriture dans Amazon S3, à utiliser pour les données déchargées lors de la lecture et du chargement des données Avro dans Redshift lors de l’écriture. Si vous utilisez la source de données Redshift pour Spark dans le cadre d’un pipeline ETL standard, il peut être utile de définir une stratégie de cycle de vie sur un compartiment et de l’utiliser comme emplacement temporaire pour ces données. Vous ne pouvez pas utiliser des emplacements externes définis dans Unity Catalog comme emplacements tempdir . |
jdbcdriver | Non | Déterminé par le sous-protocole de l’URL JDBC. | Nom de la classe du pilote JDBC à utiliser. Cette classe doit être sur le classpath. Dans la plupart des cas, il n’est pas nécessaire de spécifier cette option, car le nom de la classe du gestionnaire approprié est en principe défini automatiquement par le sous-protocole de l’URL JDBC. |
diststyle | Non | EVEN |
Style de distribution Redshift à utiliser lors de la création d’une table. Il peut s’agir de EVEN , KEY or ALL (consulter la documentation Redshift). Lorsque vous utilisez KEY , vous devez également définir une clé de distribution avec l’option distkey. |
distkey | Non, sauf si vous utilisez DISTSTYLE KEY |
Aucun(e) | Nom d’une colonne dans la table à utiliser comme clé de distribution lors de la création d’une table. |
sortkeyspec | Non | Aucun(e) | Définition complète de la Clé de tri Redshift. Voici quelques exemples : - SORTKEY(my_sort_column) - COMPOUND SORTKEY(sort_col_1, sort_col_2) - INTERLEAVED SORTKEY(sort_col_1, sort_col_2) |
usetagingtable (déconseillé) | Non | true |
En définissant cette option déconseillée à false , la table de destination d’une opération de remplacement est supprimée immédiatement au début de l’écriture, ce qui rend l’opération de remplacement non atomique et réduit la disponibilité de la table de destination. Cette opération peut réduire l’espace disque temporaire requis pour les remplacements.Étant donné que l’opération usestagingtable=false présente un risque de perte de données ou d’indisponibilité, elle est déconseillée au profit d’une suppression manuelle de la table de destination. |
description | Non | Aucun(e) | Une description de la table. Est défini à l’aide de la commande SQL COMMENTAIRE et doit apparaître dans la plupart des outils de requête. Consultez également les métadonnées description pour définir les descriptions des différentes colonnes. |
preactions | Non | Aucun(e) | Une liste de commandes SQL séparées par ; à exécuter avant de charger la commande COPY . Il peut être utile d’exécuter ici quelques commandes DELETE ou similaires avant de charger de nouvelles données. Si la commande contient %s , le nom de la table est mis en forme avant l’exécution (si vous utilisez une table de mise en lots).Attention, si ces commandes échouent, une erreur est commise et une exception est levée. Si vous utilisez une table de mise en lots, les modifications sont restaurées et la table de sauvegarde restaurée si les actions préalables échouent. |
postactions | Non | Aucun(e) | Une liste de commandes SQL séparées par ; à exécuter après un COPY réussi lors du chargement des données. Il peut être utile d’exécuter ici quelques commandes GRANT ou similaires lors du chargement de nouvelles données. Si la commande contient %s , le nom de la table est mis en forme avant l’exécution (si vous utilisez une table de mise en lots).Attention, si ces commandes échouent, une erreur est commise et une exception est levée. Si vous utilisez une table de mise en lots, les modifications sont restaurées et la table de sauvegarde est également restaurée en cas d’échec des actions postérieures. |
extracopyoptions | Non | Aucun(e) | Liste des options supplémentaires à ajouter à la commande Redshift COPY lors du chargement de données, par exemple, TRUNCATECOLUMNS ou MAXERROR n (consultez la documentation Redshift pour d’autres options).Ces options étant ajoutées à la fin de la commande COPY , seules les options qui ont un sens à la fin de la commande peuvent être utilisées. Toutefois, cette liste devrait couvrir la plupart des cas d’utilisation possibles. |
tempformat | Non | AVRO |
Format dans lequel enregistrer des fichiers temporaires dans S3 lors de l’écriture dans Redshift. La valeur par défaut est AVRO ; les autres valeurs autorisées sont CSV et CSV GZIP pour csv et csv gzipped, respectivement.Redshift est beaucoup plus rapide lorsqu’il charge des fichiers CSV que lorsqu’il charge des fichiers Avro, de sorte que l’utilisation de ce tempformat peut permettre d’améliorer considérablement les performances lors de l’écriture dans Redshift. |
csvnullstring | Non | @NULL@ |
La valeur de la chaîne à écrire pour les valeurs nulles lors de l’utilisation du format CSV. Il doit s’agir d’une valeur qui n’apparaît pas dans vos données réelles. |
csvseparator | Non | , |
Séparateur à utiliser lors de l’écriture de fichiers temporaires avec tempformat défini sur CSV ou CSV GZIP . Il doit s’agir d’un caractère ASCII valide, par exemple, « , » ou « \| ». |
csvignoreleadingwhitespace | Non | true |
Lorsqu'il est réglé sur true, supprime les espaces blancs de début des valeurs durant les écritures lorsque tempformat est défini sur CSV ou CSV GZIP . Au cas contraire, l’espace blanc est conservé. |
csvignoretrailingwhitespace | Non | true |
Lorsqu'il est défini sur vrai, supprime l'espace blanc de fin des valeurs lors de l'écriture lorsque tempformat est défini sur CSV ou CSV GZIP . Au cas contraire, l’espace blanc est conservé. |
infer_timestamp_ntz_type | Non | false |
Si true , les valeurs de type Redshift TIMESTAMP sont interprétées comme TimestampNTZType (horodateur sans fuseau horaire) pendant les lectures. Dans le cas contraire, tous les horodatages sont interprétés comme étant TimestampType , quel que soit le type dans la table Redshift sous-jacente. |
Options de configuration supplémentaires
Configuration de la taille maximale des colonnes de chaîne
Lors de la création de tableaux Redshift, le comportement par défaut consiste à créer des colonnes TEXT
pour les colonnes de chaîne. Redshift stocke les colonnes TEXT
en tant que VARCHAR(256)
, de sorte que ces colonnes ont une taille maximale de 256 caractères (source).
Pour prendre en charge des colonnes plus volumineuses, vous pouvez utiliser le champ de métadonnées de colonne maxlength
afin de spécifier la longueur maximale des colonnes de chaîne individuelles. Cela est également utile à l’optimisation des performances d’économie d’espace en indiquant des colonnes dont la longueur maximale est inférieure à la valeur par défaut.
Remarque
En raison de limitations dans Spark, les API de langage SQL et R ne prennent pas en charge la modification des métadonnées de colonne.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
Voici un exemple de mise à jour des champs de métadonnées de plusieurs colonnes à l’aide de l’API Scala de Spark :
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Définissez un type de colonne personnalisé
Si vous devez définir manuellement un type de colonne, vous pouvez utiliser les métadonnées de colonne redshift_type
. Par exemple, si vous souhaitez substituer le matcher de type Spark SQL Schema -> Redshift SQL
pour affecter un type de colonne défini par l’utilisateur, vous pouvez effectuer les opérations suivantes :
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Configurer le chiffrement de colonne
Lors de la création d’une table, utilisez le champ de métadonnées de colonne encoding
pour spécifier un encodage de compression pour chaque colonne (consultez la documentation Amazon pour les encodages disponibles).
Définir des descriptions sur les colonnes
Redshift permet aux colonnes d’avoir des descriptions jointes qui apparaissent dans la plupart des outils de requête (à l’aide de la commande COMMENT
). Vous pouvez définir le champ de métadonnées de colonne description
pour spécifier une description chaque colonne individuelle.
Requête pushdown dans Redshift
L’optimiseur Spark envoie (push) les opérateurs suivants vers Redshift :
Filter
Project
Sort
Limit
Aggregation
Join
Sur Project
et Filter
, il prend en charge les expressions suivantes :
- La plupart des opérateurs logiques booléens
- Comparaisons
- Opérations arithmétiques de base
- Casts numériques et de chaînes
- La plupart des fonctions de chaîne
- Sous-requêtes scalaires, si elles peuvent être entièrement envoyées (push) vers Redshift.
Remarque
Ce pushdown ne prend pas en charge les expressions qui fonctionnent sur des dates et des timestamps.
Sur Aggregation
, il prend en charge les fonctions d’agrégation suivantes :
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
combiné à la clause DISTINCT
, le cas échéant.
Sur Join
, il prend en charge les types de jointures suivants :
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- Sous-requêtes réécrites en
Join
par l’optimiseur, par exempleWHERE EXISTS
,WHERE NOT EXISTS
Remarque
La jointure pushdown ne prend pas FULL OUTER JOIN
en charge.
Le pushdown serait le plus bénéfique pour les requêtes avec LIMIT
. Une requête telle que SELECT * FROM large_redshift_table LIMIT 10
peut prendre beaucoup de temps, car l’ensemble du tableau serait d’abord UNLOADé sur S3 comme résultat intermédiaire. Avec le pushdown, le paramètre LIMIT
est exécuté dans Redshift. Dans les requêtes avec des agrégations, l’envoi (push) de l’agrégation dans Redshift permet également de réduire la quantité de données qui doivent être transférées.
La requête pushdown dans Redshift est activée par défaut. Cette fonctionnalité peut être désactivée en définissant spark.databricks.redshift.pushdown
sur false
. Même en cas de désactivation, Spark envoie (push) toujours les filtres et effectue l’élimination des colonnes dans Redshift.
Installation du pilote Redshift
La source de données Redshift nécessite également un pilote JDBC compatible avec Redshift. Étant donné que Redshift est basé sur le système de base de données PostgreSQL, vous pouvez utiliser le pilote JDBC PostgreSQL inclus avec Databricks Runtime ou le pilote JDBC pour Redshift recommandé par Amazon. Aucune installation n’est requise pour utiliser le pilote JDBC PostgreSQL. La version du pilote JDBC PostgreSQL incluse dans chaque version Databricks Runtime est répertoriée dans les notes de publication de Databricks Runtime.
Pour installer manuellement le pilote JDBC Redshift :
- Téléchargez le pilote à partir d’Amazon.
- Chargez le pilote dans votre espace de travail Azure Databricks. Consultez Bibliothèques.
- Installez la bibliothèque sur votre cluster.
Remarque
Databricks recommande d’utiliser la dernière version du pilote JDBC Redshift. Les versions du pilote JDBC Redshift inférieures à la version 1.2.41 présentent les limitations suivantes :
- La version 1.2.16 du pilote renvoie des données vides lors de l’utilisation d’une clause
where
dans une requête SQL. - Les versions du pilote inférieures à la version 1.2.41 peuvent renvoyer des résultats non valides, car la possibilité de valeur Nul d’une colonne est signalée de façon incorrecte comme « Non Nullable » au lieu d’« Inconnu ».
Garanties transactionnelles
Cette section décrit les garanties transactionnelles de la source de données Redshift pour Spark.
Arrière-plan général sur les propriétés Redshift et S3
Pour obtenir des informations générales sur les garanties transactionnelles Redshift, consultez le chapitre Gestion des opérations d’écriture simultanées dans la documentation Redshift. En bref, Redshift fournit une isolation sérialisable en fonction de la documentation de la commande BEGIN Redshift :
[Même si] vous pouvez utiliser l’un des quatre niveaux d’isolation des transactions, Amazon Redshift traite tous les niveaux d’isolation comme sérialisables.
Conformément à la documentation Redshift :
Amazon Redshift prend en charge un comportement de validation automatique par défaut dans lequel chaque commande SQL exécutée séparément est validée individuellement.
Ainsi, des commandes individuelles comme COPY
et UNLOAD
sont atomiques et transactionnelles, tandis que les commandes explicites BEGIN
et END
ne doivent être nécessaires que pour appliquer l’atomicité de plusieurs commandes ou requêtes.
Lors de la lecture et de l’écriture dans Redshift, la source de données lit et écrit des données dans S3. Spark et Redshift produisent tous deux une sortie partitionnée et la stockent dans plusieurs fichiers sur S3. Selon la documentation du modèle de cohérence des données Amazon S3, les opérations de référencement de compartiments S3 sont cohérentes. En raison de cette cohérence éventuelle, les fichiers doivent atteindre des longueurs spéciales afin d’éviter les données manquantes ou incomplètes.
Garanties de la source de données Redshift pour Spark
Ajouter à un tableau existant
Lors de l’insertion de lignes dans Redshift, la source de données utilise la commande COPY,et spécifie des manifestes pour se protéger de certaines opérations cohérentes S3. Par conséquent, spark-redshift
les ajouts aux tableaux existants ont les mêmes propriétés atomiques et transactionnelles que les commandes Redshift COPY
standard.
Créez un nouveau tableau (SaveMode.CreateIfNotExists
)
La création d’un nouveau tableau est un processus en deux étapes, constitué d’une commande CREATE TABLE
suivie d’une commande COPY pour s’ajouter à l’ensemble initial de lignes. Les deux opérations sont effectuées dans la même transaction.
Remplacer un tableau existant
Par défaut, la source de données utilise les transactions pour effectuer des remplacements. Ils sont implémentés en supprimant la table de destination, ce qui crée un nouveau tableau vide et y ajoute des lignes.
Si le paramètre déconseillé usestagingtable
est défini sur false
, la source de données valide la commande DELETE TABLE
avant d’ajouter des lignes au nouveau tableau. Cela sacrifie l’atomicité de l’opération de remplacement, mais réduit la quantité d’espace de transit nécessaire à Redshift pendant l’opération.
Tableau de requête Redshift
Les requêtes utilisent la commande Redshift UNLOAD pour exécuter une requête et enregistrer ses résultats dans S3, et utilisent des manifestes pour se protéger contre certaines opérations cohérentes S3. Par conséquent, les requêtes provenant de la source de données Redshift pour Spark doivent avoir les mêmes propriétés de cohérence que les requêtes Redshift standard.
Problèmes courants et solutions
Les compartiments S3 et les clusters Redshift se trouvent dans différentes régions AWS
Par défaut, les copies S3 Redshift <-> ne fonctionnent pas si le compartiment S3 et le cluster Redshift se trouvent dans des régions AWS différentes.
Si vous essayez de lire un tableau Redshift lorsque le compartiment S3 se trouve dans une autre région, vous pouvez voir une erreur comme :
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
De même, une tentative d’écriture dans Redshift à l’aide d’un compartiment S3 situé dans une autre région peut entraîner l’erreur suivante :
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Écriture : La commande Redshift COPY prend en charge la spécification de la région du compartiment S3. Dans ces cas, vous pouvez donc faire fonctionner correctement les écritures dans Redshift en ajoutant
region 'the-region-name'
au paramètreextracopyoptions
. Par exemple, avec un compartiment dans la région de l’est des États-Unis (Virginie) et l’API Scala, utilisez :.option("extracopyoptions", "region 'us-east-1'")
Vous pouvez également utiliser le paramètre
awsregion
:.option("awsregion", "us-east-1")
Lecture : La commande Redshift UNLOAD prend également en charge la spécification de la région du compartiment S3. Vous pouvez faire fonctionner correctement les lectures en ajoutant la région au paramètre
awsregion
:.option("awsregion", "us-east-1")
Erreur d’authentification lors de l’utilisation d’un mot de passe avec des caractères spéciaux dans l’URL JDBC
Si vous fournissez le nom d’utilisateur et le mot de passe dans le cadre de l’URL JDBC et que le mot de passe contient des caractères spéciaux tels que ;
, ?
ou &
, vous pourrez voir l’exception suivante :
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Cela est causé par le fait que des caractères spéciaux dans le nom d’utilisateur ou le mot de passe ne sont pas placés correctement dans la séquence d’échappement par le pilote JDBC. Veillez à spécifier le nom d’utilisateur et le mot de passe à l’aide des options DataFrame correspondantes user
et password
. Pour plus d’informations, consultez Paramètres.
La requête de longue durée Spark se bloque indéfiniment même si l’opération Redshift correspondante est terminée
Si vous lisez ou écrivez de grandes quantités de données depuis et vers Redshift, votre requête Spark peut se bloquer indéfiniment, même si la page d’analyse AWS Redshift indique que l’opération correspondante LOAD
ou UNLOAD
est terminée et que le cluster est inactif. Cela est dû au délai d’attente entre Redshift et Spark. Pour éviter cela, vérifiez que l’indicateur JDBC tcpKeepAlive
est activé et que TCPKeepAliveMinutes
est défini sur une valeur faible (par exemple, 1).
Pour plus d’informations, consultez Configuration du pilote JDBC Amazon Redshift.
Sémantique de timestamp avec fuseau horaire
Lors de la lecture de données, Redshift TIMESTAMP
et TIMESTAMPTZ
les types de données sont mappés à Spark TimestampType
, et une valeur est convertie en temps universel coordonné (UTC) et est stockée en tant que timestamp UTC. Pour un TIMESTAMP
Redshift, le fuseau horaire local est supposé, puisque la valeur n’a pas d’information de fuseau horaire. Lors de l’écriture de données dans un tableau Redshift, un élément TimestampType
Spark est mappé au type de données TIMESTAMP
Redshift.
Guide de migration
La source de données vous oblige désormais à définir forward_spark_s3_credentials
explicitement avant que les informations d’identification Spark S3 soient transférées à Redshift. Cette modification n’a aucun impact si vous utilisez les mécanismes d’authentification aws_iam_role
ou temporary_aws_*
. Toutefois, si vous vous appuyez sur l’ancien comportement par défaut, vous devrez configurer explicitement forward_spark_s3_credentials
sur true
pour continuer à utiliser votre mécanisme d’authentification Redshift vers S3 précédent. Pour une discussion sur les trois mécanismes d’authentification et leurs compromis de sécurité, consultez la section S’authentifier au S3 et à Redshift de ce document.