Module dexa_sdk.utils.utils

Expand source code
import jcs
import ast
import typing
import semver
import math
import aiohttp
from urllib.parse import urlparse, parse_qs
from collections import namedtuple
from aries_cloudagent.messaging.models.base_record import BaseRecord
from aries_cloudagent.config.injection_context import InjectionContext


def jcs_rfc8785(data: typing.Union[list, dict]) -> bytes:
    """Canonicalise data with the JSON Canonicalization Scheme (IETF RFC 8785)

    Args:
        data (typing.Union[list, dict]): structure to canonicalise

    Returns:
        bytes: result of ``jcs.canonicalize`` for the given data
    """
    canonical_bytes = jcs.canonicalize(data)
    return canonical_bytes


def jcs_bytes_to_pyobject(data: bytes) -> typing.Any:
    """Convert JCS (canonical JSON) bytes to a Python object (dict, list, etc.)

    The payload is JSON, so it is decoded with the JSON parser instead of
    ``ast.literal_eval``: the JSON literals ``true``, ``false`` and ``null``
    are not Python literals and made ``literal_eval`` raise.

    Args:
        data (bytes): encoded JSON document

    Returns:
        typing.Any: decoded object (dict, list, str, number, bool or None)

    Raises:
        ValueError: if the bytes are not a valid JSON document
            (``json.JSONDecodeError`` is a ``ValueError`` subclass)
    """
    # Local import keeps this function independent of the module-level imports.
    import json

    return json.loads(data.decode())


def replace_jws(doc: dict) -> dict:
    """Rename the 'jws' entry of a document to 'proofValue'

    The document is modified in place and the same object is returned.

    Args:
        doc (dict): input document holding a 'jws' entry

    Returns:
        dict: the same document, with 'jws' moved under 'proofValue'

    Raises:
        KeyError: if the document has no 'jws' entry
    """
    # Take the value out of 'jws' and store it under 'proofValue'
    signature = doc.pop("jws")
    doc["proofValue"] = signature

    return doc


def replace_proof_value(doc: dict) -> dict:
    """Rename the 'proofValue' entry of a document to 'jws'

    The document is modified in place and the same object is returned.

    Args:
        doc (dict): input document holding a 'proofValue' entry

    Returns:
        dict: the same document, with 'proofValue' moved under 'jws'

    Raises:
        KeyError: if the document has no 'proofValue' entry
    """
    # Take the value out of 'proofValue' and store it under 'jws'
    proof_value = doc.pop("proofValue")
    doc["jws"] = proof_value

    return doc


def replace_proof_chain(doc: dict) -> dict:
    """Collapse the 'proofChain' entry of a document into a single 'proof'

    Only the first element of the chain is kept. The document is modified
    in place and the same object is returned.

    Args:
        doc (dict): input document holding a 'proofChain' entry

    Returns:
        dict: the same document, with 'proofChain' replaced by 'proof'
    """
    # Read the first proof before touching the document, so a missing or
    # empty chain raises while the document is still unchanged.
    first_proof = doc["proofChain"][0]

    doc.pop("proofChain")
    doc["proof"] = first_proof

    return doc


def sort_exchange_record_dicts_by_created_at(records: typing.List[dict],
                                             sort_order: str = "desc") -> typing.List[dict]:
    """Return the exchange record dicts ordered by their 'updated_at' value

    Note that, despite the function name, the sort key used is 'updated_at'.
    The input list is left untouched; a new list is returned.

    Args:
        records (typing.List[dict]): Exchange record dicts
        sort_order (str, optional): Sort order ('desc' or 'asc'). Defaults to "desc".

    Returns:
        typing.List[dict]: Sorted record dicts
    """
    assert sort_order in ("desc", "asc")

    def _updated_at(record):
        return record['updated_at']

    descending = sort_order == "desc"

    return sorted(records, key=_updated_at, reverse=descending)


def bump_major_for_semver_string(version: str) -> str:
    """Return the given semver string with its major part bumped.

    Args:
        version (str): semver compatible version string

    Returns:
        str: string form of the bumped version
    """
    parsed_version = semver.parse_version_info(version)
    bumped_version = parsed_version.bump_major()
    return str(bumped_version)


# Pagination details returned alongside a page of results.
# It is a tuple type, so the field order below is part of its interface.
PaginationConfig = namedtuple(
    'PaginationConfig',
    "total_count page page_size total_pages"
)

# Outcome of a pagination call: the items of the page plus the pagination details.
# It is a tuple type, so the field order below is part of its interface.
PaginationResult = namedtuple(
    'PaginationResult',
    "results pagination"
)


def get_slices(page, page_size=10):
    """
    Compute the slice bounds that cover one page.

    Args:
        page: page number (page 1 starts at index 0)
        page_size: number of items per page

    Returns: tuple of (start index, end index)

    """
    offset = (page - 1) * page_size
    return offset, offset + page_size


def paginate(items_list: typing.List, page: int = 1, page_size: int = 10) -> PaginationResult:
    """Paginate an items list

    Args:
        items_list (typing.List): items list
        page (int, optional): page number, starting at 1. Missing, zero or
            negative values fall back to 1. Defaults to 1.
        page_size (int, optional): number of items per page. Missing, zero or
            negative values fall back to 10. Defaults to 10.

    Returns:
        PaginationResult: items of the requested page, together with the
            pagination details as a dict (total_count, page, page_size,
            total_pages)
    """

    # Fall back to the defaults for unusable values: a page size of 0 would
    # raise ZeroDivisionError below, and negative values would make the
    # slice count from the end of the list.
    page = page if page and page > 0 else 1
    page_size = page_size if page_size and page_size > 0 else 10

    # total count
    total_count = len(items_list)

    # total pages
    total_pages = math.ceil(total_count / page_size)

    # Fetch pagination indices and slice the items for the current page
    lower, upper = get_slices(page, page_size)
    page_items = items_list[lower:upper]

    pconfig = PaginationConfig(
        total_count=total_count,
        page=page,
        page_size=page_size,
        total_pages=total_pages
    )

    return PaginationResult(results=page_items, pagination=pconfig._asdict())


def paginate_records(
    records: typing.List[BaseRecord],
    page: int = 1,
    page_size: int = 10
) -> PaginationResult:
    """Paginate records and return the requested page in serialised form

    Args:
        records (typing.List[BaseRecord]): records
        page (int, optional): page. Defaults to 1.
        page_size (int, optional): page_size. Defaults to 10.

    Returns:
        PaginationResult: serialised records of the page and the pagination details
    """

    page_result = paginate(records, page, page_size)

    # NOTE(review): the sort runs after slicing, so it only orders the
    # records inside the returned page, not the whole collection.
    serialised_records = sort_exchange_record_dicts_by_created_at(
        [record.serialize() for record in page_result.results]
    )

    return PaginationResult(results=serialised_records, pagination=page_result.pagination)


def clean_and_get_field_from_dict(
        input: dict,
        key: str
) -> typing.Union[None, typing.Any]:
    """Return the value of the field in dict if present.

    An empty string is treated as "no value" and reported as None.

    Args:
        input (dict): dict
        key (str): dict key

    Returns:
        typing.Union[None, Any]: Value of the field; None when the key is
            missing or its value is an empty string
    """

    # Get field value (None when the key is absent)
    field_value = input.get(key)

    # Normalise an empty string to None. The previous check also required the
    # value to be truthy, which an empty string never is, so it never matched.
    if isinstance(field_value, str) and not field_value:
        field_value = None

    return field_value


def drop_none_dict(input: dict) -> dict:
    """Remove every entry whose value is None from the dict

    The dict is modified in place and the same object is returned.

    Args:
        input (dict): dict to clean

    Returns:
        dict: the same dict without its None-valued entries
    """

    # Collect the keys first so the dict is not resized while being iterated
    none_keys = [key for key, value in input.items() if value is None]

    for key in none_keys:
        del input[key]

    return input


async def generate_firebase_dynamic_link(context: InjectionContext, payload: str) -> str:
    """Create a Firebase dynamic short link for the payload

    The Firebase parameters (domain URI prefix, Android package name, iOS
    bundle id, iOS App Store id and web API key) are read from the context
    settings under the "intermediary.firebase_*" keys.

    Args:
        context (InjectionContext): Injection context to be used.
        payload (str): Value sent as the "link" of the dynamic link.

    Returns:
        str: Value of "shortLink" in the Firebase response

    Raises:
        Exception: If the response status is not 200.
    """

    settings = context.settings

    request_body = {
        "dynamicLinkInfo": {
            "domainUriPrefix": settings.get("intermediary.firebase_domain_uri_prefix"),
            "link": payload,
            "androidInfo": {
                "androidPackageName": settings.get("intermediary.firebase_android_package_name"),
            },
            "iosInfo": {
                "iosBundleId": settings.get("intermediary.firebase_ios_bundle_id"),
                "iosAppStoreId": settings.get("intermediary.firebase_ios_appstore_id"),
            }
        },
        "suffix": {
            "option": "UNGUESSABLE"
        }
    }

    # The web API key is passed as the "key" query parameter of the endpoint
    endpoint = (
        "https://firebasedynamiclinks.googleapis.com/v1/shortLinks?key="
        + settings.get("intermediary.firebase_web_api_key")
    )

    async with aiohttp.ClientSession() as session:
        async with session.post(endpoint, json=request_body) as resp:
            if resp.status != 200:
                error_text = await resp.text()
                raise Exception(f"Error in Firebase dynamic link: {error_text}")

            link_response = await resp.json()

    return link_response["shortLink"]


async def fetch_org_details_from_intermediary(context: InjectionContext) -> dict:
    """Retrieve the organisation details from the iGrant.io intermediary

    The endpoint URL, organisation id and API key are read from the context
    settings under the "intermediary.igrantio_*" keys.

    Args:
        context (InjectionContext): Injection context to be used.

    Returns:
        dict: The "Organization" entry of the JSON response when the status
            is 200; None for any other status.
    """

    settings = context.settings
    base_url = settings.get("intermediary.igrantio_endpoint_url")
    organisation_id = settings.get("intermediary.igrantio_org_id")
    org_api_key = settings.get("intermediary.igrantio_org_api_key")

    # The API key is sent in the Authorization header of the session
    auth_headers = {"Authorization": f"ApiKey {org_api_key}"}

    # iGrant.io organisation detail endpoint URL
    detail_url = f"{base_url}/v1/organizations/{organisation_id}"

    async with aiohttp.ClientSession(headers=auth_headers) as session:
        async with session.get(detail_url) as resp:
            if resp.status != 200:
                # Other statuses are not raised as errors; the caller gets None
                return None

            org_response = await resp.json()
            return org_response["Organization"]


async def parse_query_params(url: str, query_param: str) -> typing.Optional[str]:
    """Parse query params in a URL

    Args:
        url (str): URL
        query_param (str): Query param key.

    Returns:
        typing.Optional[str]: First value of the query param, or None when
            the URL does not carry it.
    """

    # Parse the query string of the URL into a dict of value lists
    parsed_qs = parse_qs(urlparse(url).query)

    # parse_qs leaves out params that are absent (or blank), so the lookup can
    # come back empty; subscripting it unconditionally raised a TypeError.
    values = parsed_qs.get(query_param)

    return values[0] if values else None

Functions

def bump_major_for_semver_string(version: str) ‑> str

Bump major part of the semver string.

Args

version : str
semver compatible version string

Returns

str
Bumped version
Expand source code
def bump_major_for_semver_string(version: str) -> str:
    """Return the given semver string with its major part bumped.

    Args:
        version (str): semver compatible version string

    Returns:
        str: string form of the bumped version
    """
    parsed_version = semver.parse_version_info(version)
    bumped_version = parsed_version.bump_major()
    return str(bumped_version)
def clean_and_get_field_from_dict(input: dict, key: str) ‑> Optional[Any]

Return the value of the field in dict if present.

Args

input : dict
dict
key : str
dict key

Returns

typing.Union[None, Any]
Value of the field
Expand source code
def clean_and_get_field_from_dict(
        input: dict,
        key: str
) -> typing.Union[None, typing.Any]:
    """Return the value of the field in dict if present.

    An empty string is treated as "no value" and reported as None.

    Args:
        input (dict): dict
        key (str): dict key

    Returns:
        typing.Union[None, Any]: Value of the field; None when the key is
            missing or its value is an empty string
    """

    # Get field value (None when the key is absent)
    field_value = input.get(key)

    # Normalise an empty string to None. The previous check also required the
    # value to be truthy, which an empty string never is, so it never matched.
    if isinstance(field_value, str) and not field_value:
        field_value = None

    return field_value
def drop_none_dict(input: dict) ‑> dict

Return dict with None values removed

Args

input : dict
input

Returns

dict
output
Expand source code
def drop_none_dict(input: dict) -> dict:
    """Remove every entry whose value is None from the dict

    The dict is modified in place and the same object is returned.

    Args:
        input (dict): dict to clean

    Returns:
        dict: the same dict without its None-valued entries
    """

    # Collect the keys first so the dict is not resized while being iterated
    none_keys = [key for key, value in input.items() if value is None]

    for key in none_keys:
        del input[key]

    return input
async def fetch_org_details_from_intermediary(context: aries_cloudagent.config.injection_context.InjectionContext) ‑> dict

Fetch org details from intermediary

Args

context : InjectionContext
Injection context to be used.

Returns

dict
Org details.
Expand source code
async def fetch_org_details_from_intermediary(context: InjectionContext) -> dict:
    """Retrieve the organisation details from the iGrant.io intermediary

    The endpoint URL, organisation id and API key are read from the context
    settings under the "intermediary.igrantio_*" keys.

    Args:
        context (InjectionContext): Injection context to be used.

    Returns:
        dict: The "Organization" entry of the JSON response when the status
            is 200; None for any other status.
    """

    settings = context.settings
    base_url = settings.get("intermediary.igrantio_endpoint_url")
    organisation_id = settings.get("intermediary.igrantio_org_id")
    org_api_key = settings.get("intermediary.igrantio_org_api_key")

    # The API key is sent in the Authorization header of the session
    auth_headers = {"Authorization": f"ApiKey {org_api_key}"}

    # iGrant.io organisation detail endpoint URL
    detail_url = f"{base_url}/v1/organizations/{organisation_id}"

    async with aiohttp.ClientSession(headers=auth_headers) as session:
        async with session.get(detail_url) as resp:
            if resp.status != 200:
                # Other statuses are not raised as errors; the caller gets None
                return None

            org_response = await resp.json()
            return org_response["Organization"]

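async def generate_firebase_dynamic_link(context: aries_cloudagent.config.injection_context.InjectionContext, payload: str) ‑> str

Generate firebase dynamic link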

Args

context : InjectionContext
Injection context to be used.
payload : str
Payload.

Returns

str
Firebase dynamic link
Expand source code
async def generate_firebase_dynamic_link(context: InjectionContext, payload: str) -> str:
    """Create a Firebase dynamic short link for the payload

    The Firebase parameters (domain URI prefix, Android package name, iOS
    bundle id, iOS App Store id and web API key) are read from the context
    settings under the "intermediary.firebase_*" keys.

    Args:
        context (InjectionContext): Injection context to be used.
        payload (str): Value sent as the "link" of the dynamic link.

    Returns:
        str: Value of "shortLink" in the Firebase response

    Raises:
        Exception: If the response status is not 200.
    """

    settings = context.settings

    request_body = {
        "dynamicLinkInfo": {
            "domainUriPrefix": settings.get("intermediary.firebase_domain_uri_prefix"),
            "link": payload,
            "androidInfo": {
                "androidPackageName": settings.get("intermediary.firebase_android_package_name"),
            },
            "iosInfo": {
                "iosBundleId": settings.get("intermediary.firebase_ios_bundle_id"),
                "iosAppStoreId": settings.get("intermediary.firebase_ios_appstore_id"),
            }
        },
        "suffix": {
            "option": "UNGUESSABLE"
        }
    }

    # The web API key is passed as the "key" query parameter of the endpoint
    endpoint = (
        "https://firebasedynamiclinks.googleapis.com/v1/shortLinks?key="
        + settings.get("intermediary.firebase_web_api_key")
    )

    async with aiohttp.ClientSession() as session:
        async with session.post(endpoint, json=request_body) as resp:
            if resp.status != 200:
                error_text = await resp.text()
                raise Exception(f"Error in Firebase dynamic link: {error_text}")

            link_response = await resp.json()

    return link_response["shortLink"]
def get_slices(page, page_size=10)

Get the start and end indices for the given page and page size.

Args

page
page number
page_size
page size

Returns: start and end indices

Expand source code
def get_slices(page, page_size=10):
    """
    Compute the slice bounds that cover one page.

    Args:
        page: page number (page 1 starts at index 0)
        page_size: number of items per page

    Returns: tuple of (start index, end index)

    """
    offset = (page - 1) * page_size
    return offset, offset + page_size
def jcs_bytes_to_pyobject(data: bytes) ‑> Any

Convert JCS bytes to a Python object (dict, list, etc.)

Expand source code
def jcs_bytes_to_pyobject(data: bytes) -> typing.Any:
    """Convert JCS (canonical JSON) bytes to a Python object (dict, list, etc.)

    The payload is JSON, so it is decoded with the JSON parser instead of
    ``ast.literal_eval``: the JSON literals ``true``, ``false`` and ``null``
    are not Python literals and made ``literal_eval`` raise.

    Args:
        data (bytes): encoded JSON document

    Returns:
        typing.Any: decoded object (dict, list, str, number, bool or None)

    Raises:
        ValueError: if the bytes are not a valid JSON document
            (``json.JSONDecodeError`` is a ``ValueError`` subclass)
    """
    # Local import keeps this function independent of the module-level imports.
    import json

    return json.loads(data.decode())
def jcs_rfc8785(data: Union[list, dict]) ‑> bytes

JSON canonicalisation schema as per IETF RFC 8785

Expand source code
def jcs_rfc8785(data: typing.Union[list, dict]) -> bytes:
    """Canonicalise data with the JSON Canonicalization Scheme (IETF RFC 8785)

    Args:
        data (typing.Union[list, dict]): structure to canonicalise

    Returns:
        bytes: result of ``jcs.canonicalize`` for the given data
    """
    canonical_bytes = jcs.canonicalize(data)
    return canonical_bytes
def paginate(items_list: List[~T], page: int = 1, page_size: int = 10) ‑> PaginationResult

Paginate an items list

Args

items_list : typing.List
items list
page : int, optional
page number. Defaults to 1.
page_size : int, optional
number of items per page. Defaults to 10.

Returns

PaginationResult
Pagination result
Expand source code
def paginate(items_list: typing.List, page: int = 1, page_size: int = 10) -> PaginationResult:
    """Paginate an items list

    Args:
        items_list (typing.List): items list
        page (int, optional): page number, starting at 1. Missing, zero or
            negative values fall back to 1. Defaults to 1.
        page_size (int, optional): number of items per page. Missing, zero or
            negative values fall back to 10. Defaults to 10.

    Returns:
        PaginationResult: items of the requested page, together with the
            pagination details as a dict (total_count, page, page_size,
            total_pages)
    """

    # Fall back to the defaults for unusable values: a page size of 0 would
    # raise ZeroDivisionError below, and negative values would make the
    # slice count from the end of the list.
    page = page if page and page > 0 else 1
    page_size = page_size if page_size and page_size > 0 else 10

    # total count
    total_count = len(items_list)

    # total pages
    total_pages = math.ceil(total_count / page_size)

    # Fetch pagination indices and slice the items for the current page
    lower, upper = get_slices(page, page_size)
    page_items = items_list[lower:upper]

    pconfig = PaginationConfig(
        total_count=total_count,
        page=page,
        page_size=page_size,
        total_pages=total_pages
    )

    return PaginationResult(results=page_items, pagination=pconfig._asdict())
def paginate_records(records: List[aries_cloudagent.messaging.models.base_record.BaseRecord], page: int = 1, page_size: int = 10) ‑> PaginationResult

Paginate records

Args

records : typing.List[BaseRecord]
records
page : int, optional
page. Defaults to 1.
page_size : int, optional
page_size. Defaults to 10.

Returns

PaginationResult
Results
Expand source code
def paginate_records(
    records: typing.List[BaseRecord],
    page: int = 1,
    page_size: int = 10
) -> PaginationResult:
    """Paginate records and return the requested page in serialised form

    Args:
        records (typing.List[BaseRecord]): records
        page (int, optional): page. Defaults to 1.
        page_size (int, optional): page_size. Defaults to 10.

    Returns:
        PaginationResult: serialised records of the page and the pagination details
    """

    page_result = paginate(records, page, page_size)

    # NOTE(review): the sort runs after slicing, so it only orders the
    # records inside the returned page, not the whole collection.
    serialised_records = sort_exchange_record_dicts_by_created_at(
        [record.serialize() for record in page_result.results]
    )

    return PaginationResult(results=serialised_records, pagination=page_result.pagination)
async def parse_query_params(url: str, query_param: str) ‑> str

Parse query params in a URL

Args

url : str
URL
query_param : str
Query param key.

Returns

str
First value of the given query parameter.
Expand source code
async def parse_query_params(url: str, query_param: str) -> typing.Optional[str]:
    """Parse query params in a URL

    Args:
        url (str): URL
        query_param (str): Query param key.

    Returns:
        typing.Optional[str]: First value of the query param, or None when
            the URL does not carry it.
    """

    # Parse the query string of the URL into a dict of value lists
    parsed_qs = parse_qs(urlparse(url).query)

    # parse_qs leaves out params that are absent (or blank), so the lookup can
    # come back empty; subscripting it unconditionally raised a TypeError.
    values = parsed_qs.get(query_param)

    return values[0] if values else None
def replace_jws(doc: dict) ‑> dict

Replace 'jws' field with 'proofValue' field

Args

doc : dict
input document with proof

Returns

dict
replaced document
Expand source code
def replace_jws(doc: dict) -> dict:
    """Rename the 'jws' entry of a document to 'proofValue'

    The document is modified in place and the same object is returned.

    Args:
        doc (dict): input document holding a 'jws' entry

    Returns:
        dict: the same document, with 'jws' moved under 'proofValue'

    Raises:
        KeyError: if the document has no 'jws' entry
    """
    # Take the value out of 'jws' and store it under 'proofValue'
    signature = doc.pop("jws")
    doc["proofValue"] = signature

    return doc
def replace_proof_chain(doc: dict) ‑> dict

Replace 'proofChain' field with 'proof' field

Args

doc : dict
input document with proof

Returns

dict
replaced document
Expand source code
def replace_proof_chain(doc: dict) -> dict:
    """Collapse the 'proofChain' entry of a document into a single 'proof'

    Only the first element of the chain is kept. The document is modified
    in place and the same object is returned.

    Args:
        doc (dict): input document holding a 'proofChain' entry

    Returns:
        dict: the same document, with 'proofChain' replaced by 'proof'
    """
    # Read the first proof before touching the document, so a missing or
    # empty chain raises while the document is still unchanged.
    first_proof = doc["proofChain"][0]

    doc.pop("proofChain")
    doc["proof"] = first_proof

    return doc
def replace_proof_value(doc: dict) ‑> dict

Replace 'proofValue' field with 'jws' field

Args

doc : dict
input document with proof

Returns

dict
replaced document
Expand source code
def replace_proof_value(doc: dict) -> dict:
    """Rename the 'proofValue' entry of a document to 'jws'

    The document is modified in place and the same object is returned.

    Args:
        doc (dict): input document holding a 'proofValue' entry

    Returns:
        dict: the same document, with 'proofValue' moved under 'jws'

    Raises:
        KeyError: if the document has no 'proofValue' entry
    """
    # Take the value out of 'proofValue' and store it under 'jws'
    proof_value = doc.pop("proofValue")
    doc["jws"] = proof_value

    return doc
def sort_exchange_record_dicts_by_created_at(records: List[dict], sort_order: str = 'desc') ‑> List[dict]

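Sort exchange record dicts based on the 'updated_at' field (despite the function name, the implementation sorts on 'updated_at', not on a 'created at' field)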

Args

records : typing.List[dict]
Exchange record dicts
sort_order : str, optional
Sort order ('desc' or 'asc'). Defaults to "desc".

Returns

typing.List[dict]
Sorted record dicts
Expand source code
def sort_exchange_record_dicts_by_created_at(records: typing.List[dict],
                                             sort_order: str = "desc") -> typing.List[dict]:
    """Return the exchange record dicts ordered by their 'updated_at' value

    Note that, despite the function name, the sort key used is 'updated_at'.
    The input list is left untouched; a new list is returned.

    Args:
        records (typing.List[dict]): Exchange record dicts
        sort_order (str, optional): Sort order ('desc' or 'asc'). Defaults to "desc".

    Returns:
        typing.List[dict]: Sorted record dicts
    """
    assert sort_order in ("desc", "asc")

    def _updated_at(record):
        return record['updated_at']

    descending = sort_order == "desc"

    return sorted(records, key=_updated_at, reverse=descending)

Classes

class PaginationConfig (total_count, page, page_size, total_pages)

PaginationConfig(total_count, page, page_size, total_pages)

Ancestors

  • builtins.tuple

Instance variables

var page

Alias for field number 1

var page_size

Alias for field number 2

var total_count

Alias for field number 0

var total_pages

Alias for field number 3

class PaginationResult (results, pagination)

PaginationResult(results, pagination)

Ancestors

  • builtins.tuple

Instance variables

var pagination

Alias for field number 1

var results

Alias for field number 0