Dela via


Självstudie: Utveckla en Databricks-app med Streamlit

Den här handledningen visar hur du skapar en Databricks-app med Databricks SQL Connector för Python och Streamlit. Du får lära dig hur du utvecklar en app som gör följande:

  • Läser en Unity Catalog-tabell och visar den i ett Streamlit-gränssnitt.
  • Redigerar data och skriver tillbaka dem till tabellen.

Steg:

Steg 1: Konfigurera behörigheter

Dessa exempel förutsätter att din app använder appauktorisering. Din apps tjänstehuvudkonto måste ha:

  • SELECT behörighet i tabellen Unity Catalog
  • MODIFY behörighet i tabellen Unity Catalog
  • CAN USE privilegier på SQL-lagret

Mer information finns i Behörigheter för Unity Catalog och skyddsbara objekt och ACL:er för SQL-lager.

Steg 2: Installera beroenden

Inkludera följande paket i dinrequirements.txt-fil :

databricks-sdk
databricks-sql-connector
streamlit
pandas

Anmärkning

pandas krävs endast om du redigerar tabelldata.

Steg 3: Läsa en Unity Catalog-tabell

Det här exemplet visar hur du läser data från en Unity Catalog-tabell och visar dem med Streamlit. Appen utför följande steg:

  1. Uppmanar användaren att ange HTTP-sökvägen till SQL-lagret och Unity Catalog-tabellnamnet.
  2. Upprättar en cachelagrad SQL-anslutning med hjälp av Databricks SQL Connector för Python.
  3. Kör en SELECT * fråga i den angivna tabellen.
  4. Visar resultatet i en Streamlit st.dataframe.

app.py

import streamlit as st
from databricks import sql
from databricks.sdk.core import Config


cfg = Config()  # Set the DATABRICKS_HOST environment variable when running locally


@st.cache_resource # connection is cached
def get_connection(http_path):
    return sql.connect(
        server_hostname=cfg.host,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
    )

def read_table(table_name, conn):
    with conn.cursor() as cursor:
        query = f"SELECT * FROM {table_name}"
        cursor.execute(query)
        return cursor.fetchall_arrow().to_pandas()

http_path_input = st.text_input(
    "Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)

table_name = st.text_input(
    "Specify a :re[UC] table name:", placeholder="catalog.schema.table"
)

if http_path_input and table_name:
    conn = get_connection(http_path_input)
    df = read_table(table_name, conn)
    st.dataframe(df)

Anmärkning

  • I det här exemplet används st.cache_resource för att cachea databasanslutningen mellan sessioner och omstarter.
  • Använd Streamlit-indatafält (st.text_input) för att acceptera användarindata.

Steg 4: Redigera en Unity Catalog-tabell

I det här exemplet kan användare läsa, redigera och skriva tillbaka ändringar i en Unity Catalog-tabell med hjälp av Streamlits dataredigeringsfunktioner. Appen utför följande steg:

  1. Läser den ursprungliga tabellen i en Pandas DataFrame.
  2. Visar tabellen i en Streamlit-redigerare (st.data_editor).
  3. Identifierar ändringar mellan de ursprungliga och redigerade dataramarna.
  4. Används INSERT OVERWRITE för att skriva tillbaka uppdaterade data till tabellen.

app.py

import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config


cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally


@st.cache_resource
def get_connection(http_path):
    return sql.connect(
        server_hostname=cfg.host,
        http_path=http_path,
        credentials_provider=lambda: cfg.authenticate,
    )


def read_table(table_name: str, conn) -> pd.DataFrame:
    with conn.cursor() as cursor:
        cursor.execute(f"SELECT * FROM {table_name}")
        return cursor.fetchall_arrow().to_pandas()


def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
    progress = st.empty()
    with conn.cursor() as cursor:
        rows = list(df.itertuples(index=False))
        values = ",".join([f"({','.join(map(repr, row))})" for row in rows])
        with progress:
            st.info("Calling Databricks SQL...")
        cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
    progress.empty()
    st.success("Changes saved")


http_path_input = st.text_input(
    "Specify the HTTP Path to your Databricks SQL Warehouse:",
    placeholder="/sql/1.0/warehouses/xxxxxx",
)

table_name = st.text_input(
    "Specify a Catalog table name:", placeholder="catalog.schema.table"
)

if http_path_input and table_name:
    conn = get_connection(http_path_input)
    original_df = read_table(table_name, conn)
    edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)

    df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
    if not df_diff.empty:
        if st.button("Save changes"):
            insert_overwrite_table(table_name, edited_df, conn)
else:
    st.warning("Provide both the warehouse path and a table name to load data.")

Anmärkning

  • Appen avgör om uppdateringar krävs av databehandlingsskillnader mellan de ursprungliga och redigerade tabellerna.
  • En förloppsindikator ger feedback under skrivåtgärden med hjälp av st.info och st.success.
  • Den här metoden ersätter alla rader i tabellen. För partiella uppdateringar använder du en annan skrivstrategi.

Nästa steg