Module dexa_sdk.agent.commands.start

Entrypoint.

Expand source code
"""Entrypoint."""

import asyncio
import functools
import logging
import os
import signal
from configargparse import ArgumentParser
from typing import Coroutine, Sequence

try:
    import uvloop
except ImportError:
    uvloop = None

from ..core.conductor import Conductor
from ..config.default_context import DefaultContextBuilder
from ..config import argparse as custom_arg
from aries_cloudagent.config import argparse as arg
from aries_cloudagent.config.util import common_config

LOGGER = logging.getLogger(__name__)


async def start_app(conductor: Conductor):
    """Start up."""
    await conductor.setup()
    await conductor.start()


async def shutdown_app(conductor: Conductor):
    """Shut down."""
    print("\nShutting down")
    await conductor.stop()


def init_argument_parser(parser: ArgumentParser):
    """Initialize an argument parser with the module's arguments."""
    return arg.load_argument_groups(
        parser,
        *arg.group.get_registered(arg.CAT_START),
        *custom_arg.group.get_registered(arg.CAT_START)
    )


def execute(argv: Sequence[str] = None):
    """Entrypoint."""
    parser = arg.create_argument_parser()
    parser.prog += " start"
    get_settings = init_argument_parser(parser)
    args = parser.parse_args(argv)
    settings = get_settings(args)
    common_config(settings)

    # set ledger to read only if explicitely specified
    settings["ledger.read_only"] = settings.get("read_only_ledger", False)

    # Support WEBHOOK_URL environment variable
    webhook_url = os.environ.get("WEBHOOK_URL")
    if webhook_url:
        webhook_urls = list(settings.get("admin.webhook_urls") or [])
        webhook_urls.append(webhook_url)
        settings["admin.webhook_urls"] = webhook_urls

    # Create the Conductor instance
    context_builder = DefaultContextBuilder(settings)
    conductor = Conductor(context_builder)

    # Run the application
    if uvloop:
        uvloop.install()
        LOGGER.info("uvloop installed")
    else:
        LOGGER.error("uvloop is not installed")
    run_loop(start_app(conductor), shutdown_app(conductor))


def run_loop(startup: Coroutine, shutdown: Coroutine):
    """Execute the application, handling signals and ctrl-c."""

    async def init(cleanup):
        """Perform startup, terminating if an exception occurs."""
        try:
            await startup
        except Exception:
            LOGGER.exception("Exception during startup:")
            cleanup()

    async def done():
        """Run shutdown and clean up any outstanding tasks."""
        await shutdown
        tasks = [
            task
            for task in asyncio.Task.all_tasks()
            if task is not asyncio.Task.current_task()
        ]
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        asyncio.get_event_loop().stop()

    loop = asyncio.get_event_loop()
    cleanup = functools.partial(asyncio.ensure_future, done(), loop=loop)
    loop.add_signal_handler(signal.SIGTERM, cleanup)
    asyncio.ensure_future(init(cleanup), loop=loop)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.run_until_complete(done())

Functions

def execute(argv: Sequence[str] = None)

Entrypoint.

Expand source code
def execute(argv: Sequence[str] = None):
    """Entrypoint."""
    parser = arg.create_argument_parser()
    parser.prog += " start"
    get_settings = init_argument_parser(parser)
    args = parser.parse_args(argv)
    settings = get_settings(args)
    common_config(settings)

    # set ledger to read only if explicitely specified
    settings["ledger.read_only"] = settings.get("read_only_ledger", False)

    # Support WEBHOOK_URL environment variable
    webhook_url = os.environ.get("WEBHOOK_URL")
    if webhook_url:
        webhook_urls = list(settings.get("admin.webhook_urls") or [])
        webhook_urls.append(webhook_url)
        settings["admin.webhook_urls"] = webhook_urls

    # Create the Conductor instance
    context_builder = DefaultContextBuilder(settings)
    conductor = Conductor(context_builder)

    # Run the application
    if uvloop:
        uvloop.install()
        LOGGER.info("uvloop installed")
    else:
        LOGGER.error("uvloop is not installed")
    run_loop(start_app(conductor), shutdown_app(conductor))
def init_argument_parser(parser: configargparse.ArgumentParser)

Initialize an argument parser with the module's arguments.

Expand source code
def init_argument_parser(parser: ArgumentParser):
    """Initialize an argument parser with the module's arguments."""
    return arg.load_argument_groups(
        parser,
        *arg.group.get_registered(arg.CAT_START),
        *custom_arg.group.get_registered(arg.CAT_START)
    )
def run_loop(startup: Coroutine[+T_co, -T_contra, +V_co], shutdown: Coroutine[+T_co, -T_contra, +V_co])

Execute the application, handling signals and ctrl-c.

Expand source code
def run_loop(startup: Coroutine, shutdown: Coroutine):
    """Execute the application, handling signals and ctrl-c."""

    async def init(cleanup):
        """Perform startup, terminating if an exception occurs."""
        try:
            await startup
        except Exception:
            LOGGER.exception("Exception during startup:")
            cleanup()

    async def done():
        """Run shutdown and clean up any outstanding tasks."""
        await shutdown
        tasks = [
            task
            for task in asyncio.Task.all_tasks()
            if task is not asyncio.Task.current_task()
        ]
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        asyncio.get_event_loop().stop()

    loop = asyncio.get_event_loop()
    cleanup = functools.partial(asyncio.ensure_future, done(), loop=loop)
    loop.add_signal_handler(signal.SIGTERM, cleanup)
    asyncio.ensure_future(init(cleanup), loop=loop)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.run_until_complete(done())
async def shutdown_app(conductor: Conductor)

Shut down.

Expand source code
async def shutdown_app(conductor: Conductor):
    """Shut down."""
    print("\nShutting down")
    await conductor.stop()
async def start_app(conductor: Conductor)

Start up.

Expand source code
async def start_app(conductor: Conductor):
    """Start up."""
    await conductor.setup()
    await conductor.start()