次の方法で共有


特徴量変換とベスト プラクティス

この記事では、特徴量セットの仕様、一緒に使用できるさまざまな種類の変換、関連するベスト プラクティスについて説明します。

特徴量セットは、ソース データ変換によって生成される特徴量のコレクションです。 特徴量セットの仕様は、特徴量セットの開発とローカル テストのための自己完結型の定義です。 特徴量セットの開発とローカル テストの後、その特徴量セットを特徴量セット資産として Feature Store に登録できます。 その後、バージョン管理と具体化を管理機能として使用できるようになります。

特徴量セットを定義する

FeatureSetSpec では特徴量セットを定義します。 このサンプルは、特徴量セットの仕様ファイルを示しています。

$schema: http://azureml/sdk-2-0/FeatureSetSpec.json

source:
  type: parquet
  path: abfs://file_system@account_name.dfs.core.windows.net/datasources/transactions-source/*.parquet
  timestamp_column: # name of the column representing the timestamp.
    name: timestamp
  source_delay:
    days: 0
    hours: 3
    minutes: 0
feature_transformation:
  transformation_code:
    path: ./transformation_code
    transformer_class: transaction_transform.TransactionFeatureTransformer
features:
  - name: transaction_7d_count
    type: long
  - name: transaction_amount_7d_sum
    type: double
  - name: transaction_amount_7d_avg
    type: double
  - name: transaction_3d_count
    type: long
  - name: transaction_amount_3d_sum
    type: double
  - name: transaction_amount_3d_avg
    type: double
index_columns:
  - name: accountID
    type: string
source_lookback:
  days: 7
  hours: 0
  minutes: 0
temporal_join_lookback:
  days: 1
  hours: 0
  minutes: 0

featurestore コア SDK によって、特徴量セットの仕様 YAML が自動生成されます。 このチュートリアルには例があります。

FeatureSetSpec 定義では、これらのプロパティは特徴量変換に関連しています。

  • source: ソース データと関連するメタデータ (データ内のタイムスタンプ列など) を定義します。 現時点では、時系列のソース データと特徴量のみがサポートされています。 source.timestamp_column プロパティは必須です
  • feature_transformation.transformation_code: 特徴トランスフォーマーのコード フォルダーの場所を定義します
  • features: 特徴トランスフォーマーによって生成される特徴スキーマを定義します
  • index_columns: 特徴トランスフォーマーによって生成されるインデックス列スキーマを定義します
  • source_lookback: このプロパティは、特徴で時系列 (ウィンドウの集計など) データの集計を処理するときに使用されます。 このプロパティの値は、時間 T の特徴値について必要な過去のソース データの時間の範囲を示します。ベスト プラクティスに関するセクションには、詳細が含まれています。

特徴量はどのように計算されますか?

FeatureSetSpec を定義したら、featureSetSpec.to_spark_dataframe(feature_window_start_ts, feature_window_end_ts) を呼び出して、特定の特徴量ウィンドウの特徴量を計算します。

計算は、次の手順で行われます。

  • ソース データからデータを読み取ります。 source ではソース データを定義します。 データを時間の範囲 [feature_window_start_ts - source_lookback, feature_window_end_ts) でフィルター処理します。 時間の範囲にはウィンドウの開始が含まれており、ウィンドウの終了は除外されます
  • feature_transformation.transformation_code によって定義された特徴トランスフォーマーをデータに適用し、計算された特徴量を取得します
  • 特徴量ウィンドウ [feature_window_start_ts, feature_window_end_ts) 内の特徴量レコードのみを返すように特徴値をフィルター処理します

このサンプル コードでは、Feature Store API によって次の特徴量が計算されます。

# define the source data time window according to feature window
source_window_start_ts = feature_window_start_ts - source_lookback
source_window_end_ts = feature_window_end_ts

# read source table into a dataframe
df1 = spark.read.parquet(source.path).filter(df1["timestamp"] >= source_window_start_ts && df1["timestamp"] < source_window_end_ts)

# apply the feature transformer
df2 = FeatureTransformer._transform(df1)

## filter the feature(set) to include only feature records within the feature window
feature_set_df = df2.filter(df2["timestamp"] >= feature_window_start_ts && df2["timestamp"] < feature_window_end_ts)

特徴セットの仕様と、特徴データフレームを生成するためにソース データに適用される対応する変換を示す図。

特徴トランスフォーマー関数の出力スキーマ

transform 関数では、スキーマに次の値を含むデータフレームを出力します。

  • 名前と型の両方で FeatureSetSpec 定義に一致するインデックス列
  • source のタイムスタンプ定義と一致するタイムスタンプ列 (名前)。 sourceFeatureSetSpec にあります
  • features で他のすべての列名/型の値を FeatureSetSpec として定義します

一般的な種類の変換に特徴トランスフォーマーを実装する

行レベルの変換

行レベルの変換では、特定の行に対する特徴値の計算には、その行の列値のみが使用されます。 次のソース データから始めます。

user_id timestamp total_spend
1 2022-12-19 06:00:00 12.00
2 2022-12-10 03:00:00 56.00
1 2022-12-25 13:00:00 112.00

user_total_spend_profile という名前の新しい特徴量セットを定義します。

from pyspark.sql import Dataframe
from pyspark.ml import Transformer

class UserTotalSpendProfileTransformer(Transformer):

    def _transform(df: Dataframe) -> Dataframe:
        df.withColumn("is_high_spend_user", col("total_spend") > 100.0) \
           .withColumn("is_low_spend_user", col("total_spend") < 20.0)

この特徴量セットには、次に示すようなデータ型を持つ 3 つの特徴量があります。

  • total_spend: double
  • is_high_spend_user: bool
  • is_low_spend_user: bool

これは、計算された特徴の値を示しています。

user_id timestamp total_spend is_high_spend_user is_low_spend_user
1 2022-12-19 06:00:00 12.00 偽り ほんとう
2 2022-12-10 03:00:00 56.00 偽り 偽り
1 2022-12-25 13:00:00 112.00 ほんとう 偽り

スライディング ウィンドウ集計

スライディング ウィンドウ集計は、時間の経過と共に蓄積される統計 (合計、平均など) を示す特徴量値を処理するのに役立ちます。 SparkSQL Window 関数は、データ内の各行のスライディング ウィンドウを定義し、このような場合に役立ちます。

Window オブジェクトでは、行ごとに将来と過去の両方を調べることができます。 機械学習の特徴量のコンテキストでは、Window オブジェクトを定義して、各行の過去のみを調べる必要があります。 詳細については、ベスト プラクティスに関するセクションを参照してください。

次のソース データから始めます。

user_id timestamp spend
1 2022-12-10 06:00:00 12.00
2 2022-12-10 03:00:00 56.00
1 2022-12-11 01:00:00 10.00
2 2022-12-11 20:00:00 10.00
2 2022-12-12 02:00:00 100.00

user_rolling_spend という名前の新しい特徴量セットを定義します。 この特徴量セットには、ユーザー別の 1 日と 3 日間のローリング合計支出が含まれます。

from pyspark.sql import Dataframe
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Transformer

class UserRollingSpend(Transformer):

    def _transform(df: Dataframe) -> Dataframe:
        days = lambda i: i * 86400
        w_1d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast('long'))\
                .rangeBetween(-days(1), 0))
        w_3d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast('long')).\
                rangeBetween(-days(3), 0))
        res = df.withColumn("spend_1d_sum", F.sum("spend").over(w_1d)) \
            .withColumn("spend_3d_sum", F.sum("spend").over(w_3d)) \
            .select("user_id", "timestamp", "spend_1d_sum", "spend_3d_sum")
        return res
    

user_rolling_spend 特徴量セットには、次の 2 つの特徴があります。

  • spend_1d_sum: double
  • spend_3d_sum: double

これは、計算された特徴の値を示しています。

user_id timestamp spend_1d_sum spend_3d_sum
1 2022-12-10 06:00:00 12.00 12.00
2 2022-12-10 03:00:00 56.00 56.00
1 2022-12-11 01:00:00 22.00 22.00
2 2022-12-11 20:00:00 10.00 66.00
2 2022-12-12 02:00:00 110.00 166.00

特徴の値の計算では、現在の行の列が使用され、範囲内の前の行の列と組み合わされます。

タンブリング ウィンドウ集計

タンブリング ウィンドウでは、時系列データのデータを集計できます。 データを固定サイズの重複しない継続的な時間枠にグループ化し、集計します。 たとえば、ユーザーは、日単位または時間単位の集計に基づいて特徴量を定義できます。 一貫性のある結果を得るために、pyspark.sql.functions.window 関数を使用してタンブリング ウィンドウを定義します。 出力の特徴 timestamp は、各タンブリング ウィンドウの終了に合わせる必要があります。

次のソース データから始めます。

user_id timestamp spend
1 2022-12-10 06:00:00 12.00
1 2022-12-10 16:00:00 10.00
2 2022-12-10 03:00:00 56.00
1 2022-12-11 01:00:00 10.00
2 2022-12-12 04:00:00 23.00
2 2022-12-12 12:00:00 10.00

user_daily_spend という名前の新しい特徴量セットを定義します。

from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame

class TransactionFeatureTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day"))\
                .agg(F.sum("spend").alias("daily_spend"))
        df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"),"daily_spend")
        df3 = df2.withColumn('timestamp', F.expr("end - INTERVAL 1 milliseconds")) \
              .select("user_id", "timestamp","daily_spend")
        return df3

user_daily_spend 特徴量セットには、次の特徴があります。

  • daily_spend: double

これは、計算された特徴の値を示しています。

user_id timestamp daily_spend
1 2022-12-10 23:59:59 22.00
2 2022-12-10 23:59:59 56.00
1 2022-12-11 23:59:59 10.00
2 2022-12-12 23:59:59 33.00

ずらしウィンドウ集計

ずらしウィンドウ集計は、タンブリング ウィンドウ集計のマイナー バリアントです。 ずらしウィンドウ集計では、データを固定サイズのウィンドウにグループ化します。 ただし、ウィンドウは互いに重なることがあります。 このためには、pyspark.sql.functions.window より小さい slideDurationwindowDuration を使用します。

次のサンプル データから始めます。

user_id timestamp spend
1 2022-12-10 03:00:00 12.00
1 2022-12-10 09:00:00 10.00
1 2022-12-11 05:00:00 8.00
2 2022-12-12 14:00:00 56.00

user_sliding_24hr_spend という名前の新しい特徴量セットを定義します。

from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame

class TrsactionFeatureTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="6 hours"))\
                .agg(F.sum("spend").alias("sliding_24hr_spend"))
        df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"),"sliding_24hr_spend")
        df3 = df2.withColumn('timestamp', F.expr("end - INTERVAL 1 milliseconds")) \
              .select("user_id", "timestamp","sliding_24hr_spend")
        return df3

user_sliding_24hr_spend 特徴量セットには、次の 1 つの特徴があります。

  • sliding_24hr_spend: double

これは、計算された特徴の値を示しています。

user_id timestamp sliding_24hr_spend
1 2022-12-10 05:59:59 12.00
1 2022-12-10 11:59:59 22.00
1 2022-12-10 17:59:59 22.00
1 2022-12-10 23:59:59 22.00
1 2022-12-11 05:59:59 18.00
1 2022-12-11 11:59:59 8.00
1 2022-12-11 17:59:59 8.00
1 2022-12-11 23:59:59 8.00
1 2022-12-12 05:59:59 18.00
2 2022-12-12 17:59:59 56.00
2 2022-12-12 23:59:59 56.00
2 2022-12-13 05:59:59 56.00
2 2022-12-13 11:59:59 56.00

特徴変換の定義 - ベスト プラクティス

特徴変換でのデータ漏えいを防止する

計算された特徴の値ごとのタイムスタンプ値が ts_0 の場合は、タイムスタンプ値が source 以前の ts_0 データのみに基づいて特徴の値を計算します。 これで、特徴イベント時間後のデータに基づく特徴計算 ("データ漏えい" とも呼ばれます) が回避されます。

データ漏えいは通常、スライディング/タンブリング/ずらしウィンドウ集計で発生します。 これらのベスト プラクティスは、漏えいを回避するのに役立ちます。

  • スライディング ウィンドウ集計: 各行から過去にさかのぼってのみ確認するウィンドウを定義します
  • タンブリング/ずらしウィンドウ集計: 各ウィンドウの終わりに基づいて特徴のタイムスタンプを定義します

このデータ サンプルは、良好および悪いデータの例を示しています。

集計 良い例 データ漏えいの悪い例
スライディング ウィンドウ Window.partitionBy("user_id")
.orderBy(F.col("timestamp").cast('long'))
.rangeBetween(-days(4), 0)
Window.partitionBy("user_id")
.orderBy(F.col("timestamp").cast('long'))
.rangeBetween(-days(2), days(2))
タンブリング/ずらしウィンドウ df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day"))
.agg(F.sum("spend").alias("daily_spend"))

df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("timestamp"),"daily_spend")
df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day"))
.agg(F.sum("spend").alias("daily_spend"))

df2 = df1.select("user_id", df1.window.start.cast("timestamp").alias("timestamp"),"daily_spend")

特徴変換定義のデータ漏えいによって、次の問題が発生する可能性があります。

  • 計算/具体化された特徴の値のエラー
  • get_offline_feature の不整合 (即時に計算された値の代わりに、具体化された特徴の値を使用する場合)

適切な source_lookback を設定する

時系列 (スライディング/タンブリング/ずらしウィンドウ集計) データ集計の場合は、source_lookback プロパティを正しく設定します。 次の図は、特徴量 (セット) 計算のソース データ ウィンドウと特徴量ウィンドウの関係を示しています。

source_lookback の概念を示す図。

source_lookback を時間差分値として定義します。これは、特定のタイムスタンプの特徴の値に必要なソース データの範囲を示します。 この例は、一般的な変換の種類に推奨される source_lookback 値を示しています。

変換の種類 source_lookback
行レベルの変換 0 (既定値)
スライディング ウィンドウ トランスフォーマーの最大ウィンドウ範囲のサイズ。
例:
source_lookback = 3 日間のローリング特徴量が特徴量セットで定義されている場合は 3 日間
source_lookback = 3 日と 7 日間の両方のローリング特徴量が特徴量セットで定義されている場合は 7 日間
タンブリング/ずらしウィンドウ windowDuration 定義内の window の値。 例: source_lookback = window("timestamp", windowDuration="1 day",slideDuration="6 hours) を使用する場合は 1 日間

source_lookback 設定が正しくないと、計算/具体化された特徴の値が正しくなくなる可能性があります。

次のステップ