Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Not
Bu makale Databricks Runtime 13.3 LTS ve üzeri için Databricks Connect'i kapsar.
Bu makalede Python için Databricks Connect kullanan kod örnekleri verilmektedir. Databricks Connect popüler IDE'leri, not defteri sunucularını ve özel uygulamaları Azure Databricks kümelerine bağlamanızı sağlar. Bkz. Databricks Connect nedir?. Bu makalenin Scala sürümü için bkz . Scala için Databricks Connect için kod örnekleri.
Databricks Connect'i kullanmaya başlamadan önce Databricks Connect istemcisiniayarlamanız
Aşağıdaki örneklerde Databricks Connect istemci kurulumu için varsayılan kimlik doğrulamasını kullandığınız varsayılır.
Örnek: Tablo okuma
Bu basit kod örneği, belirtilen tabloyu sorgular ve ardından belirtilen tablonun ilk 5 satırını gösterir.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Örnek: DataFrame oluşturma
Aşağıdaki kod örneği:
- Bellek içi bir DataFrame oluşturur.
- Şema içinde
zzz_demo_temps_tableadıyladefaultbir tablo oluşturur. Bu isimle bir tablo zaten mevcutsa, önce bu tablo silinir. Farklı bir şema veya tablo kullanmak için çağrılarıspark.sql,temps.write.saveAsTableveya her ikisine ayarlayın. - DataFrame'in içeriğini tabloya kaydeder.
- Tablonun içeriğinde bir
SELECTsorgu çalıştırır. - Sorgunun sonucunu gösterir.
- Tabloyu siler.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Örnek: DatabricksSesssion veya SparkSession kullanma
Aşağıdaki örnek, sınıfın mevcut olmadığı ortamda belirtilen tabloyu sorgulamak ve ilk 5 satırı döndürmek için DatabricksSession sınıfını kullandığı ve Databricks Runtime 13.3 LTS ve üzeri için Databricks Connect arasında taşınabilir olan kodun nasıl yazıldığını açıklar. Bu örnekte kimlik doğrulaması için ortam değişkeni kullanılır SPARK_REMOTE .
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)
Ek kaynaklar
Databricks, Databricks Connect GitHub deposunda Databricks Connect'in nasıl kullanılacağını gösteren aşağıdakiler de dahil olmak üzere ek örnek uygulamalar sağlar: