Udostępnij za pośrednictwem


Przykłady kodu dla usługi Databricks Połączenie dla języka Python

Uwaga

W tym artykule opisano Połączenie databricks dla środowiska Databricks Runtime 13.0 lub nowszego.

Ten artykuł zawiera przykłady kodu, które używają usługi Databricks Połączenie dla języka Python. Usługa Databricks Połączenie umożliwia łączenie popularnych środowisk IDE, serwerów notesów i aplikacji niestandardowych z klastrami usługi Azure Databricks. Zobacz Co to jest usługa Databricks Połączenie?. Aby zapoznać się z wersją języka Scala tego artykułu, zobacz Przykłady kodu dla usługi Databricks Połączenie dla języka Scala.

Uwaga

Przed rozpoczęciem korzystania z usługi Databricks Połączenie należy skonfigurować klienta usługi Databricks Połączenie.

Usługa Databricks udostępnia kilka dodatkowych przykładowych aplikacji, które pokazują, jak używać Połączenie usługi Databricks. Zobacz przykładowe aplikacje dla repozytorium usługi Databricks Połączenie w usłudze GitHub, w szczególności:

Możesz również użyć następujących prostszych przykładów kodu do eksperymentowania z usługą Databricks Połączenie. W tych przykładach przyjęto założenie, że używasz domyślnego uwierzytelniania dla usługi Databricks Połączenie konfiguracji klienta.

Ten prosty przykład kodu wysyła zapytanie do określonej tabeli, a następnie pokazuje pierwsze 5 wierszy określonej tabeli. Aby użyć innej tabeli, dostosuj wywołanie do spark.read.table.

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

Ten dłuższy przykład kodu wykonuje następujące czynności:

  1. Tworzy ramkę danych w pamięci.
  2. Tworzy tabelę o nazwie zzz_demo_temps_table w schemacie default . Jeśli tabela o tej nazwie już istnieje, tabela zostanie usunięta jako pierwsza. Aby użyć innego schematu lub tabeli, dostosuj wywołania do spark.sql, temps.write.saveAsTablelub obu.
  3. Zapisuje zawartość ramki danych w tabeli.
  4. SELECT Uruchamia zapytanie dotyczące zawartości tabeli.
  5. Pokazuje wynik zapytania.
  6. Usuwa tabelę.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Uwaga

W poniższym przykładzie opisano sposób pisania kodu przenośnego między usługą Databricks Połączenie dla środowiska Databricks Runtime 13.3 LTS i nowszych w środowiskach, w których DatabricksSession klasa jest niedostępna.

W poniższym przykładzie DatabricksSession użyto klasy lub użyto SparkSession klasy , jeśli DatabricksSession klasa jest niedostępna, aby wysłać zapytanie do określonej tabeli i zwrócić pierwsze 5 wierszy. W tym przykładzie użyto zmiennej środowiskowej SPARK_REMOTE do uwierzytelniania.

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)