Abfragen von Amazon Redshift mithilfe von Azure Databricks
Sie können Tabellen aus Amazon Redshift mit Azure Databricks lesen und schreiben.
Wichtig
Die in diesem Artikel beschriebenen Konfigurationen sind experimentell. Experimentelle Funktionen werden ohne Mängelgewähr („wie besehen“) bereitgestellt und nicht von Databricks über Kanäle des technischen Kundensupports unterstützt. Um vollständige Unterstützung für den Abfrageverbund zu erhalten, sollten Sie stattdessen Lakehouse-Verbund verwenden, wodurch Ihre Azure Databricks-Benutzer die Vorteile der Unity Catalog-Syntax und der Datengovernancetools nutzen können.
Die Datenquelle „Databricks Redshift“ verwendet Amazon S3 zum effizienten Übertragen von Daten in und aus Redshift. Sie verwendet JDBC, um die entsprechenden Befehle COPY
und UNLOAD
automatisch in Redshift auszulösen.
Hinweis
In Databricks Runtime 11.3 LTS und höher enthält Databricks Runtime den Redshift JDBC-Treiber, auf den mithilfe des Schlüsselworts redshift
für die Formatoption zugegriffen werden kann. Weitere Informationen zu den Treiberversionen, die in jeder Databricks Runtime-Instanz enthalten sind, finden Sie unter Versionshinweise zu Databricks Runtime-Versionen und -Kompatibilität. Vom Benutzer bereitgestellte Treiber werden weiterhin unterstützt und haben vor dem gebündelten JDBC-Treiber Vorrang.
In Databricks Runtime 10.4 LTS und früher ist eine manuelle Installation des Redshift JDBC-Treibers erforderlich, und für Abfragen muss der Treiber (com.databricks.spark.redshift
) für das Format verwendet werden. Siehe Redshift-Treiberinstallation.
Verwendung
In den folgenden Beispielen wird die Verbindung mit dem Redshift-Treiber veranschaulicht. Ersetzen Sie die url
Parameterwerte, wenn Sie den PostgreSQL JDBC-Treiber verwenden.
Nachdem Sie Ihre AWS-Anmeldeinformationen konfiguriert haben, können Sie die Datenquelle mit der Spark-Datenquellen-API in Python, SQL, R oder Scala verwenden.
Wichtig
Externe Speicherorte, die in Unity Catalog definiert sind, werden nicht als tempdir
-Speicherorte unterstützt.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Lesen von Daten mithilfe von SQL in Databricks Runtime 10.4 LTS und früher:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Lesen von Daten mithilfe von SQL in Databricks Runtime 11.3 LTS und höher:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Schreiben von Daten mithilfe von SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
Die SQL-API unterstützt nur das Erstellen neuer Tabellen und nicht das Überschreiben oder Anfügen.
R
Lesen von Daten mithilfe von R in Databricks Runtime 10.4 LTS und früher:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Lesen von Daten mithilfe von R in Databricks Runtime 11.3 LTS und höher:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Empfehlungen zum Arbeiten mit Redshift
Bei der Abfrageausführung können große Datenmengen für S3 extrahiert werden. Wenn Sie vorhaben, mehrere Abfragen mit den gleichen Daten in Redshift auszuführen, empfiehlt Databricks, die extrahierten Daten mithilfe von Delta Lake zu speichern.
Konfiguration
Authentifizierung für S3 und Redshift
Die Datenquelle umfasst mehrere Netzwerkverbindungen, die im folgenden Diagramm dargestellt sind:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
Die Datenquelle liest und schreibt Daten in S3, wenn Daten an/aus Redshift übertragen werden. Daher sind AWS-Anmeldeinformationen mit Lese- und Schreibzugriff auf einen S3-Bucket erforderlich (angegeben mithilfe des tempdir
-Konfigurationsparameters).
Hinweis
Die Datenquelle bereinigt nicht die temporären Dateien, die sie in S3 erstellt. Daher wird empfohlen, einen dedizierten temporären S3-Bucket mit einer Objektlebenszykluskonfiguration zu verwenden, um sicherzustellen, dass temporäre Dateien nach einem angegebenen Ablaufzeitraum automatisch gelöscht werden. Weitere Informationen zum Verschlüsseln dieser Dateien finden Sie im Abschnitt Verschlüsselung in diesem Dokument. Sie können keinen externen Speicherort verwenden, der in Unity Catalog als tempdir
-Speicherort definiert ist.
In den folgenden Abschnitten werden die Konfigurationsoptionen für die Authentifizierung der einzelnen Verbindungen beschrieben:
Spark-Treiber zu Redshift
Der Spark-Treiber verbindet sich über JDBC mithilfe eines Benutzernamens und eines Kennworts mit Redshift. Redshift unterstützt nicht die Verwendung von IAM-Rollen, um diese Verbindung zu authentifizieren. Standardmäßig wird für diese Verbindung die SSL-Verschlüsselung verwendet; weitere Details finden Sie unter Verschlüsselung.
Spark zu S3
S3 dient als Vermittler zum Speichern von Massendaten beim Lesen oder Schreiben in Redshift. Spark verbindet sich mit S3 sowohl mithilfe der Hadoop FileSystem-Schnittstellen als auch direkt mithilfe des S3-Clients des Amazon Java SDK.
Hinweis
Sie können DBFS-Bereitstellungen nicht verwenden, um den Zugriff auf S3 for Redshift zu konfigurieren.
Festlegen von Schlüsseln in der Hadoop-Konfiguration: Sie können AWS-Schlüssel mithilfe von Hadoop-Konfigurationseigenschaften angeben. Wenn Ihre
tempdir
-Konfiguration auf eins3a://
-Dateisystem verweist, können Sie die Eigenschaftenfs.s3a.access.key
undfs.s3a.secret.key
in einer Hadoop-XML-Konfigurationsdatei festlegen odersc.hadoopConfiguration.set()
aufrufen, um die globale Hadoop-Konfiguration von Spark zu konfigurieren. Wenn Sie eins3n://
-Dateisystem verwenden, können Sie die Legacykonfigurationsschlüssel angeben, wie im folgenden Beispiel gezeigt.Scala
Wenn Sie beispielsweise das
s3a
-Dateisystem verwenden, fügen Sie Folgendes hinzu:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
Fügen Sie für das Legacydateisystem
s3n
Folgendes hinzu:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
Der folgende Befehl basiert auf einigen internen Spark-Versionen, sollte jedoch mit allen PySpark-Versionen funktionieren. Es ist unwahrscheinlich, dass er sich in Zukunft ändert:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift zu S3
Legen Sie die Option forward_spark_s3_credentials
auf true
fest, um die AWS-Schlüsselanmeldeinformationen, die Spark zum Herstellen einer Verbindung mit S3 verwendet, automatisch über JDBC an Redshift weiterzuleiten. Die JDBC-Abfrage bettet diese Anmeldeinformationen ein, sodass Databricks dringend empfiehlt, die SSL-Verschlüsselung der JDBC-Verbindung zu aktivieren.
Verschlüsselung
Sichern von JDBC: Sofern keine SSL-bezogenen Einstellungen in der JDBC-URL vorhanden sind, aktiviert die Datenquelle standardmäßig die SSL-Verschlüsselung und überprüft außerdem, dass der Redshift-Server vertrauenswürdig ist (das heißt
sslmode=verify-full
). Dazu wird ein Serverzertifikat automatisch von den Amazon-Servern heruntergeladen, wenn er zum ersten Mal benötigt wird. Wenn ein Fehler auftritt, wird eine vorab gebündelte Zertifikatdatei als Fallback verwendet. Dies gilt sowohl für die Redshift- als auch für die PostgreSQL-JDBC-Treiber.Falls Probleme mit diesem Feature auftreten, oder falls Sie SSL einfach deaktivieren möchten, können Sie
.option("autoenablessl", "false")
auf IhremDataFrameReader
oder IhremDataFrameWriter
aufrufen.Wenn Sie benutzerdefinierte SSL-bezogene Einstellungen angeben möchten, können Sie die Anweisungen in der Redshift-Dokumentation befolgen: Verwenden von SSL- und Serverzertifikaten in Java und JDBC-Treiberkonfigurationsoptionen Alle SSL-bezogenen Optionen, die in der JDBC-
url
vorhanden sind, die mit der Datenquelle verwendet werden, haben Vorrang (d. h. die automatische Konfiguration wird nicht ausgelöst).Verschlüsseln von UNLOAD-Daten, die in S3 gespeichert sind (Daten, die beim Lesen von Redshift gespeichert sind): Gemäß der Redshift-Dokumentation zum Entladen von Daten nach S3 „verschlüsselt UNLOAD automatisch Datendateien mit der serverseitigen Amazon S3-Verschlüsselung (SSE-S3)“.
Redshift unterstützt darüber hinaus die clientseitige Verschlüsselung mit einem benutzerdefinierten Schlüssel (siehe: Entladen verschlüsselter Datendateien), aber der Datenquelle fehlt die Möglichkeit, den erforderlichen symmetrischen Schlüssel anzugeben.
Verschlüsseln von COPY-Daten, die in S3 gespeichert sind (Daten, die beim Schreiben in Redshift gespeichert sind): Gemäß der Redshift-Dokumentation zum Laden verschlüsselter Datendateien von Amazon S3:
Sie können den COPY
-Befehl verwenden, um Datendateien zu laden, die mithilfe der serverseitigen Verschlüsselung mit AWS-verwalteten Verschlüsselungsschlüsseln (SSE-S3 oder SSE-KMS), clientseitiger Verschlüsselung oder beidem in Amazon S3 hochgeladen wurden. COPY unterstützt keine serverseitige Amazon S3-Verschlüsselung mit einem vom Kunden bereitgestellten Schlüssel (SSE-C).
Parameter
Die Parameterzuordnung oder in Spark SQL bereitgestellte OPTIONS unterstützen die folgenden Einstellungen:
Parameter | Erforderlich | Standard | Beschreibung |
---|---|---|---|
dbtable | Ja, es sei denn, Abfrage ist angegeben. | Keine | Die Tabelle, aus der in Redshift erstellt oder gelesen werden soll. Dieser Parameter ist erforderlich, wenn Daten wieder in Redshift gespeichert werden. |
Abfrage | Ja, es sei denn, dbtable ist angegeben. | Keine | Die Abfrage, aus der in Redshift gelesen werden soll. |
user | No | Keine | Der Benutzername „Redshift“. Muss zusammen mit der Kennwort-Option verwendet werden. Kann nur verwendet werden, wenn der Benutzername und das Kennwort nicht in der URL übergeben werden. Wenn beides übergeben wird, wird ein Fehler angezeigt. Verwenden Sie diesen Parameter, wenn der Benutzername Sonderzeichen enthält, die mit Escapezeichen versehen werden müssen. |
Kennwort | No | Keine | Das Kennwort „Redshift“. Muss zusammen mit der Option user verwendet werden. Kann nur verwendet werden, wenn der Benutzername und das Kennwort nicht in der URL übergeben werden. Wenn beides übergeben wird, wird ein Fehler angezeigt. Verwenden Sie diesen Parameter, wenn das Kennwort Sonderzeichen enthält, die mit Escapezeichen versehen werden müssen. |
url | Ja | Keine | Eine JDBC-URL mit dem Formatjdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password> subprotocol kann postgresql oder redshift sein, je nachdem welchen JDBC-Treiber Sie geladen haben. Ein Redshift-kompatibler Treiber muss sich am Klassenpfad befinden und dieser URL entsprechen. host und port sollten auf den Redshift-Masterknoten verweisen, sodass Sicherheitsgruppen und/oder VPC konfiguriert werden müssen, um den Zugriff von Ihrer Treiberanwendung zu ermöglichen.database gibt einen Redshift-Datenbanknamen user an und password sind Anmeldeinformationen für den Zugriff auf die Datenbank, die in diese URL für JDBC eingebettet werden müssen; Ihr Benutzerkonto sollte über erforderliche Berechtigungen für die Tabelle verfügen, auf die verwiesen wird. |
search_path | No | Keine | Legen Sie den Schemasuchpfad in Redshift fest. Wird mit dem SET search_path to -Befehl festgelegt. Sollte eine durch Trennzeichen getrennte Liste von Schemanamen sein, in der nach Tabellen gesucht werden soll. Siehe Redshift-Dokumentation von search_path. |
aws_iam_role | Nur, wenn Sie IAM-Rollen zum Autorisieren verwenden. | Keine | Vollständig angegebener ARN der IAM-Rolle in Redshift für COPY/UNLOAD-Vorgänge, der an den Redshift-Cluster angefügt ist, z. B. arn:aws:iam::123456789000:role/<redshift-iam-role> . |
forward_spark_s3_credentials | No | false |
Wenn der Wert true ist, ermittelt die Datenquelle die Anmeldeinformationen automatisch, die Spark zum Herstellen einer Verbindung mit S3 verwendet, und leitet diese Anmeldeinformationen über JDBC an Redshift weiter. Diese Anmeldeinformationen werden als Teil der JDBC-Abfrage gesendet. Daher wird dringend empfohlen, die SSL-Verschlüsselung der JDBC-Verbindung bei Verwendung dieser Option zu aktivieren. |
temporary_aws_access_key_id | No | Keine | AWS-Zugriffsschlüssel, muss über Schreibberechtigungen für den S3-Bucket verfügen. |
temporary_aws_secret_access_key | No | Keine | Geheimer AWS-Zugriffsschlüssel, der dem angegebenen Zugriffsschlüssel entspricht. |
temporary_aws_session_token | No | Keine | AWS-Sitzungstoken, das dem angegebenen Zugriffsschlüssel entspricht. |
tempdir | Ja | Keine | Eine beschreibbare Position in Amazon S3, die beim Schreiben zum Entladen von Daten beim Lesen und zum Laden von Avro-Daten in Redshift verwendet werden soll. Wenn Sie die Redshift-Datenquelle für Spark als Teil einer regulären ETL-Pipeline verwenden, kann es hilfreich sein, eine Lifecycle-Richtlinie für einen Bucket festzulegen und diese als temporären Speicherort für diese Daten zu verwenden. Sie können keinen externen Speicherorte verwenden, die in Unity Catalog als tempdir -Speicherorte definiert sind. |
jdbcdriver | No | Wird durch das Unterprotokoll der JDBC-URL bestimmt. | Der Klassenname des zu verwendenden JDBC-Treibers. Diese Klasse muss im Klassenpfad vorhanden sein. In den meisten Fällen sollte es nicht erforderlich sein, diese Option angeben zu müssen, da der entsprechende Treiberklassenname automatisch durch das Unterprotokoll der JDBC-URL bestimmt werden sollte. |
diststyle | No | EVEN |
Die Redshift-Verteilungsart, die beim Erstellen einer Tabelle verwendet werden soll. Dies kann eine EVEN , KEY oder ALL sein (siehe Redshift-Dokumente). Bei Verwendung von KEY müssen Sie auch einen Verteilungsschlüssel mit der Distkey-Option festlegen. |
distkey | Nein, es sei denn, Sie verwenden DISTSTYLE KEY |
Keine | Der Name einer Spalte in der Tabelle, der beim Erstellen einer Tabelle als Verteilungsschlüssel verwendet werden soll. |
sortkeyspec | No | Keine | Eine vollständige Sortierschlüssel-Definition in Redshift. Beispiele: - SORTKEY(my_sort_column) - COMPOUND SORTKEY(sort_col_1, sort_col_2) - INTERLEAVED SORTKEY(sort_col_1, sort_col_2) |
usestagingtable (veraltet) | No | true |
Wenn Sie diese veraltete Option auf false festlegen, wird die Zieltabelle eines Überschreibungsvorgangs sofort am Anfang des Schreibvorgangs gelöscht, wodurch der Überschreibungsvorgang nicht atomar ist und die Verfügbarkeit der Zieltabelle reduziert wird. Dies kann die temporären Speicherplatzanforderungen für Überschreibungen verringern.Da das Festlegen des usestagingtable=false -Vorgangs zu Datenverlust oder Nichtverfügbarkeit führen kann, wird dies nicht mehr verwendet, und Sie müssen die Zieltabelle manuell löschen. |
Beschreibung | No | Keine | Eine Beschreibung für die Tabelle. Wird mit dem SQL COMMENT-Befehl festgelegt und sollte in den meisten Abfragetools angezeigt werden. Siehe auch die description -Metadaten zum Festlegen von Beschreibungen für einzelne Spalten. |
preactions | No | Keine | Eine durch ; getrennte Liste von SQL-Befehlen, die vor dem Laden des COPY -Befehls ausgeführt werden sollen. Es kann hilfreich sein, einige DELETE -Befehle oder ähnliche Befehle hier auszuführen, bevor neue Daten geladen werden. Wenn der Befehl %s enthält, wird der Tabellenname vor der Ausführung formatiert (falls Sie eine Stagingtabelle verwenden).Wenn diese Befehle fehlschlagen, wird dies als Fehler behandelt, und eine Ausnahme wird ausgelöst. Wenn Sie eine Stagingtabelle verwenden, werden die Änderungen wiederhergestellt, und die Sicherungstabelle wird wiederhergestellt, wenn Vorabaktionen fehlschlagen. |
postactions | No | Keine | Eine durch ; getrennte Liste von SQL-Befehlen, die nach einem erfolgreichen COPY beim Laden von Daten ausgeführt werden sollen. Es kann hilfreich sein, einige GRANT -Befehle oder ähnliche Befehle hier auszuführen, wenn neue Daten geladen werden. Wenn der Befehl %s enthält, wird der Tabellenname vor der Ausführung formatiert (falls Sie eine Stagingtabelle verwenden).Wenn diese Befehle fehlschlagen, wird dies als Fehler behandelt, und eine Ausnahme wird ausgelöst. Wenn Sie eine Stagingtabelle verwenden, werden die Änderungen wiederhergestellt, und die Sicherungstabelle wird wiederhergestellt, wenn Nachaktionen fehlschlagen. |
extracopyoptions | No | Keine | Eine Liste mit zusätzlichen Optionen, die beim Laden von Daten an den COPY -Befehl in Redshift angefügt werden sollen, z. B.TRUNCATECOLUMNS oder MAXERROR n (weitere Optionen finden Sie in den Redshift-Dokumenten).Da diese Optionen am Ende des COPY -Befehls angefügt werden, können nur Optionen verwendet werden, die am Ende des Befehls sinnvoll sind, aber dies sollte die meisten möglichen Anwendungsfälle abdecken. |
tempformat | No | AVRO |
Das Format, in dem temporäre Dateien in S3 beim Schreiben in Redshift gespeichert werden sollen. Der Standardwert lautet .AVRO ; die anderen zulässigen Werte sind CSV und CSV GZIP für CSV bzw. gzipped CSV.Redshift ist beim Laden von CSV wesentlich schneller als beim Laden von Avro-Dateien, sodass die Verwendung dieses tempformats beim Schreiben in Redshift eine große Leistungssteigerung bieten kann. |
csvnullstring | No | @NULL@ |
Der Zeichenfolgenwert, der bei Verwendung des CSV-Tempformats für Nullen geschrieben werden soll. Dies sollte ein Wert sein, der nicht in Ihren tatsächlichen Daten angezeigt wird. |
csvseparator | Nein | , |
Trennzeichen, das beim Schreiben temporärer Dateien verwendet werden soll, bei Festlegung von tempformat auf CSV oderCSV GZIP . Hierbei muss es sich um ein gültiges ASCII-Zeichen handeln, z. B. „, “ oder „\| “. |
csvignoreleadingwhitespace | No | true |
Wenn dies auf „true“ festgelegt ist, wird das vorangestellte Leerzeichen während des Schreibvorgangs aus Werten entfernt, wenntempformat auf CSV oder CSV GZIP festgelegt ist. Andernfalls wird das Leerzeichen beibehalten. |
csvignoretrailingwhitespace | No | true |
Wenn dies auf „true“ festgelegt ist, wird das nachgestellte Leerzeichen während des Schreibvorgangs aus Werten entfernt, wenntempformat auf CSV oder CSV GZIP festgelegt ist. Andernfalls wird das Leerzeichen beibehalten. |
infer_timestamp_ntz_type | No | false |
Bei einer Festlegung auf true werden Werte mit dem Redshift-Typ TIMESTAMP bei Lesevorgängen als TimestampNTZType (Zeitstempel ohne Zeitzone) interpretiert. Andernfalls werden alle Zeitstempel als TimestampType interpretiert, unabhängig vom Typ in der zugrunde liegenden Redshift-Tabelle. |
Zusätzliche Konfigurationsoptionen
Konfigurieren der maximalen Größe von Zeichenfolgenspalten
Beim Erstellen von Redshift-Tabellen besteht das Standardverhalten darin, TEXT
-Spalten für Zeichenfolgenspalten zu erstellen. Redshift speichert TEXT
Spalten als VARCHAR(256)
, sodass diese Spalten maximal 256 Zeichen enthalten (Quelle).
Um größere Spalten zu unterstützen, können Sie das maxlength
-Spaltenmetadatenfeld verwenden, um die maximale Länge einzelner Zeichenfolgenspalten anzugeben. Dies ist auch hilfreich bei der Implementierung von speicherplatzsparenden Leistungsoptimierungen, indem Spalten mit einer kleineren maximalen Länge als der Standardlänge deklariert werden.
Hinweis
Aufgrund von Einschränkungen in Spark unterstützen die SQL- und R-Sprach-APIs nicht die Änderung von Spaltenmetadaten.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
Hier sehen Sie ein Beispiel für das Aktualisieren der Metadatenfelder mehrerer Spalten mithilfe der Skala-API von Spark:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Festlegen eines benutzerdefinierten Spaltentyps
Wenn Sie einen Spaltentyp manuell festlegen müssen, können Sie die redshift_type
-Spaltenmetadaten verwenden. Wenn Sie z. B. den Spark SQL Schema -> Redshift SQL
-Typabgleicher überschreiben möchten, um einen benutzerdefinierten Spaltentyp zuzuweisen, können Sie wie folgt vorgehen:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Konfigurieren der Spaltencodierung
Verwenden Sie beim Erstellen einer Tabelle das Spaltenmetadatenfeld encoding
, um eine Komprimierungscodierung für jede Spalte anzugeben (siehe Amazon-Dokumente für verfügbare Codierungen).
Festlegen von Beschreibungen für Spalten
Mit Redshift können mit Spalten Beschreibungen verbunden werden, die in den meisten Abfragetools (mithilfe des COMMENT
-Befehls) angezeigt werden sollten. Sie können das Spaltenmetadatenfeld description
festlegen, um eine Beschreibung für einzelne Spalten anzugeben.
Abfrage-Pushdown in Redshift
Der Spark-Optimierer pusht die folgenden Operatoren nach unten in Redshift:
Filter
Project
Sort
Limit
Aggregation
Join
Innerhalb von Project
und Filter
unterstützt er die folgenden Ausdrücke:
- Die meisten booleschen Logikoperatoren
- Vergleiche
- Grundlegende arithmetische Operationen
- Numerische und Zeichenfolgenumwandlungen
- Die meisten Zeichenfolgenfunktionen
- Skalare Unterabfragen, wenn sie vollständig in Redshift gepusht werden können.
Hinweis
Dieser Pushdown unterstützt keine Ausdrücke, die auf Datums- und Zeitstempeln ausgeführt werden.
Innerhalb von Aggregation
unterstützt er die folgenden Aggregationsfunktionen:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
kombiniert mit der DISTINCT
-Klausel, sofern zutreffend.
Innerhalb von Join
unterstützt er die folgenden Arten von Verknüpfungen:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- Unterabfragen, die der Optimierer neu in
Join
schreibt, z. B.WHERE EXISTS
,WHERE NOT EXISTS
Hinweis
Join-Pushdown unterstützt FULL OUTER JOIN
nicht.
Der Pushdown ist u. U. bei Abfragen mit LIMIT
am nützlichsten. Eine Abfrage, wie z. B. SELECT * FROM large_redshift_table LIMIT 10
könnte sehr lange dauern, da die gesamte Tabelle zunächst per UNLOAD als Zwischenergebnis in S3 entladen würde. Mit dem Pushdown wird LIMIT
in Redshift ausgeführt. Bei Abfragen mit Aggregationen hilft auch der Pushdown der Aggregation in Redshift bei der Reduzierung der Datenmenge, die übertragen werden muss.
Der Abfrage-Pushdown in Redshift ist standardmäßig aktiviert. Er kann deaktiviert werden, indem spark.databricks.redshift.pushdown
auf false
festgelegt wird. Selbst wenn er deaktiviert ist, führt Spark bei den Filtern weiterhin einen Pushdown aus und führt die Spaltenlöschung in Redshift durch.
Redshift-Treiberinstallation
Die Redshift-Datenquelle erfordert darüber hinaus einen mit Redshift kompatiblen JDBC-Treiber. Da Redshift auf dem PostgreSQL-Datenbanksystem basiert, können Sie den in Databricks Runtime enthaltenen PostgreSQL-JDBC-Treiber oder den von Amazon empfohlenen Redshift JDBC-Treiber verwenden. Es ist keine Installation erforderlich, um den PostgreSQL JDBC-Treiber zu verwenden. Die Version des in jeder Databricks Runtime-Version enthaltenen PostgreSQL JDBC-Treibers ist in den Databricks Runtime-Versionshinweisen aufgeführt.
So installieren Sie den Redshift JDBC-Treiber manuell:
- Download des Treibers von Amazon.
- Upload des Treibers in Ihren Azure Databricks-Arbeitsbereich. Weitere Informationen finden Sie unter Bibliotheken.
- Installation der Bibliothek auf Ihrem Cluster.
Hinweis
Databricks empfiehlt die Verwendung der neuesten Version des Redshift JDBC-Treibers. Versionen des Redshift JDBC-Treibers unter 1.2.41 haben die folgenden Einschränkungen:
- Version 1.2.16 des Treibers gibt leere Daten zurück, wenn in einer SQL-Abfrage eine
where
-Klausel verwendet wird. - Versionen des Treibers vor 1.2.41 geben möglicherweise ungültige Ergebnisse zurück, da die NULL-Zulässigkeit einer Spalte fälschlicherweise als „Lässt keine Nullwerte zu“ anstelle von „Unbekannt“ angegeben wird.
Transaktionsgarantien
In diesem Abschnitt werden die Transaktionsgarantien der Redshift-Datenquelle für Spark beschrieben.
Allgemeiner Hintergrund der Redshift- und S3-Eigenschaften
Allgemeine Informationen zu Redshift-Transaktionsgarantien finden Sie in der Redshift-Dokumentation im Kapitel zum Verwalten gleichzeitiger Schreibvorgänge. Kurz zusammengefasst stellt Redshift eine serialisierbare Isolation gemäß der Dokumentation für den Redshift-Befehl BEGIN bereit:
[obwohl] Sie jede der vier Transaktionsisolationsstufen verwenden können, verarbeitet Amazon Redshift alle Isolationsstufen als serialisierbar.
In der Redshift-Dokumentation steht Folgendes darüber:
Amazon Redshift unterstützt ein standardmäßiges automatisches Commitverhalten, bei dem jeder separat ausgeführte SQL-Befehl einzeln ccommittet.
Daher sind einzelne Befehle wie COPY
und UNLOAD
atomisch und transaktional, während explizite BEGIN
- und END
-Befehle nur erforderlich sein sollten, um die Unteilbarkeit mehrerer Befehle oder Abfragen zu erzwingen.
Beim Lesen von und Schreiben in Redshift liest und schreibt die Datenquelle Daten in S3. Sowohl Spark als auch Redshift erzeugen eine partitionierte Ausgabe und speichern sie in mehreren Dateien in S3. Gemäß der Dokumentation zum Amazon S3-Datenkonsistenzmodell sind S3-Bucketauflistungsvorgänge letztendlich konsistent, sodass bei den Dateien besondere Maßnahmen ergriffen werden müssen, um fehlende oder unvollständige Daten aufgrund dieser Quelle letztendlicher Konsistenz zu vermeiden.
Garantien der Redshift-Datenquelle für Spark
Anfügen an eine vorhandene Tabelle
Beim Einfügen von Zeilen in Redshift verwendet die Datenquelle den BEFEHL COPY und gibt Manifeste an, die vor bestimmten letztendlich konsistenten S3-Vorgängen schützen. Folglich haben spark-redshift
-Anhänge an vorhandenen Tabellen dieselben atomischen und transaktionalen Eigenschaften wie reguläre Redshift-COPY
-Befehle.
Erstellen Sie eine neue Tabelle (SaveMode.CreateIfNotExists
).
Das Erstellen einer neuen Tabelle ist ein zweistufiger Prozess, der aus einem CREATE TABLE
-Befehl gefolgt von einem COPY-Befehl besteht, um die anfängliche Anzahl an Zeilen anzufügen. Beide Vorgänge werden in derselben Transaktion ausgeführt.
Überschreiben einer vorhandenen Tabelle
Standardmäßig verwendet die Datenquelle Transaktionen, um Überschreibungen auszuführen, die implementiert werden, indem die Zieltabelle gelöscht, eine neue leere Tabelle erstellt und Zeilen an diese angefügt werden.
Wenn die veraltete usestagingtable
-Einstellung auf false
festgelegt ist, committet die Datenquelle den DELETE TABLE
-Befehl vor dem Anfügen von Zeilen an die neue Tabelle, indem die Unteilbarkeit des Überschreibungsvorgangs geopfert wird, wobei jedoch gleichzeitig die Menge des Stagingspeicherplatzes reduziert wird, den Redshift während des Überschreibens benötigt.
Abfrage-Redshift-Tabelle
In Abfragen wird der Redshift-Befehl UNLOAD verwendet, um eine Abfrage auszuführen und ihre Ergebnisse in S3 zu speichern und Manifeste zu verwenden, um vor bestimmten letztendlich konsistenten S3-Vorgängen zu schützen. Daher sollten Abfragen aus der Redshift-Datenquelle für Spark dieselben Konsistenzeigenschaften wie normale Redshift-Abfragen aufweisen.
Häufige Probleme und Lösungen
S3-Bucket- und Redshift-Cluster befinden sich in verschiedenen AWS-Regionen
Standardmäßig funktionieren S3- <-> Redshift-Kopien nicht, wenn sich der S3-Bucket- und der Redshift-Cluster in verschiedenen AWS-Regionen befinden.
Wenn Sie versuchen, eine Redshift-Tabelle zu lesen, wenn sich der S3-Bucket in einer anderen Region befindet, wird möglicherweise ein Fehler wie der folgende angezeigt:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Ebenso kann der Versuch, mit einem S3-Bucket in einem anderen Bereich in Redshift zu schreiben, den folgenden Fehler verursachen:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Schreibt: Der Redshift COPY-Befehl unterstützt die explizite Spezifikation der S3-Bucketregion, sodass Sie Schreibvorgänge in Redshift in diesen Fällen ordnungsgemäß vornehmen können, indem Sie der
extracopyoptions
-Einstellungregion 'the-region-name'
hinzufügen. Verwenden Sie beispielsweise mit einem Bucket in der Region USA, Osten (Virginia) und der Skala-API:.option("extracopyoptions", "region 'us-east-1'")
Alternativ können Sie die
awsregion
-Einstellung verwenden:.option("awsregion", "us-east-1")
Liest: Der Redshift UNLOAD-Befehl unterstützt auch die explizite Spezifikation der S3-Bucketregion. Sie können Lesevorgänge ordnungsgemäß vornehmen, indem Sie der
awsregion
-Einstellung die Region hinzufügen:.option("awsregion", "us-east-1")
Authentifizierungsfehler beim Verwenden eines Kennworts mit Sonderzeichen in der JDBC-URL
Wenn Sie den Benutzernamen und das Kennwort als Teil der JDBC-URL angeben und das Kennwort Sonderzeichen wie ;
, ?
oder &
enthält, wird möglicherweise die folgende Ausnahme angezeigt:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Dies wird durch Sonderzeichen im Benutzernamen oder Kennwort verursacht, die vom JDBC-Treiber nicht ordnungsgemäß entfernt werden. Stellen Sie sicher, dass Sie den Benutzernamen und das Kennwort mithilfe der entsprechenden DataFrame-Optionen user
und password
angeben. Weitere Informationen finden Sie unter Parameter.
Lange ausgeführte Spark-Abfrage hängt unbegrenzt, obwohl der entsprechende Redshift-Vorgang abgeschlossen ist.
Wenn Sie große Datenmengen aus und in Redshift lesen oder schreiben, hängt Ihre Spark-Abfrage möglicherweise unbegrenzt, auch wenn auf der AWS Redshift Monitoring-Seite angezeigt wird, dass der entsprechende LOAD
- oder UNLOAD
-Vorgang abgeschlossen ist und dass Cluster im Leerlauf ist. Dies wird durch die Verbindung zwischen Redshift und der Spark-Zeitüberschreitung verursacht. Um dies zu vermeiden, stellen Sie sicher, dass das JDBC-Flag tcpKeepAlive
aktiviert ist und dass TCPKeepAliveMinutes
auf einen niedrigen Wert festgelegt ist (z. B. 1).
Weitere Informationen finden Sie im Abschnitt zur Amazon Redshift JDBC-Treiberkonfiguration.
Semantik für Zeitstempel mit Zeitzone
Beim Lesen von Daten werden Redshift TIMESTAMP
- und TIMESTAMPTZ
-Datentypen dem Spark-TimestampType
zugeordnet, und ein Wert wird in koordinierte Weltzeit (UTC) konvertiert und als UTC-Zeitstempel gespeichert. Bei einem Redshift TIMESTAMP
wird die lokale Zeitzone angenommen, da der Wert keine Zeitzoneninformationen enthält. Beim Schreiben von Daten in eine Redshift-Tabelle wird ein Spark TimestampType
dem Redshift-Datentyp TIMESTAMP
zugeordnet.
Migrationsleitfaden
Die Datenquelle erfordert jetzt, dass Sie explizit forward_spark_s3_credentials
festlegen, bevor Spark S3-Anmeldeinformationen an Redshift weitergeleitet werden. Diese Änderung hat keine Auswirkungen, wenn Sie die Authentifizierungsmechanismen aws_iam_role
oder temporary_aws_*
verwenden. Wenn Sie sich jedoch auf das alte Standardverhalten verlassen haben, müssen Sie forward_spark_s3_credentials
jetzt explizit auf true
festlegen, um Ihren vorherigen Redshift-zu-S3-Authentifizierungsmechanismus weiterhin verwenden zu können. Eine Erörterung der drei Authentifizierungsmechanismen und ihrer Sicherheits-Trade-offs finden Sie im Abschnitt Authentifizierung für S3 und Redshift in diesem Dokument.