通过


教程:使用 Streamlit 开发 Databricks 应用

本教程演示如何使用用于 Python 和 StreamlitDatabricks SQL 连接器生成 Databricks 应用。 你将了解如何开发执行以下作的应用:

  • 读取 Unity 目录 表并将其显示在 Streamlit 接口中。
  • 编辑数据并将其写回表。

步骤:

步骤 1:配置特权

这些示例假定应用使用 应用授权。 应用的服务主体必须具有:

  • 对 Unity Catalog 表的 SELECT 权限
  • 对 Unity Catalog 表的 MODIFY 权限
  • 对 SQL 仓库的 CAN USE 权限

有关详细信息,请参阅 Unity 目录特权和安全对象 以及 SQL 仓库 ACL

步骤 2:安装依赖项

requirements.txt 文件中包括以下包:

databricks-sdk
databricks-sql-connector
streamlit
pandas

注释

仅当您正在编辑表格数据时,pandas 才是必需的。

步骤 3:读取 Unity 目录表

此示例演示如何从 Unity 目录表读取数据并使用 Streamlit 显示数据。 应用执行以下步骤:

  1. 提示用户输入 SQL 仓库 HTTP 路径和 Unity 目录表名称。
  2. 使用用于 Python 的 Databricks SQL 连接器建立缓存的 SQL 连接。
  3. 在指定的表上执行SELECT *查询。
  4. 在 Streamlit st.dataframe中显示结果。

app.py

import streamlit as st
from databricks import sql
from databricks.sdk.core import Config


cfg = Config()  # Set the DATABRICKS_HOST environment variable when running locally


@st.cache_resource  # connection is cached
def get_connection(http_path):
    return sql.connect(
        server_hostname=cfg.host,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
    )

def read_table(table_name, conn):
    with conn.cursor() as cursor:
        query = f"SELECT * FROM {table_name}"
        cursor.execute(query)
        return cursor.fetchall_arrow().to_pandas()

http_path_input = st.text_input(
    "Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)

table_name = st.text_input(
    "Specify a :re[UC] table name:", placeholder="catalog.schema.table"
)

if http_path_input and table_name:
    conn = get_connection(http_path_input)
    df = read_table(table_name, conn)
    st.dataframe(df)

注释

  • 此示例使用 st.cache_resource 在不同会话中缓存数据库连接,并在需要时重新运行。
  • 使用 Streamlit 输入字段 (st.text_input) 接受用户输入。

步骤 4:编辑 Unity 目录表

此示例允许用户使用 Streamlit 的数据编辑功能读取、编辑和写回对 Unity 目录表的更改。 应用执行以下步骤:

  1. 将原始表读入 Pandas 数据帧。
  2. 在 Streamlit 编辑器中显示表(st.data_editor)。
  3. 检测原始数据帧和已编辑的数据帧之间的更改。
  4. 使用 INSERT OVERWRITE 将更新的数据写回到表中。

app.py

import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config


cfg = Config()  # Set the DATABRICKS_HOST environment variable when running locally


@st.cache_resource
def get_connection(http_path):
    return sql.connect(
        server_hostname=cfg.host,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
    )


def read_table(table_name: str, conn) -> pd.DataFrame:
    with conn.cursor() as cursor:
        cursor.execute(f"SELECT * FROM {table_name}")
        return cursor.fetchall_arrow().to_pandas()


def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
    progress = st.empty()
    with conn.cursor() as cursor:
        rows = list(df.itertuples(index=False))
        values = ",".join([f"({','.join(map(repr, row))})" for row in rows])
        with progress:
            st.info("Calling Databricks SQL...")
        cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
    progress.empty()
    st.success("Changes saved")


http_path_input = st.text_input(
    "Specify the HTTP Path to your Databricks SQL Warehouse:",
    placeholder="/sql/1.0/warehouses/xxxxxx",
)

table_name = st.text_input(
    "Specify a Catalog table name:", placeholder="catalog.schema.table"
)

if http_path_input and table_name:
    conn = get_connection(http_path_input)
    original_df = read_table(table_name, conn)
    edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)

    df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
    if not df_diff.empty:
        if st.button("Save changes"):
            insert_overwrite_table(table_name, edited_df, conn)
else:
    st.warning("Provide both the warehouse path and a table name to load data.")

注释

  • 应用通过计算原始表和编辑表之间的差异来确定是否需要更新。
  • 进度栏在使用 st.infost.success 进行写入操作期间提供反馈。
  • 此方法替换表中的所有行。 对于部分更新,请使用其他写入策略。

后续步骤