Core Package API Reference
Base Classes
Mindtrace class. Provides unified configuration, logging and context management.
MindtraceMeta
Bases: type
Metaclass for Mindtrace class.
The MindtraceMeta metaclass enables classes deriving from Mindtrace to automatically use the same default logger within class methods as within instance methods. For example, consider the following class:
Usage

```python
from mindtrace.core import Mindtrace

class MyClass(Mindtrace):
    def __init__(self):
        super().__init__()

    def instance_method(self):
        self.logger.info(f"Using logger: {self.logger.name}")  # Using logger: mindtrace.my_module.MyClass

    @classmethod
    def class_method(cls):
        cls.logger.info(f"Using logger: {cls.logger.name}")  # Using logger: mindtrace.my_module.MyClass
```
Mindtrace
Base class for all Mindtrace package core classes.
The Mindtrace class adds default context manager and logging methods. All classes that derive from Mindtrace can be used as context managers and will use a unified logging format.
The class automatically provides logging capabilities for both class methods and instance methods. For example:
Usage

```python
from mindtrace.core import Mindtrace

class MyClass(Mindtrace):
    def __init__(self):
        super().__init__()

    def instance_method(self):
        self.logger.info(f"Using logger: {self.logger.name}")  # Using logger: mindtrace.my_module.MyClass

    @classmethod
    def class_method(cls):
        cls.logger.info(f"Using logger: {cls.logger.name}")  # Using logger: mindtrace.my_module.MyClass
```
The logging functionality is automatically provided through the MindtraceMeta metaclass, which ensures consistent logging behavior across all method types.
Initialize the Mindtrace object.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| suppress | bool | Whether to suppress exceptions in context manager use. | False |
| config_overrides | SettingsLike \| None | Additional settings to override the default config. | None |
| **kwargs | | Additional keyword arguments. Logger-related kwargs are forwarded to the logger setup. | {} |
autolog
classmethod

```python
autolog(
    log_level=logging.DEBUG,
    prefix_formatter: Optional[Callable] = None,
    suffix_formatter: Optional[Callable] = None,
    exception_formatter: Optional[Callable] = None,
    self: Optional[Mindtrace] = None,
)
```
Decorator that adds logger.log calls to the decorated method before and after the method is called.
By default, the autolog decorator will log the method name, arguments and keyword arguments before the method is called, and the method name and result after the method completes. This behavior can be modified by passing in prefix and suffix formatters.
The autolog decorator will also catch and log all Exceptions, re-raising any exception after logging it. The behavior for autologging exceptions can be modified by passing in an exception_formatter.
The autolog decorator expects a logger to exist at self.logger, and hence can only be used by Mindtrace subclasses or classes that have a logger attribute.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| log_level | | The log_level passed to logger.log(). | DEBUG |
| prefix_formatter | Optional[Callable] | The formatter used to log the command before the wrapped method runs. The prefix_formatter will be given (and must accept) three arguments, in order: function (the function being wrapped), args (the args passed into the function), and kwargs (the kwargs passed into the function). | None |
| suffix_formatter | Optional[Callable] | The formatter used to log the command after the wrapped method runs. The suffix_formatter will be given (and must accept) two arguments, in order: function (the function being wrapped) and result (the result returned from the wrapped method). | None |
| exception_formatter | Optional[Callable] | The formatter used to log any errors. The exception_formatter will be given (and must accept) three arguments, in order: function (the function being wrapped), error (the caught Exception), and the stack trace (as provided by traceback.format_exc()). | None |
| self | Optional[Mindtrace] | The instance of the class that the method is being called on. self only needs to be passed in if the wrapped method does not have self as the first argument. Refer to the example below for more details. | None |
Usage

```python
from mindtrace.core import Mindtrace

class MyClass(Mindtrace):
    def __init__(self):
        super().__init__()

    @Mindtrace.autolog()
    def divide(self, arg1, arg2):
        self.logger.info("We are about to divide")
        result = arg1 / arg2
        self.logger.info("We have divided")
        return result

my_instance = MyClass()
my_instance.divide(1, 2)
my_instance.divide(1, 0)
```

The resulting log file should contain something similar to the following:

```
MyClass - DEBUG - Calling divide with args: (1, 2) and kwargs: {}
MyClass - INFO - We are about to divide
MyClass - INFO - We have divided
MyClass - DEBUG - Finished divide with result: 0.5
MyClass - DEBUG - Calling divide with args: (1, 0) and kwargs: {}
MyClass - INFO - We are about to divide
MyClass - ERROR - division by zero
Traceback (most recent call last):
...
```
Usage

```python
from fastapi import FastAPI

from mindtrace.core import Mindtrace

class MyClass(Mindtrace):
    def __init__(self):
        super().__init__()

    def create_app(self):
        app_ = FastAPI()

        @Mindtrace.autolog(self=self)  # self must be passed in as an argument as it is not captured in status()
        @app_.post("/status")
        def status():
            return {"status": "Available"}

        return app_
```
MindtraceABCMeta
Bases: MindtraceMeta, ABCMeta
Metaclass that combines MindtraceMeta and ABC metaclasses.
This metaclass resolves metaclass conflicts when creating classes that need to be both abstract (using ABC) and have MindtraceMeta functionality. Python only allows a class to have one metaclass, so this combined metaclass allows classes to inherit from both Mindtrace class and ABC simultaneously.
Without this combined metaclass, trying to create a class that inherits from both Mindtrace class and ABC would raise a metaclass conflict error since they each have different metaclasses.
MindtraceABC
Bases: Mindtrace, ABC
Abstract base class combining Mindtrace class functionality with ABC support.
This class enables creating abstract classes that also have access to all Mindtrace features such as logging, configuration, and context management. Use this class instead of Mindtrace when you need to define abstract methods or properties in your class.
Usage
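A minimal sketch of how MindtraceABC might be used, assuming standard abc semantics (abstractmethod enforcement) on top of the Mindtrace features described above; the import path is assumed:

```python
from abc import abstractmethod

from mindtrace.core import MindtraceABC  # assumed export path

class Processor(MindtraceABC):
    @abstractmethod
    def process(self, data):
        """Subclasses must implement this."""

class UppercaseProcessor(Processor):
    def process(self, data):
        self.logger.info("Processing data")  # Mindtrace logging is available
        return data.upper()
```

Instantiating Processor directly would raise TypeError, while UppercaseProcessor works as a normal Mindtrace subclass.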
Note
Without this class, attempting to create a class that inherits from both Mindtrace class and ABC would fail due to metaclass conflicts. MindtraceABC resolves this by using the MindtraceABCMeta metaclass.
Configuration Management
Config
Bases: dict
Unified configuration manager for Mindtrace components.

The Config class consolidates configuration from multiple sources, including dictionaries and Pydantic BaseSettings or BaseModel objects. It supports user-provided arguments, environment variable overrides, and path normalization by expanding the ~ character.

Key Features:
- Accepts multiple configuration formats: dict, BaseModel, BaseSettings, or lists of these.
- Attr-style and dict-style access to nested keys.
- Supports secret fields using pydantic.SecretStr, masking them by default.
- Overlays environment variables (ENV_VAR__NESTED_KEY) over default configs.
- Provides cloning, JSON export, and dynamic override capabilities.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| extra_settings | SettingsLike | Configuration overrides or full config objects. Can be a dict, BaseSettings, BaseModel, or a list of these. | None |
| apply_env_overrides | bool | Whether to apply environment variable overrides. If True, environment variables will be applied over the default configs. If False, environment variables will not be applied. | True |
Examples:

Basic usage with CoreSettings:

```python
from mindtrace.core.config import Config, CoreSettings

config = Config(CoreSettings())
print(config["MINDTRACE_API_KEYS"]["OPENAI"])  # ******** (masked)
print(config.get_secret("MINDTRACE_API_KEYS", "OPENAI"))  # Real secret value
```

Load from INI file with overrides:

```python
from pathlib import Path

from mindtrace.core.config import Config
from mindtrace.core.utils import load_ini_as_dict

def my_loader():
    file_path = Path("sample.ini")
    return load_ini_as_dict(file_path)

defaults = my_loader()
overrides = {
    "MINDTRACE_DIR_PATHS": {
        "TEMP_DIR": "/tmp/logs",
        "REGISTRY_DIR": "/tmp/registry",
    }
}
config = Config.load(defaults=defaults, overrides=overrides)
```

Access values in multiple ways:

```python
# Attribute style access
print(config.MINDTRACE_DIR_PATHS.TEMP_DIR)

# Dict style access
print(config["MINDTRACE_DIR_PATHS"]["TEMP_DIR"])

# Get method
print(config.get("MINDTRACE_DIR_PATHS").get("TEMP_DIR"))
```

Save and reload configuration:

```python
# Save config to JSON
config.save_json("saved_config.json")

# Load config back
reloaded = Config.load_json("saved_config.json")
```

Clone config with overrides (original unchanged):

```python
cloned = config.clone_with_overrides({
    "MINDTRACE_DIR_PATHS": {
        "TEMP_DIR": "/tmp/clone/logs"
    }
})
print("Original:", config.MINDTRACE_DIR_PATHS.TEMP_DIR)  # Unchanged
print("Cloned:", cloned.MINDTRACE_DIR_PATHS.TEMP_DIR)  # New value
```

Working with secret fields:

```python
from pydantic import BaseModel, SecretStr

from mindtrace.core.config import Config

class APIKeys(BaseModel):
    OPENAI: SecretStr
    DISCORD: SecretStr

class AppSettings(BaseModel):
    API_KEYS: APIKeys

config = Config(AppSettings(API_KEYS=APIKeys(
    OPENAI=SecretStr("sk-abc123"),
    DISCORD=SecretStr("discord-token"),
)))

# Access masked values
print(config.API_KEYS.OPENAI)  # ********
print(config.API_KEYS.DISCORD)  # ********

# Get real secret values
print(config.get_secret("API_KEYS", "OPENAI"))  # sk-abc123
print(config.get_secret("API_KEYS", "DISCORD"))  # discord-token
```
load
classmethod

```python
load(
    *,
    defaults: Optional[Union[Dict[str, Any], BaseSettings, BaseModel]] = None,
    overrides: Optional[
        Union[
            Dict[str, Any],
            List[Union[Dict[str, Any], BaseSettings, BaseModel]],
            BaseSettings,
            BaseModel,
        ]
    ] = None,
    file_loader: Optional[Callable[[], Dict[str, Any]]] = None,
) -> Config
```
Create a Config from optional defaults, optional file loader, and runtime overrides.
This is the recommended way to create a Config instance, with the following precedence order:
1. File loader (lowest precedence)
2. Defaults
3. Environment variables
4. Runtime overrides (highest precedence)
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| defaults | Optional[Union[Dict[str, Any], BaseSettings, BaseModel]] | Base configuration as dict, BaseSettings, or BaseModel. | None |
| overrides | Optional[Union[Dict[str, Any], List[Union[Dict[str, Any], BaseSettings, BaseModel]], BaseSettings, BaseModel]] | Runtime overrides that take highest precedence. | None |
| file_loader | Optional[Callable[[], Dict[str, Any]]] | Optional callable that returns a dict (e.g., from an INI file). | None |

Returns:

| Type | Description |
|---|---|
| Config | Config instance with all sources merged. |
Examples:

Load from INI file with overrides:

```python
from pathlib import Path

from mindtrace.core.config import Config
from mindtrace.core.utils import load_ini_as_dict

def ini_loader():
    return load_ini_as_dict(Path("config.ini"))

config = Config.load(
    file_loader=ini_loader,
    overrides={"MINDTRACE_DIR_PATHS": {"TEMP_DIR": "/custom/tmp"}},
)
```
Load with Pydantic model defaults:
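A sketch of the elided example; the Defaults model and its field are hypothetical, standing in for any BaseModel or BaseSettings passed as defaults:

```python
from pydantic import BaseModel

from mindtrace.core.config import Config

class Defaults(BaseModel):  # hypothetical defaults model
    MINDTRACE_DEBUG: bool = False

config = Config.load(
    defaults=Defaults(),
    overrides={"MINDTRACE_DEBUG": True},  # runtime override takes precedence
)
```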
load_json
classmethod
Load configuration from a JSON file with environment variable overrides and secret masking.
This method loads configuration data from a JSON file and applies the same processing as the main Config class: environment variable overrides and automatic masking of secret fields.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Path to the JSON file (string or Path object). | required |

Returns:

| Type | Description |
|---|---|
| Config | Config instance loaded from the JSON file. |
Examples:

Load from JSON file:

```python
from mindtrace.core.config import Config

# Load configuration from JSON file
config = Config.load_json("config.json")
print(config.MINDTRACE_DIR_PATHS.TEMP_DIR)
```

Load with environment overrides:

```python
import os

os.environ["MINDTRACE_DEFAULT_HOST_URLS__SERVICE"] = "http://env-override:8000"
config = Config.load_json("config.json")

# Environment variable will override the value from JSON
print(config.MINDTRACE_DEFAULT_HOST_URLS.SERVICE)  # http://env-override:8000
```
Note
The JSON file should contain the same structure as expected by Config. Secret fields will be automatically masked when accessed normally.
save_json
Save configuration to a JSON file with optional secret revelation.
This method serializes the current configuration to a JSON file. By default, secret fields are masked (written as "********") for security. You can optionally reveal the actual secret values by setting reveal_secrets=True.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Path where to save the JSON file (string or Path object). | required |
| reveal_secrets | bool | If True, writes actual secret values instead of masked ones. | False |
| indent | int | JSON indentation level for pretty printing. | 4 |

Returns:

| Type | Description |
|---|---|
| None | None |

Raises:

| Type | Description |
|---|---|
| RuntimeError | If file writing or JSON serialization fails. |
Examples:

Save with masked secrets (default):

```python
config = Config(CoreSettings())
config.save_json("config.json")
# Secret fields will be saved as "********"
```

Save with revealed secrets:

```python
config.save_json("config.json", reveal_secrets=True)
# Secret fields will be saved with actual values
```
Save with custom indentation:
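A sketch of the elided example; per the parameter table above, indent controls the JSON pretty-printing level:

```python
config.save_json("config.json", indent=2)
# The file is written with 2-space indentation instead of the default 4
```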
Note
Parent directories are created automatically if they don't exist. Use reveal_secrets=True only when necessary for debugging or migration.
to_revealed_strings
Convert the config to a dictionary with revealed secret values.
clone_with_overrides
Return a new Config clone with overrides applied (original remains unchanged).
This method creates a deep copy of the current config and applies the provided overrides without modifying the original configuration. Useful for creating temporary configurations or testing different settings.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| *overrides | SettingsLike | Configuration overrides as dict, BaseSettings, BaseModel, or lists of these. | () |

Returns:

| Type | Description |
|---|---|
| Config | New Config instance with overrides applied. |
Examples:

Clone with simple overrides:

```python
original = Config({"API_URL": "http://prod:8000", "DEBUG": False})
cloned = original.clone_with_overrides({"DEBUG": True})
print(original.DEBUG)  # False (unchanged)
print(cloned.DEBUG)  # True (new value)
```

Clone with nested overrides:

```python
cloned = config.clone_with_overrides({
    "MINDTRACE_DIR_PATHS": {
        "TEMP_DIR": "/tmp/testing",
        "REGISTRY_DIR": "/tmp/test_registry"
    }
})
```
Clone with multiple overrides:
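A sketch of the elided example, assuming the *overrides varargs are applied in order (later entries winning), as with Config construction:

```python
cloned = config.clone_with_overrides(
    {"DEBUG": True},
    {"MINDTRACE_DIR_PATHS": {"TEMP_DIR": "/tmp/multi"}},
)
```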
get_secret
Retrieve a secret by dotted path components.
This method accesses the real (unmasked) value of secret fields that were defined using pydantic.SecretStr. The secret values are stored internally and can be retrieved using this method.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| *path | str | Path components to the secret field (e.g., "API_KEYS", "OPENAI"). | () |

Returns:

| Type | Description |
|---|---|
| Optional[str] | The real secret value as a string, or None if not found. |
Examples:

Get OpenAI API key:

```python
config = Config(CoreSettings())
api_key = config.get_secret("MINDTRACE_API_KEYS", "OPENAI")
print(api_key)  # "sk-abc123..." (real value)
```

Define and retrieve custom secret fields:

```python
from pydantic import BaseModel, SecretStr

from mindtrace.core.config import Config

class APIKeys(BaseModel):
    OPENAI: SecretStr
    DISCORD: SecretStr

config = Config(APIKeys(
    OPENAI=SecretStr("sk-abc123"),
    DISCORD=SecretStr("discord-token"),
))

# Access masked value
print(config.OPENAI)  # ********

# Get real value
print(config.get_secret("OPENAI"))  # sk-abc123
```
CoreConfig
Bases: Config
Configuration wrapper that automatically includes CoreSettings with environment variable support.
CoreConfig is a convenience class that wraps the base Config class and automatically loads CoreSettings as the default configuration. This includes support for environment variables, .env files, and INI configuration files with automatic path expansion.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| extra_settings | SettingsLike | Additional configuration overrides (highest precedence). | None |
| apply_env_overrides | bool | Whether to apply environment variable overrides. | required |
Examples:

Basic usage with default CoreSettings:

```python
from mindtrace.core.config import CoreConfig

# Loads CoreSettings with env overrides
config = CoreConfig()
print(config.MINDTRACE_DEFAULT_HOST_URLS.SERVICE)
```

With additional overrides:

```python
config = CoreConfig({
    "MINDTRACE_DIR_PATHS": {
        "TEMP_DIR": "/custom/tmp"
    }
})
# Override takes precedence over CoreSettings defaults
```

Environment variable overrides:

```python
import os

os.environ["MINDTRACE_DEFAULT_HOST_URLS__SERVICE"] = "http://custom:8000"
config = CoreConfig()

# Environment variable overrides the INI file value
print(config.MINDTRACE_DEFAULT_HOST_URLS.SERVICE)  # http://custom:8000
```
Note
Environment variables are applied at the CoreSettings level, not at the Config level. Additional overrides passed to CoreConfig take the highest precedence.
Logging
default_formatter
Create a logging formatter with a standardized default format.
This function returns a Python logging Formatter instance configured with a default format string that includes timestamp, log level, logger name, and message. If a custom format string is provided, it will be used instead.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| fmt | Optional[str] | Optional custom format string. If None, uses the default format, which includes timestamp, log level, logger name, and message. | None |

Returns:

| Type | Description |
|---|---|
| Formatter | logging.Formatter: Configured formatter instance ready to use with handlers. |
Examples:

Use default format:

```python
formatter = default_formatter()
handler.setFormatter(formatter)
# Output: [2024-01-15 10:30:45,123] INFO: mindtrace.core: Operation completed
```
Use custom format:
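A sketch of the elided example, assuming fmt accepts a standard logging format string:

```python
formatter = default_formatter("%(levelname)s | %(name)s: %(message)s")
handler.setFormatter(formatter)
# Output: INFO | mindtrace.core: Operation completed
```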
setup_logger

```python
setup_logger(
    name: str = "mindtrace",
    *,
    log_dir: Optional[Path] = None,
    logger_level: int = logging.DEBUG,
    stream_level: int = logging.ERROR,
    add_stream_handler: bool = True,
    file_level: int = logging.DEBUG,
    file_mode: str = "a",
    add_file_handler: bool = True,
    propagate: bool = False,
    max_bytes: int = 10 * 1024 * 1024,
    backup_count: int = 5,
    use_structlog: Optional[bool] = None,
    structlog_json: Optional[bool] = True,
    structlog_pre_chain: Optional[list] = None,
    structlog_processors: Optional[list] = None,
    structlog_renderer: Optional[object] = None,
    structlog_bind: Optional[object] = None,
) -> Logger | structlog.BoundLogger
```
Configure and initialize logging for Mindtrace components programmatically.
Sets up a rotating file handler and a console handler on the given logger. Log file defaults to ~/.cache/mindtrace/{name}.log.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Logger name, defaults to "mindtrace". | 'mindtrace' |
| log_dir | Optional[Path] | Custom directory for the log file. | None |
| logger_level | int | Overall logger level. | DEBUG |
| stream_level | int | StreamHandler level (e.g., ERROR). | ERROR |
| add_stream_handler | bool | Whether to add a stream handler. | True |
| file_level | int | FileHandler level (e.g., DEBUG). | DEBUG |
| file_mode | str | Mode for the file handler. | 'a' |
| add_file_handler | bool | Whether to add a file handler. | True |
| propagate | bool | Whether the logger should propagate messages to ancestor loggers. | False |
| max_bytes | int | Maximum size in bytes before rotating the log file. | 10 * 1024 * 1024 |
| backup_count | int | Number of backup files to retain. | 5 |
| use_structlog | Optional[bool] | If True, configure and return a structlog BoundLogger. | None |
| structlog_json | Optional[bool] | If True, render JSON; otherwise use the console/dev renderer. | True |
| structlog_pre_chain | Optional[list] | Optional list of pre-processors for stdlib log records. | None |
| structlog_processors | Optional[list] | Optional list of processors applied after pre_chain (before rendering). | None |
| structlog_renderer | Optional[object] | Optional custom renderer processor; overrides the default renderer. | None |
| structlog_bind | Optional[object] | Optional dict or callable(name)->dict used to bind fields. | None |

Returns:

| Type | Description |
|---|---|
| Logger \| BoundLogger | logging.Logger \| structlog.BoundLogger: Configured logger instance. |
get_logger

```python
get_logger(
    name: str | None = "mindtrace", use_structlog: bool | None = None, **kwargs
) -> logging.Logger | structlog.BoundLogger
```
Create or retrieve a named logger instance.
This function wraps Python's built-in logging.getLogger() to provide a
standardized logger for Mindtrace components. If the logger with the given
name already exists, it returns the existing instance; otherwise, it creates
a new one with optional configuration overrides.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the logger. Defaults to "mindtrace". | 'mindtrace' |
| use_structlog | bool | Whether to use structured logging. If None, uses the config default. | None |
| **kwargs | | Additional keyword arguments passed to the logger setup. | {} |

Returns:

| Type | Description |
|---|---|
| Logger \| BoundLogger | logging.Logger \| structlog.BoundLogger: A configured logger instance. |
Example:
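A sketch of the elided example; the import path is assumed from the track_operation example later in this section:

```python
from mindtrace.core.logging.logger import get_logger  # assumed import path

logger = get_logger("mindtrace.my_module")
logger.info("Component initialized")
```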
track_operation

```python
track_operation(
    name: str = None,
    timeout: float | None = None,
    logger: Any | None = None,
    logger_name: str | None = None,
    include_args: list[str] | None = None,
    log_level: int = logging.DEBUG,
    include_system_metrics: bool = False,
    system_metrics: list[str] | None = None,
    **context: Any
)
```
Unified function that works as both context manager and decorator.
This function can be used in two ways:
1. As a context manager: async with track_operation("name") as log:
2. As a decorator: @track_operation("name")
Provides structured logging for operations, automatically logging start, completion, timeout, and errors with duration metrics. Requires structlog to be installed.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the operation being tracked. When used as a decorator, defaults to the function name if not provided. | None |
| timeout | float \| None | Optional timeout in seconds. If provided, raises asyncio.TimeoutError when exceeded. If FastAPI is available, raises HTTPException(504) instead. | None |
| logger | Any \| None | Optional structlog logger instance. If None, creates a new logger. | None |
| logger_name | str \| None | Optional logger name. If None, uses "mindtrace.operations.{name}" for the context manager or "mindtrace.methods.{name}" for the decorator. | None |
| include_args | list[str] \| None | List of argument names to include in the log context (decorator only). If None, no arguments are logged. Only works with bound methods (self as first arg). | None |
| log_level | int | Log level for the operation logs. | DEBUG |
| include_system_metrics | bool | If True, include system metrics in the log context. | False |
| system_metrics | list[str] \| None | Optional list of metric names to include. If None, include all available metrics. | None |
| **context | Any | Additional context fields to bind to the logger for this operation. | {} |
Yields (context manager): structlog.BoundLogger: A bound logger with operation context for logging.
Returns (decorator): Callable: The decorated method with automatic logging.
Raises:

| Type | Description |
|---|---|
| TimeoutError | If the timeout is exceeded and FastAPI is not available. |
| HTTPException | If the timeout is exceeded and FastAPI is available (status_code=504). |
| Exception | Re-raises any exception that occurs during operation execution. |
Examples:

Context manager usage:

```python
import asyncio

from mindtrace.core.logging.logger import track_operation

async def fetch_data():
    async with track_operation("fetch_data", user_id="123") as log:
        # Your async operation here
        result = await some_async_operation()
        log.info("Data fetched successfully", records_count=len(result))
        return result
```

Decorator usage on an async function:

```python
@track_operation("process_data", batch_id="batch_123", timeout=5.0)
async def process_data(data: list) -> list:
    # Method execution is automatically logged
    return [item.upper() for item in data]
```

Decorator usage on a class method:

```python
import structlog

class DataProcessor:
    def __init__(self):
        self.logger = structlog.get_logger("data_processor")

    @track_operation("process_batch", include_args=["batch_id"])
    async def process_batch(self, batch_id: str, data: list):
        # Logs will include batch_id in context
        return await self._process_data(data)
```

With a timeout:

```python
async def fetch_with_timeout():
    try:
        async with track_operation("fetch_data", timeout=30.0, service="api") as log:
            result = await slow_operation()
            return result
    except asyncio.TimeoutError:
        # Operation timed out after 30 seconds
        return None
```
Types
bounding_box
BoundingBox
dataclass
Axis-aligned rectangle in image or world coordinates.
Coordinates follow OpenCV/Pascal VOC convention: (x, y, width, height), where (x, y) is the top-left corner.
to_roi_slices
Return (rows_slice, cols_slice) for NumPy image indexing: img[rows, cols].
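A minimal sketch of how such slices index an image, assuming the (x, y, width, height) convention above; the box values are hypothetical, and a plain list of lists stands in for a NumPy array:

```python
x, y, w, h = 2, 1, 3, 2  # hypothetical box in (x, y, width, height) form
rows, cols = slice(y, y + h), slice(x, x + w)  # what to_roi_slices is described as returning

# Crop a 5x6 grid the same way img[rows, cols] would crop a NumPy image
grid = [[(r, c) for c in range(6)] for r in range(5)]
roi = [row[cols] for row in grid[rows]]

assert len(roi) == h and len(roi[0]) == w
assert roi[0][0] == (1, 2)  # top-left cell of the ROI is at (row=y, col=x)
```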
draw_on_pil

```python
draw_on_pil(
    image: Image,
    color: Tuple[int, int, int] = (255, 0, 0),
    width: int = 2,
    fill: Optional[Tuple[int, int, int, int]] = None,
    label: Optional[str] = None,
    label_color: Tuple[int, int, int] = (255, 255, 255),
    label_bg: Tuple[int, int, int] = (255, 0, 0),
    font: Optional[ImageFont] = None,
) -> Image
```
Draw the bounding box (and optional label) directly onto a PIL Image and return it.
rotated_rect
RotatedRect
dataclass
Rotated rectangle represented by center (cx, cy), size (width, height), and rotation angle (degrees).
Angle follows OpenCV convention in degrees, counter-clockwise, where 0 aligns the rectangle's width along +X axis.
draw_on_pil

```python
draw_on_pil(
    image: Image,
    color: Tuple[int, int, int] = (0, 255, 0),
    width: int = 2,
    fill: Optional[Tuple[int, int, int, int]] = None,
    label: Optional[str] = None,
    label_color: Tuple[int, int, int] = (255, 255, 255),
    label_bg: Tuple[int, int, int] = (0, 128, 0),
    font: Optional[ImageFont] = None,
) -> Image
```
Draw the rotated rectangle (and optional label) onto a PIL Image and return it.
task_schema
TaskSchema
Bases: BaseModel
A task schema with strongly-typed input and output models.
Utilities
LocalIPError
Bases: NetworkError
Raised when unable to determine local IP address.
NetworkError
Bases: Exception
Base exception for network-related errors.
NoFreePortError
Bases: NetworkError
Raised when no free port is found in the specified range.
PortInUseError
Bases: NetworkError
Raised when a port is already in use.
ServiceTimeoutError
Bases: NetworkError
Raised when waiting for a service times out.
SystemMetricsCollector
Class for collecting various system metrics.
This class allows collection of CPU, memory, disk usage, network I/O, etc. Users can specify which metrics to collect and optionally enable periodic background updates.
Available metrics include
- "cpu_percent": Overall CPU usage percentage.
- "per_core_cpu_percent": CPU usage percentage per core.
- "memory_percent": Memory usage percentage.
- "disk_usage": Disk usage percentage.
- "network_io": Network I/O statistics (bytes sent and received).
- "load_average": System load average (if available).
Example Usage:

```python
from time import sleep

from mindtrace.core.utils import SystemMetricsCollector

with SystemMetricsCollector(interval=3) as collector:
    for _ in range(10):
        print(collector())
        sleep(1)
```

Alternative (manual stop):

```python
from time import sleep

from mindtrace.core.utils import SystemMetricsCollector

collector = SystemMetricsCollector(interval=3)
try:
    for _ in range(10):
        print(collector())
        sleep(1)
finally:
    collector.stop()
```

On-demand usage (no background thread):

```python
from mindtrace.core.utils import SystemMetricsCollector

collector = SystemMetricsCollector()  # no interval; collected on demand
print(collector())
```
Initialize the system metrics collector.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| interval | int \| None | Interval in seconds for periodic metrics collection. If provided, metrics are updated in a separate cache periodically instead of being collected on demand; using a cache in this way can be less resource intensive. If None, metrics are collected on demand. | None |
| metrics_to_collect | list[str] \| None | List of metrics to collect. If None, all available metrics will be collected. | None |
fetch
Get the current system metrics.
Returns:

| Type | Description |
|---|---|
| dict[str, float \| list \| dict] | A dictionary containing system metrics. If metrics are cached, returns them; otherwise, collects new metrics. |
check_libs
Check if all required libraries are available.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| required_libs | str \| list[str] | A library name or list of library names to check. | required |

Returns:

| Type | Description |
|---|---|
| list[str] | A list of missing libraries. |
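The check can be sketched with importlib (an illustration of the described behavior, not the library's implementation):

```python
from importlib.util import find_spec

def check_libs(required_libs):
    # Accept a single name or a list; return the subset that cannot be imported
    if isinstance(required_libs, str):
        required_libs = [required_libs]
    return [lib for lib in required_libs if find_spec(lib) is None]

missing = check_libs(["json", "surely_not_installed_xyz"])
assert missing == ["surely_not_installed_xyz"]
```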
first_not_none
Returns the first not-None value in the given iterable, else returns the default.
ifnone
Return the given value if it is not None, else return the default.
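Minimal sketches of the two helpers' described behavior (not the library source; the default parameter of first_not_none is assumed):

```python
def ifnone(value, default):
    # Return value unless it is None (falsy values like 0 or "" pass through)
    return value if value is not None else default

def first_not_none(values, default=None):
    # Return the first non-None element, else the default
    return next((v for v in values if v is not None), default)

assert ifnone(None, 5) == 5
assert ifnone(0, 5) == 0  # falsy but not None
assert first_not_none([None, "", "x"]) == ""
```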
ifnone_url
Wraps ifnone to always return a URL.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| url | str \| Url \| None | The Url to return. If None, the default value will be returned instead. | required |
| default | str \| Url | The default URL to use if url is None. | required |

Returns:

| Type | Description |
|---|---|
| Url | The Url object. |
download_and_extract_tarball

```python
download_and_extract_tarball(
    url: str,
    extract_to: Union[str, Path],
    filename: Optional[str] = None,
    remove_after_extract: bool = True,
) -> Path
```
Download a tarball (tar.gz, tar.bz2, etc.) from URL and extract it to the specified directory.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| url | str | URL to download the tarball from. | required |
| extract_to | Union[str, Path] | Directory to extract the tarball to. | required |
| filename | Optional[str] | Optional filename for the downloaded file (if None, uses the URL basename). | None |
| remove_after_extract | bool | Whether to remove the downloaded tarball after extraction. | True |

Returns:

| Type | Description |
|---|---|
| Path | Path to the extracted directory. |

Raises:

| Type | Description |
|---|---|
| Exception | If download or extraction fails. |
download_and_extract_zip

```python
download_and_extract_zip(
    url: str,
    extract_to: Union[str, Path],
    filename: Optional[str] = None,
    remove_after_extract: bool = True,
) -> Path
```
Download a ZIP file from URL and extract it to the specified directory.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| url | str | URL to download the ZIP file from. | required |
| extract_to | Union[str, Path] | Directory to extract the ZIP file to. | required |
| filename | Optional[str] | Optional filename for the downloaded file (if None, uses the URL basename). | None |
| remove_after_extract | bool | Whether to remove the downloaded ZIP file after extraction. | True |

Returns:

| Type | Description |
|---|---|
| Path | Path to the extracted directory. |

Raises:

| Type | Description |
|---|---|
| Exception | If download or extraction fails. |
instantiate_target
Instantiates a target object from a string.
The target string should be in the same format as expected from Hydra targets. I.e. 'module_name.class_name'.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| target | str | A string representing a target object. | required |
Example:

```python
from mindtrace.core import instantiate_target

target = 'mindtrace.core.config.Config'
config = instantiate_target(target)
print(type(config))  # <class 'mindtrace.core.config.Config'>
```
load_ini_as_dict
Load and parse an INI file into a nested dictionary with normalized keys.
- Section names and keys are uppercased for uniform access
- Values with leading '~' are expanded to the user home directory
- Returns an empty dict if the file does not exist
named_lambda
Assigns a name to the given lambda function.
This function is useful when passing lambda functions to other functions that require a name attribute. For example, when using the autolog decorator, the wrapped function is logged according to its function name. If the original function is a lambda, its name attribute is set to the generic name '<lambda>'.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name to assign to the lambda function. | required |
| lambda_func | Callable | The lambda function to assign the name to. | required |
Returns:
| Type | Description |
|---|---|
| Callable | The lambda function with the name attribute set to the given name. |
Example:
```python
from typing import Any, Callable

from mindtrace.core import Mindtrace, named_lambda

class HyperRunner(Mindtrace):
    def __init__(self):
        super().__init__()

    def run_command(self, command: Callable, data: Any):  # cannot control the name of the command
        return Mindtrace.autolog(command)(*data)

hyper_runner = HyperRunner()
hyper_runner.run_command(lambda x, y: x + y, data=(1, 2))  # autologs to '<lambda>'
hyper_runner.run_command(named_lambda("add", lambda x, y: x + y), data=(1, 2))  # autologs to 'add'
```
check_port_available
Assert that a port is available for binding.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Host address to check. | required |
| port | int | Port number to check. | required |
Raises:
| Type | Description |
|---|---|
| PortInUseError | If the port is already in use. |
get_free_port
Find a free port in the given range.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Host address to check. | 'localhost' |
| start_port | int | Starting port number (inclusive). | 8000 |
| end_port | int | Ending port number (inclusive). | 9000 |
Returns:
| Type | Description |
|---|---|
| int | First available port number in the range. |
Raises:
| Type | Description |
|---|---|
| NoFreePortError | If no free port is found in the range. |
get_local_ip
Get the local IP address of the machine.
Uses UDP socket connection to determine the local IP address that would be used to reach external networks.
Returns:
| Type | Description |
|---|---|
| str | Local IP address string. |
Raises:
| Type | Description |
|---|---|
| LocalIPError | If unable to determine local IP address. |
get_local_ip_safe
Get the local IP address with a fallback value.
This is a convenience wrapper around get_local_ip() that returns a fallback value instead of raising an exception.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fallback | str | IP address to return if detection fails. | '127.0.0.1' |
Returns:
| Type | Description |
|---|---|
| str | Local IP address or fallback value. |
is_port_available
Check if a port is available for binding.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Host address to check. | required |
| port | int | Port number to check. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if port is available, False if port is in use. |
wait_for_service
Wait for a service to become available on the specified host and port.
Note: For services launched via mindtrace.services, prefer using Service.launch(wait_for_launch=True) which provides better integration with the service lifecycle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Service host address. | required |
| port | int | Service port number. | required |
| timeout | float | Maximum time to wait in seconds. | 30.0 |
| poll_interval | float | Time between connection attempts in seconds. | 0.5 |
Raises:
| Type | Description |
|---|---|
| ServiceTimeoutError | If the service doesn't become available within timeout. |
expand_tilde
Recursively expand leading '~' across strings in nested structures.
Supports dict, list/tuple/set, and str. Other types are returned unchanged.
expand_tilde_str
Expand leading '~' in a string value; return unchanged if not present.
checks
ifnone
Return the given value if it is not None, else return the default.
first_not_none
Returns the first not-None value in the given iterable, else returns the default.
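The semantics described for these two helpers can be sketched in a few lines; this is an illustrative reimplementation, not the library source:

```python
from typing import Any, Iterable

def ifnone(val: Any, default: Any) -> Any:
    """Return val if it is not None, else the default."""
    return val if val is not None else default

def first_not_none(vals: Iterable[Any], default: Any = None) -> Any:
    """Return the first not-None value in the iterable, else the default."""
    return next((v for v in vals if v is not None), default)
```

Note that `ifnone(0, 3)` returns `0`: only `None` triggers the default, so falsy values pass through unchanged.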
ifnone_url
Wraps ifnone to always return a URL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | str \| Url \| None | The Url to return. If None, the default value will be returned instead. | required |
| default | str \| Url | The default URL to use if url is None. | required |
Returns:
| Type | Description |
|---|---|
| Url | The Url object. |
conversions
Utility methods relating to image conversion.
pil_to_ascii
ascii_to_pil
pil_to_bytes
bytes_to_pil
pil_to_tensor
tensor_to_pil
Convert Torch Tensor to PIL Image.
Note that PIL float images must be scaled [0, 1]. It is often the case, however, that torch tensor images may have a different range (e.g. zero mean or [-1, 1]). As such, the input torch tensor will automatically be scaled to fit in the range [0, 1]. If no min / max value is provided, the output range will be identically 0 / 1, respectively. Else you may pass in min / max range values explicitly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| image | Tensor | The input image. | required |
| mode | | The mode of the output image. One of {'L', 'RGB', 'RGBA'}. | None |
| min_val | | The minimum value of the input image. If None, it will be inferred from the input image. | None |
| max_val | | The maximum value of the input image. If None, it will be inferred from the input image. | None |
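The scaling rule described above (inferred or explicit min/max mapped onto [0, 1]) can be sketched with numpy; `scale_to_unit` is a hypothetical helper illustrating the math, not the library's implementation:

```python
import numpy as np

def scale_to_unit(arr, min_val=None, max_val=None):
    """Scale values into [0, 1]. If min_val/max_val are None they are
    inferred from the data, so the output range is exactly [0, 1];
    otherwise the explicit bounds are used."""
    arr = np.asarray(arr, dtype=np.float64)
    lo = arr.min() if min_val is None else min_val
    hi = arr.max() if max_val is None else max_val
    return (arr - lo) / (hi - lo)
```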
pil_to_ndarray
Convert PIL image to numpy ndarray.
If an alpha channel is present, it will automatically be copied over as well.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| image | Image | The input image. | required |
| image_format | | Determines the number and order of channels in the output image. One of {'L', 'RGB', 'BGR'}. | 'RGB' |
Returns:
| Type | Description |
|---|---|
| ndarray | An np.ndarray image in the specified format. |
ndarray_to_pil
Convert numpy ndarray to PIL image.
The input image can either be a float array with values in the range [0, 1], an int array with values in the range [0, 255], or a bool array.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| image | ndarray | The input image. It should be a numpy array with 1, 3 or 4 channels. | required |
| image_format | str | The format of the input image. One of {'RGB', 'BGR'} | 'RGB' |
Returns:
| Type | Description |
|---|---|
| | A PIL image. |
pil_to_cv2
Convert PIL image to cv2 image.
Note that, in addition to cv2 images being numpy arrays, PIL Images follow RGB format while cv2 images follow BGR format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| image | Image | The input image. | required |
Returns:
| Type | Description |
|---|---|
| ndarray | An np.ndarray image in 'BGR' (cv2) format. |
Example:
```python
import PIL
from mindtrace.core import pil_to_cv2
pil_image = PIL.Image.open('tests/resources/hopper.png')
cv2_image = pil_to_cv2(pil_image)
```
cv2_to_pil
Convert cv2 image to PIL image.
Note that, in addition to cv2 images being numpy arrays, PIL Images follow RGB format while cv2 images follow BGR format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| image | ndarray | The input image. Should be a np.ndarray in 'BGR' (cv2) format. | required |
Returns:
| Type | Description |
|---|---|
| Image | A PIL image. |
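Since cv2 images are plain numpy arrays, the RGB/BGR difference mentioned above is just channel order; the reversal at the heart of both conversions can be sketched as (illustrative helper, not the library source):

```python
import numpy as np

def swap_rgb_bgr(image: np.ndarray) -> np.ndarray:
    """Reverse the channel axis; the same operation converts
    RGB -> BGR and BGR -> RGB."""
    return image[..., ::-1].copy()
```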
pil_to_base64
Convert a PIL Image to a base64-encoded string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| image | Image | The image to be converted. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The base64-encoded string representing the image. |
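Under the hood this kind of conversion is a base64 round-trip over the image's serialized bytes; a minimal stdlib-only sketch (hypothetical helpers, shown without PIL so it stays self-contained):

```python
import base64

def bytes_to_base64_str(data: bytes) -> str:
    """Encode raw image bytes (e.g. a saved PNG) as a base64 string."""
    return base64.b64encode(data).decode("utf-8")

def base64_str_to_bytes(b64: str) -> bytes:
    """Decode a base64 string back to the raw image bytes."""
    return base64.b64decode(b64)
```

In the real functions the bytes would come from `PIL.Image.save()` into an in-memory buffer, and decoding would feed them back through `PIL.Image.open()`.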
base64_to_pil
Convert a base64-encoded string back to a PIL Image.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base64_str | str | The base64-encoded string. | required |
Returns:
| Type | Description |
|---|---|
| Image | PIL.Image: The decoded image object. |
pil_to_discord_file
Convert a PIL Image to a Discord File object for uploading.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| image | Image | The PIL image to be sent. | required |
| filename | str | The filename for the image file (default is "image.png"). | 'image.png' |
Returns:
| Type | Description |
|---|---|
| File | discord.File: A Discord file object that can be sent in a message. |
discord_file_to_pil
async
Convert a Discord attachment to a PIL Image.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| attachment | Attachment | The Discord file attachment to convert. | required |
Returns:
| Type | Description |
|---|---|
| Image | The resulting PIL Image. |
Example:
```python
@commands.command(name="process_images")
async def example_command(self, ctx):
    attachments = ctx.message.attachments
    if not attachments:
        await ctx.send("No attachments found in the message.")
        return
    # Process each attachment in the message
    for i, attachment in enumerate(attachments, start=1):
        if attachment.filename.endswith(('png', 'jpg', 'jpeg')):
            image = await discord_file_to_pil(attachment)
            # Do something with the image, e.g., send it back or process it
            await ctx.send(f"Attachment {i} processed as image.")
        else:
            await ctx.send(f"Attachment {i} is not a valid image file.")
```
tensor_to_ndarray
Convert a PyTorch tensor to a numpy array.
Handles both single images (3D tensors) and batches (4D tensors), converting them to the numpy format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tensor | Tensor | PyTorch tensor in format [C,H,W] or [B,C,H,W] | required |
Returns:
| Type | Description |
|---|---|
| | For batched tensors: a list of numpy arrays in HWC format. For a single image: a numpy array in HWC format. |
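The CHW-to-HWC reshaping described above amounts to a transpose per image; a sketch with numpy (illustrative, not the library source, and using numpy arrays in place of torch tensors):

```python
import numpy as np

def chw_to_hwc(array: np.ndarray):
    """Convert [C,H,W] to an HWC array, or [B,C,H,W] to a list of HWC arrays."""
    if array.ndim == 3:
        return np.transpose(array, (1, 2, 0))
    if array.ndim == 4:
        return [np.transpose(a, (1, 2, 0)) for a in array]
    raise ValueError(f"Expected a 3D or 4D input, got {array.ndim}D")
```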
download
download_and_extract_zip
```python
download_and_extract_zip(
    url: str,
    extract_to: Union[str, Path],
    filename: Optional[str] = None,
    remove_after_extract: bool = True,
) -> Path
```
Download a ZIP file from URL and extract it to the specified directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | str | URL to download the ZIP file from | required |
| extract_to | Union[str, Path] | Directory to extract the ZIP file to | required |
| filename | Optional[str] | Optional filename for the downloaded file (if None, uses URL basename) | None |
| remove_after_extract | bool | Whether to remove the downloaded ZIP file after extraction | True |
Returns:
| Type | Description |
|---|---|
| Path | Path to the extracted directory |
Raises:
| Type | Description |
|---|---|
| Exception | If download or extraction fails |
download_and_extract_tarball
```python
download_and_extract_tarball(
    url: str,
    extract_to: Union[str, Path],
    filename: Optional[str] = None,
    remove_after_extract: bool = True,
) -> Path
```
Download a tarball (tar.gz, tar.bz2, etc.) from URL and extract it to the specified directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | str | URL to download the tarball from | required |
| extract_to | Union[str, Path] | Directory to extract the tarball to | required |
| filename | Optional[str] | Optional filename for the downloaded file (if None, uses URL basename) | None |
| remove_after_extract | bool | Whether to remove the downloaded tarball after extraction | True |
Returns:
| Type | Description |
|---|---|
| Path | Path to the extracted directory |
Raises:
| Type | Description |
|---|---|
| Exception | If download or extraction fails |
dynamic
Utility methods relating to dynamically generating objects.
dynamic_instantiation
Dynamically instantiates a class from a module.
instantiate_target
Instantiates a target object from a string.
The target string should be in the same format as expected from Hydra targets. I.e. 'module_name.class_name'.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target | str | A string representing a target object. | required |
Example:
```python
from mindtrace.core import instantiate_target

target = 'mindtrace.core.config.Config'
config = instantiate_target(target)
print(type(config))  # <class 'mindtrace.core.config.Config'>
```
get_class
Gets a class from a module path string without instantiating it.
The target string should be in the same format as expected from Hydra targets. I.e. 'module_name.class_name'.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target | str | A string representing a target class path. | required |
Returns:
| Type | Description |
|---|---|
| type | The class object. |
Example:
```python
from mindtrace.core import get_class

target = 'mindtrace.core.config.Config'
config_class = get_class(target)
print(config_class)  # <class 'mindtrace.core.config.Config'>
```
hashing
compute_dir_hash
Compute SHA256 hash of directory contents.
Hash is deterministic: files are sorted by path, then each file's content is hashed and combined. This ensures the same directory always produces the same hash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| directory_path | str \| Path | Path to the directory to hash | required |
| chunk_size | int | Size of the chunks (in bytes) to read from the file | 2 ** 20 |
Returns: Hexadecimal SHA256 hash string
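The determinism guarantee above (sort paths, then fold each file into one digest) can be sketched as follows; `dir_sha256` is a hypothetical reimplementation for illustration:

```python
import hashlib
from pathlib import Path

def dir_sha256(directory, chunk_size: int = 2 ** 20) -> str:
    """Deterministic SHA256 over directory contents: files sorted by
    relative path, each path and its chunked content fed into one digest."""
    directory = Path(directory)
    digest = hashlib.sha256()
    for path in sorted(p for p in directory.rglob("*") if p.is_file()):
        digest.update(str(path.relative_to(directory)).encode())
        with open(path, "rb") as f:
            while chunk := f.read(chunk_size):
                digest.update(chunk)
    return digest.hexdigest()
```

Hashing the relative path alongside the content means renaming a file changes the hash even if its bytes do not.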
ini
load_ini_as_dict
Load and parse an INI file into a nested dictionary with normalized keys.
- Section names and keys are uppercased for uniform access
- Values with leading '~' are expanded to the user home directory
- Returns an empty dict if the file does not exist
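The three normalization rules above can be sketched with the stdlib `configparser`; this is an illustrative equivalent, not the library source:

```python
import configparser
from pathlib import Path

def load_ini_as_dict(path) -> dict:
    """Parse an INI file into {SECTION: {KEY: value}} with uppercased
    names, '~' expansion, and {} for a missing file."""
    path = Path(path)
    if not path.exists():
        return {}
    parser = configparser.ConfigParser()
    parser.read(path)
    return {
        section.upper(): {
            key.upper(): str(Path(value).expanduser()) if value.startswith("~") else value
            for key, value in parser.items(section)
        }
        for section in parser.sections()
    }
```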
lambdas
named_lambda
Assigns a name to the given lambda function.
This function is useful when passing lambda functions to other functions that require a name attribute. For example, when using the autolog decorator, the wrapped function is logged according to its function name. If the original function is a lambda, its name attribute is set to the generic name '<lambda>'.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name to assign to the lambda function. | required |
| lambda_func | Callable | The lambda function to assign the name to. | required |
Returns:
| Type | Description |
|---|---|
| Callable | The lambda function with the name attribute set to the given name. |
Example:
```python
from typing import Any, Callable

from mindtrace.core import Mindtrace, named_lambda

class HyperRunner(Mindtrace):
    def __init__(self):
        super().__init__()

    def run_command(self, command: Callable, data: Any):  # cannot control the name of the command
        return Mindtrace.autolog(command)(*data)

hyper_runner = HyperRunner()
hyper_runner.run_command(lambda x, y: x + y, data=(1, 2))  # autologs to '<lambda>'
hyper_runner.run_command(named_lambda("add", lambda x, y: x + y), data=(1, 2))  # autologs to 'add'
```
network
Network utilities for port management and service connectivity.
This module provides exception-based network utilities for checking port availability, finding free ports, waiting for services, and getting local IP addresses.
All functions raise exceptions on errors rather than returning sentinel values, forcing callers to handle error conditions explicitly.
NetworkError
Bases: Exception
Base exception for network-related errors.
PortInUseError
Bases: NetworkError
Raised when a port is already in use.
NoFreePortError
Bases: NetworkError
Raised when no free port is found in the specified range.
ServiceTimeoutError
Bases: NetworkError
Raised when waiting for a service times out.
LocalIPError
Bases: NetworkError
Raised when unable to determine local IP address.
is_port_available
Check if a port is available for binding.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Host address to check. | required |
| port | int | Port number to check. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if port is available, False if port is in use. |
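A port-availability check of this kind is typically a bind attempt; a minimal sketch of the described behavior (illustrative, not the library source):

```python
import socket

def is_port_available(host: str, port: int) -> bool:
    """Try to bind the port; success means it is free."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind((host, port))
            return True
        except OSError:
            return False
```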
check_port_available
Assert that a port is available for binding.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Host address to check. | required |
| port | int | Port number to check. | required |
Raises:
| Type | Description |
|---|---|
| PortInUseError | If the port is already in use. |
get_free_port
Find a free port in the given range.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Host address to check. | 'localhost' |
| start_port | int | Starting port number (inclusive). | 8000 |
| end_port | int | Ending port number (inclusive). | 9000 |
Returns:
| Type | Description |
|---|---|
| int | First available port number in the range. |
Raises:
| Type | Description |
|---|---|
| NoFreePortError | If no free port is found in the range. |
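The scan-and-raise behavior described above can be sketched as a bind loop over the inclusive range; illustrative only, with a stand-in `NoFreePortError`:

```python
import socket

class NoFreePortError(Exception):
    """Stand-in for the documented exception."""

def get_free_port(host: str = "localhost", start_port: int = 8000, end_port: int = 9000) -> int:
    """Return the first bindable port in [start_port, end_port]."""
    for port in range(start_port, end_port + 1):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
                return port
            except OSError:
                continue
    raise NoFreePortError(f"No free port found in range {start_port}-{end_port}")
```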
wait_for_service
Wait for a service to become available on the specified host and port.
Note: For services launched via mindtrace.services, prefer using Service.launch(wait_for_launch=True) which provides better integration with the service lifecycle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Service host address. | required |
| port | int | Service port number. | required |
| timeout | float | Maximum time to wait in seconds. | 30.0 |
| poll_interval | float | Time between connection attempts in seconds. | 0.5 |
Raises:
| Type | Description |
|---|---|
| ServiceTimeoutError | If the service doesn't become available within timeout. |
get_local_ip
Get the local IP address of the machine.
Uses UDP socket connection to determine the local IP address that would be used to reach external networks.
Returns:
| Type | Description |
|---|---|
| str | Local IP address string. |
Raises:
| Type | Description |
|---|---|
| LocalIPError | If unable to determine local IP address. |
get_local_ip_safe
Get the local IP address with a fallback value.
This is a convenience wrapper around get_local_ip() that returns a fallback value instead of raising an exception.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fallback | str | IP address to return if detection fails. | '127.0.0.1' |
Returns:
| Type | Description |
|---|---|
| str | Local IP address or fallback value. |
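The UDP trick mentioned under get_local_ip works because `connect()` on a datagram socket only selects a route; no packet is sent. A sketch of the safe variant (illustrative, not the library source):

```python
import socket

def get_local_ip_safe(fallback: str = "127.0.0.1") -> str:
    """Determine the outbound local IP via a UDP socket; return the
    fallback on any failure instead of raising."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect(("8.8.8.8", 80))  # any routable address works; nothing is transmitted
            return sock.getsockname()[0]
    except OSError:
        return fallback
```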
paths
expand_tilde_str
Expand leading '~' in a string value; return unchanged if not present.
system_metrics_collector
SystemMetricsCollector
Class for collecting various system metrics.
This class allows collection of CPU, memory, disk usage, network I/O, etc. Users can specify which metrics to collect and optionally enable periodic background updates.
Available metrics include
- "cpu_percent": Overall CPU usage percentage.
- "per_core_cpu_percent": CPU usage percentage per core.
- "memory_percent": Memory usage percentage.
- "disk_usage": Disk usage percentage.
- "network_io": Network I/O statistics (bytes sent and received).
- "load_average": System load average (if available).
Example Usage:
```python
from time import sleep
from mindtrace.core.utils import SystemMetricsCollector

with SystemMetricsCollector(interval=3) as collector:
    for _ in range(10):
        print(collector())
        sleep(1)
```
Alternative (manual stop):
```python
from time import sleep
from mindtrace.core.utils import SystemMetricsCollector

collector = SystemMetricsCollector(interval=3)
try:
    for _ in range(10):
        print(collector())
        sleep(1)
finally:
    collector.stop()
```
On-demand usage (no background thread):
```python
from mindtrace.core.utils import SystemMetricsCollector

collector = SystemMetricsCollector()  # no interval; collected on demand
print(collector())
```
Initialize the system metrics collector.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| interval | int \| None | Interval in seconds for periodic metrics collection. If provided, metrics will be updated to a separate cache periodically, instead of being collected on demand. Using a cache in this way can be less resource intensive than collecting metrics on demand. If None, metrics will be collected on demand. | None |
| metrics_to_collect | list[str] \| None | List of metrics to collect. If None, all available metrics will be collected. | None |
fetch
Get the current system metrics.
Returns:
| Type | Description |
|---|---|
| dict[str, float \| list \| dict] | A dictionary containing system metrics. If metrics are cached, return them; otherwise, collect new metrics. |
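The interval/cache behavior described for fetch follows a common pattern: inside the interval window, return the cached sample instead of re-collecting. A generic sketch of that pattern (hypothetical `CachedCollector`, not the library class):

```python
import time

class CachedCollector:
    """Return a cached sample while it is fresher than `interval`;
    with no interval, collect on every call."""

    def __init__(self, collect_fn, interval=None):
        self._collect = collect_fn
        self._interval = interval
        self._cache = None
        self._stamp = 0.0

    def __call__(self):
        if self._interval is None:
            return self._collect()  # on-demand mode
        now = time.monotonic()
        if self._cache is None or now - self._stamp >= self._interval:
            self._cache = self._collect()
            self._stamp = now
        return self._cache
```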
timers
Utility class for simple Timer and TimerCollection classes.
Timer
Utility timer class.
This class can be used to time operations. It can be started, stopped, and reset. The duration of the timer can be retrieved at any time.
Usage
```python
import time
from mindtrace.core import Timer

timer = Timer()
timer.start()
time.sleep(1)
timer.stop()
print(f'The timer ran for {timer.duration()} seconds.')  # The timer ran for 1.0000000000000002 seconds.
timer.reset()
timer.start()
time.sleep(2)
timer.stop()
print(f'The timer ran for {timer.duration()} seconds.')  # The timer ran for 2.0000000000000004 seconds.
```
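The start/stop/reset semantics above can be sketched with a monotonic clock; this is an illustrative minimal timer, not the library implementation:

```python
import time

class MiniTimer:
    """Accumulating timer: stop() adds the elapsed span to the total,
    reset() clears it."""

    def __init__(self):
        self._start = None
        self._elapsed = 0.0

    def start(self):
        self._start = time.perf_counter()

    def stop(self):
        self._elapsed += time.perf_counter() - self._start
        self._start = None

    def duration(self) -> float:
        return self._elapsed

    def reset(self):
        self._start = None
        self._elapsed = 0.0
```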
TimerContext
Context manager for individual timers in a TimerCollection.
TimerCollection
Utility class for timing multiple operations.
This class keeps a collection of named timers. Each timer can be started, stopped, and reset. The duration of each timer can be retrieved at any time. If a timer is stopped and restarted, the duration will be added to the previous duration. The timers can be reset individually, or all at once.
Usage
```python
import time
from mindtrace.core import TimerCollection

tc = TimerCollection()
tc.start('Timer 1')
tc.start('Timer 2')
time.sleep(1)
tc.stop('Timer 1')
time.sleep(1)
tc.stop('Timer 2')
tc.start('Timer 3')
time.sleep(1)
tc.reset('Timer 1')
print(tc)
# Timer 1: 0.000s
# Timer 2: 2.000s
# Timer 3: 1.000s
tc.reset_all()
print(tc)
# Timer 1: 0.000s
# Timer 2: 0.000s
# Timer 3: 0.000s
```
Context Manager Usage:
```python
import time
from mindtrace.core import TimerCollection

tc = TimerCollection()
with tc.start('Timer 1'):
    with tc.start('Timer 2'):
        time.sleep(1)
    # stops "Timer 2"
    with tc.start('Timer 3'):
        time.sleep(2)
    # stops "Timer 3"
# stops "Timer 1"
print(tc)
# Timer 1: 3.000s
# Timer 2: 1.000s
# Timer 3: 2.000s
```
add_timer
Add a timer with the given name. If the timer already exists, it will be replaced.
start
Start the timer with the given name. If the timer does not exist, it will be created.
stop
Stop the timer with the given name.
Raises:
| Type | Description |
|---|---|
| KeyError | If the timer with the given name does not exist. |
duration
Get the duration of the timer with the given name.
Raises:
| Type | Description |
|---|---|
| KeyError | If the timer with the given name does not exist. |
reset
Reset the timer with the given name.
Raises:
| Type | Description |
|---|---|
| KeyError | If the timer with the given name does not exist. |
restart
Reset and start the timer with the given name.
Raises:
| Type | Description |
|---|---|
| KeyError | If the timer with the given name does not exist. |
Timeout
```python
Timeout(
    timeout: float = 60.0,
    retry_delay: float = 1.0,
    exceptions: tuple[Type[Exception], ...] = (Exception,),
    progress_bar: bool = False,
    desc: str | None = None,
)
```
Utility for adding a timeout to a given method.
The given method will be run and rerun until an exception is not raised, or the timeout period is reached.
If the method raises an exception that is in the exceptions tuple, that exception will be caught and ignored. After a retry_delay, the method will be run again. This process will continue until the method runs without raising an exception, or the timeout period is passed.
If the timeout is reached, a TimeoutError will be raised. If the method ever raises an exception that is not in the exceptions tuple, the timeout process will stop and that exception will be reraised.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | The maximum time in seconds that the method can run before a TimeoutError is raised. | 60.0 |
| retry_delay | float | The time in seconds to wait between attempts to run the method. | 1.0 |
| exceptions | tuple[Type[Exception], ...] | A tuple of exceptions that will be caught and ignored. By default, all exceptions are caught. | (Exception,) |
| progress_bar | bool | A boolean indicating whether to display a progress bar while waiting for the timeout. | False |
| desc | str \| None | A description to display in the progress bar. | None |
Returns:
| Type | Description |
|---|---|
| | The result of the given method. |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If the timeout is reached. |
| Exception | Any raised exception not in the exceptions tuple will be reraised. |
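The retry loop described above (ignore listed exceptions, re-raise others, give up with TimeoutError at the deadline) can be sketched as a plain function; `run_with_timeout` is illustrative and omits retry_delay pacing details and the progress bar:

```python
import time

def run_with_timeout(fn, timeout=5.0, retry_delay=0.01, exceptions=(Exception,)):
    """Re-run fn until it succeeds, a non-listed exception escapes,
    or the timeout elapses (then raise TimeoutError)."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return fn()
        except exceptions:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Timeout of {timeout} seconds reached.")
            time.sleep(retry_delay)
```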
Example (running Timeout manually):
```python
import json

import requests
from fastapi import HTTPException  # assumed source of HTTPException; adjust to your stack
from urllib3.util.url import parse_url, Url

from mindtrace.core import Timeout
from mindtrace.services import Service

def get_server_status(url: Url):
    # The following request may fail for two categories of reasons:
    # 1. The server has not launched yet: raises a ConnectionError; we should retry.
    # 2. Any other reason: raises some other exception; we should break out and reraise it.
    # Both cases will be raised to the Timeout class. We tell the Timeout object to ignore ConnectionError.
    response = requests.request("POST", str(url) + "status")
    if response.status_code == 200:
        return json.loads(response.content)["status"]  # Server is up and responding
    else:
        raise HTTPException(response.status_code, response.content)  # Request completed but something is wrong

url = parse_url("http://localhost:8080/")
timeout = Timeout(timeout=60.0, exceptions=(ConnectionRefusedError, requests.exceptions.ConnectionError))
Service.launch(url)
status = timeout.run(get_server_status, url)  # Will wait up to 60 seconds for the server to launch.
print(f"Server status: {status}")
```
Example (using Timeout as a decorator):
```python
import json

import requests
from fastapi import HTTPException  # assumed source of HTTPException; adjust to your stack
from urllib3.util.url import parse_url, Url

from mindtrace.core import Timeout
from mindtrace.services import Service

@Timeout(timeout=60.0, exceptions=(ConnectionRefusedError, requests.exceptions.ConnectionError))
def get_server_status(url: Url):
    response = requests.request("POST", str(url) + "status")
    if response.status_code == 200:
        return json.loads(response.content)["status"]
    else:
        raise HTTPException(response.status_code, response.content)

url = parse_url("http://localhost:8080/")
Service.launch(url)
try:
    status = get_server_status(url)  # Will wait up to 60 seconds for the server to launch.
except TimeoutError as e:
    print(f"The server did not respond within the timeout period: {e}")  # Timeout of 60 seconds reached.
except Exception as e:  # Guaranteed not to be one of the given exceptions in the exceptions tuple.
    print(f"An unexpected error occurred: {e}")
else:
    print(f"Server status: {status}")
```