你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:使用 Python 将固件映像上传到 Defender for IoT 固件分析

本文介绍如何使用 Python 脚本将固件映像上传到 Defender for IoT 固件分析。

Defender for IoT 固件分析是一种分析固件映像并提供对固件映像中安全漏洞的理解的工具。

先决条件

本快速入门假定你基本了解 Defender for IoT 固件分析。 有关详细信息,请参阅面向设备制造商的固件分析。 有关支持的文件系统的列表,请参阅有关 Defender for IoT 固件分析的常见问题解答

准备环境

  • 需要安装 Python 版本 3.8+ 才能使用此包。 运行命令 python --version 以检查 Python 版本。
  • 记下你的 Azure 订阅 ID、要将映像上传到的资源组的名称、你的工作区名称,以及你要上传的固件映像的名称。
  • 确保你的 Azure 帐户具有将固件映像上传到 Azure 订阅的 Defender for IoT 固件分析所需的权限。 你必须是订阅或资源组级别的所有者、参与者、安全管理员或固件分析管理员才能上传固件映像。 有关详细信息,请访问 Defender for IoT 固件分析角色、范围和功能
  • 确保固件映像与 Python 脚本存储在同一目录中。
  • 安装运行此脚本所需的包:
    pip install azure-mgmt-iotfirmwaredefense
    pip install azure-identity
    
  • 通过运行命令 az login 登录到你的 Azure 帐户。

运行以下 Python 脚本

将以下 Python 脚本复制到 .py 文件中,并将其保存到固件映像所在的同一目录中。 将 subscription_id 变量替换为你的 Azure 订阅 ID,将 resource_group_name 替换为要将固件映像上传到的资源组的名称,将 firmware_file 替换为固件映像的名称,该映像与 Python 脚本保存在同一个目录中。

from azure.identity import AzureCliCredential
from azure.mgmt.iotfirmwaredefense import *
from azure.mgmt.iotfirmwaredefense.models import *
from azure.core.exceptions import *
from azure.storage.blob import BlobClient
import uuid
from time import sleep
from halo import Halo
from tabulate import tabulate

subscription_id = "subscription-id"
resource_group_name = "resource-group-name"
workspace_name = "default"
firmware_file = "firmware-image-name"

def main():
    firmware_id = str(uuid.uuid4())
    fw_client = init_connections(firmware_id)
    upload_firmware(fw_client, firmware_id)
    get_results(fw_client, firmware_id)

def init_connections(firmware_id):
    spinner = Halo(text=f"Creating client for firmware {firmware_id}")
    cli_credential = AzureCliCredential()
    client = IoTFirmwareDefenseMgmtClient(cli_credential, subscription_id, 'https://management.azure.com')
    spinner.succeed()
    return client

def upload_firmware(fw_client, firmware_id):
    spinner = Halo(text="Uploading firmware to Azure...", spinner="dots")
    spinner.start()
    token = fw_client.workspaces.generate_upload_url(resource_group_name, workspace_name, {"firmware_id": firmware_id})
    fw_client.firmwares.create(resource_group_name, workspace_name, firmware_id, {"properties": {"file_name": firmware_file, "vendor": "Contoso Ltd.", "model": "Wifi Router", "version": "1.0.1", "status": "Pending"}})
    bl_client = BlobClient.from_blob_url(token.url)
    with open(file=firmware_file, mode="rb") as data:
        bl_client.upload_blob(data=data)
    spinner.succeed()

def get_results(fw_client, firmware_id):
    fw = fw_client.firmwares.get(resource_group_name, workspace_name, firmware_id)

    spinner = Halo("Waiting for analysis to finish...", spinner="dots")
    spinner.start()
    while fw.properties.status != "Ready":
        sleep(5)
        fw = fw_client.firmwares.get(resource_group_name, workspace_name, firmware_id)
    spinner.succeed()

    print("-"*107)

    summary = fw_client.summaries.get(resource_group_name, workspace_name, firmware_id, summary_name=SummaryName.FIRMWARE)
    print_summary(summary.properties)
    print()

    components = fw_client.sbom_components.list_by_firmware(resource_group_name, workspace_name, firmware_id)
    if components is not None:
        print_components(components)
    else:
        print("No components found")

def print_summary(summary):
    table = [[summary.extracted_size, summary.file_size, summary.extracted_file_count, summary.component_count, summary.binary_count, summary.analysis_time_seconds, summary.root_file_systems]]
    header = ["Extracted Size", "File Size", "Extracted Files", "Components", "Binaries", "Analysis Time", "File Systems"]
    print(tabulate(table, header))

def print_components(components):
    table = []
    header = ["Component", "Version", "License", "Paths"]
    for com in components:
        table.append([com.properties.component_name, com.properties.version, com.properties.license, com.properties.file_paths])
    print(tabulate(table, header, maxcolwidths=[None, None, None, 57]))

if __name__ == "__main__":
    exit(main())