Freigeben über


Abfragen von Amazon Redshift mithilfe von Azure Databricks

Sie können Tabellen aus Amazon Redshift mit Azure Databricks lesen und schreiben.

Wichtig

Die in diesem Artikel beschriebenen Konfigurationen sind experimentell. Experimentelle Funktionen werden ohne Mängelgewähr („wie besehen“) bereitgestellt und nicht von Databricks über Kanäle des technischen Kundensupports unterstützt. Um vollständige Unterstützung für den Abfrageverbund zu erhalten, sollten Sie stattdessen Lakehouse-Verbund verwenden, wodurch Ihre Azure Databricks-Benutzer die Vorteile der Unity Catalog-Syntax und der Datengovernancetools nutzen können.

Die Datenquelle „Databricks Redshift“ verwendet Amazon S3 zum effizienten Übertragen von Daten in und aus Redshift. Sie verwendet JDBC, um die entsprechenden Befehle COPY und UNLOAD automatisch in Redshift auszulösen.

Hinweis

In Databricks Runtime 11.3 LTS und höher enthält Databricks Runtime den Redshift JDBC-Treiber, auf den mithilfe des Schlüsselworts redshift für die Formatoption zugegriffen werden kann. Weitere Informationen zu den Treiberversionen, die in jeder Databricks Runtime-Instanz enthalten sind, finden Sie unter Versionshinweise zu Databricks Runtime-Versionen und -Kompatibilität. Vom Benutzer bereitgestellte Treiber werden weiterhin unterstützt und haben vor dem gebündelten JDBC-Treiber Vorrang.

In Databricks Runtime 10.4 LTS und früher ist eine manuelle Installation des Redshift JDBC-Treibers erforderlich, und für Abfragen muss der Treiber (com.databricks.spark.redshift) für das Format verwendet werden. Siehe Redshift-Treiberinstallation.

Verwendung

In den folgenden Beispielen wird die Verbindung mit dem Redshift-Treiber veranschaulicht. Ersetzen Sie die url Parameterwerte, wenn Sie den PostgreSQL JDBC-Treiber verwenden.

Nachdem Sie Ihre AWS-Anmeldeinformationen konfiguriert haben, können Sie die Datenquelle mit der Spark-Datenquellen-API in Python, SQL, R oder Scala verwenden.

Wichtig

Externe Speicherorte, die in Unity Catalog definiert sind, werden nicht als tempdir-Speicherorte unterstützt.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Lesen von Daten mithilfe von SQL in Databricks Runtime 10.4 LTS und früher:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Lesen von Daten mithilfe von SQL in Databricks Runtime 11.3 LTS und höher:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Schreiben von Daten mithilfe von SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

Die SQL-API unterstützt nur das Erstellen neuer Tabellen und nicht das Überschreiben oder Anfügen.

R

Lesen von Daten mithilfe von R in Databricks Runtime 10.4 LTS und früher:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Lesen von Daten mithilfe von R in Databricks Runtime 11.3 LTS und höher:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Empfehlungen zum Arbeiten mit Redshift

Bei der Abfrageausführung können große Datenmengen für S3 extrahiert werden. Wenn Sie vorhaben, mehrere Abfragen mit den gleichen Daten in Redshift auszuführen, empfiehlt Databricks, die extrahierten Daten mithilfe von Delta Lake zu speichern.

Konfiguration

Authentifizierung für S3 und Redshift

Die Datenquelle umfasst mehrere Netzwerkverbindungen, die im folgenden Diagramm dargestellt sind:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

Die Datenquelle liest und schreibt Daten in S3, wenn Daten an/aus Redshift übertragen werden. Daher sind AWS-Anmeldeinformationen mit Lese- und Schreibzugriff auf einen S3-Bucket erforderlich (angegeben mithilfe des tempdir-Konfigurationsparameters).

Hinweis

Die Datenquelle bereinigt nicht die temporären Dateien, die sie in S3 erstellt. Daher wird empfohlen, einen dedizierten temporären S3-Bucket mit einer Objektlebenszykluskonfiguration zu verwenden, um sicherzustellen, dass temporäre Dateien nach einem angegebenen Ablaufzeitraum automatisch gelöscht werden. Weitere Informationen zum Verschlüsseln dieser Dateien finden Sie im Abschnitt Verschlüsselung in diesem Dokument. Sie können keinen externen Speicherort verwenden, der in Unity Catalog als tempdir-Speicherort definiert ist.

In den folgenden Abschnitten werden die Konfigurationsoptionen für die Authentifizierung der einzelnen Verbindungen beschrieben:

Spark-Treiber zu Redshift

Der Spark-Treiber verbindet sich über JDBC mithilfe eines Benutzernamens und eines Kennworts mit Redshift. Redshift unterstützt nicht die Verwendung von IAM-Rollen, um diese Verbindung zu authentifizieren. Standardmäßig wird für diese Verbindung die SSL-Verschlüsselung verwendet; weitere Details finden Sie unter Verschlüsselung.

Spark zu S3

S3 dient als Vermittler zum Speichern von Massendaten beim Lesen oder Schreiben in Redshift. Spark verbindet sich mit S3 sowohl mithilfe der Hadoop FileSystem-Schnittstellen als auch direkt mithilfe des S3-Clients des Amazon Java SDK.

Hinweis

Sie können DBFS-Bereitstellungen nicht verwenden, um den Zugriff auf S3 for Redshift zu konfigurieren.

  • Festlegen von Schlüsseln in der Hadoop-Konfiguration: Sie können AWS-Schlüssel mithilfe von Hadoop-Konfigurationseigenschaften angeben. Wenn Ihre tempdir-Konfiguration auf ein s3a://-Dateisystem verweist, können Sie die Eigenschaften fs.s3a.access.key und fs.s3a.secret.key in einer Hadoop-XML-Konfigurationsdatei festlegen oder sc.hadoopConfiguration.set() aufrufen, um die globale Hadoop-Konfiguration von Spark zu konfigurieren. Wenn Sie ein s3n://-Dateisystem verwenden, können Sie die Legacykonfigurationsschlüssel angeben, wie im folgenden Beispiel gezeigt.

    Scala

    Wenn Sie beispielsweise das s3a-Dateisystem verwenden, fügen Sie Folgendes hinzu:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Fügen Sie für das Legacydateisystem s3n Folgendes hinzu:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    Der folgende Befehl basiert auf einigen internen Spark-Versionen, sollte jedoch mit allen PySpark-Versionen funktionieren. Es ist unwahrscheinlich, dass er sich in Zukunft ändert:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift zu S3

Legen Sie die Option forward_spark_s3_credentials auf true fest, um die AWS-Schlüsselanmeldeinformationen, die Spark zum Herstellen einer Verbindung mit S3 verwendet, automatisch über JDBC an Redshift weiterzuleiten. Die JDBC-Abfrage bettet diese Anmeldeinformationen ein, sodass Databricks dringend empfiehlt, die SSL-Verschlüsselung der JDBC-Verbindung zu aktivieren.

Verschlüsselung

  • Sichern von JDBC: Sofern keine SSL-bezogenen Einstellungen in der JDBC-URL vorhanden sind, aktiviert die Datenquelle standardmäßig die SSL-Verschlüsselung und überprüft außerdem, dass der Redshift-Server vertrauenswürdig ist (das heißt sslmode=verify-full). Dazu wird ein Serverzertifikat automatisch von den Amazon-Servern heruntergeladen, wenn er zum ersten Mal benötigt wird. Wenn ein Fehler auftritt, wird eine vorab gebündelte Zertifikatdatei als Fallback verwendet. Dies gilt sowohl für die Redshift- als auch für die PostgreSQL-JDBC-Treiber.

    Falls Probleme mit diesem Feature auftreten, oder falls Sie SSL einfach deaktivieren möchten, können Sie .option("autoenablessl", "false") auf Ihrem DataFrameReader oder Ihrem DataFrameWriter aufrufen.

    Wenn Sie benutzerdefinierte SSL-bezogene Einstellungen angeben möchten, können Sie die Anweisungen in der Redshift-Dokumentation befolgen: Verwenden von SSL- und Serverzertifikaten in Java und JDBC-Treiberkonfigurationsoptionen Alle SSL-bezogenen Optionen, die in der JDBC-url vorhanden sind, die mit der Datenquelle verwendet werden, haben Vorrang (d. h. die automatische Konfiguration wird nicht ausgelöst).

  • Verschlüsseln von UNLOAD-Daten, die in S3 gespeichert sind (Daten, die beim Lesen von Redshift gespeichert sind): Gemäß der Redshift-Dokumentation zum Entladen von Daten nach S3 „verschlüsselt UNLOAD automatisch Datendateien mit der serverseitigen Amazon S3-Verschlüsselung (SSE-S3)“.

    Redshift unterstützt darüber hinaus die clientseitige Verschlüsselung mit einem benutzerdefinierten Schlüssel (siehe: Entladen verschlüsselter Datendateien), aber der Datenquelle fehlt die Möglichkeit, den erforderlichen symmetrischen Schlüssel anzugeben.

  • Verschlüsseln von COPY-Daten, die in S3 gespeichert sind (Daten, die beim Schreiben in Redshift gespeichert sind): Gemäß der Redshift-Dokumentation zum Laden verschlüsselter Datendateien von Amazon S3:

Sie können den COPY-Befehl verwenden, um Datendateien zu laden, die mithilfe der serverseitigen Verschlüsselung mit AWS-verwalteten Verschlüsselungsschlüsseln (SSE-S3 oder SSE-KMS), clientseitiger Verschlüsselung oder beidem in Amazon S3 hochgeladen wurden. COPY unterstützt keine serverseitige Amazon S3-Verschlüsselung mit einem vom Kunden bereitgestellten Schlüssel (SSE-C).

Parameter

Die Parameterzuordnung oder in Spark SQL bereitgestellte OPTIONS unterstützen die folgenden Einstellungen:

Parameter Erforderlich Standard Beschreibung
dbtable Ja, es sei denn, Abfrage ist angegeben. Keine Die Tabelle, aus der in Redshift erstellt oder gelesen werden soll. Dieser Parameter ist erforderlich, wenn Daten wieder in Redshift gespeichert werden.
Abfrage Ja, es sei denn, dbtable ist angegeben. Keine Die Abfrage, aus der in Redshift gelesen werden soll.
user No Keine Der Benutzername „Redshift“. Muss zusammen mit der Kennwort-Option verwendet werden. Kann nur verwendet werden, wenn der Benutzername und das Kennwort nicht in der URL übergeben werden. Wenn beides übergeben wird, wird ein Fehler angezeigt. Verwenden Sie diesen Parameter, wenn der Benutzername Sonderzeichen enthält, die mit Escapezeichen versehen werden müssen.
Kennwort No Keine Das Kennwort „Redshift“. Muss zusammen mit der Option user verwendet werden. Kann nur verwendet werden, wenn der Benutzername und das Kennwort nicht in der URL übergeben werden. Wenn beides übergeben wird, wird ein Fehler angezeigt. Verwenden Sie diesen Parameter, wenn das Kennwort Sonderzeichen enthält, die mit Escapezeichen versehen werden müssen.
url Ja Keine Eine JDBC-URL mit dem Format
jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>

subprotocol kann postgresql oder redshift sein, je nachdem welchen JDBC-Treiber Sie geladen haben. Ein Redshift-kompatibler Treiber muss sich am Klassenpfad befinden und dieser URL entsprechen. host und port sollten auf den Redshift-Masterknoten verweisen, sodass Sicherheitsgruppen und/oder VPC konfiguriert werden müssen, um den Zugriff von Ihrer Treiberanwendung zu ermöglichen.
database gibt einen Redshift-Datenbanknamen user an und password sind Anmeldeinformationen für den Zugriff auf die Datenbank, die in diese URL für JDBC eingebettet werden müssen; Ihr Benutzerkonto sollte über erforderliche Berechtigungen für die Tabelle verfügen, auf die verwiesen wird.
search_path No Keine Legen Sie den Schemasuchpfad in Redshift fest. Wird mit dem SET search_path to-Befehl festgelegt. Sollte eine durch Trennzeichen getrennte Liste von Schemanamen sein, in der nach Tabellen gesucht werden soll. Siehe Redshift-Dokumentation von search_path.
aws_iam_role Nur, wenn Sie IAM-Rollen zum Autorisieren verwenden. Keine Vollständig angegebener ARN der IAM-Rolle in Redshift für COPY/UNLOAD-Vorgänge, der an den Redshift-Cluster angefügt ist, z. B. arn:aws:iam::123456789000:role/<redshift-iam-role>.
forward_spark_s3_credentials No false Wenn der Wert true ist, ermittelt die Datenquelle die Anmeldeinformationen automatisch, die Spark zum Herstellen einer Verbindung mit S3 verwendet, und leitet diese Anmeldeinformationen über JDBC an Redshift weiter. Diese Anmeldeinformationen werden als Teil der JDBC-Abfrage gesendet. Daher wird dringend empfohlen, die SSL-Verschlüsselung der JDBC-Verbindung bei Verwendung dieser Option zu aktivieren.
temporary_aws_access_key_id No Keine AWS-Zugriffsschlüssel, muss über Schreibberechtigungen für den S3-Bucket verfügen.
temporary_aws_secret_access_key No Keine Geheimer AWS-Zugriffsschlüssel, der dem angegebenen Zugriffsschlüssel entspricht.
temporary_aws_session_token No Keine AWS-Sitzungstoken, das dem angegebenen Zugriffsschlüssel entspricht.
tempdir Ja Keine Eine beschreibbare Position in Amazon S3, die beim Schreiben zum Entladen von Daten beim Lesen und zum Laden von Avro-Daten in Redshift verwendet werden soll. Wenn Sie die Redshift-Datenquelle für Spark als Teil einer regulären ETL-Pipeline verwenden, kann es hilfreich sein, eine Lifecycle-Richtlinie für einen Bucket festzulegen und diese als temporären Speicherort für diese Daten zu verwenden.

Sie können keinen externen Speicherorte verwenden, die in Unity Catalog als tempdir-Speicherorte definiert sind.
jdbcdriver No Wird durch das Unterprotokoll der JDBC-URL bestimmt. Der Klassenname des zu verwendenden JDBC-Treibers. Diese Klasse muss im Klassenpfad vorhanden sein. In den meisten Fällen sollte es nicht erforderlich sein, diese Option angeben zu müssen, da der entsprechende Treiberklassenname automatisch durch das Unterprotokoll der JDBC-URL bestimmt werden sollte.
diststyle No EVEN Die Redshift-Verteilungsart, die beim Erstellen einer Tabelle verwendet werden soll. Dies kann eine EVEN, KEY oder ALL sein (siehe Redshift-Dokumente). Bei Verwendung von KEY müssen Sie auch einen Verteilungsschlüssel mit der Distkey-Option festlegen.
distkey Nein, es sei denn, Sie verwenden DISTSTYLE KEY Keine Der Name einer Spalte in der Tabelle, der beim Erstellen einer Tabelle als Verteilungsschlüssel verwendet werden soll.
sortkeyspec No Keine Eine vollständige Sortierschlüssel-Definition in Redshift. Beispiele:

- SORTKEY(my_sort_column)
- COMPOUND SORTKEY(sort_col_1, sort_col_2)
- INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (veraltet) No true Wenn Sie diese veraltete Option auf false festlegen, wird die Zieltabelle eines Überschreibungsvorgangs sofort am Anfang des Schreibvorgangs gelöscht, wodurch der Überschreibungsvorgang nicht atomar ist und die Verfügbarkeit der Zieltabelle reduziert wird. Dies kann die temporären Speicherplatzanforderungen für Überschreibungen verringern.

Da das Festlegen des usestagingtable=false-Vorgangs zu Datenverlust oder Nichtverfügbarkeit führen kann, wird dies nicht mehr verwendet, und Sie müssen die Zieltabelle manuell löschen.
Beschreibung No Keine Eine Beschreibung für die Tabelle. Wird mit dem SQL COMMENT-Befehl festgelegt und sollte in den meisten Abfragetools angezeigt werden. Siehe auch die description-Metadaten zum Festlegen von Beschreibungen für einzelne Spalten.
preactions No Keine Eine durch ; getrennte Liste von SQL-Befehlen, die vor dem Laden des COPY-Befehls ausgeführt werden sollen. Es kann hilfreich sein, einige DELETE-Befehle oder ähnliche Befehle hier auszuführen, bevor neue Daten geladen werden. Wenn der Befehl %s enthält, wird der Tabellenname vor der Ausführung formatiert (falls Sie eine Stagingtabelle verwenden).

Wenn diese Befehle fehlschlagen, wird dies als Fehler behandelt, und eine Ausnahme wird ausgelöst. Wenn Sie eine Stagingtabelle verwenden, werden die Änderungen wiederhergestellt, und die Sicherungstabelle wird wiederhergestellt, wenn Vorabaktionen fehlschlagen.
postactions No Keine Eine durch ; getrennte Liste von SQL-Befehlen, die nach einem erfolgreichen COPY beim Laden von Daten ausgeführt werden sollen. Es kann hilfreich sein, einige GRANT-Befehle oder ähnliche Befehle hier auszuführen, wenn neue Daten geladen werden. Wenn der Befehl %s enthält, wird der Tabellenname vor der Ausführung formatiert (falls Sie eine Stagingtabelle verwenden).

Wenn diese Befehle fehlschlagen, wird dies als Fehler behandelt, und eine Ausnahme wird ausgelöst. Wenn Sie eine Stagingtabelle verwenden, werden die Änderungen wiederhergestellt, und die Sicherungstabelle wird wiederhergestellt, wenn Nachaktionen fehlschlagen.
extracopyoptions No Keine Eine Liste mit zusätzlichen Optionen, die beim Laden von Daten an den COPY-Befehl in Redshift angefügt werden sollen, z. B.
TRUNCATECOLUMNS oder MAXERROR n (weitere Optionen finden Sie in den Redshift-Dokumenten).

Da diese Optionen am Ende des COPY-Befehls angefügt werden, können nur Optionen verwendet werden, die am Ende des Befehls sinnvoll sind, aber dies sollte die meisten möglichen Anwendungsfälle abdecken.
tempformat No AVRO Das Format, in dem temporäre Dateien in S3 beim Schreiben in Redshift gespeichert werden sollen. Der Standardwert lautet .
AVRO; die anderen zulässigen Werte sind CSV und CSV GZIP für CSV bzw. gzipped CSV.

Redshift ist beim Laden von CSV wesentlich schneller als beim Laden von Avro-Dateien, sodass die Verwendung dieses tempformats beim Schreiben in Redshift eine große Leistungssteigerung bieten kann.
csvnullstring No @NULL@ Der Zeichenfolgenwert, der bei Verwendung des CSV-Tempformats für Nullen geschrieben werden soll. Dies sollte ein Wert sein, der nicht in Ihren tatsächlichen Daten angezeigt wird.
csvseparator Nein , Trennzeichen, das beim Schreiben temporärer Dateien verwendet werden soll, bei Festlegung von tempformat auf CSV oder
CSV GZIP. Hierbei muss es sich um ein gültiges ASCII-Zeichen handeln, z. B. „,“ oder „\|“.
csvignoreleadingwhitespace No true Wenn dies auf „true“ festgelegt ist, wird das vorangestellte Leerzeichen während des Schreibvorgangs aus Werten entfernt, wenn
tempformat auf CSV oder CSV GZIP festgelegt ist. Andernfalls wird das Leerzeichen beibehalten.
csvignoretrailingwhitespace No true Wenn dies auf „true“ festgelegt ist, wird das nachgestellte Leerzeichen während des Schreibvorgangs aus Werten entfernt, wenn
tempformat auf CSV oder CSV GZIP festgelegt ist. Andernfalls wird das Leerzeichen beibehalten.
infer_timestamp_ntz_type No false Bei einer Festlegung auf true werden Werte mit dem Redshift-Typ TIMESTAMP bei Lesevorgängen als TimestampNTZType (Zeitstempel ohne Zeitzone) interpretiert. Andernfalls werden alle Zeitstempel als TimestampType interpretiert, unabhängig vom Typ in der zugrunde liegenden Redshift-Tabelle.

Zusätzliche Konfigurationsoptionen

Konfigurieren der maximalen Größe von Zeichenfolgenspalten

Beim Erstellen von Redshift-Tabellen besteht das Standardverhalten darin, TEXT-Spalten für Zeichenfolgenspalten zu erstellen. Redshift speichert TEXT Spalten als VARCHAR(256), sodass diese Spalten maximal 256 Zeichen enthalten (Quelle).

Um größere Spalten zu unterstützen, können Sie das maxlength-Spaltenmetadatenfeld verwenden, um die maximale Länge einzelner Zeichenfolgenspalten anzugeben. Dies ist auch hilfreich bei der Implementierung von speicherplatzsparenden Leistungsoptimierungen, indem Spalten mit einer kleineren maximalen Länge als der Standardlänge deklariert werden.

Hinweis

Aufgrund von Einschränkungen in Spark unterstützen die SQL- und R-Sprach-APIs nicht die Änderung von Spaltenmetadaten.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Hier sehen Sie ein Beispiel für das Aktualisieren der Metadatenfelder mehrerer Spalten mithilfe der Skala-API von Spark:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Festlegen eines benutzerdefinierten Spaltentyps

Wenn Sie einen Spaltentyp manuell festlegen müssen, können Sie die redshift_type-Spaltenmetadaten verwenden. Wenn Sie z. B. den Spark SQL Schema -> Redshift SQL-Typabgleicher überschreiben möchten, um einen benutzerdefinierten Spaltentyp zuzuweisen, können Sie wie folgt vorgehen:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Konfigurieren der Spaltencodierung

Verwenden Sie beim Erstellen einer Tabelle das Spaltenmetadatenfeld encoding, um eine Komprimierungscodierung für jede Spalte anzugeben (siehe Amazon-Dokumente für verfügbare Codierungen).

Festlegen von Beschreibungen für Spalten

Mit Redshift können mit Spalten Beschreibungen verbunden werden, die in den meisten Abfragetools (mithilfe des COMMENT-Befehls) angezeigt werden sollten. Sie können das Spaltenmetadatenfeld description festlegen, um eine Beschreibung für einzelne Spalten anzugeben.

Abfrage-Pushdown in Redshift

Der Spark-Optimierer pusht die folgenden Operatoren nach unten in Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

Innerhalb von Project und Filter unterstützt er die folgenden Ausdrücke:

  • Die meisten booleschen Logikoperatoren
  • Vergleiche
  • Grundlegende arithmetische Operationen
  • Numerische und Zeichenfolgenumwandlungen
  • Die meisten Zeichenfolgenfunktionen
  • Skalare Unterabfragen, wenn sie vollständig in Redshift gepusht werden können.

Hinweis

Dieser Pushdown unterstützt keine Ausdrücke, die auf Datums- und Zeitstempeln ausgeführt werden.

Innerhalb von Aggregation unterstützt er die folgenden Aggregationsfunktionen:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

kombiniert mit der DISTINCT-Klausel, sofern zutreffend.

Innerhalb von Join unterstützt er die folgenden Arten von Verknüpfungen:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Unterabfragen, die der Optimierer neu in Join schreibt, z. B. WHERE EXISTS, WHERE NOT EXISTS

Hinweis

Join-Pushdown unterstützt FULL OUTER JOIN nicht.

Der Pushdown ist u. U. bei Abfragen mit LIMIT am nützlichsten. Eine Abfrage, wie z. B. SELECT * FROM large_redshift_table LIMIT 10 könnte sehr lange dauern, da die gesamte Tabelle zunächst per UNLOAD als Zwischenergebnis in S3 entladen würde. Mit dem Pushdown wird LIMIT in Redshift ausgeführt. Bei Abfragen mit Aggregationen hilft auch der Pushdown der Aggregation in Redshift bei der Reduzierung der Datenmenge, die übertragen werden muss.

Der Abfrage-Pushdown in Redshift ist standardmäßig aktiviert. Er kann deaktiviert werden, indem spark.databricks.redshift.pushdown auf false festgelegt wird. Selbst wenn er deaktiviert ist, führt Spark bei den Filtern weiterhin einen Pushdown aus und führt die Spaltenlöschung in Redshift durch.

Redshift-Treiberinstallation

Die Redshift-Datenquelle erfordert darüber hinaus einen mit Redshift kompatiblen JDBC-Treiber. Da Redshift auf dem PostgreSQL-Datenbanksystem basiert, können Sie den in Databricks Runtime enthaltenen PostgreSQL-JDBC-Treiber oder den von Amazon empfohlenen Redshift JDBC-Treiber verwenden. Es ist keine Installation erforderlich, um den PostgreSQL JDBC-Treiber zu verwenden. Die Version des in jeder Databricks Runtime-Version enthaltenen PostgreSQL JDBC-Treibers ist in den Databricks Runtime-Versionshinweisen aufgeführt.

So installieren Sie den Redshift JDBC-Treiber manuell:

  1. Download des Treibers von Amazon.
  2. Upload des Treibers in Ihren Azure Databricks-Arbeitsbereich. Weitere Informationen finden Sie unter Bibliotheken.
  3. Installation der Bibliothek auf Ihrem Cluster.

Hinweis

Databricks empfiehlt die Verwendung der neuesten Version des Redshift JDBC-Treibers. Versionen des Redshift JDBC-Treibers unter 1.2.41 haben die folgenden Einschränkungen:

  • Version 1.2.16 des Treibers gibt leere Daten zurück, wenn in einer SQL-Abfrage eine where-Klausel verwendet wird.
  • Versionen des Treibers vor 1.2.41 geben möglicherweise ungültige Ergebnisse zurück, da die NULL-Zulässigkeit einer Spalte fälschlicherweise als „Lässt keine Nullwerte zu“ anstelle von „Unbekannt“ angegeben wird.

Transaktionsgarantien

In diesem Abschnitt werden die Transaktionsgarantien der Redshift-Datenquelle für Spark beschrieben.

Allgemeiner Hintergrund der Redshift- und S3-Eigenschaften

Allgemeine Informationen zu Redshift-Transaktionsgarantien finden Sie in der Redshift-Dokumentation im Kapitel zum Verwalten gleichzeitiger Schreibvorgänge. Kurz zusammengefasst stellt Redshift eine serialisierbare Isolation gemäß der Dokumentation für den Redshift-Befehl BEGIN bereit:

[obwohl] Sie jede der vier Transaktionsisolationsstufen verwenden können, verarbeitet Amazon Redshift alle Isolationsstufen als serialisierbar.

In der Redshift-Dokumentation steht Folgendes darüber:

Amazon Redshift unterstützt ein standardmäßiges automatisches Commitverhalten, bei dem jeder separat ausgeführte SQL-Befehl einzeln ccommittet.

Daher sind einzelne Befehle wie COPY und UNLOAD atomisch und transaktional, während explizite BEGIN- und END-Befehle nur erforderlich sein sollten, um die Unteilbarkeit mehrerer Befehle oder Abfragen zu erzwingen.

Beim Lesen von und Schreiben in Redshift liest und schreibt die Datenquelle Daten in S3. Sowohl Spark als auch Redshift erzeugen eine partitionierte Ausgabe und speichern sie in mehreren Dateien in S3. Gemäß der Dokumentation zum Amazon S3-Datenkonsistenzmodell sind S3-Bucketauflistungsvorgänge letztendlich konsistent, sodass bei den Dateien besondere Maßnahmen ergriffen werden müssen, um fehlende oder unvollständige Daten aufgrund dieser Quelle letztendlicher Konsistenz zu vermeiden.

Garantien der Redshift-Datenquelle für Spark

Anfügen an eine vorhandene Tabelle

Beim Einfügen von Zeilen in Redshift verwendet die Datenquelle den BEFEHL COPY und gibt Manifeste an, die vor bestimmten letztendlich konsistenten S3-Vorgängen schützen. Folglich haben spark-redshift-Anhänge an vorhandenen Tabellen dieselben atomischen und transaktionalen Eigenschaften wie reguläre Redshift-COPY-Befehle.

Erstellen Sie eine neue Tabelle (SaveMode.CreateIfNotExists).

Das Erstellen einer neuen Tabelle ist ein zweistufiger Prozess, der aus einem CREATE TABLE-Befehl gefolgt von einem COPY-Befehl besteht, um die anfängliche Anzahl an Zeilen anzufügen. Beide Vorgänge werden in derselben Transaktion ausgeführt.

Überschreiben einer vorhandenen Tabelle

Standardmäßig verwendet die Datenquelle Transaktionen, um Überschreibungen auszuführen, die implementiert werden, indem die Zieltabelle gelöscht, eine neue leere Tabelle erstellt und Zeilen an diese angefügt werden.

Wenn die veraltete usestagingtable-Einstellung auf false festgelegt ist, committet die Datenquelle den DELETE TABLE-Befehl vor dem Anfügen von Zeilen an die neue Tabelle, indem die Unteilbarkeit des Überschreibungsvorgangs geopfert wird, wobei jedoch gleichzeitig die Menge des Stagingspeicherplatzes reduziert wird, den Redshift während des Überschreibens benötigt.

Abfrage-Redshift-Tabelle

In Abfragen wird der Redshift-Befehl UNLOAD verwendet, um eine Abfrage auszuführen und ihre Ergebnisse in S3 zu speichern und Manifeste zu verwenden, um vor bestimmten letztendlich konsistenten S3-Vorgängen zu schützen. Daher sollten Abfragen aus der Redshift-Datenquelle für Spark dieselben Konsistenzeigenschaften wie normale Redshift-Abfragen aufweisen.

Häufige Probleme und Lösungen

S3-Bucket- und Redshift-Cluster befinden sich in verschiedenen AWS-Regionen

Standardmäßig funktionieren S3- <-> Redshift-Kopien nicht, wenn sich der S3-Bucket- und der Redshift-Cluster in verschiedenen AWS-Regionen befinden.

Wenn Sie versuchen, eine Redshift-Tabelle zu lesen, wenn sich der S3-Bucket in einer anderen Region befindet, wird möglicherweise ein Fehler wie der folgende angezeigt:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Ebenso kann der Versuch, mit einem S3-Bucket in einem anderen Bereich in Redshift zu schreiben, den folgenden Fehler verursachen:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Schreibt: Der Redshift COPY-Befehl unterstützt die explizite Spezifikation der S3-Bucketregion, sodass Sie Schreibvorgänge in Redshift in diesen Fällen ordnungsgemäß vornehmen können, indem Sie der extracopyoptions-Einstellung region 'the-region-name' hinzufügen. Verwenden Sie beispielsweise mit einem Bucket in der Region USA, Osten (Virginia) und der Skala-API:

    .option("extracopyoptions", "region 'us-east-1'")
    

    Alternativ können Sie die awsregion-Einstellung verwenden:

    .option("awsregion", "us-east-1")
    
  • Liest: Der Redshift UNLOAD-Befehl unterstützt auch die explizite Spezifikation der S3-Bucketregion. Sie können Lesevorgänge ordnungsgemäß vornehmen, indem Sie der awsregion-Einstellung die Region hinzufügen:

    .option("awsregion", "us-east-1")
    

Authentifizierungsfehler beim Verwenden eines Kennworts mit Sonderzeichen in der JDBC-URL

Wenn Sie den Benutzernamen und das Kennwort als Teil der JDBC-URL angeben und das Kennwort Sonderzeichen wie ;, ?oder & enthält, wird möglicherweise die folgende Ausnahme angezeigt:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Dies wird durch Sonderzeichen im Benutzernamen oder Kennwort verursacht, die vom JDBC-Treiber nicht ordnungsgemäß entfernt werden. Stellen Sie sicher, dass Sie den Benutzernamen und das Kennwort mithilfe der entsprechenden DataFrame-Optionen user und password angeben. Weitere Informationen finden Sie unter Parameter.

Lange ausgeführte Spark-Abfrage hängt unbegrenzt, obwohl der entsprechende Redshift-Vorgang abgeschlossen ist.

Wenn Sie große Datenmengen aus und in Redshift lesen oder schreiben, hängt Ihre Spark-Abfrage möglicherweise unbegrenzt, auch wenn auf der AWS Redshift Monitoring-Seite angezeigt wird, dass der entsprechende LOAD- oder UNLOAD-Vorgang abgeschlossen ist und dass Cluster im Leerlauf ist. Dies wird durch die Verbindung zwischen Redshift und der Spark-Zeitüberschreitung verursacht. Um dies zu vermeiden, stellen Sie sicher, dass das JDBC-Flag tcpKeepAlive aktiviert ist und dass TCPKeepAliveMinutes auf einen niedrigen Wert festgelegt ist (z. B. 1).

Weitere Informationen finden Sie im Abschnitt zur Amazon Redshift JDBC-Treiberkonfiguration.

Semantik für Zeitstempel mit Zeitzone

Beim Lesen von Daten werden Redshift TIMESTAMP- und TIMESTAMPTZ-Datentypen dem Spark-TimestampType zugeordnet, und ein Wert wird in koordinierte Weltzeit (UTC) konvertiert und als UTC-Zeitstempel gespeichert. Bei einem Redshift TIMESTAMP wird die lokale Zeitzone angenommen, da der Wert keine Zeitzoneninformationen enthält. Beim Schreiben von Daten in eine Redshift-Tabelle wird ein Spark TimestampType dem Redshift-Datentyp TIMESTAMP zugeordnet.

Migrationsleitfaden

Die Datenquelle erfordert jetzt, dass Sie explizit forward_spark_s3_credentials festlegen, bevor Spark S3-Anmeldeinformationen an Redshift weitergeleitet werden. Diese Änderung hat keine Auswirkungen, wenn Sie die Authentifizierungsmechanismen aws_iam_role oder temporary_aws_* verwenden. Wenn Sie sich jedoch auf das alte Standardverhalten verlassen haben, müssen Sie forward_spark_s3_credentials jetzt explizit auf true festlegen, um Ihren vorherigen Redshift-zu-S3-Authentifizierungsmechanismus weiterhin verwenden zu können. Eine Erörterung der drei Authentifizierungsmechanismen und ihrer Sicherheits-Trade-offs finden Sie im Abschnitt Authentifizierung für S3 und Redshift in diesem Dokument.