Fråga Amazon Redshift med Azure Databricks
Du kan läsa och skriva tabeller från Amazon Redshift med Azure Databricks.
Viktigt!
De konfigurationer som beskrivs i den här artikeln är Experimentella. Experimentella funktioner tillhandahålls som de är och stöds inte av Databricks via teknisk kundsupport. För att få fullständigt stöd för frågefederation bör du i stället använda Lakehouse Federation, vilket gör att dina Azure Databricks-användare kan dra nytta av Unity Catalog-syntaxen och datastyrningsverktygen.
Databricks Redshift-datakällan använder Amazon S3 för att effektivt överföra data till och från Redshift och använder JDBC för att automatiskt utlösa lämpliga COPY
kommandon och UNLOAD
kommandon på Redshift.
Anteckning
I Databricks Runtime 11.3 LTS och senare innehåller Databricks Runtime Redshift JDBC-drivrutinen, som är tillgänglig med hjälp av nyckelordet redshift
för formatalternativet. Se Versionsinformation och kompatibilitet för Databricks Runtime för drivrutinsversioner som ingår i varje Databricks Runtime. Drivrutiner som tillhandahålls av användaren stöds fortfarande och har företräde framför den paketerade JDBC-drivrutinen.
I Databricks Runtime 10.4 LTS och nedan krävs manuell installation av Redshift JDBC-drivrutinen, och frågor bör använda drivrutinen (com.databricks.spark.redshift
) för formatet. Se Installation av Redshift-drivrutin.
Förbrukning
Följande exempel visar hur du ansluter med Redshift-drivrutinen.
url
Ersätt parametervärdena om du använder PostgreSQL JDBC-drivrutinen.
När du har konfigurerat dina AWS-autentiseringsuppgifter kan du använda datakällan med Spark-datakällans API i Python, SQL, R eller Scala.
Viktigt!
Externa platser som definieras i Unity Catalog stöds inte som tempdir
platser.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Läs data med SQL på Databricks Runtime 10.4 LTS och tidigare versioner.
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Läsa data med SQL i Databricks Runtime 11.3 LTS och senare:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Skriva data med SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
SQL-API:et stöder endast skapandet av nya tabeller och skriver inte över eller lägger till.
R
Läs data i R på Databricks Runtime 10.4 LTS och tidigare versioner.
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Läs data med R på Databricks Runtime 11.3 LTS och senare
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Rekommendationer för att arbeta med Redshift
Utförandet av en fråga kan extrahera stora mängder data till S3. Om du planerar att utföra flera frågor mot samma data i Redshift rekommenderar Databricks att du sparar extraherade data med Delta Lake.
Konfiguration
Autentisera till S3 och Redshift
Datakällan omfattar flera nätverksanslutningar, vilket illustreras i följande diagram:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
Datakällan läser och skriver data till S3 när data överförs till/från Redshift. Därför krävs AWS-autentiseringsuppgifter med läs- och skrivåtkomst till en S3-bucket (anges med hjälp av konfigurationsparametern tempdir
).
Kommentar
Datakällan rensar inte de temporära filer som skapas i S3. Därför rekommenderar vi att du använder en dedikerad tillfällig S3-bucket med en objektlivscykelkonfiguration för att säkerställa att temporära filer tas bort automatiskt efter en angiven giltighetstid.
Se avsnittet Kryptering i det här dokumentet för en diskussion om hur du krypterar dessa filer. Du kan inte använda en extern plats som definierats i Unity Catalog som en tempdir
plats.
I följande avsnitt beskrivs konfigurationsalternativen för varje anslutning:
Spark-drivrutin för Redshift
Spark-drivrutinen ansluter till Redshift via JDBC med ett användarnamn och lösenord. Redshift stöder inte användning av IAM-roller för att autentisera den här anslutningen. Som standard använder den här anslutningen SSL-kryptering. Mer information finns i Kryptering.
Spark till S3
S3 fungerar som mellanhand för att lagra massdata vid läsning från eller skrivning till Redshift. Spark ansluter till S3 med både Hadoop FileSystem-gränssnitten och direkt med Amazon Java SDK:s S3-klient.
Anmärkning
Du kan inte använda DBFS-monteringar för att konfigurera åtkomst till S3 för Redshift.
Ange nycklar i Hadoop-konfiguration: Du kan ange AWS-nycklar med hjälp av Hadoop-konfigurationsegenskaper. Om konfigurationen
tempdir
pekar på etts3a://
filsystem kan du angefs.s3a.access.key
egenskaperna ochfs.s3a.secret.key
i en Hadoop XML-konfigurationsfil eller anropasc.hadoopConfiguration.set()
för att konfigurera Sparks globala Hadoop-konfiguration. Om du använder etts3n://
filsystem kan du ange de äldre konfigurationsnycklarna enligt följande exempel.Scala
Om du till exempel använder
s3a
filsystemet lägger du till:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
För det äldre
s3n
filsystemet lägger du till:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
Följande kommando förlitar sig på vissa interna Spark-objekt, men bör fungera med alla PySpark-versioner och kommer sannolikt inte att ändras i framtiden:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift till S3
Ange alternativet forward_spark_s3_credentials
till true
för att automatiskt vidarebefordra autentiseringsuppgifterna för AWS-nyckeln som Spark använder för att ansluta till S3 via JDBC till Redshift. JDBC-frågan bäddar in dessa autentiseringsuppgifter så därför rekommenderar Databricks starkt att du aktiverar SSL-kryptering för JDBC-anslutningen.
Kryptering
Skydda JDBC: Om inga SSL-relaterade inställningar finns i JDBC-URL:en aktiverar datakällan som standard SSL-kryptering och verifierar även att Redshift-servern är tillförlitlig (det vill:
sslmode=verify-full
). För det laddas ett servercertifikat automatiskt ned från Amazon-servrarna första gången det behövs. Om det misslyckas används en i förväg paketerad certifikatfil som reserv. Detta gäller för både Redshift- och PostgreSQL JDBC-drivrutinerna.Om det finns problem med den här funktionen, eller om du helt enkelt vill inaktivera SSL, kan du anropa
.option("autoenablessl", "false")
på dinDataFrameReader
ellerDataFrameWriter
.Om du vill ange anpassade SSL-relaterade inställningar kan du följa anvisningarna i Redshift-dokumentationen: Användning av SSL- och servercertifikat i Java - och JDBC-drivrutinskonfigurationsalternativ Alla SSL-relaterade alternativ som finns i JDBC
url
som används med datakällan har företräde (det vill s. den automatiska konfigurationen utlöses inte).Kryptera UNLOAD-data som lagras i S3 (data som lagras vid läsning från Redshift): Enligt Redshift-dokumentationen om att avlasta data till S3 avlastar UNLOAD automatiskt datafiler med server-side kryptering via Amazon S3 (SSE-S3).
Redshift stöder också kryptering på klientsidan med en anpassad nyckel (se: Ta bort krypterade datafiler) men datakällan saknar möjlighet att ange den symmetriska nyckel som krävs.
Kryptera COPY-data som lagras i S3 (data som lagras när de skrivs till Redshift): Enligt Redshift-dokumentationen om inläsning av krypterade datafiler från Amazon S3:
Du kan använda COPY
kommandot för att läsa in datafiler som laddats upp till Amazon S3 med hjälp av kryptering på serversidan med AWS-hanterade krypteringsnycklar (SSE-S3 eller SSE-KMS), kryptering på klientsidan eller både och. COPY stöder inte Amazon S3-kryptering på serversidan med en kundtilldelad nyckel (SSE-C).
Parametrar
Parameterkartan eller ALTERNATIV som anges i Spark SQL stöder följande inställningar:
Parameter | Obligatoriskt | Standardvärde | beskrivning |
---|---|---|---|
dbtabell | Ja, såvida inte frågan har angetts. | Ingen | Tabellen som ska skapas eller läsas från i Redshift. Den här parametern krävs när du sparar data tillbaka till Redshift. |
query | Ja, såvida inte dbtable har angetts. | Ingen | Frågan som ska läsas från i Redshift. |
användare | Nej | Ingen | Redshift-användarnamnet. Måste användas tillsammans med lösenordsalternativet. Kan endast användas om användaren och lösenordet inte skickas i URL:en, vilket resulterar i ett fel när båda skickas. Använd den här parametern när användarnamnet innehåller specialtecken som måste vara undantagna. |
lösenord | Nej | Ingen | Redshift-lösenordet. Måste användas tillsammans med user alternativet . Kan endast användas om användaren och lösenordet inte skickas i URL:en. att skicka båda resulterar i ett fel. Använd den här parametern när lösenordet innehåller specialtecken som måste tas bort. |
URL | Ja | Ingen | En URL för JDBC, i formatetjdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password> subprotocol kan vara postgresql eller redshift , beroende på vilken JDBC-drivrutin du har läst in. En Redshift-kompatibel drivrutin måste finnas på klassens sökväg och matcha den här URL:en.
host och port bör peka på Redshift-huvudnoden, så säkerhetsgrupper och/eller VPC måste konfigureras för att tillåta åtkomst från ditt drivrutinsprogram.
database identifierar ett Redshift-databasnamn user och password är autentiseringsuppgifter för att komma åt databasen, som måste bäddas in i den här URL:en för JDBC, och ditt användarkonto bör ha nödvändiga behörigheter för tabellen som refereras till. |
sökväg | Nej | Ingen | Ange sökväg för schemasökning i Redshift. Anges med kommandot SET search_path to . Bör vara en kommaavgränsad lista med schemanamn att söka efter tabeller i. Se Redshift-dokumentationen för search_path. |
aws_iam_role | Endast om du använder IAM-roller för att auktorisera. | Ingen | Fullständigt angiven ARN för IAM Redshift COPY/UNLOAD-åtgärdsrollen som är kopplad till Redshift-klustret, till exempel arn:aws:iam::123456789000:role/<redshift-iam-role> . |
vidarebefordra_spark_s3_behörigheter | Nej | false |
Om true identifierar datakällan automatiskt de autentiseringsuppgifter som Spark använder för att ansluta till S3 och vidarebefordrar dessa autentiseringsuppgifter till Redshift över JDBC. Dessa autentiseringsuppgifter skickas som en del av JDBC-frågan, så därför rekommenderar vi starkt att du aktiverar SSL-kryptering för JDBC-anslutningen när du använder det här alternativet. |
temporary_aws_access_key_id | Nej | Ingen | AWS-åtkomstnyckeln måste ha skrivbehörighet till S3-bucketen. |
tillfällig_aws_hemlig_åtkomstnyckel | Nej | Ingen | AWS-hemlig åtkomstnyckel som motsvarar den angivna åtkomstnyckeln. |
temporary_aws_session_token (tillfällig AWS-sessionstoken) | Nej | Ingen | AWS-sessionstoken som motsvarar den angivna åtkomstnyckeln. |
tempdir | Ja | Ingen | En skrivbar plats i Amazon S3 som ska användas för oladdade data vid läsning och Avro-data som ska läsas in i Redshift vid skrivning. Om du använder Redshift-datakällan för Spark som en del av en vanlig ETL-pipeline kan det vara användbart att ange en livscykelprincip på en bucket och använda den som en temporär plats för dessa data. Du kan inte använda externa platser som definierats i Unity Catalog som tempdir platser. |
jdbcdriver | Nej | Bestäms av JDBC-URL:ens underprotokol. | Klassnamnet på den JDBC-drivrutin som ska användas. Den här klassen måste finnas på klassökvägen. I de flesta fall bör det inte vara nödvändigt att ange det här alternativet, eftersom det lämpliga drivrutinsklassnamnet automatiskt bör fastställas av JDBC-URL:ens underprotokol. |
diststyle | Nej | EVEN |
Redshift-distributionsformatet som ska användas när du skapar en tabell. Kan vara ett av EVEN , KEY eller ALL (se Redshift-dokument). När du använder KEY måste du också ange en distributionsnyckel med alternativet distkey. |
distkey | Nej, om du inte använder DISTSTYLE KEY |
Ingen | Namnet på en kolumn i tabellen som ska användas som distributionsnyckel när du skapar en tabell. |
sorteringsnyckelspecifikation | Nej | Ingen | En fullständig Redshift-sorteringsnyckeldefinition. Exempel:
|
usestagingtable (inaktuell) | Nej | true |
Inställning av detta inaktuella alternativ till false orsakar att en överskrivningsåtgärds måltabell tas bort omedelbart i början av skrivningen, vilket gör överskrivningsåtgärden icke-atomisk, och minskar tillgängligheten för måltabellen. Detta kan minska de tillfälliga diskutrymmeskraven för överskrivningar.Eftersom inställningen usestagingtable=false av åtgärden riskerar dataförlust eller otillgänglighet är den föråldrad och det rekommenderas att du manuellt tar bort måltabellen. |
beskrivning | Nej | Ingen | En beskrivning av tabellen. Kommer att ställas in med hjälp av SQL COMMENT-kommandot och bör visas i de flesta frågeverktyg. Se också description -metadata för att ange beskrivningar för enskilda kolumner. |
föråtgärder | Nej | Ingen | En ; avgränsad lista över SQL-kommandon som ska köras innan COPY kommandot laddas. Det kan vara bra att köra vissa DELETE kommandon eller liknande här innan du läser in nya data. Om kommandot innehåller %s , formateras tabellnamnet innan körningen (om du använder en mellanlagringstabell).Varnas för att om dessa kommandon misslyckas behandlas det som ett fel och ett undantag genereras. Om du använder en mellanlagringstabell återställs ändringarna och säkerhetskopieringstabellen återställs om föråtgärder misslyckas. |
efteråtgärder | Nej | Ingen | En ; avgränsad lista över SQL-kommandon som ska köras efter en lyckad COPY inläsning av data. Det kan vara användbart att vissa GRANT kommandon eller liknande körs här när du läser in nya data. Om kommandot innehåller %s formateras tabellnamnet före körningen (om du använder en mellanlagringstabell).Varnas för att om dessa kommandon misslyckas behandlas det som ett fel och ett undantag genereras. Om du använder en mellanlagringstabell återställs ändringarna och säkerhetskopieringstabellen återställs om efteråtgärder misslyckas. |
extra kopieringsalternativ | Nej | Ingen | En lista över extra alternativ för att lägga till redshift-kommandot COPY vid inläsning av data, till exempel TRUNCATECOLUMNS eller MAXERROR n (se Redshift-dokument för andra alternativ).Eftersom dessa alternativ läggs till i slutet av COPY kommandot kan endast alternativ som är lämpliga i slutet av kommandot användas, men det bör omfatta de flesta möjliga användningsfall. |
tempformat | Nej | AVRO |
Formatet där du kan spara temporära filer i S3 när du skriver till Redshift. Standardvärdet är AVRO ; de andra tillåtna värdena är CSV och CSV GZIP för CSV respektive gzipped CSV.Redshift är betydligt snabbare vid inläsning av CSV än vid inläsning av Avro-filer, så användning av det tempformatet kan ge en stor prestandaökning när du skriver till Redshift. |
csvnullstring | Nej | @NULL@ |
Strängvärdet som ska skrivas för null när du använder CSV-tempformatet. Detta bör vara ett värde som inte visas i dina faktiska data. |
CSV-avgränsare | Nej | , |
Avgränsare som ska användas när du skriver temporära filer med tempformat inställt på CSV eller CSV GZIP . Detta måste vara ett giltigt ASCII-tecken, till exempel ", " eller "\| ". |
csvignoreleadingwhitespace | Nej | true |
När värdet är true tas inledande blanksteg bort från värden under skrivningar när tempformat är inställt på CSV eller CSV GZIP . Annars behålls tomt utrymme. |
csvignoretrailingwhitespace | Nej | true |
När värdet är true tar bort avslutande blanksteg från värden under skrivningar när tempformat är inställt på CSV eller CSV GZIP . Annars behålls tomt utrymme. |
infer_timestamp_ntz_type | Nej | false |
Om true tolkas värden av typen Redshift TIMESTAMP som TimestampNTZType (tidsstämpel utan tidszon) under läsningar. Annars tolkas alla tidsstämplar som TimestampType oavsett typ i den underliggande Redshift-tabellen. |
Ytterligare konfigurationsalternativ
Konfigurera den maximala storleken på strängkolumner
När du skapar Redshift-tabeller är standardbeteendet att skapa TEXT
kolumner för strängkolumner. Redshift lagrar TEXT
kolumner som VARCHAR(256)
, så dessa kolumner har en maximal storlek på 256 tecken (källa).
Om du vill ha stöd för större kolumner kan du använda kolumnmetadatafältet maxlength
för att ange den maximala längden på enskilda strängkolumner. Detta är också användbart för att implementera utrymmesbesparande prestandaoptimeringar genom att deklarera kolumner med en mindre maximal längd än standardvärdet.
Kommentar
På grund av begränsningar i Spark stöder SQL- och R-språk-API:erna inte ändringar av kolumnmetadata.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
Här är ett exempel på hur du uppdaterar flera kolumners metadatafält med Sparks Scala API:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Ange en anpassad kolumntyp
Om du behöver ange en kolumntyp manuellt kan du använda kolumnmetadata redshift_type
. Om du till exempel vill åsidosätta Spark SQL Schema -> Redshift SQL
typen matchare för att tilldela en användardefinierad kolumntyp kan du göra följande:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Konfigurera kolumnkodning
När du skapar en tabell använder du encoding
fältet kolumnmetadata för att ange en komprimeringskodning för varje kolumn (se Amazon-dokument för tillgängliga kodningar).
Ange beskrivningar för kolumner
Redshift tillåter att kolumner har bifogade beskrivningar som ska visas i de flesta frågeverktyg (med kommandot COMMENT
). Du kan ange kolumnmetadatafältet description
för att ange en beskrivning för enskilda kolumner.
Skicka frågor till Redshift
Spark-optimeraren kör ner följande operatorer till Redshift:
Filter
Project
Sort
Limit
Aggregation
Join
I Project
och Filter
stöder den följande uttryck:
- De flesta booleska logikoperatorer
- Jämförelser
- Grundläggande aritmetiska åtgärder
- Numeriska och stränggjutningar
- De flesta strängfunktioner
- Skalär underfrågor, om de kan flyttas ner helt till Redshift.
Kommentar
Denna pushdown stöder inte uttryck som opererar på datum och tidsangivelser.
I Aggregation
stöder den följande aggregeringsfunktioner:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
tillsammans med klausulen DISTINCT
, i förekommande fall.
I Join
stöder den följande typer av kopplingar:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- Underfrågor som optimeraren skriver om till
Join
, t.ex.WHERE EXISTS
,WHERE NOT EXISTS
Kommentar
Join pushdown stöder inte FULL OUTER JOIN
.
Pushdown kan vara mest fördelaktigt i sökfrågor med LIMIT
. En frågeställning som SELECT * FROM large_redshift_table LIMIT 10
kan ta mycket lång tid eftersom hela tabellen först skulle avläsas till S3 som ett intermediärt resultat. Med pushdown körs LIMIT
i Redshift. I förfrågningar med aggregeringar bidrar det att skjuta ner aggregeringen till Redshift också till att minska mängden data som behöver överföras.
Pushdown för frågor till Redshift är aktiverat som standard. Det kan inaktiveras genom att ange spark.databricks.redshift.pushdown
till false
. Även när det är inaktiverat trycker Spark fortfarande ned filter och utför kolumneliminering till Redshift.
Installation av Redshift-drivrutin
Redshift-datakällan kräver också en Redshift-kompatibel JDBC-drivrutin. Eftersom Redshift baseras på PostgreSQL-databassystemet kan du använda PostgreSQL JDBC-drivrutinen som ingår i Databricks Runtime eller Amazons rekommenderade Redshift JDBC-drivrutin. Ingen installation krävs för att använda PostgreSQL JDBC-drivrutinen. Versionen av PostgreSQL JDBC-drivrutinen som ingår i varje Databricks Runtime-utgåva listas i release-anteckningar för Databricks Runtime.
Installera Redshift JDBC-drivrutinen manuellt:
- Ladda ned drivrutinen från Amazon.
- Ladda upp drivrutinen till din Azure Databricks-arbetsyta. Se Bibliotek.
- Installera biblioteket i klustret.
Kommentar
Databricks rekommenderar att du använder den senaste versionen av Redshift JDBC-drivrutinen. Versioner av Redshift JDBC-drivrutinen under 1.2.41 har följande begränsningar:
- Version 1.2.16 av drivrutinen returnerar tomma data när du använder en
where
sats i en SQL-fråga. - Versioner av drivrutinen under 1.2.41 kan returnera ogiltiga resultat eftersom en kolumns nullbarhet felaktigt rapporteras som "Inte nullbar" i stället för "Okänd".
Transaktionsgarantier
I det här avsnittet beskrivs transaktionsgarantierna för Redshift-datakällan för Spark.
Allmän bakgrund för redshift- och S3-egenskaper
Allmän information om Redshift-transaktionsgarantier finns i avsnittet Hantera samtidiga skrivåtgärder i Redshift-dokumentationen. I ett nötskal tillhandahåller Redshift serialiserbar isolering enligt dokumentationen för Kommandot Redshift BEGIN :
[även om] du kan använda någon av de fyra transaktionsisoleringsnivåerna bearbetar Amazon Redshift alla isoleringsnivåer som serialiserbara.
Enligt Redshift-dokumentationen:
Amazon Redshift har stöd för ett standardbeteende för automatisk incheckning där varje separat körd SQL-kommando checkar in individuellt.
Därför är enskilda kommandon som COPY
och UNLOAD
atomiska och transaktionella, samtidigt som de är explicita BEGIN
och END
bör endast vara nödvändiga för att framtvinga atomiteten för flera kommandon eller frågor.
När du läser från och skriver till Redshift använder datakällan S3 för att läsa och skriva data. Både Spark och Redshift producerar partitionerade utdata och lagrar dem i flera filer i S3. Enligt dokumentationen om Amazon S3 Data Consistency Model är S3-bucketlistningsåtgärder så småningom konsekventa, så filerna måste gå till särskilda längder för att undvika saknade eller ofullständiga data på grund av den här källan för slutlig konsekvens.
Garantier för Redshift-datakällan för Spark
Lägg till i en befintlig tabell
När du infogar rader i Redshift använder datakällan kommandot COPY och anger manifest för att skydda mot vissa S3-åtgärder som så småningom är konsekventa. Därför spark-redshift
har tillägg till befintliga tabeller samma atomiska och transaktionella egenskaper som vanliga Redshift-kommandon COPY
.
Skapa en ny tabell (SaveMode.CreateIfNotExists
)
Att skapa en ny tabell är en tvåstegsprocess som består av ett CREATE TABLE
kommando följt av ett COPY-kommando för att lägga till den första uppsättningen rader. Båda åtgärderna utförs i samma transaktion.
Skriv över en befintlig tabell
Som standard använder datakällan transaktioner för att utföra överskrivningar, som implementeras genom att ta bort måltabellen, skapa en ny tom tabell och lägga till rader i den.
Om den inaktuella usestagingtable
inställningen är satt till false
, bekräftar datakällan DELETE TABLE
-kommandot innan raderna läggs till i den nya tabellen, vilket offrar atomiciteten i överskrivningen men minskar mängden mellanlagringsutrymme som Redshift behöver under överskrivningen.
Fråga Redshift-tabellen
Frågor använder kommandot Redshift UNLOAD för att köra en fråga och spara resultatet till S3 och använda manifest för att skydda mot vissa S3-åtgärder som så småningom är konsekventa. Därför bör frågor från Redshift-datakällan för Spark ha samma konsekvensegenskaper som vanliga Redshift-frågor.
Vanliga problem och lösningar
S3-bucketen och Redshift-klustret finns i olika AWS-regioner
Som standard fungerar inte S3 <–> Redshift-kopior om S3-bucketen och Redshift-klustret finns i olika AWS-regioner.
Om du försöker läsa en Redshift-tabell när S3-bucketen finns i en annan region kan du se ett fel som:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
På samma sätt kan försök att skriva till Redshift med en S3-bucket i en annan region orsaka följande fel:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Skriver: Redshift COPY-kommandot stöder explicit specifikation av S3-bucketregionen, så du kan göra så att skrivningar till Redshift fungerar korrekt i dessa fall genom att lägga
region 'the-region-name'
till iextracopyoptions
inställningen. Om du till exempel har en bucket i regionen USA östra (Virginia) och använder Scala-API:et, använd:.option("extracopyoptions", "region 'us-east-1'")
Du kan också använda inställningen
awsregion
:.option("awsregion", "us-east-1")
Läser: Redshift UNLOAD-kommandot stöder också specifik angivelse av S3-bucket-regionen. Du kan få avläsningarna att fungera korrekt genom att lägga till regionen i
awsregion
-inställningen..option("awsregion", "us-east-1")
Autentiseringsfel vid användning av ett lösenord med specialtecken i JDBC-url:en
Om du anger användarnamnet och lösenordet som en del av JDBC-url:en och lösenordet innehåller specialtecken som ;
, ?
eller &
, kan du se följande undantag:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Detta orsakas av att specialtecken i användarnamnet eller lösenordet inte kan kringgås korrekt av JDBC-drivrutinen. Se till att ange användarnamnet och lösenordet med motsvarande DataFrame-alternativ user
och password
. Mer information finns i Parametrar.
Långvariga Spark-frågor låser sig på obestämd tid trots att motsvarande Redshift-åtgärd är klar
Om du läser eller skriver stora mängder data från och till Redshift kan Spark-frågan hänga sig på obestämd tid, även om sidan AWS Redshift Monitoring visar att motsvarande LOAD
eller UNLOAD
åtgärd har slutförts och att klustret är inaktivt. Detta orsakas av att anslutningen mellan Redshift och Spark går ut. För att undvika detta, se till att tcpKeepAlive
JDBC-flaggan är aktiverad och TCPKeepAliveMinutes
är inställd på ett lågt värde (till exempel 1).
Mer information finns i Amazon Redshift JDBC-drivrutinskonfiguration.
Tidsstämpel med tidszonssemantik
När du läser data mappas både Redshift TIMESTAMP
- och TIMESTAMPTZ
datatyper till Spark TimestampType
och ett värde konverteras till Coordinated Universal Time (UTC) och lagras som UTC-tidsstämpeln. För en Redshift TIMESTAMP
antas den lokala tidszonen användas eftersom värdet inte har någon tidszonsinformation. När du skriver data till en Redshift-tabell mappas en Spark TimestampType
till Redshift-datatypen TIMESTAMP
.
Migreringsguide
Datakällan kräver nu att du uttryckligen anger forward_spark_s3_credentials
innan Spark S3-autentiseringsuppgifterna vidarebefordras till Redshift. Den här ändringen påverkar inte om du använder autentiseringsmekanismerna aws_iam_role
eller temporary_aws_*
. Men om du förlitade dig på det gamla standardbeteendet måste du nu uttryckligen ange forward_spark_s3_credentials
till true
för att fortsätta använda din tidigare Redshift till S3-autentiseringsmekanism. En diskussion om de tre autentiseringsmekanismerna och deras säkerhetsvägningar finns i avsnittet Autentisera till S3 och Redshift i det här dokumentet.