共用方式為


檢測異常訪問_cf_fl()

適用於:✅Microsoft網狀架構

使用共同作業篩選 (CF) 模型偵測異常存取,以識別時間戳數據中的異常存取模式。

detect_anomalous_access_cf_fl() 式是 使用者定義函式 (UDF), 可套用共同作業篩選 (CF) 模型來偵測異常互動,例如實體資源。 例如,根據存取記錄之類的時間戳數據,存取記憶體帳戶的用戶主體名稱(UPN)。 在網路安全內容中,此函式可協助偵測異常或未經授權的存取模式。

CF 型模型會使用專案相似性來預測存取分數,並利用歷程記錄存取模式以及實體和資源之間的餘弦相似度。 它會估計實體在指定範圍內的定義偵測期間存取資源的機率,例如訂用帳戶或帳戶。 數個選擇性參數,包括最小臨界值,允許自定義模型的行為。

模型輸出範圍 [0, 1] 中的存取異常分數,其中 0 表示合法存取的可能性很高,1 表示高度異常存取。 除了存取異常分數之外,函式也會傳回二進位異常旗標(根據定義的臨界值),以及其他說明字段。

語法

detect_anomalous_access_cf_fl( entityColumnNameresourceColumnName, scopeColumnNametimeColumnNamestartTrainingstartDetection, endDetection, [anomalyScoreThresh] )

深入瞭解 語法慣例

參數

名稱 類型 為必填項目 說明
entityColumnName string ✔️ 輸入數據表數據行的名稱,其中包含計算 cf 模型的實體名稱或標識碼。
resourceColumnName string ✔️ 輸入數據表數據行的名稱,其中包含計算模型的資源名稱或標識符。
scopeColumnName string ✔️ 包含數據分割或範圍的輸入數據表數據行名稱,以便針對每個範圍建置不同的異常模型。
timeColumnName string ✔️ 輸入數據表數據行的名稱,其中包含用來定義定型和偵測周期的時間戳。
startTraining datetime ✔️ 異常模型的定型期間開頭。 其結尾是由偵測週期的開頭所定義。
startDetection datetime ✔️ 異常偵測的偵測期間開頭。
endDetection datetime ✔️ 異常偵測的偵測期間結束。
anomalyScoreThresh real 偵測到異常的異常分數最大值,範圍 [0, 1] 中的數位。 較高的值表示只會將較顯著的案例視為異常,因此偵測到的異常較少(精確度較高、召回率較低)。 預設值為 0.9。

函式定義

您可以將函式的程式代碼內嵌為查詢定義的函式,或將其建立為資料庫中的預存函式,以定義函式,如下所示:

使用下列 let 語句定義函式。 不需要任何許可權。

這很重要

let 語句 無法自行執行。 它後面必須接著 表格式表示式語句。 若要執行 detect_anomalous_access_cf_fl()的工作範例,請參閱 範例

let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
                                          , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                          , anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let processedData = (
    T
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend resource   = column_ifexists(resourceColumnName, '')
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection ,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// Create all possible pairs (entity, resource) with the same scope
let entities = (
    processedData
    | where dataSet == 'trainSet'
    | summarize by entity, scope
    | extend temp = 1
);
let resources = (
    processedData
    | where dataSet == 'trainSet'
    | summarize by resource, scope
    | extend temp = 1
);
let potentialAccessTrainData = (
    entities
    | join kind=inner resources on temp
    | distinct  entity, resource, scope
);
let accessTrainData = (
    potentialAccessTrainData
    | join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
    | extend usedOperation = iff(isempty(resource1), 0, 1)
    | distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
let ItemUserInteractions = (
    accessTrainData
    | summarize interactList = make_list(usedOperation) by resource, scope
    | extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity
let ItemSimilarities = (
    ItemUserInteractions
    | join kind=inner (ItemUserInteractions) on tempKey
    | where scope == scope1
    | extend similarity = series_cosine_similarity(interactList, interactList1)
    | extend similarity = iff(isnan(similarity), 0.0, similarity)
    | project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
    accessTrainData
    | join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
    | project entity, resource=resource2, usedOperation, similarity
    | summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
    | extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
    | extend accessAnomalyScore = 1 - accessAnomalyScore
    | extend accessAnomalyScore = round(accessAnomalyScore, 4)
    | join kind=inner accessTrainData on entity, resource
    | project entity, resource, scope, usedOperation, accessAnomalyScore
    | extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
    | order by entity asc, resource
);
let resultsData = (
    processedData
    | where dataSet == "detectSet"
    | join kind=leftouter Predictions on entity, resource, scope
    | extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
    | project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// Write your query to use the function here.

範例

下列範例會使用 invoke 運算子 來執行函式。

若要使用查詢定義的函式,請在內嵌函數定義之後叫用它。

let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
                                                , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                                , anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let processedData = (
    T
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend resource = column_ifexists(resourceColumnName, '')
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// Create all possible pairs (entity, resource) with the same scope
let entities = (
    processedData
    | where dataSet == 'trainSet'
    | summarize by entity, scope
    | extend temp = 1
);
let resources = (
    processedData
    | where dataSet == 'trainSet'
    | summarize by resource, scope
    | extend temp = 1
);
let potentialAccessTrainData = (
    entities
    | join kind=inner resources on temp
    | distinct  entity, resource, scope
);
let accessTrainData = (
    potentialAccessTrainData
    | join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
    | extend usedOperation = iff(isempty(resource1), 0, 1)
    | distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
let ItemUserInteractions = (
    accessTrainData
    | summarize interactList = make_list(usedOperation) by resource, scope
    | extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity
let ItemSimilarities = (
    ItemUserInteractions
    | join kind=inner (ItemUserInteractions) on tempKey
    | where scope == scope1
    | extend similarity = series_cosine_similarity(interactList, interactList1)
    | extend similarity = iff(isnan(similarity), 0.0, similarity)
    | project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
    accessTrainData
    | join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
    | project entity, resource=resource2, usedOperation, similarity
    | summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
    | extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
    | extend accessAnomalyScore = 1 - accessAnomalyScore
    | extend accessAnomalyScore = round(accessAnomalyScore, 4)
    | join kind=inner accessTrainData on entity, resource
    | project entity, resource, scope, usedOperation, accessAnomalyScore
    | extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
    | order by entity asc, resource
);
let resultsData = (
    processedData
    | where dataSet == "detectSet"
    | join kind=leftouter Predictions on entity, resource, scope
    | extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
    | project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// synthetic data generation
let detectPeriodStart   = datetime(2022-04-30 05:00);
let trainPeriodStart    = datetime(2022-03-01 05:00);
let names               = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames          = array_length(names);
let devices             = toscalar(range device_id from 1 to 51 step 1 | extend device = strcat("device", tostring(device_id)) | summarize devices_array = make_list(device));
let countDevices          = array_length(devices)-1;
let testData            = range t from 0 to 24*60 step 1
    | extend timeSlice      = trainPeriodStart + 1h * t
    | extend userName       = tostring(names[toint(rand(countNames))])
    | extend deviceId       = tostring(devices[toint(rand(countDevices))])
    | extend accountName    = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
    | extend userName       = iff(timeSlice == trainPeriodStart, 'H4ck3r', userName)
    | extend deviceId       = iff(timeSlice == trainPeriodStart, 'device1', deviceId)
    | extend accountName    = iff(timeSlice == trainPeriodStart, 'prodEnvironment', accountName)
    | extend userName       = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
    | extend deviceId       = iff(timeSlice == detectPeriodStart, 'device50', deviceId)
    | extend accountName    = iff(timeSlice == detectPeriodStart, 'prodEnvironment', accountName)
    | sort by timeSlice desc
;
testData
| invoke detect_anomalous_access_cf_fl(entityColumnName    = 'userName'
                                      , resourceColumnName = 'deviceId'
                                      , scopeColumnName    = 'accountName'
                                      , timeColumnName     = 'timeSlice'
                                      , startTraining      = trainPeriodStart
                                      , startDetection     = detectPeriodStart
                                      , endDetection       = detectPeriodStart
                                  )

輸出

t timeSlice 使用者名稱 裝置識別碼 帳戶名稱 實體 資源 範圍 數據 accessAnomalyScore isAnomalousAccess
1440 2022-04-30 05:00:00.0000000 H4ck3r device50 prodEnvironment H4ck3r device50 prodEnvironment detectSet 0.982 1

執行函式的輸出會顯示偵測期間的每個異常實體資源存取事件,針對預測存取機率(根據共同作業篩選)高於定義的異常閾值(預設為 0.9) 的情況進行篩選。 為了清楚起見,會新增其他欄位:

  • dataSet:目前數據集(一律 detectSet為 )。
  • accessAnomalyScore:根據共同作業篩選模型,此存取的預測存取異常分數。 此值在 [0, 1] 範圍內,較高的值表示異常程度較高。
  • isAnomalousAccess:異常存取的二進位旗標

使用預設參數執行函式會將使用者 『H4ck3r』 對 『prodEnvironment』 帳戶內的裝置 'device50' 的存取嘗試加上旗標。 預測存取異常分數為 0.982,這是非常高的,表示根據歷程記錄模式的定型模型,此存取是非預期的。

在定型期間,共同作業篩選模型瞭解使用者與裝置在範圍內之間的存取模式。 由於未觀察到存取 『device50』 的 『H4ck3r』,因此在歷史數據中不太可能被視為異常。

輸出數據表會與預測存取分數一起呈現這些異常存取。 這些欄位有助於進一步調查、警示或與更廣泛的偵測工作流程整合。

網路安全性內容中建議的使用方式是監視重要實體,例如用戶名稱或IP、存取其對應範圍內的重要資源,例如裝置、資料庫或應用程式(例如帳戶或訂用帳戶)。