Kodexempel för Databricks Connect för Python
Kommentar
Den här artikeln beskriver Databricks Connect för Databricks Runtime 13.3 LTS och senare.
Den här artikeln innehåller kodexempel som använder Databricks Connect för Python. Med Databricks Connect kan du ansluta populära IDE:er, notebook-servrar och anpassade program till Azure Databricks-kluster. Se Vad är Databricks Connect?. För Scala-versionen av den här artikeln, se Kodexempel för Databricks Connect för Scala.
Kommentar
Innan du börjar använda Databricks Connect måste du set upp Databricks Connect-klienten.
Databricks innehåller flera ytterligare exempelprogram som visar hur du använder Databricks Connect. Se exempelprogram för Databricks Connect-lagringsplatsen i GitHub, specifikt:
- Ett enkelt ETL-program
- Ett interaktivt dataprogram baserat på Plotly
- Ett interaktivt dataprogram baserat på Plotly och PySpark AI
Du kan också använda följande enklare kodexempel för att experimentera med Databricks Connect. Dessa exempel förutsätter att du använder standardautentisering för Databricks Connect-klientkonfiguration.
Det här enkla kodexemplet frågar den angivna table och visar sedan den angivna tableförsta 5 raderna. Om du vill använda en annan tablejusterar du anropet till spark.read.table
.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Det här längre kodexemplet gör följande:
- Skapar en minnesintern DataFrame.
- Skapar en table med namnet
zzz_demo_temps_table
idefault
schema. Om table med det här namnet redan finns tas table bort först. Om du vill använda en annan schema eller tablejusterar du anropen tillspark.sql
,temps.write.saveAsTable
eller båda. - Sparar DataFrame-innehållet i table.
- Kör en
SELECT
-fråga på innehållet i table. - Visar frågans resultat.
- Tar bort table.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Kommentar
I följande exempel beskrivs hur du skriver kod som är portabel mellan Databricks Connect för Databricks Runtime 13.3 LTS och senare i miljöer där klassen whereDatabricksSession
inte är tillgänglig.
I följande exempel används klassen DatabricksSession
, eller klassen SparkSession
om klassen DatabricksSession
inte är tillgänglig, för att fråga den angivna table och returnera de första 5 raderna. I det SPARK_REMOTE
här exemplet används miljövariabeln för autentisering.
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)