Module dexa_sdk.agent.admin.server

Admin server classes.

Expand source code
"""Admin server classes."""

import asyncio
import logging
from typing import Callable, Coroutine, Sequence, Set
import uuid

from aiohttp import web
from aiohttp_apispec import (
    docs,
    response_schema,
    validation_middleware,
)
import aiohttp_cors

from marshmallow import fields, Schema

from ..config.injection_context import InjectionContext
from .aiohttp_apispec.custom import custom_setup_aiohttp_apispec
from aries_cloudagent.core.plugin_registry import PluginRegistry
from aries_cloudagent.ledger.error import LedgerConfigError, LedgerTransactionError
from aries_cloudagent.messaging.responder import BaseResponder
from aries_cloudagent.transport.queue.basic import BasicMessageQueue
from aries_cloudagent.transport.outbound.message import OutboundMessage
from aries_cloudagent.utils.stats import Collector
from aries_cloudagent.utils.task_queue import TaskQueue
from aries_cloudagent.version import __version__
from aries_cloudagent.admin.base_server import BaseAdminServer
from aries_cloudagent.admin.error import AdminSetupError
from ...managers.dexa_manager import DexaManager


# Module-level logger used by the middlewares and `AdminServer` below.
LOGGER = logging.getLogger(__name__)


class AdminModulesSchema(Schema):
    """Response schema for the endpoint listing the loaded admin modules."""

    result = fields.List(
        fields.Str(description="admin module"),
        description="List of admin modules",
    )


class AdminStatusSchema(Schema):
    """
    Schema for the status endpoint.

    Declares no fields of its own; it is the documented 200 response of both
    the status handler and the status-reset handler.
    """


class AdminStatusLivelinessSchema(Schema):
    """Response schema for the liveliness probe endpoint."""

    alive = fields.Boolean(
        description="Liveliness status",
        example=True,
    )


class AdminStatusReadinessSchema(Schema):
    """Response schema for the readiness probe endpoint."""

    ready = fields.Boolean(
        description="Readiness status",
        example=True,
    )


class AdminResponder(BaseResponder):
    """Responder that forwards handler output to injected send/webhook callables."""

    def __init__(
        self,
        context: InjectionContext,
        send: Coroutine,
        webhook: Coroutine,
        **kwargs,
    ):
        """
        Initialize an instance of `AdminResponder`.

        Args:
            context: Context handed to `send` as its first argument
            send: Awaitable callable invoked as `send(context, message)`
            webhook: Awaitable callable invoked as `webhook(topic, payload)`
            kwargs: Extra keyword arguments passed on to `BaseResponder`

        """
        super().__init__(**kwargs)
        self._context, self._send, self._webhook = context, send, webhook

    async def send_outbound(self, message: OutboundMessage):
        """
        Deliver an outbound message through the injected `send` callable.

        Args:
            message: The `OutboundMessage` to be sent
        """
        deliver = self._send
        await deliver(self._context, message)

    async def send_webhook(self, topic: str, payload: dict):
        """
        Hand a webhook over to the injected `webhook` callable.

        Args:
            topic: the webhook topic identifier
            payload: the webhook payload value
        """
        dispatch = self._webhook
        await dispatch(topic, payload)


class WebhookTarget:
    """Webhook delivery target: endpoint, attempt limit and topic filter."""

    def __init__(
        self,
        endpoint: str,
        topic_filter: Sequence[str] = None,
        max_attempts: int = None,
    ):
        """
        Initialize the webhook target.

        Args:
            endpoint: Address the webhooks for this target are sent to
            topic_filter: Topics accepted by this target; `None`, an empty
                value, or any collection containing "*" accepts every topic
            max_attempts: Attempt limit for this target, stored as given

        """
        self.endpoint = endpoint
        self.max_attempts = max_attempts
        self._topic_filter = None
        self.topic_filter = topic_filter  # normalise through the setter

    @property
    def topic_filter(self) -> Set[str]:
        """Return the accepted topics, or `None` when every topic is accepted."""
        return self._topic_filter

    @topic_filter.setter
    def topic_filter(self, val: Sequence[str]):
        """
        Normalise and store the target's topic filter.

        A bare string is treated as a single topic (iterating it would yield
        its individual characters). Empty values and filters containing the
        "*" wildcard are stored as `None`.
        """
        if isinstance(val, str):
            val = [val] if val else None
        topics = set(val) if val else None
        if topics and "*" in topics:
            topics = None
        self._topic_filter = topics


@web.middleware
async def ready_middleware(request: web.BaseRequest, handler: Coroutine):
    """
    Reject requests with HTTP 503 unless the application is flagged ready.

    The liveliness and readiness probe paths are always let through. Ledger
    configuration/transaction errors raised by the handler clear both the
    "ready" and "alive" flags; every exception is logged and re-raised.
    """
    probe_paths = ("/status/live", "/status/ready")
    is_probe = str(request.rel_url).rstrip("/") in probe_paths

    if not (is_probe or request.app._state.get("ready")):
        raise web.HTTPServiceUnavailable(reason="Shutdown in progress")

    try:
        return await handler(request)
    except (LedgerConfigError, LedgerTransactionError) as err:
        # fatal, signal server shutdown
        LOGGER.error("Shutdown with %s", str(err))
        for flag in ("ready", "alive"):
            request.app._state[flag] = False
        raise
    except web.HTTPFound as redirect:
        # redirect, typically / -> /api/doc
        LOGGER.info("Handler redirect to: %s", redirect.location)
        raise
    except asyncio.CancelledError:
        # redirection spawns new task and cancels old
        LOGGER.debug("Task cancelled")
        raise
    except Exception as err:
        # some other error?
        LOGGER.error("Handler error with exception: %s", str(err))
        raise


@web.middleware
async def debug_middleware(request: web.BaseRequest, handler: Coroutine):
    """Log method, path, match info and body of the request at DEBUG level."""

    if not LOGGER.isEnabledFor(logging.DEBUG):
        return await handler(request)

    LOGGER.debug(f"Incoming request: {request.method} {request.path_qs}")
    LOGGER.debug(f"Match info: {request.match_info}")
    body = await request.text()
    LOGGER.debug(f"Body: {body}")
    return await handler(request)


class AdminServer(BaseAdminServer):
    """Admin HTTP server class."""

    def __init__(
        self,
        host: str,
        port: int,
        context: InjectionContext,
        outbound_message_router: Coroutine,
        webhook_router: Callable,
        conductor_stop: Coroutine,
        task_queue: TaskQueue = None,
        conductor_stats: Coroutine = None,
    ):
        """
        Initialize an AdminServer instance.

        Args:
            host: Host to listen on
            port: Port to listen on
            context: The application context instance
            outbound_message_router: Coroutine for delivering outbound messages
            webhook_router: Callable for delivering webhooks
            conductor_stop: Conductor (graceful) stop for shutdown API call
            task_queue: An optional task queue for handlers
            conductor_stats: Optional coroutine function whose awaited result
                is reported under "conductor" by the status endpoint
        """
        # Listening address and the aiohttp objects that start() creates.
        self.host, self.port = host, port
        self.app = None
        self.site = None

        # Authentication settings come from the context passed in.
        settings = context.settings
        self.admin_api_key = settings.get("admin.admin_api_key")
        self.admin_insecure_mode = bool(settings.get("admin.admin_insecure_mode"))

        # Collaborators supplied by the caller.
        self.conductor_stop = conductor_stop
        self.conductor_stats = conductor_stats
        self.task_queue = task_queue
        self.webhook_router = webhook_router

        # Mutable registries, all empty to begin with.
        self.loaded_modules = []
        self.webhook_targets = {}
        self.websocket_queues = {}

        # The responder is bound inside the "admin" scope of the context.
        self.context = context.start_scope("admin")
        self.responder = AdminResponder(
            self.context, outbound_message_router, self.send_webhook
        )
        self.context.injector.bind_instance(BaseResponder, self.responder)

    async def make_application(self) -> web.Application:
        """
        Build and configure the aiohttp application instance.

        Assembles the middleware list, registers the built-in routes plus any
        plugin routes, enables CORS on every registered route, sets up the
        API documentation and initialises the "ready"/"alive" status flags.

        Returns:
            The configured application, with "ready" and "alive" set to False

        Raises:
            AssertionError: If not exactly one of `admin_insecure_mode` and
                `admin_api_key` is set

        """

        # Baseline middlewares; the optional ones below are appended after these.
        middlewares = [ready_middleware,
                       debug_middleware, validation_middleware]

        # admin_api_key and admin_insecure_mode are mutually exclusive and
        # exactly one of them is required.
        # This should be enforced during parameter parsing but to be sure,
        # we check here.
        assert self.admin_insecure_mode ^ bool(self.admin_api_key)

        def is_unprotected_path(path: str):
            """Return True for paths that are served without an API key."""
            return (
                path
                in [
                    "/api/doc",
                    "/api/docs/swagger.json",
                    "/favicon.ico",
                    "/ws",  # ws handler checks authentication
                ]
                or path.startswith("/static/swagger/")
            )

        # If admin_api_key is None, then admin_insecure_mode must be set so
        # we can safely enable the admin server with no security
        if self.admin_api_key:

            @web.middleware
            async def check_token(request, handler):
                """Allow the request on a matching x-api-key or open path."""
                header_admin_api_key = request.headers.get("x-api-key")
                valid_key = self.admin_api_key == header_admin_api_key

                if valid_key or is_unprotected_path(request.path):
                    return await handler(request)
                else:
                    raise web.HTTPUnauthorized()

            middlewares.append(check_token)

        collector: Collector = await self.context.inject(Collector, required=False)

        # A task queue takes precedence: the stats middleware is only
        # installed when no task queue was supplied.
        if self.task_queue:

            @web.middleware
            async def apply_limiter(request, handler):
                """Run the handler through the task queue and await its task."""
                task = await self.task_queue.put(handler(request))
                return await task

            middlewares.append(apply_limiter)

        elif collector:

            @web.middleware
            async def collect_stats(request, handler):
                """Wrap the handler with the collector, keyed by its qualname."""
                handler = collector.wrap_coro(handler, [handler.__qualname__])
                return await handler(request)

            middlewares.append(collect_stats)

        app = web.Application(middlewares=middlewares)
        # Stored on the application itself; presumably read by route handlers
        # registered elsewhere -- confirm against the plugin routes.
        app["request_context"] = self.context
        app["outbound_message_router"] = self.responder.send

        # Routes served by this class itself.
        app.add_routes(
            [
                web.get("/", self.redirect_handler, allow_head=False),
                web.get("/plugins", self.plugins_handler, allow_head=False),
                web.get("/status", self.status_handler, allow_head=False),
                web.post("/status/reset", self.status_reset_handler),
                web.get("/status/live", self.liveliness_handler,
                        allow_head=False),
                web.get("/status/ready", self.readiness_handler,
                        allow_head=False),
                web.get("/shutdown", self.shutdown_handler, allow_head=False),
                web.get("/ws", self.websocket_handler, allow_head=False),
                web.post("/webhooks/topic/connections/", self.connections_webhook_handler),
            ]
        )

        # Routes contributed by plugins, when a registry is available.
        plugin_registry: PluginRegistry = await self.context.inject(
            PluginRegistry, required=False
        )
        if plugin_registry:
            await plugin_registry.register_admin_routes(app)

        # Apply the same wildcard CORS options to every route registered so far.
        cors = aiohttp_cors.setup(
            app,
            defaults={
                "*": aiohttp_cors.ResourceOptions(
                    allow_credentials=True,
                    expose_headers="*",
                    allow_headers="*",
                    allow_methods="*",
                )
            },
        )
        for route in app.router.routes():
            cors.add(route)
        # get agent label
        agent_label = self.context.settings.get("default_label")
        version_string = f"v{__version__}"

        # API documentation is titled with the agent label and configured
        # with swagger path /api/doc.
        custom_setup_aiohttp_apispec(
            app=app,
            title=agent_label,
            version=version_string,
            swagger_path="/api/doc"
        )
        app.on_startup.append(self.on_startup)

        # ensure we always have status values
        app._state["ready"] = False
        app._state["alive"] = False

        return app

    async def start(self) -> None:
        """
        Start the webserver.

        Builds the application, sets up its runner, lets the plugin registry
        post-process the routes, puts the swagger document into a
        deterministic order and finally starts the TCP site. The "ready" and
        "alive" flags are set once the site is listening.

        Raises:
            AdminSetupError: If there was an error starting the webserver

        """

        def sort_dict(raw: dict) -> dict:
            """Order (JSON, string keys) dict asciibetically by key, recursively."""
            for key, value in raw.items():
                if isinstance(value, dict):
                    raw[key] = sort_dict(value)
            return dict(sorted(raw.items(), key=lambda pair: pair[0]))

        def parameter_order(param: dict) -> tuple:
            """Sort key: location first, required before optional, then name."""
            return (param["in"], not param["required"], param["name"])

        self.app = await self.make_application()
        runner = web.AppRunner(self.app)
        await runner.setup()

        registry: PluginRegistry = await self.context.inject(
            PluginRegistry, required=False
        )
        if registry:
            registry.post_process_routes(self.app)

        # order tags alphabetically, parameters deterministically and pythonically
        spec = self.app._state["swagger_dict"]
        spec.get("tags", []).sort(key=lambda tag: tag["name"])
        for path_item in spec["paths"].values():
            for operation in path_item.values():
                operation["parameters"].sort(key=parameter_order)

        # order definitions alphabetically by dict key
        spec["definitions"] = sort_dict(spec["definitions"])

        self.site = web.TCPSite(runner, host=self.host, port=self.port)

        try:
            await self.site.start()
        except OSError:
            raise AdminSetupError(
                "Unable to start webserver with host "
                + f"'{self.host}' and port '{self.port}'\n"
            )

        # Only reached when the site started without error.
        self.app._state["ready"] = True
        self.app._state["alive"] = True

    async def stop(self) -> None:
        """Stop the webserver."""
        self.app._state["ready"] = False  # in case call does not come through OpenAPI
        for queue in self.websocket_queues.values():
            queue.stop()
        if self.site:
            await self.site.stop()
            self.site = None

    async def on_startup(self, app: web.Application):
        """Perform webserver startup actions."""
        if self.admin_api_key:
            swagger = app["swagger_dict"]
            swagger["securityDefinitions"] = {
                "ApiKeyHeader": {"type": "apiKey", "in": "header", "name": "X-API-KEY"}
            }
            swagger["security"] = [{"ApiKeyHeader": []}]

    @docs(tags=["server"], summary="Fetch the list of loaded plugins")
    @response_schema(AdminModulesSchema(), 200)
    async def plugins_handler(self, request: web.BaseRequest):
        """
        Request handler for the loaded plugins list.

        Args:
            request: aiohttp request object

        Returns:
            JSON response whose "result" is the sorted list of plugin names,
            or an empty list when no plugin registry is available

        """
        registry: PluginRegistry = await self.context.inject(
            PluginRegistry, required=False
        )
        # Conditional expression instead of the `x and a or b` idiom, which
        # silently falls through to `b` whenever `a` happens to be falsy.
        plugins = sorted(registry.plugin_names) if registry else []
        return web.json_response({"result": plugins})

    @docs(tags=["server"], summary="Fetch the server status")
    @response_schema(AdminStatusSchema(), 200)
    async def status_handler(self, request: web.BaseRequest):
        """
        Request handler for the server status information.

        The response always carries "version" and "label"; "timing" is added
        when a stats collector is available and "conductor" when a conductor
        stats callable was supplied.

        Args:
            request: aiohttp request object

        Returns:
            The web response

        """
        report = {
            "version": __version__,
            "label": self.context.settings.get("default_label"),
        }

        stats: Collector = await self.context.inject(Collector, required=False)
        if stats:
            report["timing"] = stats.results

        if self.conductor_stats:
            report["conductor"] = await self.conductor_stats()

        return web.json_response(report)

    @docs(tags=["server"], summary="Reset statistics")
    @response_schema(AdminStatusSchema(), 200)
    async def status_reset_handler(self, request: web.BaseRequest):
        """
        Request handler for resetting the timing statistics.

        Resets the stats collector when one is available; otherwise does
        nothing.

        Args:
            request: aiohttp request object

        Returns:
            An empty JSON object as the web response

        """
        stats: Collector = await self.context.inject(Collector, required=False)
        if stats:
            stats.reset()

        empty_body = {}
        return web.json_response(empty_body)

    @docs(tags=["server"], summary="Webhooks handler")
    async def connections_webhook_handler(self, request: web.BaseRequest):
        """
        Request handler for connections webhooks.

        Parses the JSON request body and passes it to
        `DexaManager.handle_connections_webhook`.

        Args:
            request: aiohttp request object

        Returns:
            An empty JSON object as the web response

        """
        payload = await request.json()

        # A manager is created per request on the admin context.
        await DexaManager(self.context).handle_connections_webhook(payload)

        return web.json_response({})

    async def redirect_handler(self, request: web.BaseRequest):
        """
        Perform redirect to documentation.

        Args:
            request: aiohttp request object (not inspected)

        Raises:
            web.HTTPFound: Always, with the location "/api/doc"

        """
        raise web.HTTPFound("/api/doc")

    @docs(tags=["server"], summary="Liveliness check")
    @response_schema(AdminStatusLivelinessSchema(), 200)
    async def liveliness_handler(self, request: web.BaseRequest):
        """
        Request handler for liveliness check.

        Args:
            request: aiohttp request object

        Returns:
            The web response carrying the "alive" flag when it is set

        Raises:
            web.HTTPServiceUnavailable: If the "alive" flag is not set

        """
        alive = self.app._state["alive"]
        if not alive:
            raise web.HTTPServiceUnavailable(reason="Service not available")
        return web.json_response({"alive": alive})

    @docs(tags=["server"], summary="Readiness check")
    @response_schema(AdminStatusReadinessSchema(), 200)
    async def readiness_handler(self, request: web.BaseRequest):
        """
        Request handler for readiness check.

        The server counts as ready only when both the "ready" and the "alive"
        flag are set.

        Args:
            request: aiohttp request object

        Returns:
            The web response, indicating readiness for further calls

        Raises:
            web.HTTPServiceUnavailable: If the server is not ready

        """
        flags = self.app._state
        ready = flags["ready"] and flags["alive"]
        if not ready:
            raise web.HTTPServiceUnavailable(reason="Service not ready")
        return web.json_response({"ready": ready})

    @docs(tags=["server"], summary="Shut down server")
    async def shutdown_handler(self, request: web.BaseRequest):
        """
        Request handler for server shutdown.

        Clears the "ready" flag and schedules the conductor stop on the event
        loop without waiting for it, so the response is returned first.

        Args:
            request: aiohttp request object

        Returns:
            An empty JSON object as the web response

        """
        self.app._state["ready"] = False

        event_loop = asyncio.get_event_loop()
        stop_coro = self.conductor_stop()
        asyncio.ensure_future(stop_coro, loop=event_loop)

        return web.json_response({})

    def notify_fatal_error(self):
        """Clear the "ready" and "alive" flags to force a restart (openshift)."""
        LOGGER.error("Received shutdown request notify_fatal_error()")
        for flag in ("ready", "alive"):
            self.app._state[flag] = False

    async def websocket_handler(self, request):
        """
        Send notifications to admin client over websocket.

        Registers a message queue for the connection under a random socket
        id, sends an initial "settings" message and then loops until the
        socket closes: queued messages are forwarded to the client, and JSON
        messages received from the client may carry an "x-api-key" entry that
        authenticates the connection. The queue is unregistered on exit.

        Args:
            request: aiohttp request object

        Returns:
            The websocket response object

        """

        ws = web.WebSocketResponse()
        await ws.prepare(request)
        socket_id = str(uuid.uuid4())
        queue = BasicMessageQueue()
        loop = asyncio.get_event_loop()

        if self.admin_insecure_mode:
            # open to send websocket messages without api key auth
            queue.authenticated = True
        else:
            header_admin_api_key = request.headers.get("x-api-key")
            # authenticated via http header?
            queue.authenticated = header_admin_api_key == self.admin_api_key

        try:
            # Register first so the `finally` below always finds the entry.
            self.websocket_queues[socket_id] = queue
            await queue.enqueue(
                {
                    "topic": "settings",
                    "payload": {
                        "authenticated": queue.authenticated,
                        "label": self.context.settings.get("default_label"),
                        "endpoint": self.context.settings.get("default_endpoint"),
                        "no_receive_invites": self.context.settings.get(
                            "admin.no_receive_invites", False
                        ),
                        "help_link": self.context.settings.get("admin.help_link"),
                    },
                }
            )

            closed = False
            # One pending task per direction; each is recreated once consumed.
            receive = loop.create_task(ws.receive_json())
            send = loop.create_task(queue.dequeue(timeout=5.0))

            while not closed:
                try:
                    await asyncio.wait(
                        (receive, send), return_when=asyncio.FIRST_COMPLETED
                    )
                    if ws.closed:
                        closed = True

                    if receive.done():
                        if not closed:
                            msg_received = None
                            msg_api_key = None
                            try:
                                # this call can re-raise exceptions from inside the task
                                msg_received = receive.result()
                                msg_api_key = msg_received.get("x-api-key")
                            except Exception:
                                LOGGER.exception(
                                    "Exception in websocket receiving task:"
                                )
                            if self.admin_api_key and self.admin_api_key == msg_api_key:
                                # authenticated via websocket message
                                queue.authenticated = True

                            receive = loop.create_task(ws.receive_json())

                    if send.done():
                        try:
                            msg = send.result()
                        except asyncio.TimeoutError:
                            msg = None

                        if msg is None:
                            # we send fake pings because the JS client
                            # can't detect real ones
                            msg = {
                                "topic": "ping",
                                "authenticated": queue.authenticated,
                            }
                        if not closed:
                            if msg:
                                await ws.send_json(msg)
                            send = loop.create_task(queue.dequeue(timeout=5.0))

                except asyncio.CancelledError:
                    closed = True

            # Cancel whichever task is still pending once the loop has ended.
            if not receive.done():
                receive.cancel()
            if not send.done():
                send.cancel()

        finally:
            # Unregister so send_webhook no longer targets this connection.
            del self.websocket_queues[socket_id]

        return ws

    def add_webhook_target(
        self,
        target_url: str,
        topic_filter: Sequence[str] = None,
        max_attempts: int = None,
    ):
        """
        Register a webhook target, replacing any existing one for the URL.

        Args:
            target_url: Endpoint of the target, also used as registry key
            topic_filter: Topics passed on to the `WebhookTarget`
            max_attempts: Attempt limit passed on to the `WebhookTarget`
        """
        target = WebhookTarget(target_url, topic_filter, max_attempts)
        self.webhook_targets[target_url] = target

    def remove_webhook_target(self, target_url: str):
        """Remove a webhook target."""
        if target_url in self.webhook_targets:
            del self.webhook_targets[target_url]

    async def send_webhook(self, topic: str, payload: dict):
        """Add a webhook to the queue, to send to all registered targets."""
        if self.webhook_router:
            for idx, target in self.webhook_targets.items():
                if not target.topic_filter or topic in target.topic_filter:
                    self.webhook_router(
                        topic, payload, target.endpoint, target.max_attempts
                    )

        for queue in self.websocket_queues.values():
            if queue.authenticated or topic in ("ping", "settings"):
                await queue.enqueue({"topic": topic, "payload": payload})

Functions

async def debug_middleware(request: aiohttp.web_request.BaseRequest, handler: Coroutine[+T_co, -T_contra, +V_co])

Show request detail in debug log.

Expand source code
@web.middleware
async def debug_middleware(request: web.BaseRequest, handler: Coroutine):
    """Log method, path, match info and body of the request at DEBUG level."""

    if not LOGGER.isEnabledFor(logging.DEBUG):
        return await handler(request)

    LOGGER.debug(f"Incoming request: {request.method} {request.path_qs}")
    LOGGER.debug(f"Match info: {request.match_info}")
    body = await request.text()
    LOGGER.debug(f"Body: {body}")
    return await handler(request)
async def ready_middleware(request: aiohttp.web_request.BaseRequest, handler: Coroutine[+T_co, -T_contra, +V_co])

Only continue if application is ready to take work.

Expand source code
@web.middleware
async def ready_middleware(request: web.BaseRequest, handler: Coroutine):
    """
    Reject requests with HTTP 503 unless the application is flagged ready.

    The liveliness and readiness probe paths are always let through. Ledger
    configuration/transaction errors raised by the handler clear both the
    "ready" and "alive" flags; every exception is logged and re-raised.
    """
    probe_paths = ("/status/live", "/status/ready")
    is_probe = str(request.rel_url).rstrip("/") in probe_paths

    if not (is_probe or request.app._state.get("ready")):
        raise web.HTTPServiceUnavailable(reason="Shutdown in progress")

    try:
        return await handler(request)
    except (LedgerConfigError, LedgerTransactionError) as err:
        # fatal, signal server shutdown
        LOGGER.error("Shutdown with %s", str(err))
        for flag in ("ready", "alive"):
            request.app._state[flag] = False
        raise
    except web.HTTPFound as redirect:
        # redirect, typically / -> /api/doc
        LOGGER.info("Handler redirect to: %s", redirect.location)
        raise
    except asyncio.CancelledError:
        # redirection spawns new task and cancels old
        LOGGER.debug("Task cancelled")
        raise
    except Exception as err:
        # some other error?
        LOGGER.error("Handler error with exception: %s", str(err))
        raise

Classes

class AdminModulesSchema (*, only: Union[Sequence[str], Set[str]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Dict[~KT, ~VT] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: str = None)

Schema for the modules endpoint.

Expand source code
class AdminModulesSchema(Schema):
    """Response schema for the endpoint listing the loaded admin modules."""

    result = fields.List(
        fields.Str(description="admin module"),
        description="List of admin modules",
    )

Ancestors

  • marshmallow.schema.Schema
  • marshmallow.base.SchemaABC

Class variables

var opts
class AdminResponder (context: InjectionContext, send: Coroutine[+T_co, -T_contra, +V_co], webhook: Coroutine[+T_co, -T_contra, +V_co], **kwargs)

Handle outgoing messages from message handlers.

Initialize an instance of AdminResponder.

Args

send
Function to send outbound message
Expand source code
class AdminResponder(BaseResponder):
    """Responder that forwards handler output to injected send/webhook callables."""

    def __init__(
        self,
        context: InjectionContext,
        send: Coroutine,
        webhook: Coroutine,
        **kwargs,
    ):
        """
        Initialize an instance of `AdminResponder`.

        Args:
            context: Context handed to `send` as its first argument
            send: Awaitable callable invoked as `send(context, message)`
            webhook: Awaitable callable invoked as `webhook(topic, payload)`
            kwargs: Extra keyword arguments passed on to `BaseResponder`

        """
        super().__init__(**kwargs)
        self._context, self._send, self._webhook = context, send, webhook

    async def send_outbound(self, message: OutboundMessage):
        """
        Deliver an outbound message through the injected `send` callable.

        Args:
            message: The `OutboundMessage` to be sent
        """
        deliver = self._send
        await deliver(self._context, message)

    async def send_webhook(self, topic: str, payload: dict):
        """
        Hand a webhook over to the injected `webhook` callable.

        Args:
            topic: the webhook topic identifier
            payload: the webhook payload value
        """
        dispatch = self._webhook
        await dispatch(topic, payload)

Ancestors

  • aries_cloudagent.messaging.responder.BaseResponder
  • abc.ABC

Methods

async def send_outbound(self, message: aries_cloudagent.transport.outbound.message.OutboundMessage)

Send outbound message.

Args

message
The OutboundMessage to be sent
Expand source code
async def send_outbound(self, message: OutboundMessage):
    """
    Deliver an outbound message through the injected `send` callable.

    Args:
        message: The `OutboundMessage` to be sent
    """
    deliver = self._send
    await deliver(self._context, message)
async def send_webhook(self, topic: str, payload: dict)

Dispatch a webhook.

Args

topic
the webhook topic identifier
payload
the webhook payload value
Expand source code
async def send_webhook(self, topic: str, payload: dict):
    """
    Hand a webhook over to the injected `webhook` callable.

    Args:
        topic: the webhook topic identifier
        payload: the webhook payload value
    """
    dispatch = self._webhook
    await dispatch(topic, payload)
class AdminServer (host: str, port: int, context: InjectionContext, outbound_message_router: Coroutine[+T_co, -T_contra, +V_co], webhook_router: Callable, conductor_stop: Coroutine[+T_co, -T_contra, +V_co], task_queue: aries_cloudagent.utils.task_queue.TaskQueue = None, conductor_stats: Coroutine[+T_co, -T_contra, +V_co] = None)

Admin HTTP server class.

Initialize an AdminServer instance.

Args

host
Host to listen on
port
Port to listen on
context
The application context instance
outbound_message_router
Coroutine for delivering outbound messages
webhook_router
Callable for delivering webhooks
conductor_stop
Conductor (graceful) stop for shutdown API call
task_queue
An optional task queue for handlers
Expand source code
class AdminServer(BaseAdminServer):
    """Admin HTTP server class."""

    def __init__(
        self,
        host: str,
        port: int,
        context: InjectionContext,
        outbound_message_router: Coroutine,
        webhook_router: Callable,
        conductor_stop: Coroutine,
        task_queue: TaskQueue = None,
        conductor_stats: Coroutine = None,
    ):
        """
        Set up a new AdminServer; the web application itself is built later.

        Args:
            host: Host to listen on
            port: Port to listen on
            context: The application context instance
            outbound_message_router: Coroutine for delivering outbound messages
            webhook_router: Callable for delivering webhooks
            conductor_stop: Conductor (graceful) stop for shutdown API call
            task_queue: An optional task queue for handlers
            conductor_stats: An optional coroutine whose result is reported
                under "conductor" by the status endpoint
        """
        settings = context.settings

        # Listening address
        self.host = host
        self.port = port

        # Access control, read from the settings
        self.admin_api_key = settings.get("admin.admin_api_key")
        self.admin_insecure_mode = bool(settings.get("admin.admin_insecure_mode"))

        # Callables and helpers supplied by the caller
        self.conductor_stop = conductor_stop
        self.conductor_stats = conductor_stats
        self.task_queue = task_queue
        self.webhook_router = webhook_router

        # Runtime state; `app` and `site` are assigned in `start()`
        self.app = None
        self.site = None
        self.loaded_modules = []
        self.webhook_targets = {}
        self.websocket_queues = {}

        # Handlers run in an "admin" scope with their own responder bound
        admin_scope = context.start_scope("admin")
        self.context = admin_scope
        self.responder = AdminResponder(
            admin_scope, outbound_message_router, self.send_webhook
        )
        admin_scope.injector.bind_instance(BaseResponder, self.responder)

    async def make_application(self) -> web.Application:
        """
        Build and configure the aiohttp application for the admin API.

        Assembles the middleware chain (adding the API-key check when a key is
        configured, then either the task-queue limiter or the stats collector),
        registers the built-in routes and any plugin routes, enables CORS on
        every route, sets up the API documentation under "/api/doc" and
        initialises the status flags.

        Returns:
            The configured application, with "ready" and "alive" both False

        Raises:
            AssertionError: If insecure mode and an API key are both set, or
                neither is

        """
        import hmac  # function-scope import: only the API-key check needs it

        middlewares = [ready_middleware, debug_middleware, validation_middleware]

        # admin-insecure-mode and admin-api-key are mutually exclusive and
        # exactly one of them is required.
        # This should be enforced during parameter parsing but to be sure,
        # we check here.
        assert self.admin_insecure_mode ^ bool(self.admin_api_key)

        unprotected_paths = (
            "/api/doc",
            "/api/docs/swagger.json",
            "/favicon.ico",
            "/ws",  # ws handler checks authentication
        )

        def is_unprotected_path(path: str):
            return path in unprotected_paths or path.startswith("/static/swagger/")

        # If admin_api_key is None, then admin_insecure_mode must be set so
        # we can safely enable the admin server with no security
        if self.admin_api_key:

            @web.middleware
            async def check_token(request, handler):
                header_admin_api_key = request.headers.get("x-api-key")
                # Compare in constant time so that response timing does not
                # reveal how much of a guessed key is correct. Bytes are used
                # because compare_digest rejects non-ASCII str arguments.
                valid_key = header_admin_api_key is not None and hmac.compare_digest(
                    self.admin_api_key.encode("utf-8"),
                    header_admin_api_key.encode("utf-8", "surrogateescape"),
                )

                if valid_key or is_unprotected_path(request.path):
                    return await handler(request)
                else:
                    raise web.HTTPUnauthorized()

            middlewares.append(check_token)

        collector: Collector = await self.context.inject(Collector, required=False)

        if self.task_queue:

            @web.middleware
            async def apply_limiter(request, handler):
                # run the handler through the task queue and wait for its result
                task = await self.task_queue.put(handler(request))
                return await task

            middlewares.append(apply_limiter)

        elif collector:

            @web.middleware
            async def collect_stats(request, handler):
                handler = collector.wrap_coro(handler, [handler.__qualname__])
                return await handler(request)

            middlewares.append(collect_stats)

        app = web.Application(middlewares=middlewares)
        app["request_context"] = self.context
        app["outbound_message_router"] = self.responder.send

        app.add_routes(
            [
                web.get("/", self.redirect_handler, allow_head=False),
                web.get("/plugins", self.plugins_handler, allow_head=False),
                web.get("/status", self.status_handler, allow_head=False),
                web.post("/status/reset", self.status_reset_handler),
                web.get("/status/live", self.liveliness_handler, allow_head=False),
                web.get("/status/ready", self.readiness_handler, allow_head=False),
                web.get("/shutdown", self.shutdown_handler, allow_head=False),
                web.get("/ws", self.websocket_handler, allow_head=False),
                web.post(
                    "/webhooks/topic/connections/", self.connections_webhook_handler
                ),
            ]
        )

        plugin_registry: PluginRegistry = await self.context.inject(
            PluginRegistry, required=False
        )
        if plugin_registry:
            await plugin_registry.register_admin_routes(app)

        cors = aiohttp_cors.setup(
            app,
            defaults={
                "*": aiohttp_cors.ResourceOptions(
                    allow_credentials=True,
                    expose_headers="*",
                    allow_headers="*",
                    allow_methods="*",
                )
            },
        )
        for route in app.router.routes():
            cors.add(route)
        # get agent label
        agent_label = self.context.settings.get("default_label")
        version_string = f"v{__version__}"

        custom_setup_aiohttp_apispec(
            app=app,
            title=agent_label,
            version=version_string,
            swagger_path="/api/doc"
        )
        app.on_startup.append(self.on_startup)

        # ensure we always have status values
        app._state["ready"] = False
        app._state["alive"] = False

        return app

    async def start(self) -> None:
        """
        Start the webserver.

        Builds the application, lets the plugin registry (if any) post-process
        the routes, puts the generated swagger document into a deterministic
        order and finally starts listening on the configured host and port.

        Raises:
            AdminSetupError: If there was an error starting the webserver

        """

        def sort_dict(raw: dict) -> dict:
            """Order (JSON, string keys) dict asciibetically by key, recursively."""
            return {
                key: sort_dict(value) if isinstance(value, dict) else value
                for key, value in sorted(raw.items(), key=lambda item: item[0])
            }

        self.app = await self.make_application()
        runner = web.AppRunner(self.app)
        await runner.setup()

        plugin_registry: PluginRegistry = await self.context.inject(
            PluginRegistry, required=False
        )
        if plugin_registry:
            plugin_registry.post_process_routes(self.app)

        # order tags alphabetically, parameters deterministically and pythonically
        swagger_dict = self.app._state["swagger_dict"]
        swagger_dict.get("tags", []).sort(key=lambda t: t["name"])
        for path in swagger_dict["paths"].values():
            for method_spec in path.values():
                # "parameters" and a parameter's "required" are optional in an
                # OpenAPI 2.0 document, so do not assume either is present
                method_spec.get("parameters", []).sort(
                    key=lambda p: (p["in"], not p.get("required", False), p["name"])
                )

        # order definitions alphabetically by dict key
        swagger_dict["definitions"] = sort_dict(swagger_dict["definitions"])

        self.site = web.TCPSite(runner, host=self.host, port=self.port)

        try:
            await self.site.start()
        except OSError as err:
            # keep the original OSError attached as the cause for diagnostics
            raise AdminSetupError(
                "Unable to start webserver with host "
                + f"'{self.host}' and port '{self.port}'\n"
            ) from err
        else:
            self.app._state["ready"] = True
            self.app._state["alive"] = True

    async def stop(self) -> None:
        """
        Stop the webserver.

        Marks the application as not ready, stops every registered websocket
        queue and stops the listening site if there is one. Does nothing to
        the application state when no application has been created yet, so it
        can be called even if `start()` never ran.
        """
        if self.app is not None:
            # in case call does not come through OpenAPI
            self.app._state["ready"] = False
        for queue in self.websocket_queues.values():
            queue.stop()
        if self.site:
            await self.site.stop()
            self.site = None

    async def on_startup(self, app: web.Application):
        """Declare the API-key header in the swagger document when a key is set."""
        if not self.admin_api_key:
            return
        api_key_scheme = {"type": "apiKey", "in": "header", "name": "X-API-KEY"}
        spec = app["swagger_dict"]
        spec["securityDefinitions"] = {"ApiKeyHeader": api_key_scheme}
        spec["security"] = [{"ApiKeyHeader": []}]

    @docs(tags=["server"], summary="Fetch the list of loaded plugins")
    @response_schema(AdminModulesSchema(), 200)
    async def plugins_handler(self, request: web.BaseRequest):
        """
        Report the plugin names known to the plugin registry.

        Args:
            request: aiohttp request object

        Returns:
            A JSON response whose "result" is the sorted list of plugin names
            (empty when no registry is available)

        """
        registry: PluginRegistry = await self.context.inject(
            PluginRegistry, required=False
        )
        plugin_names = sorted(registry.plugin_names) if registry else []
        return web.json_response({"result": plugin_names})

    @docs(tags=["server"], summary="Fetch the server status")
    @response_schema(AdminStatusSchema(), 200)
    async def status_handler(self, request: web.BaseRequest):
        """
        Report version, label and any available statistics.

        Args:
            request: aiohttp request object

        Returns:
            A JSON response with "version" and "label", plus "timing" when a
            collector is available and "conductor" when a stats coroutine was
            supplied

        """
        label = self.context.settings.get("default_label")
        status = {"version": __version__, "label": label}
        collector: Collector = await self.context.inject(Collector, required=False)
        if collector:
            status["timing"] = collector.results
        stats_source = self.conductor_stats
        if stats_source:
            status["conductor"] = await stats_source()
        return web.json_response(status)

    @docs(tags=["server"], summary="Reset statistics")
    @response_schema(AdminStatusSchema(), 200)
    async def status_reset_handler(self, request: web.BaseRequest):
        """
        Reset the timing statistics when a collector is available.

        Args:
            request: aiohttp request object

        Returns:
            An empty JSON response

        """
        stats: Collector = await self.context.inject(Collector, required=False)
        if stats:
            stats.reset()
        empty_body = {}
        return web.json_response(empty_body)

    @docs(tags=["server"], summary="Webhooks handler")
    async def connections_webhook_handler(self, request: web.BaseRequest):
        """
        Request handler for connection webhooks.

        Parses the JSON request body and passes it to
        `DexaManager.handle_connections_webhook`.

        Args:
            request: aiohttp request object

        Returns:
            The web response (an empty JSON object)

        Raises:
            web.HTTPBadRequest: If the request body is not valid JSON

        """
        # Request body
        try:
            body = await request.json()
        except ValueError as err:
            # json.JSONDecodeError is a ValueError; answer with a client error
            # instead of letting it surface as an unhandled server error.
            raise web.HTTPBadRequest(reason="Request body must be valid JSON") from err

        # Initialise manager
        mgr = DexaManager(self.context)

        # Handle webhook.
        await mgr.handle_connections_webhook(body)

        return web.json_response({})

    async def redirect_handler(self, request: web.BaseRequest):
        """Redirect the client to the API documentation page."""
        docs_location = "/api/doc"
        raise web.HTTPFound(docs_location)

    @docs(tags=["server"], summary="Liveliness check")
    @response_schema(AdminStatusLivelinessSchema(), 200)
    async def liveliness_handler(self, request: web.BaseRequest):
        """
        Answer the liveliness probe from the application's "alive" flag.

        Args:
            request: aiohttp request object

        Returns:
            A JSON response carrying the "alive" flag when it is set

        Raises:
            web.HTTPServiceUnavailable: If the "alive" flag is not set

        """
        alive = self.app._state["alive"]
        if not alive:
            raise web.HTTPServiceUnavailable(reason="Service not available")
        return web.json_response({"alive": alive})

    @docs(tags=["server"], summary="Readiness check")
    @response_schema(AdminStatusReadinessSchema(), 200)
    async def readiness_handler(self, request: web.BaseRequest):
        """
        Answer the readiness probe from the "ready" and "alive" flags.

        Args:
            request: aiohttp request object

        Returns:
            A JSON response indicating readiness for further calls

        Raises:
            web.HTTPServiceUnavailable: If either flag is not set

        """
        state = self.app._state
        ready = state["ready"] and state["alive"]
        if not ready:
            raise web.HTTPServiceUnavailable(reason="Service not ready")
        return web.json_response({"ready": ready})

    @docs(tags=["server"], summary="Shut down server")
    async def shutdown_handler(self, request: web.BaseRequest):
        """
        Mark the server as not ready and schedule the conductor stop.

        Args:
            request: aiohttp request object

        Returns:
            An empty JSON response

        """
        self.app._state["ready"] = False

        # the stop is scheduled on the event loop, not awaited here
        running_loop = asyncio.get_event_loop()
        stop_call = self.conductor_stop()
        asyncio.ensure_future(stop_call, loop=running_loop)

        return web.json_response({})

    def notify_fatal_error(self):
        """Clear the "ready" and "alive" flags to force a restart (openshift)."""
        LOGGER.error("Received shutdown request notify_fatal_error()")
        state = self.app._state
        for flag in ("ready", "alive"):
            state[flag] = False

    async def websocket_handler(self, request):
        """
        Send notifications to admin client over websocket.

        Registers a message queue for this connection in `websocket_queues`,
        queues an initial "settings" message and then relays queued messages to
        the client until the socket closes. A client can authenticate through
        the "x-api-key" HTTP header or, later, through an "x-api-key" field in
        a JSON message sent over the socket.

        Args:
            request: aiohttp request object

        Returns:
            The websocket response object

        """

        ws = web.WebSocketResponse()
        await ws.prepare(request)
        # unique key under which this connection's queue is registered
        socket_id = str(uuid.uuid4())
        queue = BasicMessageQueue()
        loop = asyncio.get_event_loop()

        if self.admin_insecure_mode:
            # open to send websocket messages without api key auth
            queue.authenticated = True
        else:
            header_admin_api_key = request.headers.get("x-api-key")
            # authenticated via http header?
            queue.authenticated = header_admin_api_key == self.admin_api_key

        try:
            # once registered, send_webhook() enqueues messages for this client
            self.websocket_queues[socket_id] = queue
            # first message tells the client its auth state and agent settings
            await queue.enqueue(
                {
                    "topic": "settings",
                    "payload": {
                        "authenticated": queue.authenticated,
                        "label": self.context.settings.get("default_label"),
                        "endpoint": self.context.settings.get("default_endpoint"),
                        "no_receive_invites": self.context.settings.get(
                            "admin.no_receive_invites", False
                        ),
                        "help_link": self.context.settings.get("admin.help_link"),
                    },
                }
            )

            closed = False
            # one task waits for client input, the other for a queued message
            receive = loop.create_task(ws.receive_json())
            send = loop.create_task(queue.dequeue(timeout=5.0))

            while not closed:
                try:
                    # wake up as soon as either task finishes
                    await asyncio.wait(
                        (receive, send), return_when=asyncio.FIRST_COMPLETED
                    )
                    if ws.closed:
                        closed = True

                    if receive.done():
                        if not closed:
                            msg_received = None
                            msg_api_key = None
                            try:
                                # this call can re-raise exceptions from inside the task
                                msg_received = receive.result()
                                msg_api_key = msg_received.get("x-api-key")
                            except Exception:
                                LOGGER.exception(
                                    "Exception in websocket receiving task:"
                                )
                            if self.admin_api_key and self.admin_api_key == msg_api_key:
                                # authenticated via websocket message
                                queue.authenticated = True

                            # wait for the next client message
                            receive = loop.create_task(ws.receive_json())

                    if send.done():
                        try:
                            msg = send.result()
                        except asyncio.TimeoutError:
                            # nothing was queued within the dequeue timeout
                            msg = None

                        if msg is None:
                            # we send fake pings because the JS client
                            # can't detect real ones
                            msg = {
                                "topic": "ping",
                                "authenticated": queue.authenticated,
                            }
                        if not closed:
                            if msg:
                                await ws.send_json(msg)
                            # wait for the next queued message
                            send = loop.create_task(queue.dequeue(timeout=5.0))

                except asyncio.CancelledError:
                    closed = True

            # drop whichever task is still pending after the loop ends
            if not receive.done():
                receive.cancel()
            if not send.done():
                send.cancel()

        finally:
            # always unregister the queue for this connection
            del self.websocket_queues[socket_id]

        return ws

    def add_webhook_target(
        self,
        target_url: str,
        topic_filter: Sequence[str] = None,
        max_attempts: int = None,
    ):
        """Register a webhook target under its URL, replacing any existing entry."""
        target = WebhookTarget(target_url, topic_filter, max_attempts)
        self.webhook_targets[target_url] = target

    def remove_webhook_target(self, target_url: str):
        """Forget the webhook target registered under this URL, if there is one."""
        self.webhook_targets.pop(target_url, None)

    async def send_webhook(self, topic: str, payload: dict):
        """
        Add a webhook to the queue, to send to all registered targets.

        The webhook router (when set) is called for every registered target
        whose topic filter is empty or contains the topic. The message is then
        enqueued for each websocket queue that is authenticated; "ping" and
        "settings" topics are enqueued for unauthenticated queues as well.

        Args:
            topic: the webhook topic identifier
            payload: the webhook payload value
        """
        if self.webhook_router:
            for target in self.webhook_targets.values():
                if not target.topic_filter or topic in target.topic_filter:
                    self.webhook_router(
                        topic, payload, target.endpoint, target.max_attempts
                    )

        # Iterate over a snapshot: `enqueue` is awaited, and a websocket may
        # connect or disconnect (resizing the dict) while we are suspended.
        for queue in list(self.websocket_queues.values()):
            if queue.authenticated or topic in ("ping", "settings"):
                await queue.enqueue({"topic": topic, "payload": payload})

Ancestors

  • aries_cloudagent.admin.base_server.BaseAdminServer
  • abc.ABC

Methods

def add_webhook_target(self, target_url: str, topic_filter: Sequence[str] = None, max_attempts: int = None)

Add a webhook target.

Expand source code
def add_webhook_target(
    self,
    target_url: str,
    topic_filter: Sequence[str] = None,
    max_attempts: int = None,
):
    """Register a webhook target under its URL, replacing any existing entry."""
    target = WebhookTarget(target_url, topic_filter, max_attempts)
    self.webhook_targets[target_url] = target
async def connections_webhook_handler(self, request: aiohttp.web_request.BaseRequest)

Request handler for webhooks

Args

request
aiohttp request object

Returns

The web response

Expand source code
@docs(tags=["server"], summary="Webhooks handler")
async def connections_webhook_handler(self, request: web.BaseRequest):
    """
    Request handler for connection webhooks.

    Parses the JSON request body and passes it to
    `DexaManager.handle_connections_webhook`.

    Args:
        request: aiohttp request object

    Returns:
        The web response (an empty JSON object)

    Raises:
        web.HTTPBadRequest: If the request body is not valid JSON

    """
    # Request body
    try:
        body = await request.json()
    except ValueError as err:
        # json.JSONDecodeError is a ValueError; answer with a client error
        # instead of letting it surface as an unhandled server error.
        raise web.HTTPBadRequest(reason="Request body must be valid JSON") from err

    # Initialise manager
    mgr = DexaManager(self.context)

    # Handle webhook.
    await mgr.handle_connections_webhook(body)

    return web.json_response({})
async def liveliness_handler(self, request: aiohttp.web_request.BaseRequest)

Request handler for liveliness check.

Args

request
aiohttp request object

Returns

The web response, always indicating True

Expand source code
@docs(tags=["server"], summary="Liveliness check")
@response_schema(AdminStatusLivelinessSchema(), 200)
async def liveliness_handler(self, request: web.BaseRequest):
    """
    Answer the liveliness probe from the application's "alive" flag.

    Args:
        request: aiohttp request object

    Returns:
        A JSON response carrying the "alive" flag when it is set

    Raises:
        web.HTTPServiceUnavailable: If the "alive" flag is not set

    """
    alive = self.app._state["alive"]
    if not alive:
        raise web.HTTPServiceUnavailable(reason="Service not available")
    return web.json_response({"alive": alive})
async def make_application(self) ‑> aiohttp.web_app.Application

Get the aiohttp application instance.

Expand source code
async def make_application(self) -> web.Application:
    """
    Build and configure the aiohttp application for the admin API.

    Assembles the middleware chain (adding the API-key check when a key is
    configured, then either the task-queue limiter or the stats collector),
    registers the built-in routes and any plugin routes, enables CORS on
    every route, sets up the API documentation under "/api/doc" and
    initialises the status flags.

    Returns:
        The configured application, with "ready" and "alive" both False

    Raises:
        AssertionError: If insecure mode and an API key are both set, or
            neither is

    """
    import hmac  # function-scope import: only the API-key check needs it

    middlewares = [ready_middleware, debug_middleware, validation_middleware]

    # admin-insecure-mode and admin-api-key are mutually exclusive and
    # exactly one of them is required.
    # This should be enforced during parameter parsing but to be sure,
    # we check here.
    assert self.admin_insecure_mode ^ bool(self.admin_api_key)

    unprotected_paths = (
        "/api/doc",
        "/api/docs/swagger.json",
        "/favicon.ico",
        "/ws",  # ws handler checks authentication
    )

    def is_unprotected_path(path: str):
        return path in unprotected_paths or path.startswith("/static/swagger/")

    # If admin_api_key is None, then admin_insecure_mode must be set so
    # we can safely enable the admin server with no security
    if self.admin_api_key:

        @web.middleware
        async def check_token(request, handler):
            header_admin_api_key = request.headers.get("x-api-key")
            # Compare in constant time so that response timing does not
            # reveal how much of a guessed key is correct. Bytes are used
            # because compare_digest rejects non-ASCII str arguments.
            valid_key = header_admin_api_key is not None and hmac.compare_digest(
                self.admin_api_key.encode("utf-8"),
                header_admin_api_key.encode("utf-8", "surrogateescape"),
            )

            if valid_key or is_unprotected_path(request.path):
                return await handler(request)
            else:
                raise web.HTTPUnauthorized()

        middlewares.append(check_token)

    collector: Collector = await self.context.inject(Collector, required=False)

    if self.task_queue:

        @web.middleware
        async def apply_limiter(request, handler):
            # run the handler through the task queue and wait for its result
            task = await self.task_queue.put(handler(request))
            return await task

        middlewares.append(apply_limiter)

    elif collector:

        @web.middleware
        async def collect_stats(request, handler):
            handler = collector.wrap_coro(handler, [handler.__qualname__])
            return await handler(request)

        middlewares.append(collect_stats)

    app = web.Application(middlewares=middlewares)
    app["request_context"] = self.context
    app["outbound_message_router"] = self.responder.send

    app.add_routes(
        [
            web.get("/", self.redirect_handler, allow_head=False),
            web.get("/plugins", self.plugins_handler, allow_head=False),
            web.get("/status", self.status_handler, allow_head=False),
            web.post("/status/reset", self.status_reset_handler),
            web.get("/status/live", self.liveliness_handler, allow_head=False),
            web.get("/status/ready", self.readiness_handler, allow_head=False),
            web.get("/shutdown", self.shutdown_handler, allow_head=False),
            web.get("/ws", self.websocket_handler, allow_head=False),
            web.post(
                "/webhooks/topic/connections/", self.connections_webhook_handler
            ),
        ]
    )

    plugin_registry: PluginRegistry = await self.context.inject(
        PluginRegistry, required=False
    )
    if plugin_registry:
        await plugin_registry.register_admin_routes(app)

    cors = aiohttp_cors.setup(
        app,
        defaults={
            "*": aiohttp_cors.ResourceOptions(
                allow_credentials=True,
                expose_headers="*",
                allow_headers="*",
                allow_methods="*",
            )
        },
    )
    for route in app.router.routes():
        cors.add(route)
    # get agent label
    agent_label = self.context.settings.get("default_label")
    version_string = f"v{__version__}"

    custom_setup_aiohttp_apispec(
        app=app,
        title=agent_label,
        version=version_string,
        swagger_path="/api/doc"
    )
    app.on_startup.append(self.on_startup)

    # ensure we always have status values
    app._state["ready"] = False
    app._state["alive"] = False

    return app
def notify_fatal_error(self)

Set our readiness flags to force a restart (openshift).

Expand source code
def notify_fatal_error(self):
    """Clear the "ready" and "alive" flags to force a restart (openshift)."""
    LOGGER.error("Received shutdown request notify_fatal_error()")
    state = self.app._state
    for flag in ("ready", "alive"):
        state[flag] = False
async def on_startup(self, app: aiohttp.web_app.Application)

Perform webserver startup actions.

Expand source code
async def on_startup(self, app: web.Application):
    """Declare the API-key header in the swagger document when a key is set."""
    if not self.admin_api_key:
        return
    api_key_scheme = {"type": "apiKey", "in": "header", "name": "X-API-KEY"}
    spec = app["swagger_dict"]
    spec["securityDefinitions"] = {"ApiKeyHeader": api_key_scheme}
    spec["security"] = [{"ApiKeyHeader": []}]
async def plugins_handler(self, request: aiohttp.web_request.BaseRequest)

Request handler for the loaded plugins list.

Args

request
aiohttp request object

Returns

The module list response

Expand source code
@docs(tags=["server"], summary="Fetch the list of loaded plugins")
@response_schema(AdminModulesSchema(), 200)
async def plugins_handler(self, request: web.BaseRequest):
    """
    Report the plugin names known to the plugin registry.

    Args:
        request: aiohttp request object

    Returns:
        A JSON response whose "result" is the sorted list of plugin names
        (empty when no registry is available)

    """
    registry: PluginRegistry = await self.context.inject(
        PluginRegistry, required=False
    )
    plugin_names = sorted(registry.plugin_names) if registry else []
    return web.json_response({"result": plugin_names})
async def readiness_handler(self, request: aiohttp.web_request.BaseRequest)

Request handler for readiness check.

Args

request
aiohttp request object

Returns

The web response, indicating readiness for further calls

Expand source code
@docs(tags=["server"], summary="Readiness check")
@response_schema(AdminStatusReadinessSchema(), 200)
async def readiness_handler(self, request: web.BaseRequest):
    """
    Answer the readiness probe from the "ready" and "alive" flags.

    Args:
        request: aiohttp request object

    Returns:
        A JSON response indicating readiness for further calls

    Raises:
        web.HTTPServiceUnavailable: If either flag is not set

    """
    state = self.app._state
    ready = state["ready"] and state["alive"]
    if not ready:
        raise web.HTTPServiceUnavailable(reason="Service not ready")
    return web.json_response({"ready": ready})
async def redirect_handler(self, request: aiohttp.web_request.BaseRequest)

Perform redirect to documentation.

Expand source code
async def redirect_handler(self, request: web.BaseRequest):
    """Redirect the client to the API documentation page."""
    docs_location = "/api/doc"
    raise web.HTTPFound(docs_location)
def remove_webhook_target(self, target_url: str)

Remove a webhook target.

Expand source code
def remove_webhook_target(self, target_url: str):
    """Forget the webhook target registered under this URL, if there is one."""
    self.webhook_targets.pop(target_url, None)
async def send_webhook(self, topic: str, payload: dict)

Add a webhook to the queue, to send to all registered targets.

Expand source code
async def send_webhook(self, topic: str, payload: dict):
    """
    Add a webhook to the queue, to send to all registered targets.

    The webhook router (when set) is called for every registered target
    whose topic filter is empty or contains the topic. The message is then
    enqueued for each websocket queue that is authenticated; "ping" and
    "settings" topics are enqueued for unauthenticated queues as well.

    Args:
        topic: the webhook topic identifier
        payload: the webhook payload value
    """
    if self.webhook_router:
        for target in self.webhook_targets.values():
            if not target.topic_filter or topic in target.topic_filter:
                self.webhook_router(
                    topic, payload, target.endpoint, target.max_attempts
                )

    # Iterate over a snapshot: `enqueue` is awaited, and a websocket may
    # connect or disconnect (resizing the dict) while we are suspended.
    for queue in list(self.websocket_queues.values()):
        if queue.authenticated or topic in ("ping", "settings"):
            await queue.enqueue({"topic": topic, "payload": payload})
async def shutdown_handler(self, request: aiohttp.web_request.BaseRequest)

Request handler for server shutdown.

Args

request
aiohttp request object

Returns

The web response (empty production)

Expand source code
@docs(tags=["server"], summary="Shut down server")
async def shutdown_handler(self, request: web.BaseRequest):
    """
    Mark the server as not ready and schedule the conductor stop.

    Args:
        request: aiohttp request object

    Returns:
        An empty JSON response

    """
    self.app._state["ready"] = False

    # the stop is scheduled on the event loop, not awaited here
    running_loop = asyncio.get_event_loop()
    stop_call = self.conductor_stop()
    asyncio.ensure_future(stop_call, loop=running_loop)

    return web.json_response({})
async def start(self) ‑> None

Start the webserver.

Raises

AdminSetupError
If there was an error starting the webserver
Expand source code
async def start(self) -> None:
    """
    Start the webserver.

    Builds the application, lets the plugin registry (if any) post-process
    the routes, puts the generated swagger document into a deterministic
    order and finally starts listening on the configured host and port.

    Raises:
        AdminSetupError: If there was an error starting the webserver

    """

    def sort_dict(raw: dict) -> dict:
        """Order (JSON, string keys) dict asciibetically by key, recursively."""
        return {
            key: sort_dict(value) if isinstance(value, dict) else value
            for key, value in sorted(raw.items(), key=lambda item: item[0])
        }

    self.app = await self.make_application()
    runner = web.AppRunner(self.app)
    await runner.setup()

    plugin_registry: PluginRegistry = await self.context.inject(
        PluginRegistry, required=False
    )
    if plugin_registry:
        plugin_registry.post_process_routes(self.app)

    # order tags alphabetically, parameters deterministically and pythonically
    swagger_dict = self.app._state["swagger_dict"]
    swagger_dict.get("tags", []).sort(key=lambda t: t["name"])
    for path in swagger_dict["paths"].values():
        for method_spec in path.values():
            # "parameters" and a parameter's "required" are optional in an
            # OpenAPI 2.0 document, so do not assume either is present
            method_spec.get("parameters", []).sort(
                key=lambda p: (p["in"], not p.get("required", False), p["name"])
            )

    # order definitions alphabetically by dict key
    swagger_dict["definitions"] = sort_dict(swagger_dict["definitions"])

    self.site = web.TCPSite(runner, host=self.host, port=self.port)

    try:
        await self.site.start()
    except OSError as err:
        # keep the original OSError attached as the cause for diagnostics
        raise AdminSetupError(
            "Unable to start webserver with host "
            + f"'{self.host}' and port '{self.port}'\n"
        ) from err
    else:
        self.app._state["ready"] = True
        self.app._state["alive"] = True
async def status_handler(self, request: aiohttp.web_request.BaseRequest)

Request handler for the server status information.

Args

request
aiohttp request object

Returns

The web response

Expand source code
@docs(tags=["server"], summary="Fetch the server status")
@response_schema(AdminStatusSchema(), 200)
async def status_handler(self, request: web.BaseRequest):
    """
    Report version, label and any available statistics.

    Args:
        request: aiohttp request object

    Returns:
        A JSON response with "version" and "label", plus "timing" when a
        collector is available and "conductor" when a stats coroutine was
        supplied

    """
    label = self.context.settings.get("default_label")
    status = {"version": __version__, "label": label}
    collector: Collector = await self.context.inject(Collector, required=False)
    if collector:
        status["timing"] = collector.results
    stats_source = self.conductor_stats
    if stats_source:
        status["conductor"] = await stats_source()
    return web.json_response(status)
async def status_reset_handler(self, request: aiohttp.web_request.BaseRequest)

Request handler for resetting the timing statistics.

Args

request
aiohttp request object

Returns

The web response

Expand source code
@docs(tags=["server"], summary="Reset statistics")
@response_schema(AdminStatusSchema(), 200)
async def status_reset_handler(self, request: web.BaseRequest):
    """
    Reset the timing statistics when a collector is available.

    Args:
        request: aiohttp request object

    Returns:
        An empty JSON response

    """
    stats: Collector = await self.context.inject(Collector, required=False)
    if stats:
        stats.reset()
    empty_body = {}
    return web.json_response(empty_body)
async def stop(self) ‑> None

Stop the webserver.

Expand source code
async def stop(self) -> None:
    """
    Stop the webserver.

    Marks the application as not ready, stops every registered websocket
    queue and stops the listening site if there is one. Does nothing to
    the application state when no application has been created yet, so it
    can be called even if `start()` never ran.
    """
    if self.app is not None:
        # in case call does not come through OpenAPI
        self.app._state["ready"] = False
    for queue in self.websocket_queues.values():
        queue.stop()
    if self.site:
        await self.site.stop()
        self.site = None
async def websocket_handler(self, request)

Send notifications to admin client over websocket.

Expand source code
async def websocket_handler(self, request):
    """
    Send notifications to admin client over websocket.

    A dedicated message queue is created for the connection and registered
    in ``self.websocket_queues`` under a random socket id for the lifetime
    of the handler. An initial ``settings`` message is queued, after which
    the handler runs two concurrent tasks: one reading JSON messages from
    the client and one draining the queue towards the client.

    The queue's ``authenticated`` flag is set up front (insecure mode, or a
    matching ``x-api-key`` request header) and can later be switched on by a
    client message that carries a matching ``x-api-key`` entry.

    Args:
        request: aiohttp request object

    Returns:
        The prepared websocket response

    """

    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # Random id used as the key for this connection's queue.
    socket_id = str(uuid.uuid4())
    queue = BasicMessageQueue()
    loop = asyncio.get_event_loop()

    if self.admin_insecure_mode:
        # open to send websocket messages without api key auth
        queue.authenticated = True
    else:
        header_admin_api_key = request.headers.get("x-api-key")
        # authenticated via http header?
        queue.authenticated = header_admin_api_key == self.admin_api_key

    try:
        # Register the queue; the matching removal is in the finally block.
        self.websocket_queues[socket_id] = queue
        # First message for the client: current settings and auth state.
        await queue.enqueue(
            {
                "topic": "settings",
                "payload": {
                    "authenticated": queue.authenticated,
                    "label": self.context.settings.get("default_label"),
                    "endpoint": self.context.settings.get("default_endpoint"),
                    "no_receive_invites": self.context.settings.get(
                        "admin.no_receive_invites", False
                    ),
                    "help_link": self.context.settings.get("admin.help_link"),
                },
            }
        )

        closed = False
        # One task per direction; each is re-created after it completes.
        receive = loop.create_task(ws.receive_json())
        send = loop.create_task(queue.dequeue(timeout=5.0))

        while not closed:
            try:
                # Wake up as soon as either direction has something to do.
                await asyncio.wait(
                    (receive, send), return_when=asyncio.FIRST_COMPLETED
                )
                if ws.closed:
                    closed = True

                if receive.done():
                    if not closed:
                        msg_received = None
                        msg_api_key = None
                        try:
                            # this call can re-raise exceptions from inside the task
                            msg_received = receive.result()
                            msg_api_key = msg_received.get("x-api-key")
                        except Exception:
                            # Log and keep the connection going; msg_api_key
                            # stays None so no authentication happens below.
                            LOGGER.exception(
                                "Exception in websocket receiving task:"
                            )
                        if self.admin_api_key and self.admin_api_key == msg_api_key:
                            # authenticated via websocket message
                            queue.authenticated = True

                        # Start waiting for the next client message.
                        receive = loop.create_task(ws.receive_json())

                if send.done():
                    try:
                        msg = send.result()
                    except asyncio.TimeoutError:
                        # Nothing was dequeued within the timeout.
                        msg = None

                    if msg is None:
                        # we send fake pings because the JS client
                        # can't detect real ones
                        msg = {
                            "topic": "ping",
                            "authenticated": queue.authenticated,
                        }
                    if not closed:
                        if msg:
                            await ws.send_json(msg)
                        # Start waiting for the next outgoing message.
                        send = loop.create_task(queue.dequeue(timeout=5.0))

            except asyncio.CancelledError:
                # Cancellation of the handler ends the loop.
                closed = True

        # Do not leave the per-direction tasks pending once the loop ends.
        if not receive.done():
            receive.cancel()
        if not send.done():
            send.cancel()

    finally:
        # Always unregister this connection's queue.
        del self.websocket_queues[socket_id]

    return ws
class AdminStatusLivelinessSchema (*, only: Union[Sequence[str], Set[str]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Dict[~KT, ~VT] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: str = None)

Schema for the liveliness endpoint.

Expand source code
class AdminStatusLivelinessSchema(Schema):
    """Schema for the liveliness endpoint.

    Declares a single boolean field, ``alive``.
    """

    # Liveliness flag; the documented example value is True.
    alive = fields.Boolean(description="Liveliness status", example=True)

Ancestors

  • marshmallow.schema.Schema
  • marshmallow.base.SchemaABC

Class variables

var opts
class AdminStatusReadinessSchema (*, only: Union[Sequence[str], Set[str]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Dict[~KT, ~VT] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: str = None)

Schema for the readiness endpoint.

Expand source code
class AdminStatusReadinessSchema(Schema):
    """Schema for the readiness endpoint.

    Declares a single boolean field, ``ready``.
    """

    # Readiness flag; the documented example value is True.
    ready = fields.Boolean(description="Readiness status", example=True)

Ancestors

  • marshmallow.schema.Schema
  • marshmallow.base.SchemaABC

Class variables

var opts
class AdminStatusSchema (*, only: Union[Sequence[str], Set[str]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Dict[~KT, ~VT] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: str = None)

Schema for the status endpoint.

Expand source code
class AdminStatusSchema(Schema):
    """Schema for the status endpoint.

    Declares no fields of its own. It is the 200-response schema attached
    to the status and status-reset request handlers.
    """

Ancestors

  • marshmallow.schema.Schema
  • marshmallow.base.SchemaABC

Class variables

var opts
class WebhookTarget (endpoint: str, topic_filter: Sequence[str] = None, max_attempts: int = None)

Class for managing webhook target information.

Initialize the webhook target.

Expand source code
class WebhookTarget:
    """Hold the settings for a single webhook target.

    A target has an endpoint, an optional retry limit, and an optional set
    of topics. A topic filter of ``None`` means no filter is stored.
    """

    def __init__(
        self,
        endpoint: str,
        topic_filter: Sequence[str] = None,
        max_attempts: int = None,
    ):
        """Store the endpoint and retry limit and normalise the topic filter.

        Args:
            endpoint: the webhook endpoint
            topic_filter: topics to keep, or None/empty for no filter
            max_attempts: the attempt limit, stored as given

        """
        self._topic_filter = None
        self.endpoint = endpoint
        self.max_attempts = max_attempts
        # Assign through the property so the value gets normalised.
        self.topic_filter = topic_filter

    @property
    def topic_filter(self) -> Set[str]:
        """Return the stored topic set, or None when no filter is stored."""
        return self._topic_filter

    @topic_filter.setter
    def topic_filter(self, val: Sequence[str]):
        """Normalise and store the topic filter.

        A falsy value clears the filter, as does any collection that
        contains the wildcard ``"*"``; otherwise the topics are stored as a
        set.
        """
        if not val:
            self._topic_filter = None
            return

        topics = set(val)
        self._topic_filter = None if "*" in topics else topics

Instance variables

var topic_filter : Set[str]

Accessor for the target's topic filter.

Expand source code
@property
def topic_filter(self) -> Set[str]:
    """Accessor for the target's topic filter.

    Returns the value held in ``self._topic_filter`` unchanged.
    """
    return self._topic_filter