Aracılığıyla paylaş


Jupyter not defteri kodu örnekleri

Bu makalede, Microsoft Sentinel veri gölündeki güvenlik verilerini analiz etmek için Jupyter not defterlerini kullanarak Microsoft Sentinel lake verileriyle (önizleme) nasıl etkileşim kurulduğunu gösteren bazı örnek kod parçacıkları sunulur. Bu örneklerde Microsoft Entra ID oturum açma günlükleri, grup bilgileri ve cihaz ağ olayları gibi çeşitli tablolardaki verilere erişme ve bunları analiz etme adımları gösterilmektedir. Kod parçacıkları, Microsoft Sentinel uzantısını kullanarak Visual Studio Code içindeki Jupyter not defterlerinde çalışacak şekilde tasarlanmıştır.

Bu örnekleri çalıştırmak için gerekli izinlere ve Visual Studio Code'un Microsoft Sentinel uzantısıyla yüklenmiş olması gerekir. Daha fazla bilgi için bkz . Microsoft Sentinel data lake izinleri ve Microsoft Sentinel data lake ile Jupyter not defterlerini kullanma.

Başarısız oturum açma girişimleri analizi

Bu örnek, başarısız oturum açma girişimleri olan kullanıcıları tanımlar. Bunu yapmak için, bu not defteri örneği iki tablodan oturum açma verilerini işler:

  • SigninLogs
  • AAD Etkileşimsiz Kullanıcı Oturum Açma Kayıtları

Not defteri aşağıdaki adımları gerçekleştirir:

  1. Belirtilen tablolardan verileri işlemek için aşağıdakileri içeren bir işlev oluşturun:
    1. Belirtilen tablolardaki verileri DataFrames'e yükleyin.
    2. 'ErrorCode' öğesini ayıklamak ve her oturum açma girişiminin başarılı mı yoksa başarısız mı olduğunu belirlemek için 'Status' JSON alanını ayrıştırın.
    3. Her kullanıcı için başarısız ve başarılı oturum açma girişimlerinin sayısını saymak için verileri toplama.
    4. Verileri yalnızca 100'den fazla başarısız oturum açma girişimine ve en az bir başarılı oturum açma girişimine sahip kullanıcıları içerecek şekilde filtreleyin.
    5. Sonuçları başarısız oturum açma denemelerinin sayısına göre sıralayın.
  2. SigninLogs ve AADNonInteractiveUserSignInLogs tabloları için fonksiyonu çağırın.
  3. Her iki tablonun sonuçlarını tek bir DataFrame'de birleştirin.
  4. DataFrame'i Pandas DataFrame'e dönüştürün.
  5. En fazla başarısız oturum açma girişimine sahip ilk 20 kullanıcıyı göstermek için Pandas DataFrame'i filtreleyin.
  6. En fazla başarısız oturum açma girişimine sahip kullanıcıları görselleştirmek için bir çubuk grafik oluşturun.

Uyarı

Bu not defterinin günlük tablolarındaki veri hacmine bağlı olarak Büyük havuzda çalıştırılması yaklaşık 10 dakika sürer

# Import necessary libraries
import matplotlib.pyplot as plt
from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, when, count, from_json, desc
from pyspark.sql.types import StructType, StructField, StringType

data_provider = MicrosoftSentinelProvider(spark)

# Function to process data
def process_data(table_name,workspace_name):
    # Load data into DataFrame
    df = data_provider.read_table(table_name, workspace_name)
    
    # Define schema for parsing the 'Status' JSON field
    status_schema = StructType([StructField("errorCode", StringType(), True)])
    # Parse the 'Status' JSON field to extract 'errorCode'
    df = df.withColumn("Status_json", from_json(col("Status"), status_schema)) \
           .withColumn("ResultType", col("Status_json.errorCode"))
    # Define success codes
    success_codes = ["0", "50125", "50140", "70043", "70044"]
    
    # Determine FailureOrSuccess based on ResultType
    df = df.withColumn("FailureOrSuccess", when(col("ResultType").isin(success_codes), "Success").otherwise("Failure"))
    
    # Summarize FailureCount and SuccessCount
    df = df.groupBy("UserPrincipalName", "UserDisplayName", "IPAddress") \
           .agg(count(when(col("FailureOrSuccess") == "Failure", True)).alias("FailureCount"),
                count(when(col("FailureOrSuccess") == "Success", True)).alias("SuccessCount"))
    
    # Filter where FailureCount > 100 and SuccessCount > 0
    df = df.filter((col("FailureCount") > 100) & (col("SuccessCount") > 0))
    
    # Order by FailureCount descending
    df = df.orderBy(desc("FailureCount"))
         
    return df

# Process the tables to a common schema
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
aad_signin = process_data("SigninLogs", workspace_name)
aad_non_int = process_data("AADNonInteractiveUserSignInLogs", workspace_name)

# Union the DataFrames
result_df = aad_signin.unionByName(aad_non_int)

# Show the result
result_df.show()

# Convert the Spark DataFrame to a Pandas DataFrame
result_pd_df = result_df.toPandas()

# Filter to show table with top 20 users with the highest failed sign-ins attempted
top_20_df = result_pd_df.nlargest(20, 'FailureCount')

# Create bar chart to show users by highest failed sign-ins attempted
plt.figure(figsize=(12, 6))
plt.bar(top_20_df['UserDisplayName'], top_20_df['FailureCount'], color='skyblue')
plt.xlabel('Users')
plt.ylabel('Number of Failed sign-ins')
plt.title('Top 20 Users with Failed sign-ins')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()  

Aşağıdaki ekran görüntüsünde, çubuk grafik biçiminde en fazla başarısız oturum açma girişimine sahip ilk 20 kullanıcıyı gösteren yukarıdaki kodun çıkışının bir örneği gösterilmektedir.

En fazla başarısız oturum açma girişimine sahip kullanıcıların çubuk grafiğini gösteren ekran görüntüsü.

Access lake tier Microsoft Entra ID Group tablosu

Aşağıdaki kod örneği, Microsoft Sentinel veri gölündeki EntraGroups tablosuna nasıl erişileceğini gösterir. displayName, groupTypes, mail, mailNickname, description ve tenantId gibi çeşitli alanları görüntüler.

from sentinel_lake.providers import MicrosoftSentinelProvider
data_provider = MicrosoftSentinelProvider(spark)
 
table_name = "EntraGroups"  
df = data_provider.read_table(table_name)  
df.select("displayName", "groupTypes", "mail", "mailNickname", "description", "tenantId").show(100, truncate=False)   

Aşağıdaki ekran görüntüsünde, Microsoft Entra ID grup bilgilerini bir veri çerçevesi biçiminde görüntüleyen yukarıdaki kodun çıktısının bir örneği gösterilmektedir.

Microsoft Entra Id grup tablosundan alınan örnek çıkışı gösteren ekran görüntüsü.

Belirli bir kullanıcı için Microsoft Entra Id oturum açma günlüklerine erişme

Aşağıdaki kod örneği, Microsoft Entra ID SigninLogs tablosuna nasıl erişip belirli bir kullanıcının sonuçlarını filtrelemeyi gösterir. UserDisplayName, UserPrincipalName, UserId ve daha fazlası gibi çeşitli alanları alır.

from sentinel_lake.providers import MicrosoftSentinelProvider
data_provider = MicrosoftSentinelProvider(spark)
 
table_name = "SigninLogs"  
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
df = data_provider.read_table(table_name, workspace_name)  
df.select("UserDisplayName", "UserPrincipalName", "UserId", "CorrelationId", "UserType", 
 "ResourceTenantId", "RiskLevelDuringSignIn", "ResourceProvider", "IPAddress", "AppId", "AADTenantId")\
    .filter(df.UserPrincipalName == "bploni5@contoso.com")\
    .show(100, truncate=False) 

Oturum açma konumlarını inceleme

Aşağıdaki kod örneği, Microsoft Entra ID SigninLogs tablosundan oturum açma konumlarının nasıl ayıklanıp görüntüleneceğini gösterir. from_json işlevini kullanarak, LocationDetails alanının JSON yapısını ayrıştırabilir ve şehir, eyalet ve ülke veya bölge gibi belirli konum özniteliklerine erişebilirsiniz.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import from_json, col  
from pyspark.sql.types import StructType, StructField, StringType  
 
data_provider = MicrosoftSentinelProvider(spark)  
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
table_name = "SigninLogs"  
df = data_provider.read_table(table_name, workspace_name)  
 
location_schema = StructType([  
  StructField("city", StringType(), True),  
  StructField("state", StringType(), True),  
  StructField("countryOrRegion", StringType(), True)  
])  
 
# Extract location details from JSON  
df = df.withColumn("LocationDetails", from_json(col("LocationDetails"), location_schema))  
df = df.select("UserPrincipalName", "CreatedDateTime", "IPAddress", 
 "LocationDetails.city", "LocationDetails.state", "LocationDetails.countryOrRegion")  
 
sign_in_locations_df = df.orderBy("CreatedDateTime", ascending=False)  
sign_in_locations_df.show(100, truncate=False) 

Olağan dışı ülkelerden oturum açma işlemleri

Aşağıdaki kod örneği, bir kullanıcının tipik oturum açma düzeninin parçası olmayan ülkelerden gelen oturum açmaların nasıl tanımlandığını gösterir.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

data_provider = MicrosoftSentinelProvider(spark)
table_name = "signinlogs"
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
df = data_provider.read_table(table_name, workspace_name)

location_schema = StructType([
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("countryOrRegion", StringType(), True)
])

# Extract location details from JSON
df = df.withColumn("LocationDetails", from_json(col("LocationDetails"), location_schema))
df = df.select(
    "UserPrincipalName",
    "CreatedDateTime",
    "IPAddress",
    "LocationDetails.city",
    "LocationDetails.state",
    "LocationDetails.countryOrRegion"
)

sign_in_locations_df = df.orderBy("CreatedDateTime", ascending=False)
sign_in_locations_df.show(100, truncate=False)

Birden çok başarısız oturum açma girişiminden kaynaklanan brute force saldırısı

Çok sayıda başarısız oturum açma girişimi olan hesaplar için kullanıcı oturum açma günlüklerini analiz ederek olası deneme yanılma saldırılarını belirleyin.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, when, count, from_json, desc
from pyspark.sql.types import StructType, StructField, StringType

data_provider = MicrosoftSentinelProvider(spark)

def process_data(table_name, workspace_name):
    df = data_provider.read_table(table_name, workspace_name)
    status_schema = StructType([StructField("errorCode", StringType(), True)])
    df = df.withColumn("Status_json", from_json(col("Status"), status_schema)) \
           .withColumn("ResultType", col("Status_json.errorCode"))
    success_codes = ["0", "50125", "50140", "70043", "70044"]
    df = df.withColumn("FailureOrSuccess", when(col("ResultType").isin(success_codes), "Success").otherwise("Failure"))
    df = df.groupBy("UserPrincipalName", "UserDisplayName", "IPAddress") \
           .agg(count(when(col("FailureOrSuccess") == "Failure", True)).alias("FailureCount"),
                count(when(col("FailureOrSuccess") == "Success", True)).alias("SuccessCount"))
    # Lower the brute force threshold to >10 failures and remove the success requirement
    df = df.filter(col("FailureCount") > 10)
    df = df.orderBy(desc("FailureCount"))
    df = df.withColumn("AccountCustomEntity", col("UserPrincipalName")) \
           .withColumn("IPCustomEntity", col("IPAddress"))
    return df
workspace_name = "your-workspace-name"  # Replace with your actual workspace name
aad_signin = process_data("SigninLogs", workspace_name)
aad_non_int = process_data("AADNonInteractiveUserSignInLogs",workspace_name)
result_df = aad_signin.unionByName(aad_non_int)
result_df.show()

Yanal hareket girişimlerini algılama

Uç noktalar arasındaki anormal SMB/RDP trafiği gibi yanal hareket sinyali veren şüpheli iç IP bağlantılarını belirlemek için DeviceNetworkEvents'i kullanın.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, count, countDistinct, desc

deviceNetworkEventTable = "DeviceNetworkEvents"
workspace_name = "<your-workspace-name>"  # Replace with your actual workspace name
data_provider = MicrosoftSentinelProvider(spark)
device_network_events = data_provider.read_table(deviceNetworkEventTable, workspace_name)

# Define internal IP address range (example: 10.x.x.x, 192.168.x.x, 172.16.x.x - 172.31.x.x)
internal_ip_regex = r"^(10\.\d{1,3}\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3}|172\.(1[6-9]|2[0-9]|3[0-1])\.\d{1,3}\.\d{1,3})$"

# Filter for internal-to-internal connections
internal_connections = device_network_events.filter(
    col("RemoteIP").rlike(internal_ip_regex) &
    col("LocalIP").rlike(internal_ip_regex)
)

# Group by source and destination, count connections
suspicious_lateral = (
    internal_connections.groupBy("LocalIP", "RemoteIP", "InitiatingProcessAccountName")
    .agg(count("*").alias("ConnectionCount"))
    .filter(col("ConnectionCount") > 10)  # Threshold can be adjusted
    .orderBy(desc("ConnectionCount"))
)
suspicious_lateral.show()

Kimlik bilgisi dökümü araçlarını keşfetme

DeviceProcessEvents'i sorgulayarak kimlik bilgisi avcılığına işaret edebilecek mimikatz.exe gibi süreçleri veya beklenmeyen lsass.exe yürütmelerini bulun.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, lower

workspace_id = "<your-workspace-name>"
device_process_table = "DeviceProcessEvents"

data_provider = MicrosoftSentinelProvider(spark)
process_events = data_provider.read_table(device_process_table, workspace_id)

# Look for known credential dumping tools and suspicious access to lsass.exe
suspicious_processes = process_events.filter(
    (lower(col("FileName")).rlike("mimikatz|procdump|lsassy|nanodump|sekurlsa|dumpert")) |
    (
        (lower(col("FileName")) == "lsass.exe") &
        (~lower(col("InitiatingProcessFileName")).isin(["services.exe", "wininit.exe", "taskmgr.exe"]))
    )
)

suspicious_processes.select(
    "Timestamp",
    "DeviceName",
    "AccountName",
    "FileName",
    "FolderPath",
    "InitiatingProcessFileName",
    "InitiatingProcessCommandLine"
).show(50, truncate=False)

USB etkinliğinin hassas dosya erişimiyle bağıntısı

Olası veri sızdırma desenlerini ortaya çıkarabilmek için bir not defterinde DeviceEvents ve DeviceFileEvents'i birleştirin. Hangi cihazların, kullanıcıların veya dosyaların ne zaman dahil olduğunu göstermek için görselleştirmeler ekleyin.

from sentinel_lake.providers import MicrosoftSentinelProvider
from pyspark.sql.functions import col, lower, to_timestamp, expr
import matplotlib.pyplot as plt

data_provider = MicrosoftSentinelProvider(spark)
workspace_id = “<your-workspace-id>”

# Load DeviceEvents and DeviceFileEvents tables
device_events = data_provider.read_table("DeviceEvents", workspace_id)
device_file_events = data_provider.read_table("DeviceFileEvents", workspace_id)
device_info = data_provider.read_table("DeviceInfo", workspace_id)

# Filter for USB device activity (adjust 'ActionType' or 'AdditionalFields' as needed)
usb_events = device_events.filter(
    lower(col("ActionType")).rlike("usb|removable|storage")
)

# Filter for sensitive file access (e.g., files in Documents, Desktop, or with sensitive extensions)
sensitive_file_events = device_file_events.filter(
    lower(col("FolderPath")).rlike("documents|desktop|finance|confidential|secret|sensitive") |
    lower(col("FileName")).rlike(r"\.(docx|xlsx|pdf|csv|zip|7z|rar|pst|bak)$")
)

# Convert timestamps
usb_events = usb_events.withColumn("EventTime", to_timestamp(col("Timestamp")))
sensitive_file_events = sensitive_file_events.withColumn("FileEventTime", to_timestamp(col("Timestamp")))

# Join on DeviceId and time proximity (within 10 minutes) using expr for column operations
joined = usb_events.join(
    sensitive_file_events,
    (usb_events.DeviceId == sensitive_file_events.DeviceId) &
    (expr("abs(unix_timestamp(EventTime) - unix_timestamp(FileEventTime)) <= 600")),
    "inner"
) \
.join(device_info, usb_events.DeviceId == device_info.DeviceId, "inner")


# Select relevant columns
correlated = joined.select(
    device_info.DeviceName,
    usb_events.DeviceId,
    usb_events.AccountName,
    usb_events.EventTime.alias("USBEventTime"),
    sensitive_file_events.FileName,
    sensitive_file_events.FolderPath,
    sensitive_file_events.FileEventTime
)

correlated.show(50, truncate=False)

# Visualization: Number of sensitive file accesses per device
pd_df = correlated.toPandas()
if not pd_df.empty:
    plt.figure(figsize=(12, 6))
    pd_df.groupby('DeviceName').size().sort_values(ascending=False).head(10).plot(kind='bar')
    plt.title('Top Devices with Correlated USB and Sensitive File Access Events')
    plt.xlabel('DeviceName')
    plt.ylabel('Number of Events')
    plt.tight_layout()
    plt.show()
else:
    print("No correlated USB and sensitive file access events found in the selected period.")

İşaret davranışı algılama

Uzun süreler boyunca düşük bayt birimlerinde normal giden bağlantıları kümeleyerek olası komut ve denetimi algılayın.

# Setup
from pyspark.sql.functions import col, to_timestamp, window, count, avg, stddev, hour, date_trunc
from sentinel_lake.providers import MicrosoftSentinelProvider 
import matplotlib.pyplot as plt
import pandas as pd

data_provider = MicrosoftSentinelProvider(spark)
device_net_events = "DeviceNetworkEvents"
workspace_id = "<your-workspace-id>"

network_df = data_provider.read_table(device_net_events, workspace_id)

# Add hour bucket to group by frequency
network_df = network_df.withColumn("HourBucket", date_trunc("hour", col("Timestamp")))

# Group by device and IP to count hourly traffic
hourly_traffic = network_df.groupBy("DeviceName", "RemoteIP", "HourBucket") \
    .agg(count("*").alias("ConnectionCount"))

# Count number of hours this IP talks to device
stats_df = hourly_traffic.groupBy("DeviceName", "RemoteIP") \
    .agg(
        count("*").alias("HoursSeen"),
        avg("ConnectionCount").alias("AvgConnPerHour"),
        stddev("ConnectionCount").alias("StdDevConnPerHour")
    )

# Filter beacon-like traffic: low stddev, repeated presence
beacon_candidates = stats_df.filter(
    (col("HoursSeen") > 10) &
    (col("AvgConnPerHour") < 5) &
    (col("StdDevConnPerHour") < 1.0)
)

beacon_candidates.show(truncate=False)

# Choose one Device + IP pair to plot
example = beacon_candidates.limit(1).collect()[0]
example_device = example["DeviceName"]
example_ip = example["RemoteIP"]

# Filter hourly traffic for that pair
example_df = hourly_traffic.filter(
    (col("DeviceName") == example_device) & 
    (col("RemoteIP") == example_ip)
).orderBy("HourBucket")

# Convert to Pandas and plot
example_pd = example_df.toPandas()
example_pd["HourBucket"] = pd.to_datetime(example_pd["HourBucket"])

plt.figure(figsize=(12, 5))
plt.plot(example_pd["HourBucket"], example_pd["ConnectionCount"], marker="o", linestyle="-")
plt.title(f"Outbound Connections – {example_device} to {example_ip}")
plt.xlabel("Time (Hourly)")
plt.ylabel("Connection Count")
plt.grid(True)
plt.tight_layout()
plt.show()