共用方式為


教學課程:使用 Streamlit 開發 Databricks 應用程式

本教學課程示範如何使用適用於 Python 和 Streamlit的 Databricks SQL 連接器來建置 Databricks 應用程式。 您將瞭解如何開發執行下列動作的應用程式:

  • 讀取 Unity 目錄 數據表,並將其顯示在 Streamlit 介面中。
  • 編輯數據,並將它寫回數據表。

步驟:

步驟 1:設定許可權

這些範例假設您的應用程式使用 應用程式授權。 您的應用程式服務主體必須具有:

  • SELECT Unity Catalog 資料表上的權限
  • MODIFY Unity Catalog 資料表上的權限
  • CAN USE SQL 倉儲的權限

如需詳細資訊,請參閱 Unity Catalog 許可權和可保護物件SQL 倉儲存取控制列表 (ACL)

步驟 2:安裝依賴項目

在您的 requirements.txt 檔案中包含下列套件:

databricks-sdk
databricks-sql-connector
streamlit
pandas

備註

pandas 只有在編輯資料表資料時,才需要 。

步驟 3:讀取 Unity 目錄數據表

此範例示範如何從 Unity 目錄數據表讀取數據,並使用 Streamlit 加以顯示。 應用程式會執行下列步驟:

  1. 提示使用者輸入 SQL 倉儲 HTTP 路徑和 Unity 目錄數據表名稱。
  2. 使用 Databricks SQL 連接器為 Python 建立快取的 SQL 連線。
  3. SELECT * 指定的數據表上執行查詢。
  4. 在 Streamlit st.dataframe中顯示結果。

app.py

import streamlit as st
from databricks import sql
from databricks.sdk.core import Config


cfg = Config()  # Set the DATABRICKS_HOST environment variable when running locally


@st.cache_resource # connection is cached
def get_connection(http_path):
    return sql.connect(
        server_hostname=cfg.host,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
    )

def read_table(table_name, conn):
    with conn.cursor() as cursor:
        query = f"SELECT * FROM {table_name}"
        cursor.execute(query)
        return cursor.fetchall_arrow().to_pandas()

http_path_input = st.text_input(
    "Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)

table_name = st.text_input(
    "Specify a :re[UC] table name:", placeholder="catalog.schema.table"
)

if http_path_input and table_name:
    conn = get_connection(http_path_input)
    df = read_table(table_name, conn)
    st.dataframe(df)

備註

  • 此範例會使用 st.cache_resource 來快取跨會話的資料庫連線並重新執行。
  • 使用 Streamlit 輸入欄位 (st.text_input) 接受使用者輸入。

步驟 4:編輯 Unity 目錄數據表

此範例可讓使用者使用 Streamlit 的數據編輯功能,讀取、編輯及回寫 Unity 目錄數據表的變更。 應用程式會執行下列步驟:

  1. 將原始數據表讀入 Pandas DataFrame。
  2. 在 Streamlit 編輯器中顯示資料表(st.data_editor)。
  3. 偵測原始和編輯之 DataFrame 之間的變更。
  4. 使用 INSERT OVERWRITE 將更新的數據寫回數據表。

app.py

import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config


cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally


@st.cache_resource
def get_connection(http_path):
    return sql.connect(
        server_hostname=cfg.host,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
    )


def read_table(table_name: str, conn) -> pd.DataFrame:
    with conn.cursor() as cursor:
        cursor.execute(f"SELECT * FROM {table_name}")
        return cursor.fetchall_arrow().to_pandas()


def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
    progress = st.empty()
    with conn.cursor() as cursor:
        rows = list(df.itertuples(index=False))
        values = ",".join([f"({','.join(map(repr, row))})" for row in rows])
        with progress:
            st.info("Calling Databricks SQL...")
        cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
    progress.empty()
    st.success("Changes saved")


http_path_input = st.text_input(
    "Specify the HTTP Path to your Databricks SQL Warehouse:",
    placeholder="/sql/1.0/warehouses/xxxxxx",
)

table_name = st.text_input(
    "Specify a Catalog table name:", placeholder="catalog.schema.table"
)

if http_path_input and table_name:
    conn = get_connection(http_path_input)
    original_df = read_table(table_name, conn)
    edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)

    df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
    if not df_diff.empty:
        if st.button("Save changes"):
            insert_overwrite_table(table_name, edited_df, conn)
else:
    st.warning("Provide both the warehouse path and a table name to load data.")

備註

  • 應用程式會藉由計算原始數據表與編輯數據表之間的差異,來判斷是否需要更新。
  • 進度列會在寫入作業期間,使用 st.infost.success 提供回饋。
  • 此方法會取代數據表中的所有數據列。 針對局部更新,請使用不同的寫入策略。

後續步驟