Partager via


Exemples de code pour Databricks Connect pour Python

Remarque

Cet article présente Databricks Connect pour Databricks Runtime 13.3 LTS et les versions ultérieures.

Cet article fournit des exemples de code qui utilisent Databricks Connect pour Python. Databricks Connect vous permet de connecter des environnements de développement intégré (IDE) populaires, des serveurs notebook et des applications personnalisées aux clusters Azure Databricks. Reportez-vous à Qu'est-ce que Databricks Connect ?. Pour obtenir la version Scala de cet article, reportez-vous à Exemples de code pour Databricks Connect pour Scala.

Remarque

Avant de commencer à utiliser Databricks Connect, vous devez configurer le client Databricks Connect.

Databricks fournit plusieurs exemples d'applications supplémentaires qui montrent comment utiliser Databricks Connect. Reportez-vous au référentiel d’exemples d’applications pour Databricks Connect dans GitHub, spécifiquement :

Vous pouvez également utiliser les exemples de code plus simples suivants pour expérimenter Databricks Connect. Ces exemples supposent que vous utilisez l’authentification par défaut pour la configuration du client Databricks Connect.

Cet exemple de code simple interroge la table spécifiée, puis affiche les 5 premières lignes de la table spécifiée. Pour utiliser une autre table, ajustez l’appel sur spark.read.table.

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

Cet exemple de code plus long effectue les opérations suivantes :

  1. Crée un DataFrame en mémoire.
  2. Crée une table portant le nom zzz_demo_temps_table dans le schéma default. Si la table portant ce nom existe déjà, la table est d’abord supprimée. Pour utiliser un autre schéma ou table, ajustez les appels sur spark.sql, temps.write.saveAsTable ou les deux.
  3. Enregistre le contenu du DataFrame dans la table.
  4. Exécute une requête SELECT sur le contenu de la table.
  5. Affiche le résultat de la requête.
  6. Supprime la table.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Remarque

L’exemple suivant explique comment écrire du code portable entre Databricks Connect pour Databricks Runtime 13.3 LTS (et les versions ultérieures) dans des environnements dans lesquels la classe DatabricksSession n’est pas disponible.

L’exemple suivant utilise la classe DatabricksSession ou la SparkSession classe, si la DatabricksSession classe n’est pas disponible, pour interroger la table spécifiée et retourner les 5 premières lignes. Cet exemple utilise la variable d’environnement SPARK_REMOTE pour l’authentification.

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)