utils.utils
annotations
inspect
json
logging
os
sys
time
traceback
datetime
Decimal
lru_cache
wraps
Path
TYPE_CHECKING
Dict
Tuple
httpx
constants
load_dotenv
GoogleAPICallError
NotFound
storage
retry
Chain
Network
Protocol
ISDK
sdk_registry
Config
get_logger
Web3
cached_w3_clients
cached_sdk_clients
logger
NUM_RETRIES_WARN
IS_AGENT_DEPLOYMENT
DEBUG
is_staging_env
@lru_cache(maxsize=1)
def is_staging_env() -> bool
set_on_test_mode
def set_on_test_mode(test_value)
get_node_provider_by_network_and_chain
def get_node_provider_by_network_and_chain(network: Network, chain: Chain)
Return the node provider for the given network and chain
get_web3_by_network_and_chain
def get_web3_by_network_and_chain(network: Network, chain: Chain) -> Web3
Create a web3 client for the user_address and cache it. Recreates if not connected.
is_EVM_compatible_chain
def is_EVM_compatible_chain(chain: Chain) -> bool
serialize_timestamp
def serialize_timestamp(timestamp: float) -> str
Convert a Unix timestamp to a human-readable string.
deserialize_timestamp
def deserialize_timestamp(timestamp_str: str) -> float
Convert a human-readable string to a Unix timestamp.
retry_on_exception_for_download
def retry_on_exception_for_download(max_retries=3, delay=2)
Retry decorator specifically for the download_action_bundle_from_storage function. It returns None if NotFound exception persists after all retries and raises any other exception immediately or after retries.
retry_on_exception
def retry_on_exception(max_retries=3,
delay=2,
exceptions=(Exception, ),
return_value=None)
Retry decorator for functions that may raise exceptions. This is a simple retry mechanism. It is used to prevent the service from crashing due to transient errors.
upload_transaction_for_agent_gcs
def upload_transaction_for_agent_gcs(blob_str: str, raw_transaction: str)
upload_transaction_for_agent
@retry_on_exception(max_retries=3, delay=2, exceptions=(Exception, ))
def upload_transaction_for_agent(blob_str: str, raw_transaction: str)
upload_transactions_for_agent
def upload_transactions_for_agent(action_bundle: ActionBundle)
upload_action_bundle_to_storage
@retry_on_exception(max_retries=sys.maxsize, delay=2, exceptions=(Exception, ))
def upload_action_bundle_to_storage(action_bundle: ActionBundle)
upload_action_bundle_to_storage_local
def upload_action_bundle_to_storage_local(action_bundle: ActionBundle)
upload_action_bundle_to_storage_gcs
def upload_action_bundle_to_storage_gcs(action_bundle: ActionBundle)
Writes in a GCS file to update the status of a given Action. The filename is the Action UUID and the content is pickled for simplicity. The Executioner is the only one writing in this/these files, the other services are only reading them.
Raises:
FileNotFoundError
- If the configuration file is not found at the specified path.ValueError
- If there is an error decoding JSON from the configuration file.EnvironmentError
- If required environment variables are not set.PermissionsError
- If there are insufficient permissions to access the file.TimeoutError
- If the request to download the configuration file times out.GoogleAPICallError
- For network-related errors or issues on the backend from Google Cloud services.
download_action_bundle_from_storage
@retry_on_exception_for_download(max_retries=3, delay=2)
def download_action_bundle_from_storage(gcs_bucket_name, client_name,
deployment_name, strategy_id,
action_id, object_cls)
download_action_bundle_from_storage_local
def download_action_bundle_from_storage_local(strategy_id, action_id,
object_cls)
download_action_bundle_from_storage_gcs
def download_action_bundle_from_storage_gcs(gcs_bucket_name, client_name,
deployment_name, strategy_id,
action_id, object_cls)
get_protocol_sdk
def get_protocol_sdk(protocol: Protocol, network: Network, chain: Chain)
Return the protocol SDK for the given web3 client
get_blocknative_tip
@retry(tries=3, delay=1, max_delay=1)
def get_blocknative_tip(chain: Chain)