Dela via


Utföra batch-LLM-slutsatsdragning med hjälp av AI Functions

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Den här artikeln beskriver hur du utför batchinferens med ai-funktioner i stor skala. Exemplen i den här artikeln rekommenderas för produktionsscenarier, till exempel att driftsätta batchinferenspipelines som schemalagda arbetsflöden och användning av ai_query och en Databricks-värdbaserad grundmodell för strukturerad direktuppspelning.

För att komma igång med AI Functions rekommenderar Databricks att du använder något av följande:

krav

  • En arbetsyta i en region som stöder Foundation Model API:er.
  • Databricks Runtime 15.4 LTS eller senare krävs för batchinferens med AI-funktioner.
  • Frågebehörighet i deltatabellen i Unity Catalog som innehåller de data som du vill använda.
  • Ställ in pipelines.channel i tabellegenskaperna som 'förhandsversion' för att använda ai_query(). En exempelfråga finns i Krav.

Partiell LLM-inferens med hjälp av uppgiftsspecifika AI-funktioner

Du kan köra batchinferens med hjälp av uppgiftsspecifika AI-funktioner. Se Använd pipelines för batch-inferens för vägledning om hur du införlivar din uppgiftsspecifika AI-funktion i en pipeline.

Följande är ett exempel på hur du använder den uppgiftsspecifika AI-funktionen: ai_translate

SELECT
writer_summary,
  ai_translate(writer_summary, "cn") as cn_translation
from user.batch.news_summaries
limit 500
;

Batchbearbetning av LLM-inferenser med ai_query

Du kan använda ai-funktionen ai_query för generell användning för att utföra batchinferens. Se vilka modelltyper och associerade modeller som ai_query stöder.

Exemplen i det här avsnittet fokuserar på flexibiliteten hos ai_query och hur det används i batchinferenspipelines och arbetsflöden.

ai_query och Databricks-värdade basmodeller

När du använder en Databricks-värd och förhandskonfigurerad grundmodell för batchinferens konfigurerar Databricks en förberedd kapacitetsändpunkt åt dig som skalar automatiskt baserat på arbetsbelastningen.

Om du vill använda den här metoden för batchinferens anger du följande i din begäran:

SELECT text, ai_query(
    "databricks-meta-llama-3-1-8b-instruct",
    "Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;

ai_query och anpassade eller finjusterade grundmodeller

Notebook-exemplen i det här avsnittet visar batchinferensarbetsbelastningar som använder anpassade eller finjusterade grundmodeller för att bearbeta flera indata. Exemplen kräver en befintlig modellservicetjänst som använder äFoundation Model APIs med tilldelad kapacitet på.

LLM-batch-inferens med en inbäddningsmodell

Följande exempelanteckningsbok skapar en etablerad dataflödesslutpunkt och kör batch-LLM-slutsatsdragning med hjälp av Python och ditt val av inbäddningsmodellen GTE Large (engelska) eller BGE Large (engelska).

LLM-batchinferensinbäddningar med en etablerad dataflödesslutpunktsanteckningsbok

Hämta anteckningsbok

Batchinferens och strukturerad dataextrahering

Följande notebook-exempel visar hur du utför grundläggande strukturerad dataextrahering med hjälp av ai_query för att omvandla rådata, ostrukturerade data till organiserad och användbar information via automatiserade extraheringstekniker. Den här anteckningsboken visar också hur du kan utnyttja Mosaic AI Agent Evaluation för att bedöma noggrannheten med hjälp av grundsanningsdata.

Batch-slutsatsdragning och strukturerad dataextraheringsanteckningsbok

Hämta anteckningsbok

Batch-slutsatsdragning med BERT för namngiven entitetsigenkänning

Följande notebook-fil visar ett exempel på batchinferens med en traditionell ML-modell med BERT.

Batch-inferens med BERT för namngiven entitetsigenkänning i notebook

Hämta anteckningsbok

Distribuera batchinferenspipelines

Det här avsnittet visar hur du kan integrera AI Functions i andra Databricks-data och AI-produkter för att skapa fullständiga pipelines för batchinferens. Dessa pipelines kan utföra arbetsflöden från slutpunkt till slutpunkt som omfattar inmatning, förbearbetning, slutsatsdragning och efterbearbetning. Pipelines kan skapas i SQL eller Python och distribueras som:

  • Deklarativa pipelines för Lakeflow
  • Schemalagda arbetsflöden med Databricks-arbetsflöden
  • Arbetsflöden för streaminginfernser med strukturerad streaming

Utför inkrementell batchinferens på Lakeflow Deklarativa Pipelines

I följande exempel utförs inkrementell batchinferens med Lakeflow Deklarativa Pipelines när data kontinuerligt uppdateras.

Steg 1: Mata in rådata från en volym

SQL

CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
  '/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  multiLine => 'true'
));
python

Importera paketen och definiera JSON-schemat för LLM-svaret som en Python-variabel


import dlt
from pyspark.sql.functions import expr, get_json_object, concat

news_extraction_schema = (
    '{"type": "json_schema", "json_schema": {"name": "news_extraction", '
    '"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
    '"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
    '"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)

Mata in dina data från en Unity Catalog-volym.

@dlt.table(
  comment="Raw news articles ingested from volume."
)
def news_raw():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", True)
      .option("mode", "PERMISSIVE")
      .option("multiLine", "true")
      .load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
  )

Steg 2: Tillämpa LLM-slutsatsdragning för att extrahera rubrik och kategori

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
  inputs,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Extract the category of the following news article: " || inputs,
    responseFormat => '{
      "type": "json_schema",
      "json_schema": {
        "name": "news_extraction",
        "schema": {
          "type": "object",
          "properties": {
            "title": { "type": "string" },
            "category": {
              "type": "string",
              "enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
            }
          }
        },
        "strict": true
      }
    }'
  ) AS meta_data
FROM news_raw
LIMIT 2;
python
@dlt.table(
  comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
  # Limit the number of rows to 2 as in the SQL version
  df_raw = spark.read.table("news_raw").limit(2)
  # Inject the JSON schema variable into the ai_query call using an f-string.
  return df_raw.withColumn(
    "meta_data",
    expr(
      f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
      f"concat('Extract the category of the following news article: ', inputs), "
      f"responseFormat => '{news_extraction_schema}')"
    )
  )

Steg 3: Verifiera LLM-inferensutdata före sammanfattning

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
  CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
  CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
python
@dlt.table(
  comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
  return spark.read.table("news_categorized")

Steg 4: Sammanfatta nyhetsartiklar från verifierade data

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
  get_json_object(meta_data, '$.category') as category,
  get_json_object(meta_data, '$.title') as title,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Summarize the following political news article in 2-3 sentences: " || inputs
  ) AS summary
FROM news_validated;
python

@dlt.table(
  comment="Summarized political news articles after validation."
)
def news_summarized():
  df = spark.read.table("news_validated")
  return df.select(
    get_json_object("meta_data", "$.category").alias("category"),
    get_json_object("meta_data", "$.title").alias("title"),
    expr(
      "ai_query('databricks-meta-llama-3-3-70b-instruct', "
      "concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
    ).alias("summary")
  )

Automatisera batchinferensjobb med databricks-arbetsflöden

Schemalägg batchjobb för inferens och automatisera AI-pipelines.

SQL

SELECT
   *,
   ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.


AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price


Examples below:


DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..


RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]


DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

python


import json
from pyspark.sql.functions import expr

# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.

AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price

Examples below:

DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.

RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]

DOCUMENT
"""

# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)

# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)

# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
    "result",
    expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)

# Display the result DataFrame.
display(result_df)

AI-funktioner med strukturerad strömning

Tillämpa AI-slutledning i scenarier i nära realtid eller mikrobatchscenario med hjälp av Strukturerad Streaming ai_query och .

Steg 1. Läs din statiska Delta-tabell

Läs din statiska Delta-tabell som om den vore en ström.


from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs")  # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
    "You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
    "Do not include extra commentary or suggestions. Document: "
)

Steg 2. Använd ai_query

Spark bearbetar detta bara en gång för statiska data om inte nya rader tas emot i tabellen.


df_transformed = df_stream.select(
    "document_text",
    F.expr(f"""
      ai_query(
        'databricks-meta-llama-3-1-8b-instruct',
        CONCAT('{prompt}', document_text)
      )
    """).alias("summary")
)

Steg 3: Skriv sammanfattade utdata

Skriva sammanfattade utdata till en annan Delta-tabell


# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
    .outputMode("append") \
    .toTable("enterprise.docs_summary")

query.awaitTermination()

Visa kostnader för batch-inferensjobb

I följande exempel visas hur du filtrerar batchinferensarbetsbelastningar baserat på jobb, beräkningsresurser, SQL-datavaruhus och Lakeflow-deklarativa pipelines.

Se Övervaka kostnader för modellbetjäning för allmänna exempel på hur du visar kostnader för batchinferens och AI Functions-användande arbetsbelastningar.

Jobb

Följande fråga visar vilka jobb som används för batchinferens med hjälp av systemtabellen system.workflow.jobs . Se Övervaka prestanda för jobbkostnader & med systemtabeller.


SELECT *
FROM system.billing.usage u
  JOIN system.workflow.jobs x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.job_id = x.job_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Räkna ut

Följande visar vilka kluster som används för batchinferens med hjälp av systemtabellen system.compute.clusters .

SELECT *
FROM system.billing.usage u
  JOIN system.compute.clusters x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.cluster_id = x.cluster_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Pipeline för deklarativa lakeflow-pipelines

Följande visar vilka Lakeflow Deklarativa pipelines som används för batchinferens med hjälp av systemtabellen system.lakeflow.pipelines .

SELECT *
FROM system.billing.usage u
  JOIN system.lakeflow.pipelines x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.dlt_pipeline_id = x.pipeline_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

SQL-lager

Följande visar vilka Lakeflow Deklarativa pipelines som används för batchinferens med hjälp av systemtabellen system.compute.warehouses .

SELECT *
FROM system.billing.usage u
  JOIN system.compute.warehouses x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.warehouse_id = x.warehouse_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";