Exemples de code pour Databricks Connect pour Python
Remarque
Cet article présente Databricks Connect pour Databricks Runtime 13.3 LTS et les versions ultérieures.
Cet article fournit des exemples de code qui utilisent Databricks Connect pour Python. Databricks Connect vous permet de connecter des environnements de développement intégré (IDE) populaires, des serveurs notebook et des applications personnalisées aux clusters Azure Databricks. Reportez-vous à Qu'est-ce que Databricks Connect ?. Pour obtenir la version Scala de cet article, reportez-vous à Exemples de code pour Databricks Connect pour Scala.
Remarque
Avant de commencer à utiliser Databricks Connect, vous devez configurer le client Databricks Connect.
Databricks fournit plusieurs exemples d'applications supplémentaires qui montrent comment utiliser Databricks Connect. Reportez-vous au référentiel d’exemples d’applications pour Databricks Connect dans GitHub, spécifiquement :
- Une application ETL simple
- Une application de données interactive basée sur Plotly
- Une application de données interactive basée sur Plotly et PySpark AI
Vous pouvez également utiliser les exemples de code plus simples suivants pour expérimenter Databricks Connect. Ces exemples supposent que vous utilisez l’authentification par défaut pour la configuration du client Databricks Connect.
Cet exemple de code simple interroge la table spécifiée, puis affiche les 5 premières lignes de la table spécifiée. Pour utiliser une autre table, ajustez l’appel sur spark.read.table
.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Cet exemple de code plus long effectue les opérations suivantes :
- Crée un DataFrame en mémoire.
- Crée une table portant le nom
zzz_demo_temps_table
dans le schémadefault
. Si la table portant ce nom existe déjà, la table est d’abord supprimée. Pour utiliser un autre schéma ou table, ajustez les appels surspark.sql
,temps.write.saveAsTable
ou les deux. - Enregistre le contenu du DataFrame dans la table.
- Exécute une requête
SELECT
sur le contenu de la table. - Affiche le résultat de la requête.
- Supprime la table.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Remarque
L’exemple suivant explique comment écrire du code portable entre Databricks Connect pour Databricks Runtime 13.3 LTS (et les versions ultérieures) dans des environnements dans lesquels la classe DatabricksSession
n’est pas disponible.
L’exemple suivant utilise la classe DatabricksSession
ou la SparkSession
classe, si la DatabricksSession
classe n’est pas disponible, pour interroger la table spécifiée et retourner les 5 premières lignes. Cet exemple utilise la variable d’environnement SPARK_REMOTE
pour l’authentification.
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)