Compartir a través de


Comprobación de los recibos de transacciones de escritura de Azure confidential ledger

Un recibo de transacción de escritura de Azure Confidential Ledger representa una prueba criptográfica de Merkle de que la red CCF ha confirmado globalmente la transacción de escritura correspondiente. Los usuarios de Azure Confidential Ledger pueden obtener un recibo sobre una transacción de escritura confirmada en cualquier momento para comprobar que la operación de escritura correspondiente se registró correctamente en el libro de contabilidad inmutable.

Para más información sobre los recibos de transacción de escritura de Azure Confidential Ledger, consulte el artículo dedicado.

Pasos de comprobación de recibos

Se puede comprobar un recibo de transacción de escritura siguiendo un conjunto específico de pasos descritos en las subsecciones siguientes. Los mismos pasos se describen en la documentación de CCF.

Cálculo del nodo hoja

El primer paso es calcular el hash SHA-256 del nodo hoja en el árbol de Merkle correspondiente a la transacción confirmada. Un nodo hoja se compone de la concatenación ordenada de los siguientes campos que se pueden encontrar en una recepción de Azure Confidential Ledger, en leafComponents:

  1. writeSetDigest
  2. Resumen SHA-256 de commitEvidence
  3. Campos de claimsDigest

Estos valores deben concatenarse como matrices de bytes: tanto writeSetDigest como claimsDigest tendrían que convertirse de cadenas de dígitos hexadecimales a matrices de bytes; por otro lado, el hash de commitEvidence (como una matriz de bytes) se puede obtener aplicando la función hash SHA-256 a través de la cadena codificada en UTF-8 commitEvidence.

De forma similar, el resumen hash del nodo hoja se puede calcular aplicando la función hash SHA-256 sobre la concatenación de resultados de los bytes resultantes.

Cálculo del nodo raíz

El segundo paso es calcular el hash SHA-256 de la raíz del árbol de Merkle en el momento en que se confirmó la transacción. El cálculo se realiza mediante la concatenación iterativa y el hash del resultado de la iteración anterior (a partir del hash del nodo hoja calculado en el paso anterior) con los hash de los nodos ordenados proporcionados en el campo proof de un recibo. La lista proof se proporciona como una lista ordenada y sus elementos deben iterarse en el orden especificado.

La concatenación debe realizarse en la representación de bytes con respecto al orden relativo indicado en los objetos proporcionados en el campo proof (ya sea left o right).

  • Si la clave del elemento actual de proof es left, el resultado de la iteración anterior debe anexarse al valor del elemento actual.
  • Si la clave del elemento actual de proof es right, el resultado de la iteración anterior debe anteponerse al valor del elemento actual.

Después de cada concatenación, la función SHA-256 debe aplicarse para obtener la entrada para la siguiente iteración. Este proceso sigue los pasos estándar para calcular el nodo raíz de una estructura de datos de árbol de Merkle según los nodos necesarios para el cálculo.

Comprobación de la firma a través del nodo raíz

El tercer paso consiste en comprobar que la firma criptográfica generada a través del hash del nodo raíz es válida mediante el certificado de nodo de firma en el recibo. El proceso de verificación sigue los pasos estándar para la verificación de firmas digitales para los mensajes firmados mediante el Algoritmo de firma digital de curva elíptica (ECDSA). Más concretamente, los pasos son:

  1. Descodifique la cadena signature en base64 en una matriz de bytes.
  2. Extraiga la clave pública ECDSA del certificado cert de nodo de firma.
  3. Compruebe que la firma sobre la raíz del árbol de Merkle (calculado mediante las instrucciones de la subsección anterior) es auténtica mediante la clave pública extraída del paso anterior. Este paso corresponde eficazmente a un proceso de verificación de firma digital estándar mediante ECDSA. Hay muchas bibliotecas en los lenguajes de programación más populares que permiten comprobar una firma ECDSA mediante un certificado de clave pública sobre algunos datos (por ejemplo, la biblioteca de criptografía para Python).

Comprobación de la aprobación del certificado de nodo de firma

Además del paso anterior, también es necesario comprobar que el certificado del nodo de firma está aprobado (es decir, firmado) por el certificado de libro de contabilidad actual. Este paso no depende de los otros tres pasos anteriores y se puede llevar a cabo independientemente de los demás.

Es posible que la identidad de servicio actual que emitió el recibo sea diferente de la que aprobó el nodo de firma (por ejemplo, debido a una renovación de certificados). En este caso, es necesario comprobar la cadena de certificados de confianza del certificado de nodo de firma (es decir, el campo cert del recibo) hasta la entidad de certificación (CA) de raíz de confianza (es decir, el certificado de identidad de servicio actual) a través de otras identidades de servicio anteriores (es decir, el campo serviceEndorsements de lista en el recibo). La lista serviceEndorsements se proporciona como una lista ordenada de la identidad de servicio más antigua a la más reciente.

La aprobación del certificado debe comprobarse para toda la cadena y seguir exactamente el mismo proceso de verificación de firma digital descrito en la subsección anterior. Hay bibliotecas criptográficas de código abierto populares (por ejemplo, OpenSSL) que se pueden usar normalmente para llevar a cabo un paso de aprobación de certificados.

Comprobación del resumen de notificaciones de la aplicación

Como paso opcional, en caso de que las notificaciones de la aplicación se adjunten a un recibo, es posible calcular el resumen de notificaciones a partir de las notificaciones expuestas (siguiendo un algoritmo específico) y comprobar que el resumen coincide con el claimsDigest contenido en la carga del recibo. Para calcular el resumen a partir de los objetos de notificación expuestos, es necesario recorrer cada objeto de notificación de aplicación de la lista y comprobar su campo kind.

Si el objeto de notificación es de tipo LedgerEntry, el identificador de colección del libro de contabilidad (collectionId) y el contenido (contents) de la notificación se deben extraer y usar para calcular sus resúmenes HMAC mediante la clave secreta (secretKey) especificada en el objeto de notificación. Estos dos resúmenes se concatenan y se calcula el hash SHA-256 de la concatenación. El protocolo (protocol) y el resumen de datos de notificación resultante se concatenan y se calcula otro hash SHA-256 de la concatenación para obtener el resumen final.

Si el objeto de notificación es de tipo ClaimDigest, se debe extraer el resumen de notificación (value), concatenado con el protocolo (protocol) y se calcula el hash SHA-256 de la concatenación para obtener el resumen final.

Después de calcular cada resumen de notificación único, es necesario concatenar todos los resúmenes calculados de cada objeto de notificación de aplicación (en el mismo orden en que se presentan en el recibo). A continuación, la concatenación debe anteponerse al número de notificaciones procesadas. El hash SHA-256 de la concatenación anterior genera el resumen final de notificaciones, que debe coincidir con el claimsDigest presente en el objeto del recibo.

Más recursos

Para más información sobre el contenido de una confirmación de transacción de escritura de Azure Confidential Ledger y la explicación de cada campo, consulte el artículo dedicado. La documentación de CCF también contiene más información sobre la comprobación de recibos y otros recursos relacionados en los vínculos siguientes:

Comprobación de recibos de transacciones de escritura

Utilidades de comprobación de recibos

La biblioteca cliente de Azure Confidential Ledger para Python proporciona funciones de utilidad para comprobar los recibos de transacciones de escritura y calcular el resumen de notificaciones de una lista de notificaciones de aplicación. Para obtener más información sobre cómo usar el SDK del plano de datos y las utilidades específicas del recibo, consulte esta sección y este código de ejemplo.

Configuración y requisitos previos

Para fines de referencia, se proporciona código de ejemplo en Python para comprobar completamente los recibos de transacción de escritura de Azure Confidential Ledger siguiendo los pasos descritos en la sección anterior.

Para ejecutar el algoritmo de verificación completo, se requieren el certificado de red de servicio actual y una confirmación de transacción de escritura de un recurso de Confidential Ledger en ejecución. Consulte este artículo para obtener más información sobre cómo capturar un recibo de transacción de escritura y el certificado de servicio de una instancia de Confidential Ledger.

Tutorial de código

El código siguiente se puede usar para inicializar los objetos necesarios y ejecutar el algoritmo de comprobación de recibos. Se usa una utilidad independiente (verify_receipt) para ejecutar el algoritmo de verificación completo y acepta el contenido del campo receipt en una respuesta GET_RECEIPT como un diccionario y el certificado de servicio como una cadena simple. La función produce una excepción si el recibo no es válido o si se encontró algún error durante el procesamiento.

Se supone que tanto el recibo como el certificado de servicio se pueden cargar desde archivos. Asegúrese de actualizar las constantes service_certificate_file_name y receipt_file_name con los nombres de archivos respectivos del certificado de servicio y la recepción que desea comprobar.

import json 

# Constants
service_certificate_file_name = "<your-service-certificate-file>"
receipt_file_name = "<your-receipt-file>"

# Use the receipt and the service identity to verify the receipt content 
with open(service_certificate_file_name, "r") as service_certificate_file, open( 
    receipt_file_name, "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"] 
    service_certificate_cert = service_certificate_file.read() 

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack
        raise e 

Como el proceso de comprobación requiere algunos primitivos criptográficos y hash, se usan las bibliotecas siguientes para facilitar el cálculo.

  • La biblioteca CCF de Python: el módulo proporciona un conjunto de herramientas para la comprobación de recibos.
  • La biblioteca de criptografía de Python: una biblioteca ampliamente usada que incluye varios algoritmos criptográficos y primitivos.
  • El módulo hashlib, parte de la biblioteca estándar de Python: un módulo que proporciona una interfaz común para algoritmos hash populares.
from ccf.receipt import verify, check_endorsements, root 
from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

Dentro de la función verify_receipt, comprobamos que el recibo especificado es válido y contiene todos los campos obligatorios.

# Check that all the fields are present in the receipt 
assert "cert" in receipt 
assert "leafComponents" in receipt 
assert "claimsDigest" in receipt["leafComponents"] 
assert "commitEvidence" in receipt["leafComponents"] 
assert "writeSetDigest" in receipt["leafComponents"] 
assert "proof" in receipt 
assert "signature" in receipt 

Inicializamos las variables que se van a usar en el resto del programa.

# Set the variables 
node_cert_pem = receipt["cert"] 
claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 
write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
proof_list = receipt["proof"] 
service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
root_node_signature = receipt["signature"] 

Podemos cargar los certificados PEM para la identidad de servicio, el nodo de firma y los certificados de aprobación de identidades de servicio anteriores mediante la biblioteca de criptografía.

# Load service and node PEM certificates 
service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

# Load service endorsements PEM certificates 
service_endorsements_certs = [ 
    load_pem_x509_certificate(pem.encode()) 
    for pem in service_endorsements_certs_pem 
] 

El primer paso del proceso de comprobación es calcular el resumen del nodo hoja.

# Compute leaf of the Merkle Tree corresponding to our transaction 
leaf_node_hex = compute_leaf_node( 
    claims_digest_hex, commit_evidence_str, write_set_digest_hex 
)

La función compute_leaf_node acepta como parámetros los componentes hoja del recibo (claimsDigest, commitEvidence y writeSetDigest) y devuelve el hash del nodo hoja en forma hexadecimal.

Como se ha detallado anteriormente, calculamos el resumen de commitEvidence (mediante la función SHA-256 hashlib). A continuación, convertimos writeSetDigest y claimsDigest en matrices de bytes. Por último, concatenamos las tres matrices y se resume el resultado mediante la función SHA256.

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string 
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes 
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes 
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

Después de calcular la hoja, podemos calcular la raíz del árbol de Merkle.

# Compute root of the Merkle Tree 
root_node = root(leaf_node_hex, proof_list) 

Usamos la función root proporcionada como parte de la biblioteca CCF de Python. La función concatena sucesivamente el resultado de la iteración anterior con un nuevo elemento de proof, resume la concatenación y repite el paso para cada elemento de proof con el resumen calculado anteriormente. La concatenación debe respetar el orden de los nodos del árbol de Merkle para asegurarse de que la raíz se vuelve a calcular correctamente.

def root(leaf: str, proof: List[dict]): 
    """ 
    Recompute root of Merkle tree from a leaf and a proof of the form: 
    [{"left": digest}, {"right": digest}, ...] 
    """ 

    current = bytes.fromhex(leaf) 

    for n in proof: 
        if "left" in n: 
            current = sha256(bytes.fromhex(n["left"]) + current).digest() 
        else: 
            current = sha256(current + bytes.fromhex(n["right"])).digest() 
    return current.hex() 

Después de calcular el hash del nodo raíz, podemos comprobar la firma contenida en el recibo sobre la raíz para validar que la firma es correcta.

# Verify signature of the signing node over the root of the tree 
verify(root_node, root_node_signature, node_cert) 

Del mismo modo, la biblioteca CCF proporciona una función verify para realizar esta comprobación. Usamos la clave pública ECDSA del certificado de nodo de firma para comprobar la firma sobre la raíz del árbol.

def verify(root: str, signature: str, cert: Certificate):
    """ 
    Verify signature over root of Merkle Tree 
    """ 

    sig = base64.b64decode(signature) 
    pk = cert.public_key() 
    assert isinstance(pk, ec.EllipticCurvePublicKey) 
    pk.verify( 
        sig, 
        bytes.fromhex(root), 
        ec.ECDSA(utils.Prehashed(hashes.SHA256())), 
    )

El último paso de la comprobación de recibos es validar el certificado que se usó para firmar la raíz del árbol de Merkle.

# Verify node certificate is endorsed by the service certificates through endorsements 
check_endorsements(node_cert, service_cert, service_endorsements_certs) 

Del mismo modo, podemos usar la utilidad CCF check_endorsements para validar que la identidad del servicio aprueba el nodo de firma. La cadena de certificados podría estar compuesta de certificados de servicio anteriores, por lo que debemos validar que la aprobación se aplica transitivamente si serviceEndorsements no es una lista vacía.

def check_endorsement(endorsee: Certificate, endorser: Certificate): 
    """ 
    Check endorser has endorsed endorsee 
    """ 

    digest_algo = endorsee.signature_hash_algorithm 
    assert digest_algo 
    digester = hashes.Hash(digest_algo) 
    digester.update(endorsee.tbs_certificate_bytes) 
    digest = digester.finalize() 
    endorser_pk = endorser.public_key() 
    assert isinstance(endorser_pk, ec.EllipticCurvePublicKey) 
    endorser_pk.verify( 
        endorsee.signature, digest, ec.ECDSA(utils.Prehashed(digest_algo)) 
    ) 

def check_endorsements( 
    node_cert: Certificate, service_cert: Certificate, endorsements: List[Certificate] 
): 
    """ 
    Check a node certificate is endorsed by a service certificate, transitively through a list of endorsements. 
    """ 

    cert_i = node_cert 
    for endorsement in endorsements: 
        check_endorsement(cert_i, endorsement) 
        cert_i = endorsement 
    check_endorsement(cert_i, service_cert) 

Como alternativa, también podríamos validar el certificado mediante la biblioteca OpenSSL mediante un método similar.

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
)

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store 
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates 
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate 
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate 
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

Código de ejemplo

Se proporciona el ejemplo de código completo utilizado en el tutorial de código.

Programa principal

import json 

# Use the receipt and the service identity to verify the receipt content 
with open("network_certificate.pem", "r") as service_certificate_file, open( 
    "receipt.json", "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"]
    service_certificate_cert = service_certificate_file.read()

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack 
        raise e 

Comprobación de recibos

from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
) 

from ccf.receipt import root, verify, check_endorsements 

def verify_receipt(receipt: Dict[str, Any], service_cert_pem: str) -> None: 
    """Function to verify that a given write transaction receipt is valid based 
    on its content and the service certificate. 
    Throws an exception if the verification fails.""" 

    # Check that all the fields are present in the receipt 
    assert "cert" in receipt 
    assert "leafComponents" in receipt 
    assert "claimsDigest" in receipt["leafComponents"] 
    assert "commitEvidence" in receipt["leafComponents"] 
    assert "writeSetDigest" in receipt["leafComponents"] 
    assert "proof" in receipt 
    assert "signature" in receipt 

    # Set the variables 
    node_cert_pem = receipt["cert"] 
    claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
    commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 

    write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
    proof_list = receipt["proof"] 
    service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
    root_node_signature = receipt["signature"] 

    # Load service and node PEM certificates
    service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
    node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

    # Load service endorsements PEM certificates
    service_endorsements_certs = [ 
        load_pem_x509_certificate(pem.encode()) 
        for pem in service_endorsements_certs_pem 
    ] 

    # Compute leaf of the Merkle Tree 
    leaf_node_hex = compute_leaf_node( 
        claims_digest_hex, commit_evidence_str, write_set_digest_hex 
    ) 

    # Compute root of the Merkle Tree
    root_node = root(leaf_node_hex, proof_list) 

    # Verify signature of the signing node over the root of the tree
    verify(root_node, root_node_signature, node_cert) 

    # Verify node certificate is endorsed by the service certificates through endorsements
    check_endorsements(node_cert, service_cert, service_endorsements_certs) 

    # Alternative: Verify node certificate is endorsed by the service certificates through endorsements 
    verify_openssl_certificate(node_cert, service_cert, service_endorsements_certs) 

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

Pasos siguientes