Condividi tramite


Eseguire query nei database con JDBC

Azure Databricks supporta la connessione a database esterni tramite JDBC. Questo articolo fornisce la sintassi di base per la configurazione e l'uso di queste connessioni con esempi in Python, SQL e Scala.

Importante

Le configurazioni descritte in questo articolo sono sperimentali. Le caratteristiche sperimentali vengono fornite come sono e non sono supportate da Databricks tramite il supporto tecnico del cliente. Per ottenere il supporto completo della federazione delle query, è consigliabile usare invece Lakehouse Federation, che consente agli utenti di Azure Databricks di sfruttare la sintassi di Unity Catalog e gli strumenti di governance dei dati.

Partner Connect offre integrazioni ottimizzate per la sincronizzazione dei dati con molte origini dati esterne esterne. Vedere Che cos'è Databricks Partner Connect?.

Importante

Gli esempi in questo articolo non includono nomi utente e password negli URL JDBC. Databricks consiglia di usare i segreti per archiviare le credenziali del database. Ad esempio:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Per fare riferimento ai segreti di Databricks con SQL, è necessario configurare una proprietà di configurazione spark durante l'initilizzazione del cluster.

Per un esempio completo della gestione dei segreti, vedere Esercitazione: Creare e usare un segreto Databricks.

Leggere i dati con JDBC

È necessario configurare diverse impostazioni per leggere i dati usando JDBC. Si noti che ogni database usa un formato diverso per .<jdbc-url>

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark legge automaticamente lo schema dalla tabella di database e ne esegue il mapping ai tipi SPARK SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

È possibile eseguire query su questa tabella JDBC:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Scrivere dati con JDBC

Il salvataggio di dati in tabelle con JDBC usa configurazioni simili per la lettura. Vedere l'esempio seguente:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Il comportamento predefinito tenta di creare una nuova tabella e genera un errore se esiste già una tabella con tale nome.

È possibile aggiungere dati a una tabella esistente usando la sintassi seguente:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

È possibile sovrascrivere una tabella esistente usando la sintassi seguente:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Controllare il parallelismo per le query JDBC

Per impostazione predefinita, il driver JDBC esegue una query sul database di origine con un solo thread. Per migliorare le prestazioni per le letture, è necessario specificare una serie di opzioni per controllare il numero di query simultanee eseguite da Azure Databricks nel database. Per i cluster di piccole dimensioni, l'impostazione dell'opzione numPartitions uguale al numero di core executor nel cluster garantisce che tutti i nodi eseguano query sui dati in parallelo.

Avviso

L'impostazione numPartitions di un valore elevato in un cluster di grandi dimensioni può comportare prestazioni negative per il database remoto, perché un numero eccessivo di query simultanee potrebbe sovraccaricare il servizio. Ciò è particolarmente problematico per i database dell'applicazione. Fare attenzione a impostare questo valore sopra 50.

Nota

Velocizzare le query selezionando una colonna con un indice calcolato nel database di origine per .partitionColumn

L'esempio di codice seguente illustra la configurazione del parallelismo per un cluster con otto core:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Nota

Azure Databricks supporta tutte le opzioni di Apache Spark per la configurazione di JDBC.

Durante la scrittura nei database tramite JDBC, Apache Spark usa il numero di partizioni in memoria per controllare il parallelismo. È possibile ripartizionare i dati prima di scrivere per controllare il parallelismo. Evitare un numero elevato di partizioni in cluster di grandi dimensioni per evitare di sovraccaricare il database remoto. Nell'esempio seguente viene illustrato il ripartizionamento in otto partizioni prima della scrittura:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Eseguire il push di una query nel motore di database

È possibile eseguire il push di un'intera query nel database e restituire solo il risultato. Il table parametro identifica la tabella JDBC da leggere. È possibile usare qualsiasi elemento valido in una clausola di query FROM SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Numero di righe recuperate per query

I driver JDBC hanno un fetchSize parametro che controlla il numero di righe recuperate alla volta dal database remoto.

Impostazione Risultato
Troppo basso Latenza elevata dovuta a molti round trip (poche righe restituite per query)
Troppo alto Errore di memoria insufficiente (troppi dati restituiti in una query)

Il valore ottimale dipende dal carico di lavoro. Alcune considerazioni includono:

  • Quante colonne vengono restituite dalla query?
  • Quali tipi di dati vengono restituiti?
  • Quanto tempo vengono restituite le stringhe in ogni colonna?

I sistemi potrebbero avere un valore predefinito molto ridotto e trarre vantaggio dall'ottimizzazione. Ad esempio: l'impostazione predefinita fetchSize di Oracle è 10. Aumentandolo a 100 si riduce il numero di query totali che devono essere eseguite da un fattore pari a 10. I risultati di JDBC sono traffico di rete, quindi evitare numeri molto grandi, ma i valori ottimali potrebbero essere nelle migliaia per molti set di dati.

Usare l'opzione fetchSize , come nell'esempio seguente:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()