Dela via


Fråga Amazon Redshift med Azure Databricks

Du kan läsa och skriva tabeller från Amazon Redshift med Azure Databricks.

Viktigt!

De konfigurationer som beskrivs i den här artikeln är Experimentella. Experimentella funktioner tillhandahålls som de är och stöds inte av Databricks via teknisk kundsupport. För att få fullständigt stöd för frågefederation bör du i stället använda Lakehouse Federation, vilket gör att dina Azure Databricks-användare kan dra nytta av Unity Catalog-syntaxen och datastyrningsverktygen.

Databricks Redshift-datakällan använder Amazon S3 för att effektivt överföra data till och från Redshift och använder JDBC för att automatiskt utlösa lämpliga COPY kommandon och UNLOAD kommandon på Redshift.

Anteckning

I Databricks Runtime 11.3 LTS och senare innehåller Databricks Runtime Redshift JDBC-drivrutinen, som är tillgänglig med hjälp av nyckelordet redshift för formatalternativet. Se Versionsinformation och kompatibilitet för Databricks Runtime för drivrutinsversioner som ingår i varje Databricks Runtime. Drivrutiner som tillhandahålls av användaren stöds fortfarande och har företräde framför den paketerade JDBC-drivrutinen.

I Databricks Runtime 10.4 LTS och nedan krävs manuell installation av Redshift JDBC-drivrutinen, och frågor bör använda drivrutinen (com.databricks.spark.redshift) för formatet. Se Installation av Redshift-drivrutin.

Förbrukning

Följande exempel visar hur du ansluter med Redshift-drivrutinen. url Ersätt parametervärdena om du använder PostgreSQL JDBC-drivrutinen.

När du har konfigurerat dina AWS-autentiseringsuppgifter kan du använda datakällan med Spark-datakällans API i Python, SQL, R eller Scala.

Viktigt!

Externa platser som definieras i Unity Catalog stöds inte som tempdir platser.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Läs data med SQL på Databricks Runtime 10.4 LTS och tidigare versioner.

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Läsa data med SQL i Databricks Runtime 11.3 LTS och senare:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Skriva data med SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

SQL-API:et stöder endast skapandet av nya tabeller och skriver inte över eller lägger till.

R

Läs data i R på Databricks Runtime 10.4 LTS och tidigare versioner.

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Läs data med R på Databricks Runtime 11.3 LTS och senare

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Rekommendationer för att arbeta med Redshift

Utförandet av en fråga kan extrahera stora mängder data till S3. Om du planerar att utföra flera frågor mot samma data i Redshift rekommenderar Databricks att du sparar extraherade data med Delta Lake.

Konfiguration

Autentisera till S3 och Redshift

Datakällan omfattar flera nätverksanslutningar, vilket illustreras i följande diagram:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

Datakällan läser och skriver data till S3 när data överförs till/från Redshift. Därför krävs AWS-autentiseringsuppgifter med läs- och skrivåtkomst till en S3-bucket (anges med hjälp av konfigurationsparametern tempdir ).

Kommentar

Datakällan rensar inte de temporära filer som skapas i S3. Därför rekommenderar vi att du använder en dedikerad tillfällig S3-bucket med en objektlivscykelkonfiguration för att säkerställa att temporära filer tas bort automatiskt efter en angiven giltighetstid. Se avsnittet Kryptering i det här dokumentet för en diskussion om hur du krypterar dessa filer. Du kan inte använda en extern plats som definierats i Unity Catalog som en tempdir plats.

I följande avsnitt beskrivs konfigurationsalternativen för varje anslutning:

Spark-drivrutin för Redshift

Spark-drivrutinen ansluter till Redshift via JDBC med ett användarnamn och lösenord. Redshift stöder inte användning av IAM-roller för att autentisera den här anslutningen. Som standard använder den här anslutningen SSL-kryptering. Mer information finns i Kryptering.

Spark till S3

S3 fungerar som mellanhand för att lagra massdata vid läsning från eller skrivning till Redshift. Spark ansluter till S3 med både Hadoop FileSystem-gränssnitten och direkt med Amazon Java SDK:s S3-klient.

Anmärkning

Du kan inte använda DBFS-monteringar för att konfigurera åtkomst till S3 för Redshift.

  • Ange nycklar i Hadoop-konfiguration: Du kan ange AWS-nycklar med hjälp av Hadoop-konfigurationsegenskaper. Om konfigurationen tempdir pekar på ett s3a:// filsystem kan du ange fs.s3a.access.key egenskaperna och fs.s3a.secret.key i en Hadoop XML-konfigurationsfil eller anropa sc.hadoopConfiguration.set() för att konfigurera Sparks globala Hadoop-konfiguration. Om du använder ett s3n:// filsystem kan du ange de äldre konfigurationsnycklarna enligt följande exempel.

    Scala

    Om du till exempel använder s3a filsystemet lägger du till:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    För det äldre s3n filsystemet lägger du till:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    Följande kommando förlitar sig på vissa interna Spark-objekt, men bör fungera med alla PySpark-versioner och kommer sannolikt inte att ändras i framtiden:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift till S3

Ange alternativet forward_spark_s3_credentials till true för att automatiskt vidarebefordra autentiseringsuppgifterna för AWS-nyckeln som Spark använder för att ansluta till S3 via JDBC till Redshift. JDBC-frågan bäddar in dessa autentiseringsuppgifter så därför rekommenderar Databricks starkt att du aktiverar SSL-kryptering för JDBC-anslutningen.

Kryptering

  • Skydda JDBC: Om inga SSL-relaterade inställningar finns i JDBC-URL:en aktiverar datakällan som standard SSL-kryptering och verifierar även att Redshift-servern är tillförlitlig (det vill: sslmode=verify-full). För det laddas ett servercertifikat automatiskt ned från Amazon-servrarna första gången det behövs. Om det misslyckas används en i förväg paketerad certifikatfil som reserv. Detta gäller för både Redshift- och PostgreSQL JDBC-drivrutinerna.

    Om det finns problem med den här funktionen, eller om du helt enkelt vill inaktivera SSL, kan du anropa .option("autoenablessl", "false") på din DataFrameReader eller DataFrameWriter.

    Om du vill ange anpassade SSL-relaterade inställningar kan du följa anvisningarna i Redshift-dokumentationen: Användning av SSL- och servercertifikat i Java - och JDBC-drivrutinskonfigurationsalternativ Alla SSL-relaterade alternativ som finns i JDBC url som används med datakällan har företräde (det vill s. den automatiska konfigurationen utlöses inte).

  • Kryptera UNLOAD-data som lagras i S3 (data som lagras vid läsning från Redshift): Enligt Redshift-dokumentationen om att avlasta data till S3 avlastar UNLOAD automatiskt datafiler med server-side kryptering via Amazon S3 (SSE-S3).

    Redshift stöder också kryptering på klientsidan med en anpassad nyckel (se: Ta bort krypterade datafiler) men datakällan saknar möjlighet att ange den symmetriska nyckel som krävs.

  • Kryptera COPY-data som lagras i S3 (data som lagras när de skrivs till Redshift): Enligt Redshift-dokumentationen om inläsning av krypterade datafiler från Amazon S3:

Du kan använda COPY kommandot för att läsa in datafiler som laddats upp till Amazon S3 med hjälp av kryptering på serversidan med AWS-hanterade krypteringsnycklar (SSE-S3 eller SSE-KMS), kryptering på klientsidan eller både och. COPY stöder inte Amazon S3-kryptering på serversidan med en kundtilldelad nyckel (SSE-C).

Parametrar

Parameterkartan eller ALTERNATIV som anges i Spark SQL stöder följande inställningar:

Parameter Obligatoriskt Standardvärde beskrivning
dbtabell Ja, såvida inte frågan har angetts. Ingen Tabellen som ska skapas eller läsas från i Redshift. Den här parametern krävs när du sparar data tillbaka till Redshift.
query Ja, såvida inte dbtable har angetts. Ingen Frågan som ska läsas från i Redshift.
användare Nej Ingen Redshift-användarnamnet. Måste användas tillsammans med lösenordsalternativet. Kan endast användas om användaren och lösenordet inte skickas i URL:en, vilket resulterar i ett fel när båda skickas. Använd den här parametern när användarnamnet innehåller specialtecken som måste vara undantagna.
lösenord Nej Ingen Redshift-lösenordet. Måste användas tillsammans med user alternativet . Kan endast användas om användaren och lösenordet inte skickas i URL:en. att skicka båda resulterar i ett fel. Använd den här parametern när lösenordet innehåller specialtecken som måste tas bort.
URL Ja Ingen En URL för JDBC, i formatet
jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>
subprotocol kan vara postgresql eller redshift, beroende på vilken JDBC-drivrutin du har läst in. En Redshift-kompatibel drivrutin måste finnas på klassens sökväg och matcha den här URL:en. host och port bör peka på Redshift-huvudnoden, så säkerhetsgrupper och/eller VPC måste konfigureras för att tillåta åtkomst från ditt drivrutinsprogram. database identifierar ett Redshift-databasnamn user och password är autentiseringsuppgifter för att komma åt databasen, som måste bäddas in i den här URL:en för JDBC, och ditt användarkonto bör ha nödvändiga behörigheter för tabellen som refereras till.
sökväg Nej Ingen Ange sökväg för schemasökning i Redshift. Anges med kommandot SET search_path to . Bör vara en kommaavgränsad lista med schemanamn att söka efter tabeller i. Se Redshift-dokumentationen för search_path.
aws_iam_role Endast om du använder IAM-roller för att auktorisera. Ingen Fullständigt angiven ARN för IAM Redshift COPY/UNLOAD-åtgärdsrollen som är kopplad till Redshift-klustret, till exempel arn:aws:iam::123456789000:role/<redshift-iam-role>.
vidarebefordra_spark_s3_behörigheter Nej false Om trueidentifierar datakällan automatiskt de autentiseringsuppgifter som Spark använder för att ansluta till S3 och vidarebefordrar dessa autentiseringsuppgifter till Redshift över JDBC. Dessa autentiseringsuppgifter skickas som en del av JDBC-frågan, så därför rekommenderar vi starkt att du aktiverar SSL-kryptering för JDBC-anslutningen när du använder det här alternativet.
temporary_aws_access_key_id Nej Ingen AWS-åtkomstnyckeln måste ha skrivbehörighet till S3-bucketen.
tillfällig_aws_hemlig_åtkomstnyckel Nej Ingen AWS-hemlig åtkomstnyckel som motsvarar den angivna åtkomstnyckeln.
temporary_aws_session_token (tillfällig AWS-sessionstoken) Nej Ingen AWS-sessionstoken som motsvarar den angivna åtkomstnyckeln.
tempdir Ja Ingen En skrivbar plats i Amazon S3 som ska användas för oladdade data vid läsning och Avro-data som ska läsas in i Redshift vid skrivning. Om du använder Redshift-datakällan för Spark som en del av en vanlig ETL-pipeline kan det vara användbart att ange en livscykelprincip på en bucket och använda den som en temporär plats för dessa data.
Du kan inte använda externa platser som definierats i Unity Catalog som tempdir platser.
jdbcdriver Nej Bestäms av JDBC-URL:ens underprotokol. Klassnamnet på den JDBC-drivrutin som ska användas. Den här klassen måste finnas på klassökvägen. I de flesta fall bör det inte vara nödvändigt att ange det här alternativet, eftersom det lämpliga drivrutinsklassnamnet automatiskt bör fastställas av JDBC-URL:ens underprotokol.
diststyle Nej EVEN Redshift-distributionsformatet som ska användas när du skapar en tabell. Kan vara ett av EVEN, KEY eller ALL (se Redshift-dokument). När du använder KEYmåste du också ange en distributionsnyckel med alternativet distkey.
distkey Nej, om du inte använder DISTSTYLE KEY Ingen Namnet på en kolumn i tabellen som ska användas som distributionsnyckel när du skapar en tabell.
sorteringsnyckelspecifikation Nej Ingen En fullständig Redshift-sorteringsnyckeldefinition. Exempel:
  • SORTKEY(my_sort_column)
  • COMPOUND SORTKEY(sort_col_1, sort_col_2)
  • INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (inaktuell) Nej true Inställning av detta inaktuella alternativ till false orsakar att en överskrivningsåtgärds måltabell tas bort omedelbart i början av skrivningen, vilket gör överskrivningsåtgärden icke-atomisk, och minskar tillgängligheten för måltabellen. Detta kan minska de tillfälliga diskutrymmeskraven för överskrivningar.
Eftersom inställningen usestagingtable=false av åtgärden riskerar dataförlust eller otillgänglighet är den föråldrad och det rekommenderas att du manuellt tar bort måltabellen.
beskrivning Nej Ingen En beskrivning av tabellen. Kommer att ställas in med hjälp av SQL COMMENT-kommandot och bör visas i de flesta frågeverktyg. Se också description-metadata för att ange beskrivningar för enskilda kolumner.
föråtgärder Nej Ingen En ; avgränsad lista över SQL-kommandon som ska köras innan COPY kommandot laddas. Det kan vara bra att köra vissa DELETE kommandon eller liknande här innan du läser in nya data. Om kommandot innehåller %s, formateras tabellnamnet innan körningen (om du använder en mellanlagringstabell).
Varnas för att om dessa kommandon misslyckas behandlas det som ett fel och ett undantag genereras. Om du använder en mellanlagringstabell återställs ändringarna och säkerhetskopieringstabellen återställs om föråtgärder misslyckas.
efteråtgärder Nej Ingen En ; avgränsad lista över SQL-kommandon som ska köras efter en lyckad COPY inläsning av data. Det kan vara användbart att vissa GRANT kommandon eller liknande körs här när du läser in nya data. Om kommandot innehåller %s formateras tabellnamnet före körningen (om du använder en mellanlagringstabell).
Varnas för att om dessa kommandon misslyckas behandlas det som ett fel och ett undantag genereras. Om du använder en mellanlagringstabell återställs ändringarna och säkerhetskopieringstabellen återställs om efteråtgärder misslyckas.
extra kopieringsalternativ Nej Ingen En lista över extra alternativ för att lägga till redshift-kommandot COPY vid inläsning av data, till exempel TRUNCATECOLUMNS eller MAXERROR n (se Redshift-dokument för andra alternativ).
Eftersom dessa alternativ läggs till i slutet av COPY kommandot kan endast alternativ som är lämpliga i slutet av kommandot användas, men det bör omfatta de flesta möjliga användningsfall.
tempformat Nej AVRO Formatet där du kan spara temporära filer i S3 när du skriver till Redshift. Standardvärdet är AVRO; de andra tillåtna värdena är CSV och CSV GZIP för CSV respektive gzipped CSV.
Redshift är betydligt snabbare vid inläsning av CSV än vid inläsning av Avro-filer, så användning av det tempformatet kan ge en stor prestandaökning när du skriver till Redshift.
csvnullstring Nej @NULL@ Strängvärdet som ska skrivas för null när du använder CSV-tempformatet. Detta bör vara ett värde som inte visas i dina faktiska data.
CSV-avgränsare Nej , Avgränsare som ska användas när du skriver temporära filer med tempformat inställt på CSV eller CSV GZIP. Detta måste vara ett giltigt ASCII-tecken, till exempel "," eller "\|".
csvignoreleadingwhitespace Nej true När värdet är true tas inledande blanksteg bort från värden under skrivningar när tempformat är inställt på CSV eller CSV GZIP. Annars behålls tomt utrymme.
csvignoretrailingwhitespace Nej true När värdet är true tar bort avslutande blanksteg från värden under skrivningar när tempformat är inställt på CSV eller CSV GZIP. Annars behålls tomt utrymme.
infer_timestamp_ntz_type Nej false Om truetolkas värden av typen Redshift TIMESTAMP som TimestampNTZType (tidsstämpel utan tidszon) under läsningar. Annars tolkas alla tidsstämplar som TimestampType oavsett typ i den underliggande Redshift-tabellen.

Ytterligare konfigurationsalternativ

Konfigurera den maximala storleken på strängkolumner

När du skapar Redshift-tabeller är standardbeteendet att skapa TEXT kolumner för strängkolumner. Redshift lagrar TEXT kolumner som VARCHAR(256), så dessa kolumner har en maximal storlek på 256 tecken (källa).

Om du vill ha stöd för större kolumner kan du använda kolumnmetadatafältet maxlength för att ange den maximala längden på enskilda strängkolumner. Detta är också användbart för att implementera utrymmesbesparande prestandaoptimeringar genom att deklarera kolumner med en mindre maximal längd än standardvärdet.

Kommentar

På grund av begränsningar i Spark stöder SQL- och R-språk-API:erna inte ändringar av kolumnmetadata.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Här är ett exempel på hur du uppdaterar flera kolumners metadatafält med Sparks Scala API:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Ange en anpassad kolumntyp

Om du behöver ange en kolumntyp manuellt kan du använda kolumnmetadata redshift_type . Om du till exempel vill åsidosätta Spark SQL Schema -> Redshift SQL typen matchare för att tilldela en användardefinierad kolumntyp kan du göra följande:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Konfigurera kolumnkodning

När du skapar en tabell använder du encoding fältet kolumnmetadata för att ange en komprimeringskodning för varje kolumn (se Amazon-dokument för tillgängliga kodningar).

Ange beskrivningar för kolumner

Redshift tillåter att kolumner har bifogade beskrivningar som ska visas i de flesta frågeverktyg (med kommandot COMMENT ). Du kan ange kolumnmetadatafältet description för att ange en beskrivning för enskilda kolumner.

Skicka frågor till Redshift

Spark-optimeraren kör ner följande operatorer till Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

I Project och Filterstöder den följande uttryck:

  • De flesta booleska logikoperatorer
  • Jämförelser
  • Grundläggande aritmetiska åtgärder
  • Numeriska och stränggjutningar
  • De flesta strängfunktioner
  • Skalär underfrågor, om de kan flyttas ner helt till Redshift.

Kommentar

Denna pushdown stöder inte uttryck som opererar på datum och tidsangivelser.

I Aggregationstöder den följande aggregeringsfunktioner:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

tillsammans med klausulen DISTINCT , i förekommande fall.

I Joinstöder den följande typer av kopplingar:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Underfrågor som optimeraren skriver om till Join, t.ex. WHERE EXISTS, WHERE NOT EXISTS

Kommentar

Join pushdown stöder inte FULL OUTER JOIN.

Pushdown kan vara mest fördelaktigt i sökfrågor med LIMIT. En frågeställning som SELECT * FROM large_redshift_table LIMIT 10 kan ta mycket lång tid eftersom hela tabellen först skulle avläsas till S3 som ett intermediärt resultat. Med pushdown körs LIMIT i Redshift. I förfrågningar med aggregeringar bidrar det att skjuta ner aggregeringen till Redshift också till att minska mängden data som behöver överföras.

Pushdown för frågor till Redshift är aktiverat som standard. Det kan inaktiveras genom att ange spark.databricks.redshift.pushdown till false. Även när det är inaktiverat trycker Spark fortfarande ned filter och utför kolumneliminering till Redshift.

Installation av Redshift-drivrutin

Redshift-datakällan kräver också en Redshift-kompatibel JDBC-drivrutin. Eftersom Redshift baseras på PostgreSQL-databassystemet kan du använda PostgreSQL JDBC-drivrutinen som ingår i Databricks Runtime eller Amazons rekommenderade Redshift JDBC-drivrutin. Ingen installation krävs för att använda PostgreSQL JDBC-drivrutinen. Versionen av PostgreSQL JDBC-drivrutinen som ingår i varje Databricks Runtime-utgåva listas i release-anteckningar för Databricks Runtime.

Installera Redshift JDBC-drivrutinen manuellt:

  1. Ladda ned drivrutinen från Amazon.
  2. Ladda upp drivrutinen till din Azure Databricks-arbetsyta. Se Bibliotek.
  3. Installera biblioteket i klustret.

Kommentar

Databricks rekommenderar att du använder den senaste versionen av Redshift JDBC-drivrutinen. Versioner av Redshift JDBC-drivrutinen under 1.2.41 har följande begränsningar:

  • Version 1.2.16 av drivrutinen returnerar tomma data när du använder en where sats i en SQL-fråga.
  • Versioner av drivrutinen under 1.2.41 kan returnera ogiltiga resultat eftersom en kolumns nullbarhet felaktigt rapporteras som "Inte nullbar" i stället för "Okänd".

Transaktionsgarantier

I det här avsnittet beskrivs transaktionsgarantierna för Redshift-datakällan för Spark.

Allmän bakgrund för redshift- och S3-egenskaper

Allmän information om Redshift-transaktionsgarantier finns i avsnittet Hantera samtidiga skrivåtgärder i Redshift-dokumentationen. I ett nötskal tillhandahåller Redshift serialiserbar isolering enligt dokumentationen för Kommandot Redshift BEGIN :

[även om] du kan använda någon av de fyra transaktionsisoleringsnivåerna bearbetar Amazon Redshift alla isoleringsnivåer som serialiserbara.

Enligt Redshift-dokumentationen:

Amazon Redshift har stöd för ett standardbeteende för automatisk incheckning där varje separat körd SQL-kommando checkar in individuellt.

Därför är enskilda kommandon som COPY och UNLOAD atomiska och transaktionella, samtidigt som de är explicita BEGIN och END bör endast vara nödvändiga för att framtvinga atomiteten för flera kommandon eller frågor.

När du läser från och skriver till Redshift använder datakällan S3 för att läsa och skriva data. Både Spark och Redshift producerar partitionerade utdata och lagrar dem i flera filer i S3. Enligt dokumentationen om Amazon S3 Data Consistency Model är S3-bucketlistningsåtgärder så småningom konsekventa, så filerna måste gå till särskilda längder för att undvika saknade eller ofullständiga data på grund av den här källan för slutlig konsekvens.

Garantier för Redshift-datakällan för Spark

Lägg till i en befintlig tabell

När du infogar rader i Redshift använder datakällan kommandot COPY och anger manifest för att skydda mot vissa S3-åtgärder som så småningom är konsekventa. Därför spark-redshift har tillägg till befintliga tabeller samma atomiska och transaktionella egenskaper som vanliga Redshift-kommandon COPY .

Skapa en ny tabell (SaveMode.CreateIfNotExists)

Att skapa en ny tabell är en tvåstegsprocess som består av ett CREATE TABLE kommando följt av ett COPY-kommando för att lägga till den första uppsättningen rader. Båda åtgärderna utförs i samma transaktion.

Skriv över en befintlig tabell

Som standard använder datakällan transaktioner för att utföra överskrivningar, som implementeras genom att ta bort måltabellen, skapa en ny tom tabell och lägga till rader i den.

Om den inaktuella usestagingtable inställningen är satt till false, bekräftar datakällan DELETE TABLE-kommandot innan raderna läggs till i den nya tabellen, vilket offrar atomiciteten i överskrivningen men minskar mängden mellanlagringsutrymme som Redshift behöver under överskrivningen.

Fråga Redshift-tabellen

Frågor använder kommandot Redshift UNLOAD för att köra en fråga och spara resultatet till S3 och använda manifest för att skydda mot vissa S3-åtgärder som så småningom är konsekventa. Därför bör frågor från Redshift-datakällan för Spark ha samma konsekvensegenskaper som vanliga Redshift-frågor.

Vanliga problem och lösningar

S3-bucketen och Redshift-klustret finns i olika AWS-regioner

Som standard fungerar inte S3 <–> Redshift-kopior om S3-bucketen och Redshift-klustret finns i olika AWS-regioner.

Om du försöker läsa en Redshift-tabell när S3-bucketen finns i en annan region kan du se ett fel som:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

På samma sätt kan försök att skriva till Redshift med en S3-bucket i en annan region orsaka följande fel:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Skriver: Redshift COPY-kommandot stöder explicit specifikation av S3-bucketregionen, så du kan göra så att skrivningar till Redshift fungerar korrekt i dessa fall genom att lägga region 'the-region-name' till i extracopyoptions inställningen. Om du till exempel har en bucket i regionen USA östra (Virginia) och använder Scala-API:et, använd:

    .option("extracopyoptions", "region 'us-east-1'")
    

    Du kan också använda inställningen awsregion :

    .option("awsregion", "us-east-1")
    
  • Läser: Redshift UNLOAD-kommandot stöder också specifik angivelse av S3-bucket-regionen. Du kan få avläsningarna att fungera korrekt genom att lägga till regionen i awsregion-inställningen.

    .option("awsregion", "us-east-1")
    

Autentiseringsfel vid användning av ett lösenord med specialtecken i JDBC-url:en

Om du anger användarnamnet och lösenordet som en del av JDBC-url:en och lösenordet innehåller specialtecken som ;, ?eller &, kan du se följande undantag:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Detta orsakas av att specialtecken i användarnamnet eller lösenordet inte kan kringgås korrekt av JDBC-drivrutinen. Se till att ange användarnamnet och lösenordet med motsvarande DataFrame-alternativ user och password. Mer information finns i Parametrar.

Långvariga Spark-frågor låser sig på obestämd tid trots att motsvarande Redshift-åtgärd är klar

Om du läser eller skriver stora mängder data från och till Redshift kan Spark-frågan hänga sig på obestämd tid, även om sidan AWS Redshift Monitoring visar att motsvarande LOAD eller UNLOAD åtgärd har slutförts och att klustret är inaktivt. Detta orsakas av att anslutningen mellan Redshift och Spark går ut. För att undvika detta, se till att tcpKeepAlive JDBC-flaggan är aktiverad och TCPKeepAliveMinutes är inställd på ett lågt värde (till exempel 1).

Mer information finns i Amazon Redshift JDBC-drivrutinskonfiguration.

Tidsstämpel med tidszonssemantik

När du läser data mappas både Redshift TIMESTAMP - och TIMESTAMPTZ datatyper till Spark TimestampTypeoch ett värde konverteras till Coordinated Universal Time (UTC) och lagras som UTC-tidsstämpeln. För en Redshift TIMESTAMP antas den lokala tidszonen användas eftersom värdet inte har någon tidszonsinformation. När du skriver data till en Redshift-tabell mappas en Spark TimestampType till Redshift-datatypen TIMESTAMP .

Migreringsguide

Datakällan kräver nu att du uttryckligen anger forward_spark_s3_credentials innan Spark S3-autentiseringsuppgifterna vidarebefordras till Redshift. Den här ändringen påverkar inte om du använder autentiseringsmekanismerna aws_iam_role eller temporary_aws_* . Men om du förlitade dig på det gamla standardbeteendet måste du nu uttryckligen ange forward_spark_s3_credentials till true för att fortsätta använda din tidigare Redshift till S3-autentiseringsmekanism. En diskussion om de tre autentiseringsmekanismerna och deras säkerhetsvägningar finns i avsnittet Autentisera till S3 och Redshift i det här dokumentet.