Przykłady kodu dla programu Databricks Connect dla języka Python
Uwaga
W tym artykule opisano usługę Databricks Connect dla środowiska Databricks Runtime 13.3 LTS lub nowszego.
Ten artykuł zawiera przykłady kodu korzystające z programu Databricks Connect dla języka Python. Usługa Databricks Connect umożliwia łączenie popularnych środowisk IDE, serwerów notesów i aplikacji niestandardowych z klastrami usługi Azure Databricks. Zobacz Co to jest usługa Databricks Connect?. Aby zapoznać się z wersją języka Scala tego artykułu, zobacz Przykłady kodu dla programu Databricks Connect dla języka Scala.
Uwaga
Przed rozpoczęciem korzystania z usługi Databricks Connect należy skonfigurować klienta Usługi Databricks Connect.
Usługa Databricks udostępnia kilka dodatkowych przykładowych aplikacji, które pokazują, jak używać usługi Databricks Connect. Zobacz przykładowe aplikacje dla repozytorium Databricks Connect w usłudze GitHub, w szczególności:
- Prosta aplikacja ETL
- Interaktywna aplikacja danych oparta na narzędziu Plotly
- Interaktywna aplikacja danych oparta na sztucznej inteligencji Plotly i PySpark
Aby eksperymentować z usługą Databricks Connect, możesz również użyć następujących prostszych przykładów kodu. W tych przykładach przyjęto założenie, że używasz domyślnego uwierzytelniania dla konfiguracji klienta programu Databricks Connect.
Ten prosty przykład kodu wysyła zapytanie do określonej tabeli, a następnie pokazuje pierwsze 5 wierszy określonej tabeli. Aby użyć innej tabeli, dostosuj wywołanie do spark.read.table
.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Ten dłuższy przykład kodu wykonuje następujące czynności:
- Tworzy ramkę danych w pamięci.
- Tworzy tabelę o nazwie
zzz_demo_temps_table
w schemaciedefault
. Jeśli tabela o tej nazwie już istnieje, tabela zostanie usunięta jako pierwsza. Aby użyć innego schematu lub tabeli, dostosuj wywołania dospark.sql
,temps.write.saveAsTable
lub obu. - Zapisuje zawartość ramki danych w tabeli.
SELECT
Uruchamia zapytanie dotyczące zawartości tabeli.- Pokazuje wynik zapytania.
- Usuwa tabelę.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Uwaga
W poniższym przykładzie opisano sposób pisania kodu przenośnego między programem Databricks Connect for Databricks Runtime 13.3 LTS i nowszym w środowiskach, w których DatabricksSession
klasa jest niedostępna.
W poniższym przykładzie DatabricksSession
użyto klasy lub użyto SparkSession
klasy , jeśli DatabricksSession
klasa jest niedostępna, aby wysłać zapytanie do określonej tabeli i zwrócić pierwsze 5 wierszy. W tym przykładzie użyto zmiennej środowiskowej SPARK_REMOTE
do uwierzytelniania.
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)