Codevoorbeelden voor Databricks Connect voor Python
Notitie
Dit artikel bevat informatie over Databricks Connect voor Databricks Runtime 13.3 LTS en hoger.
Dit artikel bevat codevoorbeelden die Gebruikmaken van Databricks Connect voor Python. Met Databricks Connect kunt u populaire IDE's, notebookservers en aangepaste toepassingen verbinden met Azure Databricks-clusters. Zie Wat is Databricks Connect? Zie Codevoorbeelden voor Databricks Connect voor Scala voor de Scala-versie van dit artikel.
Notitie
Voordat u Databricks Connect gaat gebruiken, moet u de Databricks Connect-client
Databricks biedt verschillende aanvullende voorbeeldtoepassingen die laten zien hoe u Databricks Connect gebruikt. Zie de voorbeeldtoepassingen voor de Databricks Connect-opslagplaats in GitHub, met name:
- Een eenvoudige ETL-toepassing
- Een interactieve gegevenstoepassing op basis van Plotly
- Een interactieve gegevenstoepassing op basis van Plotly en PySpark AI
U kunt ook de volgende eenvoudigere codevoorbeelden gebruiken om te experimenteren met Databricks Connect. In deze voorbeelden wordt ervan uitgegaan dat u standaardverificatie gebruikt voor het instellen van de Databricks Connect-client.
In dit eenvoudige codevoorbeeld wordt een query uitgevoerd op de opgegeven table en wordt vervolgens de eerste vijf rijen van de opgegeven tableweergegeven. Als u een andere tablewilt gebruiken, past u de aanroep aan op spark.read.table
.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Dit langere codevoorbeeld doet het volgende:
- Hiermee maakt u een DataFrame in het geheugen.
- Hiermee maakt u een table met de naam
zzz_demo_temps_table
in dedefault
schema. Als de table met deze naam al bestaat, wordt de table eerst verwijderd. Als u een andere schema of tablewilt gebruiken, past u de aanroepen aanspark.sql
,temps.write.saveAsTable
of beide aan. - Slaat de inhoud van het DataFrame op in de table.
- Voert een
SELECT
query uit op de inhoud van de table. - Geeft het resultaat van de query weer.
- Verwijder de table.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Notitie
In het volgende voorbeeld wordt beschreven hoe u code schrijft die draagbaar is tussen Databricks Connect voor Databricks Runtime 13.3 LTS en hoger in omgevingen where de DatabricksSession
-klasse niet beschikbaar is.
In het volgende voorbeeld wordt de DatabricksSession
-klasse gebruikt of wordt de SparkSession
-klasse gebruikt als de DatabricksSession
klasse niet beschikbaar is, om een query uit te voeren op de opgegeven table en de eerste vijf rijen te retourneren. In dit voorbeeld wordt de SPARK_REMOTE
omgevingsvariabele gebruikt voor verificatie.
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)