Eseguire query nei database con JDBC
Azure Databricks supporta la connessione a database esterni tramite JDBC. Questo articolo fornisce la sintassi di base per la configurazione e l'uso di queste connessioni con esempi in Python, SQL e Scala.
Importante
Le configurazioni descritte in questo articolo sono sperimentali. Le caratteristiche sperimentali vengono fornite come sono e non sono supportate da Databricks tramite il supporto tecnico del cliente. Per ottenere il supporto completo della federazione delle query, è consigliabile usare invece Lakehouse Federation, che consente agli utenti di Azure Databricks di sfruttare la sintassi di Unity Catalog e gli strumenti di governance dei dati.
Partner Connect offre integrazioni ottimizzate per la sincronizzazione dei dati con molte origini dati esterne esterne. Vedere Che cos'è Databricks Partner Connect?.
Importante
Gli esempi in questo articolo non includono nomi utente e password negli URL JDBC. Databricks consiglia di usare i segreti per archiviare le credenziali del database. Ad esempio:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Per fare riferimento ai segreti di Databricks con SQL, è necessario configurare una proprietà di configurazione spark durante l'initilizzazione del cluster.
Per un esempio completo della gestione dei segreti, vedere Esercitazione: Creare e usare un segreto Databricks.
Leggere i dati con JDBC
È necessario configurare diverse impostazioni per leggere i dati usando JDBC. Si noti che ogni database usa un formato diverso per .<jdbc-url>
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark legge automaticamente lo schema dalla tabella di database e ne esegue il mapping ai tipi SPARK SQL.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
È possibile eseguire query su questa tabella JDBC:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Scrivere dati con JDBC
Il salvataggio di dati in tabelle con JDBC usa configurazioni simili per la lettura. Vedere l'esempio seguente:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Il comportamento predefinito tenta di creare una nuova tabella e genera un errore se esiste già una tabella con tale nome.
È possibile aggiungere dati a una tabella esistente usando la sintassi seguente:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
È possibile sovrascrivere una tabella esistente usando la sintassi seguente:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Controllare il parallelismo per le query JDBC
Per impostazione predefinita, il driver JDBC esegue una query sul database di origine con un solo thread. Per migliorare le prestazioni per le letture, è necessario specificare una serie di opzioni per controllare il numero di query simultanee eseguite da Azure Databricks nel database. Per i cluster di piccole dimensioni, l'impostazione dell'opzione numPartitions
uguale al numero di core executor nel cluster garantisce che tutti i nodi eseguano query sui dati in parallelo.
Avviso
L'impostazione numPartitions
di un valore elevato in un cluster di grandi dimensioni può comportare prestazioni negative per il database remoto, perché un numero eccessivo di query simultanee potrebbe sovraccaricare il servizio. Ciò è particolarmente problematico per i database dell'applicazione. Fare attenzione a impostare questo valore sopra 50.
Nota
Velocizzare le query selezionando una colonna con un indice calcolato nel database di origine per .partitionColumn
L'esempio di codice seguente illustra la configurazione del parallelismo per un cluster con otto core:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Nota
Azure Databricks supporta tutte le opzioni di Apache Spark per la configurazione di JDBC.
Durante la scrittura nei database tramite JDBC, Apache Spark usa il numero di partizioni in memoria per controllare il parallelismo. È possibile ripartizionare i dati prima di scrivere per controllare il parallelismo. Evitare un numero elevato di partizioni in cluster di grandi dimensioni per evitare di sovraccaricare il database remoto. Nell'esempio seguente viene illustrato il ripartizionamento in otto partizioni prima della scrittura:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Eseguire il push di una query nel motore di database
È possibile eseguire il push di un'intera query nel database e restituire solo il risultato. Il table
parametro identifica la tabella JDBC da leggere. È possibile usare qualsiasi elemento valido in una clausola di query FROM
SQL.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Numero di righe recuperate per query
I driver JDBC hanno un fetchSize
parametro che controlla il numero di righe recuperate alla volta dal database remoto.
Impostazione | Risultato |
---|---|
Troppo basso | Latenza elevata dovuta a molti round trip (poche righe restituite per query) |
Troppo alto | Errore di memoria insufficiente (troppi dati restituiti in una query) |
Il valore ottimale dipende dal carico di lavoro. Alcune considerazioni includono:
- Quante colonne vengono restituite dalla query?
- Quali tipi di dati vengono restituiti?
- Quanto tempo vengono restituite le stringhe in ogni colonna?
I sistemi potrebbero avere un valore predefinito molto ridotto e trarre vantaggio dall'ottimizzazione. Ad esempio: l'impostazione predefinita fetchSize
di Oracle è 10. Aumentandolo a 100 si riduce il numero di query totali che devono essere eseguite da un fattore pari a 10. I risultati di JDBC sono traffico di rete, quindi evitare numeri molto grandi, ma i valori ottimali potrebbero essere nelle migliaia per molti set di dati.
Usare l'opzione fetchSize
, come nell'esempio seguente:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()