Module dexa_sdk.agent.core.conductor

The Conductor.

The conductor is responsible for coordinating messages that are received over the network, communicating with the ledger, passing messages to handlers, instantiating concrete implementations of required modules and storing data in the wallet.

Expand source code
"""
The Conductor.

The conductor is responsible for coordinating messages that are received
over the network, communicating with the ledger, passing messages to handlers,
instantiating concrete implementations of required modules and storing data in the wallet.
"""

import hashlib
import logging
from ..config.injection_context import InjectionContext
from ..admin.server import AdminServer
from ..config.dexa import smartcontract_config
from aries_cloudagent.admin.base_server import BaseAdminServer
from aries_cloudagent.config.default_context import ContextBuilder
from aries_cloudagent.config.ledger import ledger_config
from aries_cloudagent.config.logging import LoggingConfigurator
from aries_cloudagent.config.wallet import wallet_config, BaseWallet
from aries_cloudagent.ledger.error import LedgerConfigError, LedgerTransactionError
from aries_cloudagent.messaging.responder import BaseResponder
from aries_cloudagent.protocols.connections.v1_0.manager import (
    ConnectionManager,
    ConnectionManagerError,
)
from aries_cloudagent.transport.inbound.manager import InboundTransportManager
from aries_cloudagent.transport.inbound.message import InboundMessage
from aries_cloudagent.transport.outbound.base import OutboundDeliveryError
from aries_cloudagent.transport.outbound.manager import (
    OutboundTransportManager,
    QueuedOutboundMessage,
)
from aries_cloudagent.transport.outbound.message import OutboundMessage
from aries_cloudagent.transport.wire_format import BaseWireFormat
from aries_cloudagent.utils.task_queue import CompletedTask, TaskQueue
from aries_cloudagent.utils.stats import Collector
from aries_cloudagent.core.dispatcher import Dispatcher

LOGGER = logging.getLogger(__name__)


class Conductor:
    """
    Conductor class.

    Class responsible for initializing concrete implementations
    of our require interfaces and routing inbound and outbound message data.
    """

    def __init__(self, context_builder: ContextBuilder) -> None:
        """
        Initialize an instance of Conductor.

        Args:
            inbound_transports: Configuration for inbound transports
            outbound_transports: Configuration for outbound transports
            settings: Dictionary of various settings

        """
        self.admin_server = None
        self.context: InjectionContext = None
        self.context_builder = context_builder
        self.dispatcher: Dispatcher = None
        self.inbound_transport_manager: InboundTransportManager = None
        self.outbound_transport_manager: OutboundTransportManager = None

    async def setup(self):
        """Initialize the global request context."""

        context = await self.context_builder.build()

        self.dispatcher = Dispatcher(context)
        await self.dispatcher.setup()

        wire_format = await context.inject(BaseWireFormat, required=False)
        if wire_format and hasattr(wire_format, "task_queue"):
            wire_format.task_queue = self.dispatcher.task_queue

        # Register all inbound transports
        self.inbound_transport_manager = InboundTransportManager(
            context, self.inbound_message_router, self.handle_not_returned
        )
        await self.inbound_transport_manager.setup()

        # Register all outbound transports
        self.outbound_transport_manager = OutboundTransportManager(
            context, self.handle_not_delivered
        )
        await self.outbound_transport_manager.setup()

        # Configure the wallet
        public_did = await wallet_config(context)

        # Configure the ledger
        if not await ledger_config(context, public_did):
            LOGGER.warning("No ledger configured")

        # Admin API
        if context.settings.get("admin.enabled"):
            try:
                admin_host = context.settings.get("admin.host", "0.0.0.0")
                admin_port = context.settings.get("admin.port", "80")
                self.admin_server = AdminServer(
                    admin_host,
                    admin_port,
                    context,
                    self.outbound_message_router,
                    self.webhook_router,
                    self.stop,
                    self.dispatcher.task_queue,
                    self.get_stats,
                )
                webhook_urls = context.settings.get("admin.webhook_urls")
                if webhook_urls:
                    for url in webhook_urls:
                        self.admin_server.add_webhook_target(url)
                context.injector.bind_instance(BaseAdminServer, self.admin_server)
                if "http" not in self.outbound_transport_manager.registered_schemes:
                    self.outbound_transport_manager.register("http")
            except Exception:
                LOGGER.exception("Unable to register admin server")
                raise

        # Fetch stats collector, if any
        collector = await context.inject(Collector, required=False)
        if collector:
            # add stats to our own methods
            collector.wrap(
                self,
                (
                    # "inbound_message_router",
                    "outbound_message_router",
                    # "create_inbound_session",
                ),
            )
            # at the class level (!) should not be performed multiple times
            collector.wrap(
                ConnectionManager,
                (
                    # "get_connection_targets",
                    "fetch_did_document",
                    "find_inbound_connection",
                ),
            )

        # Configure smart contract
        await smartcontract_config(context)

        self.context = context

    async def start(self) -> None:
        """Start the agent."""

        context = self.context

        # Start up transports
        try:
            await self.inbound_transport_manager.start()
        except Exception:
            LOGGER.exception("Unable to start inbound transports")
            raise
        try:
            await self.outbound_transport_manager.start()
        except Exception:
            LOGGER.exception("Unable to start outbound transports")
            raise

        # Start up Admin server
        if self.admin_server:
            try:
                await self.admin_server.start()
            except Exception:
                LOGGER.exception("Unable to start administration API")
            # Make admin responder available during message parsing
            # This allows webhooks to be called when a connection is marked active,
            # for example
            context.injector.bind_instance(BaseResponder, self.admin_server.responder)

        # Get agent label
        default_label = context.settings.get("default_label")

        # Get public did
        wallet: BaseWallet = await context.inject(BaseWallet)
        public_did = await wallet.get_public_did()

        # Show some details about the configuration to the user
        LoggingConfigurator.print_banner(
            default_label,
            self.inbound_transport_manager.registered_transports,
            self.outbound_transport_manager.registered_transports,
            public_did.did if public_did else None,
            self.admin_server,
        )

        # Create a static connection for use by the test-suite
        if context.settings.get("debug.test_suite_endpoint"):
            mgr = ConnectionManager(self.context)
            their_endpoint = context.settings["debug.test_suite_endpoint"]
            test_conn = await mgr.create_static_connection(
                my_seed=hashlib.sha256(b"aries-protocol-test-subject").digest(),
                their_seed=hashlib.sha256(b"aries-protocol-test-suite").digest(),
                their_endpoint=their_endpoint,
                their_role="tester",
                alias="test-suite",
            )
            print("Created static connection for test suite")
            print(" - My DID:", test_conn.my_did)
            print(" - Their DID:", test_conn.their_did)
            print(" - Their endpoint:", their_endpoint)
            print()

        # Print an invitation to the terminal
        if context.settings.get("debug.print_invitation"):
            try:
                mgr = ConnectionManager(self.context)
                _connection, invitation = await mgr.create_invitation(
                    their_role=context.settings.get("debug.invite_role"),
                    my_label=context.settings.get("debug.invite_label"),
                    multi_use=context.settings.get("debug.invite_multi_use", False),
                    public=context.settings.get("debug.invite_public", False),
                )
                base_url = context.settings.get("invite_base_url")
                invite_url = invitation.to_url(base_url)
                print("Invitation URL:")
                print(invite_url, flush=True)
            except Exception:
                LOGGER.exception("Error creating invitation")

    async def stop(self, timeout=1.0):
        """Stop the agent."""
        shutdown = TaskQueue()
        if self.dispatcher:
            shutdown.run(self.dispatcher.complete())
        if self.admin_server:
            shutdown.run(self.admin_server.stop())
        if self.inbound_transport_manager:
            shutdown.run(self.inbound_transport_manager.stop())
        if self.outbound_transport_manager:
            shutdown.run(self.outbound_transport_manager.stop())
        await shutdown.complete(timeout)

    def inbound_message_router(
        self, message: InboundMessage, can_respond: bool = False
    ):
        """
        Route inbound messages.

        Args:
            message: The inbound message instance
            can_respond: If the session supports return routing

        """

        if message.receipt.direct_response_requested and not can_respond:
            LOGGER.warning(
                "Direct response requested, but not supported by transport: %s",
                message.transport_type,
            )

        # Note: at this point we could send the message to a shared queue
        # if this pod is too busy to process it

        try:
            self.dispatcher.queue_message(
                message,
                self.outbound_message_router,
                self.admin_server and self.admin_server.send_webhook,
                lambda completed: self.dispatch_complete(message, completed),
            )
        except (LedgerConfigError, LedgerTransactionError) as e:
            LOGGER.error("Shutdown on ledger error %s", str(e))
            if self.admin_server:
                self.admin_server.notify_fatal_error()
            raise

    def dispatch_complete(self, message: InboundMessage, completed: CompletedTask):
        """Handle completion of message dispatch."""
        if completed.exc_info:
            LOGGER.exception(
                "Exception in message handler:", exc_info=completed.exc_info
            )
            if isinstance(completed.exc_info[1], LedgerConfigError) or isinstance(
                completed.exc_info[1], LedgerTransactionError
            ):
                LOGGER.error(
                    "%shutdown on ledger error %s",
                    "S" if self.admin_server else "No admin server to s",
                    str(completed.exc_info[1]),
                )
                if self.admin_server:
                    self.admin_server.notify_fatal_error()
            else:
                LOGGER.error(
                    "DON'T shutdown on %s %s",
                    completed.exc_info[0].__name__,
                    str(completed.exc_info[1]),
                )
        self.inbound_transport_manager.dispatch_complete(message, completed)

    async def get_stats(self) -> dict:
        """Get the current stats tracked by the conductor."""
        stats = {
            "in_sessions": len(self.inbound_transport_manager.sessions),
            "out_encode": 0,
            "out_deliver": 0,
            "task_active": self.dispatcher.task_queue.current_active,
            "task_done": self.dispatcher.task_queue.total_done,
            "task_failed": self.dispatcher.task_queue.total_failed,
            "task_pending": self.dispatcher.task_queue.current_pending,
        }
        for m in self.outbound_transport_manager.outbound_buffer:
            if m.state == QueuedOutboundMessage.STATE_ENCODE:
                stats["out_encode"] += 1
            if m.state == QueuedOutboundMessage.STATE_DELIVER:
                stats["out_deliver"] += 1
        return stats

    async def outbound_message_router(
        self,
        context: InjectionContext,
        outbound: OutboundMessage,
        inbound: InboundMessage = None,
    ) -> None:
        """
        Route an outbound message.

        Args:
            context: The request context
            message: An outbound message to be sent
            inbound: The inbound message that produced this response, if available
        """
        if not outbound.target and outbound.reply_to_verkey:
            if not outbound.reply_from_verkey and inbound:
                outbound.reply_from_verkey = inbound.receipt.recipient_verkey
            # return message to an inbound session
            if self.inbound_transport_manager.return_to_session(outbound):
                return

        if not outbound.to_session_only:
            await self.queue_outbound(context, outbound, inbound)

    def handle_not_returned(self, context: InjectionContext, outbound: OutboundMessage):
        """Handle a message that failed delivery via an inbound session."""
        try:
            self.dispatcher.run_task(self.queue_outbound(context, outbound))
        except (LedgerConfigError, LedgerTransactionError) as e:
            LOGGER.error("Shutdown on ledger error %s", str(e))
            if self.admin_server:
                self.admin_server.notify_fatal_error()
            raise

    async def queue_outbound(
        self,
        context: InjectionContext,
        outbound: OutboundMessage,
        inbound: InboundMessage = None,
    ):
        """
        Queue an outbound message.

        Args:
            context: The request context
            message: An outbound message to be sent
            inbound: The inbound message that produced this response, if available
        """
        # populate connection target(s)
        if not outbound.target and not outbound.target_list and outbound.connection_id:
            # using provided request context
            mgr = ConnectionManager(context)
            try:
                outbound.target_list = await self.dispatcher.run_task(
                    mgr.get_connection_targets(connection_id=outbound.connection_id)
                )
            except ConnectionManagerError:
                LOGGER.exception("Error preparing outbound message for transmission")
                return
            except (LedgerConfigError, LedgerTransactionError) as e:
                LOGGER.error("Shutdown on ledger error %s", str(e))
                if self.admin_server:
                    self.admin_server.notify_fatal_error()
                raise

        try:
            self.outbound_transport_manager.enqueue_message(context, outbound)
        except OutboundDeliveryError:
            LOGGER.warning("Cannot queue message for delivery, no supported transport")
            self.handle_not_delivered(context, outbound)

    def handle_not_delivered(
        self, context: InjectionContext, outbound: OutboundMessage
    ):
        """Handle a message that failed delivery via outbound transports."""
        self.inbound_transport_manager.return_undelivered(outbound)

    def webhook_router(
        self, topic: str, payload: dict, endpoint: str, max_attempts: int = None
    ):
        """
        Route a webhook through the outbound transport manager.

        Args:
            topic: The webhook topic
            payload: The webhook payload
            endpoint: The endpoint of the webhook target
            max_attempts: The maximum number of attempts
        """
        try:
            self.outbound_transport_manager.enqueue_webhook(
                topic, payload, endpoint, max_attempts
            )
        except OutboundDeliveryError:
            LOGGER.warning(
                "Cannot queue message webhook for delivery, no supported transport"
            )

Classes

class Conductor (context_builder: aries_cloudagent.config.base_context.ContextBuilder)

Conductor class.

Class responsible for initializing concrete implementations of our require interfaces and routing inbound and outbound message data.

Initialize an instance of Conductor.

Args

inbound_transports
Configuration for inbound transports
outbound_transports
Configuration for outbound transports
settings
Dictionary of various settings
Expand source code
class Conductor:
    """
    Conductor class.

    Class responsible for initializing concrete implementations
    of our require interfaces and routing inbound and outbound message data.
    """

    def __init__(self, context_builder: ContextBuilder) -> None:
        """
        Initialize an instance of Conductor.

        Args:
            inbound_transports: Configuration for inbound transports
            outbound_transports: Configuration for outbound transports
            settings: Dictionary of various settings

        """
        self.admin_server = None
        self.context: InjectionContext = None
        self.context_builder = context_builder
        self.dispatcher: Dispatcher = None
        self.inbound_transport_manager: InboundTransportManager = None
        self.outbound_transport_manager: OutboundTransportManager = None

    async def setup(self):
        """Initialize the global request context."""

        context = await self.context_builder.build()

        self.dispatcher = Dispatcher(context)
        await self.dispatcher.setup()

        wire_format = await context.inject(BaseWireFormat, required=False)
        if wire_format and hasattr(wire_format, "task_queue"):
            wire_format.task_queue = self.dispatcher.task_queue

        # Register all inbound transports
        self.inbound_transport_manager = InboundTransportManager(
            context, self.inbound_message_router, self.handle_not_returned
        )
        await self.inbound_transport_manager.setup()

        # Register all outbound transports
        self.outbound_transport_manager = OutboundTransportManager(
            context, self.handle_not_delivered
        )
        await self.outbound_transport_manager.setup()

        # Configure the wallet
        public_did = await wallet_config(context)

        # Configure the ledger
        if not await ledger_config(context, public_did):
            LOGGER.warning("No ledger configured")

        # Admin API
        if context.settings.get("admin.enabled"):
            try:
                admin_host = context.settings.get("admin.host", "0.0.0.0")
                admin_port = context.settings.get("admin.port", "80")
                self.admin_server = AdminServer(
                    admin_host,
                    admin_port,
                    context,
                    self.outbound_message_router,
                    self.webhook_router,
                    self.stop,
                    self.dispatcher.task_queue,
                    self.get_stats,
                )
                webhook_urls = context.settings.get("admin.webhook_urls")
                if webhook_urls:
                    for url in webhook_urls:
                        self.admin_server.add_webhook_target(url)
                context.injector.bind_instance(BaseAdminServer, self.admin_server)
                if "http" not in self.outbound_transport_manager.registered_schemes:
                    self.outbound_transport_manager.register("http")
            except Exception:
                LOGGER.exception("Unable to register admin server")
                raise

        # Fetch stats collector, if any
        collector = await context.inject(Collector, required=False)
        if collector:
            # add stats to our own methods
            collector.wrap(
                self,
                (
                    # "inbound_message_router",
                    "outbound_message_router",
                    # "create_inbound_session",
                ),
            )
            # at the class level (!) should not be performed multiple times
            collector.wrap(
                ConnectionManager,
                (
                    # "get_connection_targets",
                    "fetch_did_document",
                    "find_inbound_connection",
                ),
            )

        # Configure smart contract
        await smartcontract_config(context)

        self.context = context

    async def start(self) -> None:
        """Start the agent."""

        context = self.context

        # Start up transports
        try:
            await self.inbound_transport_manager.start()
        except Exception:
            LOGGER.exception("Unable to start inbound transports")
            raise
        try:
            await self.outbound_transport_manager.start()
        except Exception:
            LOGGER.exception("Unable to start outbound transports")
            raise

        # Start up Admin server
        if self.admin_server:
            try:
                await self.admin_server.start()
            except Exception:
                LOGGER.exception("Unable to start administration API")
            # Make admin responder available during message parsing
            # This allows webhooks to be called when a connection is marked active,
            # for example
            context.injector.bind_instance(BaseResponder, self.admin_server.responder)

        # Get agent label
        default_label = context.settings.get("default_label")

        # Get public did
        wallet: BaseWallet = await context.inject(BaseWallet)
        public_did = await wallet.get_public_did()

        # Show some details about the configuration to the user
        LoggingConfigurator.print_banner(
            default_label,
            self.inbound_transport_manager.registered_transports,
            self.outbound_transport_manager.registered_transports,
            public_did.did if public_did else None,
            self.admin_server,
        )

        # Create a static connection for use by the test-suite
        if context.settings.get("debug.test_suite_endpoint"):
            mgr = ConnectionManager(self.context)
            their_endpoint = context.settings["debug.test_suite_endpoint"]
            test_conn = await mgr.create_static_connection(
                my_seed=hashlib.sha256(b"aries-protocol-test-subject").digest(),
                their_seed=hashlib.sha256(b"aries-protocol-test-suite").digest(),
                their_endpoint=their_endpoint,
                their_role="tester",
                alias="test-suite",
            )
            print("Created static connection for test suite")
            print(" - My DID:", test_conn.my_did)
            print(" - Their DID:", test_conn.their_did)
            print(" - Their endpoint:", their_endpoint)
            print()

        # Print an invitation to the terminal
        if context.settings.get("debug.print_invitation"):
            try:
                mgr = ConnectionManager(self.context)
                _connection, invitation = await mgr.create_invitation(
                    their_role=context.settings.get("debug.invite_role"),
                    my_label=context.settings.get("debug.invite_label"),
                    multi_use=context.settings.get("debug.invite_multi_use", False),
                    public=context.settings.get("debug.invite_public", False),
                )
                base_url = context.settings.get("invite_base_url")
                invite_url = invitation.to_url(base_url)
                print("Invitation URL:")
                print(invite_url, flush=True)
            except Exception:
                LOGGER.exception("Error creating invitation")

    async def stop(self, timeout=1.0):
        """Stop the agent."""
        shutdown = TaskQueue()
        if self.dispatcher:
            shutdown.run(self.dispatcher.complete())
        if self.admin_server:
            shutdown.run(self.admin_server.stop())
        if self.inbound_transport_manager:
            shutdown.run(self.inbound_transport_manager.stop())
        if self.outbound_transport_manager:
            shutdown.run(self.outbound_transport_manager.stop())
        await shutdown.complete(timeout)

    def inbound_message_router(
        self, message: InboundMessage, can_respond: bool = False
    ):
        """
        Route inbound messages.

        Args:
            message: The inbound message instance
            can_respond: If the session supports return routing

        """

        if message.receipt.direct_response_requested and not can_respond:
            LOGGER.warning(
                "Direct response requested, but not supported by transport: %s",
                message.transport_type,
            )

        # Note: at this point we could send the message to a shared queue
        # if this pod is too busy to process it

        try:
            self.dispatcher.queue_message(
                message,
                self.outbound_message_router,
                self.admin_server and self.admin_server.send_webhook,
                lambda completed: self.dispatch_complete(message, completed),
            )
        except (LedgerConfigError, LedgerTransactionError) as e:
            LOGGER.error("Shutdown on ledger error %s", str(e))
            if self.admin_server:
                self.admin_server.notify_fatal_error()
            raise

    def dispatch_complete(self, message: InboundMessage, completed: CompletedTask):
        """Handle completion of message dispatch."""
        if completed.exc_info:
            LOGGER.exception(
                "Exception in message handler:", exc_info=completed.exc_info
            )
            if isinstance(completed.exc_info[1], LedgerConfigError) or isinstance(
                completed.exc_info[1], LedgerTransactionError
            ):
                LOGGER.error(
                    "%shutdown on ledger error %s",
                    "S" if self.admin_server else "No admin server to s",
                    str(completed.exc_info[1]),
                )
                if self.admin_server:
                    self.admin_server.notify_fatal_error()
            else:
                LOGGER.error(
                    "DON'T shutdown on %s %s",
                    completed.exc_info[0].__name__,
                    str(completed.exc_info[1]),
                )
        self.inbound_transport_manager.dispatch_complete(message, completed)

    async def get_stats(self) -> dict:
        """Get the current stats tracked by the conductor."""
        stats = {
            "in_sessions": len(self.inbound_transport_manager.sessions),
            "out_encode": 0,
            "out_deliver": 0,
            "task_active": self.dispatcher.task_queue.current_active,
            "task_done": self.dispatcher.task_queue.total_done,
            "task_failed": self.dispatcher.task_queue.total_failed,
            "task_pending": self.dispatcher.task_queue.current_pending,
        }
        for m in self.outbound_transport_manager.outbound_buffer:
            if m.state == QueuedOutboundMessage.STATE_ENCODE:
                stats["out_encode"] += 1
            if m.state == QueuedOutboundMessage.STATE_DELIVER:
                stats["out_deliver"] += 1
        return stats

    async def outbound_message_router(
        self,
        context: InjectionContext,
        outbound: OutboundMessage,
        inbound: InboundMessage = None,
    ) -> None:
        """
        Route an outbound message.

        Args:
            context: The request context
            message: An outbound message to be sent
            inbound: The inbound message that produced this response, if available
        """
        if not outbound.target and outbound.reply_to_verkey:
            if not outbound.reply_from_verkey and inbound:
                outbound.reply_from_verkey = inbound.receipt.recipient_verkey
            # return message to an inbound session
            if self.inbound_transport_manager.return_to_session(outbound):
                return

        if not outbound.to_session_only:
            await self.queue_outbound(context, outbound, inbound)

    def handle_not_returned(self, context: InjectionContext, outbound: OutboundMessage):
        """Handle a message that failed delivery via an inbound session."""
        try:
            self.dispatcher.run_task(self.queue_outbound(context, outbound))
        except (LedgerConfigError, LedgerTransactionError) as e:
            LOGGER.error("Shutdown on ledger error %s", str(e))
            if self.admin_server:
                self.admin_server.notify_fatal_error()
            raise

    async def queue_outbound(
        self,
        context: InjectionContext,
        outbound: OutboundMessage,
        inbound: InboundMessage = None,
    ):
        """
        Queue an outbound message.

        Args:
            context: The request context
            message: An outbound message to be sent
            inbound: The inbound message that produced this response, if available
        """
        # populate connection target(s)
        if not outbound.target and not outbound.target_list and outbound.connection_id:
            # using provided request context
            mgr = ConnectionManager(context)
            try:
                outbound.target_list = await self.dispatcher.run_task(
                    mgr.get_connection_targets(connection_id=outbound.connection_id)
                )
            except ConnectionManagerError:
                LOGGER.exception("Error preparing outbound message for transmission")
                return
            except (LedgerConfigError, LedgerTransactionError) as e:
                LOGGER.error("Shutdown on ledger error %s", str(e))
                if self.admin_server:
                    self.admin_server.notify_fatal_error()
                raise

        try:
            self.outbound_transport_manager.enqueue_message(context, outbound)
        except OutboundDeliveryError:
            LOGGER.warning("Cannot queue message for delivery, no supported transport")
            self.handle_not_delivered(context, outbound)

    def handle_not_delivered(
        self, context: InjectionContext, outbound: OutboundMessage
    ):
        """Handle a message that failed delivery via outbound transports."""
        self.inbound_transport_manager.return_undelivered(outbound)

    def webhook_router(
        self, topic: str, payload: dict, endpoint: str, max_attempts: int = None
    ):
        """
        Route a webhook through the outbound transport manager.

        Args:
            topic: The webhook topic
            payload: The webhook payload
            endpoint: The endpoint of the webhook target
            max_attempts: The maximum number of attempts
        """
        try:
            self.outbound_transport_manager.enqueue_webhook(
                topic, payload, endpoint, max_attempts
            )
        except OutboundDeliveryError:
            LOGGER.warning(
                "Cannot queue message webhook for delivery, no supported transport"
            )

Methods

def dispatch_complete(self, message: aries_cloudagent.transport.inbound.message.InboundMessage, completed: aries_cloudagent.utils.task_queue.CompletedTask)

Handle completion of message dispatch.

Expand source code
def dispatch_complete(self, message: InboundMessage, completed: CompletedTask):
    """Handle completion of message dispatch."""
    if completed.exc_info:
        LOGGER.exception(
            "Exception in message handler:", exc_info=completed.exc_info
        )
        if isinstance(completed.exc_info[1], LedgerConfigError) or isinstance(
            completed.exc_info[1], LedgerTransactionError
        ):
            LOGGER.error(
                "%shutdown on ledger error %s",
                "S" if self.admin_server else "No admin server to s",
                str(completed.exc_info[1]),
            )
            if self.admin_server:
                self.admin_server.notify_fatal_error()
        else:
            LOGGER.error(
                "DON'T shutdown on %s %s",
                completed.exc_info[0].__name__,
                str(completed.exc_info[1]),
            )
    self.inbound_transport_manager.dispatch_complete(message, completed)
async def get_stats(self) ‑> dict

Get the current stats tracked by the conductor.

Expand source code
async def get_stats(self) -> dict:
    """Get the current stats tracked by the conductor."""
    stats = {
        "in_sessions": len(self.inbound_transport_manager.sessions),
        "out_encode": 0,
        "out_deliver": 0,
        "task_active": self.dispatcher.task_queue.current_active,
        "task_done": self.dispatcher.task_queue.total_done,
        "task_failed": self.dispatcher.task_queue.total_failed,
        "task_pending": self.dispatcher.task_queue.current_pending,
    }
    for m in self.outbound_transport_manager.outbound_buffer:
        if m.state == QueuedOutboundMessage.STATE_ENCODE:
            stats["out_encode"] += 1
        if m.state == QueuedOutboundMessage.STATE_DELIVER:
            stats["out_deliver"] += 1
    return stats
def handle_not_delivered(self, context: InjectionContext, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage)

Handle a message that failed delivery via outbound transports.

Expand source code
def handle_not_delivered(
    self, context: InjectionContext, outbound: OutboundMessage
):
    """Handle a message that failed delivery via outbound transports."""
    self.inbound_transport_manager.return_undelivered(outbound)
def handle_not_returned(self, context: InjectionContext, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage)

Handle a message that failed delivery via an inbound session.

Expand source code
def handle_not_returned(self, context: InjectionContext, outbound: OutboundMessage):
    """Handle a message that failed delivery via an inbound session."""
    try:
        self.dispatcher.run_task(self.queue_outbound(context, outbound))
    except (LedgerConfigError, LedgerTransactionError) as e:
        LOGGER.error("Shutdown on ledger error %s", str(e))
        if self.admin_server:
            self.admin_server.notify_fatal_error()
        raise
def inbound_message_router(self, message: aries_cloudagent.transport.inbound.message.InboundMessage, can_respond: bool = False)

Route inbound messages.

Args

message
The inbound message instance
can_respond
If the session supports return routing
Expand source code
def inbound_message_router(
    self, message: InboundMessage, can_respond: bool = False
):
    """
    Route inbound messages.

    Args:
        message: The inbound message instance
        can_respond: If the session supports return routing

    """

    if message.receipt.direct_response_requested and not can_respond:
        LOGGER.warning(
            "Direct response requested, but not supported by transport: %s",
            message.transport_type,
        )

    # Note: at this point we could send the message to a shared queue
    # if this pod is too busy to process it

    try:
        self.dispatcher.queue_message(
            message,
            self.outbound_message_router,
            self.admin_server and self.admin_server.send_webhook,
            lambda completed: self.dispatch_complete(message, completed),
        )
    except (LedgerConfigError, LedgerTransactionError) as e:
        LOGGER.error("Shutdown on ledger error %s", str(e))
        if self.admin_server:
            self.admin_server.notify_fatal_error()
        raise
async def outbound_message_router(self, context: InjectionContext, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage, inbound: aries_cloudagent.transport.inbound.message.InboundMessage = None) ‑> None

Route an outbound message.

Args

context
The request context
message
An outbound message to be sent
inbound
The inbound message that produced this response, if available
Expand source code
async def outbound_message_router(
    self,
    context: InjectionContext,
    outbound: OutboundMessage,
    inbound: InboundMessage = None,
) -> None:
    """
    Route an outbound message.

    Args:
        context: The request context
        message: An outbound message to be sent
        inbound: The inbound message that produced this response, if available
    """
    if not outbound.target and outbound.reply_to_verkey:
        if not outbound.reply_from_verkey and inbound:
            outbound.reply_from_verkey = inbound.receipt.recipient_verkey
        # return message to an inbound session
        if self.inbound_transport_manager.return_to_session(outbound):
            return

    if not outbound.to_session_only:
        await self.queue_outbound(context, outbound, inbound)
async def queue_outbound(self, context: InjectionContext, outbound: aries_cloudagent.transport.outbound.message.OutboundMessage, inbound: aries_cloudagent.transport.inbound.message.InboundMessage = None)

Queue an outbound message.

Args

context
The request context
message
An outbound message to be sent
inbound
The inbound message that produced this response, if available
Expand source code
async def queue_outbound(
    self,
    context: InjectionContext,
    outbound: OutboundMessage,
    inbound: InboundMessage = None,
):
    """
    Queue an outbound message.

    Args:
        context: The request context
        message: An outbound message to be sent
        inbound: The inbound message that produced this response, if available
    """
    # populate connection target(s)
    if not outbound.target and not outbound.target_list and outbound.connection_id:
        # using provided request context
        mgr = ConnectionManager(context)
        try:
            outbound.target_list = await self.dispatcher.run_task(
                mgr.get_connection_targets(connection_id=outbound.connection_id)
            )
        except ConnectionManagerError:
            LOGGER.exception("Error preparing outbound message for transmission")
            return
        except (LedgerConfigError, LedgerTransactionError) as e:
            LOGGER.error("Shutdown on ledger error %s", str(e))
            if self.admin_server:
                self.admin_server.notify_fatal_error()
            raise

    try:
        self.outbound_transport_manager.enqueue_message(context, outbound)
    except OutboundDeliveryError:
        LOGGER.warning("Cannot queue message for delivery, no supported transport")
        self.handle_not_delivered(context, outbound)
async def setup(self)

Initialize the global request context.

Expand source code
async def setup(self):
    """Initialize the global request context."""

    context = await self.context_builder.build()

    self.dispatcher = Dispatcher(context)
    await self.dispatcher.setup()

    wire_format = await context.inject(BaseWireFormat, required=False)
    if wire_format and hasattr(wire_format, "task_queue"):
        wire_format.task_queue = self.dispatcher.task_queue

    # Register all inbound transports
    self.inbound_transport_manager = InboundTransportManager(
        context, self.inbound_message_router, self.handle_not_returned
    )
    await self.inbound_transport_manager.setup()

    # Register all outbound transports
    self.outbound_transport_manager = OutboundTransportManager(
        context, self.handle_not_delivered
    )
    await self.outbound_transport_manager.setup()

    # Configure the wallet
    public_did = await wallet_config(context)

    # Configure the ledger
    if not await ledger_config(context, public_did):
        LOGGER.warning("No ledger configured")

    # Admin API
    if context.settings.get("admin.enabled"):
        try:
            admin_host = context.settings.get("admin.host", "0.0.0.0")
            admin_port = context.settings.get("admin.port", "80")
            self.admin_server = AdminServer(
                admin_host,
                admin_port,
                context,
                self.outbound_message_router,
                self.webhook_router,
                self.stop,
                self.dispatcher.task_queue,
                self.get_stats,
            )
            webhook_urls = context.settings.get("admin.webhook_urls")
            if webhook_urls:
                for url in webhook_urls:
                    self.admin_server.add_webhook_target(url)
            context.injector.bind_instance(BaseAdminServer, self.admin_server)
            if "http" not in self.outbound_transport_manager.registered_schemes:
                self.outbound_transport_manager.register("http")
        except Exception:
            LOGGER.exception("Unable to register admin server")
            raise

    # Fetch stats collector, if any
    collector = await context.inject(Collector, required=False)
    if collector:
        # add stats to our own methods
        collector.wrap(
            self,
            (
                # "inbound_message_router",
                "outbound_message_router",
                # "create_inbound_session",
            ),
        )
        # at the class level (!) should not be performed multiple times
        collector.wrap(
            ConnectionManager,
            (
                # "get_connection_targets",
                "fetch_did_document",
                "find_inbound_connection",
            ),
        )

    # Configure smart contract
    await smartcontract_config(context)

    self.context = context
async def start(self) ‑> None

Start the agent.

Expand source code
async def start(self) -> None:
    """Start the agent."""

    context = self.context

    # Start up transports
    try:
        await self.inbound_transport_manager.start()
    except Exception:
        LOGGER.exception("Unable to start inbound transports")
        raise
    try:
        await self.outbound_transport_manager.start()
    except Exception:
        LOGGER.exception("Unable to start outbound transports")
        raise

    # Start up Admin server
    if self.admin_server:
        try:
            await self.admin_server.start()
        except Exception:
            LOGGER.exception("Unable to start administration API")
        # Make admin responder available during message parsing
        # This allows webhooks to be called when a connection is marked active,
        # for example
        context.injector.bind_instance(BaseResponder, self.admin_server.responder)

    # Get agent label
    default_label = context.settings.get("default_label")

    # Get public did
    wallet: BaseWallet = await context.inject(BaseWallet)
    public_did = await wallet.get_public_did()

    # Show some details about the configuration to the user
    LoggingConfigurator.print_banner(
        default_label,
        self.inbound_transport_manager.registered_transports,
        self.outbound_transport_manager.registered_transports,
        public_did.did if public_did else None,
        self.admin_server,
    )

    # Create a static connection for use by the test-suite
    if context.settings.get("debug.test_suite_endpoint"):
        mgr = ConnectionManager(self.context)
        their_endpoint = context.settings["debug.test_suite_endpoint"]
        test_conn = await mgr.create_static_connection(
            my_seed=hashlib.sha256(b"aries-protocol-test-subject").digest(),
            their_seed=hashlib.sha256(b"aries-protocol-test-suite").digest(),
            their_endpoint=their_endpoint,
            their_role="tester",
            alias="test-suite",
        )
        print("Created static connection for test suite")
        print(" - My DID:", test_conn.my_did)
        print(" - Their DID:", test_conn.their_did)
        print(" - Their endpoint:", their_endpoint)
        print()

    # Print an invitation to the terminal
    if context.settings.get("debug.print_invitation"):
        try:
            mgr = ConnectionManager(self.context)
            _connection, invitation = await mgr.create_invitation(
                their_role=context.settings.get("debug.invite_role"),
                my_label=context.settings.get("debug.invite_label"),
                multi_use=context.settings.get("debug.invite_multi_use", False),
                public=context.settings.get("debug.invite_public", False),
            )
            base_url = context.settings.get("invite_base_url")
            invite_url = invitation.to_url(base_url)
            print("Invitation URL:")
            print(invite_url, flush=True)
        except Exception:
            LOGGER.exception("Error creating invitation")
async def stop(self, timeout=1.0)

Stop the agent.

Expand source code
async def stop(self, timeout=1.0):
    """Stop the agent."""
    shutdown = TaskQueue()
    if self.dispatcher:
        shutdown.run(self.dispatcher.complete())
    if self.admin_server:
        shutdown.run(self.admin_server.stop())
    if self.inbound_transport_manager:
        shutdown.run(self.inbound_transport_manager.stop())
    if self.outbound_transport_manager:
        shutdown.run(self.outbound_transport_manager.stop())
    await shutdown.complete(timeout)
def webhook_router(self, topic: str, payload: dict, endpoint: str, max_attempts: int = None)

Route a webhook through the outbound transport manager.

Args

topic
The webhook topic
payload
The webhook payload
endpoint
The endpoint of the webhook target
max_attempts
The maximum number of attempts
Expand source code
def webhook_router(
    self, topic: str, payload: dict, endpoint: str, max_attempts: int = None
):
    """
    Route a webhook through the outbound transport manager.

    Args:
        topic: The webhook topic
        payload: The webhook payload
        endpoint: The endpoint of the webhook target
        max_attempts: The maximum number of attempts
    """
    try:
        self.outbound_transport_manager.enqueue_webhook(
            topic, payload, endpoint, max_attempts
        )
    except OutboundDeliveryError:
        LOGGER.warning(
            "Cannot queue message webhook for delivery, no supported transport"
        )