この記事では、特徴量セットの仕様、一緒に使用できるさまざまな種類の変換、関連するベスト プラクティスについて説明します。
特徴量セットは、ソース データ変換によって生成される特徴量のコレクションです。 特徴量セットの仕様は、特徴量セットの開発とローカル テストのための自己完結型の定義です。 特徴量セットの開発とローカル テストの後、その特徴量セットを特徴量セット資産として Feature Store に登録できます。 その後、バージョン管理と具体化を管理機能として使用できるようになります。
特徴量セットを定義する
FeatureSetSpec
では特徴量セットを定義します。 このサンプルは、特徴量セットの仕様ファイルを示しています。
$schema: http://azureml/sdk-2-0/FeatureSetSpec.json
source:
type: parquet
path: abfs://file_system@account_name.dfs.core.windows.net/datasources/transactions-source/*.parquet
timestamp_column: # name of the column representing the timestamp.
name: timestamp
source_delay:
days: 0
hours: 3
minutes: 0
feature_transformation:
transformation_code:
path: ./transformation_code
transformer_class: transaction_transform.TransactionFeatureTransformer
features:
- name: transaction_7d_count
type: long
- name: transaction_amount_7d_sum
type: double
- name: transaction_amount_7d_avg
type: double
- name: transaction_3d_count
type: long
- name: transaction_amount_3d_sum
type: double
- name: transaction_amount_3d_avg
type: double
index_columns:
- name: accountID
type: string
source_lookback:
days: 7
hours: 0
minutes: 0
temporal_join_lookback:
days: 1
hours: 0
minutes: 0
注
featurestore
コア SDK によって、特徴量セットの仕様 YAML
が自動生成されます。 このチュートリアルには例があります。
FeatureSetSpec
定義では、これらのプロパティは特徴量変換に関連しています。
source
: ソース データと関連するメタデータ (データ内のタイムスタンプ列など) を定義します。 現時点では、時系列のソース データと特徴量のみがサポートされています。source.timestamp_column
プロパティは必須ですfeature_transformation.transformation_code
: 特徴トランスフォーマーのコード フォルダーの場所を定義しますfeatures
: 特徴トランスフォーマーによって生成される特徴スキーマを定義しますindex_columns
: 特徴トランスフォーマーによって生成されるインデックス列スキーマを定義しますsource_lookback
: このプロパティは、特徴で時系列 (ウィンドウの集計など) データの集計を処理するときに使用されます。 このプロパティの値は、時間 T の特徴値について必要な過去のソース データの時間の範囲を示します。ベスト プラクティスに関するセクションには、詳細が含まれています。
特徴量はどのように計算されますか?
FeatureSetSpec
を定義したら、featureSetSpec.to_spark_dataframe(feature_window_start_ts, feature_window_end_ts)
を呼び出して、特定の特徴量ウィンドウの特徴量を計算します。
計算は、次の手順で行われます。
- ソース データからデータを読み取ります。
source
ではソース データを定義します。 データを時間の範囲[feature_window_start_ts - source_lookback, feature_window_end_ts)
でフィルター処理します。 時間の範囲にはウィンドウの開始が含まれており、ウィンドウの終了は除外されます feature_transformation.transformation_code
によって定義された特徴トランスフォーマーをデータに適用し、計算された特徴量を取得します- 特徴量ウィンドウ
[feature_window_start_ts, feature_window_end_ts)
内の特徴量レコードのみを返すように特徴値をフィルター処理します
このサンプル コードでは、Feature Store API によって次の特徴量が計算されます。
# define the source data time window according to feature window
source_window_start_ts = feature_window_start_ts - source_lookback
source_window_end_ts = feature_window_end_ts
# read source table into a dataframe
df1 = spark.read.parquet(source.path).filter(df1["timestamp"] >= source_window_start_ts && df1["timestamp"] < source_window_end_ts)
# apply the feature transformer
df2 = FeatureTransformer._transform(df1)
## filter the feature(set) to include only feature records within the feature window
feature_set_df = df2.filter(df2["timestamp"] >= feature_window_start_ts && df2["timestamp"] < feature_window_end_ts)
特徴トランスフォーマー関数の出力スキーマ
transform 関数では、スキーマに次の値を含むデータフレームを出力します。
- 名前と型の両方で
FeatureSetSpec
定義に一致するインデックス列 source
のタイムスタンプ定義と一致するタイムスタンプ列 (名前)。source
はFeatureSetSpec
にありますfeatures
で他のすべての列名/型の値をFeatureSetSpec
として定義します
一般的な種類の変換に特徴トランスフォーマーを実装する
行レベルの変換
行レベルの変換では、特定の行に対する特徴値の計算には、その行の列値のみが使用されます。 次のソース データから始めます。
user_id |
timestamp |
total_spend |
---|---|---|
1 | 2022-12-19 06:00:00 | 12.00 |
2 | 2022-12-10 03:00:00 | 56.00 |
1 | 2022-12-25 13:00:00 | 112.00 |
user_total_spend_profile
という名前の新しい特徴量セットを定義します。
from pyspark.sql import Dataframe
from pyspark.ml import Transformer
class UserTotalSpendProfileTransformer(Transformer):
def _transform(df: Dataframe) -> Dataframe:
df.withColumn("is_high_spend_user", col("total_spend") > 100.0) \
.withColumn("is_low_spend_user", col("total_spend") < 20.0)
この特徴量セットには、次に示すようなデータ型を持つ 3 つの特徴量があります。
total_spend
: doubleis_high_spend_user
: boolis_low_spend_user
: bool
これは、計算された特徴の値を示しています。
user_id |
timestamp |
total_spend |
is_high_spend_user |
is_low_spend_user |
---|---|---|---|---|
1 | 2022-12-19 06:00:00 | 12.00 | 偽り | ほんとう |
2 | 2022-12-10 03:00:00 | 56.00 | 偽り | 偽り |
1 | 2022-12-25 13:00:00 | 112.00 | ほんとう | 偽り |
スライディング ウィンドウ集計
スライディング ウィンドウ集計は、時間の経過と共に蓄積される統計 (合計、平均など) を示す特徴量値を処理するのに役立ちます。 SparkSQL Window
関数は、データ内の各行のスライディング ウィンドウを定義し、このような場合に役立ちます。
Window
オブジェクトでは、行ごとに将来と過去の両方を調べることができます。 機械学習の特徴量のコンテキストでは、Window
オブジェクトを定義して、各行の過去のみを調べる必要があります。 詳細については、ベスト プラクティスに関するセクションを参照してください。
次のソース データから始めます。
user_id |
timestamp |
spend |
---|---|---|
1 | 2022-12-10 06:00:00 | 12.00 |
2 | 2022-12-10 03:00:00 | 56.00 |
1 | 2022-12-11 01:00:00 | 10.00 |
2 | 2022-12-11 20:00:00 | 10.00 |
2 | 2022-12-12 02:00:00 | 100.00 |
user_rolling_spend
という名前の新しい特徴量セットを定義します。 この特徴量セットには、ユーザー別の 1 日と 3 日間のローリング合計支出が含まれます。
from pyspark.sql import Dataframe
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Transformer
class UserRollingSpend(Transformer):
def _transform(df: Dataframe) -> Dataframe:
days = lambda i: i * 86400
w_1d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast('long'))\
.rangeBetween(-days(1), 0))
w_3d = (Window.partitionBy("user_id").orderBy(F.col("timestamp").cast('long')).\
rangeBetween(-days(3), 0))
res = df.withColumn("spend_1d_sum", F.sum("spend").over(w_1d)) \
.withColumn("spend_3d_sum", F.sum("spend").over(w_3d)) \
.select("user_id", "timestamp", "spend_1d_sum", "spend_3d_sum")
return res
user_rolling_spend
特徴量セットには、次の 2 つの特徴があります。
spend_1d_sum
: doublespend_3d_sum
: double
これは、計算された特徴の値を示しています。
user_id |
timestamp |
spend_1d_sum |
spend_3d_sum |
---|---|---|---|
1 | 2022-12-10 06:00:00 | 12.00 | 12.00 |
2 | 2022-12-10 03:00:00 | 56.00 | 56.00 |
1 | 2022-12-11 01:00:00 | 22.00 | 22.00 |
2 | 2022-12-11 20:00:00 | 10.00 | 66.00 |
2 | 2022-12-12 02:00:00 | 110.00 | 166.00 |
特徴の値の計算では、現在の行の列が使用され、範囲内の前の行の列と組み合わされます。
タンブリング ウィンドウ集計
タンブリング ウィンドウでは、時系列データのデータを集計できます。 データを固定サイズの重複しない継続的な時間枠にグループ化し、集計します。 たとえば、ユーザーは、日単位または時間単位の集計に基づいて特徴量を定義できます。 一貫性のある結果を得るために、pyspark.sql.functions.window
関数を使用してタンブリング ウィンドウを定義します。 出力の特徴 timestamp
は、各タンブリング ウィンドウの終了に合わせる必要があります。
次のソース データから始めます。
user_id |
timestamp |
spend |
---|---|---|
1 | 2022-12-10 06:00:00 | 12.00 |
1 | 2022-12-10 16:00:00 | 10.00 |
2 | 2022-12-10 03:00:00 | 56.00 |
1 | 2022-12-11 01:00:00 | 10.00 |
2 | 2022-12-12 04:00:00 | 23.00 |
2 | 2022-12-12 12:00:00 | 10.00 |
user_daily_spend
という名前の新しい特徴量セットを定義します。
from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame
class TransactionFeatureTransformer(Transformer):
def _transform(self, df: DataFrame) -> DataFrame:
df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day"))\
.agg(F.sum("spend").alias("daily_spend"))
df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"),"daily_spend")
df3 = df2.withColumn('timestamp', F.expr("end - INTERVAL 1 milliseconds")) \
.select("user_id", "timestamp","daily_spend")
return df3
user_daily_spend
特徴量セットには、次の特徴があります。
daily_spend
: double
これは、計算された特徴の値を示しています。
user_id |
timestamp |
daily_spend |
---|---|---|
1 | 2022-12-10 23:59:59 | 22.00 |
2 | 2022-12-10 23:59:59 | 56.00 |
1 | 2022-12-11 23:59:59 | 10.00 |
2 | 2022-12-12 23:59:59 | 33.00 |
ずらしウィンドウ集計
ずらしウィンドウ集計は、タンブリング ウィンドウ集計のマイナー バリアントです。 ずらしウィンドウ集計では、データを固定サイズのウィンドウにグループ化します。 ただし、ウィンドウは互いに重なることがあります。 このためには、pyspark.sql.functions.window
より小さい slideDuration
で windowDuration
を使用します。
次のサンプル データから始めます。
user_id |
timestamp |
spend |
---|---|---|
1 | 2022-12-10 03:00:00 | 12.00 |
1 | 2022-12-10 09:00:00 | 10.00 |
1 | 2022-12-11 05:00:00 | 8.00 |
2 | 2022-12-12 14:00:00 | 56.00 |
user_sliding_24hr_spend
という名前の新しい特徴量セットを定義します。
from pyspark.sql import functions as F
from pyspark.ml import Transformer
from pyspark.sql.dataframe import DataFrame
class TrsactionFeatureTransformer(Transformer):
def _transform(self, df: DataFrame) -> DataFrame:
df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="6 hours"))\
.agg(F.sum("spend").alias("sliding_24hr_spend"))
df2 = df1.select("user_id", df1.window.end.cast("timestamp").alias("end"),"sliding_24hr_spend")
df3 = df2.withColumn('timestamp', F.expr("end - INTERVAL 1 milliseconds")) \
.select("user_id", "timestamp","sliding_24hr_spend")
return df3
user_sliding_24hr_spend
特徴量セットには、次の 1 つの特徴があります。
sliding_24hr_spend
: double
これは、計算された特徴の値を示しています。
user_id |
timestamp |
sliding_24hr_spend |
---|---|---|
1 | 2022-12-10 05:59:59 | 12.00 |
1 | 2022-12-10 11:59:59 | 22.00 |
1 | 2022-12-10 17:59:59 | 22.00 |
1 | 2022-12-10 23:59:59 | 22.00 |
1 | 2022-12-11 05:59:59 | 18.00 |
1 | 2022-12-11 11:59:59 | 8.00 |
1 | 2022-12-11 17:59:59 | 8.00 |
1 | 2022-12-11 23:59:59 | 8.00 |
1 | 2022-12-12 05:59:59 | 18.00 |
2 | 2022-12-12 17:59:59 | 56.00 |
2 | 2022-12-12 23:59:59 | 56.00 |
2 | 2022-12-13 05:59:59 | 56.00 |
2 | 2022-12-13 11:59:59 | 56.00 |
特徴変換の定義 - ベスト プラクティス
特徴変換でのデータ漏えいを防止する
計算された特徴の値ごとのタイムスタンプ値が ts_0
の場合は、タイムスタンプ値が source
以前の ts_0
データのみに基づいて特徴の値を計算します。 これで、特徴イベント時間後のデータに基づく特徴計算 ("データ漏えい" とも呼ばれます) が回避されます。
データ漏えいは通常、スライディング/タンブリング/ずらしウィンドウ集計で発生します。 これらのベスト プラクティスは、漏えいを回避するのに役立ちます。
- スライディング ウィンドウ集計: 各行から過去にさかのぼってのみ確認するウィンドウを定義します
- タンブリング/ずらしウィンドウ集計: 各ウィンドウの終わりに基づいて特徴のタイムスタンプを定義します
このデータ サンプルは、良好および悪いデータの例を示しています。
集計 | 良い例 | データ漏えいの悪い例 |
---|---|---|
スライディング ウィンドウ | Window.partitionBy("user_id") .orderBy(F.col("timestamp").cast('long')) . rangeBetween(-days(4), 0) |
Window.partitionBy("user_id") .orderBy(F.col("timestamp").cast('long')) . rangeBetween(-days(2), days(2)) |
タンブリング/ずらしウィンドウ | df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day")) .agg(F.sum("spend").alias("daily_spend")) df2 = df1.select("user_id", df1.window. end .cast("timestamp").alias("timestamp"),"daily_spend") |
df1 = df.groupBy("user_id", F.window("timestamp", windowDuration="1 day",slideDuration="1 day")) .agg(F.sum("spend").alias("daily_spend")) df2 = df1.select("user_id", df1.window. start .cast("timestamp").alias("timestamp"),"daily_spend") |
特徴変換定義のデータ漏えいによって、次の問題が発生する可能性があります。
- 計算/具体化された特徴の値のエラー
get_offline_feature
の不整合 (即時に計算された値の代わりに、具体化された特徴の値を使用する場合)
適切な source_lookback
を設定する
時系列 (スライディング/タンブリング/ずらしウィンドウ集計) データ集計の場合は、source_lookback
プロパティを正しく設定します。 次の図は、特徴量 (セット) 計算のソース データ ウィンドウと特徴量ウィンドウの関係を示しています。
source_lookback
を時間差分値として定義します。これは、特定のタイムスタンプの特徴の値に必要なソース データの範囲を示します。 この例は、一般的な変換の種類に推奨される source_lookback
値を示しています。
変換の種類 | source_lookback |
---|---|
行レベルの変換 | 0 (既定値) |
スライディング ウィンドウ | トランスフォーマーの最大ウィンドウ範囲のサイズ。 例: source_lookback = 3 日間のローリング特徴量が特徴量セットで定義されている場合は 3 日間source_lookback = 3 日と 7 日間の両方のローリング特徴量が特徴量セットで定義されている場合は 7 日間 |
タンブリング/ずらしウィンドウ | windowDuration 定義内の window の値。 例: source_lookback = window("timestamp", windowDuration="1 day",slideDuration="6 hours) を使用する場合は 1 日間 |
source_lookback
設定が正しくないと、計算/具体化された特徴の値が正しくなくなる可能性があります。