Deploy batch inference pipelines

Important

This feature is in Public Preview.

This page shows how to integrate AI Functions with other Databricks data and AI products to build complete batch inference pipelines. These pipelines can perform end-to-end workflows that include ingestion, preprocessing, inference, and post-processing. You can author pipelines in SQL or Python and deploy them as:

  • Lakeflow Spark Declarative Pipelines
  • Scheduled workflows using Databricks Workflows
  • Streaming inference workflows using Structured Streaming

Requirements

  • A workspace in a region where Foundation Model APIs are supported.
  • Databricks Runtime 15.4 LTS or above is required for batch inference workloads that use AI Functions.
  • Query permission on the Delta table in Unity Catalog that contains the data you want to use.
  • Set pipelines.channel in the table properties as 'preview' in order to use ai_query(), as shown in the sketch below. See Requirements for an example query.
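
For example, in a Python declarative pipeline the channel can be set through the table_properties argument of the @dp.table decorator. The following is a minimal sketch under that assumption; the table name and source are placeholders.

from pyspark import pipelines as dp

# Minimal sketch: set pipelines.channel to 'preview' on a pipeline table
# so that ai_query() can be used. The source table is a placeholder.
@dp.table(
  table_properties={"pipelines.channel": "preview"}
)
def my_table():
  return spark.read.table("catalog.schema.source_table")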

Perform incremental batch inference in Lakeflow Spark Declarative Pipelines

The following example uses Lakeflow Spark Declarative Pipelines to perform incremental batch inference for the case where the data is continuously updated.

Step 1: Ingest the raw news data from a volume

SQL

CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
  '/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  multiLine => 'true'
));

Python

Import the packages and define the JSON schema of the LLM response as a Python variable.

from pyspark import pipelines as dp
from pyspark.sql.functions import expr, get_json_object, concat

news_extraction_schema = (
    '{"type": "json_schema", "json_schema": {"name": "news_extraction", '
    '"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
    '"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
    '"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)
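
Because this string is later interpolated into a SQL expression, an optional sanity check can catch malformed JSON early. This check is an illustrative addition, not part of the original pipeline:

import json

# Optional: fail fast if the response-format string is not valid JSON
# before it is embedded in the ai_query() expression below.
json.loads(news_extraction_schema)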

Ingest the data from a Unity Catalog volume.

@dp.table(
  comment="Raw news articles ingested from volume."
)
def news_raw():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", True)
      .option("mode", "PERMISSIVE")
      .option("multiLine", "true")
      .load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
  )

Step 2: Apply LLM inference to extract titles and categories

SQL


CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
  inputs,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Extract the category of the following news article: " || inputs,
    responseFormat => '{
      "type": "json_schema",
      "json_schema": {
        "name": "news_extraction",
        "schema": {
          "type": "object",
          "properties": {
            "title": { "type": "string" },
            "category": {
              "type": "string",
              "enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
            }
          }
        },
        "strict": true
      }
    }'
  ) AS meta_data
FROM news_raw
LIMIT 2;

Python

@dp.materialized_view(
  comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
  # Limit the number of rows to 2 as in the SQL version
  df_raw = spark.read.table("news_raw").limit(2)
  # Inject the JSON schema variable into the ai_query call using an f-string.
  return df_raw.withColumn(
    "meta_data",
    expr(
      f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
      f"concat('Extract the category of the following news article: ', inputs), "
      f"responseFormat => '{news_extraction_schema}')"
    )
  )

Step 3: Validate the LLM inference output before summarization

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_validated (
  CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
  CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;

Python

@dp.materialized_view(
  comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dp.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dp.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
  return spark.read.table("news_categorized")

Step 4: Summarize the news articles from the validated data

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
  get_json_object(meta_data, '$.category') as category,
  get_json_object(meta_data, '$.title') as title,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Summarize the following political news article in 2-3 sentences: " || inputs
  ) AS summary
FROM news_validated;

Python

@dp.materialized_view(
  comment="Summarized political news articles after validation."
)
def news_summarized():
  df = spark.read.table("news_validated")
  return df.select(
    get_json_object("meta_data", "$.category").alias("category"),
    get_json_object("meta_data", "$.title").alias("title"),
    expr(
      "ai_query('databricks-meta-llama-3-3-70b-instruct', "
      "concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
    ).alias("summary")
  )

Automate batch inference jobs using Databricks Workflows

Schedule batch inference jobs and automate AI pipelines.

SQL

SELECT
   *,
   ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.
RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

Python


import json
from pyspark.sql.functions import expr

# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.
RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
"""

# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)

# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)

# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
    "result",
    expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)

# Display the result DataFrame.
display(result_df)
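
The query above performs the inference but does not create a schedule itself. One option is to put the code in a notebook and create a scheduled job with the Databricks SDK for Python. The following is a minimal sketch under that assumption; the job name, notebook path, and cron expression are hypothetical placeholders, and compute settings are omitted (serverless jobs are assumed).

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

# Minimal sketch: run the batch inference notebook every day at 02:00 UTC.
created = w.jobs.create(
    name="nightly-opinion-mining",  # hypothetical job name
    tasks=[
        jobs.Task(
            task_key="batch_inference",
            notebook_task=jobs.NotebookTask(
                notebook_path="/Workspace/Users/me@example.com/opinion_mining"  # placeholder
            ),
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 2 * * ?",  # daily at 02:00
        timezone_id="UTC",
    ),
)
print(f"Created job {created.job_id}")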

AI Functions using Structured Streaming

Use Structured Streaming to apply AI inference in near real-time or micro-batch scenarios.

Step 1: Read a static Delta table

Read a static Delta table as if it were a stream.


from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs")  # Replace with your table name containing enterprise documents
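# Rewriting the table with 50 partitions spreads the rows across more files,
# so the maxBytesPerTrigger option below produces smaller micro-batches.
# This rewrite is an optional tuning step; adjust or skip it for your data.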
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
    "You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
    "Do not include extra commentary or suggestions. Document: "
)

Step 2: Apply ai_query

Spark processes this only once for the static data, unless new rows arrive in the table.


df_transformed = df_stream.select(
    "document_text",
    F.expr(f"""
      ai_query(
        'databricks-meta-llama-3-1-8b-instruct',
        CONCAT('{prompt}', document_text)
      )
    """).alias("summary")
)

Step 3: Write the summarized output

Write the summarized output to another Delta table.


# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
    .outputMode("append") \
    .toTable("enterprise.docs_summary")

query.awaitTermination()
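
The stream above keeps running and waits for new rows. If the goal is a one-shot batch over the existing data, a trigger(availableNow=True) variant processes everything that is currently available and then stops. A minimal sketch of that alternative, using a separate checkpoint location:

# Alternative sketch: process all currently available data in micro-batches,
# then stop, instead of running continuously.
query = df_transformed.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary_once") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .toTable("enterprise.docs_summary")

query.awaitTermination()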