다음을 통해 공유


AI 함수를 사용하여 일괄 처리 LLM 유추 수행

중요한

이 기능은 공개 미리 보기로 제공됩니다.

이 문서에서는 대규모 AI Functions 를 사용하여 일괄 처리 유추를 수행하는 방법을 설명합니다. 이 문서의 예제는 일괄 처리 유추 파이프라인을 예약된 워크플로로 배포하고 구조적 스트리밍을 위한 Databricks 호스팅 기반 모델을 사용하는 ai_query 등의 프로덕션 시나리오에 권장됩니다.

AI Functions 사용을 시작하기 위해 Databricks는 다음 중 하나를 사용하는 것이 좋습니다.

요구 사항

  • Foundation Model API 지원 지역의 작업 영역입니다.
  • AI Functions를 사용하는 일괄 처리 유추 워크로드에는 Databricks Runtime 15.4 LTS 이상이 필요합니다.
  • 사용할 데이터가 포함된 Unity 카탈로그의 델타 테이블에 대한 쿼리 권한입니다.
  • 테이블 속성에서 pipelines.channel를 '미리 보기'로 설정하여 ai_query()을/를 사용합니다. 예제 쿼리 에 대한 요구 사항을 참조하세요.

작업별 AI 함수를 사용하여 배치 LLM 추론

작업별 AI 함수를 사용하여 일괄 처리 유추를 실행할 수 있습니다. 작업별 AI 함수를 파이프라인에 통합하는 방법에 대한 지침은 일괄 처리 유추 파이프라인 배포 를 참조하세요.

다음은 작업별 AI 함수 ai_translate를 사용하는 예입니다.

SELECT
writer_summary,
  ai_translate(writer_summary, "cn") as cn_translation
from user.batch.news_summaries
limit 500
;

ai_query를 사용하여 배치 LLM 추론

범용 AI 함수 ai_query 를 사용하여 일괄 처리 유추를 수행할 수 있습니다. 지원하는 모델 유형 및 관련 모델을ai_query 확인합니다.

이 섹션의 예제는 ai_query의 유연성과 그것을 일괄 처리 추론 파이프라인 및 워크플로에서 사용하는 방법에 중점을 둡니다.

ai_query 및 Databricks에서 호스팅하는 기반 모델

일괄 처리 유추에 Databricks 호스팅 및 미리 프로비전된 기본 모델을 사용하는 경우 Databricks는 워크로드에 따라 자동으로 스케일링되는 프로비전된 처리량 엔드포인트를 대신 구성합니다.

일괄 처리 유추에 이 메서드를 사용하려면 요청에 다음을 지정합니다.

  • 미리 프로비전된 LLM을 ai_query에서 사용하려고 합니다. 지원되는 미리 프로비전된 LLM 중에서 선택합니다. 미리 프로비전된 이러한 LLM에는 허용 라이선스 및 사용 정책이 적용됩니다. 해당 모델 개발자 라이선스 및 사용 약관을 참조하세요.
  • Unity 카탈로그 입력 테이블 및 출력 테이블입니다.
  • 모델 프롬프트 및 모든 모델 매개 변수입니다.
SELECT text, ai_query(
    "databricks-meta-llama-3-1-8b-instruct",
    "Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;

ai_query 사용자 지정 또는 미세 조정된 기본 모델

이 섹션의 Notebook 예제에서는 사용자 지정 또는 미세 조정된 기초 모델을 사용하여 여러 입력을 처리하는 일괄 처리 유추 워크로드를 보여 줍니다. 이 예제에서는 Foundation Model API 프로비전된 처리량을 사용하는 엔드포인트를 제공하는 기존 모델이 필요합니다.

LLM 일괄 추론에 embeddings 모델 사용

다음 예제 Notebook은 프로비전된 처리량 엔드포인트를 생성하고, Python을 사용하여 일괄 LLM 추론을 실행하며, GTE Large(영어) 또는 BGE Large(영어) 임베딩 모델 중 하나를 선택합니다.

LLM 임베딩 일괄 추론을 위한 프로비저닝된 처리량 엔드포인트 노트북

전자 필기장 가져오기

일괄 처리 유추 및 구조적 데이터 추출

다음 예제 Notebook에서는 ai_query 사용하여 기본 구조화된 데이터 추출을 수행하여 자동화된 추출 기술을 통해 원시 구조화되지 않은 데이터를 구성되고 사용 가능한 정보로 변환하는 방법을 보여 줍니다. 이 노트북은 Mosaic AI 에이전트 평가를 활용하여 실제 데이터를 사용해 정확성을 평가하는 방법을 보여줍니다.

일괄 처리 유추 및 구조적 데이터 추출 Notebook

전자 필기장 가져오기

명명된 엔터티 인식에 BERT를 사용하는 일괄 처리 유추

다음 Notebook은 BERT를 사용하는 기존 ML 모델 일괄 처리 유추 예제를 보여 있습니다.

개체명 인식을 위한 BERT 기반의 일괄 추론 노트북

전자 필기장 가져오기

배치 추론 파이프라인 배포

이 섹션에서는 AI Functions를 다른 Databricks 데이터 및 AI 제품에 통합하여 완전한 일괄 처리 유추 파이프라인을 빌드하는 방법을 보여 줍니다. 이러한 파이프라인은 수집, 전처리, 유추 및 후처리를 포함하는 종단 간 워크플로를 수행할 수 있습니다. 파이프라인은 SQL 또는 Python에서 작성하여 다음과 같이 배포할 수 있습니다.

  • Lakeflow의 선언적 파이프라인
  • Databricks 워크플로를 사용하여 예약된 작업
  • 구조적 스트리밍을 사용하는 스트리밍 유추 워크플로

Lakeflow 선언형 파이프라인에서 증분 배치 추론 수행

다음 예제에서는 데이터가 지속적으로 업데이트되는 경우에 대해 Lakeflow 선언적 파이프라인을 사용하여 증분 일괄 처리 유추를 수행합니다.

1단계: 볼륨에서 원시 뉴스 데이터 수집

SQL (영문)

CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
  '/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  multiLine => 'true'
));
파이썬

패키지를 가져오고 LLM 응답의 JSON 스키마를 Python 변수로 정의합니다.


import dlt
from pyspark.sql.functions import expr, get_json_object, concat

news_extraction_schema = (
    '{"type": "json_schema", "json_schema": {"name": "news_extraction", '
    '"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
    '"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
    '"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)

Unity 카탈로그 볼륨에서 데이터를 수집합니다.

@dlt.table(
  comment="Raw news articles ingested from volume."
)
def news_raw():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", True)
      .option("mode", "PERMISSIVE")
      .option("multiLine", "true")
      .load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
  )

2단계: LLM 유추를 적용하여 제목 및 범주 추출

SQL (영문)

CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
  inputs,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Extract the category of the following news article: " || inputs,
    responseFormat => '{
      "type": "json_schema",
      "json_schema": {
        "name": "news_extraction",
        "schema": {
          "type": "object",
          "properties": {
            "title": { "type": "string" },
            "category": {
              "type": "string",
              "enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
            }
          }
        },
        "strict": true
      }
    }'
  ) AS meta_data
FROM news_raw
LIMIT 2;
파이썬
@dlt.table(
  comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
  # Limit the number of rows to 2 as in the SQL version
  df_raw = spark.read.table("news_raw").limit(2)
  # Inject the JSON schema variable into the ai_query call using an f-string.
  return df_raw.withColumn(
    "meta_data",
    expr(
      f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
      f"concat('Extract the category of the following news article: ', inputs), "
      f"responseFormat => '{news_extraction_schema}')"
    )
  )

3단계: 요약 전에 LLM 유추 출력의 유효성 검사

SQL (영문)
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
  CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
  CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
파이썬
@dlt.table(
  comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
  return spark.read.table("news_categorized")

4단계: 유효성이 검사된 데이터의 뉴스 기사 요약

SQL (영문)
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
  get_json_object(meta_data, '$.category') as category,
  get_json_object(meta_data, '$.title') as title,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Summarize the following political news article in 2-3 sentences: " || inputs
  ) AS summary
FROM news_validated;
파이썬

@dlt.table(
  comment="Summarized political news articles after validation."
)
def news_summarized():
  df = spark.read.table("news_validated")
  return df.select(
    get_json_object("meta_data", "$.category").alias("category"),
    get_json_object("meta_data", "$.title").alias("title"),
    expr(
      "ai_query('databricks-meta-llama-3-3-70b-instruct', "
      "concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
    ).alias("summary")
  )

Databricks 워크플로를 사용하여 일괄 처리 유추 작업 자동화

일괄 처리 유추 작업을 예약하고 AI 파이프라인을 자동화합니다.

SQL (영문)

SELECT
   *,
   ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.


AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price


Examples below:


DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..


RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]


DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

파이썬


import json
from pyspark.sql.functions import expr

# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.

AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price

Examples below:

DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.

RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]

DOCUMENT
"""

# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)

# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)

# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
    "result",
    expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)

# Display the result DataFrame.
display(result_df)

구조적 스트리밍을 사용하는 AI 함수

ai_query구조적 스트리밍을 사용하여 AI 유추를 거의 실시간 또는 마이크로 배치 시나리오에서 적용합니다.

1단계. 정적 델타 테이블 읽기

정적 델타 테이블을 스트림인 것처럼 읽습니다.


from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs")  # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
    "You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
    "Do not include extra commentary or suggestions. Document: "
)

2단계. ai_query 적용하십시오

Spark는 새 행이 테이블에 도착하지 않는 한 정적 데이터에 대해 이 작업을 한 번만 처리합니다.


df_transformed = df_stream.select(
    "document_text",
    F.expr(f"""
      ai_query(
        'databricks-meta-llama-3-1-8b-instruct',
        CONCAT('{prompt}', document_text)
      )
    """).alias("summary")
)

3단계: 요약된 출력 작성

요약된 출력을 다른 델타 테이블에 씁니다.


# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
    .outputMode("append") \
    .toTable("enterprise.docs_summary")

query.awaitTermination()

일괄 처리 유추 워크로드에 대한 비용 보기

다음 예제에서는 작업, 컴퓨팅, SQL 웨어하우스 및 Lakeflow 선언적 파이프라인을 기반으로 일괄 처리 유추 워크로드를 필터링하는 방법을 보여 줍니다.

AI Functions를 사용하는 일괄 처리 유추 워크로드에 대한 비용을 확인하는 일반적인 예제는 모델 서비스 비용 모니터링을 참조하세요.

직업

다음 쿼리는 시스템 테이블을 사용하여 system.workflow.jobs 일괄 처리 유추에 사용되는 작업을 보여줍니다. 시스템 테이블사용하여 작업 비용 & 성능 모니터링을 참조하세요.


SELECT *
FROM system.billing.usage u
  JOIN system.workflow.jobs x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.job_id = x.job_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

컴퓨팅

다음은 시스템 테이블을 사용하여 일괄 처리 유추에 사용되는 클러스터를 system.compute.clusters 보여 줍니다.

SELECT *
FROM system.billing.usage u
  JOIN system.compute.clusters x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.cluster_id = x.cluster_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Lakeflow 선언적 파이프라인

다음은 system.lakeflow.pipelines 시스템 테이블을 사용하여 일괄 추론에 사용되는 Lakeflow 선언적 파이프라인을 보여 줍니다.

SELECT *
FROM system.billing.usage u
  JOIN system.lakeflow.pipelines x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.dlt_pipeline_id = x.pipeline_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

SQL 웨어하우스

다음은 system.compute.warehouses 시스템 테이블을 사용하여 일괄 추론에 사용되는 Lakeflow 선언적 파이프라인을 보여 줍니다.

SELECT *
FROM system.billing.usage u
  JOIN system.compute.warehouses x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.warehouse_id = x.warehouse_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";