適用対象: ✅Microsoft Fabric✅Azure データ エクスプローラー✅Azure Monitor✅Microsoft Sentinel
タイムスタンプ付きデータの異常なアクセス パターンを識別するコラボレーション フィルタリング (CF) モデルを使用して異常なアクセスを検出します。
detect_anomalous_access_cf_fl()
関数は、エンティティ リソースなどの異常な相互作用を検出するためにコラボレーション フィルタリング (CF) モデルを適用するユーザー定義関数 (UDF) です。 たとえば、アクセス ログなどのタイムスタンプ付きデータに基づいて、ストレージ アカウントにアクセスするユーザー プリンシパル名 (UPN) などです。 サイバーセキュリティのコンテキストでは、この関数は、異常または未承認のアクセス パターンを検出するのに役立ちます。
CF ベースのモデルは、項目の類似性を使用してアクセス スコアを予測し、過去のアクセス パターンとエンティティとリソース間のコサインの類似性を利用します。 サブスクリプションやアカウントなど、特定のスコープ内で定義された検出期間中にリソースにアクセスするエンティティの確率を推定します。 最小しきい値を含むいくつかの省略可能なパラメーターを使用すると、モデルの動作をカスタマイズできます。
このモデルでは、範囲 [0, 1] のアクセス異常スコアが出力されます。0 は正当なアクセスの可能性が高く、1 は非常に異常なアクセスを示します。 この関数は、アクセス異常スコアと共に、(定義されたしきい値に基づいて) バイナリ異常フラグと追加の説明フィールドも返します。
構文
detect_anomalous_access_cf_fl(
entityColumnName、 resourceColumnName、 scopeColumnName、 timeColumnName、 startTraining、 startDetection、 endDetection、[anomalyScoreThresh])
構文規則について詳しく知る。
パラメーター
名前 | タイプ | 必須 | 説明 |
---|---|---|---|
entityColumnName | string |
✔️ | cf モデルが計算されるエンティティ名または ID を含む入力テーブル列の名前。 |
resourceColumnName | string |
✔️ | モデルが計算されるリソース名または ID を含む入力テーブル列の名前。 |
scopeColumnName を |
string |
✔️ | パーティションまたはスコープを含む入力テーブル列の名前。スコープごとに異なる異常モデルが構築されます。 |
timeColumnName | string |
✔️ | トレーニング期間と検出期間の定義に使用されるタイムスタンプを含む入力テーブル列の名前。 |
startTraining | datetime |
✔️ | 異常モデルのトレーニング期間の開始。 その終了は、検出期間の開始によって定義されます。 |
startDetection | datetime |
✔️ | 異常検出の検出期間の開始。 |
endDetection | datetime |
✔️ | 異常検出の検出期間の終了。 |
anomalyScoreThresh | real |
異常が検出された異常スコアの最大値。範囲 [0, 1] の数値。 値が大きいほど、より重要なケースのみが異常と見なされるため、検出される異常が少なくなります (精度が高く、再現率が低くなります)。 既定値は 0.9 です。 |
関数の定義
関数を定義するには、次のようにコードをクエリ定義関数として埋め込むか、データベースに格納された関数として作成します。
-
クエリ定義 の
- 保存されている
次の let ステートメント
Von Bedeutung
let ステートメント 単独では実行できません。 その後に、表形式の式ステートメント続ける必要があります。
detect_anomalous_access_cf_fl()
の作業例を実行するには、例を参照してください。
let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let processedData = (
T
| extend entity = column_ifexists(entityColumnName, '')
| extend resource = column_ifexists(resourceColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection , 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// Create all possible pairs (entity, resource) with the same scope
let entities = (
processedData
| where dataSet == 'trainSet'
| summarize by entity, scope
| extend temp = 1
);
let resources = (
processedData
| where dataSet == 'trainSet'
| summarize by resource, scope
| extend temp = 1
);
let potentialAccessTrainData = (
entities
| join kind=inner resources on temp
| distinct entity, resource, scope
);
let accessTrainData = (
potentialAccessTrainData
| join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
| extend usedOperation = iff(isempty(resource1), 0, 1)
| distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
let ItemUserInteractions = (
accessTrainData
| summarize interactList = make_list(usedOperation) by resource, scope
| extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity
let ItemSimilarities = (
ItemUserInteractions
| join kind=inner (ItemUserInteractions) on tempKey
| where scope == scope1
| extend similarity = series_cosine_similarity(interactList, interactList1)
| extend similarity = iff(isnan(similarity), 0.0, similarity)
| project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
accessTrainData
| join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
| project entity, resource=resource2, usedOperation, similarity
| summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
| extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
| extend accessAnomalyScore = 1 - accessAnomalyScore
| extend accessAnomalyScore = round(accessAnomalyScore, 4)
| join kind=inner accessTrainData on entity, resource
| project entity, resource, scope, usedOperation, accessAnomalyScore
| extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
| order by entity asc, resource
);
let resultsData = (
processedData
| where dataSet == "detectSet"
| join kind=leftouter Predictions on entity, resource, scope
| extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
| project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// Write your query to use the function here.
例
次の例では、呼び出し演算子 を使用して関数を実行します。
-
クエリ定義 の
- 保存されている
クエリ定義関数を使用するには、埋め込み関数定義の後に呼び出します。
let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let processedData = (
T
| extend entity = column_ifexists(entityColumnName, '')
| extend resource = column_ifexists(resourceColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection, 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// Create all possible pairs (entity, resource) with the same scope
let entities = (
processedData
| where dataSet == 'trainSet'
| summarize by entity, scope
| extend temp = 1
);
let resources = (
processedData
| where dataSet == 'trainSet'
| summarize by resource, scope
| extend temp = 1
);
let potentialAccessTrainData = (
entities
| join kind=inner resources on temp
| distinct entity, resource, scope
);
let accessTrainData = (
potentialAccessTrainData
| join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
| extend usedOperation = iff(isempty(resource1), 0, 1)
| distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
let ItemUserInteractions = (
accessTrainData
| summarize interactList = make_list(usedOperation) by resource, scope
| extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity
let ItemSimilarities = (
ItemUserInteractions
| join kind=inner (ItemUserInteractions) on tempKey
| where scope == scope1
| extend similarity = series_cosine_similarity(interactList, interactList1)
| extend similarity = iff(isnan(similarity), 0.0, similarity)
| project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
accessTrainData
| join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
| project entity, resource=resource2, usedOperation, similarity
| summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
| extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
| extend accessAnomalyScore = 1 - accessAnomalyScore
| extend accessAnomalyScore = round(accessAnomalyScore, 4)
| join kind=inner accessTrainData on entity, resource
| project entity, resource, scope, usedOperation, accessAnomalyScore
| extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
| order by entity asc, resource
);
let resultsData = (
processedData
| where dataSet == "detectSet"
| join kind=leftouter Predictions on entity, resource, scope
| extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
| project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// synthetic data generation
let detectPeriodStart = datetime(2022-04-30 05:00);
let trainPeriodStart = datetime(2022-03-01 05:00);
let names = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames = array_length(names);
let devices = toscalar(range device_id from 1 to 51 step 1 | extend device = strcat("device", tostring(device_id)) | summarize devices_array = make_list(device));
let countDevices = array_length(devices)-1;
let testData = range t from 0 to 24*60 step 1
| extend timeSlice = trainPeriodStart + 1h * t
| extend userName = tostring(names[toint(rand(countNames))])
| extend deviceId = tostring(devices[toint(rand(countDevices))])
| extend accountName = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
| extend userName = iff(timeSlice == trainPeriodStart, 'H4ck3r', userName)
| extend deviceId = iff(timeSlice == trainPeriodStart, 'device1', deviceId)
| extend accountName = iff(timeSlice == trainPeriodStart, 'prodEnvironment', accountName)
| extend userName = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
| extend deviceId = iff(timeSlice == detectPeriodStart, 'device50', deviceId)
| extend accountName = iff(timeSlice == detectPeriodStart, 'prodEnvironment', accountName)
| sort by timeSlice desc
;
testData
| invoke detect_anomalous_access_cf_fl(entityColumnName = 'userName'
, resourceColumnName = 'deviceId'
, scopeColumnName = 'accountName'
, timeColumnName = 'timeSlice'
, startTraining = trainPeriodStart
, startDetection = detectPeriodStart
, endDetection = detectPeriodStart
)
アウトプット
t | timeSlice | ユーザー名 | デバイスID | アカウント名 | エンティティ | リソース | 範囲 | データセット | accessAnomalyScore | isAnomalousAccess |
---|---|---|---|---|---|---|---|---|---|---|
1440 | 2022-04-30 05:00:00.0000000 | H4ck3r | device50 | prodEnvironment | H4ck3r | device50 | prodEnvironment | detectSet | 0.982 | 1 |
関数の実行の出力には、検出期間中の各異常なエンティティ リソース アクセス イベントが表示され、予測されたアクセス確率 (コラボレーション フィルター処理に基づく) が定義された異常しきい値 (既定では 0.9) より高かった場合にフィルター処理されます。 わかりやすくするために、追加のフィールドが追加されます。
-
dataSet
: 現在のデータセット (常にdetectSet
)。 -
accessAnomalyScore
: コラボレーション フィルタリング モデリングに基づく、このアクセスの予測されたアクセスの異常スコア。 値の範囲は [0, 1] で、値が大きいほど異常の度合いが高いことを示します。 -
isAnomalousAccess
: 異常なアクセスのバイナリ フラグ
既定のパラメーターを指定して関数を実行すると、ユーザー 'H4ck3r' によるアクセス試行に'prodEnvironment' アカウント内のデバイス 'device50' にフラグが設定されます。 予測されるアクセスの異常スコアは 0.982 です。これは非常に高く、履歴パターンに基づくトレーニング済みのモデルに従って、このアクセスが予期しないであることを示します。
トレーニング期間中、コラボレーション フィルタリング モデルは、スコープ内のユーザーとデバイス間のアクセス パターンを学習しました。 'device50' にアクセスする 'H4ck3r' は観察されず、履歴データでは考えられなかったため、異常としてフラグが付けられます。
出力テーブルには、これらの異常なアクセスと予測されるアクセス スコアが表示されます。 これらのフィールドは、詳細な調査、アラート、または広範な検出ワークフローとの統合に役立ちます。
サイバーセキュリティ コンテキストで推奨される使用方法は、ユーザー名や IP などの重要なエンティティを監視し、対応するスコープ (アカウントやサブスクリプションなど) 内のデバイス、データベース、アプリケーションなどの重要なリソースにアクセスすることです。