本教學課程示範如何使用適用於 Python 和 Streamlit的 Databricks SQL 連接器來建置 Databricks 應用程式。 您將瞭解如何開發執行下列動作的應用程式:
- 讀取 Unity 目錄 數據表,並將其顯示在 Streamlit 介面中。
- 編輯數據,並將它寫回數據表。
步驟:
步驟 1:設定許可權
這些範例假設您的應用程式使用 應用程式授權。 您的應用程式服務主體必須具有:
-
SELECT
Unity Catalog 資料表上的權限 -
MODIFY
Unity Catalog 資料表上的權限 -
CAN USE
SQL 倉儲的權限
如需詳細資訊,請參閱 Unity Catalog 許可權和可保護物件 和 SQL 倉儲存取控制列表 (ACL)。
步驟 2:安裝依賴項目
在您的 requirements.txt 檔案中包含下列套件:
databricks-sdk
databricks-sql-connector
streamlit
pandas
備註
pandas
只有在編輯資料表資料時,才需要 。
步驟 3:讀取 Unity 目錄數據表
此範例示範如何從 Unity 目錄數據表讀取數據,並使用 Streamlit 加以顯示。 應用程式會執行下列步驟:
- 提示使用者輸入 SQL 倉儲 HTTP 路徑和 Unity 目錄數據表名稱。
- 使用 Databricks SQL 連接器為 Python 建立快取的 SQL 連線。
- 在
SELECT *
指定的數據表上執行查詢。 - 在 Streamlit
st.dataframe
中顯示結果。
app.py
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally
@st.cache_resource # connection is cached
def get_connection(http_path):
return sql.connect(
server_hostname=cfg.host,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
)
def read_table(table_name, conn):
with conn.cursor() as cursor:
query = f"SELECT * FROM {table_name}"
cursor.execute(query)
return cursor.fetchall_arrow().to_pandas()
http_path_input = st.text_input(
"Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)
table_name = st.text_input(
"Specify a :re[UC] table name:", placeholder="catalog.schema.table"
)
if http_path_input and table_name:
conn = get_connection(http_path_input)
df = read_table(table_name, conn)
st.dataframe(df)
備註
- 此範例會使用
st.cache_resource
來快取跨會話的資料庫連線並重新執行。 - 使用 Streamlit 輸入欄位 (
st.text_input
) 接受使用者輸入。
步驟 4:編輯 Unity 目錄數據表
此範例可讓使用者使用 Streamlit 的數據編輯功能,讀取、編輯及回寫 Unity 目錄數據表的變更。 應用程式會執行下列步驟:
- 將原始數據表讀入 Pandas DataFrame。
- 在 Streamlit 編輯器中顯示資料表(
st.data_editor
)。 - 偵測原始和編輯之 DataFrame 之間的變更。
- 使用
INSERT OVERWRITE
將更新的數據寫回數據表。
app.py
import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally
@st.cache_resource
def get_connection(http_path):
return sql.connect(
server_hostname=cfg.host,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
)
def read_table(table_name: str, conn) -> pd.DataFrame:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table_name}")
return cursor.fetchall_arrow().to_pandas()
def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
progress = st.empty()
with conn.cursor() as cursor:
rows = list(df.itertuples(index=False))
values = ",".join([f"({','.join(map(repr, row))})" for row in rows])
with progress:
st.info("Calling Databricks SQL...")
cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
progress.empty()
st.success("Changes saved")
http_path_input = st.text_input(
"Specify the HTTP Path to your Databricks SQL Warehouse:",
placeholder="/sql/1.0/warehouses/xxxxxx",
)
table_name = st.text_input(
"Specify a Catalog table name:", placeholder="catalog.schema.table"
)
if http_path_input and table_name:
conn = get_connection(http_path_input)
original_df = read_table(table_name, conn)
edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)
df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
if not df_diff.empty:
if st.button("Save changes"):
insert_overwrite_table(table_name, edited_df, conn)
else:
st.warning("Provide both the warehouse path and a table name to load data.")
備註
- 應用程式會藉由計算原始數據表與編輯數據表之間的差異,來判斷是否需要更新。
- 進度列會在寫入作業期間,使用
st.info
和st.success
提供回饋。 - 此方法會取代數據表中的所有數據列。 針對局部更新,請使用不同的寫入策略。
後續步驟
- 將應用程式部署至您的工作區。 請參閱 部署 Databricks 應用程式。
- 限制適當使用者的存取權。 請參閱 設定 Databricks 應用程式的許可權。