Share via


Python için Databricks Bağlan kod örnekleri

Not

Bu makalede Databricks Runtime 13.0 ve üzeri için Databricks Bağlan yer alır.

Bu makalede Python için Databricks Bağlan kullanan kod örnekleri verilmektedir. Databricks Bağlan popüler IDE'leri, not defteri sunucularını ve özel uygulamaları Azure Databricks kümelerine bağlamanızı sağlar. Bkz. Databricks Bağlan nedir?. Bu makalenin Scala sürümü için bkz. Scala için Databricks Bağlan için kod örnekleri.

Not

Databricks Bağlan kullanmaya başlamadan önce Databricks Bağlan istemcisini ayarlamanız gerekir.

Databricks, Databricks Bağlan kullanmayı gösteren birkaç ek örnek uygulama sağlar. Özellikle GitHub'da Databricks Bağlan deposu için örnek uygulamalara bakın:

Databricks Bağlan ile deneme yapmak için aşağıdaki daha basit kod örneklerini de kullanabilirsiniz. Bu örneklerde Databricks Bağlan istemci kurulumu için varsayılan kimlik doğrulamasını kullandığınız varsayılır.

Bu basit kod örneği, belirtilen tabloyu sorgular ve ardından belirtilen tablonun ilk 5 satırını gösterir. Farklı bir tablo kullanmak için çağrısını olarak spark.read.tableayarlayın.

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

Bu uzun kod örneği aşağıdakileri yapar:

  1. Bellek içi bir DataFrame oluşturur.
  2. Şema içinde default adıyla zzz_demo_temps_table bir tablo oluşturur. Bu ada sahip tablo zaten varsa, önce tablo silinir. Farklı bir şema veya tablo kullanmak için, çağrıları , temps.write.saveAsTableveya her ikisi olarak spark.sqlayarlayın.
  3. DataFrame'in içeriğini tabloya kaydeder.
  4. Tablonun içeriğinde bir SELECT sorgu çalıştırır.
  5. Sorgunun sonucunu gösterir.
  6. Tabloyu siler.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Not

Aşağıdaki örnekte, sınıfın kullanılamadığı DatabricksSession ortamlarda Databricks Runtime 13.3 LTS ve üzeri için Databricks Bağlan arasında taşınabilir kod yazma işlemi açıklanmaktadır.

Aşağıdaki örnek, belirtilen tabloyu sorgulamak DatabricksSession ve ilk 5 satırı döndürmek için sınıfını veya sınıf kullanılamıyorsa DatabricksSession sınıfını kullanırSparkSession. Bu örnekte kimlik doğrulaması için ortam değişkeni kullanılır SPARK_REMOTE .

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)