Module dexa_sdk.marketplace.records.published_dda_template_record
Expand source code
from aries_cloudagent.messaging.models.base_record import (
BaseRecord,
BaseRecordSchema
)
from aries_cloudagent.config.injection_context import InjectionContext
from marshmallow import fields
from ...agreements.dda.v1_0.models.dda_models import (
DataDisclosureAgreementModel
)
class PublishedDDATemplateRecord(BaseRecord):
    """Published DDA template record.

    Holds a serialized Data Disclosure Agreement (DDA) together with the
    connection it was received on. ``connection_id``, ``template_id``,
    ``industry_sector`` and ``state`` are declared as record tags, so
    records can be queried by them.
    """

    class Meta:
        # Name of the schema class used to (de)serialize this record.
        schema_class = "PublishedDDATemplateRecordSchema"

    # Record type
    RECORD_TYPE = "published_dda_template_record"

    # Record identifier
    RECORD_ID_NAME = "id"

    # Record tags
    # NOTE(review): the "~" prefix follows the BaseRecord tag naming
    # convention (presumably marks tags stored unencrypted) - confirm
    # against the BaseRecord implementation in use.
    TAG_NAMES = {
        "~connection_id",
        "~template_id",
        "~industry_sector",
        "~state"
    }

    def __init__(
        self,
        id: str = None,
        connection_id: str = None,
        template_id: str = None,
        industry_sector: str = None,
        dda: dict = None,
        connection_url: str = None,
        state: str = None,
        **kwargs
    ):
        """Initialize a published DDA template record.

        Args:
            id (str): Record identifier.
            connection_id (str): Connection identifier.
            template_id (str): DDA template identifier.
            industry_sector (str): Industry sector of the DDA.
            dda (dict): Serialized data disclosure agreement.
            connection_url (str): Connection URL.
            state (str): Record state.
            **kwargs: Extra keyword arguments forwarded to ``BaseRecord``.
        """
        # Pass the identifier and state to parent class
        super().__init__(id, state, **kwargs)
        self.connection_id = connection_id
        self.dda = dda
        self.template_id = template_id
        self.industry_sector = industry_sector
        self.connection_url = connection_url

    @property
    def request_id(self) -> str:
        """Accessor for record identifier"""
        return self._id

    @property
    def record_value(self) -> dict:
        """Accessor for the JSON record value of this published DDA record."""
        return {
            prop: getattr(self, prop)
            for prop in (
                "connection_id",
                "dda",
                "template_id",
                "industry_sector",
                "connection_url"
            )
        }

    @classmethod
    async def store_publish_dda_record(
        cls,
        context: InjectionContext,
        connection_id: str,
        dda: DataDisclosureAgreementModel,
        connection_url: str
    ) -> "PublishedDDATemplateRecord":
        """Store a DDA and create or update the published DDA record.

        A record is looked up by ``connection_id`` and ``dda.id``. If none
        exists a new one is created; otherwise the first match has its
        industry sector, serialized DDA and connection URL overwritten.
        The record is saved in both cases.

        Args:
            context (InjectionContext): Injection context to be used.
            connection_id (str): Connection id
            dda (DataDisclosureAgreementModel): Data disclosure agreement.
            connection_url (str): Connection URL.

        Returns:
            PublishedDDATemplateRecord: Published DDA template record.
        """
        # Query for existing record for connection and template id.
        tag_filter = {
            "connection_id": connection_id,
            "template_id": dda.id
        }
        records = await cls.query(context, tag_filter)

        if not records:
            # Not an existing entry.
            # Create a new record.
            record = PublishedDDATemplateRecord(
                connection_id=connection_id,
                template_id=dda.id,
                industry_sector=dda.data_sharing_restrictions.industry_sector,
                dda=dda.serialize(),
                connection_url=connection_url
            )
        else:
            # Existing entry.
            # Update the record.
            record: PublishedDDATemplateRecord = records[0]
            # Industry sector
            record.industry_sector = dda.data_sharing_restrictions.industry_sector
            # DDA
            record.dda = dda.serialize()
            # Connection URL.
            record.connection_url = connection_url

        # Save the record.
        await record.save(context)
        return record

    @classmethod
    async def delete_publish_dda_record(
        cls,
        context: InjectionContext,
        connection_id: str,
        template_id: str
    ):
        """Delete publish DDA record.

        Only the first record matching the connection and template
        identifiers is deleted.

        Args:
            context (InjectionContext): Injection context to be used.
            connection_id (str): Connection identifier.
            template_id (str): Template identifier.

        Raises:
            AssertionError: If no matching record is found.
        """
        tag_filter = {
            "connection_id": connection_id,
            "template_id": template_id
        }
        records = await cls.query(context, tag_filter)

        if not records:
            # Raised explicitly instead of via `assert` so the check is not
            # stripped when Python runs with -O; the exception type and
            # message stay the same for existing callers.
            raise AssertionError("Publish DDA record not found.")

        record: PublishedDDATemplateRecord = records[0]
        await record.delete_record(context)
class PublishedDDATemplateRecordSchema(BaseRecordSchema):
    """Marshmallow schema for ``PublishedDDATemplateRecord``."""

    class Meta:
        # Model class this schema (de)serializes.
        model_class = PublishedDDATemplateRecord

    # Connection identifier.
    connection_id = fields.String()
    # Serialized data disclosure agreement.
    dda = fields.Dict()
    # DDA template identifier.
    template_id = fields.String()
    # Industry sector of the DDA.
    industry_sector = fields.String()
    # Connection URL.
    connection_url = fields.String()
Classes
class PublishedDDATemplateRecord (id: str = None, connection_id: str = None, template_id: str = None, industry_sector: str = None, dda: dict = None, connection_url: str = None, state: str = None, **kwargs)
-
Published DDA template record.
Initialize a new BaseRecord.
Expand source code
class PublishedDDATemplateRecord(BaseRecord):
    """Published DDA template record.

    Holds a serialized Data Disclosure Agreement (DDA) together with the
    connection it was received on. ``connection_id``, ``template_id``,
    ``industry_sector`` and ``state`` are declared as record tags, so
    records can be queried by them.
    """

    class Meta:
        # Name of the schema class used to (de)serialize this record.
        schema_class = "PublishedDDATemplateRecordSchema"

    # Record type
    RECORD_TYPE = "published_dda_template_record"

    # Record identifier
    RECORD_ID_NAME = "id"

    # Record tags
    # NOTE(review): the "~" prefix follows the BaseRecord tag naming
    # convention (presumably marks tags stored unencrypted) - confirm
    # against the BaseRecord implementation in use.
    TAG_NAMES = {
        "~connection_id",
        "~template_id",
        "~industry_sector",
        "~state"
    }

    def __init__(
        self,
        id: str = None,
        connection_id: str = None,
        template_id: str = None,
        industry_sector: str = None,
        dda: dict = None,
        connection_url: str = None,
        state: str = None,
        **kwargs
    ):
        """Initialize a published DDA template record.

        Args:
            id (str): Record identifier.
            connection_id (str): Connection identifier.
            template_id (str): DDA template identifier.
            industry_sector (str): Industry sector of the DDA.
            dda (dict): Serialized data disclosure agreement.
            connection_url (str): Connection URL.
            state (str): Record state.
            **kwargs: Extra keyword arguments forwarded to ``BaseRecord``.
        """
        # Pass the identifier and state to parent class
        super().__init__(id, state, **kwargs)
        self.connection_id = connection_id
        self.dda = dda
        self.template_id = template_id
        self.industry_sector = industry_sector
        self.connection_url = connection_url

    @property
    def request_id(self) -> str:
        """Accessor for record identifier"""
        return self._id

    @property
    def record_value(self) -> dict:
        """Accessor for the JSON record value of this published DDA record."""
        return {
            prop: getattr(self, prop)
            for prop in (
                "connection_id",
                "dda",
                "template_id",
                "industry_sector",
                "connection_url"
            )
        }

    @classmethod
    async def store_publish_dda_record(
        cls,
        context: InjectionContext,
        connection_id: str,
        dda: DataDisclosureAgreementModel,
        connection_url: str
    ) -> "PublishedDDATemplateRecord":
        """Store a DDA and create or update the published DDA record.

        A record is looked up by ``connection_id`` and ``dda.id``. If none
        exists a new one is created; otherwise the first match has its
        industry sector, serialized DDA and connection URL overwritten.
        The record is saved in both cases.

        Args:
            context (InjectionContext): Injection context to be used.
            connection_id (str): Connection id
            dda (DataDisclosureAgreementModel): Data disclosure agreement.
            connection_url (str): Connection URL.

        Returns:
            PublishedDDATemplateRecord: Published DDA template record.
        """
        # Query for existing record for connection and template id.
        tag_filter = {
            "connection_id": connection_id,
            "template_id": dda.id
        }
        records = await cls.query(context, tag_filter)

        if not records:
            # Not an existing entry.
            # Create a new record.
            record = PublishedDDATemplateRecord(
                connection_id=connection_id,
                template_id=dda.id,
                industry_sector=dda.data_sharing_restrictions.industry_sector,
                dda=dda.serialize(),
                connection_url=connection_url
            )
        else:
            # Existing entry.
            # Update the record.
            record: PublishedDDATemplateRecord = records[0]
            # Industry sector
            record.industry_sector = dda.data_sharing_restrictions.industry_sector
            # DDA
            record.dda = dda.serialize()
            # Connection URL.
            record.connection_url = connection_url

        # Save the record.
        await record.save(context)
        return record

    @classmethod
    async def delete_publish_dda_record(
        cls,
        context: InjectionContext,
        connection_id: str,
        template_id: str
    ):
        """Delete publish DDA record.

        Only the first record matching the connection and template
        identifiers is deleted.

        Args:
            context (InjectionContext): Injection context to be used.
            connection_id (str): Connection identifier.
            template_id (str): Template identifier.

        Raises:
            AssertionError: If no matching record is found.
        """
        tag_filter = {
            "connection_id": connection_id,
            "template_id": template_id
        }
        records = await cls.query(context, tag_filter)

        if not records:
            # Raised explicitly instead of via `assert` so the check is not
            # stripped when Python runs with -O; the exception type and
            # message stay the same for existing callers.
            raise AssertionError("Publish DDA record not found.")

        record: PublishedDDATemplateRecord = records[0]
        await record.delete_record(context)
Ancestors
- aries_cloudagent.messaging.models.base_record.BaseRecord
- aries_cloudagent.messaging.models.base.BaseModel
- abc.ABC
Class variables
var Meta
var RECORD_ID_NAME
var RECORD_TYPE
var TAG_NAMES
Static methods
async def delete_publish_dda_record(context: aries_cloudagent.config.injection_context.InjectionContext, connection_id: str, template_id: str)
-
Delete publish DDA record.
Args
context
:InjectionContext
- Injection context to be used.
connection_id
:str
- Connection identifier.
template_id
:str
- Template identifier.
Expand source code
@classmethod
async def delete_publish_dda_record(
    cls,
    context: InjectionContext,
    connection_id: str,
    template_id: str
):
    """Delete publish DDA record.

    Only the first record matching the connection and template
    identifiers is deleted.

    Args:
        context (InjectionContext): Injection context to be used.
        connection_id (str): Connection identifier.
        template_id (str): Template identifier.

    Raises:
        AssertionError: If no matching record is found.
    """
    tag_filter = {
        "connection_id": connection_id,
        "template_id": template_id
    }
    records = await cls.query(context, tag_filter)

    if not records:
        # Raised explicitly instead of via `assert` so the check is not
        # stripped when Python runs with -O; the exception type and
        # message stay the same for existing callers.
        raise AssertionError("Publish DDA record not found.")

    record: PublishedDDATemplateRecord = records[0]
    await record.delete_record(context)
async def store_publish_dda_record(context: aries_cloudagent.config.injection_context.InjectionContext, connection_id: str, dda: DataDisclosureAgreementModel, connection_url: str) ‑> PublishedDDATemplateRecord
-
Store a DDA and create or update the corresponding published DDA record.
Args
context
:InjectionContext
- Injection context to be used.
connection_id
:str
- Connection id
dda
:DataDisclosureAgreementModel
- Data disclosure agreement.
connection_url
:str
- Connection URL.
Returns
PublishedDDATemplateRecord
- Published DDA template record.
Expand source code
@classmethod
async def store_publish_dda_record(
    cls,
    context: InjectionContext,
    connection_id: str,
    dda: DataDisclosureAgreementModel,
    connection_url: str
) -> "PublishedDDATemplateRecord":
    """Create or refresh the published DDA record for a connection.

    Looks up records tagged with the given connection id and the DDA's
    template id. The first match, if any, is updated in place; otherwise
    a new record is built. Either way the record is saved and returned.

    Args:
        context (InjectionContext): Injection context to be used.
        connection_id (str): Connection id
        dda (DataDisclosureAgreementModel): Data disclosure agreement.
        connection_url (str): Connection URL.

    Returns:
        PublishedDDATemplateRecord: Published DDA template record.
    """
    # Look for a record already stored for this connection and template.
    matches = await cls.query(
        context,
        {"connection_id": connection_id, "template_id": dda.id},
    )

    if matches:
        # Already published: overwrite the mutable parts of the first match.
        record: PublishedDDATemplateRecord = matches[0]
        record.industry_sector = dda.data_sharing_restrictions.industry_sector
        record.dda = dda.serialize()
        record.connection_url = connection_url
    else:
        # First time this template is seen on this connection.
        record = PublishedDDATemplateRecord(
            connection_id=connection_id,
            template_id=dda.id,
            industry_sector=dda.data_sharing_restrictions.industry_sector,
            dda=dda.serialize(),
            connection_url=connection_url,
        )

    # Persist the new or updated record.
    await record.save(context)
    return record
Instance variables
var record_value : dict
-
Accessor for the JSON record value generated for this published DDA template record.
Expand source code
@property
def record_value(self) -> dict:
    """Return the attributes that make up the stored JSON record value."""
    value_props = (
        "connection_id",
        "dda",
        "template_id",
        "industry_sector",
        "connection_url",
    )
    value = {}
    for name in value_props:
        value[name] = getattr(self, name)
    return value
var request_id : str
-
Accessor for record identifier
Expand source code
@property
def request_id(self) -> str:
    """Return the record identifier held in ``self._id``."""
    record_id = self._id
    return record_id
class PublishedDDATemplateRecordSchema (*args, **kwargs)
-
Published DDA template record schema.
Initialize BaseModelSchema.
Raises
TypeError
- If model_class is not set on Meta
Expand source code
class PublishedDDATemplateRecordSchema(BaseRecordSchema):
    """Marshmallow schema for ``PublishedDDATemplateRecord``."""

    class Meta:
        # Model class this schema (de)serializes.
        model_class = PublishedDDATemplateRecord

    # Connection identifier.
    connection_id = fields.String()
    # Serialized data disclosure agreement.
    dda = fields.Dict()
    # DDA template identifier.
    template_id = fields.String()
    # Industry sector of the DDA.
    industry_sector = fields.String()
    # Connection URL.
    connection_url = fields.String()
Ancestors
- aries_cloudagent.messaging.models.base_record.BaseRecordSchema
- aries_cloudagent.messaging.models.base.BaseModelSchema
- marshmallow.schema.Schema
- marshmallow.base.SchemaABC
Class variables
var Meta
var opts