Condividi tramite


Velocizzare l'analisi di Big Data in tempo reale con il connettore Spark

Si applica a:Database SQL di AzureIstanza gestita di SQL di Azure

Nota

A partire da settembre 2020, questo connettore non è più mantenuto attivamente. Tuttavia, Apache Spark Connector per SQL Server e Azure SQL è ora disponibile, con il supporto per i binding Python e R, un'interfaccia più semplice da usare per l'inserimento di dati in blocco e molti altri miglioramenti. È consigliabile valutare e usare il nuovo connettore anziché questo. Le informazioni sul connettore precedente (questa pagina) vengono conservate solo a scopo di archiviazione.

Il connettore Spark consente ai database in Azure SQL Database, Istanza gestita di Azure SQL e SQL Server di fungere da origine dati di input o destinazione dati di output per i compiti Spark. È possibile utilizzare dati transazionali in tempo reale nel processo di big data analytics e mantenere i risultati per query ad hoc o per la generazione di report. Rispetto al connettore JDBC incorporato, questo connettore offre la possibilità di inserire dati in grandi quantità nel proprio database. Può superare l'inserimento riga per riga con prestazioni 10x a 20x più veloci. Il connettore Spark supporta l'autenticazione con Microsoft Entra ID (in precedenza Azure Active Directory) per connettersi a database SQL di Azure e Istanza gestita di SQL di Azure, consentendo di connettere il database da Azure Databricks usando l'account Microsoft Entra. Fornisce interfacce simili al connettore JDBC incorporato. È facile eseguire la migrazione dei processi Spark esistenti per l'utilizzo di questo nuovo connettore.

Nota

Microsoft Entra ID era precedentemente conosciuto come Azure Active Directory (Azure AD).

Scaricare e compilare un connettore Spark

Il repository GitHub per il vecchio connettore precedentemente collegato da questa pagina non viene gestito attivamente. È invece consigliabile valutare e usare il nuovo connettore.

Versioni supportate ufficialmente

Componente Versione
Apache Spark 2.0.2 o versione successiva
Scala 2.10 o versione successiva
Driver Microsoft JDBC per SQL Server 6.2 o versione successiva
Microsoft SQL Server SQL Server 2008 o versione successiva
Database SQL di Azure Supportato
Istanza gestita di SQL di Azure Supportata

Il connettore Spark utilizza Microsoft JDBC Driver per SQL Server per trasferire dati tra i nodi di lavoro Spark e i database SQL:

Il flusso di dati è il seguente:

  1. Il nodo principale Spark si connette al database SQL o a SQL Server e carica i dati da una tabella specifica o utilizzando una query SQL specifica.
  2. Il nodo principale Spark distribuisce i dati ai nodi di lavoro per la trasformazione.
  3. Il nodo di lavoro si connette a database che a loro volta si collegano al database SQL e a SQL Server e scrive i dati nel database. L'utente può scegliere se utilizzare l'inserimento riga per riga o l'inserimento in blocco.

Il diagramma seguente illustra il flusso di dati.

Il diagramma mostra il flusso descritto, con un nodo master che si connette direttamente al database e si connette a tre nodi di lavoro, che si connettono al database.

Creare il connettore Spark

Attualmente il progetto del connettore usa Maven. Per creare il connettore senza le dipendenze, è possibile eseguire:

  • mvn clean package (esegue una compilazione pulita e crea il pacchetto del progetto)
  • Scaricare le versioni più recenti del file JAR dalla cartella release
  • Includere il JAR del database SQL Spark

Effettuare la connessione e leggere i dati usando il connettore Spark

È possibile connettersi al database SQL e SQL Server dai processi di Spark per leggere o scrivere dati. È anche possibile eseguire una query DML o DDL nei database all'interno di SQL Database e di SQL Server.

Leggere dati da Azure SQL o SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Leggere i dati da Azure SQL e SQL Server con una query SQL specificata

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
  "user"         -> "username",
  "password"     -> "*********",
))

//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()

Scrivere i dati in Azure SQL e SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Acquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients",
  "user"         -> "username",
  "password"     -> "*********"
))

import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)

Eseguire una query DML o DDL in Azure SQL o SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.sqlDBQuery(config)

Connettersi da Spark utilizzando l'autenticazione di Microsoft Entra

È possibile connettersi a Database SQL e Istanza SQL gestita usando l'autenticazione di Microsoft Entra. Usare l'autenticazione di Microsoft Entra per gestire centralmente le identità degli utenti del database e come alternativa all'autenticazione di SQL.

Connessione tramite la modalità di autenticazione ActiveDirectoryPassword

Requisiti di installazione

Se si utilizza la modalità di autenticazione ActiveDirectoryPassword, è necessario scaricare microsoft-authentication-library-for-java e le relative dipendenze e includerli nel percorso di compilazione Java.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "user"           -> "username",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Connessione tramite token di accesso

Requisiti di installazione

Se si utilizza la modalità di autenticazione basata su token di accesso, è necessario scaricare microsoft-authentication-library-for-java e le relative dipendenze e includerli nel percorso di compilazione Java.

Consulta l'uso dell'autenticazione Microsoft Entra per sapere come ottenere un token di accesso per il database nel database SQL di Azure o nell'Istanza gestita di Azure SQL.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"                   -> "mysqlserver.database.windows.net",
  "databaseName"          -> "MyDatabase",
  "accessToken"           -> "access_token",
  "hostNameInCertificate" -> "*.database.windows.net",
  "encrypt"               -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Scrivere dati con BULK INSERT

Il connettore JDBC tradizionale scrive i dati nel tuo database tramite l'inserimento riga per riga. È possibile utilizzare il connettore da Spark per scrivere dati in Azure SQL e SQL Server tramite l’inserimento in blocco. Migliora significativamente le prestazioni di scrittura durante il caricamento di grandi set di dati o quando si caricano dati in tabelle in cui viene utilizzato un indice columnstore.

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/**
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

Passaggi successivi

Se ancora non lo si è fatto, scaricare il connettore Spark dal repository GitHub azure-sqldb-spark ed esplorare le risorse aggiuntive nel repository:

Potresti anche voler consultare la guida ad Apache Spark SQL, DataFrames e set di dati e la documentazione di Azure Databricks.