你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

验证 Azure 机密账本写入事务收据

Azure 机密账本写入交易收据表示加密的 Merkle 证明,即相应的写入交易已由 CCF 网络全局提交。 Azure 机密账本用户可以在任何时间点通过已提交的写入事务获取收据,以验证相应的写入操作是否已成功记录到不可变分类帐中。

有关 Azure 机密账本写入交易收据的更多信息,请参阅专门的文章

收据验证步骤

写入交易收据可以按照以下小节中概述的一组特定步骤进行验证。 CCF 文档中概述了相同的步骤。

叶节点计算

第一步是计算 Merkle 树中对应于已提交事务的叶节点的 SHA-256 哈希。 叶节点由以下字段的有序连接组成,这些字段可以在 Azure 机密分类帐收据中找到,位于 leafComponents 下面:

  1. writeSetDigest
  2. commitEvidence 的 SHA-256 摘要
  3. claimsDigest 字段

这些值需要连接为字节数组:writeSetDigestclaimsDigest 都需要从十六进制数字串转换为字节数组。另一方面,commitEvidence 的哈希(作为字节数组)可以通过在 UTF-8 编码的 commitEvidence 字符串上应用 SHA-256 哈希函数来获得。

类似地,可以通过对结果字节的结果串接应用 SHA-256 哈希函数来计算叶节点哈希摘要。

根节点计算

第二步是计算提交事务时 Merkle 树的根的 SHA-256 哈希。 计算方式是:将先前迭代的结果(从在先前步骤中计算的叶节点哈希开始)与在收据的 proof 字段中提供的有序节点的哈希进行串联并哈希。 proof 列表以有序列表的形式提供,其元素需要按照给定的顺序进行迭代。

需要根据 proof 字段(leftright)中提供的对象中指示的相对顺序对字节表示进行连接。

  • 如果 proof 中当前元素的键为 left,则应将上一次迭代的结果附加到当前元素值的后面。
  • 如果 proof 中当前元素的键为 right,则上一次迭代的结果应附加到当前元素值的前面。

在每次串联之后,需要应用 SHA-256 函数,以便获得下一次迭代的输入。 在给定计算所需节点的情况下,该过程遵循标准步骤来计算 Merkle 树数据结构的根节点。

验证根节点上的签名

第三步是使用收据中的签名节点证书来验证通过根节点哈希产生的加密签名是否有效。 验证过程遵循对使用椭圆曲线数字签名算法 (ECDSA) 签名的消息进行数字签名验证的标准步骤。 更具体地说,步骤包括:

  1. 将 Base64 字符串 signature 解码为字节数组。
  2. 从签名节点证书 cert 中提取 ECDSA 公钥。
  3. 使用从上一步骤中提取的公钥,验证 Merkle 树的根上的签名(根据上一小节中的指令计算)是可信的。 该步骤有效地对应于使用 ECDSA 的标准数字签名验证过程。 在最流行的编程语言中,有许多库允许在某些数据上使用公钥证书来验证 ECDSA 签名(例如,用于 Python 的加密库)。

验证签名节点证书背书

除了上一步,还需要验证当前账本证书是否认可签名节点证书(即已签名)。 此步骤不依赖于前面的其他三个步骤,可以独立于其他步骤执行。

签发收据的当前服务标识可能与背书签名节点的服务标识不同(例如,由于证书续订)。 在这种情况下,需要验证从签名节点证书(即收据中的 cert 字段)到受信任的根证书颁发机构 (CA)(即当前服务标识证书),再到其他先前的服务身份(即收据中的 serviceEndorsements 列表字段)的证书链信任。 提供的 serviceEndorsements 列表是一个有序列表,按照服务标识从最旧到最新的顺序排序。

证书背书需要对整个链进行验证,并遵循与上一小节中概述的数字签名验证过程完全相同的过程。 有一些流行的开源加密库(例如 OpenSSL)通常可用于执行证书背书步骤。

验证应用程序声明摘要

作为可选步骤,如果应用程序声明附加到收据,则可以从公开声明(遵循特定算法)计算声明摘要,并验证摘要是否与收据有效负载中包含的内容匹配 claimsDigest 。 若要从公开的声明对象中计算摘要,需要循环访问列表中的每个应用程序声明对象,并检查其kind字段。

如果声明对象为类型 LedgerEntry,则应提取声明的账本集合 ID(collectionId)和内容(contents),并用于使用声明对象中指定的密钥(secretKey)计算其 HMAC 摘要。 然后连接这两个摘要,计算串联的 SHA-256 哈希。 然后,将串联协议(protocol)和生成的声明数据摘要,并计算串联的另一个 SHA-256 哈希以获取最终摘要。

如果声明对象为类型 ClaimDigest,则应提取声明摘要(value),并与协议(protocol)连接,并计算串联的 SHA-256 哈希以获取最终摘要。

计算每个声明摘要后,必须连接每个应用程序声明对象中的所有计算摘要(按照收据中显示的顺序相同)。 然后,应预先添加已处理的声明数的串联。 上一个串联的 SHA-256 哈希生成最终声明摘要,该摘要应与 claimsDigest 收据对象中存在的匹配。

更多资源

有关 Azure 机密账本写入事务回执的内容和每个字段的说明的详细信息,请参阅专栏文章CCF 文档还包含有关收据验证和其他相关资源的更多信息,请访问以下链接:

验证事务回执

收据验证实用工具

适用于 PythonAzure 机密账本客户端库提供实用工具函数,用于验证写入事务收据,并从应用程序声明列表中计算声明摘要。 有关如何使用数据平面 SDK 和收据特定的实用工具的详细信息,请参阅 本部分此示例代码

安装与先决条件

出于参考目的,我们在 Python 中提供示例代码,以完全验证 Azure 机密账本写入事务回执,并遵循上一部分概述的步骤。

要运行完全验证算法,需要当前服务网络证书和来自正在运行的机密分类帐资源的写交易凭证。 有关如何从机密分类帐实例获取写交易凭证和服务证书的详细信息,请参阅本文

代码演练

以下代码可用于初始化所需的对象并运行收据验证算法。 单独的实用工具 (verify_receipt) 用于运行完整的验证算法,并接受响应中GET_RECEIPT字段的内容receipt作为字典,服务证书作为简单字符串。 如果收据无效或在处理过程中遇到任何错误,该函数将引发异常。

假设收据和服务证书都可以从文件中加载。 请确保使用你要验证的服务证书和收据的相应文件名来更新 service_certificate_file_namereceipt_file_name 常量。

import json 

# Constants
service_certificate_file_name = "<your-service-certificate-file>"
receipt_file_name = "<your-receipt-file>"

# Use the receipt and the service identity to verify the receipt content 
with open(service_certificate_file_name, "r") as service_certificate_file, open( 
    receipt_file_name, "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"] 
    service_certificate_cert = service_certificate_file.read() 

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack
        raise e 

由于验证过程需要一些加密和哈希原语,因此使用以下库来简化计算。

  • CCF Python 库:该模块提供了一组用于收据验证的工具。
  • Python 密码库:一个广泛使用的库,包括各种密码算法和原语。
  • hashlib 模块,Python 标准库的一部分:为流行的哈希算法提供通用接口的模块。
from ccf.receipt import verify, check_endorsements, root 
from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

verify_receipt 函数内部,我们检查给定的收据是否有效并包含所有必需的字段。

# Check that all the fields are present in the receipt 
assert "cert" in receipt 
assert "leafComponents" in receipt 
assert "claimsDigest" in receipt["leafComponents"] 
assert "commitEvidence" in receipt["leafComponents"] 
assert "writeSetDigest" in receipt["leafComponents"] 
assert "proof" in receipt 
assert "signature" in receipt 

我们初始化将在程序的其余部分中使用的变量。

# Set the variables 
node_cert_pem = receipt["cert"] 
claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 
write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
proof_list = receipt["proof"] 
service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
root_node_signature = receipt["signature"] 

我们可以使用密码库为服务标识、签名节点和先前服务标识的背书证书加载 PEM 证书。

# Load service and node PEM certificates 
service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

# Load service endorsements PEM certificates 
service_endorsements_certs = [ 
    load_pem_x509_certificate(pem.encode()) 
    for pem in service_endorsements_certs_pem 
] 

验证过程的第一步是计算叶节点的摘要。

# Compute leaf of the Merkle Tree corresponding to our transaction 
leaf_node_hex = compute_leaf_node( 
    claims_digest_hex, commit_evidence_str, write_set_digest_hex 
)

compute_leaf_node 函数接受收据的叶组件(claimsDigestcommitEvidencewriteSetDigest)作为参数,并以十六进制形式返回叶节点哈希。

如前所述,我们计算了(使用 SHA-256 hashlib 函数)的commitEvidence摘要。 然后,我们将 writeSetDigestclaimsDigest 都转换为字节数组。 最后,我们将这三个数组串联,并使用 SHA256 函数对结果进行摘要。

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string 
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes 
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes 
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

在计算叶节点之后,我们可以计算 Merkle 树的根节点。

# Compute root of the Merkle Tree 
root_node = root(leaf_node_hex, proof_list) 

我们使用作为 CCF Python 库的一部分提供的 root 函数。 该函数将上一次迭代的结果与 proof 中的新元素连续串联,对串联结果进行摘要,然后使用先前计算的摘要对 proof 中的每个元素重复该步骤。 串联需要考虑 Merkle 树中节点的顺序,以确保正确地重新计算根。

def root(leaf: str, proof: List[dict]): 
    """ 
    Recompute root of Merkle tree from a leaf and a proof of the form: 
    [{"left": digest}, {"right": digest}, ...] 
    """ 

    current = bytes.fromhex(leaf) 

    for n in proof: 
        if "left" in n: 
            current = sha256(bytes.fromhex(n["left"]) + current).digest() 
        else: 
            current = sha256(current + bytes.fromhex(n["right"])).digest() 
    return current.hex() 

在计算根节点哈希之后,我们可以在根节点上验证收据中包含的签名,以验证签名是否正确。

# Verify signature of the signing node over the root of the tree 
verify(root_node, root_node_signature, node_cert) 

类似地,CCF 库提供了 verify 函数来执行这种验证。 我们使用签名节点证书的 ECDSA 公钥来验证树根上的签名。

def verify(root: str, signature: str, cert: Certificate):
    """ 
    Verify signature over root of Merkle Tree 
    """ 

    sig = base64.b64decode(signature) 
    pk = cert.public_key() 
    assert isinstance(pk, ec.EllipticCurvePublicKey) 
    pk.verify( 
        sig, 
        bytes.fromhex(root), 
        ec.ECDSA(utils.Prehashed(hashes.SHA256())), 
    )

收据验证的最后一步是验证用于对 Merkle 树根进行签名的证书。

# Verify node certificate is endorsed by the service certificates through endorsements 
check_endorsements(node_cert, service_cert, service_endorsements_certs) 

同样,我们也可以使用 CCF 实用工具 check_endorsements 来验证服务标识是否认可签名节点。 证书链可以由以前的服务证书组成,因此如果 serviceEndorsements 不是空列表,我们应该验证认可是可传递地应用的。

def check_endorsement(endorsee: Certificate, endorser: Certificate): 
    """ 
    Check endorser has endorsed endorsee 
    """ 

    digest_algo = endorsee.signature_hash_algorithm 
    assert digest_algo 
    digester = hashes.Hash(digest_algo) 
    digester.update(endorsee.tbs_certificate_bytes) 
    digest = digester.finalize() 
    endorser_pk = endorser.public_key() 
    assert isinstance(endorser_pk, ec.EllipticCurvePublicKey) 
    endorser_pk.verify( 
        endorsee.signature, digest, ec.ECDSA(utils.Prehashed(digest_algo)) 
    ) 

def check_endorsements( 
    node_cert: Certificate, service_cert: Certificate, endorsements: List[Certificate] 
): 
    """ 
    Check a node certificate is endorsed by a service certificate, transitively through a list of endorsements. 
    """ 

    cert_i = node_cert 
    for endorsement in endorsements: 
        check_endorsement(cert_i, endorsement) 
        cert_i = endorsement 
    check_endorsement(cert_i, service_cert) 

作为一种替代方法,我们还可以使用类似的方法通过使用 OpenSSL 库来验证证书。

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
)

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store 
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates 
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate 
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate 
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

示例代码

提供了代码演练中使用的完整示例代码。

主程序

import json 

# Use the receipt and the service identity to verify the receipt content 
with open("network_certificate.pem", "r") as service_certificate_file, open( 
    "receipt.json", "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"]
    service_certificate_cert = service_certificate_file.read()

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack 
        raise e 

收据验证

from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
) 

from ccf.receipt import root, verify, check_endorsements 

def verify_receipt(receipt: Dict[str, Any], service_cert_pem: str) -> None: 
    """Function to verify that a given write transaction receipt is valid based 
    on its content and the service certificate. 
    Throws an exception if the verification fails.""" 

    # Check that all the fields are present in the receipt 
    assert "cert" in receipt 
    assert "leafComponents" in receipt 
    assert "claimsDigest" in receipt["leafComponents"] 
    assert "commitEvidence" in receipt["leafComponents"] 
    assert "writeSetDigest" in receipt["leafComponents"] 
    assert "proof" in receipt 
    assert "signature" in receipt 

    # Set the variables 
    node_cert_pem = receipt["cert"] 
    claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
    commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 

    write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
    proof_list = receipt["proof"] 
    service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
    root_node_signature = receipt["signature"] 

    # Load service and node PEM certificates
    service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
    node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

    # Load service endorsements PEM certificates
    service_endorsements_certs = [ 
        load_pem_x509_certificate(pem.encode()) 
        for pem in service_endorsements_certs_pem 
    ] 

    # Compute leaf of the Merkle Tree 
    leaf_node_hex = compute_leaf_node( 
        claims_digest_hex, commit_evidence_str, write_set_digest_hex 
    ) 

    # Compute root of the Merkle Tree
    root_node = root(leaf_node_hex, proof_list) 

    # Verify signature of the signing node over the root of the tree
    verify(root_node, root_node_signature, node_cert) 

    # Verify node certificate is endorsed by the service certificates through endorsements
    check_endorsements(node_cert, service_cert, service_endorsements_certs) 

    # Alternative: Verify node certificate is endorsed by the service certificates through endorsements 
    verify_openssl_certificate(node_cert, service_cert, service_endorsements_certs) 

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

后续步骤