Contoh kode untuk Databricks Koneksi untuk Python
Catatan
Artikel ini membahas Databricks Koneksi untuk Databricks Runtime 13.0 ke atas.
Artikel ini menyediakan contoh kode yang menggunakan Databricks Koneksi untuk Python. Databricks Koneksi memungkinkan Anda menyambungkan ID, server notebook, dan aplikasi kustom populer ke kluster Azure Databricks. Lihat Apa itu Databricks Koneksi?. Untuk versi Scala artikel ini, lihat Contoh kode untuk Databricks Koneksi untuk Scala.
Catatan
Sebelum mulai menggunakan Databricks Koneksi, Anda harus menyiapkan klien Databricks Koneksi.
Databricks menyediakan beberapa contoh aplikasi tambahan yang menunjukkan cara menggunakan Databricks Koneksi. Lihat contoh aplikasi untuk repositori Databricks Koneksi di GitHub, khususnya:
- Aplikasi ETL sederhana
- Aplikasi data interaktif berdasarkan Plotly
- Aplikasi data interaktif berdasarkan Plotly dan PySpark AI
Anda juga dapat menggunakan contoh kode yang lebih sederhana berikut untuk bereksperimen dengan Databricks Koneksi. Contoh-contoh ini mengasumsikan bahwa Anda menggunakan autentikasi default untuk Penyiapan klien Databricks Koneksi.
Contoh kode sederhana ini mengkueri tabel yang ditentukan lalu memperlihatkan 5 baris pertama tabel yang ditentukan. Untuk menggunakan tabel lain, sesuaikan panggilan ke spark.read.table
.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Contoh kode yang lebih panjang ini melakukan hal berikut:
- Membuat DataFrame dalam memori.
- Membuat tabel dengan nama
zzz_demo_temps_table
dalamdefault
skema. Jika tabel dengan nama ini sudah ada, tabel akan dihapus terlebih dahulu. Untuk menggunakan skema atau tabel yang berbeda, sesuaikan panggilan kespark.sql
,temps.write.saveAsTable
, atau keduanya. - Menyimpan konten DataFrame ke tabel.
SELECT
Menjalankan kueri pada konten tabel.- Memperlihatkan hasil kueri.
- Menghapus tabel.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
Catatan
Contoh berikut menjelaskan cara menulis kode yang portabel antara Databricks Koneksi untuk Databricks Runtime 13.3 LTS ke atas di lingkungan di mana DatabricksSession
kelas tidak tersedia.
Contoh berikut menggunakan DatabricksSession
kelas , atau menggunakan SparkSession
kelas jika DatabricksSession
kelas tidak tersedia, untuk mengkueri tabel yang ditentukan dan mengembalikan 5 baris pertama. Contoh ini menggunakan SPARK_REMOTE
variabel lingkungan untuk autentikasi.
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk