Partilhar via


Consultar bases de dados utilizando JDBC

O Azure Databricks dá suporte à conexão com bancos de dados externos usando JDBC. Este artigo fornece a sintaxe básica para configurar e usar esses connections com exemplos em Python, SQL e Scala.

Importante

As configurações descritas neste artigo são experimentais. Os recursos experimentais são fornecidos no estado em que se encontram e não são suportados pelo Databricks por meio do suporte técnico ao cliente. Para get suporte completo à federação de consultas, deves usar Lakehouse Federation, que permite que os utilizadores do Azure Databricks tirem proveito das ferramentas de sintaxe e de governança de dados do Unity Catalog.

O Partner Connect fornece integrações otimizadas para sincronizar dados com muitas fontes de dados externas externas. Consulte O que é o Databricks Partner Connect?.

Importante

Os exemplos neste artigo não incluem nomes de usuário e senhas em URLs JDBC. O Databricks recomenda o uso de segredos para armazenar seu banco de dados credentials. Por exemplo:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Para fazer referência a segredos do Databricks com SQL, você deve configurar uma propriedade de configuração do Spark durante a iniciação do cluster.

Para obter um exemplo completo de gerenciamento de segredos, consulte Tutorial: Criar e usar um segredo do Databricks.

Ler dados com JDBC

Você deve definir várias configurações para ler dados usando JDBC. Observe que cada banco de dados usa um formato diferente para o <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

O Spark lê automaticamente o schema do banco de dados table e mapeia seus tipos de volta para os tipos SQL do Spark.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Você pode executar consultas neste JDBC table:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Gravar dados com JDBC

Guardar dados em tables com JDBC usa configurações semelhantes às de leitura. Veja o seguinte exemplo:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

O comportamento padrão tenta criar um novo table e lança um erro se já existir um table com esse nome.

Você pode acrescentar dados a um table existente usando a seguinte sintaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Você pode substituir um table existente usando a seguinte sintaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Controlar paralelismo para consultas JDBC

Por padrão, o driver JDBC consulta o banco de dados de origem com apenas um único thread. Para melhorar o desempenho de leituras, você precisa especificar várias opções para controlar quantas consultas simultâneas o Azure Databricks faz ao seu banco de dados. Para clusters pequenos, definir a numPartitions opção igual ao número de núcleos executores no cluster garante que todos os nós consultem dados em paralelo.

Aviso

A configuração numPartitions de um valor alto em um cluster grande pode resultar em desempenho negativo para o banco de dados remoto, pois muitas consultas simultâneas podem sobrecarregar o serviço. Isso é especialmente problemático para bancos de dados de aplicativos. Tenha cuidado ao definir este valor acima de 50.

Nota

Acelere as consultas selecionando um column com um índice calculado no banco de dados de origem para o partitionColumn.

O exemplo de código a seguir demonstra a configuração do paralelismo para um cluster com oito núcleos:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Nota

O Azure Databricks suporta todas as opções do Apache Spark para configurar o JDBC.

Ao gravar em bancos de dados usando JDBC, o Apache Spark usa o número de partições na memória para controlar o paralelismo. Você pode reparticionar dados antes de gravar para controlar o paralelismo. Evite um grande número de partições em clusters grandes para evitar sobrecarregar seu banco de dados remoto. O exemplo a seguir demonstra o reparticionamento para oito partições antes de escrever:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Empurrar uma consulta para o mecanismo de banco de dados

Você pode enviar uma consulta inteira para o banco de dados e retornar apenas o resultado. O parâmetro table identifica o table JDBC a ser lido. Você pode usar qualquer coisa que seja válida em uma cláusula de consulta FROM SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Controlar o número de linhas obtidas por consulta

Os drivers JDBC têm um fetchSize parâmetro que controla o número de linhas buscadas por vez no banco de dados remoto.

Definição Result
Muito baixo Alta latência devido a muitas viagens de ida e volta (poucas linhas retornadas por consulta)
Muito alto Erro de falta de memória (muitos dados retornados em uma consulta)

O valor ideal depende da carga de trabalho. As considerações incluem:

  • Quantos columns são retornados pela consulta?
  • Que tipos de dados são retornados?
  • Qual é o comprimento das cadeias de caracteres em cada column retornada?

Os sistemas podem ter um padrão muito pequeno e se beneficiar do ajuste. Por exemplo: o padrão fetchSize da Oracle é 10. Aumentar para 100 reduz o número total de consultas que precisam ser executadas por um fator de 10. Os resultados JDBC são tráfego de rede, portanto, evite números muito grandes, mas o values ideal pode estar na casa dos milhares para muitos conjuntos de dados.

Use a fetchSize opção, como no exemplo a seguir:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()