共用方式為


使用 Azure Databricks 查詢 Amazon Redshift

您可以使用 Azure Databricks 從 Amazon Redshift 讀取和寫入數據表。

重要

本文所述的設定為實驗性質。 實驗性功能是以現況提供,且無法透過客戶技術支援來支援 Databricks。 若要取得完整的查詢同盟支援,您應該改用 Lakehouse 同盟,這可讓您的 Azure Databricks 使用者利用 Unity 目錄語法和資料控管工具

Databricks Redshift 數據源會使用 Amazon S3 有效率地將數據傳入和移出 Redshift,並使用 JDBC 自動觸發 Redshift 上的適當 COPYUNLOAD 命令。

注意

在 Databricks Runtime 11.3 LTS 和更新版本中,Databricks Runtime 包含 Redshift JDBC 驅動程式,可使用 redshift 格式選項的 關鍵詞來存取。 請參閱 Databricks Runtime 版本資訊,以及每個 Databricks Runtime 中包含的驅動程式版本和相容性 。 仍支援使用者提供的驅動程式,並優先於配套的 JDBC 驅動程式。

在 Databricks Runtime 10.4 LTS 和以下版本中,需要手動安裝 Redshift JDBC 驅動程式,而且查詢應該使用驅動程式 (com.databricks.spark.redshift) 的格式。 請參閱 Redshift 驅動程式安裝

使用方式

下列範例示範如何與 Redshift 驅動程式連線。 url如果您使用 PostgreSQL JDBC 驅動程式,請取代參數值。

設定 AWS 認證之後,您就可以在 Python、SQL、R 或 Scala 中使用資料源 API 搭配 Spark 數據源 API。

重要

不支援將在 Unity 目錄中定義的外部位置作為 tempdir 位置。

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

在 Databricks Runtime 10.4 LTS 和以下版本上使用 SQL 讀取數據:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

在 Databricks Runtime 11.3 LTS 和更新版本上使用 SQL 讀取數據:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

使用 SQL 寫入資料:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

SQL API 僅支援建立新的數據表,而不會覆寫或附加。

R

在 Databricks Runtime 10.4 LTS 上使用 R 讀取數據,如下所示:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

在 Databricks Runtime 11.3 LTS 和更新版本上使用 R 讀取數據:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

使用 Redshift 的建議

查詢執行可能會將大量數據擷取至 S3。 如果您打算對 Redshift 中的相同數據執行數個查詢,Databricks 建議使用 Delta Lake 儲存擷取的數據。

組態

驗證 S3 和 Redshift

數據源包含數個網路連線,如下圖所示:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

當將數據傳送至 Redshift 或從 Redshift 傳送數據時,數據源會讀取和寫入 S3。 因此,它需要具有 S3 貯體讀取和寫入存取權的 AWS 認證(使用 tempdir 組態參數所指定)。

注意

數據源不會清除它在 S3 中建立的暫存盤。 因此,我們建議您使用具有物件生命週期設定的專用暫存 S3 貯體,以確保在指定的到期期間之後自動刪除暫存盤。 如需如何加密這些檔案的討論,請參閱本檔的加密一節。 您無法使用在 Unity 目錄中定義的外部位置做為 tempdir 位置。

下列各節說明每個連線的驗證組態選項:

Spark 驅動程式至 Redshift

Spark 驅動程式會使用使用者名稱和密碼透過 JDBC 連線到 Redshift。 Redshift 不支援使用 IAM 角色來驗證此連線。 根據預設,此聯機會使用 SSL 加密;如需詳細資訊,請參閱 加密

Spark 至 S3

S3 可作為在讀取或寫入 Redshift 時儲存大量數據的媒介。 Spark 會同時使用 Hadoop FileSystem 介面和直接使用 Amazon Java SDK 的 S3 用戶端來連線到 S3。

注意

您無法使用 DBFS 掛接來設定 S3 for Redshift 的存取權。

  • 在 Hadoop conf 中設定金鑰:您可以使用 Hadoop 組態屬性來指定 AWS 金鑰tempdir如果您的組態指向s3a://文件系統,您可以在 Hadoop XML 組態檔中設定 fs.s3a.access.keyfs.s3a.secret.key 屬性,或呼叫 sc.hadoopConfiguration.set() 來設定 Spark 的全域 Hadoop 組態。 如果您使用 s3n:// 檔案系統,您可以提供舊版組態密鑰,如下列範例所示。

    Scala

    例如,如果您使用 s3a 檔案系統,請新增:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    針對舊版 s3n 文件系統,新增:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    下列命令依賴一些Spark內部,但應該使用所有 PySpark 版本,而且未來不太可能變更:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

將重新移轉至 S3

forward_spark_s3_credentials 選項設定為 true ,以自動將Spark用來透過 JDBC 連線到 S3 的 AWS 金鑰認證轉送至 Redshift。 JDBC 查詢會內嵌這些認證,因此 Databricks 強烈建議您啟用 JDBC 連線的 SSL 加密。

加密

  • 保護 JDBC:除非 JDBC URL 中存在任何 SSL 相關設定,否則數據源預設會啟用 SSL 加密,並驗證 Redshift 伺服器是否值得信任(亦即 sslmode=verify-full)。 為此,第一次需要伺服器時,系統會自動從 Amazon 伺服器下載伺服器證書。 如果失敗,則會使用預先配套的憑證檔案作為後援。 這適用於 Redshift 和 PostgreSQL JDBC 驅動程式。

    如果這項功能發生任何問題,或只是想要停用 SSL,您可以在 或 DataFrameWriterDataFrameReader呼叫 .option("autoenablessl", "false")

    如果您想要指定自定義 SSL 相關設定,您可以遵循 Redshift 檔中的指示: 在 Java 中使用 SSL 和伺服器憑證和 JDBC 驅動程式組態選項 任何與數據源搭配使用的 JDBC url 中出現的 SSL 相關選項都優先 (也就是自動設定不會觸發)。

  • 加密儲存在 S3 中的 UNLOAD 資料(從 Redshift 讀取時儲存的數據):根據卸除數據至 S3 的 Redshift 檔,「UNLOAD 會使用 Amazon S3 伺服器端加密自動加密數據檔(SSE-S3)。

    Redshift 也支援使用自定義金鑰進行用戶端加密(請參閱: 卸除加密數據檔),但數據源缺少指定必要對稱密鑰的功能。

  • 加密儲存在 S3 中的 COPY 數據(寫入 Redshift 時儲存的數據):根據從 Amazon S3 載入加密資料檔的 Redshift 檔

您可以使用 COPY 命令,透過 AWS 管理的加密金鑰(SSE-S3 或 SSE-KMS)、用戶端加密或兩者,載入上傳至 Amazon S3 的數據檔。 COPY 不支援使用客戶提供金鑰 (SSE-C) 的 Amazon S3 伺服器端加密。

參數

Spark SQL 中提供的參數對應或 OPTIONS 支援下列設定:

參數 必要 預設 描述
dbtable 是,除非指定查詢。 在 Redshift 中建立或讀取的數據表。 將數據儲存回 Redshift 時,需要此參數。
query 是,除非指定 dbtable。 要從 Redshift 中讀取的查詢。
使用者 No Redshift 用戶名稱。 必須與密碼選項搭配使用。 只有在使用者和密碼未傳入 URL 時,才能使用,傳遞兩者都會導致錯誤。 當使用者名稱包含需要逸出的特殊字元時,請使用此參數。
password No Redshift 密碼。 必須與 user 選項搭配使用。 只有在使用者和密碼未傳入 URL 時,才能使用;傳遞這兩者會導致錯誤。 當密碼包含需要逸出的特殊字元時,請使用此參數。
URL Yes 格式的 JDBC URL
jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>

subprotocol 可以是 postgresqlredshift,視您已載入的 JDBC 驅動程式而定。 一個 Redshift 相容的驅動程式必須位於 classpath 上,並符合此 URL。 hostport 應該指向 Redshift 主要節點,因此必須設定安全組和/或VP,以允許從驅動程式應用程式存取。
database 會識別 Redshift 資料庫名稱 user ,而且 password 是存取資料庫的認證,此資料庫必須內嵌在此 JDBC 的 URL 中,而且您的使用者帳戶應該具有所參考數據表的必要許可權。
search_path No 在 Redshift 中設定架構搜尋路徑。 將會使用 SET search_path to 命令來設定。 應該是以逗號分隔的架構名稱清單,以搜尋 中的數據表。 請參閱 search_path 的 Redshift 檔。
aws_iam_role 只有當使用 IAM 角色授權時。 附加至 Redshift 叢集之 IAM Redshift COPY/UNLOAD 作業角色的完整 ARN,例如 。 arn:aws:iam::123456789000:role/<redshift-iam-role>
forward_spark_s3_credentials No false 如果 true為 ,數據源會自動探索Spark用來連線到S3的認證,並透過JDBC將這些認證轉送至 Redshift。 這些認證會作為 JDBC 查詢的一部分傳送,因此強烈建議在使用此選項時啟用 JDBC 連線的 SSL 加密。
temporary_aws_access_key_id No AWS 存取金鑰必須具有 S3 貯體寫入許可權。
temporary_aws_secret_access_key No 對應至所提供存取金鑰的 AWS 秘密存取金鑰。
temporary_aws_session_token No 對應至所提供存取金鑰的 AWS 工作階段令牌。
tempdir Yes Amazon S3 中可寫入的位置,用於讀取和寫入時要載入 Redshift 的 Avro 數據時卸除數據。 如果您使用 Redshift 數據源作為一般 ETL 管線的一部分,在貯體上設定 生命周期原則 並使用該原則作為此數據的暫存位置很有用。

您無法使用 Unity 目錄中 定義的外部位置作為 tempdir 位置。
jdbcdriver No 由 JDBC URL 的子項目決定。 要使用的 JDBC 驅動程式類別名稱。 此類別必須位於 classpath 上。 在大部分情況下,不應該指定這個選項,因為適當的驅動程序類別名稱應該由 JDBC URL 的子項目自動決定。
diststyle No EVEN 建立數據表時要使用的 Redshift 散發樣式 。 可以是 的 EVEN其中一個 , KEYALL (請參閱 Redshift 檔)。 使用 KEY時,您也必須使用 distkey 選項來設定散髮密鑰。
distkey 否,除非使用 DISTSTYLE KEY 建立數據表時,要當做散發索引鍵使用之數據表中的數據行名稱。
sortkeyspec No 完整的 Redshift 排序索引鍵 定義。 範例包含:

- SORTKEY(my_sort_column)
- COMPOUND SORTKEY(sort_col_1, sort_col_2)
- INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (已淘汰) No true 將這個已被取代的選項設定為 false ,會在寫入開始時立即卸除覆寫作業的目的地數據表,使覆寫作業不可部分完成,並減少目的地數據表的可用性。 這可能會減少覆寫的暫存磁碟空間需求。

由於設定 usestagingtable=false 作業會造成數據遺失或無法使用的風險,因此它已被取代,因此建議您手動卸除目的地數據表。
description No 數據表的描述。 將會使用 SQL COMMENT 命令來設定,而且應該會顯示在大部分的查詢工具中。 另請參閱元數據, description 以設定個別數據行的描述。
preactions No 載入 ; 命令之前 COPY 要執行的 SQL 命令分隔清單。 在載入新資料之前,在這裡執行一些 DELETE 命令或類似的命令可能很有用。 如果命令包含 %s,則數據表名稱會在執行前格式化 (以防您使用臨時表)。

請注意,如果這些命令失敗,則會將其視為錯誤並擲回例外狀況。 如果使用臨時表,則會還原變更,如果預先動作失敗,則會還原備份數據表。
postactions No ;載入數據成功COPY之後要執行的 SQL 命令分隔清單。 載入新資料時,在此執行某些 GRANT 命令或類似的執行可能很有用。 如果命令包含 %s,則數據表名稱會在執行前格式化 (以防您使用臨時表)。

請注意,如果這些命令失敗,則會將其視為錯誤並擲回例外狀況。 如果使用臨時表,則會還原變更,並在執行後動作失敗時還原備份數據表。
extracopyoptions No 載入資料時要附加至 Redshift COPY 命令的額外選項清單,例如
TRUNCATECOLUMNSMAXERROR n (如需其他選項, 請參閱 Redshift 檔 )。

由於這些選項會附加至命令結尾 COPY ,因此只能使用命令結尾有意義的選項,但應該涵蓋大部分可能的使用案例。
tempformat No AVRO 寫入 Redshift 時,在 S3 中儲存暫存盤的格式。 預設為
AVRO;其他允許的值分別是 CSV CSV CSV GZIP 和 gzipped CSV。

載入 CSV 時,Redshift 的速度比載入 Avro 檔案時要快得多,因此使用 tempformat 可能會在寫入 Redshift 時提供較大的效能提升。
csvnullstring No @NULL@ 使用 CSV tempformat 時要寫入 Null 的 String 值。 這應該是未出現在實際數據中的值。
csvseparator No , 寫入暫存盤時所要使用的分隔符,暫存盤設定為 CSV
CSV GZIP. 這必須是有效的 ASCII 字元,例如 「,或」 或 “\|
csvignoreleadingwhitespace No true 當設定為 true 時,會在寫入期間從值中移除前置空格符
tempformat 設定為 CSVCSV GZIP。 否則,會保留空格符。
csvignoretrailingwhitespace No true 當設定為 true 時,會在寫入期間從值中移除尾端空格符
tempformat 設定為 CSVCSV GZIP。 否則,會保留空格符。
infer_timestamp_ntz_type No false 如果 true為 ,則 Redshift TIMESTAMP 類型的值會在讀取期間解譯為 TimestampNTZType (時間戳不含時區)。 否則,不論基礎 Redshift 資料表中的類型為何,所有時間戳都會解譯為 TimestampType

其他組態選項

設定字串數據行的大小上限

建立 Redshift 數據表時,預設行為是建立 TEXT 字串數據行的數據行。 Redshift 會將資料 TEXT 行儲存為 VARCHAR(256),因此這些數據行的大小上限為 256 個字元(來源)。

若要支援較大的數據行,您可以使用資料 maxlength 行元數據欄位來指定個別字串資料行的最大長度。 這也適用於藉由宣告長度小於預設值的數據行,來實作節省空間的效能優化。

注意

由於 Spark 的限制,SQL 和 R 語言 API 不支援資料行元數據修改。

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

以下是使用 Spark Scala API 更新多個資料行元資料欄位的範例:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

設定自訂數據行類型

如果您需要手動設定資料行類型,您可以使用數據 redshift_type 行元數據。 例如,如果您想要覆寫 Spark SQL Schema -> Redshift SQL 類型比對器來指派使用者定義的數據行類型,您可以執行下列動作:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

設定數據行編碼

建立資料表時,請使用數據 encoding 行元數據欄位來指定每個資料行的壓縮編碼方式(請參閱 Amazon 檔 以取得可用的編碼方式)。

設定數據行的描述

Redshift 可讓數據行附加描述,這些描述應該顯示在大部分的查詢工具中(使用 COMMENT 命令)。 您可以設定資料 description 列元數據欄位,以指定個別資料列的描述。

查詢下推至 Redshift

Spark 優化器會將下列運算子向下推送至 Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

在和 FilterProject,它支援下列表達式:

  • 大部分布爾邏輯運算元
  • 比較
  • 基本算術運算子
  • 數值和字串轉換
  • 大部分的字串函式
  • 純量子查詢,如果他們可以完全向下推送到 Redshift。

注意

此下推不支援在日期和時間時間戳上操作的表達式。

在 中 Aggregation,它支援下列聚合函數:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

DISTINCT 子句結合,如果適用的話。

在內 Join,它支援下列類型的聯結:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • 優化工具重寫成Join的子查詢,例如 、 WHERE EXISTSWHERE NOT EXISTS

注意

聯結下推不支援 FULL OUTER JOIN

使用的查詢 LIMIT中,下推可能會最有説明。 這類 SELECT * FROM large_redshift_table LIMIT 10 查詢可能需要很長的時間,因為整個數據表會先將 UNLOADed 到 S3 作為中繼結果。 在下推時,會在 LIMIT Redshift 中執行 。 在具有匯總的查詢中,將匯總向下推送至 Redshift 也有助於減少需要傳輸的數據量。

默認會啟用查詢下推至 Redshift。 您可以藉由將 設定 spark.databricks.redshift.pushdownfalse來停用它。 即使停用,Spark 仍會向下推送篩選,並執行數據行刪除至 Redshift。

Redshift 驅動程序安裝

Redshift 數據源也需要與 Redshift 相容的 JDBC 驅動程式。 由於 Redshift 是以 PostgreSQL 資料庫系統為基礎,因此您可以使用 Databricks Runtime 隨附的 PostgreSQL JDBC 驅動程式或 Amazon 建議的 Redshift JDBC 驅動程式。 不需要安裝PostgreSQL JDBC 驅動程式。 每個 Databricks Runtime 版本中包含的 PostgreSQL JDBC 驅動程式版本會列在 Databricks Runtime 版本資訊中。

若要手動安裝 Redshift JDBC 驅動程式:

  1. 從 Amazon 下載 驅動程式。
  2. 將驅動程式上傳至您的 Azure Databricks 工作區。 請參閱程式庫
  3. 在叢集上安裝連結庫。

注意

Databricks 建議使用最新版的 Redshift JDBC 驅動程式。 1.2.41 以下的 Redshift JDBC 驅動程式版本有下列限制:

  • 在 SQL 查詢中使用 where 子句時,驅動程式 1.2.16 版會傳回空的數據。
  • 1.2.41 以下驅動程式的版本可能會傳回無效的結果,因為數據行的 Null 功能不正確地回報為「不可為 Null」,而不是「未知」。

交易式保證

本節說明 Spark Redshift 數據源的交易保證。

Redshift 和 S3 屬性的一般背景

如需 Redshift 交易保證的一般資訊,請參閱 Redshift 檔中的<管理並行寫入作業 >一章。 簡言之,Redshift 會根據 Redshift BEGIN 命令的檔提供可串行化的隔離

[雖然] 您可以使用四個交易隔離等級中的任何一個,Amazon Redshift 會將所有隔離等級視為可串行化。

根據 Redshift 檔

Amazon Redshift 支援預設的 自動認可 行為,其中每個個別執行的 SQL 命令會個別認可。

因此,和 等COPYUNLOAD個別命令是不可部分完成的和交易式,而明確BEGINEND應該只需要強制執行多個命令或查詢的不可部分完成性。

讀取和寫入 Redshift 時,數據源會在 S3 中讀取和寫入數據。 Spark 和 Redshift 都會產生分割輸出,並將其儲存在 S3 中的多個檔案中。 根據 Amazon S3 資料一致性模型 檔,S3 貯體清單作業最終會保持一致,因此檔案必須移至特殊長度,以避免因為此最終一致性來源而遺失或不完整的數據。

Spark 的 Redshift 數據源保證

附加至現有的數據表

將數據列插入 Redshift 時,數據源會使用 COPY 命令,並指定 指令清單 來防範特定最終一致的 S3 作業。 因此, spark-redshift 附加至現有數據表的不可部分完成和交易屬性與一般 Redshift COPY 命令相同。

建立新的資料表 (SaveMode.CreateIfNotExists

建立新的數據表是一個雙步驟的程式,由 CREATE TABLE 命令所組成,後面接著 COPY 命令以附加初始數據列集。 這兩個作業都會在相同的交易中執行。

覆寫現有的數據表

根據預設,數據源會使用交易來執行覆寫,其實作方式是刪除目的地數據表、建立新的空白數據表,以及附加數據列。

如果已被 usestagingtable 取代的設定設為 false,數據源會在將數據列附加至新數據表之前認可 DELETE TABLE 命令,犧牲覆寫作業的不可部分完成性,但會減少 Redshift 在覆寫期間所需的暫存空間量。

查詢 Redshift 數據表

查詢會使用 Redshift UNLOAD 命令來執行查詢,並將其結果儲存至 S3,並使用 指令清單 來防範特定最終一致的 S3 作業。 因此,Spark Redshift 數據源的查詢應該具有與一般 Redshift 查詢相同的一致性屬性。

常見問題與解決方案

S3 貯體和 Redshift 叢集位於不同的 AWS 區域中

根據預設,如果 S3 貯體和 Redshift 叢集位於不同的 AWS 區域中,S3 <-> Redshift 複本將無法運作。

如果您在 S3 貯體位於不同區域時嘗試讀取 Redshift 數據表,您可能會看到錯誤,例如:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

同樣地,嘗試在不同的區域中使用 S3 貯體寫入 Redshift,可能會導致下列錯誤:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • 寫入: Redshift COPY 命令支援 S3 貯體區域的明確規格,因此您可以藉由將 新增 region 'the-region-name'extracopyoptions 設定,讓寫入 Redshift 能夠正常運作。 例如,在美國東部(維吉尼亞州)區域和 Scala API 中使用貯體,請使用:

    .option("extracopyoptions", "region 'us-east-1'")
    

    您也可以使用 awsregion 設定:

    .option("awsregion", "us-east-1")
    
  • 讀取: Redshift UNLOAD 命令也支持明確規格 S3 貯體區域。 您可以將區域新增至 awsregion 設定,讓讀取正常運作:

    .option("awsregion", "us-east-1")
    

在 JDBC URL 中搭配特殊字元使用密碼時發生驗證錯誤

如果您要在 JDBC URL 中提供使用者名稱和密碼,且密碼包含特殊字元,例如 ;?&,您可能會看到下列例外狀況:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

這是因為 JDBC 驅動程式未正確逸出使用者名稱或密碼中的特殊字元所造成。 請務必使用對應的 DataFrame 選項 userpassword來指定使用者名稱和密碼。 如需詳細資訊,請參閱參數

即使完成對應的 Redshift 作業,長時間執行的 Spark 查詢仍會無限期停止回應

如果您要從 Redshift 讀取或寫入大量數據,您的 Spark 查詢可能會無限期停止回應,即使 AWS Redshift 監視頁面顯示對應的 LOADUNLOAD 作業已完成,而且叢集處於閑置狀態。 這是由 Redshift 與 Spark 之間的連線逾時所造成。若要避免這種情況,請確定 tcpKeepAlive 已啟用 JDBC 旗標,並 TCPKeepAliveMinutes 設定為低值(例如 1)。

如需詳細資訊,請參閱 Amazon Redshift JDBC Driver Configuration

具有時區語意的時間戳

讀取數據時,Redshift TIMESTAMPTIMESTAMPTZ 數據類型都會對應至 Spark TimestampType,且值會轉換成國際標準時間 (UTC),並儲存為 UTC 時間戳。 若為 Redshift TIMESTAMP,則會假設本機時區的值沒有任何時區資訊。 將數據寫入 Redshift 數據表時,Spark TimestampType 會對應至 Redshift TIMESTAMP 數據類型。

移轉指南

數據源現在會要求您在Spark S3認證轉送至 Redshift 之前明確設定 forward_spark_s3_credentials 。 如果您使用 或 temporary_aws_* 驗證機制,aws_iam_role這項變更不會有任何影響。 不過,如果您依賴舊的預設行為,您現在必須明確設定 forward_spark_s3_credentialstrue ,以繼續使用先前的 Redshift 到 S3 驗證機制。 如需三種驗證機制及其安全性取捨的討論,請參閱 本文件的驗證至 S3 和 Redshift 一節。