Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
Cette fonctionnalité est disponible en préversion publique.
Cet article explique comment effectuer une inférence par lots à l’aide de ai Functions à grande échelle. Les exemples de cet article sont recommandés pour les scénarios de production, notamment le déploiement de pipelines d’inférence par lots en tant que workflows planifiés et l’utilisation de ai_query
et d’un modèle de base hébergé par Databricks pour Structured Streaming.
Pour commencer à utiliser AI Functions, Databricks recommande d’utiliser l’une des options suivantes :
Exigences
- Un espace de travail dans une région prise en charge par les API Foundation Model.
- Databricks Runtime 15.4 LTS ou version ultérieure est requis pour les charges de travail d’inférence par lots à l’aide d’AI Functions.
- Interrogez l’autorisation sur la table Delta dans le catalogue Unity qui contient les données que vous souhaitez utiliser.
- Définissez les
pipelines.channel
propriétés de la table sur « préversion » pour utiliserai_query()
. Consultez Conditions requises pour obtenir un exemple de requête.
Inférence LLM par lot en utilisant des fonctions IA spécifiques à la tâche
Vous pouvez exécuter l’inférence par lots à l’aide de fonctions IA spécifiques à la tâche. Consultez Déployer des pipelines d’inférence par lots pour obtenir des conseils sur la façon d’incorporer votre fonction IA spécifique à une tâche dans un pipeline.
Voici un exemple d’utilisation de la fonction IA spécifique à la tâche : ai_translate
SELECT
writer_summary,
ai_translate(writer_summary, "cn") as cn_translation
from user.batch.news_summaries
limit 500
;
Inférence LLM par lot en utilisant ai_query
Vous pouvez utiliser la fonction ai_query
IA à usage général pour effectuer l’inférence par lots. Découvrez les types de modèles et les modèles associés qui ai_query
prennent en charge.
Les exemples de cette section se concentrent sur la flexibilité et la façon de ai_query
l’utiliser dans les pipelines d’inférence par lots et les flux de travail.
ai_query
et les modèles de base hébergés par Databricks
Lorsque vous utilisez un modèle de base hébergé et préprovisionné Databricks pour l’inférence par lots, Databricks configure un point de terminaison de débit provisionné pour votre compte qui s’adapte automatiquement en fonction de la charge de travail.
Pour utiliser cette méthode pour l’inférence par lots, spécifiez ce qui suit dans votre demande :
- Le grand modèle de langage (LLM) pré-approvisionné que vous souhaitez utiliser dans
ai_query
. Sélectionnez parmi les LLMs préprovisionnés pris en charge. Ces LLM préprovisionnés sont soumis à des licences permissives et à des politiques d'utilisation, consultez Licences et conditions applicables aux développeurs de modèles. - Table d’entrée et table de sortie du catalogue Unity.
- La requête de modèle et les éventuels paramètres du modèle.
SELECT text, ai_query(
"databricks-meta-llama-3-1-8b-instruct",
"Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;
ai_query
et des modèles de base personnalisés ou affinés
Les exemples de notebooks de cette section illustrent les charges de travail d’inférence par lot qui utilisent des modèles de base personnalisés ou affinés pour traiter plusieurs entrées. Les exemples nécessitent un point de terminaison de service du modèle existant qui utilise le débit provisionné des API Foundation Model.
Inférence LLM par lot en utilisant un modèle d’incorporations
L’exemple de notebook suivant crée un point de terminaison avec débit approvisionné, et exécute l’inférence LLM par lot à l’aide de Python et de votre choix de modèle d’incorporations GTE Large (anglais) ou BGE Large (anglais).
Incorporations d’inférence LLM par lot avec un notebook de point de terminaison avec débit approvisionné
Obtenir un ordinateur portable
Inférence par lots et extraction de données structurées
L’exemple de notebook suivant montre comment effectuer une extraction de données structurées de base à l’aide de ai_query
pour transformer des données brutes et non structurées en informations organisées et utilisables grâce à des techniques d’extraction automatisée. Ce notebook montre également comment utiliser Évaluation de l’agent Mosaic AI pour évaluer l’exactitude en utilisant des données de vérité terrain.
Bloc-notes d’inférence batch et d’extraction de données structurées
Obtenir un ordinateur portable
Inférence par lots à l’aide de BERT pour la reconnaissance d’entité nommée
Le notebook suivant montre un exemple d’inférence par lot d’un modèle ML traditionnel utilisant BERT.
Notebook d’inférence par lots en utilisant BERT pour la reconnaissance des entités nommées
Obtenir un ordinateur portable
Déployer des pipelines d’inférence par lots
Cette section montre comment intégrer AI Functions à d’autres données Databricks et produits IA pour créer des pipelines d’inférence de lots complets. Ces pipelines peuvent effectuer des workflows de bout en bout qui incluent l’ingestion, le prétraitement, l’inférence et le post-traitement. Les pipelines peuvent être créés dans SQL ou Python et déployés en tant que :
- Pipelines déclaratifs Lakeflow
- Flux de travail planifiés avec Databricks Workflows
- Flux de traitement d'inférence en continu en utilisant Structured Streaming
Effectuer une inférence par lots incrémentielle sur des pipelines déclaratifs Lakeflow
L’exemple suivant effectue une inférence par lots incrémentielle à l’aide de pipelines déclaratifs Lakeflow pour le moment où les données sont mises à jour en continu.
Étape 1 : Ingérer des données brutes d’actualités à partir d’un volume
SQL
CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
'/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
multiLine => 'true'
));
Python
Importer les packages et définir le schéma JSON de la réponse LLM en tant que variable Python
import dlt
from pyspark.sql.functions import expr, get_json_object, concat
news_extraction_schema = (
'{"type": "json_schema", "json_schema": {"name": "news_extraction", '
'"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
'"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
'"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)
Ingérer vos données à partir d’un volume de catalogue Unity.
@dlt.table(
comment="Raw news articles ingested from volume."
)
def news_raw():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.option("mode", "PERMISSIVE")
.option("multiLine", "true")
.load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
)
Étape 2 : Appliquer l’inférence LLM pour extraire le titre et la catégorie
SQL
CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
inputs,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Extract the category of the following news article: " || inputs,
responseFormat => '{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"schema": {
"type": "object",
"properties": {
"title": { "type": "string" },
"category": {
"type": "string",
"enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
}
}
},
"strict": true
}
}'
) AS meta_data
FROM news_raw
LIMIT 2;
Python
@dlt.table(
comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
# Limit the number of rows to 2 as in the SQL version
df_raw = spark.read.table("news_raw").limit(2)
# Inject the JSON schema variable into the ai_query call using an f-string.
return df_raw.withColumn(
"meta_data",
expr(
f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
f"concat('Extract the category of the following news article: ', inputs), "
f"responseFormat => '{news_extraction_schema}')"
)
)
Étape 3 : Valider la sortie d’inférence LLM avant la synthèse
SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
Python
@dlt.table(
comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
return spark.read.table("news_categorized")
Étape 4 : Résumer les articles d’actualités à partir des données validées
SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
get_json_object(meta_data, '$.category') as category,
get_json_object(meta_data, '$.title') as title,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Summarize the following political news article in 2-3 sentences: " || inputs
) AS summary
FROM news_validated;
Python
@dlt.table(
comment="Summarized political news articles after validation."
)
def news_summarized():
df = spark.read.table("news_validated")
return df.select(
get_json_object("meta_data", "$.category").alias("category"),
get_json_object("meta_data", "$.title").alias("title"),
expr(
"ai_query('databricks-meta-llama-3-3-70b-instruct', "
"concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
).alias("summary")
)
Automatiser des travaux d’inférence par lots à l’aide de flux de travail Databricks
Planifiez les travaux d’inférence par lots et automatisez les pipelines IA.
SQL
SELECT
*,
ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10
Python
import json
from pyspark.sql.functions import expr
# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
"""
# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)
# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)
# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
"result",
expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)
# Display the result DataFrame.
display(result_df)
Fonctions IA à l’aide de Structured Streaming
Appliquez l’inférence IA dans des scénarios en quasi temps réel ou en micro-lots à l’aide ai_query
de Structured Streaming.
Étape 1. Lire votre tableau Delta statique
Lisez votre table Delta statique comme s’il s’agissait d’un flux.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs") # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")
# Define the prompt outside the SQL expression.
prompt = (
"You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
"Do not include extra commentary or suggestions. Document: "
)
Étape 2. Appliquer ai_query
Spark traite cette opération une seule fois pour les données statiques, sauf si de nouvelles lignes arrivent dans la table.
df_transformed = df_stream.select(
"document_text",
F.expr(f"""
ai_query(
'databricks-meta-llama-3-1-8b-instruct',
CONCAT('{prompt}', document_text)
)
""").alias("summary")
)
Étape 3 : Écrire la sortie résumée
Écrire la sortie résumée dans une autre table Delta
# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
.outputMode("append") \
.toTable("enterprise.docs_summary")
query.awaitTermination()
Afficher les coûts des charges de travail d’inférence par lots
Les exemples suivants montrent comment filtrer les charges de travail d'inférence par lots en fonction des emplois, de l'environnement de calcul, des entrepôts SQL et des pipelines déclaratifs Lakeflow.
Consultez Surveiller les coûts de service de modèle pour obtenir des exemples généraux sur la façon d’afficher les coûts de vos charges de travail d’inférence par lots qui utilisent AI Functions.
Emplois
La requête suivante montre quelles tâches utilisent la system.workflow.jobs
table des systèmes pour l’inférence par lots. Consultez Surveiller les coûts et les performances des tâches à l'aide des tableaux du système.
SELECT *
FROM system.billing.usage u
JOIN system.workflow.jobs x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.job_id = x.job_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";
Calculer
Les clusters suivants sont utilisés pour l’inférence par lots à l’aide de la system.compute.clusters
table des systèmes.
SELECT *
FROM system.billing.usage u
JOIN system.compute.clusters x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.cluster_id = x.cluster_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";
Pipelines déclaratifs Lakeflow
L'exemple suivant montre quels pipelines déclaratifs Lakeflow sont utilisés pour l'inférence par lots en utilisant la table des systèmes system.lakeflow.pipelines
.
SELECT *
FROM system.billing.usage u
JOIN system.lakeflow.pipelines x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.dlt_pipeline_id = x.pipeline_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";
Entrepôt SQL
L'exemple suivant montre quels pipelines déclaratifs Lakeflow sont utilisés pour l'inférence par lots en utilisant la table des systèmes system.compute.warehouses
.
SELECT *
FROM system.billing.usage u
JOIN system.compute.warehouses x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.warehouse_id = x.warehouse_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";