Module dexa_sdk.managers.ada_manager

Expand source code
import base64
import asyncio
import uuid
import typing
import json
import aiohttp
from loguru import logger
from web3._utils.encoding import to_json
from marshmallow.exceptions import ValidationError
from aries_cloudagent.wallet.base import BaseWallet, DIDInfo
from aries_cloudagent.wallet.indy import IndyWallet
from aries_cloudagent.config.injection_context import InjectionContext
from aries_cloudagent.core.error import BaseError
from aries_cloudagent.core.dispatcher import DispatcherResponder
from aries_cloudagent.utils.task_queue import CompletedTask, PendingTask
from aries_cloudagent.messaging.decorators.transport_decorator import TransportDecorator
from aries_cloudagent.transport.pack_format import BaseWireFormat, PackWireFormat
from aries_cloudagent.connections.models.connection_record import ConnectionRecord
from aries_cloudagent.messaging.decorators.default import DecoratorSet
from aries_cloudagent.messaging.responder import BaseResponder
from aries_cloudagent.messaging.models.base_record import match_post_filter
from aries_cloudagent.messaging.agent_message import AgentMessage
from aries_cloudagent.cache.basic import BaseCache
from aries_cloudagent.protocols.basicmessage.v1_0.messages.basicmessage import BasicMessage
from aries_cloudagent.protocols.connections.v1_0.messages.connection_invitation import (
    ConnectionInvitation
)
from aries_cloudagent.protocols.connections.v1_0.manager import (
    ConnectionManagerError
)
from aries_cloudagent.protocols.issue_credential.v1_0.models.credential_exchange import (
    V10CredentialExchange
)
from aries_cloudagent.protocols.present_proof.v1_0.models.presentation_exchange import (
    V10PresentationExchange
)
from aries_cloudagent.transport.inbound.receipt import MessageReceipt
from aries_cloudagent.protocols.problem_report.v1_0.message import ProblemReport
from aries_cloudagent.protocols.present_proof.v1_0.messages.presentation_request import (
    PresentationRequest
)
from aries_cloudagent.protocols.present_proof.v1_0.message_types import (
    ATTACH_DECO_IDS,
    PRESENTATION_REQUEST
)
from aries_cloudagent.messaging.decorators.attach_decorator import AttachDecorator
from aries_cloudagent.indy.util import generate_pr_nonce
from mydata_did.v1_0.utils.util import bool_to_str, str_to_bool
from mydata_did.patched_protocols.present_proof.v1_0.manager import PresentationManager
from mydata_did.v1_0.messages.data_agreement_offer import (
    DataAgreementNegotiationOfferMessage
)
from mydata_did.v1_0.messages.data_agreement_accept import (
    DataAgreementNegotiationAcceptMessage
)
from mydata_did.v1_0.message_types import (
    DATA_AGREEMENT_NEGOTIATION_OFFER,
    DATA_AGREEMENT_NEGOTIATION_ACCEPT,
)
from mydata_did.v1_0.decorators.data_agreement_context_decorator import (
    DataAgreementContextDecorator,
)
from mydata_did.v1_0.models.data_agreement_qr_code_initiate_model import (
    DataAgreementQrCodeInitiateBody
)
from mydata_did.v1_0.messages.data_agreement_qr_code_initiate import (
    DataAgreementQrCodeInitiateMessage
)
from mydata_did.v1_0.messages.data_controller_details import (
    DataControllerDetailsMessage
)
from mydata_did.v1_0.messages.data_controller_details_response import (
    DataControllerDetailsResponseMessage
)
from mydata_did.v1_0.models.data_controller_model import (
    DataController
)
from mydata_did.v1_0.messages.existing_connections import ExistingConnectionsMessage
from mydata_did.v1_0.models.existing_connections_model import (
    ExistingConnectionsBody
)
from mydata_did.v1_0.messages.da_negotiation_receipt import (
    DataAgreementNegotiationReceiptMessage,
    DataAgreementNegotiationReceiptBody
)
from dexa_sdk.marketplace.records.marketplace_connection_record import MarketplaceConnectionRecord
from ..connections.records.existing_connections_record import (
    ExistingConnectionRecord
)
from ..agreements.da.v1_0.models.da_models import (
    DataAgreementModel,
    DA_DEFAULT_CONTEXT,
    DA_TYPE
)
from ..data_controller.records.controller_details_record import ControllerDetailsRecord
from ..agreements.da.v1_0.records.da_qrcode_record import DataAgreementQRCodeRecord
from ..agreements.da.v1_0.records.da_instance_record import DataAgreementInstanceRecord
from ..agreements.da.v1_0.records.da_template_record import DataAgreementTemplateRecord
from ..agreements.da.v1_0.records.personal_data_record import PersonalDataRecord
from ..agreements.da.v1_0.models.da_models import DataAgreementPersonalDataModel
from ..agreements.da.v1_0.models.da_instance_models import DataAgreementInstanceModel
from ..ledgers.indy.core import (
    create_cred_def_and_anchor_to_ledger,
    create_schema_def_and_anchor_to_ledger
)
from ..utils import (
    paginate,
    paginate_records,
    PaginationResult,
    drop_none_dict,
    bump_major_for_semver_string,
    fetch_org_details_from_intermediary,
    generate_firebase_dynamic_link
)
from ..did_mydata.core import DIDMyDataBuilder
from ..ledgers.ethereum.core import EthereumClient
from ..data_controller.records.connection_controller_details_record import (
    ConnectionControllerDetailsRecord
)


class V2ADAManagerError(BaseError):
    """Error raised by V2ADAManager, e.g. when a data agreement fails validation."""


class V2ADAManager:
    """Manages ADA related functions (v2)
    """

    def __init__(self, context: InjectionContext) -> None:
        """Set up the ADA manager.

        Args:
            context (InjectionContext): Injection context the manager works
                with; it is exposed again through the ``context`` property.
        """

        # Module level loguru logger, kept on the instance for convenience.
        self._logger = logger

        # Injection context used by every operation of this manager.
        self._context = context

    @ property
    def context(self) -> InjectionContext:
        """Accessor for injection context

        Returns:
            InjectionContext: Injection context
        """
        return self._context

    async def create_invitation(
        self,
        my_label: str = None,
        my_endpoint: str = None,
        their_role: str = None,
        auto_accept: bool = None,
        public: bool = False,
        multi_use: bool = False,
        alias: str = None,
    ) -> typing.Tuple[ConnectionRecord, ConnectionInvitation]:
        """Generate a new connection invitation carrying the organisation details.

        The organisation name and logo URL fetched from the intermediary are
        used as the invitation label and image.

        NOTE(review): the fetched organisation name always replaces
        ``my_label`` (and the ``default_label`` setting) - confirm this is
        intended.

        Args:
            my_label (str, optional): Invitation label; currently superseded by
                the organisation name.
            my_endpoint (str, optional): Endpoint to put in the invitation.
                Falls back to the ``default_endpoint`` setting.
            their_role (str, optional): Role stored on the connection record.
            auto_accept (bool, optional): Accept mode for the connection. When
                None, the ``debug.auto_accept_requests`` setting decides.
            public (bool): Build the invitation from the wallet's public DID.
            multi_use (bool): Create a multi-use invitation.
            alias (str, optional): Alias stored on the connection record.

        Returns:
            typing.Tuple[ConnectionRecord, ConnectionInvitation]: Connection
            record and invitation. The record is None for public invitations.

        Raises:
            ConnectionManagerError: If public invitations are disabled, no
                public DID exists, or ``public`` is combined with ``multi_use``.
        """

        if not my_label:
            # Settings lookup kept as-is; the value is overwritten below.
            my_label = self.context.settings.get("default_label")

        # Organisation details (name, logo) are served by the intermediary.
        org_details = await fetch_org_details_from_intermediary(self.context)
        my_label = org_details["Name"]
        image_url = org_details["LogoImageURL"] + "/web"

        wallet: BaseWallet = await self.context.inject(BaseWallet)

        if public:
            if not self.context.settings.get("public_invites"):
                raise ConnectionManagerError("Public invitations are not enabled")

            public_did = await wallet.get_public_did()
            if not public_did:
                raise ConnectionManagerError(
                    "Cannot create public invitation with no public DID"
                )

            if multi_use:
                raise ConnectionManagerError(
                    "Cannot use public and multi_use at the same time"
                )

            # FIXME - allow ledger instance to format public DID with prefix?
            public_invitation = ConnectionInvitation(
                label=my_label,
                did=f"did:sov:{public_did.did}",
                image_url=image_url,
            )
            # No connection record is created for a public invitation.
            return None, public_invitation

        invitation_mode = (
            ConnectionRecord.INVITATION_MODE_MULTI
            if multi_use
            else ConnectionRecord.INVITATION_MODE_ONCE
        )

        if not my_endpoint:
            my_endpoint = self.context.settings.get("default_endpoint")

        # An explicit auto_accept wins; only None defers to the debug setting.
        if auto_accept is None:
            auto_accept = self.context.settings.get("debug.auto_accept_requests")
        if auto_accept:
            accept = ConnectionRecord.ACCEPT_AUTO
        else:
            accept = ConnectionRecord.ACCEPT_MANUAL

        # Create and store new invitation key
        invitation_key = await wallet.create_signing_key()

        # Create and persist the connection record in invitation state
        connection = ConnectionRecord(
            initiator=ConnectionRecord.INITIATOR_SELF,
            invitation_key=invitation_key.verkey,
            their_role=their_role,
            state=ConnectionRecord.STATE_INVITATION,
            accept=accept,
            invitation_mode=invitation_mode,
            alias=alias,
        )
        await connection.save(self.context, reason="Created new invitation")

        # Create connection invitation message
        # Note: Need to split this into two stages to support inbound routing of invites
        # Would want to reuse create_did_document and convert the result
        invitation = ConnectionInvitation(
            label=my_label,
            recipient_keys=[invitation_key.verkey],
            endpoint=my_endpoint,
            image_url=image_url,
        )
        await connection.attach_invitation(self.context, invitation)

        return connection, invitation

    async def create_and_store_ledger_payloads_for_da_template(
            self,
            *,
            template_record: DataAgreementTemplateRecord,
            pd_records: typing.List[PersonalDataRecord] = None,
            schema_id: str = None
    ) -> DataAgreementTemplateRecord:
        """Create the ledger payloads for a DA template and save them on the record.

        For the data-source method of use a schema (unless ``schema_id`` is
        given) and a credential definition are created through the ledger
        helpers and their identifiers are stored on the record. For any other
        method of use a presentation request is constructed and stored instead.

        Args:
            template_record (DataAgreementTemplateRecord): Data agreement template record
            pd_records (typing.List[PersonalDataRecord]): Personal data records.
                They are iterated when a schema has to be created or a
                presentation request is built, so None is not usable there.
            schema_id (str): Schema identifier if available

        Returns:
            DataAgreementTemplateRecord: The saved record with ledger payloads
        """
        is_data_source = (
            template_record.method_of_use
            == DataAgreementTemplateRecord.METHOD_OF_USE_DATA_SOURCE
        )

        if not is_data_source:
            agreement = template_record.data_agreement

            # The presentation request is derived from the agreement's purpose,
            # purpose description and version plus the personal data records.
            template_record.presentation_request = self.construct_presentation_request(
                usage_purpose=agreement.get("purpose"),
                usage_purpose_description=agreement.get("purposeDescription"),
                da_template_version=agreement.get("version"),
                personal_data=pd_records
            )
            await template_record.save(self.context)
            return template_record

        if not schema_id:
            # No existing schema supplied: create one named after the purpose,
            # versioned like the agreement, with one attribute per personal data.
            agreement = template_record.data_agreement
            schema_name = agreement.get("purpose")
            schema_version = agreement.get("version")
            attribute_names = [record.attribute_name for record in pd_records]

            schema_id, _schema_def = await create_schema_def_and_anchor_to_ledger(
                context=self.context,
                schema_name=schema_name,
                schema_version=schema_version,
                attributes=attribute_names
            )

        # Create credential definition for the schema and anchor it to the ledger
        cred_def_id, _cred_def, _novel = await create_cred_def_and_anchor_to_ledger(
            context=self.context,
            schema_id=schema_id
        )

        template_record.cred_def_id = cred_def_id
        template_record.schema_id = schema_id
        await template_record.save(self.context)

        return template_record

    def construct_presentation_request(
            self,
            *,
            usage_purpose: str,
            usage_purpose_description: str,
            da_template_version: str,
            personal_data: typing.List[PersonalDataRecord]
    ) -> dict:
        """
        Construct presentation request

        Args:
            usage_purpose: Usage purpose.
            usage_purpose_description: Usage purpose description.
            da_template_version: Data agreement template version.
            personal_data: List of personal data.

        Returns:
            :rtype: dict: Proof request

        """

        presentation_request_dict: dict = {
            "name": usage_purpose,
            "comment": usage_purpose_description,
            "version": da_template_version,
            "requested_attributes": {},
            "requested_predicates": {}
        }

        index = 1
        requested_attributes = {}

        for pd in personal_data:

            requested_attributes["additionalProp" + str(index)] = {
                "name": pd.attribute_name,
                "restrictions": pd.restrictions if pd.restrictions else []
            }
            if pd.restrictions:
                restrictions = [
                    {
                        "schema_id": restriction.get("schemaId"),
                        "cred_def_id": restriction.get("credDefId")
                    }
                    for restriction in pd.restrictions
                ]
                requested_attributes["additionalProp" +
                                     str(index)].update({"restrictions": restrictions})
            else:
                requested_attributes["additionalProp" + str(index)].update({})
            index += 1

        presentation_request_dict["requested_attributes"] = requested_attributes

        return presentation_request_dict

    async def create_and_store_da_template_in_wallet(
            self,
            data_agreement: dict,
            *,
            publish_flag: bool = True,
            schema_id: str = None
    ) -> DataAgreementTemplateRecord:
        """Create and store data agreement template in wallet

        The given ``data_agreement`` dict is updated in place with the
        ``@context``, ``@id``, ``@type`` and ``version`` keys before it is
        validated. One personal data record is saved per personal data entry,
        and the template record is saved in the definition state. When
        ``publish_flag`` is set the ledger payloads are created as well.

        Args:
            data_agreement (dict): Data agreement
            publish_flag (bool): Publish flag
            schema_id (str): Schema identifier

        Returns:
            DataAgreementTemplateRecord: The stored template record.

        Raises:
            V2ADAManagerError: If the data agreement fails validation; the
                original ValidationError is attached as the cause.
        """

        # Temp hack: every new template starts at this fixed version.
        template_version = "1.0.0"
        template_id = str(uuid.uuid4())
        data_agreement.update({"@context": DA_DEFAULT_CONTEXT})
        data_agreement.update({"@id": template_id})
        data_agreement.update({"@type": DA_TYPE})
        data_agreement.update({"version": template_version})

        try:
            # Validate the data agreement.
            da_model: DataAgreementModel = DataAgreementModel.deserialize(data_agreement)
        except ValidationError as err:
            # Chain explicitly so the validation failure is kept as __cause__.
            raise V2ADAManagerError(
                f"Failed to create data agreement; Reason: {err}"
            ) from err

        # Create personal data records
        pd_records = []
        pd_models_with_id = []
        for pd in da_model.personal_data:
            pd_record: PersonalDataRecord = \
                await PersonalDataRecord.build_and_save_record_from_pd_model(
                    self.context,
                    template_id,
                    template_version,
                    pd
                )
            pd_records.append(pd_record)
            pd_models_with_id.append(pd_record.convert_record_to_pd_model())

        # Update the personal data with attribute identifiers to the agreement
        da_model.personal_data = pd_models_with_id

        # Create template record
        record = DataAgreementTemplateRecord(
            template_id=template_id,
            template_version=template_version,
            state=DataAgreementTemplateRecord.STATE_DEFINITION,
            method_of_use=da_model.method_of_use,
            data_agreement=da_model.serialize(),
            publish_flag=bool_to_str(publish_flag),
            schema_id=schema_id,
            existing_schema_flag=bool_to_str(True) if schema_id else bool_to_str(False),
            third_party_data_sharing=bool_to_str(
                da_model.data_policy.third_party_data_sharing)
        )

        await record.save(self.context)

        if publish_flag:
            # Create ledger payloads
            record = await self.create_and_store_ledger_payloads_for_da_template(
                template_record=record,
                pd_records=pd_records,
                schema_id=schema_id
            )

        return record

    async def query_da_templates_in_wallet(
            self,
            *,
            template_id: str = None,
            delete_flag: str = "false",
            method_of_use: str = None,
            publish_flag: str = "true",
            template_version: str = None,
            latest_version_flag: str = "true",
            third_party_data_sharing: str = "false",
            page: int = 1,
            page_size: int = 10,
    ) -> PaginationResult:
        """Query DA templates in wallet, newest first, one page at a time.

        Filters passed as None are left out of the tag filter.

        Args:
            template_id (str, optional): Template identifier. Defaults to None.
            delete_flag (str, optional): Delete flag. Defaults to false.
            method_of_use (str, optional): Method of use. Defaults to None.
            publish_flag (str, optional): Publish flag. Defaults to true.
            template_version (str, optional): Template version; requires
                ``template_id``. Defaults to None.
            latest_version_flag (str, optional): Latest version flag. Defaults to true.
            third_party_data_sharing (str, optional): Third party data sharing.
                Defaults to false.
            page (int, optional): Page. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination results.
        """

        # Query by version is only possible if the template id is provided
        assert not template_version or template_id, \
            "Template identifier is required to query by version"

        # Tag filter without the entries that were not supplied
        tag_filter = drop_none_dict({
            "delete_flag": delete_flag,
            "publish_flag": publish_flag,
            "method_of_use": method_of_use,
            "template_id": template_id,
            "template_version": template_version,
            "latest_version_flag": latest_version_flag,
            "third_party_data_sharing": third_party_data_sharing
        })

        matches = await DataAgreementTemplateRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        # Most recently created templates come first
        newest_first = sorted(matches, key=lambda rec: rec.created_at, reverse=True)

        return paginate_records(newest_first, page, page_size)

    async def publish_da_template_in_wallet(self,
                                            template_id: str) -> DataAgreementTemplateRecord:
        """Publish the latest unpublished, non-deleted version of a DA template.

        Args:
            template_id (str): Template identifier

        Returns:
            DataAgreementTemplateRecord: Template record with ledger payloads.

        Raises:
            AssertionError: If no matching template record is found.
        """

        # Only the latest version that is neither deleted nor published matches
        matches = await DataAgreementTemplateRecord.query(
            context=self.context,
            tag_filter={
                "delete_flag": bool_to_str(False),
                "publish_flag": bool_to_str(False),
                "latest_version_flag": bool_to_str(True),
                "template_id": template_id
            }
        )

        assert matches, "Data agreement template not found."

        template: DataAgreementTemplateRecord = matches[0]

        await template.publish_template(self.context)

        personal_data = await template.fetch_personal_data_records(self.context)

        # Create ledger payloads, reusing the schema already on the record (if any)
        return await self.create_and_store_ledger_payloads_for_da_template(
            template_record=template,
            pd_records=personal_data,
            schema_id=template.schema_id
        )

    async def update_and_store_da_template_in_wallet(
            self,
            template_id: str,
            data_agreement: dict,
            *,
            publish_flag: bool = True,
            schema_id: str = None
    ) -> DataAgreementTemplateRecord:
        """Update and store data agreement template in wallet.

        The latest non-deleted version of the template is loaded, its major
        version is bumped and the given agreement replaces the stored one.
        ``data_agreement`` is updated in place with the ``@context``, ``@type``,
        ``@id`` and ``version`` keys. New personal data records are saved for
        the new version.

        Args:
            template_id (str): Template identifier
            data_agreement (dict): Data agreement
            publish_flag (bool): Publish flag
            schema_id (str): Schema identifier

        Returns:
            DataAgreementTemplateRecord: Updated record.

        Raises:
            AssertionError: If the method of use or the third party data
                sharing value differs from the stored agreement.
        """

        # Fetch the latest, non-deleted data agreement record
        record: DataAgreementTemplateRecord = \
            await DataAgreementTemplateRecord.retrieve_by_tag_filter(
                self.context,
                {
                    "delete_flag": bool_to_str(False),
                    "template_id": template_id,
                    "latest_version_flag": bool_to_str(True)
                }
            )

        # Stored agreement, used as the baseline for the immutable fields
        previous_da: DataAgreementModel = DataAgreementModel.deserialize(record.data_agreement)

        assert previous_da.method_of_use == data_agreement.get(
            "methodOfUse"), "Method of use cannot be updated."

        assert previous_da.data_policy.third_party_data_sharing \
            == data_agreement.get("dataPolicy").get("thirdPartyDataSharing"), \
            "Third party data sharing cannot be updated."

        # Carry over the id and bump the version of the previous agreement
        template_version = bump_major_for_semver_string(previous_da.version)
        template_id = previous_da.id
        data_agreement.update({
            "@context": DA_DEFAULT_CONTEXT,
            "@type": DA_TYPE,
            "@id": template_id,
            "version": template_version
        })

        updated_da: DataAgreementModel = DataAgreementModel.deserialize(data_agreement)

        # Save a personal data record per entry and collect the models that
        # carry the attribute identifiers of those records
        new_pd_records = []
        new_pd_models = []
        for pd_model in updated_da.personal_data:
            saved_pd: PersonalDataRecord = \
                await PersonalDataRecord.build_and_save_record_from_pd_model(
                    self.context,
                    template_id,
                    template_version,
                    pd_model
                )
            new_pd_records.append(saved_pd)
            new_pd_models.append(saved_pd.convert_record_to_pd_model())

        updated_da.personal_data = new_pd_models

        # Move the record over to the new agreement version
        record.data_agreement = updated_da.serialize()
        record.publish_flag = bool_to_str(publish_flag)
        record.schema_id = schema_id
        record.existing_schema_flag = bool_to_str(bool(schema_id))
        record.template_version = template_version

        await record.upgrade(self.context)

        if not publish_flag:
            return record

        # Create ledger payloads
        return await self.create_and_store_ledger_payloads_for_da_template(
            template_record=record,
            pd_records=new_pd_records,
            schema_id=schema_id
        )

    async def delete_da_template_in_wallet(self, template_id: str) -> str:
        """Deactivate DA template in wallet.

        This is not a normal delete operation of a specific version of template. Instead it
        marks the template with latest version flag as deleted i.e. Any version under this
        template is no longer active.

        Args:
            template_id (str): Template identifier

        Returns:
            record_id: Record identifier for the deleted template.

        Raises:
            AssertionError: If no non-deleted template matches the identifier.
        """
        # Look up the non-deleted template record(s) for this identifier
        candidates: typing.List[DataAgreementTemplateRecord] = \
            await DataAgreementTemplateRecord.non_deleted_template_by_id(
                self.context,
                template_id
            )

        assert candidates, "Data agreement template not found."

        # Mark the first match as deleted and hand back the result.
        return await candidates[0].delete_template(self.context)

    async def query_pd_of_da_template_from_wallet(self,
                                                  template_id: str = None,
                                                  method_of_use: str = None,
                                                  third_party_data_sharing: str = None,
                                                  page: int = 1,
                                                  page_size: int = 10,
                                                  ) -> PaginationResult:
        """Query personal data of the latest, non-deleted DA templates.

        Templates are visited newest first and their personal data records are
        concatenated before pagination. Filters passed as None are ignored.

        Args:
            template_id (str, optional): Template identifier. Defaults to None.
            method_of_use (str, optional): Method of use. Defaults to None.
            third_party_data_sharing (str, optional): Third party data sharing
                flag. Defaults to None.
            page (int, optional): Page number. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination results
        """

        # Tag filter without the entries that were not supplied
        tag_filter = drop_none_dict({
            "delete_flag": bool_to_str(False),
            "method_of_use": method_of_use,
            "template_id": template_id,
            "latest_version_flag": bool_to_str(True),
            "third_party_data_sharing": third_party_data_sharing
        })

        templates = await DataAgreementTemplateRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        # Collect the personal data of every template, newest template first
        personal_data = []
        for template in sorted(templates, key=lambda rec: rec.created_at, reverse=True):
            personal_data += await template.fetch_personal_data_records(self.context)

        return paginate_records(personal_data, page, page_size)

    async def update_personal_data_description(self,
                                               attribute_id: str,
                                               desc: str) -> PersonalDataRecord:
        """Update the description of a personal data record and its template.

        The record is saved with the new description and the latest version of
        the owning data agreement template is rewritten to carry the updated
        personal data entry. The updated entry ends up last in the template's
        personal data list.

        Args:
            attribute_id (str): Attribute id
            desc (str): Description

        Returns:
            PersonalDataRecord: Personal data record

        Raises:
            AssertionError: If the latest template is missing or its version
                differs from the version the personal data belongs to.
        """

        # Fetch personal data record by id
        pd_record: PersonalDataRecord = await PersonalDataRecord.retrieve_by_id(
            self.context,
            attribute_id
        )

        # Fetch the latest version of the template the record belongs to
        template: DataAgreementTemplateRecord = \
            await DataAgreementTemplateRecord.latest_template_by_id(
                self.context,
                pd_record.data_agreement_template_id
            )

        assert template, "Matching data agreement template not found."
        assert template.template_version == \
            pd_record.data_agreement_template_version, \
            "Matching data agreement template with same version not found."

        # Persist the new description on the personal data record.
        pd_record.attribute_description = desc
        await pd_record.save(self.context)

        updated_pd: DataAgreementPersonalDataModel = pd_record.convert_record_to_pd_model()

        # Swap the stale entry in the agreement for the updated model: keep
        # all other entries in order and append the updated one at the end.
        agreement: DataAgreementModel = DataAgreementModel.deserialize(template.data_agreement)
        untouched = [
            entry
            for entry in agreement.personal_data
            if entry.attribute_id != updated_pd.attribute_id
        ]
        agreement.personal_data = untouched + [updated_pd]

        template.data_agreement = agreement.serialize()
        await template.save(self.context)

        return pd_record

    async def delete_personal_data(self, attribute_id: str) -> None:
        """Remove a personal data entry from its data agreement template.

        The entry is dropped from the latest version of the owning template.
        When other entries remain, the template is updated through
        ``update_and_store_da_template_in_wallet``; when it was the last
        entry, the template record itself is deleted.

        NOTE(review): the template update is called without ``schema_id``, so
        the updated record's schema id is reset - confirm this is intended.

        Args:
            attribute_id (str): Identifier of the personal data record.

        Raises:
            AssertionError: If the latest template is missing or its version
                differs from the version the personal data belongs to.
        """

        # Fetch personal data record by id
        pd_record: PersonalDataRecord = await PersonalDataRecord.retrieve_by_id(
            self.context,
            attribute_id
        )

        # Fetch the latest version of the template the record belongs to
        template: DataAgreementTemplateRecord = \
            await DataAgreementTemplateRecord.latest_template_by_id(
                self.context,
                pd_record.data_agreement_template_id
            )

        assert template, "Matching data agreement template not found."
        assert template.template_version == \
            pd_record.data_agreement_template_version, \
            "Matching data agreement template with same version not found."

        agreement: DataAgreementModel = DataAgreementModel.deserialize(template.data_agreement)

        # Keep every entry except the one being deleted.
        remaining = [
            entry
            for entry in agreement.personal_data
            if entry.attribute_id != pd_record.attribute_id
        ]
        # Attribute ids are cleared on the kept entries; presumably so the
        # template update assigns fresh ones - TODO confirm.
        for entry in remaining:
            entry.attribute_id = None

        agreement.personal_data = remaining

        if not remaining:
            # Nothing left in the agreement: remove the template itself.
            await template.delete_template(self.context)
            return

        # Update template record with new agreement.
        await self.update_and_store_da_template_in_wallet(
            pd_record.data_agreement_template_id,
            agreement.serialize(),
            publish_flag=str_to_bool(template.publish_flag)
        )

    async def build_data_agreement_offer_for_credential_exchange(
        self,
        template_id: str,
        connection_record: ConnectionRecord,
        cred_ex_record: V10CredentialExchange,
    ) -> DataAgreementNegotiationOfferMessage:
        """Build data agreement offer for credential exchange.

        A data agreement instance is built from the template for the given
        connection and credential exchange, and wrapped in an offer message.

        Args:
            template_id (str): Data agreement template identifier.
            connection_record (ConnectionRecord): Connection record.
            cred_ex_record (V10CredentialExchange): Credential exchange record.

        Returns:
            DataAgreementNegotiationOfferMessage: Offer message.
        """

        # Only the instance model is needed here; the record is not used.
        _instance_record, instance_model = \
            await DataAgreementInstanceRecord.build_instance_from_template(
                self.context,
                template_id,
                connection_record,
                cred_ex_record.credential_exchange_id
            )

        # The instance model is the body of the negotiation offer message
        return DataAgreementNegotiationOfferMessage(body=instance_model)

    async def build_data_agreement_offer_for_presentation_exchange(
        self,
        template_id: str,
        connection_record: ConnectionRecord,
        pres_ex_record: V10PresentationExchange,
    ) -> DataAgreementNegotiationOfferMessage:
        """Build data agreement offer for presentation exchange.

        A data agreement instance is built from the template for the given
        connection and presentation exchange, and wrapped in an offer message.

        Args:
            template_id (str): Data agreement template identifier.
            connection_record (ConnectionRecord): Connection record.
            pres_ex_record (V10PresentationExchange): Presentation exchange record.

        Returns:
            DataAgreementNegotiationOfferMessage: Offer message.
        """

        # Only the instance model is needed here; the record is not used.
        _instance_record, instance_model = \
            await DataAgreementInstanceRecord.build_instance_from_template(
                self.context,
                template_id,
                connection_record,
                pres_ex_record.presentation_exchange_id
            )

        # The instance model is the body of the negotiation offer message
        return DataAgreementNegotiationOfferMessage(body=instance_model)

    async def process_decorator_with_da_offer_message(
        self,
        decorator_set: DecoratorSet,
        data_ex_record: typing.Union[V10CredentialExchange, V10PresentationExchange],
        connection_record: ConnectionRecord
    ) -> DataAgreementInstanceRecord:
        """Handle a DA offer carried in the data agreement context decorator.

        Args:
            decorator_set (DecoratorSet): Decorator set of the incoming message.
            data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]):
                Data exchange record the offer belongs to.
            connection_record (ConnectionRecord): Connection record.

        Returns:
            DataAgreementInstanceRecord: Record returned by
                ``DataAgreementInstanceRecord.build_instance_from_da_offer``, or
                ``None`` when the decorator is absent from ``decorator_set``.

        Raises:
            AssertionError: If the decorator's message type is not "protocol" or
                the embedded message is not a DA negotiation offer.
        """
        decorator_key = "data-agreement-context"

        # Nothing to do when the sender did not attach a DA context.
        if decorator_key not in decorator_set.keys():
            self._logger.info(
                "Data agreement context decorator is not present in the incoming message.")
            return None

        da_context: DataAgreementContextDecorator = DataAgreementContextDecorator.deserialize(
            decorator_set[decorator_key]
        )

        assert da_context.message_type == "protocol", \
            "DA context message type must be 'protocol'."

        embedded_type = da_context.message.get("@type")
        assert DATA_AGREEMENT_NEGOTIATION_OFFER in embedded_type, \
            f"DA context protocol message type must be '{DATA_AGREEMENT_NEGOTIATION_OFFER}'"

        offer_message: DataAgreementNegotiationOfferMessage = \
            DataAgreementNegotiationOfferMessage.deserialize(da_context.message)

        # The exchange kind is decided by class name; it selects which id attribute to read.
        is_credential_exchange = (
            data_ex_record.__class__.__name__ == V10CredentialExchange.__name__
        )
        if is_credential_exchange:
            data_ex_id = data_ex_record.credential_exchange_id
        else:
            data_ex_id = data_ex_record.presentation_exchange_id

        return await DataAgreementInstanceRecord.build_instance_from_da_offer(
            self.context,
            offer_message,
            connection_record,
            data_ex_id
        )

    async def process_decorator_with_da_accept_message(
        self,
        decorator_set: DecoratorSet,
        data_ex_record: typing.Union[V10CredentialExchange, V10PresentationExchange],
        connection_record: ConnectionRecord
    ) -> DataAgreementInstanceRecord:
        """Handle a DA accept carried in the data agreement context decorator.

        After the instance record has been updated from the accept message, a
        task anchoring the instance to the blockchain is scheduled.

        Args:
            decorator_set (DecoratorSet): Decorator set of the incoming message.
            data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]):
                Data exchange record the accept belongs to.
            connection_record (ConnectionRecord): Connection record (not used
                by this method).

        Returns:
            DataAgreementInstanceRecord: Record returned by
                ``DataAgreementInstanceRecord.update_instance_from_da_accept``, or
                ``None`` when the decorator is absent from ``decorator_set``.

        Raises:
            AssertionError: If the decorator's message type is not "protocol" or
                the embedded message is not a DA negotiation accept.
        """
        decorator_key = "data-agreement-context"

        # Nothing to do when the sender did not attach a DA context.
        if decorator_key not in decorator_set.keys():
            self._logger.info(
                "Data agreement context decorator is not present in the incoming message.")
            return None

        da_context: DataAgreementContextDecorator = DataAgreementContextDecorator.deserialize(
            decorator_set[decorator_key]
        )

        assert da_context.message_type == "protocol", \
            "DA context message type must be 'protocol'."

        embedded_type = da_context.message.get("@type")
        assert DATA_AGREEMENT_NEGOTIATION_ACCEPT in embedded_type, \
            f"DA context protocol message type must be '{DATA_AGREEMENT_NEGOTIATION_ACCEPT}'"

        accept_message: DataAgreementNegotiationAcceptMessage = \
            DataAgreementNegotiationAcceptMessage.deserialize(da_context.message)

        # The exchange kind is decided by class name; it selects which id attribute to read.
        is_credential_exchange = (
            data_ex_record.__class__.__name__ == V10CredentialExchange.__name__
        )
        if is_credential_exchange:
            data_ex_id = data_ex_record.credential_exchange_id
        else:
            data_ex_id = data_ex_record.presentation_exchange_id

        updated_record = await DataAgreementInstanceRecord.update_instance_from_da_accept(
            self.context,
            accept_message,
            data_ex_id
        )

        # Schedule anchoring of the accepted instance to the blockchain.
        await self.anchor_da_instance_to_blockchain_async_task(updated_record.instance_id)

        return updated_record

    async def build_data_agreement_negotiation_accept_by_instance_id(
        self,
        instance_id: str,
        connection_record: ConnectionRecord
    ) -> DataAgreementNegotiationAcceptMessage:
        """Counter sign a DA instance and wrap it in an accept message.

        Args:
            instance_id (str): Data agreement instance identifier.
            connection_record (ConnectionRecord): Connection record.

        Returns:
            DataAgreementNegotiationAcceptMessage: Accept message whose body is
                the instance model returned by
                ``DataAgreementInstanceRecord.counter_sign_instance``.
        """
        # Only the instance model is needed; the returned record is not used here.
        _, signed_model = await DataAgreementInstanceRecord.counter_sign_instance(
            self.context,
            instance_id,
            connection_record,
        )

        return DataAgreementNegotiationAcceptMessage(body=signed_model)

    async def build_data_agreement_accept_for_data_ex_record(
        self,
        connection_record: ConnectionRecord,
        data_ex_record: typing.Union[V10CredentialExchange, V10PresentationExchange],
    ) -> DataAgreementNegotiationAcceptMessage:
        """Build a DA accept message for a credential or presentation exchange.

        The DA instance is looked up by the exchange identifier, counter signed,
        and the resulting instance model is used as the accept message body.

        Args:
            connection_record (ConnectionRecord): Connection record.
            data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]):
                Data exchange record.

        Returns:
            DataAgreementNegotiationAcceptMessage: Accept message.
        """
        # The exchange kind is decided by class name; it selects which id attribute to read.
        is_credential_exchange = (
            data_ex_record.__class__.__name__ == V10CredentialExchange.__name__
        )
        if is_credential_exchange:
            data_ex_id = data_ex_record.credential_exchange_id
        else:
            data_ex_id = data_ex_record.presentation_exchange_id

        # Fetch the data agreement instance matching the data exchange record.
        existing_instance = await DataAgreementInstanceRecord.fetch_by_data_ex_id(
            self.context,
            data_ex_id
        )

        # Only the instance model is needed; the returned record is not used here.
        _, signed_model = await DataAgreementInstanceRecord.counter_sign_instance(
            self.context,
            existing_instance.instance_id,
            connection_record
        )

        return DataAgreementNegotiationAcceptMessage(body=signed_model)

    async def query_data_agreement_instances(
        self,
        instance_id: str,
        template_id: str,
        template_version: str,
        method_of_use: str,
        third_party_data_sharing: str,
        data_ex_id: str,
        data_subject_did: str,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """Query data agreement instance records, newest first, paginated.

        The filter arguments are collected into a tag filter, passed through
        ``drop_none_dict`` and used to query ``DataAgreementInstanceRecord``.

        Args:
            instance_id (str): Instance identifier
            template_id (str): Template identifier
            template_version (str): Template version
            method_of_use (str): Method of use
            third_party_data_sharing (str): Third party data sharing
            data_ex_id (str): Data exchange id
            data_subject_did (str): Data subject did
            page (int, optional): Page. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination result

        Raises:
            AssertionError: If ``template_version`` is given without
                ``template_id``.
        """
        # A version only makes sense relative to a specific template.
        if template_version:
            assert template_id, "Template identifier is required to query by version"

        tag_filter = drop_none_dict({
            "instance_id": instance_id,
            "template_id": template_id,
            "template_version": template_version,
            "method_of_use": method_of_use,
            "third_party_data_sharing": third_party_data_sharing,
            "data_ex_id": data_ex_id,
            "data_subject_did": data_subject_did
        })

        matches = await DataAgreementInstanceRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        # Most recently created records come first.
        newest_first = sorted(matches, key=lambda record: record.created_at, reverse=True)

        return paginate_records(newest_first, page, page_size)

    async def delete_da_instance_by_data_ex_id(
        self,
        cred_ex_id: str
    ) -> None:
        """Delete the DA instance record found for a data exchange identifier.

        Args:
            cred_ex_id (str): Identifier passed to
                ``DataAgreementInstanceRecord.fetch_by_data_ex_id`` for the lookup.
        """
        da_instance = await DataAgreementInstanceRecord.fetch_by_data_ex_id(
            self.context,
            cred_ex_id
        )
        await da_instance.delete_record(self.context)

    async def anchor_da_instance_to_blockchain_async_task_callback(
        self, *args, **kwargs
    ):
        """Anchor DA instance to blockchain async task callback function.

        Reads the result of the completed anchoring task, stores the
        blockchain metadata (blink, mydata DID, transaction receipt) on the
        matching data agreement instance record, and sends a DA negotiation
        receipt message over the connection whose ``their_did`` matches the
        record's data subject DID.

        Args:
            *args: ``args[0]`` must be the ``CompletedTask`` for the anchoring
                coroutine. Its result is unpacked as
                ``(instance_id, mydata_did, tx_hash, tx_receipt)``.
            **kwargs: Not used.

        Raises:
            AssertionError: If no instance record matches the instance id.
        """

        # Obtain the completed task.
        completed_task: CompletedTask = args[0]

        # Obtain the results from the task. The transaction hash in the tuple is
        # not used; the hash is read from the serialised receipt below instead.
        (instance_id, mydata_did, _tx_hash, tx_receipt) = completed_task.task.result()

        # Fetch data agreement instance record.
        da_instance_records = await DataAgreementInstanceRecord.query(
            self.context,
            {"instance_id": instance_id},
        )

        assert da_instance_records, "Data agreement instance not found."

        da_instance_record: DataAgreementInstanceRecord = da_instance_records[0]

        # Round-trip the receipt through JSON so only JSON-native values are kept.
        transaction_receipt = json.loads(to_json(tx_receipt))
        transaction_hash = transaction_receipt.get("transactionHash")
        blink = f"blink:ethereum:rinkeby:{transaction_hash}"

        # Update the data agreement with blockchain metadata.
        da_instance_record.blink = blink
        da_instance_record.mydata_did = mydata_did
        da_instance_record.blockchain_receipt = transaction_receipt

        await da_instance_record.save(self.context)

        # Build the receipt message.
        message = DataAgreementNegotiationReceiptMessage(
            body=DataAgreementNegotiationReceiptBody(
                instance_id=da_instance_record.instance_id,
                blockchain_receipt=transaction_receipt,
                blink=blink,
                mydata_did=mydata_did
            )
        )

        # Find the connection record; the lookup uses the DID without its
        # "did:sov:" prefix.
        data_subject_did = da_instance_record.data_subject_did.replace("did:sov:", "")
        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_did(
            self.context,
            their_did=data_subject_did,
        )

        # send_reply_message is a coroutine function: without ``await`` the
        # coroutine object is created but never run, so no receipt is sent.
        await self.send_reply_message(
            message,
            connection_record.connection_id
        )

    async def anchor_da_instance_to_blockchain_async_task(
        self,
        instance_id: str
    ):
        """Register the blockchain anchoring of a DA instance via ``self.add_task``.

        The anchoring coroutine and its completion callback are handed to
        ``self.add_task``; the value it returns is logged.

        Args:
            instance_id (str): Instance id
        """
        anchoring_coro = self.anchor_da_instance_to_blockchain(instance_id)
        on_complete = self.anchor_da_instance_to_blockchain_async_task_callback

        pending_task = await self.add_task(self.context, anchoring_coro, on_complete)
        self._logger.info(pending_task)

    async def anchor_da_instance_to_blockchain(
        self,
        instance_id: str
    ) -> tuple:
        """Anchor da instance to blockchain.

        Loads the data agreement instance record, derives its mydata DID with
        ``DIDMyDataBuilder`` and emits that DID through the injected
        ``EthereumClient``.

        Args:
            instance_id (str): Instance id

        Returns:
            tuple: ``(instance_id, mydata_did, tx_hash, tx_receipt)`` where
                ``tx_hash`` and ``tx_receipt`` are the values returned by
                ``EthereumClient.emit_da_did``.

        Raises:
            AssertionError: If no instance record matches ``instance_id``.
        """

        eth_client: EthereumClient = await self.context.inject(EthereumClient)

        # Fetch data agreement instance record.
        da_instance_records = await DataAgreementInstanceRecord.query(
            self.context,
            {"instance_id": instance_id},
        )

        assert da_instance_records, "Data agreement instance not found."

        da_instance_record: DataAgreementInstanceRecord = da_instance_records[0]
        da_model: DataAgreementInstanceModel = \
            DataAgreementInstanceModel.deserialize(da_instance_record.data_agreement)

        # The builder derives the mydata DID from the deserialised agreement.
        did_mydata_builder = DIDMyDataBuilder(
            artefact=da_model
        )
        mydata_did = did_mydata_builder.mydata_did

        (tx_hash, tx_receipt) = await eth_client.emit_da_did(mydata_did)

        return (da_instance_record.instance_id, mydata_did, tx_hash, tx_receipt)

    async def create_data_agreement_qr_code(
        self,
        template_id: str,
        multi_use_flag: bool
    ) -> dict:
        """Create a QR code record, invitation and dynamic link for a DA template.

        Args:
            template_id (str): Template identifier
            multi_use_flag (bool): Multi use flag

        Returns:
            dict: ``qr_id``, the serialised ``invitation`` and the
                ``dynamic_link``.
        """
        qr_code = DataAgreementQRCodeRecord(
            template_id=template_id,
            multi_use_flag=bool_to_str(multi_use_flag)
        )
        # Saved before the invitation is created because the alias below reads
        # qr_code._id (presumably assigned on save; confirm against the record class).
        await qr_code.save(self.context)

        invitation_alias = f"DA_{template_id}_QR_{qr_code._id}"
        connection, invitation = await self.create_invitation(
            auto_accept=True,
            public=False,
            multi_use=multi_use_flag,
            alias=invitation_alias
        )

        qr_code.connection_id = connection.connection_id
        await qr_code.save(self.context)

        result = {
            "qr_id": qr_code._id,
            "invitation": invitation.serialize()
        }

        # The link payload embeds the base64 JSON of the result as it stands
        # now, i.e. before the dynamic link itself is added.
        encoded_result = base64.b64encode(json.dumps(result).encode()).decode()
        payload = self.context.settings.get("default_endpoint") + "?qt=2&qp=" + encoded_result

        dynamic_link = await generate_firebase_dynamic_link(self.context, payload)
        qr_code.dynamic_link = dynamic_link
        await qr_code.save(self.context)

        result["dynamic_link"] = dynamic_link

        return result

    async def create_connection_qr_code(
        self,
        connection_id: str
    ) -> dict:
        """Create a firebase dynamic link for a connection's invitation URL.

        Args:
            connection_id (str): Connection identifier.

        Returns:
            dict: Dict with a single ``dynamic_link`` key.
        """
        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            connection_id
        )

        invitation: ConnectionInvitation = await connection_record.retrieve_invitation(
            self.context
        )

        # The invitation URL is the payload wrapped by the dynamic link.
        invitation_url = invitation.to_url()
        dynamic_link = await generate_firebase_dynamic_link(self.context, invitation_url)

        return {"dynamic_link": dynamic_link}

    async def query_data_agreement_qr_codes(
        self,
        template_id: str,
    ) -> PaginationResult:
        """Query the QR code records of a data agreement template.

        Args:
            template_id (str): Template identifier used as the tag filter.

        Returns:
            PaginationResult: Matching QR code records as a single page with a
                page size of 1000000.
        """
        tag_filter = {"template_id": template_id}
        qr_records = await DataAgreementQRCodeRecord.query(self.context, tag_filter)

        return paginate_records(qr_records, page=1, page_size=1000000)

    async def send_reply_message(self, message: AgentMessage, connection_id: str = None) -> None:
        """Send a message as a reply through the injected responder.

        Nothing is sent when no responder is available in the context.

        Args:
            message (AgentMessage): Agent message.
            connection_id (str): Connection identifier
        """
        responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False)

        if not responder:
            return

        await responder.send_reply(message, connection_id=connection_id)

    async def send_problem_report_message(self, explain: str, connection_id: str) -> None:
        """Send a problem report as a reply through the injected responder.

        Nothing is sent when no responder is available in the context.

        Args:
            explain (str): Explanation text placed in the problem report.
            connection_id (str): Connection id.
        """
        responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False)

        report = ProblemReport(explain_ltxt=explain)

        if not responder:
            return

        await responder.send_reply(report, connection_id=connection_id)

    async def delete_data_agreement_qr_code(
        self,
        template_id: str,
        qr_id: str
    ) -> None:
        """Delete a data agreement QR code record.

        Args:
            template_id (str): Template identifier the QR code must belong to.
            qr_id (str): QR code record identifier.

        Raises:
            AssertionError: If the record's template id differs from
                ``template_id``.
        """
        qr_record = await DataAgreementQRCodeRecord.retrieve_by_id(self.context, qr_id)

        belongs_to_template = qr_record.template_id == template_id
        assert belongs_to_template, "Data agreement not found."

        await qr_record.delete_record(self.context)

    async def process_data_agreement_qr_code_initiate_message(
        self,
        message: DataAgreementQrCodeInitiateMessage,
        receipt: MessageReceipt
    ):
        """Process data agreement QR code initiate message.

        Looks up the QR code record named in the message, builds a presentation
        request from the latest template of that QR code, creates a
        presentation exchange for it, attaches a data agreement offer in the
        "data-agreement-context" decorator, and sends the request as a reply
        on the current connection.

        Args:
            message (DataAgreementQrCodeInitiateMessage): Data agreement QR code initiate message.
            receipt (MessageReceipt): Message receipt (not used by this method).

        Raises:
            Exception: If the QR code is not multi-use and is already flagged
                as scanned. A problem report is sent before raising.
        """
        qr_id = message.body.qr_id
        connection_id = self.context.connection_record.connection_id

        connection_record = await ConnectionRecord.retrieve_by_id(self.context, connection_id)

        # Fetch the qr code record.
        record: DataAgreementQRCodeRecord = \
            await DataAgreementQRCodeRecord.retrieve_by_id(
                self.context,
                qr_id
            )

        # NOTE(review): only the multi-use branch sets the scanned flag. The
        # single-use branch rejects an already scanned code but never marks the
        # code as scanned in this method. Confirm the flag is set elsewhere,
        # otherwise a single-use code can be scanned repeatedly.
        if record._multi_use_flag:
            record._scanned_flag = True
            await record.save(self.context)
        else:
            if record._scanned_flag:
                explain = "Qr code cannot be scanned twice"
                await self.send_problem_report_message(explain, connection_id)
                raise Exception(explain)

        # Fetch data agreement template record.
        template_record: DataAgreementTemplateRecord = \
            await DataAgreementTemplateRecord.latest_template_by_id(
                self.context,
                record.template_id
            )

        # Construct presentation request.
        # The "comment" entry is removed from the preset request and passed
        # separately; pop() without a default assumes the key is always present
        # (TODO confirm against the template schema).
        preset_presentation_request = template_record.presentation_request
        comment = preset_presentation_request.pop("comment")
        # Generate a nonce only when the preset request does not carry one.
        if not preset_presentation_request.get("nonce"):
            preset_presentation_request["nonce"] = await generate_pr_nonce()

        presentation_request = PresentationRequest(
            comment=comment,
            request_presentations_attach=[
                AttachDecorator.from_indy_dict(
                    indy_dict=preset_presentation_request,
                    ident=ATTACH_DECO_IDS[PRESENTATION_REQUEST],
                )
            ],
        )

        # Construct presentation exchange record
        presentation_manager = PresentationManager(self.context)
        (pres_ex_record) = await presentation_manager.create_exchange_for_request(
            connection_id=self.context.connection_record.connection_id,
            presentation_request_message=presentation_request,
        )

        # Update qr code with record id.
        record.data_ex_id = pres_ex_record.presentation_exchange_id
        await record.save(self.context)

        # Build the data agreement offer for the new presentation exchange.
        offer_message = await self.build_data_agreement_offer_for_presentation_exchange(
            template_record.template_id,
            connection_record,
            pres_ex_record
        )

        # Add data agreement context decorator
        presentation_request._decorators["data-agreement-context"] = DataAgreementContextDecorator(
            message_type="protocol",
            message=offer_message.serialize()
        )

        # Re-serialise the request so the stored exchange record includes the
        # decorator added above.
        pres_ex_record.presentation_request_dict = presentation_request.serialize()
        pres_ex_record.template_id = template_record.template_id
        await pres_ex_record.save(self.context)

        await self.send_reply_message(presentation_request, connection_id)

    async def send_qr_code_initiate_message(
        self,
        qr_id,
        connection_id
    ):
        """Send a data agreement QR code initiate message as a reply.

        Args:
            qr_id: QR code identifier placed in the message body.
            connection_id: Identifier of the connection to reply on.
        """
        initiate_body = DataAgreementQrCodeInitiateBody(qr_id=qr_id)
        initiate_message = DataAgreementQrCodeInitiateMessage(body=initiate_body)

        await self.send_reply_message(initiate_message, connection_id)

    async def send_data_controller_details_message(
        self,
        connection_id: str
    ):
        """Send an empty data controller details message as a reply.

        Args:
            connection_id (str): Connection ID
        """
        await self.send_reply_message(DataControllerDetailsMessage(), connection_id)

    async def process_data_controller_details_message(
        self,
        message: DataControllerDetailsMessage,
        receipt: MessageReceipt
    ):
        """Reply to a data controller details message with the controller details.

        When a ``ControllerDetailsRecord`` exists, the first one is used.
        Otherwise the details are taken from the cache, or fetched with
        ``fetch_org_details_from_intermediary`` and then cached with a TTL
        argument of 3600.

        Args:
            message (DataControllerDetailsMessage): Data controller details message
                (not used by this method).
            receipt (MessageReceipt): Message receipt (not used by this method).

        Raises:
            AssertionError: If no record exists and no cache is available.
        """
        records = await ControllerDetailsRecord.query(self.context, {})

        connection_id = self.context.connection_record.connection_id

        if records:
            # Locally stored details take precedence; nothing is written here.
            stored: ControllerDetailsRecord = records[0]

            controller_details = DataController(
                organisation_did=stored.organisation_did,
                organisation_name=stored.organisation_name,
                cover_image_url=stored.cover_image_url,
                logo_image_url=stored.logo_image_url,
                location=stored.location,
                organisation_type=stored.organisation_type,
                description=stored.description,
                policy_url=stored.policy_url,
                eula_url=stored.eula_url
            )

            await self.send_reply_message(
                DataControllerDetailsResponseMessage(body=controller_details),
                connection_id
            )
            return

        wallet: BaseWallet = await self.context.inject(BaseWallet)
        controller_did = await wallet.get_public_did()

        cache: BaseCache = await self.context.inject(BaseCache, required=False)
        cache_key = f"did:sov:{controller_did.did}"

        assert cache, "Cache not available."

        async with cache.acquire(cache_key) as entry:
            if entry.result:
                # Cache hit: rebuild the controller details from the cached value.
                controller_details = DataController.deserialize(entry.result)
            else:
                # Cache miss: fetch the organisation details and cache the result.
                org_details = await fetch_org_details_from_intermediary(self.context)

                controller_details = DataController(
                    organisation_did=f"did:sov:{controller_did.did}",
                    organisation_name=org_details["Name"],
                    cover_image_url=org_details["CoverImageURL"] + "/web",
                    logo_image_url=org_details["LogoImageURL"] + "/web",
                    location=org_details["Location"],
                    organisation_type=org_details["Type"]["Type"],
                    description=org_details["Description"],
                    policy_url=org_details["PolicyURL"],
                    eula_url=org_details["EulaURL"]
                )
                await entry.set_result(controller_details.serialize(), 3600)

            # The reply is sent while the cache entry is still acquired.
            await self.send_reply_message(
                DataControllerDetailsResponseMessage(body=controller_details),
                connection_id
            )

    async def update_controller_details(
        self,
        organisation_name: str = None,
        cover_image_url: str = None,
        logo_image_url: str = None,
        location: str = None,
        organisation_type: str = None,
        description: str = None,
        policy_url: str = None,
        eula_url: str = None
    ) -> ControllerDetailsRecord:
        """Create or overwrite the controller details record.

        When a record already exists, every field listed below is overwritten
        with the given value, including ``None`` for arguments left at their
        default. When none exists, a new record is created with the
        organisation DID derived from the wallet's public DID.

        Args:
            organisation_name (str, optional): Organisation name. Defaults to None.
            cover_image_url (str, optional): Cover image URL. Defaults to None.
            logo_image_url (str, optional): Logo image URL. Defaults to None.
            location (str, optional): Location. Defaults to None.
            organisation_type (str, optional): Organisation type. Defaults to None.
            description (str, optional): Description. Defaults to None.
            policy_url (str, optional): Policy URL. Defaults to None.
            eula_url (str, optional): EULA URL. Defaults to None.

        Returns:
            ControllerDetailsRecord: The saved controller details record.
        """
        existing_records = await ControllerDetailsRecord.query(self.context, {})

        # Field name -> new value, in the order the fields are applied.
        details = {
            "organisation_name": organisation_name,
            "cover_image_url": cover_image_url,
            "logo_image_url": logo_image_url,
            "location": location,
            "organisation_type": organisation_type,
            "description": description,
            "policy_url": policy_url,
            "eula_url": eula_url,
        }

        if existing_records:
            # Overwrite every field on the first existing record.
            record: ControllerDetailsRecord = existing_records[0]
            for field_name, value in details.items():
                setattr(record, field_name, value)
        else:
            # No record yet: create one keyed by the wallet's public DID.
            wallet: BaseWallet = await self.context.inject(BaseWallet)
            controller_did = await wallet.get_public_did()

            record = ControllerDetailsRecord(
                organisation_did=f"did:sov:{controller_did.did}",
                **details
            )

        await record.save(self.context)

        return record

    async def process_existing_connections_message(
        self,
        message: ExistingConnectionsMessage,
        message_receipt: MessageReceipt
    ):
        """Link the current connection to an already existing one.

        The current connection is found by the receipt's recipient verkey (used
        as the invitation key) and the older one by ``message.body.theirdid``
        (matched against ``my_did``). Any previous link for the current
        connection is deleted, a new ``ExistingConnectionRecord`` is saved and
        the current connection is set to inactive.

        Args:
            message (ExistingConnectionsMessage): Existing connections message.
            message_receipt (MessageReceipt): Message receipt.
        """
        # The recipient verkey of the message serves as the invitation key.
        invitation_key = message_receipt.recipient_verkey

        current_connection = await ConnectionRecord.retrieve_by_invitation_key(
            self.context,
            invitation_key
        )

        # Drop the first previously stored link for the current connection, if any.
        previous_links = await ExistingConnectionRecord.query(
            self.context,
            {"connection_id": current_connection.connection_id}
        )
        if previous_links:
            stale_link: ExistingConnectionRecord = previous_links[0]
            await stale_link.delete_record(self.context)

        # Look up the older connection the sender refers to.
        old_connection = await ConnectionRecord.retrieve_by_did(
            self.context,
            their_did=None,
            my_did=message.body.theirdid
        )

        new_link = ExistingConnectionRecord(
            existing_connection_id=old_connection.connection_id,
            my_did=old_connection.my_did,
            connection_status="available",
            connection_id=current_connection.connection_id
        )
        await new_link.save(self.context)

        # Mark the current connection as inactive.
        current_connection.state = ConnectionRecord.STATE_INACTIVE
        await current_connection.save(context=self.context)

    async def get_existing_connection_record_for_new_connection_id(
        self,
        connection_id: str
    ) -> ExistingConnectionRecord:
        """Look up the existing connection record stored for a connection id.

        Args:
            connection_id (str): Connection id.

        Returns:
            ExistingConnectionRecord: First matching record, or ``None`` when
                the query returns no records.
        """
        matches = await ExistingConnectionRecord.query(
            self.context,
            {"connection_id": connection_id}
        )

        return matches[0] if matches else None

    async def send_message_with_connection_invitation_and_return_route_all(
        self,
        message: AgentMessage,
        connection_id: str,
    ) -> typing.Tuple[str, str, dict]:
        """Post a packed message to the invitation endpoint and unpack the reply.

        The message gets a transport decorator with ``return_route="all"``, is
        packed for the connection's invitation key using a newly created local
        DID as sender, and is posted to the invitation's endpoint.

        Args:
            message (AgentMessage): Agent message.
            connection_id (str): Connection id.

        Returns:
            typing.Tuple[str, str, dict]: sender_verkey, recipient_verkey,
                message_dict of the unpacked response. ``None`` is returned when
                the response status is not 200.
        """
        connection_record: ConnectionRecord = \
            await ConnectionRecord.retrieve_by_id(self.context, connection_id)

        # The message is addressed to the invitation key and endpoint.
        invitation_key = connection_record.invitation_key
        invitation = await connection_record.retrieve_invitation(self.context)
        service_endpoint = invitation.endpoint

        wallet: IndyWallet = await self.context.inject(BaseWallet)

        # Set the transport decorator's return route to "all".
        message._decorators["transport"] = TransportDecorator(
            return_route="all"
        )

        # A new local DID provides the sender key for packing.
        local_did: DIDInfo = await wallet.create_local_did()
        sender_key = local_did.verkey

        packed_message = await wallet.pack_message(
            message.to_json(),
            [invitation_key],
            sender_key
        )

        request_headers = {
            "Content-Type": "application/ssi-agent-wire"
        }

        async with aiohttp.ClientSession(headers=request_headers) as session:
            async with session.post(service_endpoint, data=packed_message) as response:
                if response.status != 200:
                    return None

                raw_reply = await response.read()

                message_json, sender_verkey, recipient_verkey = \
                    await wallet.unpack_message(raw_reply)

                return (sender_verkey, recipient_verkey, json.loads(message_json))

    async def send_message_with_connection_invitation(
        self,
        message: AgentMessage,
        connection_id: str,
    ) -> None:
        """Post a packed message to the endpoint of a connection's invitation.

        The message is packed for the invitation key of the connection, using
        the verkey of a newly created local DID as sender. The response body
        is not read; a 200 status is only logged and any other status is
        ignored.

        Args:
            message (AgentMessage): Agent message.
            connection_id (str): Connection id.
        """
        record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            connection_id
        )

        # Recipient key and service endpoint both come from the invitation.
        recipient_key = record.invitation_key
        invitation = await record.retrieve_invitation(self.context)
        endpoint = invitation.endpoint

        wallet: IndyWallet = await self.context.inject(BaseWallet)

        # A throw-away local DID provides the sender key.
        local_did: DIDInfo = await wallet.create_local_did()
        payload = await wallet.pack_message(
            message.to_json(),
            [recipient_key],
            local_did.verkey
        )

        wire_headers = {"Content-Type": "application/ssi-agent-wire"}

        async with aiohttp.ClientSession(headers=wire_headers) as session:
            async with session.post(endpoint, data=payload) as response:
                if response.status == 200:
                    self._logger.info("Posted existing connection message...")

    async def send_existing_connections_message(
        self,
        theirdid: str,
        connection_id: str
    ):
        """Notify the remote agent that a connection with it already exists.

        Builds an existing-connections message carrying ``theirdid`` and hands
        it to ``send_message_with_connection_invitation`` for delivery.

        Args:
            theirdid (str): Their DID of remote agent in old connection.
            connection_id (str): Connection identifier.
        """
        body = ExistingConnectionsBody(theirdid=theirdid)
        notification = ExistingConnectionsMessage(body=body)

        await self.send_message_with_connection_invitation(notification, connection_id)

    async def query_connections_and_categorise_results(
        self,
        tag_filter: dict = None,
        post_filter_positive: dict = None,
        page: int = 1,
        page_size: int = 10,
        org_flag: bool = False,
        marketplace_flag: bool = False,
    ) -> PaginationResult:
        """Query connections, label each with category flags, filter and paginate.

        Every connection is serialised and extended with ``org_flag`` and
        ``controller_details`` (from the controller details records tagged with
        its connection id) and ``marketplace_flag`` (whether a marketplace
        connection record is tagged with its connection id). Connections are
        ordered newest first by ``created_at``.

        Args:
            tag_filter (dict, optional): Tag filter passed to the connection
                record query. Defaults to None.
            post_filter_positive (dict, optional): Positive post filter passed
                to the connection record query. Defaults to None.
            page (int, optional): Page number; a falsy value means 1.
            page_size (int, optional): Page size; a falsy value means 10.
            org_flag (bool, optional): Required value of ``org_flag`` on the
                returned connections. Defaults to False.
            marketplace_flag (bool, optional): Required value of
                ``marketplace_flag`` on the returned connections.
                Defaults to False.

        Returns:
            PaginationResult: Paginated list of serialised connections.
        """

        # Query the connection records.
        records = await ConnectionRecord.query(
            self.context,
            tag_filter,
            post_filter_positive
        )

        # Newest connections first.
        records = sorted(
            records,
            key=lambda k: k.created_at,
            reverse=True
        )

        # The category filter does not depend on the record, so it is built once.
        # NOTE(review): drop_none_dict presumably removes None-valued flags so
        # that a None flag disables that criterion - confirm against the helper.
        categorise_filter = drop_none_dict({
            "org_flag": org_flag,
            "marketplace_flag": marketplace_flag
        })

        res = []
        for record in records:
            # Dedicated name so the caller-supplied ``tag_filter`` is not shadowed.
            connection_tag_filter = {"connection_id": record.connection_id}

            # Fetch controller details attached to the connection.
            controller_details: typing.List[ConnectionControllerDetailsRecord] = \
                await ConnectionControllerDetailsRecord.query(
                    self.context,
                    connection_tag_filter
                )

            # Fetch marketplace connection record.
            marketplace_connections: typing.List[MarketplaceConnectionRecord] = \
                await MarketplaceConnectionRecord.query(
                    self.context,
                    connection_tag_filter
                )

            connection = record.serialize()

            # Update controller details to the connection dict.
            if controller_details:
                connection.update({
                    "org_flag": True,
                    "controller_details": controller_details[0].controller_details
                })
            else:
                connection.update({
                    "controller_details": {},
                    "org_flag": False
                })

            connection.update({"marketplace_flag": bool(marketplace_connections)})

            # Keep only the connections matching the requested categories.
            if match_post_filter(connection, categorise_filter, True):
                res.append(connection)

        return paginate(res, page or 1, page_size or 10)

    async def add_task(self,
                       context: InjectionContext,
                       coro: typing.Coroutine,
                       task_complete: typing.Callable = None,
                       ident: str = None) -> PendingTask:
        """
        Add a new task to the task queue of the injected wire format.

        Args:
            context: Injection context to be used.
            coro: The coroutine to run
            task_complete: Optional callback invoked with the completed task.
                Its result is scheduled with ``loop.create_task``, so it is
                expected to return a coroutine. When omitted, no completion
                callback is registered.
            ident: A string identifier for the task

        Returns:
            PendingTask: whatever the task queue's ``put`` returns.
        """
        loop = asyncio.get_event_loop()
        pack_format: PackWireFormat = await context.inject(BaseWireFormat, required=False)

        # Only wrap a callback that was actually supplied; wrapping the None
        # default would make the queue call ``None(...)`` on completion.
        on_complete = None
        if task_complete is not None:
            def on_complete(completed):
                return loop.create_task(task_complete(completed))

        # NOTE(review): inject(..., required=False) may presumably return None,
        # in which case the attribute access below fails - confirm whether a
        # wire format is always bound in this context.
        return pack_format.task_queue.put(coro, on_complete, ident)

    async def process_da_negotiation_receipt_message(
        self,
        message: DataAgreementNegotiationReceiptMessage,
        message_receipt: MessageReceipt
    ):
        """Store the receipt details of a DA negotiation on the instance record.

        The instance record is looked up by the ``instance_id`` carried in the
        message body; its blockchain receipt, blink and mydata DID are then
        overwritten with the values from the message and the record is saved.

        Args:
            message (DataAgreementNegotiationReceiptMessage): DA negotiation receipt message.
            message_receipt (MessageReceipt): Message receipt (not used here).
        """
        body = message.body
        instance_id = body.instance_id

        # Values copied from the message body onto the instance record.
        updates = {
            "blockchain_receipt": body.blockchain_receipt,
            "blink": body.blink,
            "mydata_did": body.mydata_did,
        }

        # Fetch the instance record by its instance id tag.
        record: DataAgreementInstanceRecord = \
            await DataAgreementInstanceRecord.retrieve_by_tag_filter(
                self.context,
                {"instance_id": instance_id}
            )

        for field_name, value in updates.items():
            setattr(record, field_name, value)

        await record.save(self.context)

Classes

class V2ADAManager (context: aries_cloudagent.config.injection_context.InjectionContext)

Manages ADA related functions (v2)

Initialise ADA manager

Args

context : InjectionContext
Injection context the manager operates on.
Expand source code
class V2ADAManager:
    """Manages ADA related functions (v2)
    """

    def __init__(self, context: InjectionContext) -> None:
        """Set up the manager with its injection context and logger.

        Args:
            context (InjectionContext): Injection context, later available
                through the ``context`` property.
        """
        # Module-level logger, kept on the instance for use by the methods.
        self._logger = logger

        # Injection context handed in by the caller.
        self._context = context

    @ property
    def context(self) -> InjectionContext:
        """Accessor for injection context

        Returns:
            InjectionContext: Injection context
        """
        return self._context

    async def create_invitation(
        self,
        my_label: str = None,
        my_endpoint: str = None,
        their_role: str = None,
        auto_accept: bool = None,
        public: bool = False,
        multi_use: bool = False,
        alias: str = None,
    ) -> typing.Tuple[ConnectionRecord, ConnectionInvitation]:
        """Generate a new connection invitation.

        The label and image URL of the invitation are always taken from the
        organisation details fetched from the intermediary, so any label
        resolved from ``my_label`` or the ``default_label`` setting is
        overwritten before it is used.

        Args:
            my_label (str, optional): Label for the invitation (see above).
            my_endpoint (str, optional): Endpoint for the invitation; falls
                back to the ``default_endpoint`` setting.
            their_role (str, optional): Role stored on the connection record.
            auto_accept (bool, optional): Whether requests are accepted
                automatically; None defers to ``debug.auto_accept_requests``.
            public (bool, optional): Create an invitation from the public DID.
            multi_use (bool, optional): Create a multi-use invitation.
            alias (str, optional): Alias stored on the connection record.

        Returns:
            typing.Tuple[ConnectionRecord, ConnectionInvitation]: Connection
                record and invitation. The record is None for a public
                invitation.

        Raises:
            ConnectionManagerError: For a public invitation when public
                invites are disabled, no public DID exists, or ``multi_use``
                is also requested.
        """
        if not my_label:
            my_label = self.context.settings.get("default_label")

        # Organisation details from the intermediary override the label and
        # provide the image URL.
        org_details = await fetch_org_details_from_intermediary(self.context)
        my_label = org_details["Name"]
        image_url = org_details["LogoImageURL"] + "/web"

        wallet: BaseWallet = await self.context.inject(BaseWallet)

        if public:
            if not self.context.settings.get("public_invites"):
                raise ConnectionManagerError(
                    "Public invitations are not enabled")

            public_did = await wallet.get_public_did()
            if not public_did:
                raise ConnectionManagerError(
                    "Cannot create public invitation with no public DID"
                )

            if multi_use:
                raise ConnectionManagerError(
                    "Cannot use public and multi_use at the same time"
                )

            # FIXME - allow ledger instance to format public DID with prefix?
            public_invitation = ConnectionInvitation(
                label=my_label, did=f"did:sov:{public_did.did}", image_url=image_url
            )
            # No connection record is created for a public invitation.
            return None, public_invitation

        invitation_mode = (
            ConnectionRecord.INVITATION_MODE_MULTI
            if multi_use
            else ConnectionRecord.INVITATION_MODE_ONCE
        )

        if not my_endpoint:
            my_endpoint = self.context.settings.get("default_endpoint")

        # An explicit auto_accept wins; None defers to the debug setting.
        accept_automatically = auto_accept
        if accept_automatically is None:
            accept_automatically = self.context.settings.get("debug.auto_accept_requests")
        if accept_automatically:
            accept = ConnectionRecord.ACCEPT_AUTO
        else:
            accept = ConnectionRecord.ACCEPT_MANUAL

        # Create and store new invitation key
        connection_key = await wallet.create_signing_key()
        invitation_verkey = connection_key.verkey

        # Create connection record
        connection = ConnectionRecord(
            initiator=ConnectionRecord.INITIATOR_SELF,
            invitation_key=invitation_verkey,
            their_role=their_role,
            state=ConnectionRecord.STATE_INVITATION,
            accept=accept,
            invitation_mode=invitation_mode,
            alias=alias,
        )
        await connection.save(self.context, reason="Created new invitation")

        # Create connection invitation message
        # Note: Need to split this into two stages to support inbound routing of invites
        # Would want to reuse create_did_document and convert the result
        invitation = ConnectionInvitation(
            label=my_label,
            recipient_keys=[invitation_verkey],
            endpoint=my_endpoint,
            image_url=image_url
        )
        await connection.attach_invitation(self.context, invitation)

        return connection, invitation

    async def create_and_store_ledger_payloads_for_da_template(
            self,
            *,
            template_record: DataAgreementTemplateRecord,
            pd_records: typing.List[PersonalDataRecord] = None,
            schema_id: str = None
    ) -> DataAgreementTemplateRecord:
        """Attach ledger payloads to a data agreement template and save it.

        For the data-source method of use, a schema is created and anchored
        when no ``schema_id`` is given, a credential definition is created and
        anchored, and both identifiers are stored on the record. For any other
        method of use, a presentation request is built from the agreement and
        stored on the record.

        Args:
            template_record (DataAgreementTemplateRecord): Data agreement template record
            pd_records (typing.List[PersonalDataRecord]): Personal data records
            schema_id (str): Schema identifier if available

        Returns:
            DataAgreementTemplateRecord: The same record, saved with its payloads
        """
        data_source = DataAgreementTemplateRecord.METHOD_OF_USE_DATA_SOURCE

        if template_record.method_of_use != data_source:
            agreement = template_record.data_agreement

            # Purpose, description and version of the agreement feed the
            # presentation request.
            template_record.presentation_request = self.construct_presentation_request(
                usage_purpose=agreement.get("purpose"),
                usage_purpose_description=agreement.get("purposeDescription"),
                da_template_version=agreement.get("version"),
                personal_data=pd_records
            )
            await template_record.save(self.context)
            return template_record

        # Create the schema only when the caller did not supply one.
        if not schema_id:
            agreement = template_record.data_agreement
            schema_name = agreement.get("purpose")
            schema_version = agreement.get("version")
            attribute_names = [record.attribute_name for record in pd_records]

            schema_id, _schema_def = await create_schema_def_and_anchor_to_ledger(
                context=self.context,
                schema_name=schema_name,
                schema_version=schema_version,
                attributes=attribute_names
            )

        # Create credential definition and anchor to ledger
        cred_def_id, _cred_def, _novel = await create_cred_def_and_anchor_to_ledger(
            context=self.context,
            schema_id=schema_id
        )

        template_record.cred_def_id = cred_def_id
        template_record.schema_id = schema_id
        await template_record.save(self.context)

        return template_record

    def construct_presentation_request(
            self,
            *,
            usage_purpose: str,
            usage_purpose_description: str,
            da_template_version: str,
            personal_data: typing.List[PersonalDataRecord]
    ) -> dict:
        """
        Build the presentation (proof) request dict for a data agreement.

        Each personal data entry becomes a requested attribute keyed
        ``additionalProp<N>`` (N counting from 1). Its restrictions are reduced
        to ``schema_id`` / ``cred_def_id`` pairs read from the ``schemaId`` and
        ``credDefId`` keys; an entry without restrictions gets an empty list.

        Args:
            usage_purpose: Usage purpose, used as the request name.
            usage_purpose_description: Usage purpose description, used as the
                request comment.
            da_template_version: Data agreement template version.
            personal_data: List of personal data.

        Returns:
            :rtype: dict: Proof request

        """
        requested_attributes = {}

        for position, pd in enumerate(personal_data, start=1):
            attribute_name = pd.attribute_name
            restrictions = [
                {
                    "schema_id": restriction.get("schemaId"),
                    "cred_def_id": restriction.get("credDefId")
                }
                for restriction in (pd.restrictions or [])
            ]
            requested_attributes[f"additionalProp{position}"] = {
                "name": attribute_name,
                "restrictions": restrictions
            }

        # No predicates are ever requested.
        return {
            "name": usage_purpose,
            "comment": usage_purpose_description,
            "version": da_template_version,
            "requested_attributes": requested_attributes,
            "requested_predicates": {}
        }

    async def create_and_store_da_template_in_wallet(
            self,
            data_agreement: dict,
            *,
            publish_flag: bool = True,
            schema_id: str = None
    ) -> DataAgreementTemplateRecord:
        """Create a data agreement template and store it in the wallet.

        The passed ``data_agreement`` dict is updated in place with the
        context, a new template id, the type and version ``1.0.0`` before it
        is validated. A personal data record is saved for every personal data
        entry, then the template record itself. When ``publish_flag`` is set,
        the ledger payloads are created as well.

        Args:
            data_agreement (dict): Data agreement
            publish_flag (bool): Publish flag
            schema_id (str): Schema identifier

        Returns:
            DataAgreementTemplateRecord: The stored template record.

        Raises:
            V2ADAManagerError: If the data agreement fails validation.
        """

        # Temp hack
        template_version = "1.0.0"
        template_id = str(uuid.uuid4())
        data_agreement.update({
            "@context": DA_DEFAULT_CONTEXT,
            "@id": template_id,
            "@type": DA_TYPE,
            "version": template_version
        })

        # Validate the data agreement.
        try:
            da_model: DataAgreementModel = DataAgreementModel.deserialize(data_agreement)
        except ValidationError as err:
            raise V2ADAManagerError(
                f"Failed to create data agreement; Reason: {err}"
            )

        # Save one personal data record per entry and collect the models that
        # carry the identifiers assigned to them.
        pd_records = []
        pd_models_with_id = []
        for pd_model in da_model.personal_data:
            saved_record: PersonalDataRecord = \
                await PersonalDataRecord.build_and_save_record_from_pd_model(
                    self.context,
                    template_id,
                    template_version,
                    pd_model
                )
            pd_records.append(saved_record)
            pd_models_with_id.append(saved_record.convert_record_to_pd_model())

        da_model.personal_data = pd_models_with_id

        # Create template record
        third_party_sharing = da_model.data_policy.third_party_data_sharing
        record = DataAgreementTemplateRecord(
            template_id=template_id,
            template_version=template_version,
            state=DataAgreementTemplateRecord.STATE_DEFINITION,
            method_of_use=da_model.method_of_use,
            data_agreement=da_model.serialize(),
            publish_flag=bool_to_str(publish_flag),
            schema_id=schema_id,
            existing_schema_flag=bool_to_str(bool(schema_id)),
            third_party_data_sharing=bool_to_str(third_party_sharing)
        )
        await record.save(self.context)

        if not publish_flag:
            return record

        # Create ledger payloads
        return await self.create_and_store_ledger_payloads_for_da_template(
            template_record=record,
            pd_records=pd_records,
            schema_id=schema_id
        )

    async def query_da_templates_in_wallet(
            self,
            *,
            template_id: str = None,
            delete_flag: str = "false",
            method_of_use: str = None,
            publish_flag: str = "true",
            template_version: str = None,
            latest_version_flag: str = "true",
            third_party_data_sharing: str = "false",
            page: int = 1,
            page_size: int = 10,
    ) -> PaginationResult:
        """Query DA templates in wallet

        Records are ordered newest first by ``created_at`` before pagination.

        Args:
            template_id (str, optional): Template identifier. Defaults to None.
            delete_flag (str, optional): Delete flag. Defaults to false.
            method_of_use (str, optional): Method of use. Defaults to None.
            publish_flag (str, optional): Publish flag. Defaults to true.
            latest_version_flag (str, optional): Latest version flag. Defaults to true.
            template_version (str, optional): Template version. Defaults to None.
            third_party_data_sharing (str, optional): Third party data sharing.
                Defaults to false.
            page (int, optional): Page. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination results.

        Raises:
            AssertionError: If ``template_version`` is given without
                ``template_id``.
        """

        # Query by version is only possible if the template id is provided.
        # Raised explicitly (rather than via ``assert``) so the check is kept
        # when Python runs with optimisations enabled.
        if template_version and not template_id:
            raise AssertionError("Template identifier is required to query by version")

        # Tag filter
        tag_filter = {
            "delete_flag": delete_flag,
            "publish_flag": publish_flag,
            "method_of_use": method_of_use,
            "template_id": template_id,
            "template_version": template_version,
            "latest_version_flag": latest_version_flag,
            "third_party_data_sharing": third_party_data_sharing
        }

        tag_filter = drop_none_dict(tag_filter)

        records = await DataAgreementTemplateRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        records = sorted(records, key=lambda k: k.created_at, reverse=True)

        return paginate_records(records, page, page_size)

    async def publish_da_template_in_wallet(self,
                                            template_id: str) -> DataAgreementTemplateRecord:
        """Publish data agreement template.

        Looks up the latest, non-deleted, unpublished version of the template,
        publishes it and creates its ledger payloads.

        Args:
            template_id (str): Template identifier

        Returns:
            DataAgreementTemplateRecord: Template record.

        Raises:
            AssertionError: If no matching template is found.
        """

        tag_filter = {
            "delete_flag": bool_to_str(False),
            "publish_flag": bool_to_str(False),
            "latest_version_flag": bool_to_str(True),
            "template_id": template_id
        }

        records = await DataAgreementTemplateRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        # Raised explicitly (rather than via ``assert``) so an empty result
        # never reaches the indexing below, even with optimisations enabled.
        if not records:
            raise AssertionError("Data agreement template not found.")

        record: DataAgreementTemplateRecord = records[0]

        await record.publish_template(self.context)

        pd_records = await record.fetch_personal_data_records(self.context)

        # Create ledger payloads
        record = await self.create_and_store_ledger_payloads_for_da_template(
            template_record=record,
            pd_records=pd_records,
            schema_id=record.schema_id
        )

        return record

    async def update_and_store_da_template_in_wallet(
            self,
            template_id: str,
            data_agreement: dict,
            *,
            publish_flag: bool = True,
            schema_id: str = None
    ) -> DataAgreementTemplateRecord:
        """Update and store data agreement template in wallet.

        The latest non-deleted version of the template is loaded, the major
        version is bumped, new personal data records are saved for the new
        version and the record is upgraded. The passed ``data_agreement`` dict
        is updated in place with the context, type, id and new version. When
        ``publish_flag`` is set, the ledger payloads are created as well.

        Args:
            template_id (str): Template identifier
            data_agreement (dict): Data agreement
            publish_flag (bool): Publish flag
            schema_id (str): Schema identifier

        Returns:
            DataAgreementTemplateRecord: Updated record.

        Raises:
            AssertionError: If the method of use or the third party data
                sharing setting differs from the previous version.
        """

        # Tag filter
        tag_filter = {
            "delete_flag": bool_to_str(False),
            "template_id": template_id,
            "latest_version_flag": bool_to_str(True)
        }

        # Fetch data agreement record
        record: DataAgreementTemplateRecord = \
            await DataAgreementTemplateRecord.retrieve_by_tag_filter(self.context, tag_filter)

        # Validate the data agreement.
        previous_da: DataAgreementModel = DataAgreementModel.deserialize(record.data_agreement)

        # The checks below are raised explicitly (rather than via ``assert``)
        # so they are kept when Python runs with optimisations enabled.
        if previous_da.method_of_use != data_agreement.get("methodOfUse"):
            raise AssertionError("Method of use cannot be updated.")

        # A missing data policy is treated as a mismatch instead of failing
        # with an attribute error on None.
        new_data_policy = data_agreement.get("dataPolicy") or {}
        if previous_da.data_policy.third_party_data_sharing \
                != new_data_policy.get("thirdPartyDataSharing"):
            raise AssertionError("Third party data sharing cannot be updated.")

        # Copy the id, version from previous da to new da
        template_version = bump_major_for_semver_string(previous_da.version)
        template_id = previous_da.id
        data_agreement.update({"@context": DA_DEFAULT_CONTEXT})
        data_agreement.update({"@type": DA_TYPE})
        data_agreement.update({"@id": template_id})
        data_agreement.update({"version": template_version})

        updated_da: DataAgreementModel = DataAgreementModel.deserialize(data_agreement)

        # Create personal data records
        pds = updated_da.personal_data
        pd_records = []
        pd_models_with_id = []
        for pd in pds:
            pd_record: PersonalDataRecord = \
                await PersonalDataRecord.build_and_save_record_from_pd_model(
                    self.context,
                    template_id,
                    template_version,
                    pd
                )
            pd_records.append(pd_record)
            pd_models_with_id.append(pd_record.convert_record_to_pd_model())

        # Update the personal data with attribute identifiers to the agreement
        updated_da.personal_data = pd_models_with_id

        record.data_agreement = updated_da.serialize()
        record.publish_flag = bool_to_str(publish_flag)
        record.schema_id = schema_id
        record.existing_schema_flag = bool_to_str(True) if schema_id else bool_to_str(False)
        record.template_version = template_version

        await record.upgrade(self.context)

        if publish_flag:
            # Create ledger payloads
            record = await self.create_and_store_ledger_payloads_for_da_template(
                template_record=record,
                pd_records=pd_records,
                schema_id=schema_id
            )

        return record

    async def delete_da_template_in_wallet(self, template_id: str) -> str:
        """Deactivate DA template in wallet.

        This is not a normal delete operation of a specific version of template. Instead it
        marks the template with latest version flag as deleted i.e. Any version under this
        template is no longer active.

        Args:
            template_id (str): Template identifier

        Returns:
            record_id: Record identifier for the deleted template, as returned
                by the record's ``delete_template``.

        Raises:
            AssertionError: If no non-deleted template matches the identifier.
        """
        # Query for the non-deleted data agreement templates by id.
        data_agreement_records: typing.List[DataAgreementTemplateRecord] = \
            await DataAgreementTemplateRecord.non_deleted_template_by_id(
                self.context,
                template_id
            )

        # Raised explicitly (rather than via ``assert``) so an empty result
        # never reaches the indexing below, even with optimisations enabled.
        if not data_agreement_records:
            raise AssertionError("Data agreement template not found.")
        data_agreement_record = data_agreement_records[0]

        # Mark the data agreement as deleted and save.
        return await data_agreement_record.delete_template(self.context)

    async def query_pd_of_da_template_from_wallet(self,
                                                  template_id: str = None,
                                                  method_of_use: str = None,
                                                  third_party_data_sharing: str = None,
                                                  page: int = 1,
                                                  page_size: int = 10,
                                                  ) -> PaginationResult:
        """Query the personal data of the latest, non-deleted DA templates.

        Matching templates are visited newest first (by ``created_at``) and
        the personal data records of each are collected into one list, which
        is then paginated.

        Args:
            template_id (str, optional): Template identifier. Defaults to None.
            method_of_use (str, optional): Method of use. Defaults to None.
            third_party_data_sharing (str, optional): Third party data sharing.
                Defaults to None.
            page (int, optional): Page number. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination results
        """
        criteria = drop_none_dict({
            "delete_flag": bool_to_str(False),
            "method_of_use": method_of_use,
            "template_id": template_id,
            "latest_version_flag": bool_to_str(True),
            "third_party_data_sharing": third_party_data_sharing
        })

        templates = await DataAgreementTemplateRecord.query(
            context=self.context,
            tag_filter=criteria
        )

        # Fetch personal data records, newest template first.
        collected = []
        for template in sorted(templates, key=lambda k: k.created_at, reverse=True):
            template_pds = await template.fetch_personal_data_records(self.context)
            collected.extend(template_pds)

        return paginate_records(collected, page, page_size)

    async def update_personal_data_description(self,
                                               attribute_id: str,
                                               desc: str) -> PersonalDataRecord:
        """Update personal data description

        The description is saved on the personal data record and mirrored into
        the data agreement stored on the latest version of its template. The
        updated entry keeps its position in the agreement's personal data
        list; it is appended only when the agreement had no entry for it.

        Args:
            attribute_id (str): Attribute id
            desc (str): Description

        Returns:
            PersonalDataRecord: Personal data record

        Raises:
            AssertionError: If the latest template is missing or its version
                differs from the one the personal data record belongs to.
        """

        # Fetch personal data record by id
        pd_record: PersonalDataRecord = await PersonalDataRecord.retrieve_by_id(
            self.context,
            attribute_id
        )

        # Fetch the associated data agreement record
        da_template_record: DataAgreementTemplateRecord = \
            await DataAgreementTemplateRecord.latest_template_by_id(
                self.context,
                pd_record.data_agreement_template_id
            )

        # Raised explicitly (rather than via ``assert``) so the checks are
        # kept when Python runs with optimisations enabled.
        if not da_template_record:
            raise AssertionError("Matching data agreement template not found.")
        if da_template_record.template_version != pd_record.data_agreement_template_version:
            raise AssertionError(
                "Matching data agreement template with same version not found."
            )

        # Update the personal data record.
        pd_record.attribute_description = desc
        await pd_record.save(self.context)

        pd_model: DataAgreementPersonalDataModel = pd_record.convert_record_to_pd_model()

        # Update the data agreement record with new personal data.
        da: DataAgreementModel = DataAgreementModel.deserialize(da_template_record.data_agreement)

        # Swap the updated entry in where the old one was, so editing a
        # description does not reorder the personal data of the agreement.
        da_pds = []
        replaced = False
        for da_pd in da.personal_data:
            if da_pd.attribute_id != pd_model.attribute_id:
                da_pds.append(da_pd)
            elif not replaced:
                da_pds.append(pd_model)
                replaced = True
        if not replaced:
            da_pds.append(pd_model)
        da.personal_data = da_pds

        da_template_record.data_agreement = da.serialize()
        await da_template_record.save(self.context)

        return pd_record

    async def delete_personal_data(self, attribute_id: str) -> None:
        """Delete personal data record.

        On deleting personal data record, the associated data agreement template is
        updated. If the personal data record deleted, is the last one in the template,
        proceed to delete the template record.

        Args:
            attribute_id (str): Identifier used to retrieve the personal data
                record.

        Raises:
            AssertionError: If the latest template is missing or its version
                differs from the one the personal data record belongs to.
        """

        # Fetch personal data record by id
        pd_record: PersonalDataRecord = await PersonalDataRecord.retrieve_by_id(
            self.context,
            attribute_id
        )

        # Fetch the associated data agreement record
        da_template_record: DataAgreementTemplateRecord = \
            await DataAgreementTemplateRecord.latest_template_by_id(
                self.context,
                pd_record.data_agreement_template_id
            )

        # Raised explicitly (rather than via ``assert``) so the checks are
        # kept when Python runs with optimisations enabled.
        if not da_template_record:
            raise AssertionError("Matching data agreement template not found.")
        if da_template_record.template_version != pd_record.data_agreement_template_version:
            raise AssertionError(
                "Matching data agreement template with same version not found."
            )

        da: DataAgreementModel = DataAgreementModel.deserialize(da_template_record.data_agreement)

        # Keep every personal data entry except the deleted one.
        da_pds = []
        for da_pd in da.personal_data:
            if da_pd.attribute_id == pd_record.attribute_id:
                continue
            # NOTE(review): the id is cleared presumably so the template update
            # below assigns fresh identifiers to the remaining entries - confirm.
            da_pd.attribute_id = None
            da_pds.append(da_pd)

        da.personal_data = da_pds

        if not da_pds:
            # Nothing left in the agreement: delete the template itself.
            await da_template_record.delete_template(self.context)
        else:
            # Update template record with new agreement.
            await self.update_and_store_da_template_in_wallet(
                pd_record.data_agreement_template_id,
                da.serialize(),
                publish_flag=str_to_bool(da_template_record.publish_flag)
            )

    async def build_data_agreement_offer_for_credential_exchange(
        self,
        template_id: str,
        connection_record: ConnectionRecord,
        cred_ex_record: V10CredentialExchange,
    ) -> DataAgreementNegotiationOfferMessage:
        """Create a data agreement negotiation offer tied to a credential exchange.

        A data agreement instance is built from the template through
        ``DataAgreementInstanceRecord.build_instance_from_template`` using the
        credential exchange identifier; the returned instance model becomes the
        body of the offer message.

        Args:
            template_id (str): Data agreement template identifier.
            connection_record (ConnectionRecord): Connection record.
            cred_ex_record (V10CredentialExchange): Credential exchange record.

        Returns:
            DataAgreementNegotiationOfferMessage: Offer message.
        """
        data_ex_id = cred_ex_record.credential_exchange_id

        # Only the instance model is needed; the instance record is not used here.
        _, instance_model = await DataAgreementInstanceRecord.build_instance_from_template(
            self.context,
            template_id,
            connection_record,
            data_ex_id,
        )

        return DataAgreementNegotiationOfferMessage(body=instance_model)

    async def build_data_agreement_offer_for_presentation_exchange(
        self,
        template_id: str,
        connection_record: ConnectionRecord,
        pres_ex_record: V10PresentationExchange,
    ) -> DataAgreementNegotiationOfferMessage:
        """Create a data agreement negotiation offer tied to a presentation exchange.

        A data agreement instance is built from the template through
        ``DataAgreementInstanceRecord.build_instance_from_template`` using the
        presentation exchange identifier; the returned instance model becomes
        the body of the offer message.

        Args:
            template_id (str): Data agreement template identifier.
            connection_record (ConnectionRecord): Connection record.
            pres_ex_record (V10PresentationExchange): Presentation exchange record.

        Returns:
            DataAgreementNegotiationOfferMessage: Offer message.
        """
        data_ex_id = pres_ex_record.presentation_exchange_id

        # Only the instance model is needed; the instance record is not used here.
        _, instance_model = await DataAgreementInstanceRecord.build_instance_from_template(
            self.context,
            template_id,
            connection_record,
            data_ex_id,
        )

        return DataAgreementNegotiationOfferMessage(body=instance_model)

    async def process_decorator_with_da_offer_message(
        self,
        decorator_set: DecoratorSet,
        data_ex_record: typing.Union[V10CredentialExchange, V10PresentationExchange],
        connection_record: ConnectionRecord
    ) -> DataAgreementInstanceRecord:
        """Handle a DA offer carried in the data agreement context decorator.

        Args:
            decorator_set (DecoratorSet): Decorators of the incoming message.
            data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]):
                Data exchange record the offer belongs to.
            connection_record (ConnectionRecord): Connection record.

        Returns:
            DataAgreementInstanceRecord: Instance record built from the offer,
                or None when the decorator is absent.

        Raises:
            AssertionError: If the decorator's message type is not 'protocol'
                or the embedded message is not a negotiation offer.
        """

        # Nothing to do when the sender did not attach the decorator.
        if "data-agreement-context" not in decorator_set.keys():
            self._logger.info(
                "Data agreement context decorator is not present in the incoming message.")
            return None

        da_decorator_model: DataAgreementContextDecorator = \
            DataAgreementContextDecorator.deserialize(decorator_set["data-agreement-context"])

        assert da_decorator_model.message_type == "protocol", \
            "DA context message type must be 'protocol'."

        embedded_message = da_decorator_model.message
        assert DATA_AGREEMENT_NEGOTIATION_OFFER in embedded_message.get("@type"), \
            f"DA context protocol message type must be '{DATA_AGREEMENT_NEGOTIATION_OFFER}'"

        da_offer_message: DataAgreementNegotiationOfferMessage = \
            DataAgreementNegotiationOfferMessage.deserialize(embedded_message)

        # The exchange identifier attribute depends on the kind of exchange record.
        if data_ex_record.__class__.__name__ == V10CredentialExchange.__name__:
            data_ex_id = data_ex_record.credential_exchange_id
        else:
            data_ex_id = data_ex_record.presentation_exchange_id

        # Build and save data agreement instance record.
        return await DataAgreementInstanceRecord.build_instance_from_da_offer(
            self.context,
            da_offer_message,
            connection_record,
            data_ex_id
        )

    async def process_decorator_with_da_accept_message(
        self,
        decorator_set: DecoratorSet,
        data_ex_record: typing.Union[V10CredentialExchange, V10PresentationExchange],
        connection_record: ConnectionRecord
    ) -> DataAgreementInstanceRecord:
        """Handle a DA accept carried in the data agreement context decorator.

        The matching instance is updated from the accept message and an
        asynchronous task anchoring the instance to the blockchain is scheduled.

        Args:
            decorator_set (DecoratorSet): Decorators of the incoming message.
            data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]):
                Data exchange record.
            connection_record (ConnectionRecord): Connection record.

        Returns:
            DataAgreementInstanceRecord: Updated instance record, or None when
                the decorator is absent.

        Raises:
            AssertionError: If the decorator's message type is not 'protocol'
                or the embedded message is not a negotiation accept.
        """

        # Nothing to do when the sender did not attach the decorator.
        if "data-agreement-context" not in decorator_set.keys():
            self._logger.info(
                "Data agreement context decorator is not present in the incoming message.")
            return None

        da_decorator_model: DataAgreementContextDecorator = \
            DataAgreementContextDecorator.deserialize(decorator_set["data-agreement-context"])

        assert da_decorator_model.message_type == "protocol", \
            "DA context message type must be 'protocol'."

        embedded_message = da_decorator_model.message
        assert DATA_AGREEMENT_NEGOTIATION_ACCEPT in embedded_message.get("@type"), \
            f"DA context protocol message type must be '{DATA_AGREEMENT_NEGOTIATION_ACCEPT}'"

        da_accept_message: DataAgreementNegotiationAcceptMessage = \
            DataAgreementNegotiationAcceptMessage.deserialize(embedded_message)

        # The exchange identifier attribute depends on the kind of exchange record.
        if data_ex_record.__class__.__name__ == V10CredentialExchange.__name__:
            data_ex_id = data_ex_record.credential_exchange_id
        else:
            data_ex_id = data_ex_record.presentation_exchange_id

        # Update and save the data agreement instance record.
        instance_record = await DataAgreementInstanceRecord.update_instance_from_da_accept(
            self.context,
            da_accept_message,
            data_ex_id
        )

        # Anchor da to blockchain.
        await self.anchor_da_instance_to_blockchain_async_task(instance_record.instance_id)

        return instance_record

    async def build_data_agreement_negotiation_accept_by_instance_id(
        self,
        instance_id: str,
        connection_record: ConnectionRecord
    ) -> DataAgreementNegotiationAcceptMessage:
        """Counter sign a data agreement instance and wrap it in an accept message.

        Args:
            instance_id (str): Data agreement instance identifier.
            connection_record (ConnectionRecord): Connection record.

        Returns:
            DataAgreementNegotiationAcceptMessage: Accept message whose body is
                the instance model returned by the counter signing step.
        """
        # Only the instance model is needed; the instance record is not used here.
        _, countersigned_model = await DataAgreementInstanceRecord.counter_sign_instance(
            self.context,
            instance_id,
            connection_record,
        )

        return DataAgreementNegotiationAcceptMessage(body=countersigned_model)

    async def build_data_agreement_accept_for_data_ex_record(
        self,
        connection_record: ConnectionRecord,
        data_ex_record: typing.Union[V10CredentialExchange, V10PresentationExchange],
    ) -> DataAgreementNegotiationAcceptMessage:
        """Build the data agreement accept message for a data exchange record.

        The instance associated with the exchange identifier is fetched, counter
        signed, and the resulting instance model is used as the message body.

        Args:
            connection_record (ConnectionRecord): Connection record.
            data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]):
                Data exchange record.

        Returns:
            DataAgreementNegotiationAcceptMessage: Accept message.
        """
        # The exchange identifier attribute depends on the kind of exchange record.
        if data_ex_record.__class__.__name__ == V10CredentialExchange.__name__:
            data_ex_id = data_ex_record.credential_exchange_id
        else:
            data_ex_id = data_ex_record.presentation_exchange_id

        # Data agreement instance matching the exchange record.
        instance_record = await DataAgreementInstanceRecord.fetch_by_data_ex_id(
            self.context,
            data_ex_id
        )

        # Counter sign; only the instance model is needed afterwards.
        _, countersigned_model = await DataAgreementInstanceRecord.counter_sign_instance(
            self.context,
            instance_record.instance_id,
            connection_record
        )

        return DataAgreementNegotiationAcceptMessage(body=countersigned_model)

    async def query_data_agreement_instances(
        self,
        instance_id: str,
        template_id: str,
        template_version: str,
        method_of_use: str,
        third_party_data_sharing: str,
        data_ex_id: str,
        data_subject_did: str,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """Query data agreement instances, newest first, one page at a time.

        The arguments are combined into a tag filter that is passed through
        ``drop_none_dict`` before querying.

        Args:
            instance_id (str): Instance identifier
            template_id (str): Template identifier
            template_version (str): Template version
            method_of_use (str): Method of use
            third_party_data_sharing (str): Third party data sharing
            data_ex_id (str): Data exchange id
            data_subject_did (str): Data subject did
            page (int, optional): Page. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination result

        Raises:
            AssertionError: If a template version is given without a template id.
        """
        # Query by version is only possible if the template id is provided
        assert not template_version or template_id, \
            "Template identifier is required to query by version"

        tag_filter = drop_none_dict({
            "instance_id": instance_id,
            "template_id": template_id,
            "template_version": template_version,
            "method_of_use": method_of_use,
            "third_party_data_sharing": third_party_data_sharing,
            "data_ex_id": data_ex_id,
            "data_subject_did": data_subject_did
        })

        matches = await DataAgreementInstanceRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        # Most recently created instances come first.
        newest_first = sorted(matches, key=lambda rec: rec.created_at, reverse=True)

        return paginate_records(newest_first, page, page_size)

    async def delete_da_instance_by_data_ex_id(
        self,
        cred_ex_id: str
    ) -> None:
        """Delete the data agreement instance linked to a data exchange id.

        Args:
            cred_ex_id (str): Exchange identifier used to look up the instance
                through ``DataAgreementInstanceRecord.fetch_by_data_ex_id``.
        """

        da_instance_record = await DataAgreementInstanceRecord.fetch_by_data_ex_id(
            self.context, cred_ex_id
        )
        await da_instance_record.delete_record(self.context)

    async def anchor_da_instance_to_blockchain_async_task_callback(
        self, *args, **kwargs
    ):
        """Anchor DA instance to blockchain async task callback function.

        Reads the result of the completed anchoring task, stores the blockchain
        metadata (blink, mydata did and transaction receipt) on the data
        agreement instance record and sends a negotiation receipt message to
        the connection of the data subject.

        Args:
            *args: The first positional argument is the completed task whose
                result is ``(instance_id, mydata_did, tx_hash, tx_receipt)``.
            **kwargs: Not used.

        Raises:
            AssertionError: If no data agreement instance matches the
                instance id.
        """

        # Obtain the completed task and its results.
        completed_task: CompletedTask = args[0]
        (instance_id, mydata_did, _tx_hash, tx_receipt) = completed_task.task.result()

        # Fetch data agreement instance record.
        da_instance_records = await DataAgreementInstanceRecord.query(
            self.context,
            {"instance_id": instance_id},
        )

        assert da_instance_records, "Data agreement instance not found."

        da_instance_record: DataAgreementInstanceRecord = da_instance_records[0]

        # The receipt is round-tripped through JSON to get a plain dict; the
        # transaction hash is taken from that dict.
        transaction_receipt = json.loads(to_json(tx_receipt))
        transaction_hash = transaction_receipt.get("transactionHash")
        blink = f"blink:ethereum:rinkeby:{transaction_hash}"

        # Update the data agreement with blockchain metadata.
        da_instance_record.blink = blink
        da_instance_record.mydata_did = mydata_did
        da_instance_record.blockchain_receipt = transaction_receipt

        await da_instance_record.save(self.context)

        # Build the receipt message.
        message = DataAgreementNegotiationReceiptMessage(
            body=DataAgreementNegotiationReceiptBody(
                instance_id=da_instance_record.instance_id,
                blockchain_receipt=transaction_receipt,
                blink=blink,
                mydata_did=mydata_did
            )
        )

        # Find the connection record; connection DIDs are stored without the
        # "did:sov:" prefix.
        data_subject_did = da_instance_record.data_subject_did.replace("did:sov:", "")
        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_did(
            self.context,
            their_did=data_subject_did,
        )

        # send_reply_message is a coroutine function: without ``await`` the
        # coroutine object is discarded and the receipt is never sent.
        await self.send_reply_message(
            message,
            connection_record.connection_id
        )

    async def anchor_da_instance_to_blockchain_async_task(
        self,
        instance_id: str
    ):
        """Schedule the blockchain anchoring of a DA instance as a task.

        The anchoring coroutine is handed to ``self.add_task`` together with the
        callback that processes its result; the returned pending task is logged.

        Args:
            instance_id (str): Instance id
        """
        anchoring = self.anchor_da_instance_to_blockchain(instance_id)
        on_complete = self.anchor_da_instance_to_blockchain_async_task_callback

        pending_task = await self.add_task(self.context, anchoring, on_complete)
        self._logger.info(pending_task)

    async def anchor_da_instance_to_blockchain(
        self,
        instance_id: str
    ) -> typing.Tuple[str, typing.Any, typing.Any, typing.Any]:
        """Anchor da instance to blockchain.

        Loads the data agreement instance, derives its mydata did with
        ``DIDMyDataBuilder`` and emits that did through the injected
        ``EthereumClient``.

        Args:
            instance_id (str): Instance id

        Returns:
            typing.Tuple: ``(instance_id, mydata_did, tx_hash, tx_receipt)`` as
                unpacked by the anchoring task callback.

        Raises:
            AssertionError: If no data agreement instance matches the
                instance id.
        """

        eth_client: EthereumClient = await self.context.inject(EthereumClient)

        # Fetch data agreement instance record.
        da_instance_records = await DataAgreementInstanceRecord.query(
            self.context,
            {"instance_id": instance_id},
        )

        assert da_instance_records, "Data agreement instance not found."

        da_instance_record: DataAgreementInstanceRecord = da_instance_records[0]
        da_model: DataAgreementInstanceModel = \
            DataAgreementInstanceModel.deserialize(da_instance_record.data_agreement)

        did_mydata_builder = DIDMyDataBuilder(
            artefact=da_model
        )

        (tx_hash, tx_receipt) = await eth_client.emit_da_did(did_mydata_builder.mydata_did)

        return (da_instance_record.instance_id, did_mydata_builder.mydata_did, tx_hash, tx_receipt)

    async def create_data_agreement_qr_code(
        self,
        template_id: str,
        multi_use_flag: bool
    ) -> dict:
        """Create a QR code record, invitation and dynamic link for a template.

        Args:
            template_id (str): Template identifier
            multi_use_flag (bool): Multi use flag

        Returns:
            dict: ``qr_id``, the serialized ``invitation`` and the ``dynamic_link``.
        """

        # The record is saved first so that its id can be used in the alias.
        qr_record = DataAgreementQRCodeRecord(
            template_id=template_id,
            multi_use_flag=bool_to_str(multi_use_flag)
        )
        await qr_record.save(self.context)

        connection, invitation = await self.create_invitation(
            auto_accept=True,
            public=False,
            multi_use=multi_use_flag,
            alias=f"DA_{template_id}_QR_{qr_record._id}"
        )

        # Link the record to the invitation's connection.
        qr_record.connection_id = connection.connection_id
        await qr_record.save(self.context)

        res = {
            "qr_id": qr_record._id,
            "invitation": invitation.serialize()
        }

        # Payload: default endpoint plus the base64 encoded JSON of ``res``.
        encoded = base64.b64encode(json.dumps(res).encode()).decode()
        payload = self.context.settings.get("default_endpoint") + "?qt=2&qp=" + encoded

        dynamic_link = await generate_firebase_dynamic_link(self.context, payload)
        qr_record.dynamic_link = dynamic_link
        await qr_record.save(self.context)

        res["dynamic_link"] = dynamic_link

        return res

    async def create_connection_qr_code(
        self,
        connection_id: str
    ) -> dict:
        """Generate a firebase dynamic link for a connection's invitation.

        Args:
            connection_id (str): Connection identifier.

        Returns:
            dict: Dict with dynamic link.
        """

        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context, connection_id
        )
        connection_invitation: ConnectionInvitation = \
            await connection_record.retrieve_invitation(self.context)

        # The invitation URL is the payload of the dynamic link.
        dynamic_link = await generate_firebase_dynamic_link(
            self.context,
            connection_invitation.to_url()
        )

        return {"dynamic_link": dynamic_link}

    async def query_data_agreement_qr_codes(
        self,
        template_id: str,
    ) -> PaginationResult:
        """Query the QR code records of a data agreement template.

        Args:
            template_id (str): Template identifier used as tag filter.

        Returns:
            PaginationResult: Single page (page size 1000000) of QR code records.
        """

        tag_filter = {"template_id": template_id}
        qr_records = await DataAgreementQRCodeRecord.query(self.context, tag_filter)
        return paginate_records(qr_records, page=1, page_size=1000000)

    async def send_reply_message(self, message: AgentMessage, connection_id: str = None) -> None:
        """Send a message as reply through the injected responder.

        Nothing is sent when no responder is available in the context.

        Args:
            message (AgentMessage): Agent message.
            connection_id (str): Connection identifier
        """
        responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False)
        if not responder:
            return

        await responder.send_reply(message, connection_id=connection_id)

    async def send_problem_report_message(self, explain: str, connection_id: str) -> None:
        """Reply with a problem report carrying the given explanation.

        Nothing is sent when no responder is available in the context.

        Args:
            explain (str): Explanation placed in the problem report.
            connection_id (str): Connection id.
        """

        responder: DispatcherResponder = await self.context.inject(BaseResponder, required=False)

        report = ProblemReport(explain_ltxt=explain)

        if not responder:
            return

        await responder.send_reply(report, connection_id=connection_id)

    async def delete_data_agreement_qr_code(
        self,
        template_id: str,
        qr_id: str
    ) -> None:
        """Delete a data agreement QR code record.

        Args:
            template_id (str): Template identifier the QR code must belong to.
            qr_id (str): QR code record identifier.

        Raises:
            AssertionError: If the record's template id differs from ``template_id``.
        """
        qr_record = await DataAgreementQRCodeRecord.retrieve_by_id(self.context, qr_id)

        # Refuse to delete a QR code through a template it does not belong to.
        assert qr_record.template_id == template_id, "Data agreement not found."

        await qr_record.delete_record(self.context)

    async def process_data_agreement_qr_code_initiate_message(
        self,
        message: DataAgreementQrCodeInitiateMessage,
        receipt: MessageReceipt
    ):
        """Process data agreement QR code initiate message.

        Looks up the QR code record named in the message, builds a presentation
        request from the template's preset request, creates the presentation
        exchange record, attaches a data agreement offer as
        "data-agreement-context" decorator and sends the request as reply.

        A QR code that is not multi use is rejected with a problem report once
        it has been scanned. The scanned flag is recorded for every QR code, so
        that a second scan of a single use code is actually detected.

        Args:
            message (DataAgreementQrCodeInitiateMessage): Data agreement QR code initiate message.
            receipt (MessageReceipt): Message receipt.

        Raises:
            Exception: If a QR code that is not multi use was already scanned.
        """
        qr_id = message.body.qr_id
        connection_id = self.context.connection_record.connection_id

        connection_record = await ConnectionRecord.retrieve_by_id(self.context, connection_id)

        # Fetch the qr code record.
        record: DataAgreementQRCodeRecord = \
            await DataAgreementQRCodeRecord.retrieve_by_id(
                self.context,
                qr_id
            )

        # A single use QR code may only be scanned once.
        if not record._multi_use_flag and record._scanned_flag:
            explain = "Qr code cannot be scanned twice"
            await self.send_problem_report_message(explain, connection_id)
            raise Exception(explain)

        # Remember the scan for both kinds of QR code; previously only multi
        # use codes were marked here, which left the check above without effect.
        record._scanned_flag = True
        await record.save(self.context)

        # Fetch data agreement template record.
        template_record: DataAgreementTemplateRecord = \
            await DataAgreementTemplateRecord.latest_template_by_id(
                self.context,
                record.template_id
            )

        # Construct presentation request; the comment is carried on the message
        # and removed from the preset request before it is attached.
        preset_presentation_request = template_record.presentation_request
        comment = preset_presentation_request.pop("comment")
        if not preset_presentation_request.get("nonce"):
            preset_presentation_request["nonce"] = await generate_pr_nonce()

        presentation_request = PresentationRequest(
            comment=comment,
            request_presentations_attach=[
                AttachDecorator.from_indy_dict(
                    indy_dict=preset_presentation_request,
                    ident=ATTACH_DECO_IDS[PRESENTATION_REQUEST],
                )
            ],
        )

        # Construct presentation exchange record
        presentation_manager = PresentationManager(self.context)
        pres_ex_record = await presentation_manager.create_exchange_for_request(
            connection_id=self.context.connection_record.connection_id,
            presentation_request_message=presentation_request,
        )

        # Update qr code with record id.
        record.data_ex_id = pres_ex_record.presentation_exchange_id
        await record.save(self.context)

        offer_message = await self.build_data_agreement_offer_for_presentation_exchange(
            template_record.template_id,
            connection_record,
            pres_ex_record
        )

        # Add data agreement context decorator
        presentation_request._decorators["data-agreement-context"] = DataAgreementContextDecorator(
            message_type="protocol",
            message=offer_message.serialize()
        )

        # Persist the decorated request and the template id on the exchange record.
        pres_ex_record.presentation_request_dict = presentation_request.serialize()
        pres_ex_record.template_id = template_record.template_id
        await pres_ex_record.save(self.context)

        await self.send_reply_message(presentation_request, connection_id)

    async def send_qr_code_initiate_message(
        self,
        qr_id,
        connection_id
    ):
        """Send a data agreement QR code initiate message as reply.

        Args:
            qr_id: QR code identifier placed in the message body.
            connection_id: Identifier of the connection the reply is sent on.
        """

        body = DataAgreementQrCodeInitiateBody(qr_id=qr_id)
        initiate_message = DataAgreementQrCodeInitiateMessage(body=body)

        await self.send_reply_message(initiate_message, connection_id)

    async def send_data_controller_details_message(
        self,
        connection_id: str
    ):
        """Send an empty data controller details message as reply.

        Args:
            connection_id (str): Connection ID
        """

        await self.send_reply_message(DataControllerDetailsMessage(), connection_id)

    async def process_data_controller_details_message(
        self,
        message: DataControllerDetailsMessage,
        receipt: MessageReceipt
    ):
        """Reply to a data controller details message with the controller details.

        When a controller details record exists, the reply is built from the
        first record. Otherwise the details are read from the cache (keyed by
        the public did) or fetched from the intermediary and cached for 3600.

        Args:
            message (DataControllerDetailsMessage): Data controller details message.
            receipt (MessageReceipt): Message receipt.

        Raises:
            AssertionError: If no record exists and no cache is available.
        """

        # Query controller records.
        records = await ControllerDetailsRecord.query(self.context, {})

        connection_id = self.context.connection_record.connection_id

        if records:
            # Locally stored details take precedence.
            stored: ControllerDetailsRecord = records[0]

            controller_details = DataController(
                organisation_did=stored.organisation_did,
                organisation_name=stored.organisation_name,
                cover_image_url=stored.cover_image_url,
                logo_image_url=stored.logo_image_url,
                location=stored.location,
                organisation_type=stored.organisation_type,
                description=stored.description,
                policy_url=stored.policy_url,
                eula_url=stored.eula_url
            )

            await self.send_reply_message(
                DataControllerDetailsResponseMessage(body=controller_details),
                connection_id
            )
            return

        wallet: BaseWallet = await self.context.inject(BaseWallet)
        controller_did = await wallet.get_public_did()

        cache: BaseCache = await self.context.inject(BaseCache, required=False)

        # The organisation did doubles as cache key.
        organisation_did = f"did:sov:{controller_did.did}"

        assert cache, "Cache not available."

        async with cache.acquire(organisation_did) as entry:
            if entry.result:
                controller_details = DataController.deserialize(entry.result)
            else:
                org_details = await fetch_org_details_from_intermediary(self.context)

                controller_details = DataController(
                    organisation_did=organisation_did,
                    organisation_name=org_details["Name"],
                    cover_image_url=org_details["CoverImageURL"] + "/web",
                    logo_image_url=org_details["LogoImageURL"] + "/web",
                    location=org_details["Location"],
                    organisation_type=org_details["Type"]["Type"],
                    description=org_details["Description"],
                    policy_url=org_details["PolicyURL"],
                    eula_url=org_details["EulaURL"]
                )
                await entry.set_result(controller_details.serialize(), 3600)

            # The reply is sent while the cache entry is still held.
            await self.send_reply_message(
                DataControllerDetailsResponseMessage(body=controller_details),
                connection_id
            )

    async def update_controller_details(
        self,
        organisation_name: str = None,
        cover_image_url: str = None,
        logo_image_url: str = None,
        location: str = None,
        organisation_type: str = None,
        description: str = None,
        policy_url: str = None,
        eula_url: str = None
    ) -> ControllerDetailsRecord:
        """Create or overwrite the controller details record.

        When no record exists a new one is created with the organisation did
        derived from the wallet's public did. Otherwise every detail of the
        first existing record is overwritten with the given value, including
        values left at ``None``.

        Args:
            organisation_name (str, optional): Organisation name. Defaults to None.
            cover_image_url (str, optional): Cover image URL. Defaults to None.
            logo_image_url (str, optional): Logo image URL. Defaults to None.
            location (str, optional): Location. Defaults to None.
            organisation_type (str, optional): Organisation type. Defaults to None.
            description (str, optional): Description. Defaults to None.
            policy_url (str, optional): Policy URL. Defaults to None.
            eula_url (str, optional): EULA URL. Defaults to None.

        Returns:
            ControllerDetailsRecord: Controller details record.
        """

        details = {
            "organisation_name": organisation_name,
            "cover_image_url": cover_image_url,
            "logo_image_url": logo_image_url,
            "location": location,
            "organisation_type": organisation_type,
            "description": description,
            "policy_url": policy_url,
            "eula_url": eula_url,
        }

        # Query controller records.
        records = await ControllerDetailsRecord.query(self.context, {})

        if records:
            # If found update record.
            record: ControllerDetailsRecord = records[0]
            for field_name, value in details.items():
                setattr(record, field_name, value)
        else:
            # If not found, create new record for the public did.
            wallet: BaseWallet = await self.context.inject(BaseWallet)
            controller_did = await wallet.get_public_did()

            record = ControllerDetailsRecord(
                organisation_did=f"did:sov:{controller_did.did}",
                **details
            )

        await record.save(self.context)

        return record

    async def process_existing_connections_message(
        self,
        message: ExistingConnectionsMessage,
        message_receipt: MessageReceipt
    ):
        """Link the current connection to an already existing connection.

        The current connection is resolved from the recipient verkey of the
        message receipt. Its first existing connection record, if any, is
        replaced by a new one pointing at the connection whose ``my_did``
        equals ``message.body.theirdid``. Finally the current connection is
        marked inactive.

        Args:
            message (ExistingConnectionsMessage): Existing connections message.
            message_receipt (MessageReceipt): Message receipt.
        """

        # Current connection, looked up by the invitation key.
        connection_record = await ConnectionRecord.retrieve_by_invitation_key(
            self.context,
            message_receipt.recipient_verkey
        )

        # Drop the first existing connection record of the current connection.
        previous_links = await ExistingConnectionRecord.query(
            self.context,
            {"connection_id": connection_record.connection_id}
        )
        if previous_links:
            stale_link: ExistingConnectionRecord = previous_links[0]
            await stale_link.delete_record(self.context)

        # Connection the sender refers to.
        old_connection_record = await ConnectionRecord.retrieve_by_did(
            self.context,
            their_did=None,
            my_did=message.body.theirdid
        )

        # Create a new existing connection record.
        existing_connection_record = ExistingConnectionRecord(
            existing_connection_id=old_connection_record.connection_id,
            my_did=old_connection_record.my_did,
            connection_status="available",
            connection_id=connection_record.connection_id
        )
        await existing_connection_record.save(self.context)

        # Mark the current connection as inactive.
        connection_record.state = ConnectionRecord.STATE_INACTIVE
        await connection_record.save(context=self.context)

    async def get_existing_connection_record_for_new_connection_id(
        self,
        connection_id: str
    ) -> ExistingConnectionRecord:
        """Look up the existing-connection record stored for a connection id.

        Args:
            connection_id (str): Connection id.

        Returns:
            ExistingConnectionRecord: The first record tagged with the given
                connection id, or None when no such record is stored.
        """
        matches = await ExistingConnectionRecord.query(
            self.context,
            {"connection_id": connection_id}
        )

        # Only the first match is of interest; an empty result maps to None.
        return matches[0] if matches else None

    async def send_message_with_connection_invitation_and_return_route_all(
        self,
        message: AgentMessage,
        connection_id: str,
    ) -> typing.Tuple[str, str, dict]:
        """POST a packed message to the invitation endpoint and unpack the reply.

        The message is tagged with a ``return_route="all"`` transport
        decorator, packed for the invitation key of the given connection with
        the verkey of a newly created local DID, and posted to the endpoint of
        the stored invitation. The response body is unpacked with the wallet.

        Args:
            message (AgentMessage): Agent message.
            connection_id (str): Connection id.

        Returns:
            typing.Tuple[str, str, dict]: sender_verkey, recipient_verkey,
                message_dict. None is returned when the endpoint responds with
                a status other than 200.
        """
        connection: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            connection_id
        )
        recipient_key = connection.invitation_key

        # The service endpoint comes from the invitation stored with the record.
        stored_invitation = await connection.retrieve_invitation(self.context)
        endpoint = stored_invitation.endpoint

        wallet: IndyWallet = await self.context.inject(BaseWallet)

        # Request that replies are routed back on the same transport.
        message._decorators["transport"] = TransportDecorator(
            return_route="all"
        )

        # Pack the message using the verkey of a newly created local DID.
        local_did: DIDInfo = await wallet.create_local_did()
        wire_payload = await wallet.pack_message(
            message.to_json(),
            [recipient_key],
            local_did.verkey
        )

        async with aiohttp.ClientSession(
            headers={"Content-Type": "application/ssi-agent-wire"}
        ) as session:
            async with session.post(endpoint, data=wire_payload) as response:
                if response.status != 200:
                    return None

                reply_bytes = await response.read()

                # Unpack the reply and hand back the keys plus the decoded body.
                (reply_json, sender_verkey, recipient_verkey) = \
                    await wallet.unpack_message(reply_bytes)

                return (sender_verkey, recipient_verkey, json.loads(reply_json))

    async def send_message_with_connection_invitation(
        self,
        message: AgentMessage,
        connection_id: str,
    ) -> None:
        """POST a packed message to the endpoint of a connection's invitation.

        The message is packed for the invitation key of the given connection
        with the verkey of a newly created local DID. Nothing is returned; a
        200 response is only logged.

        Args:
            message (AgentMessage): Agent message.
            connection_id (str): Connection id.
        """
        connection: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            connection_id
        )
        recipient_key = connection.invitation_key

        # The service endpoint comes from the invitation stored with the record.
        stored_invitation = await connection.retrieve_invitation(self.context)
        endpoint = stored_invitation.endpoint

        wallet: IndyWallet = await self.context.inject(BaseWallet)

        # Pack the message using the verkey of a newly created local DID.
        local_did: DIDInfo = await wallet.create_local_did()
        wire_payload = await wallet.pack_message(
            message.to_json(),
            [recipient_key],
            local_did.verkey
        )

        async with aiohttp.ClientSession(
            headers={"Content-Type": "application/ssi-agent-wire"}
        ) as session:
            async with session.post(endpoint, data=wire_payload) as response:
                delivered = response.status == 200
                if delivered:
                    self._logger.info("Posted existing connection message...")

    async def send_existing_connections_message(
        self,
        theirdid: str,
        connection_id: str
    ):
        """Notify the remote agent that a connection with it already exists.

        Args:
            theirdid (str): Their DID of remote agent in old connection.
            connection_id (str): Connection identifier.
        """
        # Wrap the DID in the message body and deliver it through the
        # invitation endpoint of the given connection.
        notification = ExistingConnectionsMessage(
            body=ExistingConnectionsBody(theirdid=theirdid)
        )
        await self.send_message_with_connection_invitation(
            notification,
            connection_id
        )

    async def query_connections_and_categorise_results(
        self,
        tag_filter: dict = None,
        post_filter_positive: dict = None,
        page: int = 1,
        page_size: int = 10,
        org_flag: bool = False,
        marketplace_flag: bool = False,
    ) -> PaginationResult:
        """Query connections, flag each one by category and paginate the result.

        Every connection record is serialised and extended with:

        * ``org_flag`` / ``controller_details``: set from the first controller
          details record attached to the connection, if any.
        * ``marketplace_flag``: whether a marketplace connection record exists
          for the connection.

        Only connections matching the requested ``org_flag`` and
        ``marketplace_flag`` values are kept.

        Args:
            tag_filter (dict): Tag filter passed to the connection query.
            post_filter_positive (dict): Positive post filter passed to the
                connection query.
            page (int): Page number; a falsy value falls back to 1.
            page_size (int): Page size; a falsy value falls back to 10.
            org_flag (bool): Required value of the ``org_flag`` category.
            marketplace_flag (bool): Required value of the ``marketplace_flag``
                category.

        Returns:
            PaginationResult: Paginated list of categorised connection dicts,
                newest first.
        """

        # Query the connection records.
        records = await ConnectionRecord.query(
            self.context,
            tag_filter,
            post_filter_positive
        )

        # Newest connections first.
        records = sorted(
            records,
            key=lambda k: k.created_at,
            reverse=True
        )

        # The category filter does not depend on the record, so build it once.
        # NOTE(review): drop_none_dict presumably removes None-valued keys, so
        # passing None for a flag would disable that criterion - confirm.
        categorise_filter = drop_none_dict({
            "org_flag": org_flag,
            "marketplace_flag": marketplace_flag
        })

        res = []
        for record in records:
            # Separate name so the ``tag_filter`` parameter is not rebound.
            connection_tag_filter = {"connection_id": record.connection_id}

            # Fetch controller details attached to the connection.
            controller_details: typing.List[ConnectionControllerDetailsRecord] = \
                await ConnectionControllerDetailsRecord.query(
                    self.context,
                    connection_tag_filter
                )

            # Fetch marketplace connection record.
            marketplace_connections: typing.List[MarketplaceConnectionRecord] = \
                await MarketplaceConnectionRecord.query(
                    self.context,
                    connection_tag_filter
                )

            connection = record.serialize()

            # Update controller details to the connection dict.
            if controller_details:
                connection.update({
                    "org_flag": True,
                    "controller_details": controller_details[0].controller_details
                })
            else:
                connection.update({
                    "controller_details": {},
                    "org_flag": False
                })

            connection.update({"marketplace_flag": bool(marketplace_connections)})

            # Apply category filter on connections.
            if match_post_filter(connection, categorise_filter, True):
                res.append(connection)

        return paginate(res, page if page else 1, page_size if page_size else 10)

    async def add_task(self,
                       context: InjectionContext,
                       coro: typing.Coroutine,
                       task_complete: typing.Callable = None,
                       ident: str = None) -> PendingTask:
        """
        Add a new task to the queue, delaying execution if busy.

        The task queue is taken from the wire format instance injected from
        the given context.

        Args:
            context: Injection context to be used.
            coro: The coroutine to run
            task_complete: An optional coroutine function to run on completion.
                It is called with the completed task and scheduled on the
                event loop. When omitted, no completion callback is registered.
            ident: A string identifier for the task

        Returns: a future resolving to the asyncio task instance once queued
        """
        loop = asyncio.get_event_loop()
        pack_format: PackWireFormat = await context.inject(BaseWireFormat, required=False)

        # Only wrap the callback when one was supplied; wrapping None would
        # raise a TypeError at the moment the task completes.
        on_complete = None
        if task_complete is not None:
            on_complete = lambda x: loop.create_task(task_complete(x))  # noqa: E731

        return pack_format.task_queue.put(coro, on_complete, ident)

    async def process_da_negotiation_receipt_message(
        self,
        message: DataAgreementNegotiationReceiptMessage,
        message_receipt: MessageReceipt
    ):
        """Store the blockchain metadata carried by a DA negotiation receipt.

        The instance record named in the message body is fetched and its
        blockchain receipt, blink and mydata DID are overwritten with the
        values from the message.

        Args:
            message (DataAgreementNegotiationReceiptMessage): DA negotiation receipt message.
            message_receipt (MessageReceipt): Message receipt (not used here).
        """
        # Read everything from the message body before touching storage.
        body = message.body
        (instance_id, receipt, blink, mydata_did) = (
            body.instance_id,
            body.blockchain_receipt,
            body.blink,
            body.mydata_did,
        )

        # Fetch the instance record by its instance id tag.
        instance_record: DataAgreementInstanceRecord = \
            await DataAgreementInstanceRecord.retrieve_by_tag_filter(
                self.context,
                {"instance_id": instance_id}
            )

        # Copy the receipt details onto the record and persist it.
        instance_record.blockchain_receipt = receipt
        instance_record.blink = blink
        instance_record.mydata_did = mydata_did
        await instance_record.save(self.context)

Instance variables

var context : aries_cloudagent.config.injection_context.InjectionContext

Accessor for injection context

Returns

InjectionContext
Injection context
Expand source code
@property
def context(self) -> InjectionContext:
    """Return the injection context held by this manager.

    Returns:
        InjectionContext: The value of the private ``_context`` attribute.
    """
    return self._context

Methods

async def add_task(self, context: aries_cloudagent.config.injection_context.InjectionContext, coro: Coroutine[+T_co, -T_contra, +V_co], task_complete: Callable = None, ident: str = None) ‑> aries_cloudagent.utils.task_queue.PendingTask

Add a new task to the queue, delaying execution if busy.

Args

context
Injection context to be used.
coro
The coroutine to run
task_complete
A callback to run on completion
ident
A string identifier for the task

Returns: a future resolving to the asyncio task instance once queued

Expand source code
async def add_task(self,
                   context: InjectionContext,
                   coro: typing.Coroutine,
                   task_complete: typing.Callable = None,
                   ident: str = None) -> PendingTask:
    """
    Add a new task to the queue, delaying execution if busy.

    The task queue is taken from the wire format instance injected from
    the given context.

    Args:
        context: Injection context to be used.
        coro: The coroutine to run
        task_complete: An optional coroutine function to run on completion.
            It is called with the completed task and scheduled on the
            event loop. When omitted, no completion callback is registered.
        ident: A string identifier for the task

    Returns: a future resolving to the asyncio task instance once queued
    """
    loop = asyncio.get_event_loop()
    pack_format: PackWireFormat = await context.inject(BaseWireFormat, required=False)

    # Only wrap the callback when one was supplied; wrapping None would
    # raise a TypeError at the moment the task completes.
    on_complete = None
    if task_complete is not None:
        on_complete = lambda x: loop.create_task(task_complete(x))  # noqa: E731

    return pack_format.task_queue.put(coro, on_complete, ident)
async def anchor_da_instance_to_blockchain(self, instance_id: str) ‑> None

Anchor da instance to blockchain.

Args

instance_id : str
Instance id
Expand source code
async def anchor_da_instance_to_blockchain(
    self,
    instance_id: str
) -> typing.Tuple[str, typing.Any, typing.Any, typing.Any]:
    """Anchor da instance to blockchain.

    Fetches the data agreement instance, builds its mydata DID and emits that
    DID through the injected Ethereum client.

    Args:
        instance_id (str): Instance id

    Returns:
        typing.Tuple: ``(instance_id, mydata_did, tx_hash, tx_receipt)``, where
            the transaction hash and receipt are whatever
            ``EthereumClient.emit_da_did`` returns.

    Raises:
        AssertionError: If no data agreement instance matches the instance id.
    """

    eth_client: EthereumClient = await self.context.inject(EthereumClient)

    tag_filter = {
        "instance_id": instance_id
    }

    # Fetch data agreement instance record.
    da_instance_records = await DataAgreementInstanceRecord.query(
        self.context,
        tag_filter,
    )

    assert da_instance_records, "Data agreement instance not found."

    # Only the first matching record is anchored.
    da_instance_record: DataAgreementInstanceRecord = da_instance_records[0]
    da_model: DataAgreementInstanceModel = \
        DataAgreementInstanceModel.deserialize(da_instance_record.data_agreement)

    # The mydata DID is derived from the deserialised instance model.
    did_mydata_builder = DIDMyDataBuilder(
        artefact=da_model
    )

    (tx_hash, tx_receipt) = await eth_client.emit_da_did(did_mydata_builder.mydata_did)

    return (da_instance_record.instance_id, did_mydata_builder.mydata_did, tx_hash, tx_receipt)
async def anchor_da_instance_to_blockchain_async_task(self, instance_id: str)

Async task to anchor da instance to blockchain.

Args

instance_id : str
Instance id
Expand source code
async def anchor_da_instance_to_blockchain_async_task(
    self,
    instance_id: str
):
    """Queue the blockchain anchoring of a da instance as a background task.

    The anchoring coroutine is handed to ``add_task`` together with the
    completion callback of this manager; the resulting pending task is logged.

    Args:
        instance_id (str): Instance id
    """
    anchoring_coro = self.anchor_da_instance_to_blockchain(instance_id)
    on_done = self.anchor_da_instance_to_blockchain_async_task_callback

    queued = await self.add_task(self.context, anchoring_coro, on_done)
    self._logger.info(queued)
async def anchor_da_instance_to_blockchain_async_task_callback(self, *args, **kwargs)

Anchor DA instance to blockchain async task callback function

Expand source code
async def anchor_da_instance_to_blockchain_async_task_callback(
    self, *args, **kwargs
):
    """Anchor DA instance to blockchain async task callback function

    Reads the anchoring result from the completed task, stores the blockchain
    metadata on the data agreement instance record and sends a negotiation
    receipt message to the data subject's connection.

    Args:
        *args: The first positional argument must be the completed task.
        **kwargs: Ignored.

    Raises:
        AssertionError: If no data agreement instance matches the instance id.
    """

    # Obtain the completed task.
    completed_task: CompletedTask = args[0]

    # Obtain the results from the task. The transaction hash is re-read from
    # the serialised receipt below, so the tuple's own hash value is unused.
    (instance_id, mydata_did, _tx_hash, tx_receipt) = completed_task.task.result()

    tag_filter = {
        "instance_id": instance_id
    }

    # Fetch data agreement instance record.
    da_instance_records = await DataAgreementInstanceRecord.query(
        self.context,
        tag_filter,
    )

    assert da_instance_records, "Data agreement instance not found."

    da_instance_record: DataAgreementInstanceRecord = da_instance_records[0]

    # Round-trip through JSON so the receipt is a plain, serialisable dict.
    transaction_receipt = json.loads(to_json(tx_receipt))
    transaction_hash = transaction_receipt.get("transactionHash")

    # Built once and shared by the record and the receipt message.
    blink = f"blink:ethereum:rinkeby:{transaction_hash}"

    # Update the data agreement with blockchain metadata.
    da_instance_record.blink = blink
    da_instance_record.mydata_did = mydata_did
    da_instance_record.blockchain_receipt = transaction_receipt

    await da_instance_record.save(self.context)

    # Send receipt.
    message = DataAgreementNegotiationReceiptMessage(
        body=DataAgreementNegotiationReceiptBody(
            instance_id=da_instance_record.instance_id,
            blockchain_receipt=transaction_receipt,
            blink=blink,
            mydata_did=mydata_did
        )
    )

    # Find the connection record.
    data_subject_did = da_instance_record.data_subject_did.replace("did:sov:", "")
    connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_did(
        self.context,
        their_did=data_subject_did,
    )

    # NOTE(review): awaited on the assumption that send_reply_message is a
    # coroutine function like the other manager methods; without the await
    # the coroutine would be created but never run. Confirm against its
    # definition.
    await self.send_reply_message(
        message,
        connection_record.connection_id
    )
async def build_data_agreement_accept_for_data_ex_record(self, connection_record: aries_cloudagent.connections.models.connection_record.ConnectionRecord, data_ex_record: Union[aries_cloudagent.protocols.issue_credential.v1_0.models.credential_exchange.V10CredentialExchange, aries_cloudagent.protocols.present_proof.v1_0.models.presentation_exchange.V10PresentationExchange]) ‑> mydata_did.v1_0.messages.data_agreement_accept.DataAgreementNegotiationAcceptMessage

Build data agreement accept message for credential exchange.

Args

context : InjectionContext
Injection context to be used.
template_id : str
Data agreement template identifier.
connection_record : ConnectionRecord
Connection record.

data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]): Data exchange record.

Returns

DataAgreementNegotiationAcceptMessage
Accept message.
Expand source code
async def build_data_agreement_accept_for_data_ex_record(
    self,
    connection_record: ConnectionRecord,
    data_ex_record: typing.Union[V10CredentialExchange, V10PresentationExchange],
) -> DataAgreementNegotiationAcceptMessage:
    """Build the data agreement accept message for a data exchange record.

    The data agreement instance linked to the exchange record is looked up
    and counter signed; the resulting instance model becomes the body of the
    accept message.

    Args:
        connection_record (ConnectionRecord): Connection record.
        data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]):
            Data exchange record.

    Returns:
        DataAgreementNegotiationAcceptMessage: Accept message.
    """
    # The record kind is decided by class name; anything that is not a
    # credential exchange is treated as a presentation exchange.
    record_kind = data_ex_record.__class__.__name__
    if record_kind == V10CredentialExchange.__name__:
        data_ex_id = data_ex_record.credential_exchange_id
    else:
        data_ex_id = data_ex_record.presentation_exchange_id

    # Fetch the data agreement instance matching the exchange record.
    instance_record = await DataAgreementInstanceRecord.fetch_by_data_ex_id(
        self.context,
        data_ex_id
    )

    # Counter sign the instance; only the model is needed for the message.
    (_, signed_model) = await DataAgreementInstanceRecord.counter_sign_instance(
        self.context,
        instance_record.instance_id,
        connection_record
    )

    return DataAgreementNegotiationAcceptMessage(body=signed_model)
async def build_data_agreement_negotiation_accept_by_instance_id(self, instance_id: str, connection_record: aries_cloudagent.connections.models.connection_record.ConnectionRecord) ‑> mydata_did.v1_0.messages.data_agreement_accept.DataAgreementNegotiationAcceptMessage
Expand source code
async def build_data_agreement_negotiation_accept_by_instance_id(
    self,
    instance_id: str,
    connection_record: ConnectionRecord
) -> DataAgreementNegotiationAcceptMessage:
    """Build the data agreement accept message for a known instance id.

    Args:
        instance_id (str): Data agreement instance identifier.
        connection_record (ConnectionRecord): Connection record.

    Returns:
        DataAgreementNegotiationAcceptMessage: Accept message whose body is
            the counter signed instance model.
    """
    # Counter sign the instance; only the model is needed for the message.
    (_, signed_model) = await DataAgreementInstanceRecord.counter_sign_instance(
        self.context,
        instance_id,
        connection_record,
    )

    return DataAgreementNegotiationAcceptMessage(body=signed_model)
async def build_data_agreement_offer_for_credential_exchange(self, template_id: str, connection_record: aries_cloudagent.connections.models.connection_record.ConnectionRecord, cred_ex_record: aries_cloudagent.protocols.issue_credential.v1_0.models.credential_exchange.V10CredentialExchange) ‑> mydata_did.v1_0.messages.data_agreement_offer.DataAgreementNegotiationOfferMessage

Build data agreement offer for credential exchange.

Args

context : InjectionContext
Injection context to be used.
template_id : str
Data agreement template identifier.
connection_record : ConnectionRecord
Connection record.
cred_ex_record : V10CredentialExchange
Credential exchange record.

Returns

DataAgreementNegotiationOfferMessage
Offer message.
Expand source code
async def build_data_agreement_offer_for_credential_exchange(
    self,
    template_id: str,
    connection_record: ConnectionRecord,
    cred_ex_record: V10CredentialExchange,
) -> DataAgreementNegotiationOfferMessage:
    """Build the data agreement offer message for a credential exchange.

    A data agreement instance is built from the template for the credential
    exchange id of the given record; its model becomes the message body.

    Args:
        template_id (str): Data agreement template identifier.
        connection_record (ConnectionRecord): Connection record.
        cred_ex_record (V10CredentialExchange): Credential exchange record.

    Returns:
        DataAgreementNegotiationOfferMessage: Offer message.
    """
    # Only the instance model is needed; the stored record is discarded.
    (_, instance_model) = \
        await DataAgreementInstanceRecord.build_instance_from_template(
            self.context,
            template_id,
            connection_record,
            cred_ex_record.credential_exchange_id
        )

    return DataAgreementNegotiationOfferMessage(body=instance_model)
async def build_data_agreement_offer_for_presentation_exchange(self, template_id: str, connection_record: aries_cloudagent.connections.models.connection_record.ConnectionRecord, pres_ex_record: aries_cloudagent.protocols.present_proof.v1_0.models.presentation_exchange.V10PresentationExchange) ‑> mydata_did.v1_0.messages.data_agreement_offer.DataAgreementNegotiationOfferMessage

Build data agreement offer for presentation exchange.

Args

context : InjectionContext
Injection context to be used.
template_id : str
Data agreement template identifier.
connection_record : ConnectionRecord
Connection record.
pres_ex_record : V10PresentationExchange
Presentation exchange record.

Returns

DataAgreementNegotiationOfferMessage
Offer message.
Expand source code
async def build_data_agreement_offer_for_presentation_exchange(
    self,
    template_id: str,
    connection_record: ConnectionRecord,
    pres_ex_record: V10PresentationExchange,
) -> DataAgreementNegotiationOfferMessage:
    """Build the data agreement offer message for a presentation exchange.

    A data agreement instance is built from the template for the presentation
    exchange id of the given record; its model becomes the message body.

    Args:
        template_id (str): Data agreement template identifier.
        connection_record (ConnectionRecord): Connection record.
        pres_ex_record (V10PresentationExchange): Presentation exchange record.

    Returns:
        DataAgreementNegotiationOfferMessage: Offer message.
    """
    # Only the instance model is needed; the stored record is discarded.
    (_, instance_model) = \
        await DataAgreementInstanceRecord.build_instance_from_template(
            self.context,
            template_id,
            connection_record,
            pres_ex_record.presentation_exchange_id
        )

    return DataAgreementNegotiationOfferMessage(body=instance_model)
def construct_presentation_request(self, *, usage_purpose: str, usage_purpose_description: str, da_template_version: str, personal_data: List[PersonalDataRecord]) ‑> dict

Construct presentation request

Args

usage_purpose
Usage purpose.
usage_purpose_description
Usage purpose description.
da_template_version
Data agreement template version.
personal_data
List of personal data.

Returns

dict: Proof request.

Expand source code
def construct_presentation_request(
        self,
        *,
        usage_purpose: str,
        usage_purpose_description: str,
        da_template_version: str,
        personal_data: typing.List[PersonalDataRecord]
) -> dict:
    """
    Build a proof request dict from a usage purpose and personal data.

    Each personal data entry becomes one requested attribute keyed
    ``additionalProp<N>`` (N starting at 1). Restrictions are translated from
    ``schemaId`` / ``credDefId`` keys to ``schema_id`` / ``cred_def_id``; an
    entry without restrictions gets an empty list.

    Args:
        usage_purpose: Usage purpose, used as the request name.
        usage_purpose_description: Usage purpose description, used as comment.
        da_template_version: Data agreement template version.
        personal_data: List of personal data.

    Returns:
        dict: Proof request with no requested predicates.

    """
    requested_attributes = {}

    for position, entry in enumerate(personal_data, start=1):
        attribute_name = entry.attribute_name

        # Map the stored restriction keys to the names used in proof requests.
        if entry.restrictions:
            restrictions = [
                {
                    "schema_id": item.get("schemaId"),
                    "cred_def_id": item.get("credDefId")
                }
                for item in entry.restrictions
            ]
        else:
            restrictions = []

        requested_attributes["additionalProp" + str(position)] = {
            "name": attribute_name,
            "restrictions": restrictions
        }

    return {
        "name": usage_purpose,
        "comment": usage_purpose_description,
        "version": da_template_version,
        "requested_attributes": requested_attributes,
        "requested_predicates": {}
    }
async def create_and_store_da_template_in_wallet(self, data_agreement: dict, *, publish_flag: bool = True, schema_id: str = None) ‑> DataAgreementTemplateRecord

Create and store data agreement template in wallet

Args

data_agreement : dict
Data agreement
publish_flag : bool
Publish flag
schema_id : str
Schema identifier
Expand source code
async def create_and_store_da_template_in_wallet(
        self,
        data_agreement: dict,
        *,
        publish_flag: bool = True,
        schema_id: str = None
) -> DataAgreementTemplateRecord:
    """Create and store data agreement template in wallet

    The data agreement is completed with context, a newly generated
    identifier, type and version, validated, and stored as a template record
    together with one personal data record per attribute. The caller's dict
    is left unmodified.

    Args:
        data_agreement (dict): Data agreement
        publish_flag (bool): Publish flag; when set, ledger payloads are
            created for the template as well.
        schema_id (str): Schema identifier

    Returns:
        DataAgreementTemplateRecord: The stored template record.

    Raises:
        V2ADAManagerError: If the data agreement fails validation.
    """

    # Temp hack
    template_version = "1.0.0"
    template_id = str(uuid.uuid4())

    # Build a new dict rather than updating the argument in place, so the
    # caller's data is not changed as a side effect.
    da_payload = {
        **data_agreement,
        "@context": DA_DEFAULT_CONTEXT,
        "@id": template_id,
        "@type": DA_TYPE,
        "version": template_version,
    }

    try:
        # Validate the data agreement.
        da_model: DataAgreementModel = DataAgreementModel.deserialize(da_payload)
    except ValidationError as err:
        # Chain the cause so the validation details stay in the traceback.
        raise V2ADAManagerError(
            f"Failed to create data agreement; Reason: {err}"
        ) from err

    # Create personal data records
    pd_records = []
    pd_models_with_id = []
    for pd in da_model.personal_data:
        pd_record: PersonalDataRecord = \
            await PersonalDataRecord.build_and_save_record_from_pd_model(
                self.context,
                template_id,
                template_version,
                pd
            )
        pd_records.append(pd_record)
        pd_models_with_id.append(pd_record.convert_record_to_pd_model())

    # Update the personal data with attribute identifiers to the agreement
    da_model.personal_data = pd_models_with_id

    # Create template record
    record = DataAgreementTemplateRecord(
        template_id=template_id,
        template_version=template_version,
        state=DataAgreementTemplateRecord.STATE_DEFINITION,
        method_of_use=da_model.method_of_use,
        data_agreement=da_model.serialize(),
        publish_flag=bool_to_str(publish_flag),
        schema_id=schema_id,
        existing_schema_flag=bool_to_str(bool(schema_id)),
        third_party_data_sharing=bool_to_str(
            da_model.data_policy.third_party_data_sharing)
    )

    await record.save(self.context)

    if publish_flag:
        # Create ledger payloads
        record = await self.create_and_store_ledger_payloads_for_da_template(
            template_record=record,
            pd_records=pd_records,
            schema_id=schema_id
        )

    return record
async def create_and_store_ledger_payloads_for_da_template(self, *, template_record: DataAgreementTemplateRecord, pd_records: List[PersonalDataRecord] = None, schema_id: str = None) ‑> DataAgreementTemplateRecord

Create and store ledger payloads for a da template

Args

template_record : DataAgreementTemplateRecord
Data agreement template record
pd_records : typing.List[PersonalDataRecord]
Personal data records
schema_id : str
Schema identifier if available

Returns

DataAgreementTemplateRecord
Record with ledger payloads
Expand source code
async def create_and_store_ledger_payloads_for_da_template(
        self,
        *,
        template_record: DataAgreementTemplateRecord,
        pd_records: typing.List[PersonalDataRecord] = None,
        schema_id: str = None
) -> DataAgreementTemplateRecord:
    """Create the ledger payloads for a da template and store them on it.

    For the data source method of use, a schema is created from the personal
    data attribute names (unless a schema id is supplied) followed by a
    credential definition; both identifiers are saved on the record. For any
    other method of use, a presentation request is constructed and saved.

    Args:
        template_record (DataAgreementTemplateRecord): Data agreement template record
        pd_records (typing.List[PersonalDataRecord]): Personal data records
        schema_id (str): Schema identifier if available

    Returns:
        DataAgreementTemplateRecord: Record with ledger payloads
    """
    is_data_source = (
        template_record.method_of_use
        == DataAgreementTemplateRecord.METHOD_OF_USE_DATA_SOURCE
    )

    if not is_data_source:
        agreement = template_record.data_agreement

        # Purpose, description and version of the agreement drive the request.
        template_record.presentation_request = self.construct_presentation_request(
            usage_purpose=agreement.get("purpose"),
            usage_purpose_description=agreement.get("purposeDescription"),
            da_template_version=agreement.get("version"),
            personal_data=pd_records
        )
        await template_record.save(self.context)

        return template_record

    if not schema_id:
        agreement = template_record.data_agreement

        # No schema supplied: create one named after the purpose and anchor it
        # to the ledger.
        (schema_id, _schema_def) = await create_schema_def_and_anchor_to_ledger(
            context=self.context,
            schema_name=agreement.get("purpose"),
            schema_version=agreement.get("version"),
            attributes=[item.attribute_name for item in pd_records]
        )

    # Create credential definition and anchor to ledger
    (cred_def_id, _cred_def, _novel) = await create_cred_def_and_anchor_to_ledger(
        context=self.context,
        schema_id=schema_id
    )

    template_record.cred_def_id = cred_def_id
    template_record.schema_id = schema_id
    await template_record.save(self.context)

    return template_record
async def create_connection_qr_code(self, connection_id: str) ‑> dict

Create connection QR code.

Args

connection_id : str
Connection identifier.

Returns

dict
Dict with dynamic link.
Expand source code
async def create_connection_qr_code(
    self,
    connection_id: str
) -> dict:
    """Create a dynamic link for the invitation of a connection.

    Args:
        connection_id (str): Connection identifier.

    Returns:
        dict: Dict with the generated link under the "dynamic_link" key.
    """
    record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
        self.context,
        connection_id
    )
    invitation: ConnectionInvitation = await record.retrieve_invitation(
        self.context
    )

    # The invitation URL is the payload wrapped by the firebase dynamic link.
    dynamic_link = await generate_firebase_dynamic_link(
        self.context,
        invitation.to_url()
    )

    return {"dynamic_link": dynamic_link}
async def create_data_agreement_qr_code(self, template_id: str, multi_use_flag: bool) ‑> dict

Create data agreement qr code

Args

template_id : str
Template identifier
multi_use_flag : bool
Multi use flag

Returns

dict
Qr code.
Expand source code
async def create_data_agreement_qr_code(
    self,
    template_id: str,
    multi_use_flag: bool
) -> dict:
    """Create a QR code record and dynamic link for a data agreement template.

    A ``DataAgreementQRCodeRecord`` is stored, a fresh (non-public,
    auto-accepting) invitation is created for it, and a Firebase dynamic link
    wrapping the base64-encoded ``{qr_id, invitation}`` payload is generated.

    Args:
        template_id (str): Template identifier
        multi_use_flag (bool): Whether the invitation may be used more than once.

    Returns:
        dict: ``qr_id``, the serialized ``invitation`` and the ``dynamic_link``.
    """
    qr_code = DataAgreementQRCodeRecord(
        template_id=template_id,
        multi_use_flag=bool_to_str(multi_use_flag),
    )
    # Saved before the invitation is created because the alias below embeds
    # the record's ``_id``.
    await qr_code.save(self.context)

    invitation_alias = f"DA_{template_id}_QR_{qr_code._id}"
    connection, invitation = await self.create_invitation(
        auto_accept=True,
        public=False,
        multi_use=multi_use_flag,
        alias=invitation_alias,
    )

    # Link the QR code to the connection created for the invitation.
    qr_code.connection_id = connection.connection_id
    await qr_code.save(self.context)

    result = {"qr_id": qr_code._id, "invitation": invitation.serialize()}

    # The dynamic link payload is the agent endpoint with the base64-encoded
    # JSON result passed in the ``qp`` query parameter.
    encoded_result = base64.b64encode(json.dumps(result).encode()).decode()
    endpoint = self.context.settings.get("default_endpoint")
    payload = endpoint + "?qt=2&qp=" + encoded_result

    dynamic_link = await generate_firebase_dynamic_link(self.context, payload)
    qr_code.dynamic_link = dynamic_link
    await qr_code.save(self.context)

    result["dynamic_link"] = dynamic_link
    return result
async def create_invitation(self, my_label: str = None, my_endpoint: str = None, their_role: str = None, auto_accept: bool = None, public: bool = False, multi_use: bool = False, alias: str = None) ‑> Tuple[aries_cloudagent.connections.models.connection_record.ConnectionRecord, aries_cloudagent.protocols.connections.v1_0.messages.connection_invitation.ConnectionInvitation]

Generate new connection invitation.

Expand source code
async def create_invitation(
    self,
    my_label: str = None,
    my_endpoint: str = None,
    their_role: str = None,
    auto_accept: bool = None,
    public: bool = False,
    multi_use: bool = False,
    alias: str = None,
) -> typing.Tuple[ConnectionRecord, ConnectionInvitation]:
    """Generate new connection invitation.

    The invitation label and image URL are always taken from the organisation
    details fetched from the intermediary.

    Args:
        my_label (str): Requested label. NOTE: the value is replaced by the
            organisation name before it is used.
        my_endpoint (str): Endpoint for the invitation; falls back to the
            ``default_endpoint`` setting.
        their_role (str): Role stored on the connection record.
        auto_accept (bool): Auto-accept requests; when ``None`` the
            ``debug.auto_accept_requests`` setting decides.
        public (bool): Create an invitation based on the public DID.
        multi_use (bool): Allow the invitation to be used multiple times.
        alias (str): Alias stored on the connection record.

    Returns:
        typing.Tuple[ConnectionRecord, ConnectionInvitation]: The connection
        record and invitation; the record is ``None`` for public invitations.

    Raises:
        ConnectionManagerError: Public invitations are disabled, no public DID
            exists, or ``public`` and ``multi_use`` are combined.
    """
    settings = self.context.settings

    if not my_label:
        my_label = settings.get("default_label")

    image_url = None

    # Organisation details from the intermediary override the label resolved
    # above and supply the logo used as the invitation image.
    organisation = await fetch_org_details_from_intermediary(self.context)
    my_label = organisation["Name"]
    image_url = organisation["LogoImageURL"] + "/web"

    wallet: BaseWallet = await self.context.inject(BaseWallet)

    if public:
        if not settings.get("public_invites"):
            raise ConnectionManagerError(
                "Public invitations are not enabled")

        public_did = await wallet.get_public_did()
        if not public_did:
            raise ConnectionManagerError(
                "Cannot create public invitation with no public DID"
            )

        if multi_use:
            raise ConnectionManagerError(
                "Cannot use public and multi_use at the same time"
            )

        # FIXME - allow ledger instance to format public DID with prefix?
        public_invitation = ConnectionInvitation(
            label=my_label,
            did=f"did:sov:{public_did.did}",
            image_url=image_url,
        )
        # No connection record is created for public invitations.
        return None, public_invitation

    invitation_mode = (
        ConnectionRecord.INVITATION_MODE_MULTI
        if multi_use
        else ConnectionRecord.INVITATION_MODE_ONCE
    )

    if not my_endpoint:
        my_endpoint = settings.get("default_endpoint")

    # An explicit auto_accept wins; the debug setting only applies when the
    # caller left it as None.
    if auto_accept is None:
        should_auto_accept = settings.get("debug.auto_accept_requests")
    else:
        should_auto_accept = auto_accept

    if should_auto_accept:
        accept = ConnectionRecord.ACCEPT_AUTO
    else:
        accept = ConnectionRecord.ACCEPT_MANUAL

    # Create and store new invitation key
    signing_key = await wallet.create_signing_key()

    # Create connection record
    connection = ConnectionRecord(
        initiator=ConnectionRecord.INITIATOR_SELF,
        invitation_key=signing_key.verkey,
        their_role=their_role,
        state=ConnectionRecord.STATE_INVITATION,
        accept=accept,
        invitation_mode=invitation_mode,
        alias=alias,
    )
    await connection.save(self.context, reason="Created new invitation")

    # Create connection invitation message
    # Note: Need to split this into two stages to support inbound routing of invites
    # Would want to reuse create_did_document and convert the result
    invitation = ConnectionInvitation(
        label=my_label,
        recipient_keys=[signing_key.verkey],
        endpoint=my_endpoint,
        image_url=image_url,
    )
    await connection.attach_invitation(self.context, invitation)

    return connection, invitation
async def delete_da_instance_by_data_ex_id(self, cred_ex_id: str) ‑> None

Delete da instance by cred ex id.

Args

cred_ex_id : str
Credential exchange identifier.
Expand source code
async def delete_da_instance_by_data_ex_id(
    self,
    cred_ex_id: str
) -> None:
    """Delete the data agreement instance tied to a data exchange record.

    Args:
        cred_ex_id (str): Credential exchange identifier used to look up the
            instance record.
    """
    da_instance = await DataAgreementInstanceRecord.fetch_by_data_ex_id(
        self.context, cred_ex_id
    )
    await da_instance.delete_record(self.context)
async def delete_da_template_in_wallet(self, template_id: str) ‑> str

Deactivate DA template in wallet.

This is not a normal delete operation of a specific version of template. Instead it marks the template with latest version flag as deleted i.e. Any version under this template is no longer active.

Args

template_id : str
Template identifier
template_version : str
Template version

Returns

record_id
Record identifier for the deleted template.
Expand source code
async def delete_da_template_in_wallet(self, template_id: str) -> str:
    """Deactivate DA template in wallet.

    This is not a normal delete of one specific template version. The first
    non-deleted record found for the template id is marked as deleted, i.e. no
    version under this template is active afterwards.

    Args:
        template_id (str): Template identifier

    Returns:
        record_id: Record identifier for the deleted template.

    Raises:
        AssertionError: No non-deleted template exists for ``template_id``.
    """
    # The lookup result is indexed below, so it is a sequence of records.
    matches = await DataAgreementTemplateRecord.non_deleted_template_by_id(
        self.context, template_id
    )
    assert matches, "Data agreement template not found."

    # Mark the first match as deleted and hand back what delete_template returns.
    return await matches[0].delete_template(self.context)
async def delete_data_agreement_qr_code(self, template_id: str, qr_id: str) ‑> None

Delete data agreement qr code.

Expand source code
async def delete_data_agreement_qr_code(
    self,
    template_id: str,
    qr_id: str
) -> None:
    """Delete a data agreement QR code record.

    Args:
        template_id (str): Template the QR code must belong to.
        qr_id (str): QR code record identifier.

    Raises:
        AssertionError: The QR code belongs to a different template.
    """
    qr_code = await DataAgreementQRCodeRecord.retrieve_by_id(self.context, qr_id)

    # Refuse to delete a QR code through the wrong template.
    assert qr_code.template_id == template_id, "Data agreement not found."

    await qr_code.delete_record(self.context)
async def delete_personal_data(self, attribute_id: str) ‑> None

Delete personal data record.

On deleting personal data record, the associated data agreement template is updated. If the personal data record deleted, is the last one in the template, proceed to delete the template record.

Args

attribute_id : str
Identifier of the personal data record to delete.
Expand source code
async def delete_personal_data(self, attribute_id: str) -> None:
    """Remove a personal data attribute from its data agreement template.

    The personal data entry is dropped from the latest template's data
    agreement. If no entries remain, the template itself is deleted; otherwise
    the template is updated with the reduced agreement.

    Args:
        attribute_id (str): Identifier of the personal data record.

    Raises:
        AssertionError: No latest template exists for the record, or its
            version differs from the version stored on the record.
    """
    pd_record: PersonalDataRecord = await PersonalDataRecord.retrieve_by_id(
        self.context, attribute_id
    )
    template_id = pd_record.data_agreement_template_id

    template: DataAgreementTemplateRecord = \
        await DataAgreementTemplateRecord.latest_template_by_id(
            self.context, template_id
        )

    assert template, "Matching data agreement template not found."
    assert template.template_version == \
        pd_record.data_agreement_template_version, \
        "Matching data agreement template with same version not found."

    agreement: DataAgreementModel = DataAgreementModel.deserialize(
        template.data_agreement
    )

    # Keep every entry except the deleted one. Kept entries have their
    # attribute id cleared before being handed to the template update.
    remaining = []
    for entry in agreement.personal_data:
        if entry.attribute_id != pd_record.attribute_id:
            entry.attribute_id = None
            remaining.append(entry)

    agreement.personal_data = remaining

    if len(remaining) == 0:
        # The last personal data entry was removed: drop the template too.
        await template.delete_template(self.context)
        return

    # Update template record with new agreement.
    await self.update_and_store_da_template_in_wallet(
        template_id,
        agreement.serialize(),
        publish_flag=str_to_bool(template.publish_flag)
    )
async def get_existing_connection_record_for_new_connection_id(self, connection_id: str) ‑> ExistingConnectionRecord

Get existing connection record for new connection id.

Args

connection_id : str
Connection id.

Returns

ExistingConnectionRecord
Existing connection record.
Expand source code
async def get_existing_connection_record_for_new_connection_id(
    self,
    connection_id: str
) -> ExistingConnectionRecord:
    """Look up the existing-connection record tagged with a connection id.

    Args:
        connection_id (str): Connection id.

    Returns:
        ExistingConnectionRecord: First matching record, or ``None`` when the
        query returns nothing.
    """
    matches = await ExistingConnectionRecord.query(
        self.context,
        {"connection_id": connection_id}
    )

    return matches[0] if matches else None
async def process_da_negotiation_receipt_message(self, message: mydata_did.v1_0.messages.da_negotiation_receipt.DataAgreementNegotiationReceiptMessage, message_receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process DA negotiation receipt message.

Args

message : DataAgreementNegotiationReceiptMessage
DA negotiation receipt message.
message_receipt : MessageReceipt
Message receipt.
Expand source code
async def process_da_negotiation_receipt_message(
    self,
    message: DataAgreementNegotiationReceiptMessage,
    message_receipt: MessageReceipt
):
    """Store the details of a DA negotiation receipt on the instance record.

    Args:
        message (DataAgreementNegotiationReceiptMessage): DA negotiation receipt message.
        message_receipt (MessageReceipt): Message receipt (not used here).
    """
    # Read everything from the message body up front.
    body = message.body
    instance_id, blockchain_receipt, blink, mydata_did = (
        body.instance_id,
        body.blockchain_receipt,
        body.blink,
        body.mydata_did,
    )

    # Fetch the instance record by its instance id tag.
    instance_record: DataAgreementInstanceRecord = \
        await DataAgreementInstanceRecord.retrieve_by_tag_filter(
            self.context,
            {"instance_id": instance_id}
        )

    # Copy the receipt details onto the record and persist.
    instance_record.blockchain_receipt = blockchain_receipt
    instance_record.blink = blink
    instance_record.mydata_did = mydata_did
    await instance_record.save(self.context)
async def process_data_agreement_qr_code_initiate_message(self, message: mydata_did.v1_0.messages.data_agreement_qr_code_initiate.DataAgreementQrCodeInitiateMessage, receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process data agreement QR code initiate message.

Args

message : DataAgreementQrCodeInitiateMessage
Data agreement QR code initiate message.
receipt : MessageReceipt
Message receipt.
Expand source code
async def process_data_agreement_qr_code_initiate_message(
    self,
    message: DataAgreementQrCodeInitiateMessage,
    receipt: MessageReceipt
):
    """Process data agreement QR code initiate message.

    Looks up the QR code record named in the message, builds a presentation
    request from the latest data agreement template, creates the presentation
    exchange record, attaches the data agreement offer as a decorator and
    replies to the sender with the presentation request.

    Args:
        message (DataAgreementQrCodeInitiateMessage): Data agreement QR code initiate message.
        receipt (MessageReceipt): Message receipt (not used here).

    Raises:
        Exception: A single-use QR code has already been scanned. A problem
            report is sent to the connection before raising.
    """
    qr_id = message.body.qr_id
    connection_id = self.context.connection_record.connection_id

    connection_record = await ConnectionRecord.retrieve_by_id(self.context, connection_id)

    # Fetch the qr code record.
    record: DataAgreementQRCodeRecord = \
        await DataAgreementQRCodeRecord.retrieve_by_id(
            self.context,
            qr_id
        )

    if record._multi_use_flag:
        record._scanned_flag = True
        await record.save(self.context)
    elif record._scanned_flag:
        # Single-use QR codes may only start one exchange.
        explain = "Qr code cannot be scanned twice"
        await self.send_problem_report_message(explain, connection_id)
        raise Exception(explain)

    # Fetch data agreement template record.
    template_record: DataAgreementTemplateRecord = \
        await DataAgreementTemplateRecord.latest_template_by_id(
            self.context,
            record.template_id
        )

    # Construct presentation request. The comment travels on the message
    # itself, so it is removed from the indy proof request dict.
    preset_presentation_request = template_record.presentation_request
    comment = preset_presentation_request.pop("comment")
    if not preset_presentation_request.get("nonce"):
        preset_presentation_request["nonce"] = await generate_pr_nonce()

    presentation_request = PresentationRequest(
        comment=comment,
        request_presentations_attach=[
            AttachDecorator.from_indy_dict(
                indy_dict=preset_presentation_request,
                ident=ATTACH_DECO_IDS[PRESENTATION_REQUEST],
            )
        ],
    )

    # Construct presentation exchange record
    presentation_manager = PresentationManager(self.context)
    pres_ex_record = await presentation_manager.create_exchange_for_request(
        connection_id=connection_id,
        presentation_request_message=presentation_request,
    )

    # Update qr code with record id and mark it as scanned. This method
    # previously set the flag only for multi-use codes, so the single-use
    # check above could never fire; setting it here (once the exchange
    # exists) makes a second scan of a single-use code get rejected.
    record.data_ex_id = pres_ex_record.presentation_exchange_id
    record._scanned_flag = True
    await record.save(self.context)

    offer_message = await self.build_data_agreement_offer_for_presentation_exchange(
        template_record.template_id,
        connection_record,
        pres_ex_record
    )

    # Add data agreement context decorator
    presentation_request._decorators["data-agreement-context"] = DataAgreementContextDecorator(
        message_type="protocol",
        message=offer_message.serialize()
    )

    # Persist the decorated request and the template id on the exchange record.
    pres_ex_record.presentation_request_dict = presentation_request.serialize()
    pres_ex_record.template_id = template_record.template_id
    await pres_ex_record.save(self.context)

    await self.send_reply_message(presentation_request, connection_id)
async def process_data_controller_details_message(self, message: mydata_did.v1_0.messages.data_controller_details.DataControllerDetailsMessage, receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process data controller details message.

Args

message : DataControllerDetailsMessage
Data controller details message.
receipt : MessageReceipt
Message receipt.
Expand source code
async def process_data_controller_details_message(
    self,
    message: DataControllerDetailsMessage,
    receipt: MessageReceipt
):
    """Reply to a data controller details request.

    When a ``ControllerDetailsRecord`` is stored, the reply is built from it.
    Otherwise the details are taken from the cache (keyed by the public DID)
    or, on a cache miss, fetched from the intermediary and cached for 3600
    seconds.

    Args:
        message (DataControllerDetailsMessage): Data controller details message.
        receipt (MessageReceipt): Message receipt (not used here).

    Raises:
        AssertionError: No stored record exists and no cache is available.
    """
    # Query controller records.
    stored_records = await ControllerDetailsRecord.query(self.context, {})

    connection_id = self.context.connection_record.connection_id

    if stored_records:
        # A stored record exists: answer straight from it.
        stored: ControllerDetailsRecord = stored_records[0]

        details = DataController(
            organisation_did=stored.organisation_did,
            organisation_name=stored.organisation_name,
            cover_image_url=stored.cover_image_url,
            logo_image_url=stored.logo_image_url,
            location=stored.location,
            organisation_type=stored.organisation_type,
            description=stored.description,
            policy_url=stored.policy_url,
            eula_url=stored.eula_url
        )

        reply = DataControllerDetailsResponseMessage(body=details)
        await self.send_reply_message(reply, connection_id)
        return

    wallet: BaseWallet = await self.context.inject(BaseWallet)
    public_did = await wallet.get_public_did()

    cache: BaseCache = await self.context.inject(BaseCache, required=False)

    # The prefixed public DID is both the cache key and the organisation DID.
    organisation_did = f"did:sov:{public_did.did}"

    assert cache, "Cache not available."

    details = None
    async with cache.acquire(organisation_did) as entry:
        if entry.result:
            # Cache hit: rebuild the model from the cached dict.
            details = DataController.deserialize(entry.result)
        else:
            # Cache miss: fetch organisation details and cache the result.
            organisation = await fetch_org_details_from_intermediary(self.context)

            details = DataController(
                organisation_did=organisation_did,
                organisation_name=organisation["Name"],
                cover_image_url=organisation["CoverImageURL"] + "/web",
                logo_image_url=organisation["LogoImageURL"] + "/web",
                location=organisation["Location"],
                organisation_type=organisation["Type"]["Type"],
                description=organisation["Description"],
                policy_url=organisation["PolicyURL"],
                eula_url=organisation["EulaURL"]
            )
            await entry.set_result(details.serialize(), 3600)

        reply = DataControllerDetailsResponseMessage(body=details)
        await self.send_reply_message(reply, connection_id)
async def process_decorator_with_da_accept_message(self, decorator_set: aries_cloudagent.messaging.decorators.default.DecoratorSet, data_ex_record: Union[aries_cloudagent.protocols.issue_credential.v1_0.models.credential_exchange.V10CredentialExchange, aries_cloudagent.protocols.present_proof.v1_0.models.presentation_exchange.V10PresentationExchange], connection_record: aries_cloudagent.connections.models.connection_record.ConnectionRecord) ‑> DataAgreementInstanceRecord

Process data agreement context decorator with DA accept message

Args

decorator_set : DecoratorSet
Decorator set
data_ex_record : typing.Union[V10CredentialExchange, V10PresentationExchange]
Data exchange record.
connection_record : ConnectionRecord
Connection record

Returns

DataAgreementInstanceRecord
Data agreement instance record.
Expand source code
async def process_decorator_with_da_accept_message(
    self,
    decorator_set: DecoratorSet,
    data_ex_record: typing.Union[V10CredentialExchange, V10PresentationExchange],
    connection_record: ConnectionRecord
) -> DataAgreementInstanceRecord:
    """Handle a DA accept message carried in the data agreement context decorator.

    Args:
        decorator_set (DecoratorSet): Decorator set
        data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]):
            Data exchange record.
        connection_record (ConnectionRecord): Connection record (not used here).

    Returns:
        DataAgreementInstanceRecord: Updated data agreement instance record,
        or ``None`` when the decorator is absent.

    Raises:
        AssertionError: The decorator is not a protocol message of the DA
            negotiation accept type.
    """
    decorator_key = "data-agreement-context"

    # Nothing to do for messages without the decorator.
    if decorator_key not in decorator_set.keys():
        self._logger.info(
            "Data agreement context decorator is not present in the incoming message.")
        return None

    decorator: DataAgreementContextDecorator = \
        DataAgreementContextDecorator.deserialize(decorator_set[decorator_key])

    assert decorator.message_type == "protocol", \
        "DA context message type must be 'protocol'."

    message_type = decorator.message.get("@type")
    assert DATA_AGREEMENT_NEGOTIATION_ACCEPT in message_type, \
        f"DA context protocol message type must be '{DATA_AGREEMENT_NEGOTIATION_ACCEPT}'"

    accept_message: DataAgreementNegotiationAcceptMessage = \
        DataAgreementNegotiationAcceptMessage.deserialize(decorator.message)

    # The exchange id attribute depends on the kind of exchange record;
    # the kind is decided by comparing class names.
    is_credential_exchange = \
        data_ex_record.__class__.__name__ == V10CredentialExchange.__name__
    if is_credential_exchange:
        data_ex_id = data_ex_record.credential_exchange_id
    else:
        data_ex_id = data_ex_record.presentation_exchange_id

    # Update and save the data agreement instance record.
    instance_record = await DataAgreementInstanceRecord.update_instance_from_da_accept(
        self.context,
        accept_message,
        data_ex_id
    )

    # Anchor da to blockchain.
    await self.anchor_da_instance_to_blockchain_async_task(instance_record.instance_id)

    return instance_record
async def process_decorator_with_da_offer_message(self, decorator_set: aries_cloudagent.messaging.decorators.default.DecoratorSet, data_ex_record: Union[aries_cloudagent.protocols.issue_credential.v1_0.models.credential_exchange.V10CredentialExchange, aries_cloudagent.protocols.present_proof.v1_0.models.presentation_exchange.V10PresentationExchange], connection_record: aries_cloudagent.connections.models.connection_record.ConnectionRecord) ‑> DataAgreementInstanceRecord

Process data agreement context decorator with DA offer message

Args

decorator_set : DecoratorSet
Decorator set
data_ex_record : typing.Union[V10CredentialExchange, V10PresentationExchange]
Data exchange record (credential or presentation exchange).
connection_record : ConnectionRecord
Connection record

Returns

DataAgreementInstanceRecord
Data agreement instance record.
Expand source code
async def process_decorator_with_da_offer_message(
    self,
    decorator_set: DecoratorSet,
    data_ex_record: typing.Union[V10CredentialExchange, V10PresentationExchange],
    connection_record: ConnectionRecord
) -> DataAgreementInstanceRecord:
    """Handle a DA offer message carried in the data agreement context decorator.

    Args:
        decorator_set (DecoratorSet): Decorator set
        data_ex_record (typing.Union[V10CredentialExchange, V10PresentationExchange]):
            Data exchange record.
        connection_record (ConnectionRecord): Connection record

    Returns:
        DataAgreementInstanceRecord: Data agreement instance record built from
        the offer, or ``None`` when the decorator is absent.

    Raises:
        AssertionError: The decorator is not a protocol message of the DA
            negotiation offer type.
    """
    decorator_key = "data-agreement-context"

    # Nothing to do for messages without the decorator.
    if decorator_key not in decorator_set.keys():
        self._logger.info(
            "Data agreement context decorator is not present in the incoming message.")
        return None

    decorator: DataAgreementContextDecorator = \
        DataAgreementContextDecorator.deserialize(decorator_set[decorator_key])

    assert decorator.message_type == "protocol", \
        "DA context message type must be 'protocol'."

    message_type = decorator.message.get("@type")
    assert DATA_AGREEMENT_NEGOTIATION_OFFER in message_type, \
        f"DA context protocol message type must be '{DATA_AGREEMENT_NEGOTIATION_OFFER}'"

    offer_message: DataAgreementNegotiationOfferMessage = \
        DataAgreementNegotiationOfferMessage.deserialize(decorator.message)

    # The exchange id attribute depends on the kind of exchange record;
    # the kind is decided by comparing class names.
    is_credential_exchange = \
        data_ex_record.__class__.__name__ == V10CredentialExchange.__name__
    if is_credential_exchange:
        data_ex_id = data_ex_record.credential_exchange_id
    else:
        data_ex_id = data_ex_record.presentation_exchange_id

    # Build and save data agreement instance record.
    return await DataAgreementInstanceRecord.build_instance_from_da_offer(
        self.context,
        offer_message,
        connection_record,
        data_ex_id
    )
async def process_existing_connections_message(self, message: mydata_did.v1_0.messages.existing_connections.ExistingConnectionsMessage, message_receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process existing connections message.

Args

message : ExistingConnectionsMessage
Existing connections message.
message_receipt : MessageReceipt
Message receipt.
Expand source code
async def process_existing_connections_message(
    self,
    message: ExistingConnectionsMessage,
    message_receipt: MessageReceipt
):
    """Record that a new connection maps to an already existing connection.

    The current connection is found through the invitation key the message was
    sent to. Any previous mapping for it is removed, a new
    ``ExistingConnectionRecord`` pointing at the older connection is stored,
    and the current connection is set to inactive.

    Args:
        message (ExistingConnectionsMessage): Existing connections message.
        message_receipt (MessageReceipt): Message receipt.
    """
    # The recipient verkey of the message is the invitation key.
    current_connection = await ConnectionRecord.retrieve_by_invitation_key(
        self.context,
        message_receipt.recipient_verkey
    )

    # Drop the mapping already stored for the current connection, if any.
    previous_mappings = await ExistingConnectionRecord.query(
        self.context,
        {"connection_id": current_connection.connection_id}
    )
    if previous_mappings:
        stale_mapping: ExistingConnectionRecord = previous_mappings[0]
        await stale_mapping.delete_record(self.context)

    # The older connection is looked up by the DID carried in the message.
    old_connection = await ConnectionRecord.retrieve_by_did(
        self.context,
        their_did=None,
        my_did=message.body.theirdid
    )

    # Store the mapping from the current connection to the older one.
    mapping = ExistingConnectionRecord(
        existing_connection_id=old_connection.connection_id,
        my_did=old_connection.my_did,
        connection_status="available",
        connection_id=current_connection.connection_id
    )
    await mapping.save(self.context)

    # updating the current connection invitation status to inactive
    current_connection.state = ConnectionRecord.STATE_INACTIVE
    await current_connection.save(context=self.context)
async def publish_da_template_in_wallet(self, template_id: str) ‑> DataAgreementTemplateRecord

Publish data agreement template.

Args

template_id : str
Template identifier

Returns

DataAgreementTemplateRecord
Template record.
Expand source code
async def publish_da_template_in_wallet(
    self,
    template_id: str
) -> DataAgreementTemplateRecord:
    """Publish the latest unpublished version of a data agreement template.

    Args:
        template_id (str): Template identifier

    Returns:
        DataAgreementTemplateRecord: Template record after the ledger payloads
        have been created and stored.

    Raises:
        AssertionError: No non-deleted, unpublished, latest-version template
            exists for ``template_id``.
    """
    candidates = await DataAgreementTemplateRecord.query(
        context=self.context,
        tag_filter={
            "delete_flag": bool_to_str(False),
            "publish_flag": bool_to_str(False),
            "latest_version_flag": bool_to_str(True),
            "template_id": template_id
        }
    )
    assert candidates, "Data agreement template not found."

    template: DataAgreementTemplateRecord = candidates[0]
    await template.publish_template(self.context)

    personal_data = await template.fetch_personal_data_records(self.context)

    # Create ledger payloads
    return await self.create_and_store_ledger_payloads_for_da_template(
        template_record=template,
        pd_records=personal_data,
        schema_id=template.schema_id
    )
async def query_connections_and_categorise_results(self, tag_filter: dict = None, post_filter_positive: dict = None, page: int = 1, page_size: int = 10, org_flag: bool = False, marketplace_flag: bool = False) ‑> PaginationResult
Expand source code
async def query_connections_and_categorise_results(
    self,
    tag_filter: dict = None,
    post_filter_positive: dict = None,
    page: int = 1,
    page_size: int = 10,
    org_flag: bool = False,
    marketplace_flag: bool = False,
) -> PaginationResult:
    """Query connections, tag each with category flags and paginate.

    Every connection is serialized and extended with ``org_flag`` /
    ``controller_details`` (from ``ConnectionControllerDetailsRecord``) and
    ``marketplace_flag`` (from ``MarketplaceConnectionRecord``). Only
    connections whose flags match the requested categories are kept.

    Args:
        tag_filter (dict, optional): Tag filter for the connection query.
        post_filter_positive (dict, optional): Post filter for the connection
            query.
        page (int, optional): Page number; falsy values fall back to 1.
        page_size (int, optional): Page size; falsy values fall back to 10.
        org_flag (bool, optional): Required ``org_flag`` value; ``None``
            presumably disables the check (see ``drop_none_dict``).
        marketplace_flag (bool, optional): Required ``marketplace_flag``
            value; ``None`` presumably disables the check.

    Returns:
        PaginationResult: Paginated connection dicts, newest first.
    """

    # Query the connection records.
    records = await ConnectionRecord.query(
        self.context,
        tag_filter,
        post_filter_positive
    )

    # Sort the connection records, most recently created first.
    records = sorted(
        records,
        key=lambda k: k.created_at,
        reverse=True
    )

    # The category filter does not depend on the record, so build it once.
    categorise_filter = drop_none_dict({
        "org_flag": org_flag,
        "marketplace_flag": marketplace_flag
    })

    res = []
    for record in records:
        # Separate name so the ``tag_filter`` argument is not shadowed.
        connection_filter = {"connection_id": record.connection_id}

        # Fetch controller details attached to the connection.
        controller_details: typing.List[ConnectionControllerDetailsRecord] = \
            await ConnectionControllerDetailsRecord.query(
                self.context,
                connection_filter
            )

        # Fetch marketplace connection record.
        marketplace_connections: typing.List[MarketplaceConnectionRecord] = \
            await MarketplaceConnectionRecord.query(
                self.context,
                connection_filter
            )

        connection = record.serialize()

        # Update controller details to the connection dict.
        if controller_details:
            connection.update({
                "org_flag": True,
                "controller_details": controller_details[0].controller_details
            })
        else:
            connection.update({
                "controller_details": {},
                "org_flag": False
            })

        connection.update({"marketplace_flag": bool(marketplace_connections)})

        # Apply category filter on connections.
        if match_post_filter(connection, categorise_filter, True):
            res.append(connection)

    return paginate(res, page or 1, page_size or 10)
async def query_da_templates_in_wallet(self, *, template_id: str = None, delete_flag: str = 'false', method_of_use: str = None, publish_flag: str = 'true', template_version: str = None, latest_version_flag: str = 'true', third_party_data_sharing: str = 'false', page: int = 1, page_size: int = 10) ‑> PaginationResult

Query DA templates in wallet

Args

template_id : str, optional
Template identifier. Defaults to None.
delete_flag : str, optional
Delete flag. Defaults to false.
method_of_use : str, optional
Method of use. Defaults to None.
publish_flag : str, optional
Publish flag. Defaults to true.
latest_version_flag : str, optional
Latest version flag. Defaults to true.
template_version : str, optional
Template version. Defaults to None.
third_party_data_sharing : str, optional
Third party data sharing. Defaults to false.
page : int, optional
Page. Defaults to 1.
page_size : int, optional
Page size. Defaults to 10.

Returns

PaginationResult
Pagination results.
Expand source code
async def query_da_templates_in_wallet(
        self,
        *,
        template_id: str = None,
        delete_flag: str = "false",
        method_of_use: str = None,
        publish_flag: str = "true",
        template_version: str = None,
        latest_version_flag: str = "true",
        third_party_data_sharing: str = "false",
        page: int = 1,
        page_size: int = 10,
) -> PaginationResult:
    """Query data agreement (DA) templates in wallet.

    The arguments are assembled into a tag filter, the matching
    DataAgreementTemplateRecord entries are sorted by ``created_at``
    (newest first) and the requested page is returned.

    Args:
        template_id (str, optional): Template identifier. Defaults to None.
        delete_flag (str, optional): Delete flag. Defaults to false.
        method_of_use (str, optional): Method of use. Defaults to None.
        publish_flag (str, optional): Publish flag. Defaults to true.
        template_version (str, optional): Template version. Defaults to None.
            Requires template_id.
        latest_version_flag (str, optional): Latest version flag. Defaults to true.
        third_party_data_sharing (str, optional): Third party data sharing.
            Defaults to false.
        page (int, optional): Page. Defaults to 1.
        page_size (int, optional): Page size. Defaults to 10.

    Raises:
        AssertionError: If template_version is given without template_id.

    Returns:
        PaginationResult: Pagination results.
    """

    # Query by version is only possible if the template id is provided.
    # Raised explicitly instead of using `assert` so the validation is not
    # stripped when the interpreter runs with -O.
    if template_version and not template_id:
        raise AssertionError("Template identifier is required to query by version")

    # Tag filter
    tag_filter = {
        "delete_flag": delete_flag,
        "publish_flag": publish_flag,
        "method_of_use": method_of_use,
        "template_id": template_id,
        "template_version": template_version,
        "latest_version_flag": latest_version_flag,
        "third_party_data_sharing": third_party_data_sharing
    }

    # NOTE(review): drop_none_dict presumably removes entries whose value is
    # None so that unset arguments do not constrain the query - confirm.
    tag_filter = drop_none_dict(tag_filter)

    records = await DataAgreementTemplateRecord.query(
        context=self.context,
        tag_filter=tag_filter
    )

    # Newest templates first.
    records = sorted(records, key=lambda k: k.created_at, reverse=True)

    return paginate_records(records, page, page_size)
async def query_data_agreement_instances(self, instance_id: str, template_id: str, template_version: str, method_of_use: str, third_party_data_sharing: str, data_ex_id: str, data_subject_did: str, page: int = 1, page_size: int = 10) ‑> PaginationResult

Query data agreement instances

Args

instance_id : str
Instance identifier
template_id : str
Template identifier
template_version : str
Template version
method_of_use : str
Method of use
third_party_data_sharing : str
Third party data sharing
data_ex_id : str
Data exchange id
data_subject_did : str
Data subject did
page : int, optional
Page. Defaults to 1.
page_size : int, optional
Page size. Defaults to 10.

Returns

PaginationResult
Pagination result
Expand source code
async def query_data_agreement_instances(
    self,
    instance_id: str,
    template_id: str,
    template_version: str,
    method_of_use: str,
    third_party_data_sharing: str,
    data_ex_id: str,
    data_subject_did: str,
    page: int = 1,
    page_size: int = 10
) -> PaginationResult:
    """Query data agreement instances

    The arguments are assembled into a tag filter, the matching
    DataAgreementInstanceRecord entries are sorted by ``created_at``
    (newest first) and the requested page is returned.

    Args:
        instance_id (str): Instance identifier
        template_id (str): Template identifier
        template_version (str): Template version. Requires template_id.
        method_of_use (str): Method of use
        third_party_data_sharing (str): Third party data sharing
        data_ex_id (str): Data exchange id
        data_subject_did (str): Data subject did
        page (int, optional): Page. Defaults to 1.
        page_size (int, optional): Page size. Defaults to 10.

    Raises:
        AssertionError: If template_version is given without template_id.

    Returns:
        PaginationResult: Pagination result
    """
    # Query by version is only possible if the template id is provided.
    # Raised explicitly instead of using `assert` so the validation is not
    # stripped when the interpreter runs with -O.
    if template_version and not template_id:
        raise AssertionError("Template identifier is required to query by version")

    # Tag filter
    tag_filter = {
        "instance_id": instance_id,
        "template_id": template_id,
        "template_version": template_version,
        "method_of_use": method_of_use,
        "third_party_data_sharing": third_party_data_sharing,
        "data_ex_id": data_ex_id,
        "data_subject_did": data_subject_did
    }

    # NOTE(review): drop_none_dict presumably removes entries whose value is
    # None so that unset arguments do not constrain the query - confirm.
    tag_filter = drop_none_dict(tag_filter)

    records = await DataAgreementInstanceRecord.query(
        context=self.context,
        tag_filter=tag_filter
    )

    # Newest instances first.
    records = sorted(records, key=lambda k: k.created_at, reverse=True)

    return paginate_records(records, page, page_size)
async def query_data_agreement_qr_codes(self, template_id: str) ‑> PaginationResult

Query data agreement qr codes

Returns

PaginationResult
List of qr code records.
Expand source code
async def query_data_agreement_qr_codes(
    self,
    template_id: str,
) -> PaginationResult:
    """Query the QR code records of a data agreement template.

    Args:
        template_id (str): Template identifier used as the query tag.

    Returns:
        PaginationResult: QR code records matching the template, requested
            as page 1 with a page size of 1000000.
    """
    qr_tag_filter = {"template_id": template_id}
    qr_records = await DataAgreementQRCodeRecord.query(self.context, qr_tag_filter)

    # The fixed, very large page size is what the caller receives as the
    # only page; this method exposes no pagination arguments of its own.
    return paginate_records(qr_records, page=1, page_size=1000000)
async def query_pd_of_da_template_from_wallet(self, template_id: str = None, method_of_use: str = None, third_party_data_sharing: str = None, page: int = 1, page_size: int = 10) ‑> PaginationResult

Query personal data for DA template.

Args

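template_id : str, optional
Template identifier. Defaults to None.
method_of_use : str, optional
Method of use. Defaults to None.
third_party_data_sharing : str, optional
Third party data sharing. Defaults to None.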
page : int, optional
Page number. Defaults to 1.
page_size : int, optional
Page size. Defaults to 10.

Returns

PaginationResult
Pagination results
Expand source code
async def query_pd_of_da_template_from_wallet(self,
                                              template_id: str = None,
                                              method_of_use: str = None,
                                              third_party_data_sharing: str = None,
                                              page: int = 1,
                                              page_size: int = 10,
                                              ) -> PaginationResult:
    """Query the personal data records attached to DA templates.

    Only templates tagged as not deleted and as the latest version are
    considered. Templates are visited newest first (by ``created_at``) and
    their personal data records are concatenated in that order.

    Args:
        template_id (str, optional): Template identifier. Defaults to None.
        method_of_use (str, optional): Method of use. Defaults to None.
        third_party_data_sharing (str, optional): Third party data sharing.
            Defaults to None.
        page (int, optional): Page number. Defaults to 1.
        page_size (int, optional): Page size. Defaults to 10.

    Returns:
        PaginationResult: Pagination results
    """

    # Candidate tags; the result is passed through drop_none_dict before
    # being used as the query filter.
    template_tags = drop_none_dict({
        "delete_flag": bool_to_str(False),
        "method_of_use": method_of_use,
        "template_id": template_id,
        "latest_version_flag": bool_to_str(True),
        "third_party_data_sharing": third_party_data_sharing
    })

    matching_templates = await DataAgreementTemplateRecord.query(
        context=self.context,
        tag_filter=template_tags
    )

    newest_first = sorted(
        matching_templates,
        key=lambda template: template.created_at,
        reverse=True
    )

    # Collect the personal data of every template, one template at a time.
    personal_data = []
    for template in newest_first:
        fetched = await template.fetch_personal_data_records(self.context)
        personal_data.extend(fetched)

    return paginate_records(personal_data, page, page_size)
async def send_data_controller_details_message(self, connection_id: str)

Send data controller details message

Args

connection_id : str
Connection ID
Expand source code
async def send_data_controller_details_message(
    self,
    connection_id: str
):
    """Send a data controller details message over a connection.

    Args:
        connection_id (str): Identifier of the connection to reply on.
    """
    # The message is built without arguments and handed to the shared
    # reply helper.
    await self.send_reply_message(DataControllerDetailsMessage(), connection_id)
async def send_existing_connections_message(self, theirdid: str, connection_id: str)

Send existing connections notification message.

Args

theirdid : str
Their DID of remote agent in old connection.
connection_id : str
Connection identifier.
Expand source code
async def send_existing_connections_message(
    self,
    theirdid: str,
    connection_id: str
):
    """Notify the remote agent about an already existing connection.

    Args:
        theirdid (str): Their DID of remote agent in old connection.
        connection_id (str): Connection identifier.
    """

    # Body carrying the DID, wrapped in the notification message.
    notification_body = ExistingConnectionsBody(theirdid=theirdid)
    notification = ExistingConnectionsMessage(body=notification_body)

    # Deliver it through the invitation of the given connection.
    await self.send_message_with_connection_invitation(notification, connection_id)
async def send_message_with_connection_invitation(self, message: aries_cloudagent.messaging.agent_message.AgentMessage, connection_id: str) ‑> None

Send message with connection invitation.

Args

message : AgentMessage
Agent message.
connection_id : str
Connection id.
Expand source code
async def send_message_with_connection_invitation(
    self,
    message: AgentMessage,
    connection_id: str,
) -> None:
    """Send message with connection invitation.

    The message is packed for the invitation key of the connection, using
    the verkey of a freshly created local DID as sender, and POSTed to the
    endpoint found in the connection's invitation. The outcome is only
    logged; no exception is raised for a non-200 response.

    Args:
        message (AgentMessage): Agent message.
        connection_id (str): Connection id.
    """
    # Fetch connection record.
    connection_record: ConnectionRecord = \
        await ConnectionRecord.retrieve_by_id(self.context, connection_id)

    # Get invitation key.
    invitation_key = connection_record.invitation_key
    # Service endpoint
    invitation = await connection_record.retrieve_invitation(self.context)
    service_endpoint = invitation.endpoint

    # Fetch wallet from context
    wallet: IndyWallet = await self.context.inject(BaseWallet)

    # Create a local did
    did: DIDInfo = await wallet.create_local_did()

    sender_key = did.verkey
    packed_message = await wallet.pack_message(
        message.to_json(),
        [invitation_key],
        sender_key
    )

    headers = {
        "Content-Type": "application/ssi-agent-wire"
    }

    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.post(service_endpoint, data=packed_message) as response:
            if response.status == 200:
                self._logger.info("Posted existing connection message...")
            else:
                # Previously a failed delivery left no trace at all.
                self._logger.warning(
                    f"Failed to post message to {service_endpoint}; "
                    f"HTTP status {response.status}"
                )
async def send_message_with_connection_invitation_and_return_route_all(self, message: aries_cloudagent.messaging.agent_message.AgentMessage, connection_id: str) ‑> Tuple[str, str, dict]

Send message with connection invitation and return route all.

Args

message : AgentMessage
Agent message.
connection_id : str
Connection id.

Returns

typing.Tuple[str, str, dict]
sender_verkey, recipient_verkey, message_dict
Expand source code
async def send_message_with_connection_invitation_and_return_route_all(
    self,
    message: AgentMessage,
    connection_id: str,
) -> typing.Optional[typing.Tuple[str, str, dict]]:
    """Send message with connection invitation and return route all.

    The transport decorator of the message is set to ``return_route="all"``,
    the message is packed for the invitation key of the connection (sender
    is the verkey of a freshly created local DID) and POSTed to the
    endpoint found in the connection's invitation. The body of a 200
    response is unpacked with the wallet and returned.

    Args:
        message (AgentMessage): Agent message. Its transport decorator is
            overwritten in place.
        connection_id (str): Connection id.

    Returns:
        typing.Optional[typing.Tuple[str, str, dict]]: sender_verkey,
            recipient_verkey, message_dict for a 200 response; None for
            any other response status.
    """
    # Fetch connection record.
    connection_record: ConnectionRecord = \
        await ConnectionRecord.retrieve_by_id(self.context, connection_id)

    # Get invitation key.
    invitation_key = connection_record.invitation_key
    # Service endpoint
    invitation = await connection_record.retrieve_invitation(self.context)
    service_endpoint = invitation.endpoint

    # Fetch wallet from context
    wallet: IndyWallet = await self.context.inject(BaseWallet)

    # Set transport return route all
    message._decorators["transport"] = TransportDecorator(
        return_route="all"
    )

    # Create a local did
    did: DIDInfo = await wallet.create_local_did()

    sender_key = did.verkey
    packed_message = await wallet.pack_message(
        message.to_json(),
        [invitation_key],
        sender_key
    )

    headers = {
        "Content-Type": "application/ssi-agent-wire"
    }

    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.post(service_endpoint, data=packed_message) as response:
            if response.status != 200:
                # Same result as before (None), but no longer silent and
                # no longer an implicit fall-through.
                self._logger.warning(
                    f"Failed to post message to {service_endpoint}; "
                    f"HTTP status {response.status}"
                )
                return None

            message_body = await response.read()

            # Unpack message
            unpacked = await wallet.unpack_message(message_body)
            (message_json, sender_verkey, recipient_verkey) = unpacked

            # Convert message to dict.
            message_dict = json.loads(message_json)

            return (sender_verkey, recipient_verkey, message_dict)
async def send_problem_report_message(self, explain: str, connection_id: str) ‑> None

Send problem report message as reply.

Args

explain : str
Explanation.
connection_id : str
Connection id.
Expand source code
async def send_problem_report_message(self, explain: str, connection_id: str) -> None:
    """Reply on a connection with a problem report.

    Nothing is sent when no responder can be injected from the context.

    Args:
        explain (str): Explanation placed in the problem report.
        connection_id (str): Connection id.
    """

    # The responder is optional; inject() is asked not to require it.
    reply_channel: DispatcherResponder = await self.context.inject(
        BaseResponder,
        required=False
    )

    report = ProblemReport(explain_ltxt=explain)

    if not reply_channel:
        return

    await reply_channel.send_reply(report, connection_id=connection_id)
async def send_qr_code_initiate_message(self, qr_id, connection_id)

Send data agreement qr code initiate message.

Args

qr_id
QR code identifier.
connection_id : str
Connection identifier.
Expand source code
async def send_qr_code_initiate_message(
    self,
    qr_id,
    connection_id
):
    """Send a data agreement QR code initiate message as a reply.

    Args:
        qr_id: QR code identifier placed in the message body.
        connection_id: Identifier of the connection to reply on.
    """

    initiate_body = DataAgreementQrCodeInitiateBody(qr_id=qr_id)
    initiate_message = DataAgreementQrCodeInitiateMessage(body=initiate_body)

    await self.send_reply_message(initiate_message, connection_id)
async def send_reply_message(self, message: aries_cloudagent.messaging.agent_message.AgentMessage, connection_id: str = None) ‑> None

Send reply message to remote agent.

Args

message : AgentMessage
Agent message.
connection_id : str
Connection identifier
Expand source code
async def send_reply_message(self, message: AgentMessage, connection_id: str = None) -> None:
    """Reply to the remote agent with the given message.

    Nothing is sent when no responder can be injected from the context.

    Args:
        message (AgentMessage): Agent message.
        connection_id (str, optional): Connection identifier. Defaults to None.
    """
    # The responder is optional; inject() is asked not to require it.
    reply_channel: DispatcherResponder = await self.context.inject(
        BaseResponder,
        required=False
    )

    if not reply_channel:
        return

    await reply_channel.send_reply(message, connection_id=connection_id)
async def update_and_store_da_template_in_wallet(self, template_id: str, data_agreement: dict, *, publish_flag: bool = True, schema_id: str = None) ‑> DataAgreementTemplateRecord

Update and store data agreement template in wallet.

Args

template_id : str
Template identifier
data_agreement : dict
Data agreement
publish_flag : bool
Publish flag
schema_id : str
Schema identifier

Returns

DataAgreementTemplateRecord
Updated record.
Expand source code
async def update_and_store_da_template_in_wallet(
        self,
        template_id: str,
        data_agreement: dict,
        *,
        publish_flag: bool = True,
        schema_id: str = None
) -> DataAgreementTemplateRecord:
    """Update and store data agreement template in wallet.

    The latest, non-deleted template record for ``template_id`` is loaded,
    the new agreement is checked against it, its version is bumped with
    bump_major_for_semver_string, personal data records are created for the
    new version and the record is persisted via ``record.upgrade``. When
    ``publish_flag`` is true, ledger payloads are created as well.

    Args:
        template_id (str): Template identifier
        data_agreement (dict): Data agreement. Updated in place with the
            ``@context``, ``@type``, ``@id`` and ``version`` keys.
        publish_flag (bool): Publish flag
        schema_id (str): Schema identifier

    Raises:
        AssertionError: If the method of use or the third party data
            sharing value differs from the stored template.

    Returns:
        DataAgreementTemplateRecord: Updated record.
    """

    # Tag filter
    tag_filter = {
        "delete_flag": bool_to_str(False),
        "template_id": template_id,
        "latest_version_flag": bool_to_str(True)
    }

    # Fetch data agreement record
    record: DataAgreementTemplateRecord = \
        await DataAgreementTemplateRecord.retrieve_by_tag_filter(self.context, tag_filter)

    # Validate the data agreement.
    previous_da: DataAgreementModel = DataAgreementModel.deserialize(record.data_agreement)

    # Validation errors are raised explicitly (not via `assert`) so the
    # checks are not stripped when the interpreter runs with -O.
    if previous_da.method_of_use != data_agreement.get("methodOfUse"):
        raise AssertionError("Method of use cannot be updated.")

    # A missing "dataPolicy" is treated as an empty policy so that it is
    # reported as a validation failure instead of an AttributeError on None.
    new_data_policy = data_agreement.get("dataPolicy") or {}
    if previous_da.data_policy.third_party_data_sharing \
            != new_data_policy.get("thirdPartyDataSharing"):
        raise AssertionError("Third party data sharing cannot be updated.")

    # Copy the id, version from previous da to new da
    template_version = bump_major_for_semver_string(previous_da.version)
    # From here on the identifier stored in the previous agreement is used,
    # not the argument.
    template_id = previous_da.id
    data_agreement.update({
        "@context": DA_DEFAULT_CONTEXT,
        "@type": DA_TYPE,
        "@id": template_id,
        "version": template_version,
    })

    updated_da: DataAgreementModel = DataAgreementModel.deserialize(data_agreement)

    # Create personal data records
    pd_records = []
    pd_models_with_id = []
    for pd in updated_da.personal_data:
        pd_record: PersonalDataRecord = \
            await PersonalDataRecord.build_and_save_record_from_pd_model(
                self.context,
                template_id,
                template_version,
                pd
            )
        pd_records.append(pd_record)
        pd_models_with_id.append(pd_record.convert_record_to_pd_model())

    # Update the personal data with attribute identifiers to the agreement
    updated_da.personal_data = pd_models_with_id

    record.data_agreement = updated_da.serialize()
    record.publish_flag = bool_to_str(publish_flag)
    record.schema_id = schema_id
    record.existing_schema_flag = bool_to_str(True) if schema_id else bool_to_str(False)
    record.template_version = template_version

    await record.upgrade(self.context)

    if publish_flag:
        # Create ledger payloads
        record = await self.create_and_store_ledger_payloads_for_da_template(
            template_record=record,
            pd_records=pd_records,
            schema_id=schema_id
        )

    return record
async def update_controller_details(self, organisation_name: str = None, cover_image_url: str = None, logo_image_url: str = None, location: str = None, organisation_type: str = None, description: str = None, policy_url: str = None, eula_url: str = None) ‑> ControllerDetailsRecord

Update controller details

Args

organisation_name : str, optional
Organisation name. Defaults to None.
cover_image_url : str, optional
Cover image URL. Defaults to None.
logo_image_url : str, optional
Logo image URL. Defaults to None.
location : str, optional
Location. Defaults to None.
organisation_type : str, optional
Organisation type. Defaults to None.
description : str, optional
Description. Defaults to None.
policy_url : str, optional
Policy URL. Defaults to None.
eula_url : str, optional
EULA URL. Defaults to None.

Returns

ControllerDetailsRecord
Controller details record.
Expand source code
async def update_controller_details(
    self,
    organisation_name: str = None,
    cover_image_url: str = None,
    logo_image_url: str = None,
    location: str = None,
    organisation_type: str = None,
    description: str = None,
    policy_url: str = None,
    eula_url: str = None
) -> ControllerDetailsRecord:
    """Create or update the controller details record.

    When a controller details record already exists, the first one returned
    by the query has every field below overwritten with the given value
    (including None for arguments that were not supplied). Otherwise a new
    record is created, with its organisation DID built from the wallet's
    public DID.

    Args:
        organisation_name (str, optional): Organisation name. Defaults to None.
        cover_image_url (str, optional): Cover image URL. Defaults to None.
        logo_image_url (str, optional): Logo image URL. Defaults to None.
        location (str, optional): Location. Defaults to None.
        organisation_type (str, optional): Organisation type. Defaults to None.
        description (str, optional): Description. Defaults to None.
        policy_url (str, optional): Policy URL. Defaults to None.
        eula_url (str, optional): EULA URL. Defaults to None.

    Returns:
        ControllerDetailsRecord: Controller details record.
    """

    # Field values shared by the create and the update path.
    details = {
        "organisation_name": organisation_name,
        "cover_image_url": cover_image_url,
        "logo_image_url": logo_image_url,
        "location": location,
        "organisation_type": organisation_type,
        "description": description,
        "policy_url": policy_url,
        "eula_url": eula_url,
    }

    existing_records = await ControllerDetailsRecord.query(self.context, {})

    if existing_records:
        # Overwrite the fields of the first record found.
        record: ControllerDetailsRecord = existing_records[0]
        for field_name, field_value in details.items():
            setattr(record, field_name, field_value)
    else:
        # Nothing stored yet: derive the organisation DID from the
        # wallet's public DID and build a new record.
        wallet: BaseWallet = await self.context.inject(BaseWallet)
        controller_did = await wallet.get_public_did()
        record = ControllerDetailsRecord(
            organisation_did=f"did:sov:{controller_did.did}",
            **details
        )

    await record.save(self.context)

    return record
async def update_personal_data_description(self, attribute_id: str, desc: str) ‑> PersonalDataRecord

Update personal data description

Args

attribute_id : str
Attribute id
desc : str
Description

Returns

PersonalDataRecord
Personal data record
Expand source code
async def update_personal_data_description(self,
                                           attribute_id: str,
                                           desc: str) -> PersonalDataRecord:
    """Update personal data description

    The personal data record is updated and saved, then the matching entry
    in the latest data agreement template is replaced so both stay in sync.

    Args:
        attribute_id (str): Attribute id
        desc (str): Description

    Raises:
        AssertionError: If no latest template is found for the personal
            data record, or its version differs from the version stored on
            the personal data record.

    Returns:
        PersonalDataRecord: Personal data record
    """

    # Fetch personal data record by id
    pd_record: PersonalDataRecord = await PersonalDataRecord.retrieve_by_id(
        self.context,
        attribute_id
    )

    # Fetch the associated data agreement record
    da_template_record: DataAgreementTemplateRecord = \
        await DataAgreementTemplateRecord.latest_template_by_id(
            self.context,
            pd_record.data_agreement_template_id
        )

    # Validation errors are raised explicitly (not via `assert`) so the
    # checks are not stripped when the interpreter runs with -O.
    if not da_template_record:
        raise AssertionError("Matching data agreement template not found.")
    if da_template_record.template_version != pd_record.data_agreement_template_version:
        raise AssertionError(
            "Matching data agreement template with same version not found."
        )

    # Update the personal data record.
    pd_record.attribute_description = desc
    await pd_record.save(self.context)

    pd_model: DataAgreementPersonalDataModel = pd_record.convert_record_to_pd_model()

    # Update the data agreement record with new personal data.
    da: DataAgreementModel = DataAgreementModel.deserialize(da_template_record.data_agreement)

    # Replace the entry with the matching attribute id where it stands, so
    # editing a description does not move the attribute to the end of the
    # list. Further entries with the same id are dropped; if none matches,
    # the updated entry is appended.
    da_pds = []
    replaced = False
    for da_pd in da.personal_data:
        if da_pd.attribute_id != pd_model.attribute_id:
            da_pds.append(da_pd)
        elif not replaced:
            da_pds.append(pd_model)
            replaced = True
    if not replaced:
        da_pds.append(pd_model)
    da.personal_data = da_pds

    da_template_record.data_agreement = da.serialize()
    await da_template_record.save(self.context)

    return pd_record
class V2ADAManagerError (*args, error_code: str = None, **kwargs)

ADA manager error

Initialize a BaseError instance.

Expand source code
class V2ADAManagerError(BaseError):
    """Error type of the ADA manager; adds nothing to BaseError."""

Ancestors

  • aries_cloudagent.core.error.BaseError
  • builtins.Exception
  • builtins.BaseException