Module dexa_sdk.managers.dexa_manager

Expand source code
import uuid
import json
import aiohttp
import typing
import asyncio
from loguru import logger
from web3._utils.encoding import to_json
from aries_cloudagent.core.dispatcher import Dispatcher
from aries_cloudagent.config.injection_context import InjectionContext
from aries_cloudagent.wallet.base import BaseWallet
from aries_cloudagent.wallet.indy import IndyWallet
from aries_cloudagent.utils.task_queue import CompletedTask, PendingTask
from aries_cloudagent.connections.models.connection_target import ConnectionTarget
from aries_cloudagent.connections.models.connection_record import ConnectionRecord
from aries_cloudagent.messaging.agent_message import AgentMessage
from aries_cloudagent.messaging.decorators.transport_decorator import TransportDecorator
from aries_cloudagent.transport.pack_format import BaseWireFormat, PackWireFormat
from aries_cloudagent.protocols.connections.v1_0.manager import (
    ConnectionManager
)
from aries_cloudagent.transport.inbound.receipt import MessageReceipt
from mydata_did.v1_0.utils.util import bool_to_str
from mydata_did.v1_0.messages.data_controller_details import DataControllerDetailsMessage
from mydata_did.v1_0.messages.data_controller_details_response import (
    DataControllerDetailsResponseMessage
)
from dexa_protocol.v1_0.messages.negotiation.request_dda import RequestDDAMessage
from dexa_protocol.v1_0.messages.negotiation.offer_dda import OfferDDAMessage
from dexa_protocol.v1_0.messages.negotiation.accept_dda import AcceptDDAMessage
from dexa_protocol.v1_0.models.offer_dda_model import (
    OfferDDAMessageBodyModel,
    CustomerIdentificationModel
)
from dexa_protocol.v1_0.models.accept_dda_model import (
    AcceptDDAMessageBodyModel
)
from dexa_protocol.v1_0.models.request_dda_model import RequestDDAModel
from dexa_protocol.v1_0.messages.marketplace.publish_dda import PublishDDAMessage
from dexa_protocol.v1_0.models.publish_dda_model import PublishDDAModel
from dexa_protocol.v1_0.messages.marketplace.delete_dda import DeleteDDAMessage
from dexa_protocol.v1_0.models.delete_dda_model import DeleteDDAModel
from dexa_protocol.v1_0.messages.marketplace.list_marketplace_dda import ListMarketplaceDDAMessage
from dexa_protocol.v1_0.messages.marketplace.list_marketplace_dda_response import (
    ListMarketplaceDDAResponseMessage
)
from dexa_protocol.v1_0.models.list_marketplace_dda_response_model import (
    ListMarketplaceDDAResponseBody,
    ListMarketplaceDDAResponseModel
)
from dexa_protocol.v1_0.messages.negotiation.dda_negotiation_receipt import (
    DDANegotiationReceiptMessage
)
from dexa_protocol.v1_0.models.dda_negotiation_receipt_model import (
    DDANegotiationReceiptBodyModel
)
from .ada_manager import V2ADAManager
from ..utils import (
    PaginationResult,
    paginate_records,
    drop_none_dict
)
from ..agreements.dda.v1_0.models.dda_instance_models import (
    DataDisclosureAgreementInstanceModel
)
from ..agreements.da.v1_0.records.da_template_record import (
    DataAgreementTemplateRecord
)
from ..agreements.dda.v1_0.records.dda_template_record import (
    DataDisclosureAgreementTemplateRecord
)
from ..agreements.dda.v1_0.records.dda_instance_record import (
    DataDisclosureAgreementInstanceRecord
)
from ..agreements.dda.v1_0.models.dda_models import (
    DDA_DEFAULT_CONTEXT,
    DDA_TYPE,
    DataDisclosureAgreementModel,
)
from ..marketplace.records.marketplace_connection_record import (
    MarketplaceConnectionRecord
)
from ..marketplace.records.publish_dda_record import (
    PublishDDARecord
)
from ..marketplace.records.published_dda_template_record import (
    PublishedDDATemplateRecord
)
from ..data_controller.records.connection_controller_details_record import (
    ConnectionControllerDetailsRecord
)
from ..agreements.da.v1_0.records.customer_identification_record import (
    CustomerIdentificationRecord
)
from ..ledgers.ethereum.core import EthereumClient
from ..did_mydata.core import DIDMyDataBuilder


class DexaManager:
    """Manages Dexa related functions"""

    def __init__(self, context: InjectionContext) -> None:
        """Initialise Dexa manager

        Args:
            context (InjectionContext): Injection context to be used.
        """

        # Injection context
        self._context = context

        # Logger
        self._logger = logger

    @property
    def context(self) -> InjectionContext:
        """Accessor for injection context

        Returns:
            InjectionContext: Injection context
        """
        return self._context

    @property
    def logger(self):
        """Accessor for logger."""
        return self._logger

    async def create_and_store_dda_template_in_wallet(
            self,
            dda: dict,
            *,
            publish_flag: bool = True
    ) -> DataDisclosureAgreementTemplateRecord:
        """Create and store dda template in wallet

        Args:
            dda (dict): DDA template.
            publish_flag (bool): Publish flag
            schema_id (str): Schema identifier
        """

        # Temp hack
        template_version = "1.0.0"
        template_id = str(uuid.uuid4())
        dda.update({"@context": DDA_DEFAULT_CONTEXT})
        dda.update({"@id": template_id})
        dda.update({"@type": DDA_TYPE})
        dda.update({"version": template_version})

        # Fetch wallet from context
        wallet: IndyWallet = await self.context.inject(BaseWallet)
        controller_did = await wallet.get_public_did()

        dda["dataController"].update({"did": f"did:sov:{controller_did.did}"})

        # Validate the data agreement.
        dda: DataDisclosureAgreementModel = \
            DataDisclosureAgreementModel.deserialize(
                dda)

        # Hack: Iterate through personal data records and add a unique identifier
        # Todo: Correlating personal data across agreements needs to be done.
        pds = dda.personal_data
        for pd in pds:
            pd.attribute_id = str(uuid.uuid4())

        # Update the personal data with attribute identifiers to the agreement
        dda.personal_data = pds

        # Create template record
        record = DataDisclosureAgreementTemplateRecord(
            template_id=template_id,
            template_version=template_version,
            state=DataDisclosureAgreementTemplateRecord.STATE_DEFINITION,
            data_disclosure_agreement=dda.serialize(),
            industry_sector=dda.data_sharing_restrictions.industry_sector.lower(),
            publish_flag=bool_to_str(publish_flag),
            latest_version_flag=bool_to_str(True)
        )

        await record.save(self.context)

        return record

    async def query_dda_templates_in_wallet(
        self,
        template_id: str = None,
        template_version: str = None,
        industry_sector: str = None,
        publish_flag: str = "false",
        delete_flag: str = "false",
        latest_version_flag: str = "false",
        page: int = 1,
        page_size: int = 10,
    ) -> PaginationResult:
        """Query DA templates in wallet.

        Args:
            template_id (str, optional): Template id. Defaults to None.
            template_version (str, optional): Template version. Defaults to None.
            industry_sector (str, optional): Industry sector. Defaults to None.
            publish_flag (str, optional): Publish flag. Defaults to "false".
            delete_flag (str, optional): Delete flag. Defaults to "false".
            latest_version_flag (str, optional): Latest version flag. Defaults to "false".
            page (int): Page number. Defaults to 1.
            page_size (int): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination result
        """

        # Query by version is only possible if the template id is provided
        if template_version:
            assert template_id, "Template identifier is required to query by version"

        # Tag filter
        tag_filter = {
            "template_id": template_id,
            "template_version": template_version,
            "industry_sector": industry_sector.lower() if industry_sector else industry_sector,
            "publish_flag": publish_flag,
            "delete_flag": delete_flag,
            "latest_version_flag": latest_version_flag
        }

        tag_filter = drop_none_dict(tag_filter)

        records = await DataDisclosureAgreementTemplateRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        records = sorted(records, key=lambda k: k.created_at, reverse=True)

        paginate_result = paginate_records(records, page, page_size)

        return paginate_result

    async def update_dda_template_in_wallet(
        self,
        template_id: str,
        *,
        dda: dict,
        publish_flag: bool = True,
    ) -> DataDisclosureAgreementTemplateRecord:
        """Update DDA template in wallet.

        Args:
            template_id (str): Template identifier
            publish_flag (bool, optional): Publish flag. Defaults to True.

        Returns:
            DataDisclosureAgreementTemplateRecord: Upgraded template record.
        """

        # Fetch the latest template.
        existing_template = \
            await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
                self.context,
                template_id
            )

        # Upgrade the existing template to next version.
        upgraded = await existing_template.upgrade(
            self.context,
            dda,
            bool_to_str(publish_flag)
        )

        # Post update actions
        if publish_flag:
            await self.post_update_dda_template(
                upgraded
            )

        return upgraded

    async def delete_dda_template_in_wallet(
        self,
        template_id: str
    ):
        """Delete DDA template in wallet.

        Args:
            template_id (str): Template identifier.
        """

        # Fetch the latest template.
        existing_template = \
            await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
                self.context,
                template_id
            )

        assert existing_template, "DDA template not found."

        # Delete template.
        await existing_template.delete_template(self.context)

        # Post delete actions.
        await self.post_delete_dda_template(
            template_id
        )

    async def publish_dda_template_wallet(
        self,
        template_id: str
    ):
        """Publish DDA template in wallet.

        Args:
            template_id (str): Template identifier
        """

        # Fetch the latest template.
        existing_template = \
            await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
                self.context,
                template_id
            )

        await existing_template.publish_template(self.context)

        # Post publish actions.
        await self.post_update_dda_template(existing_template)

    async def send_message_with_return_route_all(
        self,
        message: AgentMessage,
        connection_record: ConnectionRecord
    ) -> typing.Tuple[str, str, dict]:
        """Send message with return route all in transport decorator.

        Args:
            message (AgentMessage): Agent message.
            connection_record (ConnectionRecord): Connection record.

        Returns:
            typing.Tuple[str, str, dict]: sender_verkey, recipient_verkey, message_dict
        """

        # Fetch wallet from context
        wallet: IndyWallet = await self.context.inject(BaseWallet)

        # Get pack format from context
        pack_format: PackWireFormat = await self.context.inject(BaseWireFormat)

        # Add transport decorator
        message._decorators["transport"] = TransportDecorator(
            return_route="all"
        )

        # Initialise connection manager
        connection_manager = ConnectionManager(self.context)

        # Fetch connection targets
        connection_targets = await connection_manager.fetch_connection_targets(connection_record)

        assert len(connection_targets) > 0, "Zero connection targets found."

        connection_target: ConnectionTarget = connection_targets[0]

        # Pack message
        packed_message = await pack_format.pack(
            context=self.context,
            message_json=message.serialize(as_string=True),
            recipient_keys=connection_target.recipient_keys,
            routing_keys=None,
            sender_key=connection_target.sender_key,
        )

        # Headers
        headers = {
            "Content-Type": "application/ssi-agent-wire"
        }

        # Send request and receive response.
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.post(connection_target.endpoint, data=packed_message) as response:
                # Assert status code is 200
                assert response.status == 200, \
                    f"HTTP request failed with status code {response.status}"

                message_body = await response.read()

                # Unpack message
                unpacked = await wallet.unpack_message(message_body)
                (message_json, sender_verkey, recipient_verkey) = unpacked

                # Convert message to dict.
                message_dict = json.loads(message_json)

                return (sender_verkey, recipient_verkey, message_dict)

    async def add_marketplace_connection(
        self,
        connection_id: str
    ) -> MarketplaceConnectionRecord:
        """Set connection as marketplace.

        Args:
            connection_id (str): Connection identifier.

        Returns:
            MarketplaceConnectionRecord: Marketplace connection record.
        """

        # Connection record.
        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            connection_id
        )

        record = await MarketplaceConnectionRecord.set_connection_as_marketplace(
            self.context,
            connection_record.connection_id
        )

        return record

    async def query_marketplace_connections(
        self,
        connection_id: str,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """Query marketplace connections

        Args:
            connection_id (str): Connection identifier
            page (int, optional): Page. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination result
        """

        tag_filter = {"connection_id": connection_id}
        tag_filter = drop_none_dict(tag_filter)

        records = await MarketplaceConnectionRecord.query(self.context, tag_filter)

        pagination_result = paginate_records(records, page, page_size)

        return pagination_result

    async def post_update_dda_template(
        self,
        template_record: DataDisclosureAgreementTemplateRecord
    ):
        """Post update DDA template actions.

        Args:
            template_record (DataDisclosureAgreementTemplateRecord): DDA template record.
        """

        # Find all the marketplace connections.
        # Query to find all marketplaces the template is published to.
        tag_filter = {
            "template_id": template_record.template_id
        }
        records: typing.List[PublishDDARecord] = await PublishDDARecord.query(
            self.context,
            tag_filter
        )

        # Notify all the marketplaces about the update.
        for record in records:
            await self.send_publish_dda_message(
                record,
                template_record
            )

    async def send_publish_dda_message(
        self,
        publish_dda_record: PublishDDARecord,
        template_record: DataDisclosureAgreementTemplateRecord
    ):
        """Send publish DDA message.

        Args:
            publish_dda_record (PublishDDARecord): Publish dda record.
            template_record (DataDisclosureAgreementTemplateRecord): DDA template record.
            connection_id (str): Connection identifier.
        """
        # Create connection invitation
        mgr = V2ADAManager(self.context)
        (connection_record_for_marketplace, connection_invitation_for_marketplace) = \
            await mgr.create_invitation(
            auto_accept=True,
            public=False,
            multi_use=True,
            alias=f"DDA_{template_record.template_id}_QR_{publish_dda_record._id}"
        )

        # Publish dda message
        publish_dda_message = PublishDDAMessage(
            body=PublishDDAModel(
                dda=template_record.dda_model,
                connection_url=connection_invitation_for_marketplace.to_url()
            )
        )

        # Send publish dda message to marketplace connection
        await mgr.send_reply_message(
            publish_dda_message,
            publish_dda_record.connection_id
        )

    async def publish_dda_template_to_marketplace(
        self,
        connection_id: str,
        template_id: str
    ) -> PublishDDARecord:
        """Publish DDA template to marketplace

        Args:
            connection_id (str): Connection ID
            template_id (str): Template ID

        Returns:
            PublishDDARecord: Publish DDA record.
        """

        # Fetch template
        template_record = await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
            self.context,
            template_id
        )

        assert template_record._publish_flag, \
            "DDA must be published locally before published to marketplace."

        # Connection record
        connection_record: ConnectionRecord = \
            await MarketplaceConnectionRecord.retrieve_connection_record(
                self.context,
                connection_id
            )

        # Create Publish DDA record.
        # Publish DDA record is mapping of which template is published in which marketplace.
        publish_dda_record = await PublishDDARecord.store_publish_dda_record(
            self.context,
            connection_record.connection_id,
            template_record.template_id,
            template_record.data_disclosure_agreement
        )

        # Send publish dda message to marketplace.
        await self.send_publish_dda_message(
            publish_dda_record,
            template_record
        )

        return publish_dda_record

    async def fetch_and_save_controller_details_for_connection(
        self,
        connection_record: ConnectionRecord
    ):
        """Fetch and save controller details for connection

        Args:
            connection_record (ConnectionRecord): Connection record
        """
        controller_details_message = DataControllerDetailsMessage()
        (sender_verkey, recipient_verkey, message_dict) = \
            await self.send_message_with_return_route_all(
            controller_details_message,
            connection_record
        )
        # Data controller detail response.
        data_controller_details_response: DataControllerDetailsResponseMessage = \
            DataControllerDetailsResponseMessage.deserialize(
                message_dict
            )

        # Save controller details for a connection.
        await ConnectionControllerDetailsRecord.set_controller_details_for_connection(
            self.context,
            connection_record,
            data_controller_details_response.body
        )

    async def post_connection_delete_actions(
        self,
        connection_id: str
    ):
        """Post connection record delete actions.

        Args:
            connection_id (str): Connection identifier.
        """

        self._logger.info("Performing post delete actions for connection records...")

        # Delete marketplace connection records.
        tag_filter = {"connection_id": connection_id}
        marketplace_records = await MarketplaceConnectionRecord.query(
            self.context,
            tag_filter
        )
        if marketplace_records:
            marketplace_record = marketplace_records[0]
            await marketplace_record.delete_record(self.context)

        # Delete controller connection records.
        controller_records = await ConnectionControllerDetailsRecord.query(
            self.context,
            tag_filter
        )
        if controller_records:
            controller_record = controller_records[0]
            await controller_record.delete_record(self.context)

    async def handle_connections_webhook(
        self,
        body: dict
    ):
        """Handle connections webhook.

        Args:
            body (dict): Connection record.
        """

        # Fetch connection record.
        connection_record: ConnectionRecord = ConnectionRecord.deserialize(body)

        if connection_record.state == ConnectionRecord.STATE_ACTIVE:
            # Save controller details for connection.
            await self.fetch_and_save_controller_details_for_connection(
                connection_record
            )
        if connection_record.state == "delete":
            # Perform cleanup.
            await self.post_connection_delete_actions(connection_record.connection_id)

    async def process_publish_dda_request_message(
        self,
        message: PublishDDAMessage,
        message_receipt: MessageReceipt
    ):
        """Process publish dda request message.

        Args:
            message (PublishDDAMessage): Publish dda request message
            message_receipt (MessageReceipt): Message receipt
        """

        # Connection record.
        connection_record: ConnectionRecord = self.context.connection_record

        # Save a publish DDA record if not existing.
        await PublishedDDATemplateRecord.store_publish_dda_record(
            self.context,
            connection_record.connection_id,
            message.body.dda,
            message.body.connection_url
        )

    async def query_publish_dda_template_records(
        self,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """Query publish DDA template record.

        Returns:
            PaginationResult: Pagination result.
        """

        # Fetch all the published DDA records.
        records = await PublishedDDATemplateRecord.query(self.context, {})

        # Paginate the records.
        pagination_result = paginate_records(records, page=page, page_size=page_size)

        # Return the result.
        return pagination_result

    async def process_delete_dda_message(
        self,
        message: DeleteDDAMessage,
        message_receipt: MessageReceipt
    ):
        """Process delete DDA message.

        Args:
            message (DeleteDDAMessage): Delete DDA message
            message_receipt (MessageReceipt): Message receipt
        """

        # Connection record.
        connection_record: ConnectionRecord = self.context.connection_record

        # Template id.
        template_id = message.body.template_id

        # Delete published DDA template record.
        await PublishedDDATemplateRecord.delete_publish_dda_record(
            self.context,
            connection_record.connection_id,
            template_id
        )

    async def post_delete_dda_template(
        self,
        template_id: str
    ):
        """Post delete dda template record actions.

        Inform the data marketplaces the template is deleted.

        Args:
            template_id (str): Template identifier.
        """

        # Construct delete DDA message.
        message = DeleteDDAMessage(
            body=DeleteDDAModel(
                template_id=template_id
            )
        )

        # Query to find all marketplaces the template is published to.
        tag_filter = {
            "template_id": template_id
        }
        records: typing.List[PublishDDARecord] = await PublishDDARecord.query(
            self.context,
            tag_filter
        )

        mgr = V2ADAManager(self.context)

        # Notify all the marketplaces the template is deleted.
        for record in records:
            await mgr.send_reply_message(
                message,
                record.connection_id
            )

            # Delete publish DDA records.
            await record.delete_record(self.context)

    async def list_dda_published_in_marketplace(
        self,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """List DDAs published in a marketplace.

        Returns:
            PaginationResult: Pagination result.
        """

        # Fetch all publish dda records.
        tag_filter = {}
        records: typing.List[PublishDDARecord] = await PublishDDARecord.query(
            self.context,
            tag_filter
        )

        pagination_result = paginate_records(records, page=page, page_size=page_size)

        return pagination_result

    async def send_list_marketplace_dda_message(
        self,
        connection_id: str
    ) -> PaginationResult:
        """Send list marketplace DDA message.

        Args:
            connection_id (str): Marketplace connection identifier.
        """

        # Retrieve connection record for marketplace connection.
        connection_record = \
            await MarketplaceConnectionRecord.retrieve_connection_record(
                self.context,
                connection_id
            )

        # Construct the list dda message.
        message = ListMarketplaceDDAMessage()

        (sender_verkey, recipient_verkey, message_dict) = \
            await self.send_message_with_return_route_all(
            message,
            connection_record
        )

        # Deserialise the message dict into response message.
        response: ListMarketplaceDDAResponseMessage = \
            ListMarketplaceDDAResponseMessage.deserialize(
                message_dict
            )

        results = response.body.results

        # Pagination result.
        pagination_result = paginate_records(
            results,
            1,
            100000
        )

        return pagination_result

    async def process_list_marketplace_dda_message(
        self,
        message: ListMarketplaceDDAMessage,
        receipt: MessageReceipt
    ):
        """Process list marketplace DDA message.

        Args:
            message (ListMarketplaceDDAMessage): List marketplace DDA message.
            receipt (MessageReceipt): Message receipt.
        """

        # Query published DDAs
        tag_filter = {}
        records: typing.List[PublishedDDATemplateRecord] = \
            await PublishedDDATemplateRecord.query(
            self.context,
            tag_filter
        )

        # Iterate through the records and create DDA results.
        results = []
        for record in records:
            results.append(
                ListMarketplaceDDAResponseModel(
                    dda=record.dda,
                    template_id=record.template_id,
                    industry_sector=record.industry_sector,
                    connection_url=record.connection_url,
                    created_at=record.created_at,
                    updated_at=record.updated_at
                )
            )

        # Construct response message.
        response_message = ListMarketplaceDDAResponseMessage(
            body=ListMarketplaceDDAResponseBody(
                results=results
            )
        )

        # Initialise ADA manager
        mgr = V2ADAManager(self.context)

        # Send response message.
        await mgr.send_reply_message(
            response_message
        )

    async def get_message_class_from_dict(
            self,
            message_dict: dict
    ) -> AgentMessage:
        """Get message class from message dict.

        Args:
            message_dict (dict): Message dict.

        Returns:
            AgentMessage: Agent message.
        """

        # Initialise dispatcher
        dispatcher = Dispatcher(self.context)

        # Get message class.
        msg_class = await dispatcher.make_message(message_dict)

        return msg_class

    async def process_request_dda_message(
        self,
        message: RequestDDAMessage,
        receipt: MessageReceipt
    ):
        """Process request DDA message.

        Args:
            message (RequestDDAMessage): Request DDA message.
            receipt (MessageReceipt): Message receipt.
        """

        # Connection record.
        connection_record: ConnectionRecord = self.context.connection_record

        # Fetch the template record.
        template_id = message.body.template_id

        # Build instance record.
        (dda_instance_record, dda_instance_model) = \
            await DataDisclosureAgreementInstanceRecord.build_instance_from_template(
                self.context,
                template_id,
                connection_record)

        # Fetch customer identification data agreement if available.
        customer_identification_records = await CustomerIdentificationRecord.query(self.context, {})

        if customer_identification_records:
            customer_identification_record: CustomerIdentificationRecord = \
                customer_identification_records[0]

            # Fetch DA template.
            da_template_record: DataAgreementTemplateRecord = \
                await customer_identification_record.data_agreement_template_record(
                    self.context
                )

            # Build dda offer message
            offer_dda_message = OfferDDAMessage(
                body=OfferDDAMessageBodyModel(
                    dda=dda_instance_model,
                    customer_identification=CustomerIdentificationModel(
                        schema_id=da_template_record.schema_id,
                        cred_def_id=da_template_record.cred_def_id
                    )
                )
            )
        else:
            # Build dda offer message
            offer_dda_message = OfferDDAMessage(
                body=OfferDDAMessageBodyModel(
                    dda=dda_instance_model
                )
            )

        mgr = V2ADAManager(self.context)

        await mgr.send_reply_message(
            offer_dda_message,
            connection_record.connection_id
        )

    async def request_dda_offer_from_ds(
        self,
        connection_id: str,
        template_id: str
    ):
        """DUS requests DDA offer from DS.

        Args:
            connection_id (str): Connection ID.
            template_id (str): Template ID.
        """

        # Retreive connection record.
        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            connection_id
        )

        # Send DDA request message to DS.
        message = RequestDDAMessage(
            body=RequestDDAModel(
                template_id=template_id
            )
        )

        # Initialise ADA manager.
        mgr = V2ADAManager(self.context)

        await mgr.send_reply_message(
            message,
            connection_record.connection_id
        )

    async def fetch_customer_identification_data_agreement(
        self
    ) -> typing.Union[None, CustomerIdentificationRecord]:
        """Fetch customer identification data agreement.

        Returns:
            typing.Union[None, CustomerIdentificationRecord]: Customer identification record.
        """
        records = await CustomerIdentificationRecord.query(self.context, {})

        return {} if not records else records[0]

    async def configure_customer_identification_data_agreement(
        self,
        da_template_id: str
    ) -> CustomerIdentificationRecord:
        """Configure customer identification data agreement.

        Args:
            da_template_id (str): DA template ID.

        Returns:
            CustomerIdentificationRecord: _description_
        """

        return await CustomerIdentificationRecord.create_or_update_record(
            self.context,
            da_template_id
        )

    async def process_offer_dda_message(
        self,
        message: OfferDDAMessage,
        message_receipt: MessageReceipt
    ):
        """Process offer dda message.

        Args:
            message (OfferDDAMessage): Offer DDA message.
            message_receipt (MessageReceipt): Message receipt.
        """

        (record, instance_model) = \
            await DataDisclosureAgreementInstanceRecord.build_instance_from_dda_offer(
            self.context,
            message,
            self.context.connection_record
        )

        # Construct accept DDA message.
        accept_dda_message = AcceptDDAMessage(
            body=AcceptDDAMessageBodyModel(
                dda=instance_model
            )
        )

        # Initialise the ADA manager
        mgr = V2ADAManager(self.context)

        # Send the message.
        await mgr.send_reply_message(accept_dda_message)

    async def process_accept_dda_message(
        self,
        message: AcceptDDAMessage,
        message_receipt: MessageReceipt
    ):
        """Process accept dda message.

        Args:
            message (AcceptDDAMessage): Accept DDA message.
            message_receipt (MessageReceipt): Message receipt.
        """

        instance_record = \
            await DataDisclosureAgreementInstanceRecord.update_instance_from_dda_accept(
                self.context,
                message
            )

        # Anchor DDA instance to blochain.
        await self.anchor_dda_instance_to_blockchain_async_task(instance_record.instance_id)

    async def add_task(self,
                       context: InjectionContext,
                       coro: typing.Coroutine,
                       task_complete: typing.Callable = None,
                       ident: str = None) -> PendingTask:
        """
        Add a new task to the queue, delaying execution if busy.

        Args:
            context: Injection context to be used.
            coro: The coroutine to run
            task_complete: A callback to run on completion
            ident: A string identifier for the task

        Returns: a future resolving to the asyncio task instance once queued
        """
        loop = asyncio.get_event_loop()
        pack_format: PackWireFormat = await context.inject(BaseWireFormat, required=False)
        return pack_format.task_queue.put(coro, lambda x: loop.create_task(task_complete(x)), ident)

    async def anchor_dda_instance_to_blockchain_async_task(
        self,
        instance_id: str
    ):
        """Async task to anchor DDA instance to blockchain.

        Args:
            instance_id (str): Instance id
        """
        pending_task = await self.add_task(
            self.context,
            self.anchor_dda_instance_to_blockchain(instance_id),
            self.anchor_dda_instance_to_blockchain_async_task_callback
        )
        self._logger.info(pending_task)

    async def anchor_dda_instance_to_blockchain(
        self,
        instance_id: str
    ) -> None:
        """Anchor DDA instance to blockchain.

        Args:
            instance_id (str): Instance id
        """

        eth_client: EthereumClient = await self.context.inject(EthereumClient)

        tag_filter = {
            "instance_id": instance_id
        }

        # Fetch DDA instance record.
        dda_instance_records = await DataDisclosureAgreementInstanceRecord.query(
            self.context,
            tag_filter,
        )

        assert dda_instance_records, "Data agreement instance not found."

        dda_instance_record: DataDisclosureAgreementInstanceRecord = dda_instance_records[0]
        dda_model: DataDisclosureAgreementInstanceModel = \
            DataDisclosureAgreementInstanceModel.deserialize(
                dda_instance_record.data_disclosure_agreement)

        did_mydata_builder = DIDMyDataBuilder(
            artefact=dda_model
        )

        (tx_hash, tx_receipt) = await eth_client.emit_dda_did(
            did_mydata_builder.generate_did(
                "DataDisclosureAgreement"
            )
        )

        return (dda_instance_record.instance_id, did_mydata_builder.mydata_did, tx_hash, tx_receipt)

    async def anchor_dda_instance_to_blockchain_async_task_callback(
        self, *args, **kwargs
    ):
        """Anchor DDA instance to blockchain async task callback function
        """

        # Obtain the completed task.
        completed_task: CompletedTask = args[0]

        # Obtain the results from the task.
        (instance_id, mydata_did, tx_hash, tx_receipt) = completed_task.task.result()

        tag_filter = {
            "instance_id": instance_id
        }

        # Fetch data agreement instance record.
        dda_instance_records = await DataDisclosureAgreementInstanceRecord.query(
            self.context,
            tag_filter,
        )

        assert dda_instance_records, "Data agreement instance not found."

        dda_instance_record: DataDisclosureAgreementInstanceRecord = dda_instance_records[0]

        transaction_receipt = json.loads(to_json(tx_receipt))
        transaction_hash = transaction_receipt.get("transactionHash")

        # Update the data agreement with blockchain metadata.
        dda_instance_record.blink = f"blink:ethereum:rinkeby:{transaction_hash}"
        dda_instance_record.mydata_did = mydata_did
        dda_instance_record.blockchain_receipt = transaction_receipt

        await dda_instance_record.save(self.context)

        # Send negotiation receipt to DUS.
        # Construct negotiation receipt message.
        message = DDANegotiationReceiptMessage(
            body=DDANegotiationReceiptBodyModel(
                instance_id=dda_instance_record.instance_id,
                blockchain_receipt=transaction_receipt,
                blink=f"blink:ethereum:rinkeby:{transaction_hash}",
                mydata_did=mydata_did
            )
        )

        # Fetch connection record.
        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            dda_instance_record.connection_id
        )

        # Initialise ADA manager
        mgr = V2ADAManager(self.context)

        # Send message
        await mgr.send_reply_message(
            message,
            connection_record.connection_id
        )

    async def query_dda_instances(
        self,
        instance_id: str,
        template_id: str,
        template_version: str,
        connection_id: str,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """Query DDA instances

        Args:
            instance_id (str): Instance identifier
            template_id (str): Template identifier
            template_version (str): Template version
            connection_id (str): Connection id
            page (int, optional): Page. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination result
        """
        # Query by version is only possible if the template id is provided
        if template_version:
            assert template_id, "Template identifier is required to query by version"

        # Tag filter
        tag_filter = {
            "instance_id": instance_id,
            "template_id": template_id,
            "template_version": template_version,
            "connection_id": connection_id
        }

        tag_filter = drop_none_dict(tag_filter)

        records = await DataDisclosureAgreementInstanceRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        records = sorted(records, key=lambda k: k.created_at, reverse=True)

        paginate_result = paginate_records(records, page, page_size)

        return paginate_result

    async def process_dda_negotiation_receipt_message(
        self,
        message: DDANegotiationReceiptMessage,
        message_receipt: MessageReceipt
    ):
        """Process DDA negotiation receipt message.

        Args:
            message (DDANegotiationReceiptMessage): DDA negotiation receipt message.
            message_receipt (MessageReceipt): Message receipt.
        """

        instance_id = message.body.instance_id
        blockchain_receipt = message.body.blockchain_receipt
        blink = message.body.blink
        mydata_did = message.body.mydata_did

        # Fetch the DDA instance record.
        tag_filter = {
            "instance_id": instance_id
        }
        instance_record: DataDisclosureAgreementInstanceRecord = \
            await DataDisclosureAgreementInstanceRecord.retrieve_by_tag_filter(
                self.context,
                tag_filter
            )

        # Update instance record.
        instance_record.blockchain_receipt = blockchain_receipt
        instance_record.blink = blink
        instance_record.mydata_did = mydata_did

        await instance_record.save(self.context)

Classes

class DexaManager (context: aries_cloudagent.config.injection_context.InjectionContext)

Manages Dexa related functions

Initialise Dexa manager

Args

context : InjectionContext
Injection context to be used.
Expand source code
class DexaManager:
    """Manages Dexa related functions"""

    def __init__(self, context: InjectionContext) -> None:
        """Initialise Dexa manager

        Args:
            context (InjectionContext): Injection context to be used.
        """

        # Injection context
        self._context = context

        # Logger
        self._logger = logger

    @property
    def context(self) -> InjectionContext:
        """Accessor for injection context

        Returns:
            InjectionContext: Injection context
        """
        return self._context

    @property
    def logger(self):
        """Accessor for logger."""
        return self._logger

    async def create_and_store_dda_template_in_wallet(
            self,
            dda: dict,
            *,
            publish_flag: bool = True
    ) -> DataDisclosureAgreementTemplateRecord:
        """Create and store dda template in wallet

        Args:
            dda (dict): DDA template.
            publish_flag (bool): Publish flag
            schema_id (str): Schema identifier
        """

        # Temp hack
        template_version = "1.0.0"
        template_id = str(uuid.uuid4())
        dda.update({"@context": DDA_DEFAULT_CONTEXT})
        dda.update({"@id": template_id})
        dda.update({"@type": DDA_TYPE})
        dda.update({"version": template_version})

        # Fetch wallet from context
        wallet: IndyWallet = await self.context.inject(BaseWallet)
        controller_did = await wallet.get_public_did()

        dda["dataController"].update({"did": f"did:sov:{controller_did.did}"})

        # Validate the data agreement.
        dda: DataDisclosureAgreementModel = \
            DataDisclosureAgreementModel.deserialize(
                dda)

        # Hack: Iterate through personal data records and add a unique identifier
        # Todo: Correlating personal data across agreements needs to be done.
        pds = dda.personal_data
        for pd in pds:
            pd.attribute_id = str(uuid.uuid4())

        # Update the personal data with attribute identifiers to the agreement
        dda.personal_data = pds

        # Create template record
        record = DataDisclosureAgreementTemplateRecord(
            template_id=template_id,
            template_version=template_version,
            state=DataDisclosureAgreementTemplateRecord.STATE_DEFINITION,
            data_disclosure_agreement=dda.serialize(),
            industry_sector=dda.data_sharing_restrictions.industry_sector.lower(),
            publish_flag=bool_to_str(publish_flag),
            latest_version_flag=bool_to_str(True)
        )

        await record.save(self.context)

        return record

    async def query_dda_templates_in_wallet(
        self,
        template_id: str = None,
        template_version: str = None,
        industry_sector: str = None,
        publish_flag: str = "false",
        delete_flag: str = "false",
        latest_version_flag: str = "false",
        page: int = 1,
        page_size: int = 10,
    ) -> PaginationResult:
        """Query DA templates in wallet.

        Args:
            template_id (str, optional): Template id. Defaults to None.
            template_version (str, optional): Template version. Defaults to None.
            industry_sector (str, optional): Industry sector. Defaults to None.
            publish_flag (str, optional): Publish flag. Defaults to "false".
            delete_flag (str, optional): Delete flag. Defaults to "false".
            latest_version_flag (str, optional): Latest version flag. Defaults to "false".
            page (int): Page number. Defaults to 1.
            page_size (int): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination result
        """

        # Query by version is only possible if the template id is provided
        if template_version:
            assert template_id, "Template identifier is required to query by version"

        # Tag filter
        tag_filter = {
            "template_id": template_id,
            "template_version": template_version,
            "industry_sector": industry_sector.lower() if industry_sector else industry_sector,
            "publish_flag": publish_flag,
            "delete_flag": delete_flag,
            "latest_version_flag": latest_version_flag
        }

        tag_filter = drop_none_dict(tag_filter)

        records = await DataDisclosureAgreementTemplateRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        records = sorted(records, key=lambda k: k.created_at, reverse=True)

        paginate_result = paginate_records(records, page, page_size)

        return paginate_result

    async def update_dda_template_in_wallet(
        self,
        template_id: str,
        *,
        dda: dict,
        publish_flag: bool = True,
    ) -> DataDisclosureAgreementTemplateRecord:
        """Update DDA template in wallet.

        Args:
            template_id (str): Template identifier
            publish_flag (bool, optional): Publish flag. Defaults to True.

        Returns:
            DataDisclosureAgreementTemplateRecord: Upgraded template record.
        """

        # Fetch the latest template.
        existing_template = \
            await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
                self.context,
                template_id
            )

        # Upgrade the existing template to next version.
        upgraded = await existing_template.upgrade(
            self.context,
            dda,
            bool_to_str(publish_flag)
        )

        # Post update actions
        if publish_flag:
            await self.post_update_dda_template(
                upgraded
            )

        return upgraded

    async def delete_dda_template_in_wallet(
        self,
        template_id: str
    ):
        """Delete DDA template in wallet.

        Args:
            template_id (str): Template identifier.
        """

        # Fetch the latest template.
        existing_template = \
            await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
                self.context,
                template_id
            )

        assert existing_template, "DDA template not found."

        # Delete template.
        await existing_template.delete_template(self.context)

        # Post delete actions.
        await self.post_delete_dda_template(
            template_id
        )

    async def publish_dda_template_wallet(
        self,
        template_id: str
    ):
        """Publish DDA template in wallet.

        Args:
            template_id (str): Template identifier
        """

        # Fetch the latest template.
        existing_template = \
            await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
                self.context,
                template_id
            )

        await existing_template.publish_template(self.context)

        # Post publish actions.
        await self.post_update_dda_template(existing_template)

    async def send_message_with_return_route_all(
        self,
        message: AgentMessage,
        connection_record: ConnectionRecord
    ) -> typing.Tuple[str, str, dict]:
        """Send message with return route all in transport decorator.

        Args:
            message (AgentMessage): Agent message.
            connection_record (ConnectionRecord): Connection record.

        Returns:
            typing.Tuple[str, str, dict]: sender_verkey, recipient_verkey, message_dict
        """

        # Fetch wallet from context
        wallet: IndyWallet = await self.context.inject(BaseWallet)

        # Get pack format from context
        pack_format: PackWireFormat = await self.context.inject(BaseWireFormat)

        # Add transport decorator
        message._decorators["transport"] = TransportDecorator(
            return_route="all"
        )

        # Initialise connection manager
        connection_manager = ConnectionManager(self.context)

        # Fetch connection targets
        connection_targets = await connection_manager.fetch_connection_targets(connection_record)

        assert len(connection_targets) > 0, "Zero connection targets found."

        connection_target: ConnectionTarget = connection_targets[0]

        # Pack message
        packed_message = await pack_format.pack(
            context=self.context,
            message_json=message.serialize(as_string=True),
            recipient_keys=connection_target.recipient_keys,
            routing_keys=None,
            sender_key=connection_target.sender_key,
        )

        # Headers
        headers = {
            "Content-Type": "application/ssi-agent-wire"
        }

        # Send request and receive response.
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.post(connection_target.endpoint, data=packed_message) as response:
                # Assert status code is 200
                assert response.status == 200, \
                    f"HTTP request failed with status code {response.status}"

                message_body = await response.read()

                # Unpack message
                unpacked = await wallet.unpack_message(message_body)
                (message_json, sender_verkey, recipient_verkey) = unpacked

                # Convert message to dict.
                message_dict = json.loads(message_json)

                return (sender_verkey, recipient_verkey, message_dict)

    async def add_marketplace_connection(
        self,
        connection_id: str
    ) -> MarketplaceConnectionRecord:
        """Set connection as marketplace.

        Args:
            connection_id (str): Connection identifier.

        Returns:
            MarketplaceConnectionRecord: Marketplace connection record.
        """

        # Connection record.
        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            connection_id
        )

        record = await MarketplaceConnectionRecord.set_connection_as_marketplace(
            self.context,
            connection_record.connection_id
        )

        return record

    async def query_marketplace_connections(
        self,
        connection_id: str,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """Query marketplace connections

        Args:
            connection_id (str): Connection identifier
            page (int, optional): Page. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination result
        """

        tag_filter = {"connection_id": connection_id}
        tag_filter = drop_none_dict(tag_filter)

        records = await MarketplaceConnectionRecord.query(self.context, tag_filter)

        pagination_result = paginate_records(records, page, page_size)

        return pagination_result

    async def post_update_dda_template(
        self,
        template_record: DataDisclosureAgreementTemplateRecord
    ):
        """Post update DDA template actions.

        Args:
            template_record (DataDisclosureAgreementTemplateRecord): DDA template record.
        """

        # Find all the marketplace connections.
        # Query to find all marketplaces the template is published to.
        tag_filter = {
            "template_id": template_record.template_id
        }
        records: typing.List[PublishDDARecord] = await PublishDDARecord.query(
            self.context,
            tag_filter
        )

        # Notify all the marketplaces about the update.
        for record in records:
            await self.send_publish_dda_message(
                record,
                template_record
            )

    async def send_publish_dda_message(
        self,
        publish_dda_record: PublishDDARecord,
        template_record: DataDisclosureAgreementTemplateRecord
    ):
        """Send publish DDA message.

        Args:
            publish_dda_record (PublishDDARecord): Publish dda record.
            template_record (DataDisclosureAgreementTemplateRecord): DDA template record.
            connection_id (str): Connection identifier.
        """
        # Create connection invitation
        mgr = V2ADAManager(self.context)
        (connection_record_for_marketplace, connection_invitation_for_marketplace) = \
            await mgr.create_invitation(
            auto_accept=True,
            public=False,
            multi_use=True,
            alias=f"DDA_{template_record.template_id}_QR_{publish_dda_record._id}"
        )

        # Publish dda message
        publish_dda_message = PublishDDAMessage(
            body=PublishDDAModel(
                dda=template_record.dda_model,
                connection_url=connection_invitation_for_marketplace.to_url()
            )
        )

        # Send publish dda message to marketplace connection
        await mgr.send_reply_message(
            publish_dda_message,
            publish_dda_record.connection_id
        )

    async def publish_dda_template_to_marketplace(
        self,
        connection_id: str,
        template_id: str
    ) -> PublishDDARecord:
        """Publish DDA template to marketplace

        Args:
            connection_id (str): Connection ID
            template_id (str): Template ID

        Returns:
            PublishDDARecord: Publish DDA record.
        """

        # Fetch template
        template_record = await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
            self.context,
            template_id
        )

        assert template_record._publish_flag, \
            "DDA must be published locally before published to marketplace."

        # Connection record
        connection_record: ConnectionRecord = \
            await MarketplaceConnectionRecord.retrieve_connection_record(
                self.context,
                connection_id
            )

        # Create Publish DDA record.
        # Publish DDA record is mapping of which template is published in which marketplace.
        publish_dda_record = await PublishDDARecord.store_publish_dda_record(
            self.context,
            connection_record.connection_id,
            template_record.template_id,
            template_record.data_disclosure_agreement
        )

        # Send publish dda message to marketplace.
        await self.send_publish_dda_message(
            publish_dda_record,
            template_record
        )

        return publish_dda_record

    async def fetch_and_save_controller_details_for_connection(
        self,
        connection_record: ConnectionRecord
    ):
        """Fetch and save controller details for connection

        Args:
            connection_record (ConnectionRecord): Connection record
        """
        controller_details_message = DataControllerDetailsMessage()
        (sender_verkey, recipient_verkey, message_dict) = \
            await self.send_message_with_return_route_all(
            controller_details_message,
            connection_record
        )
        # Data controller detail response.
        data_controller_details_response: DataControllerDetailsResponseMessage = \
            DataControllerDetailsResponseMessage.deserialize(
                message_dict
            )

        # Save controller details for a connection.
        await ConnectionControllerDetailsRecord.set_controller_details_for_connection(
            self.context,
            connection_record,
            data_controller_details_response.body
        )

    async def post_connection_delete_actions(
        self,
        connection_id: str
    ):
        """Post connection record delete actions.

        Args:
            connection_id (str): Connection identifier.
        """

        self._logger.info("Performing post delete actions for connection records...")

        # Delete marketplace connection records.
        tag_filter = {"connection_id": connection_id}
        marketplace_records = await MarketplaceConnectionRecord.query(
            self.context,
            tag_filter
        )
        if marketplace_records:
            marketplace_record = marketplace_records[0]
            await marketplace_record.delete_record(self.context)

        # Delete controller connection records.
        controller_records = await ConnectionControllerDetailsRecord.query(
            self.context,
            tag_filter
        )
        if controller_records:
            controller_record = controller_records[0]
            await controller_record.delete_record(self.context)

    async def handle_connections_webhook(
        self,
        body: dict
    ):
        """Handle connections webhook.

        Args:
            body (dict): Connection record.
        """

        # Fetch connection record.
        connection_record: ConnectionRecord = ConnectionRecord.deserialize(body)

        if connection_record.state == ConnectionRecord.STATE_ACTIVE:
            # Save controller details for connection.
            await self.fetch_and_save_controller_details_for_connection(
                connection_record
            )
        if connection_record.state == "delete":
            # Perform cleanup.
            await self.post_connection_delete_actions(connection_record.connection_id)

    async def process_publish_dda_request_message(
        self,
        message: PublishDDAMessage,
        message_receipt: MessageReceipt
    ):
        """Process publish dda request message.

        Args:
            message (PublishDDAMessage): Publish dda request message
            message_receipt (MessageReceipt): Message receipt
        """

        # Connection record.
        connection_record: ConnectionRecord = self.context.connection_record

        # Save a publish DDA record if not existing.
        await PublishedDDATemplateRecord.store_publish_dda_record(
            self.context,
            connection_record.connection_id,
            message.body.dda,
            message.body.connection_url
        )

    async def query_publish_dda_template_records(
        self,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """Query publish DDA template record.

        Returns:
            PaginationResult: Pagination result.
        """

        # Fetch all the published DDA records.
        records = await PublishedDDATemplateRecord.query(self.context, {})

        # Paginate the records.
        pagination_result = paginate_records(records, page=page, page_size=page_size)

        # Return the result.
        return pagination_result

    async def process_delete_dda_message(
        self,
        message: DeleteDDAMessage,
        message_receipt: MessageReceipt
    ):
        """Process delete DDA message.

        Args:
            message (DeleteDDAMessage): Delete DDA message
            message_receipt (MessageReceipt): Message receipt
        """

        # Connection record.
        connection_record: ConnectionRecord = self.context.connection_record

        # Template id.
        template_id = message.body.template_id

        # Delete published DDA template record.
        await PublishedDDATemplateRecord.delete_publish_dda_record(
            self.context,
            connection_record.connection_id,
            template_id
        )

    async def post_delete_dda_template(
        self,
        template_id: str
    ):
        """Post delete dda template record actions.

        Inform the data marketplaces the template is deleted.

        Args:
            template_id (str): Template identifier.
        """

        # Construct delete DDA message.
        message = DeleteDDAMessage(
            body=DeleteDDAModel(
                template_id=template_id
            )
        )

        # Query to find all marketplaces the template is published to.
        tag_filter = {
            "template_id": template_id
        }
        records: typing.List[PublishDDARecord] = await PublishDDARecord.query(
            self.context,
            tag_filter
        )

        mgr = V2ADAManager(self.context)

        # Notify all the marketplaces the template is deleted.
        for record in records:
            await mgr.send_reply_message(
                message,
                record.connection_id
            )

            # Delete publish DDA records.
            await record.delete_record(self.context)

    async def list_dda_published_in_marketplace(
        self,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """List DDAs published in a marketplace.

        Returns:
            PaginationResult: Pagination result.
        """

        # Fetch all publish dda records.
        tag_filter = {}
        records: typing.List[PublishDDARecord] = await PublishDDARecord.query(
            self.context,
            tag_filter
        )

        pagination_result = paginate_records(records, page=page, page_size=page_size)

        return pagination_result

    async def send_list_marketplace_dda_message(
        self,
        connection_id: str
    ) -> PaginationResult:
        """Send list marketplace DDA message.

        Args:
            connection_id (str): Marketplace connection identifier.
        """

        # Retrieve connection record for marketplace connection.
        connection_record = \
            await MarketplaceConnectionRecord.retrieve_connection_record(
                self.context,
                connection_id
            )

        # Construct the list dda message.
        message = ListMarketplaceDDAMessage()

        (sender_verkey, recipient_verkey, message_dict) = \
            await self.send_message_with_return_route_all(
            message,
            connection_record
        )

        # Deserialise the message dict into response message.
        response: ListMarketplaceDDAResponseMessage = \
            ListMarketplaceDDAResponseMessage.deserialize(
                message_dict
            )

        results = response.body.results

        # Pagination result.
        pagination_result = paginate_records(
            results,
            1,
            100000
        )

        return pagination_result

    async def process_list_marketplace_dda_message(
        self,
        message: ListMarketplaceDDAMessage,
        receipt: MessageReceipt
    ):
        """Process list marketplace DDA message.

        Args:
            message (ListMarketplaceDDAMessage): List marketplace DDA message.
            receipt (MessageReceipt): Message receipt.
        """

        # Query published DDAs
        tag_filter = {}
        records: typing.List[PublishedDDATemplateRecord] = \
            await PublishedDDATemplateRecord.query(
            self.context,
            tag_filter
        )

        # Iterate through the records and create DDA results.
        results = []
        for record in records:
            results.append(
                ListMarketplaceDDAResponseModel(
                    dda=record.dda,
                    template_id=record.template_id,
                    industry_sector=record.industry_sector,
                    connection_url=record.connection_url,
                    created_at=record.created_at,
                    updated_at=record.updated_at
                )
            )

        # Construct response message.
        response_message = ListMarketplaceDDAResponseMessage(
            body=ListMarketplaceDDAResponseBody(
                results=results
            )
        )

        # Initialise ADA manager
        mgr = V2ADAManager(self.context)

        # Send response message.
        await mgr.send_reply_message(
            response_message
        )

    async def get_message_class_from_dict(
            self,
            message_dict: dict
    ) -> AgentMessage:
        """Get message class from message dict.

        Args:
            message_dict (dict): Message dict.

        Returns:
            AgentMessage: Agent message.
        """

        # Initialise dispatcher
        dispatcher = Dispatcher(self.context)

        # Get message class.
        msg_class = await dispatcher.make_message(message_dict)

        return msg_class

    async def process_request_dda_message(
        self,
        message: RequestDDAMessage,
        receipt: MessageReceipt
    ):
        """Process request DDA message.

        Args:
            message (RequestDDAMessage): Request DDA message.
            receipt (MessageReceipt): Message receipt.
        """

        # Connection record.
        connection_record: ConnectionRecord = self.context.connection_record

        # Fetch the template record.
        template_id = message.body.template_id

        # Build instance record.
        (dda_instance_record, dda_instance_model) = \
            await DataDisclosureAgreementInstanceRecord.build_instance_from_template(
                self.context,
                template_id,
                connection_record)

        # Fetch customer identification data agreement if available.
        customer_identification_records = await CustomerIdentificationRecord.query(self.context, {})

        if customer_identification_records:
            customer_identification_record: CustomerIdentificationRecord = \
                customer_identification_records[0]

            # Fetch DA template.
            da_template_record: DataAgreementTemplateRecord = \
                await customer_identification_record.data_agreement_template_record(
                    self.context
                )

            # Build dda offer message
            offer_dda_message = OfferDDAMessage(
                body=OfferDDAMessageBodyModel(
                    dda=dda_instance_model,
                    customer_identification=CustomerIdentificationModel(
                        schema_id=da_template_record.schema_id,
                        cred_def_id=da_template_record.cred_def_id
                    )
                )
            )
        else:
            # Build dda offer message
            offer_dda_message = OfferDDAMessage(
                body=OfferDDAMessageBodyModel(
                    dda=dda_instance_model
                )
            )

        mgr = V2ADAManager(self.context)

        await mgr.send_reply_message(
            offer_dda_message,
            connection_record.connection_id
        )

    async def request_dda_offer_from_ds(
        self,
        connection_id: str,
        template_id: str
    ):
        """DUS requests DDA offer from DS.

        Args:
            connection_id (str): Connection ID.
            template_id (str): Template ID.
        """

        # Retreive connection record.
        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            connection_id
        )

        # Send DDA request message to DS.
        message = RequestDDAMessage(
            body=RequestDDAModel(
                template_id=template_id
            )
        )

        # Initialise ADA manager.
        mgr = V2ADAManager(self.context)

        await mgr.send_reply_message(
            message,
            connection_record.connection_id
        )

    async def fetch_customer_identification_data_agreement(
        self
    ) -> typing.Union[None, CustomerIdentificationRecord]:
        """Fetch customer identification data agreement.

        Returns:
            typing.Union[None, CustomerIdentificationRecord]: Customer identification record.
        """
        records = await CustomerIdentificationRecord.query(self.context, {})

        return {} if not records else records[0]

    async def configure_customer_identification_data_agreement(
        self,
        da_template_id: str
    ) -> CustomerIdentificationRecord:
        """Configure customer identification data agreement.

        Args:
            da_template_id (str): DA template ID.

        Returns:
            CustomerIdentificationRecord: _description_
        """

        return await CustomerIdentificationRecord.create_or_update_record(
            self.context,
            da_template_id
        )

    async def process_offer_dda_message(
        self,
        message: OfferDDAMessage,
        message_receipt: MessageReceipt
    ):
        """Process offer dda message.

        Args:
            message (OfferDDAMessage): Offer DDA message.
            message_receipt (MessageReceipt): Message receipt.
        """

        (record, instance_model) = \
            await DataDisclosureAgreementInstanceRecord.build_instance_from_dda_offer(
            self.context,
            message,
            self.context.connection_record
        )

        # Construct accept DDA message.
        accept_dda_message = AcceptDDAMessage(
            body=AcceptDDAMessageBodyModel(
                dda=instance_model
            )
        )

        # Initialise the ADA manager
        mgr = V2ADAManager(self.context)

        # Send the message.
        await mgr.send_reply_message(accept_dda_message)

    async def process_accept_dda_message(
        self,
        message: AcceptDDAMessage,
        message_receipt: MessageReceipt
    ):
        """Process accept dda message.

        Args:
            message (AcceptDDAMessage): Accept DDA message.
            message_receipt (MessageReceipt): Message receipt.
        """

        instance_record = \
            await DataDisclosureAgreementInstanceRecord.update_instance_from_dda_accept(
                self.context,
                message
            )

        # Anchor DDA instance to blochain.
        await self.anchor_dda_instance_to_blockchain_async_task(instance_record.instance_id)

    async def add_task(self,
                       context: InjectionContext,
                       coro: typing.Coroutine,
                       task_complete: typing.Callable = None,
                       ident: str = None) -> PendingTask:
        """
        Add a new task to the queue, delaying execution if busy.

        Args:
            context: Injection context to be used.
            coro: The coroutine to run
            task_complete: A callback to run on completion
            ident: A string identifier for the task

        Returns: a future resolving to the asyncio task instance once queued
        """
        loop = asyncio.get_event_loop()
        pack_format: PackWireFormat = await context.inject(BaseWireFormat, required=False)
        return pack_format.task_queue.put(coro, lambda x: loop.create_task(task_complete(x)), ident)

    async def anchor_dda_instance_to_blockchain_async_task(
        self,
        instance_id: str
    ):
        """Async task to anchor DDA instance to blockchain.

        Args:
            instance_id (str): Instance id
        """
        pending_task = await self.add_task(
            self.context,
            self.anchor_dda_instance_to_blockchain(instance_id),
            self.anchor_dda_instance_to_blockchain_async_task_callback
        )
        self._logger.info(pending_task)

    async def anchor_dda_instance_to_blockchain(
        self,
        instance_id: str
    ) -> None:
        """Anchor DDA instance to blockchain.

        Args:
            instance_id (str): Instance id
        """

        eth_client: EthereumClient = await self.context.inject(EthereumClient)

        tag_filter = {
            "instance_id": instance_id
        }

        # Fetch DDA instance record.
        dda_instance_records = await DataDisclosureAgreementInstanceRecord.query(
            self.context,
            tag_filter,
        )

        assert dda_instance_records, "Data agreement instance not found."

        dda_instance_record: DataDisclosureAgreementInstanceRecord = dda_instance_records[0]
        dda_model: DataDisclosureAgreementInstanceModel = \
            DataDisclosureAgreementInstanceModel.deserialize(
                dda_instance_record.data_disclosure_agreement)

        did_mydata_builder = DIDMyDataBuilder(
            artefact=dda_model
        )

        (tx_hash, tx_receipt) = await eth_client.emit_dda_did(
            did_mydata_builder.generate_did(
                "DataDisclosureAgreement"
            )
        )

        return (dda_instance_record.instance_id, did_mydata_builder.mydata_did, tx_hash, tx_receipt)

    async def anchor_dda_instance_to_blockchain_async_task_callback(
        self, *args, **kwargs
    ):
        """Anchor DDA instance to blockchain async task callback function
        """

        # Obtain the completed task.
        completed_task: CompletedTask = args[0]

        # Obtain the results from the task.
        (instance_id, mydata_did, tx_hash, tx_receipt) = completed_task.task.result()

        tag_filter = {
            "instance_id": instance_id
        }

        # Fetch data agreement instance record.
        dda_instance_records = await DataDisclosureAgreementInstanceRecord.query(
            self.context,
            tag_filter,
        )

        assert dda_instance_records, "Data agreement instance not found."

        dda_instance_record: DataDisclosureAgreementInstanceRecord = dda_instance_records[0]

        transaction_receipt = json.loads(to_json(tx_receipt))
        transaction_hash = transaction_receipt.get("transactionHash")

        # Update the data agreement with blockchain metadata.
        dda_instance_record.blink = f"blink:ethereum:rinkeby:{transaction_hash}"
        dda_instance_record.mydata_did = mydata_did
        dda_instance_record.blockchain_receipt = transaction_receipt

        await dda_instance_record.save(self.context)

        # Send negotiation receipt to DUS.
        # Construct negotiation receipt message.
        message = DDANegotiationReceiptMessage(
            body=DDANegotiationReceiptBodyModel(
                instance_id=dda_instance_record.instance_id,
                blockchain_receipt=transaction_receipt,
                blink=f"blink:ethereum:rinkeby:{transaction_hash}",
                mydata_did=mydata_did
            )
        )

        # Fetch connection record.
        connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
            self.context,
            dda_instance_record.connection_id
        )

        # Initialise ADA manager
        mgr = V2ADAManager(self.context)

        # Send message
        await mgr.send_reply_message(
            message,
            connection_record.connection_id
        )

    async def query_dda_instances(
        self,
        instance_id: str,
        template_id: str,
        template_version: str,
        connection_id: str,
        page: int = 1,
        page_size: int = 10
    ) -> PaginationResult:
        """Query DDA instances

        Args:
            instance_id (str): Instance identifier
            template_id (str): Template identifier
            template_version (str): Template version
            connection_id (str): Connection id
            page (int, optional): Page. Defaults to 1.
            page_size (int, optional): Page size. Defaults to 10.

        Returns:
            PaginationResult: Pagination result
        """
        # Query by version is only possible if the template id is provided
        if template_version:
            assert template_id, "Template identifier is required to query by version"

        # Tag filter
        tag_filter = {
            "instance_id": instance_id,
            "template_id": template_id,
            "template_version": template_version,
            "connection_id": connection_id
        }

        tag_filter = drop_none_dict(tag_filter)

        records = await DataDisclosureAgreementInstanceRecord.query(
            context=self.context,
            tag_filter=tag_filter
        )

        records = sorted(records, key=lambda k: k.created_at, reverse=True)

        paginate_result = paginate_records(records, page, page_size)

        return paginate_result

    async def process_dda_negotiation_receipt_message(
        self,
        message: DDANegotiationReceiptMessage,
        message_receipt: MessageReceipt
    ):
        """Process DDA negotiation receipt message.

        Args:
            message (DDANegotiationReceiptMessage): DDA negotiation receipt message.
            message_receipt (MessageReceipt): Message receipt.
        """

        instance_id = message.body.instance_id
        blockchain_receipt = message.body.blockchain_receipt
        blink = message.body.blink
        mydata_did = message.body.mydata_did

        # Fetch the DDA instance record.
        tag_filter = {
            "instance_id": instance_id
        }
        instance_record: DataDisclosureAgreementInstanceRecord = \
            await DataDisclosureAgreementInstanceRecord.retrieve_by_tag_filter(
                self.context,
                tag_filter
            )

        # Update instance record.
        instance_record.blockchain_receipt = blockchain_receipt
        instance_record.blink = blink
        instance_record.mydata_did = mydata_did

        await instance_record.save(self.context)

Instance variables

var context : aries_cloudagent.config.injection_context.InjectionContext

Accessor for injection context

Returns

InjectionContext
Injection context
Expand source code
@property
def context(self) -> InjectionContext:
    """Accessor for injection context

    Returns:
        InjectionContext: Injection context
    """
    return self._context
var logger

Accessor for logger.

Expand source code
@property
def logger(self):
    """Accessor for logger."""
    return self._logger

Methods

async def add_marketplace_connection(self, connection_id: str) ‑> MarketplaceConnectionRecord

Set connection as marketplace.

Args

connection_id : str
Connection identifier.

Returns

MarketplaceConnectionRecord
Marketplace connection record.
Expand source code
async def add_marketplace_connection(
    self,
    connection_id: str
) -> MarketplaceConnectionRecord:
    """Set connection as marketplace.

    Args:
        connection_id (str): Connection identifier.

    Returns:
        MarketplaceConnectionRecord: Marketplace connection record.
    """

    # Connection record.
    connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
        self.context,
        connection_id
    )

    record = await MarketplaceConnectionRecord.set_connection_as_marketplace(
        self.context,
        connection_record.connection_id
    )

    return record
async def add_task(self, context: aries_cloudagent.config.injection_context.InjectionContext, coro: Coroutine[+T_co, -T_contra, +V_co], task_complete: Callable = None, ident: str = None) ‑> aries_cloudagent.utils.task_queue.PendingTask

Add a new task to the queue, delaying execution if busy.

Args

context
Injection context to be used.
coro
The coroutine to run
task_complete
A callback to run on completion
ident
A string identifier for the task

Returns: a future resolving to the asyncio task instance once queued

Expand source code
async def add_task(self,
                   context: InjectionContext,
                   coro: typing.Coroutine,
                   task_complete: typing.Callable = None,
                   ident: str = None) -> PendingTask:
    """
    Add a new task to the queue, delaying execution if busy.

    Args:
        context: Injection context to be used.
        coro: The coroutine to run
        task_complete: A callback to run on completion
        ident: A string identifier for the task

    Returns: a future resolving to the asyncio task instance once queued
    """
    loop = asyncio.get_event_loop()
    pack_format: PackWireFormat = await context.inject(BaseWireFormat, required=False)
    return pack_format.task_queue.put(coro, lambda x: loop.create_task(task_complete(x)), ident)
async def anchor_dda_instance_to_blockchain(self, instance_id: str) ‑> None

Anchor DDA instance to blockchain.

Args

instance_id : str
Instance id
Expand source code
async def anchor_dda_instance_to_blockchain(
    self,
    instance_id: str
) -> None:
    """Anchor DDA instance to blockchain.

    Args:
        instance_id (str): Instance id
    """

    eth_client: EthereumClient = await self.context.inject(EthereumClient)

    tag_filter = {
        "instance_id": instance_id
    }

    # Fetch DDA instance record.
    dda_instance_records = await DataDisclosureAgreementInstanceRecord.query(
        self.context,
        tag_filter,
    )

    assert dda_instance_records, "Data agreement instance not found."

    dda_instance_record: DataDisclosureAgreementInstanceRecord = dda_instance_records[0]
    dda_model: DataDisclosureAgreementInstanceModel = \
        DataDisclosureAgreementInstanceModel.deserialize(
            dda_instance_record.data_disclosure_agreement)

    did_mydata_builder = DIDMyDataBuilder(
        artefact=dda_model
    )

    (tx_hash, tx_receipt) = await eth_client.emit_dda_did(
        did_mydata_builder.generate_did(
            "DataDisclosureAgreement"
        )
    )

    return (dda_instance_record.instance_id, did_mydata_builder.mydata_did, tx_hash, tx_receipt)
async def anchor_dda_instance_to_blockchain_async_task(self, instance_id: str)

Async task to anchor DDA instance to blockchain.

Args

instance_id : str
Instance id
Expand source code
async def anchor_dda_instance_to_blockchain_async_task(
    self,
    instance_id: str
):
    """Async task to anchor DDA instance to blockchain.

    Args:
        instance_id (str): Instance id
    """
    pending_task = await self.add_task(
        self.context,
        self.anchor_dda_instance_to_blockchain(instance_id),
        self.anchor_dda_instance_to_blockchain_async_task_callback
    )
    self._logger.info(pending_task)
async def anchor_dda_instance_to_blockchain_async_task_callback(self, *args, **kwargs)

Anchor DDA instance to blockchain async task callback function

Expand source code
async def anchor_dda_instance_to_blockchain_async_task_callback(
    self, *args, **kwargs
):
    """Anchor DDA instance to blockchain async task callback function
    """

    # Obtain the completed task.
    completed_task: CompletedTask = args[0]

    # Obtain the results from the task.
    (instance_id, mydata_did, tx_hash, tx_receipt) = completed_task.task.result()

    tag_filter = {
        "instance_id": instance_id
    }

    # Fetch data agreement instance record.
    dda_instance_records = await DataDisclosureAgreementInstanceRecord.query(
        self.context,
        tag_filter,
    )

    assert dda_instance_records, "Data agreement instance not found."

    dda_instance_record: DataDisclosureAgreementInstanceRecord = dda_instance_records[0]

    transaction_receipt = json.loads(to_json(tx_receipt))
    transaction_hash = transaction_receipt.get("transactionHash")

    # Update the data agreement with blockchain metadata.
    dda_instance_record.blink = f"blink:ethereum:rinkeby:{transaction_hash}"
    dda_instance_record.mydata_did = mydata_did
    dda_instance_record.blockchain_receipt = transaction_receipt

    await dda_instance_record.save(self.context)

    # Send negotiation receipt to DUS.
    # Construct negotiation receipt message.
    message = DDANegotiationReceiptMessage(
        body=DDANegotiationReceiptBodyModel(
            instance_id=dda_instance_record.instance_id,
            blockchain_receipt=transaction_receipt,
            blink=f"blink:ethereum:rinkeby:{transaction_hash}",
            mydata_did=mydata_did
        )
    )

    # Fetch connection record.
    connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
        self.context,
        dda_instance_record.connection_id
    )

    # Initialise ADA manager
    mgr = V2ADAManager(self.context)

    # Send message
    await mgr.send_reply_message(
        message,
        connection_record.connection_id
    )
async def configure_customer_identification_data_agreement(self, da_template_id: str) ‑> CustomerIdentificationRecord

Configure customer identification data agreement.

Args

da_template_id : str
DA template ID.

Returns

CustomerIdentificationRecord
description
Expand source code
async def configure_customer_identification_data_agreement(
    self,
    da_template_id: str
) -> CustomerIdentificationRecord:
    """Configure customer identification data agreement.

    Args:
        da_template_id (str): DA template ID.

    Returns:
        CustomerIdentificationRecord: _description_
    """

    return await CustomerIdentificationRecord.create_or_update_record(
        self.context,
        da_template_id
    )
async def create_and_store_dda_template_in_wallet(self, dda: dict, *, publish_flag: bool = True) ‑> DataDisclosureAgreementTemplateRecord

Create and store dda template in wallet

Args

dda : dict
DDA template.
publish_flag : bool
Publish flag
schema_id : str
Schema identifier
Expand source code
async def create_and_store_dda_template_in_wallet(
        self,
        dda: dict,
        *,
        publish_flag: bool = True
) -> DataDisclosureAgreementTemplateRecord:
    """Create and store dda template in wallet

    Args:
        dda (dict): DDA template.
        publish_flag (bool): Publish flag
        schema_id (str): Schema identifier
    """

    # Temp hack
    template_version = "1.0.0"
    template_id = str(uuid.uuid4())
    dda.update({"@context": DDA_DEFAULT_CONTEXT})
    dda.update({"@id": template_id})
    dda.update({"@type": DDA_TYPE})
    dda.update({"version": template_version})

    # Fetch wallet from context
    wallet: IndyWallet = await self.context.inject(BaseWallet)
    controller_did = await wallet.get_public_did()

    dda["dataController"].update({"did": f"did:sov:{controller_did.did}"})

    # Validate the data agreement.
    dda: DataDisclosureAgreementModel = \
        DataDisclosureAgreementModel.deserialize(
            dda)

    # Hack: Iterate through personal data records and add a unique identifier
    # Todo: Correlating personal data across agreements needs to be done.
    pds = dda.personal_data
    for pd in pds:
        pd.attribute_id = str(uuid.uuid4())

    # Update the personal data with attribute identifiers to the agreement
    dda.personal_data = pds

    # Create template record
    record = DataDisclosureAgreementTemplateRecord(
        template_id=template_id,
        template_version=template_version,
        state=DataDisclosureAgreementTemplateRecord.STATE_DEFINITION,
        data_disclosure_agreement=dda.serialize(),
        industry_sector=dda.data_sharing_restrictions.industry_sector.lower(),
        publish_flag=bool_to_str(publish_flag),
        latest_version_flag=bool_to_str(True)
    )

    await record.save(self.context)

    return record
async def delete_dda_template_in_wallet(self, template_id: str)

Delete DDA template in wallet.

Args

template_id : str
Template identifier.
Expand source code
async def delete_dda_template_in_wallet(
    self,
    template_id: str
):
    """Delete DDA template in wallet.

    Args:
        template_id (str): Template identifier.
    """

    # Fetch the latest template.
    existing_template = \
        await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
            self.context,
            template_id
        )

    assert existing_template, "DDA template not found."

    # Delete template.
    await existing_template.delete_template(self.context)

    # Post delete actions.
    await self.post_delete_dda_template(
        template_id
    )
async def fetch_and_save_controller_details_for_connection(self, connection_record: aries_cloudagent.connections.models.connection_record.ConnectionRecord)

Fetch and save controller details for connection

Args

connection_record : ConnectionRecord
Connection record
Expand source code
async def fetch_and_save_controller_details_for_connection(
    self,
    connection_record: ConnectionRecord
):
    """Fetch and save controller details for connection

    Args:
        connection_record (ConnectionRecord): Connection record
    """
    controller_details_message = DataControllerDetailsMessage()
    (sender_verkey, recipient_verkey, message_dict) = \
        await self.send_message_with_return_route_all(
        controller_details_message,
        connection_record
    )
    # Data controller detail response.
    data_controller_details_response: DataControllerDetailsResponseMessage = \
        DataControllerDetailsResponseMessage.deserialize(
            message_dict
        )

    # Save controller details for a connection.
    await ConnectionControllerDetailsRecord.set_controller_details_for_connection(
        self.context,
        connection_record,
        data_controller_details_response.body
    )
async def fetch_customer_identification_data_agreement(self) ‑> Optional[None]

Fetch customer identification data agreement.

Returns

typing.Union[None, CustomerIdentificationRecord]
Customer identification record.
Expand source code
async def fetch_customer_identification_data_agreement(
    self
) -> typing.Union[None, CustomerIdentificationRecord]:
    """Fetch customer identification data agreement.

    Returns:
        typing.Union[None, CustomerIdentificationRecord]: Customer identification record.
    """
    records = await CustomerIdentificationRecord.query(self.context, {})

    return {} if not records else records[0]
async def get_message_class_from_dict(self, message_dict: dict) ‑> aries_cloudagent.messaging.agent_message.AgentMessage

Get message class from message dict.

Args

message_dict : dict
Message dict.

Returns

AgentMessage
Agent message.
Expand source code
async def get_message_class_from_dict(
        self,
        message_dict: dict
) -> AgentMessage:
    """Get message class from message dict.

    Args:
        message_dict (dict): Message dict.

    Returns:
        AgentMessage: Agent message.
    """

    # Initialise dispatcher
    dispatcher = Dispatcher(self.context)

    # Get message class.
    msg_class = await dispatcher.make_message(message_dict)

    return msg_class
async def handle_connections_webhook(self, body: dict)

Handle connections webhook.

Args

body : dict
Connection record.
Expand source code
async def handle_connections_webhook(
    self,
    body: dict
):
    """Handle connections webhook.

    Args:
        body (dict): Connection record.
    """

    # Fetch connection record.
    connection_record: ConnectionRecord = ConnectionRecord.deserialize(body)

    if connection_record.state == ConnectionRecord.STATE_ACTIVE:
        # Save controller details for connection.
        await self.fetch_and_save_controller_details_for_connection(
            connection_record
        )
    if connection_record.state == "delete":
        # Perform cleanup.
        await self.post_connection_delete_actions(connection_record.connection_id)
async def list_dda_published_in_marketplace(self, page: int = 1, page_size: int = 10) ‑> PaginationResult

List DDAs published in a marketplace.

Returns

PaginationResult
Pagination result.
Expand source code
async def list_dda_published_in_marketplace(
    self,
    page: int = 1,
    page_size: int = 10
) -> PaginationResult:
    """List DDAs published in a marketplace.

    Returns:
        PaginationResult: Pagination result.
    """

    # Fetch all publish dda records.
    tag_filter = {}
    records: typing.List[PublishDDARecord] = await PublishDDARecord.query(
        self.context,
        tag_filter
    )

    pagination_result = paginate_records(records, page=page, page_size=page_size)

    return pagination_result
async def post_connection_delete_actions(self, connection_id: str)

Post connection record delete actions.

Args

connection_id : str
Connection identifier.
Expand source code
async def post_connection_delete_actions(
    self,
    connection_id: str
):
    """Post connection record delete actions.

    Args:
        connection_id (str): Connection identifier.
    """

    self._logger.info("Performing post delete actions for connection records...")

    # Delete marketplace connection records.
    tag_filter = {"connection_id": connection_id}
    marketplace_records = await MarketplaceConnectionRecord.query(
        self.context,
        tag_filter
    )
    if marketplace_records:
        marketplace_record = marketplace_records[0]
        await marketplace_record.delete_record(self.context)

    # Delete controller connection records.
    controller_records = await ConnectionControllerDetailsRecord.query(
        self.context,
        tag_filter
    )
    if controller_records:
        controller_record = controller_records[0]
        await controller_record.delete_record(self.context)
async def post_delete_dda_template(self, template_id: str)

Post delete dda template record actions.

Inform the data marketplaces the template is deleted.

Args

template_id : str
Template identifier.
Expand source code
async def post_delete_dda_template(
    self,
    template_id: str
):
    """Post delete dda template record actions.

    Inform the data marketplaces the template is deleted.

    Args:
        template_id (str): Template identifier.
    """

    # Construct delete DDA message.
    message = DeleteDDAMessage(
        body=DeleteDDAModel(
            template_id=template_id
        )
    )

    # Query to find all marketplaces the template is published to.
    tag_filter = {
        "template_id": template_id
    }
    records: typing.List[PublishDDARecord] = await PublishDDARecord.query(
        self.context,
        tag_filter
    )

    mgr = V2ADAManager(self.context)

    # Notify all the marketplaces the template is deleted.
    for record in records:
        await mgr.send_reply_message(
            message,
            record.connection_id
        )

        # Delete publish DDA records.
        await record.delete_record(self.context)
async def post_update_dda_template(self, template_record: DataDisclosureAgreementTemplateRecord)

Post update DDA template actions.

Args

template_record : DataDisclosureAgreementTemplateRecord
DDA template record.
Expand source code
async def post_update_dda_template(
    self,
    template_record: DataDisclosureAgreementTemplateRecord
):
    """Post update DDA template actions.

    Args:
        template_record (DataDisclosureAgreementTemplateRecord): DDA template record.
    """

    # Find all the marketplace connections.
    # Query to find all marketplaces the template is published to.
    tag_filter = {
        "template_id": template_record.template_id
    }
    records: typing.List[PublishDDARecord] = await PublishDDARecord.query(
        self.context,
        tag_filter
    )

    # Notify all the marketplaces about the update.
    for record in records:
        await self.send_publish_dda_message(
            record,
            template_record
        )
async def process_accept_dda_message(self, message: dexa_protocol.v1_0.messages.negotiation.accept_dda.AcceptDDAMessage, message_receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process accept dda message.

Args

message : AcceptDDAMessage
Accept DDA message.
message_receipt : MessageReceipt
Message receipt.
Expand source code
async def process_accept_dda_message(
    self,
    message: AcceptDDAMessage,
    message_receipt: MessageReceipt
):
    """Process accept dda message.

    Args:
        message (AcceptDDAMessage): Accept DDA message.
        message_receipt (MessageReceipt): Message receipt.
    """

    instance_record = \
        await DataDisclosureAgreementInstanceRecord.update_instance_from_dda_accept(
            self.context,
            message
        )

    # Anchor DDA instance to blochain.
    await self.anchor_dda_instance_to_blockchain_async_task(instance_record.instance_id)
async def process_dda_negotiation_receipt_message(self, message: dexa_protocol.v1_0.messages.negotiation.dda_negotiation_receipt.DDANegotiationReceiptMessage, message_receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process DDA negotiation receipt message.

Args

message : DDANegotiationReceiptMessage
DDA negotiation receipt message.
message_receipt : MessageReceipt
Message receipt.
Expand source code
async def process_dda_negotiation_receipt_message(
    self,
    message: DDANegotiationReceiptMessage,
    message_receipt: MessageReceipt
):
    """Process DDA negotiation receipt message.

    Args:
        message (DDANegotiationReceiptMessage): DDA negotiation receipt message.
        message_receipt (MessageReceipt): Message receipt.
    """

    instance_id = message.body.instance_id
    blockchain_receipt = message.body.blockchain_receipt
    blink = message.body.blink
    mydata_did = message.body.mydata_did

    # Fetch the DDA instance record.
    tag_filter = {
        "instance_id": instance_id
    }
    instance_record: DataDisclosureAgreementInstanceRecord = \
        await DataDisclosureAgreementInstanceRecord.retrieve_by_tag_filter(
            self.context,
            tag_filter
        )

    # Update instance record.
    instance_record.blockchain_receipt = blockchain_receipt
    instance_record.blink = blink
    instance_record.mydata_did = mydata_did

    await instance_record.save(self.context)
async def process_delete_dda_message(self, message: dexa_protocol.v1_0.messages.marketplace.delete_dda.DeleteDDAMessage, message_receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process delete DDA message.

Args

message : DeleteDDAMessage
Delete DDA message
message_receipt : MessageReceipt
Message receipt
Expand source code
async def process_delete_dda_message(
    self,
    message: DeleteDDAMessage,
    message_receipt: MessageReceipt
):
    """Process delete DDA message.

    Args:
        message (DeleteDDAMessage): Delete DDA message
        message_receipt (MessageReceipt): Message receipt
    """

    # Connection record.
    connection_record: ConnectionRecord = self.context.connection_record

    # Template id.
    template_id = message.body.template_id

    # Delete published DDA template record.
    await PublishedDDATemplateRecord.delete_publish_dda_record(
        self.context,
        connection_record.connection_id,
        template_id
    )
async def process_list_marketplace_dda_message(self, message: dexa_protocol.v1_0.messages.marketplace.list_marketplace_dda.ListMarketplaceDDAMessage, receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process list marketplace DDA message.

Args

message : ListMarketplaceDDAMessage
List marketplace DDA message.
receipt : MessageReceipt
Message receipt.
Expand source code
async def process_list_marketplace_dda_message(
    self,
    message: ListMarketplaceDDAMessage,
    receipt: MessageReceipt
):
    """Process list marketplace DDA message.

    Args:
        message (ListMarketplaceDDAMessage): List marketplace DDA message.
        receipt (MessageReceipt): Message receipt.
    """

    # Query published DDAs
    tag_filter = {}
    records: typing.List[PublishedDDATemplateRecord] = \
        await PublishedDDATemplateRecord.query(
        self.context,
        tag_filter
    )

    # Iterate through the records and create DDA results.
    results = []
    for record in records:
        results.append(
            ListMarketplaceDDAResponseModel(
                dda=record.dda,
                template_id=record.template_id,
                industry_sector=record.industry_sector,
                connection_url=record.connection_url,
                created_at=record.created_at,
                updated_at=record.updated_at
            )
        )

    # Construct response message.
    response_message = ListMarketplaceDDAResponseMessage(
        body=ListMarketplaceDDAResponseBody(
            results=results
        )
    )

    # Initialise ADA manager
    mgr = V2ADAManager(self.context)

    # Send response message.
    await mgr.send_reply_message(
        response_message
    )
async def process_offer_dda_message(self, message: dexa_protocol.v1_0.messages.negotiation.offer_dda.OfferDDAMessage, message_receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process offer dda message.

Args

message : OfferDDAMessage
Offer DDA message.
message_receipt : MessageReceipt
Message receipt.
Expand source code
async def process_offer_dda_message(
    self,
    message: OfferDDAMessage,
    message_receipt: MessageReceipt
):
    """Process offer dda message.

    Args:
        message (OfferDDAMessage): Offer DDA message.
        message_receipt (MessageReceipt): Message receipt.
    """

    (record, instance_model) = \
        await DataDisclosureAgreementInstanceRecord.build_instance_from_dda_offer(
        self.context,
        message,
        self.context.connection_record
    )

    # Construct accept DDA message.
    accept_dda_message = AcceptDDAMessage(
        body=AcceptDDAMessageBodyModel(
            dda=instance_model
        )
    )

    # Initialise the ADA manager
    mgr = V2ADAManager(self.context)

    # Send the message.
    await mgr.send_reply_message(accept_dda_message)
async def process_publish_dda_request_message(self, message: dexa_protocol.v1_0.messages.marketplace.publish_dda.PublishDDAMessage, message_receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process publish dda request message.

Args

message : PublishDDAMessage
Publish dda request message
message_receipt : MessageReceipt
Message receipt
Expand source code
async def process_publish_dda_request_message(
    self,
    message: PublishDDAMessage,
    message_receipt: MessageReceipt
):
    """Process publish dda request message.

    Args:
        message (PublishDDAMessage): Publish dda request message
        message_receipt (MessageReceipt): Message receipt
    """

    # Connection record.
    connection_record: ConnectionRecord = self.context.connection_record

    # Save a publish DDA record if not existing.
    await PublishedDDATemplateRecord.store_publish_dda_record(
        self.context,
        connection_record.connection_id,
        message.body.dda,
        message.body.connection_url
    )
async def process_request_dda_message(self, message: dexa_protocol.v1_0.messages.negotiation.request_dda.RequestDDAMessage, receipt: aries_cloudagent.transport.inbound.receipt.MessageReceipt)

Process request DDA message.

Args

message : RequestDDAMessage
Request DDA message.
receipt : MessageReceipt
Message receipt.
Expand source code
async def process_request_dda_message(
    self,
    message: RequestDDAMessage,
    receipt: MessageReceipt
):
    """Process request DDA message.

    Args:
        message (RequestDDAMessage): Request DDA message.
        receipt (MessageReceipt): Message receipt.
    """

    # Connection record.
    connection_record: ConnectionRecord = self.context.connection_record

    # Fetch the template record.
    template_id = message.body.template_id

    # Build instance record.
    (dda_instance_record, dda_instance_model) = \
        await DataDisclosureAgreementInstanceRecord.build_instance_from_template(
            self.context,
            template_id,
            connection_record)

    # Fetch customer identification data agreement if available.
    customer_identification_records = await CustomerIdentificationRecord.query(self.context, {})

    if customer_identification_records:
        customer_identification_record: CustomerIdentificationRecord = \
            customer_identification_records[0]

        # Fetch DA template.
        da_template_record: DataAgreementTemplateRecord = \
            await customer_identification_record.data_agreement_template_record(
                self.context
            )

        # Build dda offer message
        offer_dda_message = OfferDDAMessage(
            body=OfferDDAMessageBodyModel(
                dda=dda_instance_model,
                customer_identification=CustomerIdentificationModel(
                    schema_id=da_template_record.schema_id,
                    cred_def_id=da_template_record.cred_def_id
                )
            )
        )
    else:
        # Build dda offer message
        offer_dda_message = OfferDDAMessage(
            body=OfferDDAMessageBodyModel(
                dda=dda_instance_model
            )
        )

    mgr = V2ADAManager(self.context)

    await mgr.send_reply_message(
        offer_dda_message,
        connection_record.connection_id
    )
async def publish_dda_template_to_marketplace(self, connection_id: str, template_id: str) ‑> PublishDDARecord

Publish DDA template to marketplace

Args

connection_id : str
Connection ID
template_id : str
Template ID

Returns

PublishDDARecord
Publish DDA record.
Expand source code
async def publish_dda_template_to_marketplace(
    self,
    connection_id: str,
    template_id: str
) -> PublishDDARecord:
    """Publish DDA template to marketplace

    Args:
        connection_id (str): Connection ID
        template_id (str): Template ID

    Returns:
        PublishDDARecord: Publish DDA record.
    """

    # Fetch template
    template_record = await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
        self.context,
        template_id
    )

    assert template_record._publish_flag, \
        "DDA must be published locally before published to marketplace."

    # Connection record
    connection_record: ConnectionRecord = \
        await MarketplaceConnectionRecord.retrieve_connection_record(
            self.context,
            connection_id
        )

    # Create Publish DDA record.
    # Publish DDA record is mapping of which template is published in which marketplace.
    publish_dda_record = await PublishDDARecord.store_publish_dda_record(
        self.context,
        connection_record.connection_id,
        template_record.template_id,
        template_record.data_disclosure_agreement
    )

    # Send publish dda message to marketplace.
    await self.send_publish_dda_message(
        publish_dda_record,
        template_record
    )

    return publish_dda_record
async def publish_dda_template_wallet(self, template_id: str)

Publish DDA template in wallet.

Args

template_id : str
Template identifier
Expand source code
async def publish_dda_template_wallet(
    self,
    template_id: str
):
    """Publish DDA template in wallet.

    Args:
        template_id (str): Template identifier
    """

    # Fetch the latest template.
    existing_template = \
        await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
            self.context,
            template_id
        )

    await existing_template.publish_template(self.context)

    # Post publish actions.
    await self.post_update_dda_template(existing_template)
async def query_dda_instances(self, instance_id: str, template_id: str, template_version: str, connection_id: str, page: int = 1, page_size: int = 10) ‑> PaginationResult

Query DDA instances

Args

instance_id : str
Instance identifier
template_id : str
Template identifier
template_version : str
Template version
connection_id : str
Connection id
page : int, optional
Page. Defaults to 1.
page_size : int, optional
Page size. Defaults to 10.

Returns

PaginationResult
Pagination result
Expand source code
async def query_dda_instances(
    self,
    instance_id: str,
    template_id: str,
    template_version: str,
    connection_id: str,
    page: int = 1,
    page_size: int = 10
) -> PaginationResult:
    """Query DDA instances

    Args:
        instance_id (str): Instance identifier
        template_id (str): Template identifier
        template_version (str): Template version
        connection_id (str): Connection id
        page (int, optional): Page. Defaults to 1.
        page_size (int, optional): Page size. Defaults to 10.

    Returns:
        PaginationResult: Pagination result
    """
    # Query by version is only possible if the template id is provided
    if template_version:
        assert template_id, "Template identifier is required to query by version"

    # Tag filter
    tag_filter = {
        "instance_id": instance_id,
        "template_id": template_id,
        "template_version": template_version,
        "connection_id": connection_id
    }

    tag_filter = drop_none_dict(tag_filter)

    records = await DataDisclosureAgreementInstanceRecord.query(
        context=self.context,
        tag_filter=tag_filter
    )

    records = sorted(records, key=lambda k: k.created_at, reverse=True)

    paginate_result = paginate_records(records, page, page_size)

    return paginate_result
async def query_dda_templates_in_wallet(self, template_id: str = None, template_version: str = None, industry_sector: str = None, publish_flag: str = 'false', delete_flag: str = 'false', latest_version_flag: str = 'false', page: int = 1, page_size: int = 10) ‑> PaginationResult

Query DA templates in wallet.

Args

template_id : str, optional
Template id. Defaults to None.
template_version : str, optional
Template version. Defaults to None.
industry_sector : str, optional
Industry sector. Defaults to None.
publish_flag : str, optional
Publish flag. Defaults to "false".
delete_flag : str, optional
Delete flag. Defaults to "false".
latest_version_flag : str, optional
Latest version flag. Defaults to "false".
page : int
Page number. Defaults to 1.
page_size : int
Page size. Defaults to 10.

Returns

PaginationResult
Pagination result
Expand source code
async def query_dda_templates_in_wallet(
    self,
    template_id: str = None,
    template_version: str = None,
    industry_sector: str = None,
    publish_flag: str = "false",
    delete_flag: str = "false",
    latest_version_flag: str = "false",
    page: int = 1,
    page_size: int = 10,
) -> PaginationResult:
    """Query DA templates in wallet.

    Args:
        template_id (str, optional): Template id. Defaults to None.
        template_version (str, optional): Template version. Defaults to None.
        industry_sector (str, optional): Industry sector. Defaults to None.
        publish_flag (str, optional): Publish flag. Defaults to "false".
        delete_flag (str, optional): Delete flag. Defaults to "false".
        latest_version_flag (str, optional): Latest version flag. Defaults to "false".
        page (int): Page number. Defaults to 1.
        page_size (int): Page size. Defaults to 10.

    Returns:
        PaginationResult: Pagination result
    """

    # Query by version is only possible if the template id is provided
    if template_version:
        assert template_id, "Template identifier is required to query by version"

    # Tag filter
    tag_filter = {
        "template_id": template_id,
        "template_version": template_version,
        "industry_sector": industry_sector.lower() if industry_sector else industry_sector,
        "publish_flag": publish_flag,
        "delete_flag": delete_flag,
        "latest_version_flag": latest_version_flag
    }

    tag_filter = drop_none_dict(tag_filter)

    records = await DataDisclosureAgreementTemplateRecord.query(
        context=self.context,
        tag_filter=tag_filter
    )

    records = sorted(records, key=lambda k: k.created_at, reverse=True)

    paginate_result = paginate_records(records, page, page_size)

    return paginate_result
async def query_marketplace_connections(self, connection_id: str, page: int = 1, page_size: int = 10) ‑> PaginationResult

Query marketplace connections

Args

connection_id : str
Connection identifier
page : int, optional
Page. Defaults to 1.
page_size : int, optional
Page size. Defaults to 10.

Returns

PaginationResult
Pagination result
Expand source code
async def query_marketplace_connections(
    self,
    connection_id: str,
    page: int = 1,
    page_size: int = 10
) -> PaginationResult:
    """Query marketplace connections

    Args:
        connection_id (str): Connection identifier
        page (int, optional): Page. Defaults to 1.
        page_size (int, optional): Page size. Defaults to 10.

    Returns:
        PaginationResult: Pagination result
    """

    tag_filter = {"connection_id": connection_id}
    tag_filter = drop_none_dict(tag_filter)

    records = await MarketplaceConnectionRecord.query(self.context, tag_filter)

    pagination_result = paginate_records(records, page, page_size)

    return pagination_result
async def query_publish_dda_template_records(self, page: int = 1, page_size: int = 10) ‑> PaginationResult

Query publish DDA template record.

Returns

PaginationResult
Pagination result.
Expand source code
async def query_publish_dda_template_records(
    self,
    page: int = 1,
    page_size: int = 10
) -> PaginationResult:
    """Query publish DDA template record.

    Returns:
        PaginationResult: Pagination result.
    """

    # Fetch all the published DDA records.
    records = await PublishedDDATemplateRecord.query(self.context, {})

    # Paginate the records.
    pagination_result = paginate_records(records, page=page, page_size=page_size)

    # Return the result.
    return pagination_result
async def request_dda_offer_from_ds(self, connection_id: str, template_id: str)

DUS requests DDA offer from DS.

Args

connection_id : str
Connection ID.
template_id : str
Template ID.
Expand source code
async def request_dda_offer_from_ds(
    self,
    connection_id: str,
    template_id: str
):
    """DUS requests DDA offer from DS.

    Args:
        connection_id (str): Connection ID.
        template_id (str): Template ID.
    """

    # Retreive connection record.
    connection_record: ConnectionRecord = await ConnectionRecord.retrieve_by_id(
        self.context,
        connection_id
    )

    # Send DDA request message to DS.
    message = RequestDDAMessage(
        body=RequestDDAModel(
            template_id=template_id
        )
    )

    # Initialise ADA manager.
    mgr = V2ADAManager(self.context)

    await mgr.send_reply_message(
        message,
        connection_record.connection_id
    )
async def send_list_marketplace_dda_message(self, connection_id: str) ‑> PaginationResult

Send list marketplace DDA message.

Args

connection_id : str
Marketplace connection identifier.
Expand source code
async def send_list_marketplace_dda_message(
    self,
    connection_id: str
) -> PaginationResult:
    """Send list marketplace DDA message.

    Args:
        connection_id (str): Marketplace connection identifier.
    """

    # Retrieve connection record for marketplace connection.
    connection_record = \
        await MarketplaceConnectionRecord.retrieve_connection_record(
            self.context,
            connection_id
        )

    # Construct the list dda message.
    message = ListMarketplaceDDAMessage()

    (sender_verkey, recipient_verkey, message_dict) = \
        await self.send_message_with_return_route_all(
        message,
        connection_record
    )

    # Deserialise the message dict into response message.
    response: ListMarketplaceDDAResponseMessage = \
        ListMarketplaceDDAResponseMessage.deserialize(
            message_dict
        )

    results = response.body.results

    # Pagination result.
    pagination_result = paginate_records(
        results,
        1,
        100000
    )

    return pagination_result
async def send_message_with_return_route_all(self, message: aries_cloudagent.messaging.agent_message.AgentMessage, connection_record: aries_cloudagent.connections.models.connection_record.ConnectionRecord) ‑> Tuple[str, str, dict]

Send message with return route all in transport decorator.

Args

message : AgentMessage
Agent message.
connection_record : ConnectionRecord
Connection record.

Returns

typing.Tuple[str, str, dict]
sender_verkey, recipient_verkey, message_dict
Expand source code
async def send_message_with_return_route_all(
    self,
    message: AgentMessage,
    connection_record: ConnectionRecord
) -> typing.Tuple[str, str, dict]:
    """Send message with return route all in transport decorator.

    Args:
        message (AgentMessage): Agent message.
        connection_record (ConnectionRecord): Connection record.

    Returns:
        typing.Tuple[str, str, dict]: sender_verkey, recipient_verkey, message_dict
    """

    # Fetch wallet from context
    wallet: IndyWallet = await self.context.inject(BaseWallet)

    # Get pack format from context
    pack_format: PackWireFormat = await self.context.inject(BaseWireFormat)

    # Add transport decorator
    message._decorators["transport"] = TransportDecorator(
        return_route="all"
    )

    # Initialise connection manager
    connection_manager = ConnectionManager(self.context)

    # Fetch connection targets
    connection_targets = await connection_manager.fetch_connection_targets(connection_record)

    assert len(connection_targets) > 0, "Zero connection targets found."

    connection_target: ConnectionTarget = connection_targets[0]

    # Pack message
    packed_message = await pack_format.pack(
        context=self.context,
        message_json=message.serialize(as_string=True),
        recipient_keys=connection_target.recipient_keys,
        routing_keys=None,
        sender_key=connection_target.sender_key,
    )

    # Headers
    headers = {
        "Content-Type": "application/ssi-agent-wire"
    }

    # Send request and receive response.
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.post(connection_target.endpoint, data=packed_message) as response:
            # Assert status code is 200
            assert response.status == 200, \
                f"HTTP request failed with status code {response.status}"

            message_body = await response.read()

            # Unpack message
            unpacked = await wallet.unpack_message(message_body)
            (message_json, sender_verkey, recipient_verkey) = unpacked

            # Convert message to dict.
            message_dict = json.loads(message_json)

            return (sender_verkey, recipient_verkey, message_dict)
async def send_publish_dda_message(self, publish_dda_record: PublishDDARecord, template_record: DataDisclosureAgreementTemplateRecord)

Send publish DDA message.

Args

publish_dda_record : PublishDDARecord
Publish dda record.
template_record : DataDisclosureAgreementTemplateRecord
DDA template record.
connection_id : str
Connection identifier.
Expand source code
async def send_publish_dda_message(
    self,
    publish_dda_record: PublishDDARecord,
    template_record: DataDisclosureAgreementTemplateRecord
):
    """Send publish DDA message.

    Args:
        publish_dda_record (PublishDDARecord): Publish dda record.
        template_record (DataDisclosureAgreementTemplateRecord): DDA template record.
        connection_id (str): Connection identifier.
    """
    # Create connection invitation
    mgr = V2ADAManager(self.context)
    (connection_record_for_marketplace, connection_invitation_for_marketplace) = \
        await mgr.create_invitation(
        auto_accept=True,
        public=False,
        multi_use=True,
        alias=f"DDA_{template_record.template_id}_QR_{publish_dda_record._id}"
    )

    # Publish dda message
    publish_dda_message = PublishDDAMessage(
        body=PublishDDAModel(
            dda=template_record.dda_model,
            connection_url=connection_invitation_for_marketplace.to_url()
        )
    )

    # Send publish dda message to marketplace connection
    await mgr.send_reply_message(
        publish_dda_message,
        publish_dda_record.connection_id
    )
async def update_dda_template_in_wallet(self, template_id: str, *, dda: dict, publish_flag: bool = True) ‑> DataDisclosureAgreementTemplateRecord

Update DDA template in wallet.

Args

template_id : str
Template identifier
publish_flag : bool, optional
Publish flag. Defaults to True.

Returns

DataDisclosureAgreementTemplateRecord
Upgraded template record.
Expand source code
async def update_dda_template_in_wallet(
    self,
    template_id: str,
    *,
    dda: dict,
    publish_flag: bool = True,
) -> DataDisclosureAgreementTemplateRecord:
    """Update DDA template in wallet.

    Args:
        template_id (str): Template identifier
        publish_flag (bool, optional): Publish flag. Defaults to True.

    Returns:
        DataDisclosureAgreementTemplateRecord: Upgraded template record.
    """

    # Fetch the latest template.
    existing_template = \
        await DataDisclosureAgreementTemplateRecord.latest_template_by_id(
            self.context,
            template_id
        )

    # Upgrade the existing template to next version.
    upgraded = await existing_template.upgrade(
        self.context,
        dda,
        bool_to_str(publish_flag)
    )

    # Post update actions
    if publish_flag:
        await self.post_update_dda_template(
            upgraded
        )

    return upgraded