Module dexa_sdk.ledgers.ethereum.core

Expand source code
import typing
import asyncio
from web3 import Web3, Account
from web3.contract import Contract
from web3.exceptions import ContractLogicError
from web3._utils.encoding import to_json
from eth_account.signers.local import LocalAccount
from aries_cloudagent.config.injection_context import InjectionContext
from loguru import logger


class EthereumClient:
    """Ethereum blockchain client"""

    def __init__(self, context: InjectionContext) -> None:
        """Initialise ethreum blockchain client"""

        # Injection context
        self._context = context

        # Logger
        self._logger = logger

        # Eth node rpc endpoint
        self._eth_node_rpc = self._context.settings.get("dexa.eth_node_rpc")

        # Eth private key of the controller
        self._org_eth_private_key = self._context.settings.get("dexa.org_eth_private_key")

        # Set organisation account for ethereum client
        self._org_eth_account = Account.from_key(self._org_eth_private_key)

        # Eth private key of the intermediary
        self._intermediary_eth_private_key = self._context.settings.get(
            "dexa.intermediary_eth_private_key")

        # Set intermediary account for ethereum client
        self._intermediary_eth_account = Account.from_key(self._intermediary_eth_private_key)

        # Ethereum client
        self._client = Web3(Web3.HTTPProvider(self._eth_node_rpc))

        # Contract ABI
        self._contract_abi = self._context.settings.get("dexa.contract_abi")

        # Contract address
        self._contract_address = self._context.settings.get("dexa.contract_address")

        # Contract interface
        self._contract: Contract = self._client.eth.contract(
            address=self._contract_address,
            abi=self._contract_abi
        )

    @property
    def context(self) -> InjectionContext:
        """Accessor for injection context.

        Returns:
            InjectionContext: Injection context
        """
        return self._context

    @property
    def logger(self):
        """Accessor for logger.

        Returns:
            Logger: Logger
        """
        return self._logger

    @property
    def client(self) -> Web3:
        """Returns the client for interacting with ethereum blockchain node.

        Returns:
            Web3: Ethereum blockchain client
        """
        return self._client

    @property
    def org_account(self) -> LocalAccount:
        """Accessor for ethereum account for organisation.

        Returns:
            LocalAccount: account
        """
        return self._org_eth_account

    @property
    def org_private_key(self) -> str:
        """Accessor for organisation private key.

        Returns:
            str: private key
        """
        return self._org_eth_private_key

    @property
    def intermediary_account(self) -> LocalAccount:
        """Accessor for ethereum account for intermediary.

        Returns:
            LocalAccount: account
        """
        return self._intermediary_eth_account

    @property
    def intermediary_private_key(self) -> str:
        """Accessor for intermediary private key.

        Returns:
            str: private key
        """
        return self._intermediary_eth_private_key

    @property
    def contract_abi(self) -> str:
        """Accessor for dexa contract abi.

        Returns:
            str: smart contract abi
        """
        return self._contract_abi

    @property
    def contract_address(self) -> str:
        """Accessor for dexa contract address.

        Returns:
            str: smart contract address
        """
        return self._contract_address

    @property
    def contract(self) -> Contract:
        """Accessor for dexa contract interface.

        Returns:
            Contract: smart contract
        """
        return self._contract

    async def emit_da_did(self, did: str) -> typing.Tuple[typing.Any, typing.Any]:
        """Emit did:mydata identifier in the blockchain logs"""
        org_account = self.org_account
        org_balance = self.client.eth.get_balance(org_account.address)

        self.logger.info(f"Organisation account address: {org_account.address}")
        self.logger.info(f"Organisation account balance: {org_balance}")

        try:
            contract = self.contract
            contract_function = contract.functions.emitDADID(did)
            contract_function_txn = contract_function.buildTransaction(
                {
                    'from': org_account.address,
                    'nonce': self.client.eth.get_transaction_count(org_account.address),
                    'maxFeePerGas': 2000000000,
                    'maxPriorityFeePerGas': 1000000000
                }
            )

            tx_create = self.client.eth.account.sign_transaction(
                contract_function_txn,
                org_account.privateKey
            )

            tx_hash = self.client.eth.send_raw_transaction(tx_create.rawTransaction)

            # Suspend execution and let other task run.
            await asyncio.sleep(5)

            tx_receipt = self.client.eth.wait_for_transaction_receipt(tx_hash)

            if tx_receipt.get("status") == 1:
                self.logger.info(f"Status (emitDADID): Succesfully emitted {did}")
            else:
                self.logger.info(
                    "Status (emitDADID): Failed to emit {did}")

            return (tx_hash, tx_receipt)
        except ContractLogicError as err:
            self.logger.info(f"Status (emitDADID): {err}")

    async def emit_dda_did(self, did: str) -> typing.Tuple[typing.Any, typing.Any]:
        """Emit did:mydata identifier in the blockchain logs"""
        org_account = self.org_account
        org_balance = self.client.eth.get_balance(org_account.address)

        self.logger.info(f"Organisation account address: {org_account.address}")
        self.logger.info(f"Organisation account balance: {org_balance}")

        try:
            contract = self.contract
            contract_function = contract.functions.emitDDADID(did)
            contract_function_txn = contract_function.buildTransaction(
                {
                    'from': org_account.address,
                    'nonce': self.client.eth.get_transaction_count(org_account.address),
                    'maxFeePerGas': 2000000000,
                    'maxPriorityFeePerGas': 1000000000
                }
            )

            tx_create = self.client.eth.account.sign_transaction(
                contract_function_txn,
                org_account.privateKey
            )

            tx_hash = self.client.eth.send_raw_transaction(tx_create.rawTransaction)

            # Suspend execution and let other task run.
            await asyncio.sleep(5)

            tx_receipt = self.client.eth.wait_for_transaction_receipt(tx_hash)

            if tx_receipt.get("status") == 1:
                self.logger.info(f"Status (emitDDADID): Succesfully emitted {did}")
            else:
                self.logger.info(
                    "Status (emitDDADID): Failed to emit {did}")

            return (tx_hash, tx_receipt)
        except ContractLogicError as err:
            self.logger.info(f"Status (emitDDADID): {err}")

    async def add_organisation(self) -> None:
        """Add organisation to the whitelist"""
        org_account = self.org_account
        org_balance = self.client.eth.get_balance(org_account.address)

        self.logger.info(f"Organisation account address: {org_account.address}")
        self.logger.info(f"Organisation account balance: {org_balance}")

        intermediary_account = self.intermediary_account
        intermediary_balance = self.client.eth.get_balance(intermediary_account.address)

        self.logger.info(f"Intermediary account address: {intermediary_account.address}")
        self.logger.info(f"Intermediary account balance: {intermediary_balance}")
        self.logger.info(f"Transaction count: \
                {self.client.eth.get_transaction_count(intermediary_account.address)}")

        try:
            contract = self.contract
            contract_function = contract.functions.addOrganisation(org_account.address)
            contract_function_txn = contract_function.buildTransaction(
                {
                    'from': intermediary_account.address,
                    'nonce': self.client.eth.get_transaction_count(intermediary_account.address),
                    'maxFeePerGas': 2000000000,
                    'maxPriorityFeePerGas': 1000000000
                }
            )

            tx_create = self.client.eth.account.sign_transaction(
                contract_function_txn,
                intermediary_account.privateKey
            )

            tx_hash = self.client.eth.send_raw_transaction(tx_create.rawTransaction)

            self.logger.info(f"Transaction hash (addOrganisation): {tx_hash.hex()}")

            tx_receipt = self.client.eth.wait_for_transaction_receipt(tx_hash)

            self.logger.info(f"Transaction receipt (addOrganisation): {to_json(tx_receipt)}")

            if tx_receipt.get("status") == 1:
                self.logger.info("Status (addOrganisation): Added organisation to whitelist.")
            else:
                self.logger.info(
                    "Status (addOrganisation): Organisation is already present in whitelist.")
        except ContractLogicError as err:
            self.logger.info(f"Status (addOrganisation): {err}")

Classes

class EthereumClient (context: aries_cloudagent.config.injection_context.InjectionContext)

Ethereum blockchain client

Initialise ethreum blockchain client

Expand source code
class EthereumClient:
    """Ethereum blockchain client"""

    def __init__(self, context: InjectionContext) -> None:
        """Initialise ethreum blockchain client"""

        # Injection context
        self._context = context

        # Logger
        self._logger = logger

        # Eth node rpc endpoint
        self._eth_node_rpc = self._context.settings.get("dexa.eth_node_rpc")

        # Eth private key of the controller
        self._org_eth_private_key = self._context.settings.get("dexa.org_eth_private_key")

        # Set organisation account for ethereum client
        self._org_eth_account = Account.from_key(self._org_eth_private_key)

        # Eth private key of the intermediary
        self._intermediary_eth_private_key = self._context.settings.get(
            "dexa.intermediary_eth_private_key")

        # Set intermediary account for ethereum client
        self._intermediary_eth_account = Account.from_key(self._intermediary_eth_private_key)

        # Ethereum client
        self._client = Web3(Web3.HTTPProvider(self._eth_node_rpc))

        # Contract ABI
        self._contract_abi = self._context.settings.get("dexa.contract_abi")

        # Contract address
        self._contract_address = self._context.settings.get("dexa.contract_address")

        # Contract interface
        self._contract: Contract = self._client.eth.contract(
            address=self._contract_address,
            abi=self._contract_abi
        )

    @property
    def context(self) -> InjectionContext:
        """Accessor for injection context.

        Returns:
            InjectionContext: Injection context
        """
        return self._context

    @property
    def logger(self):
        """Accessor for logger.

        Returns:
            Logger: Logger
        """
        return self._logger

    @property
    def client(self) -> Web3:
        """Returns the client for interacting with ethereum blockchain node.

        Returns:
            Web3: Ethereum blockchain client
        """
        return self._client

    @property
    def org_account(self) -> LocalAccount:
        """Accessor for ethereum account for organisation.

        Returns:
            LocalAccount: account
        """
        return self._org_eth_account

    @property
    def org_private_key(self) -> str:
        """Accessor for organisation private key.

        Returns:
            str: private key
        """
        return self._org_eth_private_key

    @property
    def intermediary_account(self) -> LocalAccount:
        """Accessor for ethereum account for intermediary.

        Returns:
            LocalAccount: account
        """
        return self._intermediary_eth_account

    @property
    def intermediary_private_key(self) -> str:
        """Accessor for intermediary private key.

        Returns:
            str: private key
        """
        return self._intermediary_eth_private_key

    @property
    def contract_abi(self) -> str:
        """Accessor for dexa contract abi.

        Returns:
            str: smart contract abi
        """
        return self._contract_abi

    @property
    def contract_address(self) -> str:
        """Accessor for dexa contract address.

        Returns:
            str: smart contract address
        """
        return self._contract_address

    @property
    def contract(self) -> Contract:
        """Accessor for dexa contract interface.

        Returns:
            Contract: smart contract
        """
        return self._contract

    async def emit_da_did(self, did: str) -> typing.Tuple[typing.Any, typing.Any]:
        """Emit did:mydata identifier in the blockchain logs"""
        org_account = self.org_account
        org_balance = self.client.eth.get_balance(org_account.address)

        self.logger.info(f"Organisation account address: {org_account.address}")
        self.logger.info(f"Organisation account balance: {org_balance}")

        try:
            contract = self.contract
            contract_function = contract.functions.emitDADID(did)
            contract_function_txn = contract_function.buildTransaction(
                {
                    'from': org_account.address,
                    'nonce': self.client.eth.get_transaction_count(org_account.address),
                    'maxFeePerGas': 2000000000,
                    'maxPriorityFeePerGas': 1000000000
                }
            )

            tx_create = self.client.eth.account.sign_transaction(
                contract_function_txn,
                org_account.privateKey
            )

            tx_hash = self.client.eth.send_raw_transaction(tx_create.rawTransaction)

            # Suspend execution and let other task run.
            await asyncio.sleep(5)

            tx_receipt = self.client.eth.wait_for_transaction_receipt(tx_hash)

            if tx_receipt.get("status") == 1:
                self.logger.info(f"Status (emitDADID): Succesfully emitted {did}")
            else:
                self.logger.info(
                    "Status (emitDADID): Failed to emit {did}")

            return (tx_hash, tx_receipt)
        except ContractLogicError as err:
            self.logger.info(f"Status (emitDADID): {err}")

    async def emit_dda_did(self, did: str) -> typing.Tuple[typing.Any, typing.Any]:
        """Emit did:mydata identifier in the blockchain logs"""
        org_account = self.org_account
        org_balance = self.client.eth.get_balance(org_account.address)

        self.logger.info(f"Organisation account address: {org_account.address}")
        self.logger.info(f"Organisation account balance: {org_balance}")

        try:
            contract = self.contract
            contract_function = contract.functions.emitDDADID(did)
            contract_function_txn = contract_function.buildTransaction(
                {
                    'from': org_account.address,
                    'nonce': self.client.eth.get_transaction_count(org_account.address),
                    'maxFeePerGas': 2000000000,
                    'maxPriorityFeePerGas': 1000000000
                }
            )

            tx_create = self.client.eth.account.sign_transaction(
                contract_function_txn,
                org_account.privateKey
            )

            tx_hash = self.client.eth.send_raw_transaction(tx_create.rawTransaction)

            # Suspend execution and let other task run.
            await asyncio.sleep(5)

            tx_receipt = self.client.eth.wait_for_transaction_receipt(tx_hash)

            if tx_receipt.get("status") == 1:
                self.logger.info(f"Status (emitDDADID): Succesfully emitted {did}")
            else:
                self.logger.info(
                    "Status (emitDDADID): Failed to emit {did}")

            return (tx_hash, tx_receipt)
        except ContractLogicError as err:
            self.logger.info(f"Status (emitDDADID): {err}")

    async def add_organisation(self) -> None:
        """Add organisation to the whitelist"""
        org_account = self.org_account
        org_balance = self.client.eth.get_balance(org_account.address)

        self.logger.info(f"Organisation account address: {org_account.address}")
        self.logger.info(f"Organisation account balance: {org_balance}")

        intermediary_account = self.intermediary_account
        intermediary_balance = self.client.eth.get_balance(intermediary_account.address)

        self.logger.info(f"Intermediary account address: {intermediary_account.address}")
        self.logger.info(f"Intermediary account balance: {intermediary_balance}")
        self.logger.info(f"Transaction count: \
                {self.client.eth.get_transaction_count(intermediary_account.address)}")

        try:
            contract = self.contract
            contract_function = contract.functions.addOrganisation(org_account.address)
            contract_function_txn = contract_function.buildTransaction(
                {
                    'from': intermediary_account.address,
                    'nonce': self.client.eth.get_transaction_count(intermediary_account.address),
                    'maxFeePerGas': 2000000000,
                    'maxPriorityFeePerGas': 1000000000
                }
            )

            tx_create = self.client.eth.account.sign_transaction(
                contract_function_txn,
                intermediary_account.privateKey
            )

            tx_hash = self.client.eth.send_raw_transaction(tx_create.rawTransaction)

            self.logger.info(f"Transaction hash (addOrganisation): {tx_hash.hex()}")

            tx_receipt = self.client.eth.wait_for_transaction_receipt(tx_hash)

            self.logger.info(f"Transaction receipt (addOrganisation): {to_json(tx_receipt)}")

            if tx_receipt.get("status") == 1:
                self.logger.info("Status (addOrganisation): Added organisation to whitelist.")
            else:
                self.logger.info(
                    "Status (addOrganisation): Organisation is already present in whitelist.")
        except ContractLogicError as err:
            self.logger.info(f"Status (addOrganisation): {err}")

Instance variables

var client : web3.main.Web3

Returns the client for interacting with ethereum blockchain node.

Returns

Web3
Ethereum blockchain client
Expand source code
@property
def client(self) -> Web3:
    """Returns the client for interacting with ethereum blockchain node.

    Returns:
        Web3: Ethereum blockchain client
    """
    return self._client
var context : aries_cloudagent.config.injection_context.InjectionContext

Accessor for injection context.

Returns

InjectionContext
Injection context
Expand source code
@property
def context(self) -> InjectionContext:
    """Accessor for injection context.

    Returns:
        InjectionContext: Injection context
    """
    return self._context
var contract : web3.contract.Contract

Accessor for dexa contract interface.

Returns

Contract
smart contract
Expand source code
@property
def contract(self) -> Contract:
    """Accessor for dexa contract interface.

    Returns:
        Contract: smart contract
    """
    return self._contract
var contract_abi : str

Accessor for dexa contract abi.

Returns

str
smart contract abi
Expand source code
@property
def contract_abi(self) -> str:
    """Accessor for dexa contract abi.

    Returns:
        str: smart contract abi
    """
    return self._contract_abi
var contract_address : str

Accessor for dexa contract address.

Returns

str
smart contract address
Expand source code
@property
def contract_address(self) -> str:
    """Accessor for dexa contract address.

    Returns:
        str: smart contract address
    """
    return self._contract_address
var intermediary_account : eth_account.signers.local.LocalAccount

Accessor for ethereum account for intermediary.

Returns

LocalAccount
account
Expand source code
@property
def intermediary_account(self) -> LocalAccount:
    """Accessor for ethereum account for intermediary.

    Returns:
        LocalAccount: account
    """
    return self._intermediary_eth_account
var intermediary_private_key : str

Accessor for intermediary private key.

Returns

str
private key
Expand source code
@property
def intermediary_private_key(self) -> str:
    """Accessor for intermediary private key.

    Returns:
        str: private key
    """
    return self._intermediary_eth_private_key
var logger

Accessor for logger.

Returns

Logger
Logger
Expand source code
@property
def logger(self):
    """Accessor for logger.

    Returns:
        Logger: Logger
    """
    return self._logger
var org_account : eth_account.signers.local.LocalAccount

Accessor for ethereum account for organisation.

Returns

LocalAccount
account
Expand source code
@property
def org_account(self) -> LocalAccount:
    """Accessor for ethereum account for organisation.

    Returns:
        LocalAccount: account
    """
    return self._org_eth_account
var org_private_key : str

Accessor for organisation private key.

Returns

str
private key
Expand source code
@property
def org_private_key(self) -> str:
    """Accessor for organisation private key.

    Returns:
        str: private key
    """
    return self._org_eth_private_key

Methods

async def add_organisation(self) ‑> None

Add organisation to the whitelist

Expand source code
async def add_organisation(self) -> None:
    """Add organisation to the whitelist"""
    org_account = self.org_account
    org_balance = self.client.eth.get_balance(org_account.address)

    self.logger.info(f"Organisation account address: {org_account.address}")
    self.logger.info(f"Organisation account balance: {org_balance}")

    intermediary_account = self.intermediary_account
    intermediary_balance = self.client.eth.get_balance(intermediary_account.address)

    self.logger.info(f"Intermediary account address: {intermediary_account.address}")
    self.logger.info(f"Intermediary account balance: {intermediary_balance}")
    self.logger.info(f"Transaction count: \
            {self.client.eth.get_transaction_count(intermediary_account.address)}")

    try:
        contract = self.contract
        contract_function = contract.functions.addOrganisation(org_account.address)
        contract_function_txn = contract_function.buildTransaction(
            {
                'from': intermediary_account.address,
                'nonce': self.client.eth.get_transaction_count(intermediary_account.address),
                'maxFeePerGas': 2000000000,
                'maxPriorityFeePerGas': 1000000000
            }
        )

        tx_create = self.client.eth.account.sign_transaction(
            contract_function_txn,
            intermediary_account.privateKey
        )

        tx_hash = self.client.eth.send_raw_transaction(tx_create.rawTransaction)

        self.logger.info(f"Transaction hash (addOrganisation): {tx_hash.hex()}")

        tx_receipt = self.client.eth.wait_for_transaction_receipt(tx_hash)

        self.logger.info(f"Transaction receipt (addOrganisation): {to_json(tx_receipt)}")

        if tx_receipt.get("status") == 1:
            self.logger.info("Status (addOrganisation): Added organisation to whitelist.")
        else:
            self.logger.info(
                "Status (addOrganisation): Organisation is already present in whitelist.")
    except ContractLogicError as err:
        self.logger.info(f"Status (addOrganisation): {err}")
async def emit_da_did(self, did: str) ‑> Tuple[Any, Any]

Emit did:mydata identifier in the blockchain logs

Expand source code
async def emit_da_did(self, did: str) -> typing.Tuple[typing.Any, typing.Any]:
    """Emit did:mydata identifier in the blockchain logs"""
    org_account = self.org_account
    org_balance = self.client.eth.get_balance(org_account.address)

    self.logger.info(f"Organisation account address: {org_account.address}")
    self.logger.info(f"Organisation account balance: {org_balance}")

    try:
        contract = self.contract
        contract_function = contract.functions.emitDADID(did)
        contract_function_txn = contract_function.buildTransaction(
            {
                'from': org_account.address,
                'nonce': self.client.eth.get_transaction_count(org_account.address),
                'maxFeePerGas': 2000000000,
                'maxPriorityFeePerGas': 1000000000
            }
        )

        tx_create = self.client.eth.account.sign_transaction(
            contract_function_txn,
            org_account.privateKey
        )

        tx_hash = self.client.eth.send_raw_transaction(tx_create.rawTransaction)

        # Suspend execution and let other task run.
        await asyncio.sleep(5)

        tx_receipt = self.client.eth.wait_for_transaction_receipt(tx_hash)

        if tx_receipt.get("status") == 1:
            self.logger.info(f"Status (emitDADID): Succesfully emitted {did}")
        else:
            self.logger.info(
                "Status (emitDADID): Failed to emit {did}")

        return (tx_hash, tx_receipt)
    except ContractLogicError as err:
        self.logger.info(f"Status (emitDADID): {err}")
async def emit_dda_did(self, did: str) ‑> Tuple[Any, Any]

Emit did:mydata identifier in the blockchain logs

Expand source code
async def emit_dda_did(self, did: str) -> typing.Tuple[typing.Any, typing.Any]:
    """Emit did:mydata identifier in the blockchain logs"""
    org_account = self.org_account
    org_balance = self.client.eth.get_balance(org_account.address)

    self.logger.info(f"Organisation account address: {org_account.address}")
    self.logger.info(f"Organisation account balance: {org_balance}")

    try:
        contract = self.contract
        contract_function = contract.functions.emitDDADID(did)
        contract_function_txn = contract_function.buildTransaction(
            {
                'from': org_account.address,
                'nonce': self.client.eth.get_transaction_count(org_account.address),
                'maxFeePerGas': 2000000000,
                'maxPriorityFeePerGas': 1000000000
            }
        )

        tx_create = self.client.eth.account.sign_transaction(
            contract_function_txn,
            org_account.privateKey
        )

        tx_hash = self.client.eth.send_raw_transaction(tx_create.rawTransaction)

        # Suspend execution and let other task run.
        await asyncio.sleep(5)

        tx_receipt = self.client.eth.wait_for_transaction_receipt(tx_hash)

        if tx_receipt.get("status") == 1:
            self.logger.info(f"Status (emitDDADID): Succesfully emitted {did}")
        else:
            self.logger.info(
                "Status (emitDDADID): Failed to emit {did}")

        return (tx_hash, tx_receipt)
    except ContractLogicError as err:
        self.logger.info(f"Status (emitDDADID): {err}")