Share via


Azure Confidential Ledger 쓰기 트랜잭션 영수증

Azure Confidential Ledger 쓰기 트랜잭션 영수증은 해당 쓰기 트랜잭션이 CCF 네트워크에 의해 전역적으로 커밋되었다는 암호화 Merkle 증명을 나타냅니다. Azure Confidential Ledger 사용자는 해당 쓰기 작업이 변경할 수 없는 원장에 성공적으로 기록되었는지 확인하기 위해 언제든지 커밋된 쓰기 트랜잭션에 대한 영수증을 받을 수 있습니다.

Azure Confidential Ledger 쓰기 트랜잭션 영수증에 대한 자세한 내용은 전담 문서를 참조하세요.

영수증 확인 단계

다음 하위 섹션에 설명된 일련의 특정 단계에 따라 쓰기 트랜잭션 영수증을 확인할 수 있습니다. 동일한 단계가 CCF 설명서에 설명되어 있습니다.

리프 노드 계산

첫 번째 단계는 커밋된 트랜잭션에 해당하는 Merkle 트리에서 리프 노드의 SHA-256 해시를 계산하는 것입니다. 리프 노드는 leafComponents 아래의 Azure Confidential Ledger 영수증에서 찾을 수 있는 다음 필드의 정렬된 연결로 구성됩니다.

  1. writeSetDigest
  2. commitEvidence의 SHA-256 다이제스트
  3. claimsDigest 필드

이러한 값은 바이트 배열로 연결되어야 합니다. writeSetDigestclaimsDigest 모두 16진수 문자열에서 바이트 배열로 변환해야 합니다. 반면 commitEvidence의 해시(바이트 배열)는 UTF-8로 인코딩된 commitEvidence 문자열에 SHA-256 해시 함수를 적용하여 가져올 수 있습니다.

마찬가지로 리프 노드 해시 다이제스트는 결과 바이트의 결과 연결에 SHA-256 해시 함수를 적용하여 계산할 수 있습니다.

루트 노드 계산

두 번째 단계는 트랜잭션이 커밋된 시점에 Merkle 트리 루트의 SHA-256 해시를 계산하는 것입니다. 계산은 이전 반복의 결과(이전 단계에서 계산된 리프 노드 해시에서 시작)를 영수증의 proof 필드에 제공된 정렬된 노드의 해시와 반복적으로 연결하고 해싱하여 수행됩니다. proof 목록은 순서가 지정된 목록으로 제공되며 해당 요소는 지정된 순서로 반복되어야 합니다.

연결은 proof 필드(left 또는 right)에 제공된 개체에 표시된 상대 순서와 관련하여 바이트 표현에서 수행되어야 합니다.

  • proof에 있는 현재 요소의 키가 left이면 이전 반복의 결과가 현재 요소 값에 추가되어야 합니다.
  • proof의 현재 요소 키가 right이면 이전 반복의 결과가 현재 요소 값 앞에 추가되어야 합니다.

각 연결 후 다음 반복에 대한 입력을 가져오기 위해 SHA-256 함수를 적용해야 합니다. 이 프로세스는 계산에 필요한 노드가 지정된 Merkle 트리 데이터 구조의 루트 노드를 계산하는 표준 단계를 따릅니다.

루트 노드를 통해 서명 확인

세 번째 단계는 영수증에 있는 서명 노드 인증서를 사용하여 루트 노드 해시를 통해 생성된 암호화 서명이 유효한지 확인하는 것입니다. 확인 프로세스는 ECDSA(타원 곡선 디지털 서명 알고리즘)를 사용하여 서명된 메시지에 대한 디지털 서명 확인을 위한 표준 단계를 따릅니다. 보다 구체적으로 단계는 다음과 같습니다.

  1. base64 문자열 signature를 바이트 배열로 디코딩합니다.
  2. 서명 노드 인증서 cert에서 ECDSA 공개 키를 추출합니다.
  3. Merkle 트리 루트의 서명(이전 하위 섹션의 지침을 사용하여 컴퓨팅됨)이 이전 단계에서 추출한 공개 키를 사용하여 인증되었는지 확인합니다. 이 단계는 사실상 ECDSA를 사용하는 표준 디지털 서명 확인 프로세스에 해당합니다. 일부 데이터(예: Python용 암호화 라이브러리)에 대해 공개 키 인증서를 사용하여 ECDSA 서명을 확인할 수 있는 가장 자주 사용되는 프로그래밍 언어의 라이브러리가 많이 있습니다.

서명 노드 인증서 보증 확인

이전 단계 외에도 서명 노드 인증서가 현재 원장 인증서에 의해 보증(즉, 서명)되었는지 확인해야 합니다. 이 단계는 이전의 다른 세 단계에 의존하지 않으며 다른 단계와 독립적으로 수행할 수 있습니다.

영수증을 발급한 현재 서비스 ID가 서명 노드를 보증한 서비스 ID와 다를 수 있습니다(예: 인증서 갱신). 이 경우 다른 이전 서비스 ID(즉, 영수증의 serviceEndorsements 목록 필드)를 통해 서명 노드 인증서(즉, 영수증의 cert 필드)에서 신뢰할 수 있는 루트 CA(인증 기관)(즉, 현재 서비스 ID 인증서)까지 인증서 신뢰 체인을 확인해야 합니다. serviceEndorsements 목록은 가장 오래된 서비스 ID부터 최신 서비스 ID 순으로 순서가 지정된 목록으로 제공됩니다.

인증서 보증은 전체 체인에 대해 확인되어야 하며 이전 하위 섹션에서 설명한 것과 정확히 동일한 디지털 서명 확인 프로세스를 따릅니다. 일반적으로 인증서 보증 단계를 수행하는 데 사용할 수 있는 자주 사용되는 오픈 소스 암호화 라이브러리(예: OpenSSL)가 있습니다.

애플리케이션 클레임 다이제스트 확인

선택적 단계로 애플리케이션 클레임이 영수증에 첨부된 경우 노출된 클레임에서 클레임 다이제스트를 계산하고(특정 알고리즘에 따름) 다이제스트가 영수증 페이로드에 포함된 claimsDigest와 일치하는지 확인할 수 있습니다. 노출된 클레임 개체에서 다이제스트를 계산하려면 목록의 각 애플리케이션 클레임 개체를 반복하고 해당 kind 필드를 확인해야 합니다.

클레임 개체가 LedgerEntry 종류인 경우, 클레임의 원장 컬렉션 ID(collectionId) 및 내용(contents)을 추출하여 클레임 개체에 지정된 비밀 키(secretKey)를 사용하여 HMAC 다이제스트를 계산하는 데 사용해야 합니다. 그러면 이러한 두 다이제스트가 연결되고 연결의 SHA-256 해시가 계산됩니다. 그런 다음 프로토콜(protocol) 및 결과 클레임 데이터 다이제스트가 연결되고 연결의 또 다른 SHA-256 해시가 계산되어 최종 다이제스트가 나옵니다.

클레임 개체가 ClaimDigest 종류인 경우 클레임 다이제스트(value)를 추출하고 프로토콜(protocol)과 연결해야 하며 연결의 SHA-256 해시가 계산되어 최종 다이제스트가 나옵니다.

각 단일 클레임 다이제스트를 계산한 후에는 각 애플리케이션 클레임 개체의 모든 계산 다이제스트를 연결해야 합니다(영수증에 표시되는 순서에 따름). 그런 다음 연결은 처리된 클레임 수 앞에 추가되어야 합니다. 이전 연결의 SHA-256 해시는 최종 클레임 다이제스트를 생성하는데, 이는 영수증 개체에 있는 claimsDigest와 일치해야 합니다.

추가 리소스

Azure Confidential Ledger 쓰기 트랜잭션 영수증 콘텐츠와 각 필드에 대한 설명에 대한 자세한 콘텐츠는 전담 문서를 참조하세요. 다음 링크의 CCF 설명서에는 영수증 확인 및 기타 관련 리소스에 대한 자세한 정보도 포함되어 있습니다.

쓰기 트랜잭션 영수증 확인

영수증 확인 유틸리티

Python 용 Azure Confidential Ledger 클라이언트 라이브러리는 쓰기 트랜잭션 영수증을 확인하고 애플리케이션 클레임 목록에서 클레임 다이제스트를 계산하는 유틸리티 함수를 제공합니다. Data Plane SDK 및 영수증 관련 유틸리티를 사용하는 방법에 대한 자세한 내용은 이 섹션이 샘플 코드를 참조하세요.

설정 및 필수 조건

참조용으로 앞선 섹션에 설명한 단계에 따라 Azure Confidential Ledger 쓰기 트랜잭션 영수증을 완전히 확인하기 위해 Python의 샘플 코드를 제공합니다.

전체 확인 알고리즘을 실행하려면 현재 서비스 네트워크 인증서와 실행 중인 Confidential Ledger 리소스의 쓰기 트랜잭션 영수증이 필요합니다. Confidential Ledger 인스턴스에서 쓰기 트랜잭션 영수증 및 서비스 인증서를 가져오는 방법에 대한 자세한 내용은 이 문서을 참조하세요.

코드 연습

다음 코드는 필요한 개체를 초기화하고 영수증 확인 알고리즘을 실행하는 데 사용할 수 있습니다. 별도의 유틸리티(verify_receipt)를 사용하여 전체 확인 알고리즘을 실행하고 GET_RECEIPT 응답의 receipt 필드 콘텐츠를 사전으로, 서비스 인증서를 간단한 문자열로 받아 들입니다. 영수증이 유효하지 않거나 처리 중에 오류가 발생한 경우 함수에서 예외가 발생합니다.

영수증과 서비스 인증서를 모두 파일에서 로드할 수 있다고 가정합니다. 확인하려는 서비스 인증서 및 영수증의 해당 파일 이름으로 service_certificate_file_namereceipt_file_name 상수를 모두 업데이트해야 합니다.

import json 

# Constants
service_certificate_file_name = "<your-service-certificate-file>"
receipt_file_name = "<your-receipt-file>"

# Use the receipt and the service identity to verify the receipt content 
with open(service_certificate_file_name, "r") as service_certificate_file, open( 
    receipt_file_name, "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"] 
    service_certificate_cert = service_certificate_file.read() 

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack
        raise e 

확인 프로세스에는 일부 암호화 및 해싱 기본 요소가 필요하므로 계산을 용이하게 하기 위해 다음 라이브러리가 사용됩니다.

  • CCF Python 라이브러리: 이 모듈은 영수증 확인을 위한 일련의 도구를 제공합니다.
  • Python 암호화 라이브러리: 다양한 암호화 알고리즘과 기본 형식을 포함하는 널리 사용되는 라이브러리입니다.
  • Python 표준 라이브러리의 일부인 hashlib 모듈: 널리 사용되는 해싱 알고리즘에 공통 인터페이스를 제공하는 모듈입니다.
from ccf.receipt import verify, check_endorsements, root 
from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

verify_receipt 함수 내에서 지정된 영수증이 유효하고 모든 필수 필드가 포함되어 있는지 확인합니다.

# Check that all the fields are present in the receipt 
assert "cert" in receipt 
assert "leafComponents" in receipt 
assert "claimsDigest" in receipt["leafComponents"] 
assert "commitEvidence" in receipt["leafComponents"] 
assert "writeSetDigest" in receipt["leafComponents"] 
assert "proof" in receipt 
assert "signature" in receipt 

나머지 프로그램에서 사용할 변수를 초기화합니다.

# Set the variables 
node_cert_pem = receipt["cert"] 
claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 
write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
proof_list = receipt["proof"] 
service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
root_node_signature = receipt["signature"] 

암호화 라이브러리를 사용하여 이전 서비스 ID에서 서비스 ID, 서명 노드 및 보증 인증서에 대한 PEM 인증서를 로드할 수 있습니다.

# Load service and node PEM certificates 
service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

# Load service endorsements PEM certificates 
service_endorsements_certs = [ 
    load_pem_x509_certificate(pem.encode()) 
    for pem in service_endorsements_certs_pem 
] 

검증 프로세스의 첫 번째 단계는 리프 노드의 다이제스트를 계산하는 것입니다.

# Compute leaf of the Merkle Tree corresponding to our transaction 
leaf_node_hex = compute_leaf_node( 
    claims_digest_hex, commit_evidence_str, write_set_digest_hex 
)

compute_leaf_node 함수는 영수증의 리프 구성 요소(claimsDigest, commitEvidencewriteSetDigest)를 매개 변수로 받아들이고 리프 노드 해시를 16진수 형식으로 반환합니다.

앞에서 자세히 설명한 대로 commitEvidence의 다이제스트를 계산합니다(SHA-256hashlib 함수 사용). 그런 다음 writeSetDigestclaimsDigest를 모두 바이트 배열로 변환합니다. 마지막으로 세 개의 배열을 연결하고 SHA256 함수를 사용하여 결과를 다이제스트합니다.

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string 
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes 
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes 
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

리프를 계산한 후 Merkle 트리의 루트를 계산할 수 있습니다.

# Compute root of the Merkle Tree 
root_node = root(leaf_node_hex, proof_list) 

CCF Python 라이브러리의 일부로 제공된 root 함수를 사용합니다. 이 함수는 이전 반복의 결과를 proof의 새 요소와 연속적으로 연결하고 연결을 다이제스트한 다음 이전에 계산된 다이제스트를 사용하여 proof의 모든 요소에 대해 단계를 반복합니다. 연결은 루트가 올바르게 다시 계산되도록 Merkle 트리의 노드 순서를 존중해야 합니다.

def root(leaf: str, proof: List[dict]): 
    """ 
    Recompute root of Merkle tree from a leaf and a proof of the form: 
    [{"left": digest}, {"right": digest}, ...] 
    """ 

    current = bytes.fromhex(leaf) 

    for n in proof: 
        if "left" in n: 
            current = sha256(bytes.fromhex(n["left"]) + current).digest() 
        else: 
            current = sha256(current + bytes.fromhex(n["right"])).digest() 
    return current.hex() 

루트 노드 해시를 계산한 후 루트를 통해 영수증에 포함된 서명의 유효성을 검사하여 서명이 올바른지 유효성을 검사할 수 있습니다.

# Verify signature of the signing node over the root of the tree 
verify(root_node, root_node_signature, node_cert) 

마찬가지로 CCF 라이브러리는 이 확인을 수행하는 verify 함수를 제공합니다. 서명 노드 인증서의 ECDSA 공개 키를 사용하여 트리의 루트에서 서명을 확인합니다.

def verify(root: str, signature: str, cert: Certificate):
    """ 
    Verify signature over root of Merkle Tree 
    """ 

    sig = base64.b64decode(signature) 
    pk = cert.public_key() 
    assert isinstance(pk, ec.EllipticCurvePublicKey) 
    pk.verify( 
        sig, 
        bytes.fromhex(root), 
        ec.ECDSA(utils.Prehashed(hashes.SHA256())), 
    )

영수증 유효성 검사의 마지막 단계는 Merkle 트리의 루트에 서명하는 데 사용된 인증서의 유효성을 검사하는 것입니다.

# Verify node certificate is endorsed by the service certificates through endorsements 
check_endorsements(node_cert, service_cert, service_endorsements_certs) 

마찬가지로 CCF 유틸리티 check_endorsements를 사용하여 서비스 ID가 서명 노드를 보증하는지 유효성을 검사할 수 있습니다. 인증서 체인은 이전 서비스 인증서로 구성될 수 있으므로 serviceEndorsements가 빈 목록이 아닌 경우 보증이 전이적으로 적용되는지 유효성을 검사해야 합니다.

def check_endorsement(endorsee: Certificate, endorser: Certificate): 
    """ 
    Check endorser has endorsed endorsee 
    """ 

    digest_algo = endorsee.signature_hash_algorithm 
    assert digest_algo 
    digester = hashes.Hash(digest_algo) 
    digester.update(endorsee.tbs_certificate_bytes) 
    digest = digester.finalize() 
    endorser_pk = endorser.public_key() 
    assert isinstance(endorser_pk, ec.EllipticCurvePublicKey) 
    endorser_pk.verify( 
        endorsee.signature, digest, ec.ECDSA(utils.Prehashed(digest_algo)) 
    ) 

def check_endorsements( 
    node_cert: Certificate, service_cert: Certificate, endorsements: List[Certificate] 
): 
    """ 
    Check a node certificate is endorsed by a service certificate, transitively through a list of endorsements. 
    """ 

    cert_i = node_cert 
    for endorsement in endorsements: 
        check_endorsement(cert_i, endorsement) 
        cert_i = endorsement 
    check_endorsement(cert_i, service_cert) 

대안으로 비슷한 방법으로 OpenSSL 라이브러리를 사용하여 인증서의 유효성을 검사할 수도 있습니다.

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
)

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store 
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates 
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate 
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate 
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

샘플 코드

코드 연습에 사용되는 전체 샘플 코드가 제공됩니다.

기본 프로그램

import json 

# Use the receipt and the service identity to verify the receipt content 
with open("network_certificate.pem", "r") as service_certificate_file, open( 
    "receipt.json", "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"]
    service_certificate_cert = service_certificate_file.read()

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack 
        raise e 

영수증 확인

from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
) 

from ccf.receipt import root, verify, check_endorsements 

def verify_receipt(receipt: Dict[str, Any], service_cert_pem: str) -> None: 
    """Function to verify that a given write transaction receipt is valid based 
    on its content and the service certificate. 
    Throws an exception if the verification fails.""" 

    # Check that all the fields are present in the receipt 
    assert "cert" in receipt 
    assert "leafComponents" in receipt 
    assert "claimsDigest" in receipt["leafComponents"] 
    assert "commitEvidence" in receipt["leafComponents"] 
    assert "writeSetDigest" in receipt["leafComponents"] 
    assert "proof" in receipt 
    assert "signature" in receipt 

    # Set the variables 
    node_cert_pem = receipt["cert"] 
    claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
    commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 

    write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
    proof_list = receipt["proof"] 
    service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
    root_node_signature = receipt["signature"] 

    # Load service and node PEM certificates
    service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
    node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

    # Load service endorsements PEM certificates
    service_endorsements_certs = [ 
        load_pem_x509_certificate(pem.encode()) 
        for pem in service_endorsements_certs_pem 
    ] 

    # Compute leaf of the Merkle Tree 
    leaf_node_hex = compute_leaf_node( 
        claims_digest_hex, commit_evidence_str, write_set_digest_hex 
    ) 

    # Compute root of the Merkle Tree
    root_node = root(leaf_node_hex, proof_list) 

    # Verify signature of the signing node over the root of the tree
    verify(root_node, root_node_signature, node_cert) 

    # Verify node certificate is endorsed by the service certificates through endorsements
    check_endorsements(node_cert, service_cert, service_endorsements_certs) 

    # Alternative: Verify node certificate is endorsed by the service certificates through endorsements 
    verify_openssl_certificate(node_cert, service_cert, service_endorsements_certs) 

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

다음 단계