Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Примечание.
Статья охватывает использование Databricks Connect с Databricks Runtime 13.3 LTS и более поздними версиями.
В этой статье приведены примеры кода, использующие Databricks Connect для Python. Databricks Connect позволяет подключать популярные IDE, серверы ноутбуков и пользовательские приложения к кластерам Azure Databricks. См. раздел "Что такое Databricks Connect?". Для версии этой статьи на Scala смотрите Примеры кода для Databricks Connect на Scala.
Прежде чем начать использовать Databricks Connect, необходимо настроить клиент Databricks Connect.
В следующих примерах предполагается, что для настройки клиента Databricks Connect используется проверка подлинности по умолчанию.
Пример. Чтение таблицы
Этот простой пример кода запрашивает указанную таблицу, а затем отображает первые 5 строк указанной таблицы.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Пример: Создание DataFrame
Приведенный ниже пример кода:
- Создает DataFrame в памяти.
- Создает таблицу с именем
zzz_demo_temps_tableв схемеdefault. Если таблица с этим именем уже существует, сначала удаляется таблица. Чтобы использовать другую схему или таблицу, настройте вызовыspark.sql,temps.write.saveAsTableили обоих одновременно. - Сохраняет содержимое DataFrame в таблицу.
-
SELECTВыполняет запрос к содержимому таблицы. - Отображает результат запроса.
- Удаляет таблицу.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Пример. Использование DatabricksSession или SparkSession
В следующем примере описывается, как писать код, переносимый между версиями Databricks Connect для Databricks Runtime 13.3 LTS и выше в средах, где класс DatabricksSession недоступен, в таком случае используется класс SparkSession для запросов к указанной таблице и возврата первых 5 строк. В этом примере используется SPARK_REMOTE переменная среды для проверки подлинности.
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)
Дополнительные ресурсы
Databricks предоставляет дополнительные примеры приложений, которые показывают, как использовать Databricks Connect в репозитории Databricks Connect GitHub, включая следующее: