Sdílet prostřednictvím


Příklady kódu pro Databricks Connect pro Python

Poznámka:

Tento článek popisuje Databricks Connect pro Databricks Runtime 13.3 LTS a novější.

Tento článek obsahuje příklady kódu, které používají Databricks Connect pro Python. Databricks Connect umožňuje připojit oblíbené prostředí IDE, servery poznámkových bloků a vlastní aplikace k clusterům Azure Databricks. Podívejte se, co je Databricks Connect? Informace o verzi Scala tohoto článku najdete v příkladech kódu pro Databricks Connect pro Scala.

Než začnete používat Databricks Connect, musíte nastavit klienta Databricks Connect.

Následující příklady předpokládají, že používáte výchozí ověřování pro instalaci klienta Databricks Connect.

Příklad: Čtení tabulky

Tento jednoduchý příklad kódu dotazuje zadanou tabulku a pak zobrazí prvních 5 řádků zadané tabulky.

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

Příklad: Vytvoření datového rámce

Následující příklad kódu:

  1. Vytvoří datový rámec v paměti.
  2. Vytvoří tabulku s názvem zzz_demo_temps_table v rámci schématu default . Pokud tabulka s tímto názvem již existuje, tabulka se nejprve odstraní. Chcete-li použít jiné schéma nebo tabulku, upravte volání na spark.sql, temps.write.saveAsTablenebo obojí.
  3. Uloží obsah datového rámce do tabulky.
  4. SELECT Spustí dotaz na obsah tabulky.
  5. Zobrazí výsledek dotazu.
  6. Odstraní tabulku.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Příklad: Použití DatabricksSesssion nebo SparkSession

Následující příklad popisuje, jak psát kód, který je přenositelný mezi Databricks Connect pro Databricks Runtime 13.3 LTS a vyšších verzí v prostředích, kde není dostupná třída DatabricksSession, v takovém případě se používá třída SparkSession k dotazování zadané tabulky a vrácení prvních 5 řádků. V tomto příkladu se pro ověřování používá proměnná prostředí SPARK_REMOTE.

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)

Dodatečné zdroje

Databricks poskytuje další ukázkové aplikace, které ukazují, jak používat Databricks Connect v úložišti Databricks Connect na GitHubu, včetně následujících: