Module dexa_sdk.jsonld.core

Expand source code
import hashlib
import uuid
import requests
import datetime
from merklelib import utils
from aries_cloudagent.wallet.base import BaseWallet
from aries_cloudagent.messaging.jsonld.credential import (
    sign_credential,
    verify_credential,
)
from ..utils import jcs_rfc8785, replace_jws, replace_proof_value, replace_proof_chain
from .exceptions import ProofNotAvailableException


DEXA_JSONLD_CONTEXT_URL = (
    "https://raw.githubusercontent.com"
    "/decentralised-dataexchange"
    "/data-exchange-agreements/main/interface-specs"
    "/jsonld/contexts/dexa-context.jsonld"
)


def fetch_jsonld_context_from_remote(
    context_type: str = None, remote_context_url: str = DEXA_JSONLD_CONTEXT_URL
) -> dict:
    """
    Fetch JSONLD context from remote

    Args:
        context_type (str): Specific JSONLD context type
        remote_context_url (str): Remote JSONLD context URL

    Returns:
        jresp (dict): JSONLD context
    """
    # Perform HTTP GET against remote context URL
    req = requests.get(remote_context_url)
    assert req.status_code == 200, "Failed to fetch JSONLD context from remote."
    # JSON response
    jresp = req.json()
    # Return context
    return jresp if not context_type else jresp.get("@context", {}).get(context_type)


def jsonld_context_fingerprint(
    context_type: str = None, remote_context_url: str = DEXA_JSONLD_CONTEXT_URL
) -> str:
    """Returns the fingerprint (SHA2-256) of JSON-LD context

    Args:
        context_type (str, optional): JSONLD context type. Defaults to None.
        remote_context_url (str, optional): Remote context URL. Defaults to DEXA_JSONLD_CONTEXT_URL.

    Returns:
        str: SHA2-256 finger for the jsonld document
    """
    # Fetch context from remote
    jsonld_context = fetch_jsonld_context_from_remote(context_type, remote_context_url)
    # Canonicalise the context document
    jcs = jcs_rfc8785(jsonld_context)
    # Convert bytes to string
    value = utils.to_string(jcs)
    # Return the SHA2-256 hexdigest
    return hashlib.sha256(value).hexdigest()


async def sign_proof(*,
                     proof,
                     verkey: str,
                     wallet: BaseWallet,
                     signature_options: dict = None) -> dict:
    """Sign embedded proof in agreement to generated counter signatures chain

    Create proof algorithm is defined at w3c vc data integrity 1.0 spec.

    Args:
        verkey (str): public key
        wallet (BaseWallet): wallet instance

    Returns:
        dict: proof of proof (counter signed proof)
    """
    # Signature options.
    # verification method should be did:key identifier
    if not signature_options:
        signature_options = {
            "id": f"urn:uuid:{str(uuid.uuid4())}",
            "verificationMethod": verkey,
            "proofPurpose": "authentication",
            "created": datetime.datetime.now(datetime.timezone.utc).strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            ),
        }

    # Add security context to the proof.
    proof_with_context = {
        **proof,
        "@context": "https://w3id.org/security/v2",
    }

    # Sign the proof document
    proof_with_proof = await sign_credential(
        proof_with_context, signature_options, verkey, wallet
    )

    # Replace 'jws' field with 'proofValue' field
    proof = replace_jws(proof_with_proof["proof"].copy())

    return proof


async def sign_agreement(*,
                         agreement: dict,
                         verkey: str,
                         wallet: BaseWallet,
                         signature_options: dict = None) -> None:
    """Sign agreement.

    Create proof algorithm is defined at w3c vc data integrity 1.0 spec.

    Args:
        verkey (str): public key
        wallet (BaseWallet): wallet instance
    """

    # Check if proof chain
    if "proof" in agreement:
        proof = [agreement["proof"]]
    elif "proofChain" in agreement:
        proof = agreement["proofChain"]
    else:
        proof = None

    if proof:
        # To be signed
        tbs = proof[-1].copy()

        # Replace 'proofValue' field with 'jws' field
        tbs = replace_proof_value(tbs)

        signed_proof = await sign_proof(
            proof=tbs,
            verkey=verkey,
            wallet=wallet,
            signature_options=signature_options
        )
        proof.append(signed_proof)

        # del 'proof' in agreement
        if "proof" in agreement:
            del agreement["proof"]

        # Replace 'proofChain' with new proofs
        agreement["proofChain"] = proof
    else:
        # Signature options.
        # verification method should be did:key identifier
        if not signature_options:
            signature_options = {
                "id": f"urn:uuid:{str(uuid.uuid4())}",
                "verificationMethod": verkey,
                "proofPurpose": "authentication",
                "created": datetime.datetime.now(datetime.timezone.utc).strftime(
                    "%Y-%m-%dT%H:%M:%SZ"
                ),
            }

        # Sign the agreement
        agreement = await sign_credential(
            agreement, signature_options, verkey, wallet
        )

        # Replace 'jws' field with 'proofValue' field
        agreement["proof"] = replace_jws(agreement["proof"].copy())

    # Return agreement with proofs
    return agreement


async def verify_agreement(*, agreement: dict, wallet: BaseWallet) -> bool:
    """Verify agreement

    Args:
        verkey (str): public key
        wallet (BaseWallet): wallet instance

    Returns:
        bool: Validity of the agreement
    """

    # To be verified agreement
    tbv = agreement

    # Check if proof chain
    proofs = None
    if "proof" in tbv:
        proofs = [tbv["proof"]]
    elif "proofChain" in tbv:
        proofs = tbv["proofChain"]
        # Replace 'proofChain' field with 'proof' field
        tbv = replace_proof_chain(tbv.copy())
    else:
        raise ProofNotAvailableException("Proof or proof chain is not present")

    valid = []
    # Iterate through the proofs
    for index, proof in enumerate(proofs):
        if index == 0:
            # First proof in the chain
            # Replace 'proofValue' field with 'jws' field
            tbv["proof"] = replace_proof_value(proofs[0].copy())
            valid.append(await verify_credential(tbv, tbv["proof"]["verificationMethod"], wallet))
        else:
            # From second proof onwards, tbv would be the proof before it.
            tbv = {**proofs[index - 1], "@context": "https://w3id.org/security/v2"}
            tbv = replace_proof_value(tbv.copy())
            tbv["proof"] = replace_proof_value(proof.copy())
            valid.append(await verify_credential(tbv, tbv["proof"]["verificationMethod"], wallet))

    return all(valid)

Functions

def fetch_jsonld_context_from_remote(context_type: str = None, remote_context_url: str = 'https://raw.githubusercontent.com/decentralised-dataexchange/data-exchange-agreements/main/interface-specs/jsonld/contexts/dexa-context.jsonld') ‑> dict

Fetch JSONLD context from remote

Args

context_type : str
Specific JSONLD context type
remote_context_url : str
Remote JSONLD context URL

Returns

jresp (dict): JSONLD context

Expand source code
def fetch_jsonld_context_from_remote(
    context_type: str = None, remote_context_url: str = DEXA_JSONLD_CONTEXT_URL
) -> dict:
    """
    Fetch JSONLD context from remote

    Args:
        context_type (str): Specific JSONLD context type
        remote_context_url (str): Remote JSONLD context URL

    Returns:
        jresp (dict): JSONLD context
    """
    # Perform HTTP GET against remote context URL
    req = requests.get(remote_context_url)
    assert req.status_code == 200, "Failed to fetch JSONLD context from remote."
    # JSON response
    jresp = req.json()
    # Return context
    return jresp if not context_type else jresp.get("@context", {}).get(context_type)
def jsonld_context_fingerprint(context_type: str = None, remote_context_url: str = 'https://raw.githubusercontent.com/decentralised-dataexchange/data-exchange-agreements/main/interface-specs/jsonld/contexts/dexa-context.jsonld') ‑> str

Returns the fingerprint (SHA2-256) of JSON-LD context

Args

context_type : str, optional
JSONLD context type. Defaults to None.
remote_context_url : str, optional
Remote context URL. Defaults to DEXA_JSONLD_CONTEXT_URL.

Returns

str
SHA2-256 finger for the jsonld document
Expand source code
def jsonld_context_fingerprint(
    context_type: str = None, remote_context_url: str = DEXA_JSONLD_CONTEXT_URL
) -> str:
    """Returns the fingerprint (SHA2-256) of JSON-LD context

    Args:
        context_type (str, optional): JSONLD context type. Defaults to None.
        remote_context_url (str, optional): Remote context URL. Defaults to DEXA_JSONLD_CONTEXT_URL.

    Returns:
        str: SHA2-256 finger for the jsonld document
    """
    # Fetch context from remote
    jsonld_context = fetch_jsonld_context_from_remote(context_type, remote_context_url)
    # Canonicalise the context document
    jcs = jcs_rfc8785(jsonld_context)
    # Convert bytes to string
    value = utils.to_string(jcs)
    # Return the SHA2-256 hexdigest
    return hashlib.sha256(value).hexdigest()
async def sign_agreement(*, agreement: dict, verkey: str, wallet: aries_cloudagent.wallet.base.BaseWallet, signature_options: dict = None) ‑> None

Sign agreement.

Create proof algorithm is defined at w3c vc data integrity 1.0 spec.

Args

verkey : str
public key
wallet : BaseWallet
wallet instance
Expand source code
async def sign_agreement(*,
                         agreement: dict,
                         verkey: str,
                         wallet: BaseWallet,
                         signature_options: dict = None) -> None:
    """Sign agreement.

    Create proof algorithm is defined at w3c vc data integrity 1.0 spec.

    Args:
        verkey (str): public key
        wallet (BaseWallet): wallet instance
    """

    # Check if proof chain
    if "proof" in agreement:
        proof = [agreement["proof"]]
    elif "proofChain" in agreement:
        proof = agreement["proofChain"]
    else:
        proof = None

    if proof:
        # To be signed
        tbs = proof[-1].copy()

        # Replace 'proofValue' field with 'jws' field
        tbs = replace_proof_value(tbs)

        signed_proof = await sign_proof(
            proof=tbs,
            verkey=verkey,
            wallet=wallet,
            signature_options=signature_options
        )
        proof.append(signed_proof)

        # del 'proof' in agreement
        if "proof" in agreement:
            del agreement["proof"]

        # Replace 'proofChain' with new proofs
        agreement["proofChain"] = proof
    else:
        # Signature options.
        # verification method should be did:key identifier
        if not signature_options:
            signature_options = {
                "id": f"urn:uuid:{str(uuid.uuid4())}",
                "verificationMethod": verkey,
                "proofPurpose": "authentication",
                "created": datetime.datetime.now(datetime.timezone.utc).strftime(
                    "%Y-%m-%dT%H:%M:%SZ"
                ),
            }

        # Sign the agreement
        agreement = await sign_credential(
            agreement, signature_options, verkey, wallet
        )

        # Replace 'jws' field with 'proofValue' field
        agreement["proof"] = replace_jws(agreement["proof"].copy())

    # Return agreement with proofs
    return agreement
async def sign_proof(*, proof, verkey: str, wallet: aries_cloudagent.wallet.base.BaseWallet, signature_options: dict = None) ‑> dict

Sign embedded proof in agreement to generated counter signatures chain

Create proof algorithm is defined at w3c vc data integrity 1.0 spec.

Args

verkey : str
public key
wallet : BaseWallet
wallet instance

Returns

dict
proof of proof (counter signed proof)
Expand source code
async def sign_proof(*,
                     proof,
                     verkey: str,
                     wallet: BaseWallet,
                     signature_options: dict = None) -> dict:
    """Sign embedded proof in agreement to generated counter signatures chain

    Create proof algorithm is defined at w3c vc data integrity 1.0 spec.

    Args:
        verkey (str): public key
        wallet (BaseWallet): wallet instance

    Returns:
        dict: proof of proof (counter signed proof)
    """
    # Signature options.
    # verification method should be did:key identifier
    if not signature_options:
        signature_options = {
            "id": f"urn:uuid:{str(uuid.uuid4())}",
            "verificationMethod": verkey,
            "proofPurpose": "authentication",
            "created": datetime.datetime.now(datetime.timezone.utc).strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            ),
        }

    # Add security context to the proof.
    proof_with_context = {
        **proof,
        "@context": "https://w3id.org/security/v2",
    }

    # Sign the proof document
    proof_with_proof = await sign_credential(
        proof_with_context, signature_options, verkey, wallet
    )

    # Replace 'jws' field with 'proofValue' field
    proof = replace_jws(proof_with_proof["proof"].copy())

    return proof
async def verify_agreement(*, agreement: dict, wallet: aries_cloudagent.wallet.base.BaseWallet) ‑> bool

Verify agreement

Args

verkey : str
public key
wallet : BaseWallet
wallet instance

Returns

bool
Validity of the agreement
Expand source code
async def verify_agreement(*, agreement: dict, wallet: BaseWallet) -> bool:
    """Verify agreement

    Args:
        verkey (str): public key
        wallet (BaseWallet): wallet instance

    Returns:
        bool: Validity of the agreement
    """

    # To be verified agreement
    tbv = agreement

    # Check if proof chain
    proofs = None
    if "proof" in tbv:
        proofs = [tbv["proof"]]
    elif "proofChain" in tbv:
        proofs = tbv["proofChain"]
        # Replace 'proofChain' field with 'proof' field
        tbv = replace_proof_chain(tbv.copy())
    else:
        raise ProofNotAvailableException("Proof or proof chain is not present")

    valid = []
    # Iterate through the proofs
    for index, proof in enumerate(proofs):
        if index == 0:
            # First proof in the chain
            # Replace 'proofValue' field with 'jws' field
            tbv["proof"] = replace_proof_value(proofs[0].copy())
            valid.append(await verify_credential(tbv, tbv["proof"]["verificationMethod"], wallet))
        else:
            # From second proof onwards, tbv would be the proof before it.
            tbv = {**proofs[index - 1], "@context": "https://w3id.org/security/v2"}
            tbv = replace_proof_value(tbv.copy())
            tbv["proof"] = replace_proof_value(proof.copy())
            valid.append(await verify_credential(tbv, tbv["proof"]["verificationMethod"], wallet))

    return all(valid)