Detect anomalous access using a collaborative filtering (CF) model to identify unusual access patterns in timestamped data.
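The function detect_anomalous_access_cf_fl() is a user-defined function (UDF) that applies a collaborative filtering (CF) model to detect anomalous interactions between entities and resources. An example is a User Principal Name (UPN) accessing a storage account, based on timestamped data such as access logs. In a cybersecurity context, the function helps detect abnormal or unauthorized access patterns.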
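The CF-based model predicts access scores using item similarity: it takes historical access patterns and computes the cosine similarity between resources from the entities that accessed them. It estimates the probability that an entity accesses a resource during a defined detection period within a given scope, such as a subscription or an account. The optional anomaly score threshold lets you tune how aggressively accesses are flagged.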
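The model outputs an access anomaly score in the range [0, 1], where 0 indicates a high likelihood of legitimate access and 1 indicates a highly anomalous access. Alongside the access anomaly score, the function returns a binary anomaly flag (based on the defined threshold) and additional explanatory fields.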
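The scoring idea described above can be illustrated outside KQL. The following is a minimal Python sketch, not the function's implementation: it assumes a single scope, takes the training data as a set of (entity, resource) pairs, and scores only pairs that were not observed during training. The names `cosine`, `anomaly_scores`, and the sample users and databases are hypothetical.

```python
import math


def cosine(u, v):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v) if norm_u and norm_v else 0.0


def anomaly_scores(accesses):
    """Score every unseen (entity, resource) pair within one scope.

    accesses: set of (entity, resource) pairs observed during training.
    Returns {(entity, resource): score}, where 1.0 means highly anomalous.
    """
    entities = sorted({e for e, _ in accesses})
    resources = sorted({r for _, r in accesses})
    # One binary interaction vector per resource, indexed by entity.
    vec = {r: [1 if (e, r) in accesses else 0 for e in entities] for r in resources}
    sim = {(a, b): cosine(vec[a], vec[b]) for a in resources for b in resources}

    scores = {}
    for e in entities:
        for r in resources:
            if (e, r) in accesses:
                continue  # assumption: only unseen pairs are scored in this sketch
            # Similarity-weighted average of the entity's known interactions.
            den = sum(abs(sim[(o, r)]) for o in resources)
            num = sum(sim[(o, r)] for o in resources if (e, o) in accesses)
            predicted = num / den if den else 0.0
            scores[(e, r)] = round(1 - predicted, 4)
    return scores


training = {("alice", "db1"), ("alice", "db2"), ("bob", "db1"), ("carol", "db3")}
scores = anomaly_scores(training)
print(scores[("bob", "db2")], scores[("carol", "db1")])
```

In this toy data, bob shares db1 with alice, who also uses db2, so bob accessing db2 gets a moderate score. carol has nothing in common with the users of db1, so that pair gets the maximum score.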
Syntax
detect_anomalous_access_cf_fl(entityColumnName, resourceColumnName, scopeColumnName, timeColumnName, startTraining, startDetection, endDetection, [anomalyScoreThresh])
Learn more about syntax conventions.
Parameters
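Name | Type | Required | Description |
---|---|---|---|
entityColumnName | string | ✔️ | The name of the input table column containing the entity names or IDs for which the CF model is calculated. |
resourceColumnName | string | ✔️ | The name of the input table column containing the resource names or IDs for which the model is calculated. |
scopeColumnName | string | ✔️ | The name of the input table column containing the partition or scope, so that a separate anomaly model is built for each scope. |
timeColumnName | string | ✔️ | The name of the input table column containing the timestamps that are used to define the training and detection periods. |
startTraining | datetime | ✔️ | The beginning of the training period for the anomaly model. Its end is defined by the beginning of the detection period. |
startDetection | datetime | ✔️ | The beginning of the detection period for anomaly detection. |
endDetection | datetime | ✔️ | The end of the detection period for anomaly detection. |
anomalyScoreThresh | real | | The anomaly score threshold, a number in the range [0, 1]. An access is flagged as anomalous when its score is greater than this value. Higher values mean that only more significant cases are considered anomalous, so fewer anomalies are detected (higher precision, lower recall). The default value is 0.9. |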
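How the time parameters and the threshold interact can be sketched in a few lines of Python. This is an illustration under assumptions, not part of the function: the helper names `assign_dataset` and `is_anomalous` are hypothetical, and the boundaries mirror the case() expression and the threshold comparison in the KQL definition (training is the half-open interval from startTraining up to startDetection, detection includes both of its endpoints, and the flag uses a strict greater-than).

```python
from datetime import datetime


def assign_dataset(ts, start_training, start_detection, end_detection):
    """Classify a timestamp as training data, detection data, or neither."""
    if start_training <= ts < start_detection:
        return "trainSet"
    if start_detection <= ts <= end_detection:
        return "detectSet"
    return "other"


def is_anomalous(score, threshold=0.9):
    """Return 1 when the anomaly score is strictly above the threshold, else 0."""
    return 1 if score > threshold else 0


start_training = datetime(2022, 3, 1, 5)
start_detection = datetime(2022, 4, 30, 5)
end_detection = datetime(2022, 4, 30, 5)  # single-timestamp detection period

print(assign_dataset(datetime(2022, 4, 30, 4), start_training, start_detection, end_detection))
print(assign_dataset(start_detection, start_training, start_detection, end_detection))
print(is_anomalous(0.982), is_anomalous(0.9))
```

Because the comparison is strict, a score exactly equal to the threshold is not flagged.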
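Function definition
You can define the function either by embedding its code as a query-defined function, or by creating it as a stored function in your database, as follows:
Define the function using the following let statement. No permissions are required.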
let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let processedData = (
T
| extend entity = column_ifexists(entityColumnName, '')
| extend resource = column_ifexists(resourceColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection , 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// Create all possible pairs (entity, resource) with the same scope
let entities = (
processedData
| where dataSet == 'trainSet'
| summarize by entity, scope
| extend temp = 1
);
let resources = (
processedData
| where dataSet == 'trainSet'
| summarize by resource, scope
| extend temp = 1
);
let potentialAccessTrainData = (
entities
| join kind=inner resources on temp
| distinct entity, resource, scope
);
let accessTrainData = (
potentialAccessTrainData
| join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
| extend usedOperation = iff(isempty(resource1), 0, 1)
| distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
let ItemUserInteractions = (
accessTrainData
| summarize interactList = make_list(usedOperation) by resource, scope
| extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity
let ItemSimilarities = (
ItemUserInteractions
| join kind=inner (ItemUserInteractions) on tempKey
| where scope == scope1
| extend similarity = series_cosine_similarity(interactList, interactList1)
| extend similarity = iff(isnan(similarity), 0.0, similarity)
| project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
accessTrainData
| join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
| project entity, resource=resource2, usedOperation, similarity
| summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
| extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
| extend accessAnomalyScore = 1 - accessAnomalyScore
| extend accessAnomalyScore = round(accessAnomalyScore, 4)
| join kind=inner accessTrainData on entity, resource
| project entity, resource, scope, usedOperation, accessAnomalyScore
| extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
| order by entity asc, resource
);
let resultsData = (
processedData
| where dataSet == "detectSet"
| join kind=leftouter Predictions on entity, resource, scope
| extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
| project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// Write your query to use the function here.
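Example
The following example uses the invoke operator to run the function.
To use a query-defined function, invoke it after the embedded function definition.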
let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let processedData = (
T
| extend entity = column_ifexists(entityColumnName, '')
| extend resource = column_ifexists(resourceColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection, 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// Create all possible pairs (entity, resource) with the same scope
let entities = (
processedData
| where dataSet == 'trainSet'
| summarize by entity, scope
| extend temp = 1
);
let resources = (
processedData
| where dataSet == 'trainSet'
| summarize by resource, scope
| extend temp = 1
);
let potentialAccessTrainData = (
entities
| join kind=inner resources on temp
| distinct entity, resource, scope
);
let accessTrainData = (
potentialAccessTrainData
| join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
| extend usedOperation = iff(isempty(resource1), 0, 1)
| distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
let ItemUserInteractions = (
accessTrainData
| summarize interactList = make_list(usedOperation) by resource, scope
| extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity
let ItemSimilarities = (
ItemUserInteractions
| join kind=inner (ItemUserInteractions) on tempKey
| where scope == scope1
| extend similarity = series_cosine_similarity(interactList, interactList1)
| extend similarity = iff(isnan(similarity), 0.0, similarity)
| project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
accessTrainData
| join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
| project entity, resource=resource2, usedOperation, similarity
| summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
| extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
| extend accessAnomalyScore = 1 - accessAnomalyScore
| extend accessAnomalyScore = round(accessAnomalyScore, 4)
| join kind=inner accessTrainData on entity, resource
| project entity, resource, scope, usedOperation, accessAnomalyScore
| extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
| order by entity asc, resource
);
let resultsData = (
processedData
| where dataSet == "detectSet"
| join kind=leftouter Predictions on entity, resource, scope
| extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
| project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// synthetic data generation
let detectPeriodStart = datetime(2022-04-30 05:00);
let trainPeriodStart = datetime(2022-03-01 05:00);
let names = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames = array_length(names);
let devices = toscalar(range device_id from 1 to 51 step 1 | extend device = strcat("device", tostring(device_id)) | summarize devices_array = make_list(device));
let countDevices = array_length(devices)-1;
let testData = range t from 0 to 24*60 step 1
| extend timeSlice = trainPeriodStart + 1h * t
| extend userName = tostring(names[toint(rand(countNames))])
| extend deviceId = tostring(devices[toint(rand(countDevices))])
| extend accountName = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
| extend userName = iff(timeSlice == trainPeriodStart, 'H4ck3r', userName)
| extend deviceId = iff(timeSlice == trainPeriodStart, 'device1', deviceId)
| extend accountName = iff(timeSlice == trainPeriodStart, 'prodEnvironment', accountName)
| extend userName = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
| extend deviceId = iff(timeSlice == detectPeriodStart, 'device50', deviceId)
| extend accountName = iff(timeSlice == detectPeriodStart, 'prodEnvironment', accountName)
| sort by timeSlice desc
;
testData
| invoke detect_anomalous_access_cf_fl(entityColumnName = 'userName'
, resourceColumnName = 'deviceId'
, scopeColumnName = 'accountName'
, timeColumnName = 'timeSlice'
, startTraining = trainPeriodStart
, startDetection = detectPeriodStart
, endDetection = detectPeriodStart
)
Output
t | timeSlice | userName | deviceId | accountName | entity | resource | scope | dataSet | accessAnomalyScore | isAnomalousAccess |
---|---|---|---|---|---|---|---|---|---|---|
1440 | 2022-04-30 05:00:00.0000000 | H4ck3r | device50 | prodEnvironment | H4ck3r | device50 | prodEnvironment | detectSet | 0.982 | 1 |
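The output lists the entity-resource access events that fall within the detection period. Each event carries an access anomaly score (based on collaborative filtering) and a flag that is set when the score is higher than the defined anomaly threshold (0.9 by default). In this example the detection period is a single timestamp, so the output contains one row. Additional fields are added for clarity: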
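- dataSet: the current dataset (always detectSet).
- accessAnomalyScore: the predicted access anomaly score of this access, based on the collaborative filtering model. The value is in the range [0, 1]; higher values indicate a higher degree of anomaly.
- isAnomalousAccess: a binary flag for anomalous access.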
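Running the function with default parameters flags the access attempt by user 'H4ck3r' to device 'device50' within the 'prodEnvironment' account. The predicted access anomaly score is 0.982, which is very high and indicates that this access is unexpected according to the model trained on historical patterns.
During the training period, the collaborative filtering model learned the access patterns between users and devices within each scope. Because 'H4ck3r' was never observed accessing 'device50' and such an access is unlikely given the historical data, it is flagged as anomalous.
The output table presents these anomalous accesses together with the predicted access score. These fields are useful for further investigation, alerting, or integration with broader detection workflows.
The suggested usage in a cybersecurity context is to monitor important entities, such as usernames or IPs, that access important resources, such as devices, databases, or applications, within their corresponding scope, such as an account or a subscription.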
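As one way of feeding the output into an alerting or investigation step, the following Python sketch keeps only flagged rows and orders them by score. It assumes the query result has already been exported as a list of dictionaries keyed by the output column names. The helper `triage` is hypothetical, and only the first sample row comes from the output above; the second row is made up for illustration.

```python
def triage(rows):
    """Keep flagged accesses and sort them from most to least anomalous."""
    flagged = [row for row in rows if row["isAnomalousAccess"] == 1]
    return sorted(flagged, key=lambda row: row["accessAnomalyScore"], reverse=True)


rows = [
    # Row taken from the example output.
    {"entity": "H4ck3r", "resource": "device50", "scope": "prodEnvironment",
     "accessAnomalyScore": 0.982, "isAnomalousAccess": 1},
    # Hypothetical unflagged row, added only to show the filtering.
    {"entity": "Dev1", "resource": "device7", "scope": "prodEnvironment",
     "accessAnomalyScore": 0.41, "isAnomalousAccess": 0},
]

alerts = triage(rows)
for alert in alerts:
    print(alert["entity"], alert["resource"], alert["scope"], alert["accessAnomalyScore"])
```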