共用方式為


確認 Azure 機密總帳寫入交易收據

Azure 機密總帳寫入交易收據代表 CCF 網路已全域認可對應寫入交易的密碼編譯 Merkle 證明。 Azure 機密總帳使用者可以在任何時間點取得認可寫入交易的收據,以確認對應的寫入作業已成功記錄到不可變的總帳中。

如需 Azure 機密總帳寫入交易收據的詳細資訊,請參閱<專用文章>。

收據驗證步驟

您可以遵循下列小節中所述的特定一組步驟來驗證寫入交易收據。 CCF 文件概述相同的步驟。

分葉節點計算

第一個步驟是計算 Merkle 樹狀結構中與已認可的交易相對應的分葉節點 SHA-256 雜湊。 分葉節點是由在 leafComponents 下方的 Azure 機密總帳收據中找到的下列欄位形成的已排序串連所組成:

  1. writeSetDigest
  2. commitEvidence 的 SHA-256 摘要
  3. claimsDigest 欄位

這些值必須串連為位元組陣列:writeSetDigestclaimsDigest 都必須從十六進位數字的字串轉換為位元組陣列;另一方面,可以透過將 SHA-256 雜湊函數套用至 UTF-8 編碼 commitEvidence 字串來取得 commitEvidence 的雜湊 (做為位元組陣列)。

同樣地,可以藉由將 SHA-256 雜湊函式套用至產生的位元組結果串連來計算分葉節點雜湊摘要。

根節點計算

第二個步驟是在認可交易時計算 Merkle 樹狀結構根目錄的 SHA-256 雜湊。 計算是藉由收據的 proof 欄位中提供的已排序節點雜湊反覆串連和雜湊上一個反覆項目的結果來完成 (從上一個步驟中計算的分葉節點雜湊開始)。 清單 proof 會以已排序的清單的形式提供,而且其元素必須依指定順序反覆。

串連必須相對於 proof 欄位 (leftright) 中提供的物件所指示的相對順序在位元組表示上完成。

  • 如果 proof 之中目前項目的索引鍵是 left,則先前反覆項目的結果應該附加至目前的項目值。
  • 如果 proof 之中目前項目的索引鍵是 right,則先前反覆項目的結果應該前面加上目前的項目值。

每次串連之後,必須套用 SHA-256 函式,才能取得下一個反覆項目的輸入。 此流程遵循標準步驟,根據計算所需的節點,計算 Merkle 樹狀結構資料結構的根節點。

確認根節點的簽章

第三個步驟是確認透過根節點雜湊產生的密碼編譯簽章是否有效,使用收據中的簽署節點憑證。 驗證流程遵循使用橢圓曲線數位簽章演算法 (ECDSA) 簽署的訊息適用的數位簽章驗證標準步驟。 更具體來說,這些步驟如下:

  1. 將 base64 字串 signature 解碼為位元組陣列。
  2. 從簽署節點憑證 cert 擷取 ECDSA 公開金鑰。
  3. 使用上一個子區段擷取的公開金鑰驗證 Merkle 樹狀結構根目錄簽章 (使用上一個步驟中的指示計算而得) 是否為真實。 此步驟實際上會對應至使用 ECDSA 的標準數位簽章驗證流程。 有許多熱門程式設計語言的程式庫可透過某些資料使用公開金鑰憑證來驗證 ECDSA 簽章 (例如適用於 Python 的密碼編譯程式庫)。

確認簽署節點憑證簽署

除了上一個步驟之外,還必須確認目前的總帳憑證已背書簽署節點憑證 (也就是已簽署)。 此步驟不相依於前三個步驟,而且可以獨立於其他步驟執行。

發行收據的目前服務身分識別可能會與背書簽署節點的身分識別不同 (例如,由於憑證更新)。 在此情況下,必須透過其他先前服務身分識別 (也就是收據中的 serviceEndorsements 清單欄位) 從簽署節點憑證 (也就是收據中的 cert 欄位) 確認憑證信任鏈結到受信任的根憑證授權單位 (CA) (也就是目前的服務識別憑證)。 serviceEndorsements 清單會以最舊到最新服務身分識別的已排序清單的形式提供。

憑證簽署必須針對整個鏈結進行驗證,並遵循上一節中所述的完全相同數位簽章驗證流程。 熱門的開放原始碼密碼編譯程式庫 (例如,OpenSSL) 通常可用來執行憑證簽署步驟。

驗證應用程式宣告摘要

作為選擇性步驟,如果應用程式宣告已附加至收據,您可以從公開的宣告計算宣告摘要 (遵循特定演算法),並確認摘要符合收據承載中包含的 claimsDigest。 若要從公開的宣告對象計算摘要,必須逐一查看清單中的每個應用程式宣告物件,並檢查其 kind 欄位。

如果宣告對象的類型為 LedgerEntry,則應該擷取宣告的總帳集合識別碼 (collectionId) 和內容 (contents),並使用宣告物件中指定的祕密金鑰 (secretKey) 來計算其 HMAC 摘要。 接著會串連這兩個摘要,並計算串連的 SHA-256 雜凑。 接著會串連通訊協定 (protocol) 和產生的宣告資料摘要,並會計算串連的另一個 SHA-256 雜凑,以取得最終摘要。

如果宣告物件的類型為 ClaimDigest,則應該擷取宣告摘要 (value),並串連通訊協定 (protocol),並計算串連的SHA-256 雜凑以取得最終摘要。

計算每個單一宣告摘要之後,必須串連每個應用程式宣告物件的所有計算摘要 (按照它們在收據中呈現的相同順序)。 接著,串連應該前面加上已處理的宣告數目。 先前串連的 SHA-256 雜凑會產生最終宣告摘要,這應該符合收據物件中存在的 claimsDigest

更多資源

如需 Azure 機密總帳寫入交易收據和每個欄位說明內容的詳細資訊,請參閱<專用文章>。 CCF 文件也包含有關收據驗證和其他相關資源的詳細資訊,請參閱下列連結:

確認寫入交易收據

收據驗證公用程式

適用於 Python 的 Azure 機密總帳用戶端程式庫 提供公用程式函式,以驗證寫入交易收據,並從應用程式宣告清單計算宣告摘要。 如需如何使用資料平面 SDK 和收據特定公用程式的詳細資訊,請參閱 本節此範例程式碼

設定和必要條件

為了方便參考,我們會在 Python 中提供範例程式碼,以依照上一節所述的步驟完整確認 Azure 機密總帳寫入交易收據。

若要執行完整驗證演算法,需要目前服務網路憑證,以及來自執行中機密總帳資源的寫入交易收據。 如需如何從機密總帳執行個體擷取寫入交易收據和服務憑證的詳細資訊,請參閱本文

程式碼逐步解說

下列程式碼可用來初始化必要的物件,並執行收據驗證演算法。 個別的公用程式 (verify_receipt) 可用來執行完整驗證演算法,並接受 GET_RECEIPT 回應中的 receipt 欄位內容作爲字典及服務憑證作爲簡單的字串。 如果收據無效,或處理期間發生任何錯誤,函式會擲回例外狀況。

假設收據和服務憑證都可以從檔案載入。 請務必使用您想要驗證的服務憑證和收據各自的檔案名來更新 service_certificate_file_namereceipt_file_name 常數。

import json 

# Constants
service_certificate_file_name = "<your-service-certificate-file>"
receipt_file_name = "<your-receipt-file>"

# Use the receipt and the service identity to verify the receipt content 
with open(service_certificate_file_name, "r") as service_certificate_file, open( 
    receipt_file_name, "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"] 
    service_certificate_cert = service_certificate_file.read() 

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack
        raise e 

由於驗證流程需要一些密碼編譯和雜湊基本類型,因此會使用下列流程庫來加速計算。

from ccf.receipt import verify, check_endorsements, root 
from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

verify_receipt 函式中,我們會檢查指定的收據是否有效,並包含所有必要的欄位。

# Check that all the fields are present in the receipt 
assert "cert" in receipt 
assert "leafComponents" in receipt 
assert "claimsDigest" in receipt["leafComponents"] 
assert "commitEvidence" in receipt["leafComponents"] 
assert "writeSetDigest" in receipt["leafComponents"] 
assert "proof" in receipt 
assert "signature" in receipt 

我們會初始化將在程式其餘部分使用的變數。

# Set the variables 
node_cert_pem = receipt["cert"] 
claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 
write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
proof_list = receipt["proof"] 
service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
root_node_signature = receipt["signature"] 

我們可以使用密碼編譯程式庫,載入服務識別的 PEM 憑證、簽署節點,以及先前服務身分識別的簽署憑證。

# Load service and node PEM certificates 
service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

# Load service endorsements PEM certificates 
service_endorsements_certs = [ 
    load_pem_x509_certificate(pem.encode()) 
    for pem in service_endorsements_certs_pem 
] 

驗證流程的第一個步驟是計算分葉節點的摘要。

# Compute leaf of the Merkle Tree corresponding to our transaction 
leaf_node_hex = compute_leaf_node( 
    claims_digest_hex, commit_evidence_str, write_set_digest_hex 
)

compute_leaf_node 函式會接受做為收據的分葉元件參數 (claimsDigestcommitEvidencewriteSetDigest),並以十六進位形式傳回分葉節點雜湊。

如上所述,我們會計算 commitEvidence 的摘要 (使用 SHA-256 hashlib 函式)。 然後,我們會將 writeSetDigestclaimsDigest 轉換為位元組陣列。 最後,我們會串連三個數組,並使用 SHA256 函式來摘要結果。

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string 
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes 
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes 
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

計算分葉之後,我們可以計算 Merkle 樹狀結構的根。

# Compute root of the Merkle Tree 
root_node = root(leaf_node_hex, proof_list) 

我們會使用提供的 root 函式做為 CCF Python 程式庫的一部分。 函式會連續串連上一個反覆項目的結果與 proof 中的新元素,並摘要串連,然後使用先前計算的摘要重複 proof 之中每個元素的步驟。 串連必須遵循 Merkle 樹狀結構中節點的順序,以確定根目錄已正確計算。

def root(leaf: str, proof: List[dict]): 
    """ 
    Recompute root of Merkle tree from a leaf and a proof of the form: 
    [{"left": digest}, {"right": digest}, ...] 
    """ 

    current = bytes.fromhex(leaf) 

    for n in proof: 
        if "left" in n: 
            current = sha256(bytes.fromhex(n["left"]) + current).digest() 
        else: 
            current = sha256(current + bytes.fromhex(n["right"])).digest() 
    return current.hex() 

計算根節點雜湊之後,我們可以透過根目錄驗證收據中包含的簽章,以驗證簽章是否正確。

# Verify signature of the signing node over the root of the tree 
verify(root_node, root_node_signature, node_cert) 

同樣地,CCF 程式庫會提供函式 verify 來執行這項驗證。 我們使用簽署節點憑證的 ECDSA 公開金鑰來驗證樹狀目錄根目錄的簽章。

def verify(root: str, signature: str, cert: Certificate):
    """ 
    Verify signature over root of Merkle Tree 
    """ 

    sig = base64.b64decode(signature) 
    pk = cert.public_key() 
    assert isinstance(pk, ec.EllipticCurvePublicKey) 
    pk.verify( 
        sig, 
        bytes.fromhex(root), 
        ec.ECDSA(utils.Prehashed(hashes.SHA256())), 
    )

收據驗證的最後一個步驟是驗證用來簽署 Merkle 樹狀結構根目錄的憑證。

# Verify node certificate is endorsed by the service certificates through endorsements 
check_endorsements(node_cert, service_cert, service_endorsements_certs) 

同樣地,我們也可以使用 CCF 公用程式 check_endorsements 來驗證簽署節點是否由服務識別背書。 憑證鏈結可能是由先前的服務憑證所組成,因此,如果 serviceEndorsements 不是空清單,我們應該驗證簽署是否可轉移套用。

def check_endorsement(endorsee: Certificate, endorser: Certificate): 
    """ 
    Check endorser has endorsed endorsee 
    """ 

    digest_algo = endorsee.signature_hash_algorithm 
    assert digest_algo 
    digester = hashes.Hash(digest_algo) 
    digester.update(endorsee.tbs_certificate_bytes) 
    digest = digester.finalize() 
    endorser_pk = endorser.public_key() 
    assert isinstance(endorser_pk, ec.EllipticCurvePublicKey) 
    endorser_pk.verify( 
        endorsee.signature, digest, ec.ECDSA(utils.Prehashed(digest_algo)) 
    ) 

def check_endorsements( 
    node_cert: Certificate, service_cert: Certificate, endorsements: List[Certificate] 
): 
    """ 
    Check a node certificate is endorsed by a service certificate, transitively through a list of endorsements. 
    """ 

    cert_i = node_cert 
    for endorsement in endorsements: 
        check_endorsement(cert_i, endorsement) 
        cert_i = endorsement 
    check_endorsement(cert_i, service_cert) 

或者,我們也可以使用類似的方法來使用 OpenSSL 程式庫來驗證憑證。

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
)

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store 
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates 
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate 
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate 
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

範例指令碼

提供程式碼逐步解說中使用的完整範例程式碼。

主要程式

import json 

# Use the receipt and the service identity to verify the receipt content 
with open("network_certificate.pem", "r") as service_certificate_file, open( 
    "receipt.json", "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"]
    service_certificate_cert = service_certificate_file.read()

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack 
        raise e 

收據驗證

from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
) 

from ccf.receipt import root, verify, check_endorsements 

def verify_receipt(receipt: Dict[str, Any], service_cert_pem: str) -> None: 
    """Function to verify that a given write transaction receipt is valid based 
    on its content and the service certificate. 
    Throws an exception if the verification fails.""" 

    # Check that all the fields are present in the receipt 
    assert "cert" in receipt 
    assert "leafComponents" in receipt 
    assert "claimsDigest" in receipt["leafComponents"] 
    assert "commitEvidence" in receipt["leafComponents"] 
    assert "writeSetDigest" in receipt["leafComponents"] 
    assert "proof" in receipt 
    assert "signature" in receipt 

    # Set the variables 
    node_cert_pem = receipt["cert"] 
    claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
    commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 

    write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
    proof_list = receipt["proof"] 
    service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
    root_node_signature = receipt["signature"] 

    # Load service and node PEM certificates
    service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
    node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

    # Load service endorsements PEM certificates
    service_endorsements_certs = [ 
        load_pem_x509_certificate(pem.encode()) 
        for pem in service_endorsements_certs_pem 
    ] 

    # Compute leaf of the Merkle Tree 
    leaf_node_hex = compute_leaf_node( 
        claims_digest_hex, commit_evidence_str, write_set_digest_hex 
    ) 

    # Compute root of the Merkle Tree
    root_node = root(leaf_node_hex, proof_list) 

    # Verify signature of the signing node over the root of the tree
    verify(root_node, root_node_signature, node_cert) 

    # Verify node certificate is endorsed by the service certificates through endorsements
    check_endorsements(node_cert, service_cert, service_endorsements_certs) 

    # Alternative: Verify node certificate is endorsed by the service certificates through endorsements 
    verify_openssl_certificate(node_cert, service_cert, service_endorsements_certs) 

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

下一步