Condividi tramite


Eseguire l'inferenza batch per LLM tramite funzioni di intelligenza artificiale

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Questo articolo descrive come eseguire l'inferenza batch usando Funzioni di intelligenza artificiale su larga scala. Gli esempi in questo articolo sono consigliati per gli scenari di produzione, ad esempio la distribuzione di pipeline di inferenza batch come flussi di lavoro pianificati e l'uso di ai_query e di un modello di base ospitato da Databricks per Structured Streaming.

Per iniziare a usare funzioni di intelligenza artificiale, Databricks consiglia di usare una delle opzioni seguenti:

Requisiti

  • Un'area di lavoro in una regione supportata dalle API modello fondamentale.
  • Databricks Runtime 15.4 LTS o versione successiva è necessario per i carichi di lavoro di inferenza batch tramite Funzioni di intelligenza artificiale.
  • Chiedi l'autorizzazione per le query sulla tabella Delta in Unity Catalog che contiene i dati che vuoi utilizzare.
  • Impostare pipelines.channel nelle proprietà della tabella su 'preview' per utilizzare ai_query(). Consultare i requisiti per vedere una query di esempio.

Inferenza batch LLM con funzioni di intelligenza artificiale per attività specifiche

È possibile eseguire l'inferenza batch usando funzioni di intelligenza artificiale specifiche dell'attività. Per indicazioni su come incorporare la funzione di intelligenza artificiale specifica dell'attività in una pipeline, vedere Distribuire le pipeline di inferenza batch.

Di seguito è riportato un esempio di uso della funzione di intelligenza artificiale specifica dell'attività, ai_translate:

SELECT
writer_summary,
  ai_translate(writer_summary, "cn") as cn_translation
from user.batch.news_summaries
limit 500
;

Batch di inferenza LLM con ai_query

È possibile usare la funzione ai_query di intelligenza artificiale per utilizzo generico per eseguire l'inferenza batch. Scopri quali tipi di modello e modelli associatiai_query supporta.

Gli esempi in questa sezione si concentrano sulla flessibilità di ai_query e su come usarlo nelle pipeline di inferenza batch e nei flussi di lavoro.

ai_query e modelli di base ospitati da Databricks

Quando si utilizza un modello di base ospitato e preconfigurato di Databricks per l'inferenza batch, Databricks configura un endpoint di throughput con provisioning per conto dell'utente che si ridimensiona automaticamente in base al carico di lavoro.

Per usare questo metodo per l'inferenza batch, specificare quanto segue nella richiesta:

SELECT text, ai_query(
    "databricks-meta-llama-3-1-8b-instruct",
    "Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;

ai_query e modelli di base personalizzati o ottimizzati

Gli esempi di notebook in questa sezione illustrano i carichi di lavoro di inferenza batch che usano modelli di base personalizzati o ottimizzati per elaborare più input. Gli esempi richiedono un endpoint esistente per la gestione del modello che usa le API del Foundation Model con throughput fornito.

Inferenza batch di LLM utilizzando un modello di embedding

Il seguente notebook di esempio crea un endpoint con throughput provisionato ed esegue un'inferenza LLM in batch utilizzando Python e il modello di embeddings a scelta tra GTE Large (inglese) o BGE Large (inglese).

Incorporamenti per inferenza batch LLM con un notebook con un endpoint a velocità effettiva preconfigurata

Prendi il notebook

Inferenza batch ed estrazione di dati strutturati

Il notebook di esempio seguente illustra come eseguire l'estrazione di dati strutturati di base usando ai_query per trasformare i dati non elaborati e non strutturati in informazioni organizzate e utilizzabili tramite tecniche di estrazione automatizzate. Questo notebook illustra anche come sfruttare la valutazione dell'agente di intelligenza artificiale Mosaic per valutare l'accuratezza usando i dati di verità di base.

Notebook per l'inferenza batch e l'estrazione di dati strutturati

Prendi il notebook

Inferenza batch con BERT per il riconoscimento di entità denominate

Il notebook seguente illustra un esempio di inferenza batch del modello di Machine Learning tradizionale usando BERT.

Inferenza batch con BERT per il riconoscimento di entità nominate in un notebook

Prendi il notebook

Distribuire pipeline di inferenza batch

Questa sezione illustra come integrare funzioni di intelligenza artificiale in altri prodotti di dati e intelligenza artificiale di Databricks per creare pipeline di inferenza batch complete. Queste pipeline possono eseguire flussi di lavoro end-to-end che includono inserimento, pre-elaborazione, inferenza e post-elaborazione. Le pipeline possono essere create in SQL o Python e distribuite come segue:

  • Pipeline dichiarative di Lakeflow
  • Flussi di lavoro pianificati con flussi di lavoro di Databricks
  • Flussi di lavoro di inferenza in streaming con Structured Streaming

Esegui l'inferenza batch incrementale sulle pipeline dichiarative di Lakeflow

L'esempio seguente esegue l'inferenza batch incrementale usando le pipeline dichiarative di Lakeflow per quando i dati vengono aggiornati continuamente.

Passaggio 1: Inserire dati di notizie non elaborate da un volume

SQL

CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
  '/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  multiLine => 'true'
));
Pitone

Importare i pacchetti e definire lo schema JSON della risposta LLM come variabile Python


import dlt
from pyspark.sql.functions import expr, get_json_object, concat

news_extraction_schema = (
    '{"type": "json_schema", "json_schema": {"name": "news_extraction", '
    '"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
    '"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
    '"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)

Inserire i dati da un volume di Unity Catalog.

@dlt.table(
  comment="Raw news articles ingested from volume."
)
def news_raw():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", True)
      .option("mode", "PERMISSIVE")
      .option("multiLine", "true")
      .load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
  )

Passaggio 2: Applicare l'inferenza LLM per estrarre titolo e categoria

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
  inputs,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Extract the category of the following news article: " || inputs,
    responseFormat => '{
      "type": "json_schema",
      "json_schema": {
        "name": "news_extraction",
        "schema": {
          "type": "object",
          "properties": {
            "title": { "type": "string" },
            "category": {
              "type": "string",
              "enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
            }
          }
        },
        "strict": true
      }
    }'
  ) AS meta_data
FROM news_raw
LIMIT 2;
Pitone
@dlt.table(
  comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
  # Limit the number of rows to 2 as in the SQL version
  df_raw = spark.read.table("news_raw").limit(2)
  # Inject the JSON schema variable into the ai_query call using an f-string.
  return df_raw.withColumn(
    "meta_data",
    expr(
      f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
      f"concat('Extract the category of the following news article: ', inputs), "
      f"responseFormat => '{news_extraction_schema}')"
    )
  )

Passaggio 3: Convalidare l'output di inferenza LLM prima del riepilogo

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
  CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
  CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
Pitone
@dlt.table(
  comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
  return spark.read.table("news_categorized")

Passaggio 4: Riepilogare gli articoli sulle notizie dai dati convalidati

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
  get_json_object(meta_data, '$.category') as category,
  get_json_object(meta_data, '$.title') as title,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Summarize the following political news article in 2-3 sentences: " || inputs
  ) AS summary
FROM news_validated;
Pitone

@dlt.table(
  comment="Summarized political news articles after validation."
)
def news_summarized():
  df = spark.read.table("news_validated")
  return df.select(
    get_json_object("meta_data", "$.category").alias("category"),
    get_json_object("meta_data", "$.title").alias("title"),
    expr(
      "ai_query('databricks-meta-llama-3-3-70b-instruct', "
      "concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
    ).alias("summary")
  )

Automatizzare i processi di inferenza batch usando i flussi di lavoro di Databricks

Pianificare processi di inferenza batch e automatizzare le pipeline di intelligenza artificiale.

SQL

SELECT
   *,
   ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.


AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price


Examples below:


DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..


RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]


DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

Pitone


import json
from pyspark.sql.functions import expr

# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.

AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price

Examples below:

DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.

RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]

DOCUMENT
"""

# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)

# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)

# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
    "result",
    expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)

# Display the result DataFrame.
display(result_df)

Funzioni di intelligenza artificiale con Structured Streaming

Applicare l'inferenza di intelligenza artificiale in scenari quasi in tempo reale o in modalità micro-batch usando lo Structured Streaming con ai_query e .

Passaggio 1: Leggi la tabella Delta statica

Leggere la tabella Delta statica come se fosse un flusso.


from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs")  # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
    "You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
    "Do not include extra commentary or suggestions. Document: "
)

Passaggio 2. Applica ai_query

Spark elabora questa operazione una sola volta per i dati statici, a meno che non arrivino nuove righe nella tabella.


df_transformed = df_stream.select(
    "document_text",
    F.expr(f"""
      ai_query(
        'databricks-meta-llama-3-1-8b-instruct',
        CONCAT('{prompt}', document_text)
      )
    """).alias("summary")
)

Passaggio 3: Scrivere l'output riepilogato

Scrivere l'output riepilogato in un'altra tabella Delta


# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
    .outputMode("append") \
    .toTable("enterprise.docs_summary")

query.awaitTermination()

Visualizzare i costi per i carichi di lavoro di inferenza batch

Gli esempi seguenti illustrano come filtrare i carichi di lavoro di inferenza batch in base a processi, calcolo, warehouse SQL e pipeline dichiarative di Lakeflow.

Vedere Monitorare i costi di gestione dei modelli per esempi generali su come visualizzare i costi per i carichi di lavoro di inferenza batch che usano Funzioni di intelligenza artificiale.

Lavori

La query seguente mostra quali lavori vengono usati per l'inferenza batch usando la system.workflow.jobs tabella dei sistemi. Consulta Monitor i costi dei processi e le prestazioni & con le tabelle di sistema.


SELECT *
FROM system.billing.usage u
  JOIN system.workflow.jobs x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.job_id = x.job_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Calcolo

Di seguito viene illustrato quali cluster vengono usati per l'inferenza batch usando la system.compute.clusters tabella dei sistemi.

SELECT *
FROM system.billing.usage u
  JOIN system.compute.clusters x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.cluster_id = x.cluster_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Pipeline dichiarative di Lakeflow

Di seguito viene mostrato quali sono le pipeline dichiarative di Lakeflow utilizzate per l'inferenza batch, utilizzando la tabella dei sistemi system.lakeflow.pipelines.

SELECT *
FROM system.billing.usage u
  JOIN system.lakeflow.pipelines x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.dlt_pipeline_id = x.pipeline_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Magazzino SQL

Di seguito viene mostrato quali sono le pipeline dichiarative di Lakeflow utilizzate per l'inferenza batch, utilizzando la tabella dei sistemi system.compute.warehouses.

SELECT *
FROM system.billing.usage u
  JOIN system.compute.warehouses x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.warehouse_id = x.warehouse_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";