Partager via


Effectuer une inférence par lots LLM à l'aide de fonctions d'intelligence artificielle

Important

Cette fonctionnalité est disponible en préversion publique.

Cet article explique comment effectuer une inférence par lots à l’aide de ai Functions à grande échelle. Les exemples de cet article sont recommandés pour les scénarios de production, notamment le déploiement de pipelines d’inférence par lots en tant que workflows planifiés et l’utilisation de ai_query et d’un modèle de base hébergé par Databricks pour Structured Streaming.

Pour commencer à utiliser AI Functions, Databricks recommande d’utiliser l’une des options suivantes :

Exigences

  • Un espace de travail dans une région prise en charge par les API Foundation Model.
  • Databricks Runtime 15.4 LTS ou version ultérieure est requis pour les charges de travail d’inférence par lots à l’aide d’AI Functions.
  • Interrogez l’autorisation sur la table Delta dans le catalogue Unity qui contient les données que vous souhaitez utiliser.
  • Définissez les pipelines.channel propriétés de la table sur « préversion » pour utiliser ai_query(). Consultez Conditions requises pour obtenir un exemple de requête.

Inférence LLM par lot en utilisant des fonctions IA spécifiques à la tâche

Vous pouvez exécuter l’inférence par lots à l’aide de fonctions IA spécifiques à la tâche. Consultez Déployer des pipelines d’inférence par lots pour obtenir des conseils sur la façon d’incorporer votre fonction IA spécifique à une tâche dans un pipeline.

Voici un exemple d’utilisation de la fonction IA spécifique à la tâche : ai_translate

SELECT
writer_summary,
  ai_translate(writer_summary, "cn") as cn_translation
from user.batch.news_summaries
limit 500
;

Inférence LLM par lot en utilisant ai_query

Vous pouvez utiliser la fonction ai_query IA à usage général pour effectuer l’inférence par lots. Découvrez les types de modèles et les modèles associés qui ai_query prennent en charge.

Les exemples de cette section se concentrent sur la flexibilité et la façon de ai_query l’utiliser dans les pipelines d’inférence par lots et les flux de travail.

ai_query et les modèles de base hébergés par Databricks

Lorsque vous utilisez un modèle de base hébergé et préprovisionné Databricks pour l’inférence par lots, Databricks configure un point de terminaison de débit provisionné pour votre compte qui s’adapte automatiquement en fonction de la charge de travail.

Pour utiliser cette méthode pour l’inférence par lots, spécifiez ce qui suit dans votre demande :

  • Le grand modèle de langage (LLM) pré-approvisionné que vous souhaitez utiliser dans ai_query. Sélectionnez parmi les LLMs préprovisionnés pris en charge. Ces LLM préprovisionnés sont soumis à des licences permissives et à des politiques d'utilisation, consultez Licences et conditions applicables aux développeurs de modèles.
  • Table d’entrée et table de sortie du catalogue Unity.
  • La requête de modèle et les éventuels paramètres du modèle.
SELECT text, ai_query(
    "databricks-meta-llama-3-1-8b-instruct",
    "Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;

ai_query et des modèles de base personnalisés ou affinés

Les exemples de notebooks de cette section illustrent les charges de travail d’inférence par lot qui utilisent des modèles de base personnalisés ou affinés pour traiter plusieurs entrées. Les exemples nécessitent un point de terminaison de service du modèle existant qui utilise le débit provisionné des API Foundation Model.

Inférence LLM par lot en utilisant un modèle d’incorporations

L’exemple de notebook suivant crée un point de terminaison avec débit approvisionné, et exécute l’inférence LLM par lot à l’aide de Python et de votre choix de modèle d’incorporations GTE Large (anglais) ou BGE Large (anglais).

Incorporations d’inférence LLM par lot avec un notebook de point de terminaison avec débit approvisionné

Obtenir un ordinateur portable

Inférence par lots et extraction de données structurées

L’exemple de notebook suivant montre comment effectuer une extraction de données structurées de base à l’aide de ai_query pour transformer des données brutes et non structurées en informations organisées et utilisables grâce à des techniques d’extraction automatisée. Ce notebook montre également comment utiliser Évaluation de l’agent Mosaic AI pour évaluer l’exactitude en utilisant des données de vérité terrain.

Bloc-notes d’inférence batch et d’extraction de données structurées

Obtenir un ordinateur portable

Inférence par lots à l’aide de BERT pour la reconnaissance d’entité nommée

Le notebook suivant montre un exemple d’inférence par lot d’un modèle ML traditionnel utilisant BERT.

Notebook d’inférence par lots en utilisant BERT pour la reconnaissance des entités nommées

Obtenir un ordinateur portable

Déployer des pipelines d’inférence par lots

Cette section montre comment intégrer AI Functions à d’autres données Databricks et produits IA pour créer des pipelines d’inférence de lots complets. Ces pipelines peuvent effectuer des workflows de bout en bout qui incluent l’ingestion, le prétraitement, l’inférence et le post-traitement. Les pipelines peuvent être créés dans SQL ou Python et déployés en tant que :

  • Pipelines déclaratifs Lakeflow
  • Flux de travail planifiés avec Databricks Workflows
  • Flux de traitement d'inférence en continu en utilisant Structured Streaming

Effectuer une inférence par lots incrémentielle sur des pipelines déclaratifs Lakeflow

L’exemple suivant effectue une inférence par lots incrémentielle à l’aide de pipelines déclaratifs Lakeflow pour le moment où les données sont mises à jour en continu.

Étape 1 : Ingérer des données brutes d’actualités à partir d’un volume

SQL

CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
  '/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  multiLine => 'true'
));
Python

Importer les packages et définir le schéma JSON de la réponse LLM en tant que variable Python


import dlt
from pyspark.sql.functions import expr, get_json_object, concat

news_extraction_schema = (
    '{"type": "json_schema", "json_schema": {"name": "news_extraction", '
    '"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
    '"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
    '"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)

Ingérer vos données à partir d’un volume de catalogue Unity.

@dlt.table(
  comment="Raw news articles ingested from volume."
)
def news_raw():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", True)
      .option("mode", "PERMISSIVE")
      .option("multiLine", "true")
      .load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
  )

Étape 2 : Appliquer l’inférence LLM pour extraire le titre et la catégorie

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
  inputs,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Extract the category of the following news article: " || inputs,
    responseFormat => '{
      "type": "json_schema",
      "json_schema": {
        "name": "news_extraction",
        "schema": {
          "type": "object",
          "properties": {
            "title": { "type": "string" },
            "category": {
              "type": "string",
              "enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
            }
          }
        },
        "strict": true
      }
    }'
  ) AS meta_data
FROM news_raw
LIMIT 2;
Python
@dlt.table(
  comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
  # Limit the number of rows to 2 as in the SQL version
  df_raw = spark.read.table("news_raw").limit(2)
  # Inject the JSON schema variable into the ai_query call using an f-string.
  return df_raw.withColumn(
    "meta_data",
    expr(
      f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
      f"concat('Extract the category of the following news article: ', inputs), "
      f"responseFormat => '{news_extraction_schema}')"
    )
  )

Étape 3 : Valider la sortie d’inférence LLM avant la synthèse

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
  CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
  CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
Python
@dlt.table(
  comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
  return spark.read.table("news_categorized")

Étape 4 : Résumer les articles d’actualités à partir des données validées

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
  get_json_object(meta_data, '$.category') as category,
  get_json_object(meta_data, '$.title') as title,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Summarize the following political news article in 2-3 sentences: " || inputs
  ) AS summary
FROM news_validated;
Python

@dlt.table(
  comment="Summarized political news articles after validation."
)
def news_summarized():
  df = spark.read.table("news_validated")
  return df.select(
    get_json_object("meta_data", "$.category").alias("category"),
    get_json_object("meta_data", "$.title").alias("title"),
    expr(
      "ai_query('databricks-meta-llama-3-3-70b-instruct', "
      "concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
    ).alias("summary")
  )

Automatiser des travaux d’inférence par lots à l’aide de flux de travail Databricks

Planifiez les travaux d’inférence par lots et automatisez les pipelines IA.

SQL

SELECT
   *,
   ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.


AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price


Examples below:


DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..


RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]


DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

Python


import json
from pyspark.sql.functions import expr

# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.

AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price

Examples below:

DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.

RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]

DOCUMENT
"""

# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)

# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)

# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
    "result",
    expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)

# Display the result DataFrame.
display(result_df)

Fonctions IA à l’aide de Structured Streaming

Appliquez l’inférence IA dans des scénarios en quasi temps réel ou en micro-lots à l’aide ai_query de Structured Streaming.

Étape 1. Lire votre tableau Delta statique

Lisez votre table Delta statique comme s’il s’agissait d’un flux.


from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs")  # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
    "You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
    "Do not include extra commentary or suggestions. Document: "
)

Étape 2. Appliquer ai_query

Spark traite cette opération une seule fois pour les données statiques, sauf si de nouvelles lignes arrivent dans la table.


df_transformed = df_stream.select(
    "document_text",
    F.expr(f"""
      ai_query(
        'databricks-meta-llama-3-1-8b-instruct',
        CONCAT('{prompt}', document_text)
      )
    """).alias("summary")
)

Étape 3 : Écrire la sortie résumée

Écrire la sortie résumée dans une autre table Delta


# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
    .outputMode("append") \
    .toTable("enterprise.docs_summary")

query.awaitTermination()

Afficher les coûts des charges de travail d’inférence par lots

Les exemples suivants montrent comment filtrer les charges de travail d'inférence par lots en fonction des emplois, de l'environnement de calcul, des entrepôts SQL et des pipelines déclaratifs Lakeflow.

Consultez Surveiller les coûts de service de modèle pour obtenir des exemples généraux sur la façon d’afficher les coûts de vos charges de travail d’inférence par lots qui utilisent AI Functions.

Emplois

La requête suivante montre quelles tâches utilisent la system.workflow.jobs table des systèmes pour l’inférence par lots. Consultez Surveiller les coûts et les performances des tâches à l'aide des tableaux du système.


SELECT *
FROM system.billing.usage u
  JOIN system.workflow.jobs x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.job_id = x.job_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Calculer

Les clusters suivants sont utilisés pour l’inférence par lots à l’aide de la system.compute.clusters table des systèmes.

SELECT *
FROM system.billing.usage u
  JOIN system.compute.clusters x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.cluster_id = x.cluster_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Pipelines déclaratifs Lakeflow

L'exemple suivant montre quels pipelines déclaratifs Lakeflow sont utilisés pour l'inférence par lots en utilisant la table des systèmes system.lakeflow.pipelines.

SELECT *
FROM system.billing.usage u
  JOIN system.lakeflow.pipelines x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.dlt_pipeline_id = x.pipeline_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Entrepôt SQL

L'exemple suivant montre quels pipelines déclaratifs Lakeflow sont utilisés pour l'inférence par lots en utilisant la table des systèmes system.compute.warehouses.

SELECT *
FROM system.billing.usage u
  JOIN system.compute.warehouses x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.warehouse_id = x.warehouse_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";