Skip to content

Core Package API Reference

Base Classes

Mindtrace class. Provides unified configuration, logging and context management.

MindtraceMeta

MindtraceMeta(name, bases, attr_dict)

Bases: type

Metaclass for Mindtrace class.

The MindtraceMeta metaclass enables classes deriving from Mindtrace to automatically use the same default logger within class methods as it does within instance methods. i.e. consider the following class:

Usage
from mindtrace.core import Mindtrace

class MyClass(Mindtrace):
    def __init__(self):
        super().__init__()

    def instance_method(self):
        self.logger.info(f"Using logger: {self.logger.name}")  # Using logger: mindtrace.my_module.MyClass

    @classmethod
    def class_method(cls):
        cls.logger.info(f"Using logger: {cls.logger.name}")  # Using logger: mindtrace.my_module.MyClass

Mindtrace

Mindtrace(
    suppress: bool = False,
    *,
    config_overrides: SettingsLike | None = None,
    **kwargs
)

Base class for all Mindtrace package core classes.

The Mindtrace class adds default context manager and logging methods. All classes that derive from Mindtrace can be used as context managers and will use a unified logging format.

The class automatically provides logging capabilities for both class methods and instance methods. For example:

Usage
from mindtrace.core import Mindtrace

class MyClass(Mindtrace):
    def __init__(self):
        super().__init__()

    def instance_method(self):
        self.logger.info(f"Using logger: {self.logger.name}")  # Using logger: mindtrace.my_module.MyClass

    @classmethod
    def class_method(cls):
        cls.logger.info(f"Using logger: {cls.logger.name}")  # Using logger: mindtrace.my_module.MyClass

The logging functionality is automatically provided through the MindtraceMeta metaclass, which ensures consistent logging behavior across all method types.

Initialize the Mindtrace object.

Parameters:

Name Type Description Default
suppress bool

Whether to suppress exceptions in context manager use.

False
config_overrides SettingsLike | None

Additional settings to override the default config.

None
**kwargs

Additional keyword arguments. Logger-related kwargs are passed to get_logger. Valid logger kwargs: log_dir, logger_level, stream_level, file_level, file_mode, propagate, max_bytes, backup_count

{}

autolog classmethod

autolog(
    log_level=logging.DEBUG,
    prefix_formatter: Optional[Callable] = None,
    suffix_formatter: Optional[Callable] = None,
    exception_formatter: Optional[Callable] = None,
    self: Optional[Mindtrace] = None,
)

Decorator that adds logger.log calls to the decorated method before and after the method is called.

By default, the autolog decorator will log the method name, arguments and keyword arguments before the method is called, and the method name and result after the method completes. This behavior can be modified by passing in prefix and suffix formatters.

The autolog decorator will also catch and log all Exceptions, re-raising any exception after logging it. The behavior for autologging exceptions can be modified by passing in an exception_formatter.

The autolog decorator expects a logger to exist at self.logger, and hence can only be used by Mindtrace subclasses or classes that have a logger attribute.

Parameters:

Name Type Description Default
log_level

The log_level passed to logger.log().

DEBUG
prefix_formatter Optional[Callable]

The formatter used to log the command before the wrapped method runs. The prefix_formatter will be given (and must accept) three arguments, in the following order: - function: The function being wrapped. - args: The args passed into the function. - kwargs: The kwargs passed into the function.

None
suffix_formatter Optional[Callable]

The formatter used to log the command after the wrapped method runs. The suffix_formatter will be given (and must accept) two arguments, in the following order: - function: The function being wrapped. - result: The result returned from the wrapped method.

None
exception_formatter Optional[Callable]

The formatter used to log any errors. The exception_formatter will be given (and must accept) three arguments, in the following order: - function: The function being wrapped. - error: The caught Exception. - stack trace: The stack trace, as provided by traceback.format_exc().

None
self Optional[Mindtrace]

The instance of the class that the method is being called on. Self only needs to be passed in if the wrapped method does not have self as the first argument. Refer to the example below for more details.

None
Usage
from mindtrace.core import Mindtrace

    class MyClass(Mindtrace):
        def __init__(self):
            super().__init__()

        @Mindtrace.autolog()
        def divide(self, arg1, arg2):
            self.logger.info("We are about to divide")
            result = arg1 / arg2
            self.logger.info("We have divided")
            return result

    my_instance = MyClass()
    my_instance.divide(1, 2)
    my_instance.divide(1, 0)

The resulting log file should contain something similar to the following:

    MyClass - DEBUG - Calling divide with args: (1, 2) and kwargs: {}
    MyClass - INFO - We are about to divide
    MyClass - INFO - We have divided
    MyClass - DEBUG - Finished divide with result: 0.5
    MyClass - DEBUG - Calling divide with args: (1, 0) and kwargs: {}
    MyClass - INFO - We are about to divide
    MyClass - ERROR - division by zero
    Traceback (most recent call last):
    ...
If the wrapped method does not have self as the first argument, self must be passed in as an argument to the autolog decorator.

Usage
    from fastapi import FastAPI
    from mindtrace.core import Mindtrace

    class MyClass(Mindtrace):
        def __init__():
            super().__init__()

        def create_app(self):
            app_ = FastAPI()

            @Mindtrace.autolog(self=self)  # self must be passed in as an argument as it is not captured in status()
            @app_.post("/status")
            def status():
                return {"status": "Available"}

            return app_

MindtraceABCMeta

MindtraceABCMeta(name, bases, attr_dict)

Bases: MindtraceMeta, ABCMeta

Metaclass that combines MindtraceMeta and ABC metaclasses.

This metaclass resolves metaclass conflicts when creating classes that need to be both abstract (using ABC) and have MindtraceMeta functionality. Python only allows a class to have one metaclass, so this combined metaclass allows classes to inherit from both Mindtrace class and ABC simultaneously.

Without this combined metaclass, trying to create a class that inherits from both Mindtrace class and ABC would raise a metaclass conflict error since they each have different metaclasses.

MindtraceABC

MindtraceABC(*args, **kwargs)

Bases: Mindtrace, ABC

Abstract base class combining Mindtrace class functionality with ABC support.

This class enables creating abstract classes that also have access to all Mindtrace features such as logging, configuration, and context management. Use this class instead of Mindtrace when you need to define abstract methods or properties in your class.

Usage
from mindtrace.core import MindtraceABC
from abc import abstractmethod

class MyAbstractService(MindtraceABC):
    def __init__(self):
        super().__init__()

    @abstractmethod
    def process_data(self, data):
        '''Must be implemented by concrete subclasses.'''
        pass
Note

Without this class, attempting to create a class that inherits from both Mindtrace class and ABC would fail due to metaclass conflicts. MindtraceABC resolves this by using the CombinedABCMeta.

Configuration Management

Config

Config(
    extra_settings: SettingsLike = None, *, apply_env_overrides: bool = True
)

Bases: dict

Unified configuration manager for Mindtrace components.

The Config class consolidates configuration from sources including dictionaries, Pydantic BaseSettings or BaseModel objects. It supports user provided arguments and environment variable overrides, path normalization by expanding the ~ character.

Key Features:

  • Accepts multiple configuration formats: dict, BaseModel, BaseSettings, or lists of these.
  • Attr-style and dict-style access to nested keys.
  • Supports secret fields using pydantic.SecretStr, preserving masking them by default.
  • Overlays environment variables (ENV_VAR__NESTED_KEY) over default configs.
  • Provides cloning, JSON export, and dynamic override capabilities.

Parameters:

Name Type Description Default
extra_settings SettingsLike

Configuration overrides or full config objects. Can be a dict, BaseSettings, BaseModel, or list of any of these.

None
apply_env_overrides bool

Whether to apply environment variable overrides. If True, environment variables will be applied over the default configs. If False, environment variables will not be applied.

True

Examples:

Basic usage with CoreSettings:

from mindtrace.core.config import Config, CoreSettings
config = Config(CoreSettings())
print(config["MINDTRACE_API_KEYS"]["OPENAI"])  # ******** (masked)
print(config.get_secret("MINDTRACE_API_KEYS", "OPENAI"))  # Real secret value

Load from INI file with overrides:

from pathlib import Path
from mindtrace.core.config import Config
from mindtrace.core.utils import load_ini_as_dict

def my_loader():
    file_path = Path("sample.ini")
    return load_ini_as_dict(file_path)

defaults = my_loader()
overrides = {
    "MINDTRACE_DIR_PATHS": {
        "TEMP_DIR": "/tmp/logs",
        "REGISTRY_DIR": "/tmp/registry"
    }
}
config = Config.load(defaults=defaults, overrides=overrides)

Access values in multiple ways:

# Attribute style access
print(config.MINDTRACE_DIR_PATHS.TEMP_DIR)

# Dict style access
print(config["MINDTRACE_DIR_PATHS"]["TEMP_DIR"])

# Get method
print(config.get("MINDTRACE_DIR_PATHS").get("TEMP_DIR"))

Save and reload configuration:

# Save config to JSON
config.save_json("saved_config.json")

# Load config back
reloaded = Config.load_json("saved_config.json")

Clone config with overrides (original unchanged):

cloned = config.clone_with_overrides({
    "MINDTRACE_DIR_PATHS": {
        "TEMP_DIR": "/tmp/clone/logs"
    }
})
print("Original:", config.MINDTRACE_DIR_PATHS.TEMP_DIR)  # Unchanged
print("Cloned:", cloned.MINDTRACE_DIR_PATHS.TEMP_DIR)   # New value

Working with secret fields:

from pydantic import BaseModel, SecretStr
from mindtrace.core.config import Config

class APIKeys(BaseModel):
    OPENAI: SecretStr
    DISCORD: SecretStr

class AppSettings(BaseModel):
    API_KEYS: APIKeys

config = Config(AppSettings(API_KEYS=APIKeys(
    OPENAI=SecretStr("sk-abc123"),
    DISCORD=SecretStr("discord-token")
)))

# Access masked values
print(config.API_KEYS.OPENAI)           # ********
print(config.API_KEYS.DISCORD)          # ********

# Get real secret values
print(config.get_secret("API_KEYS", "OPENAI"))   # sk-abc123
print(config.get_secret("API_KEYS", "DISCORD"))  # discord-token

load classmethod

load(
    *,
    defaults: Optional[Union[Dict[str, Any], BaseSettings, BaseModel]] = None,
    overrides: Optional[
        Union[
            Dict[str, Any],
            List[Union[Dict[str, Any], BaseSettings, BaseModel]],
            BaseSettings,
            BaseModel,
        ]
    ] = None,
    file_loader: Optional[Callable[[], Dict[str, Any]]] = None
) -> Config

Create a Config from optional defaults, optional file loader, and runtime overrides.

This is the recommended way to create a Config instance with proper precedence order: 1. File loader (lowest precedence) 2. Defaults 3. Environment variables 4. Runtime overrides (highest precedence)

Parameters:

Name Type Description Default
defaults Optional[Union[Dict[str, Any], BaseSettings, BaseModel]]

Base configuration as dict, BaseSettings, or BaseModel

None
overrides Optional[Union[Dict[str, Any], List[Union[Dict[str, Any], BaseSettings, BaseModel]], BaseSettings, BaseModel]]

Runtime overrides that take highest precedence

None
file_loader Optional[Callable[[], Dict[str, Any]]]

Optional callable that returns a dict (e.g., from INI file)

None

Returns:

Type Description
Config

Config instance with all sources merged

Examples:

Load from INI file with overrides:

from pathlib import Path
from mindtrace.core.config import Config
from mindtrace.core.utils import load_ini_as_dict

def ini_loader():
    return load_ini_as_dict(Path("config.ini"))

config = Config.load(
    file_loader=ini_loader,
    overrides={"MINDTRACE_DIR_PATHS": {"TEMP_DIR": "/custom/tmp"}}
)

Load with Pydantic model defaults:

from pydantic import BaseModel
from mindtrace.core.config import Config

class MySettings(BaseModel):
    API_URL: str = "http://localhost:8000"
    DEBUG: bool = False

config = Config.load(
    defaults=MySettings(),
    overrides={"DEBUG": True}
)

load_json classmethod

load_json(path: str | Path) -> Config

Load configuration from a JSON file with environment variable overrides and secret masking.

This method loads configuration data from a JSON file and applies the same processing as the main Config class: environment variable overrides and automatic masking of secret fields.

Parameters:

Name Type Description Default
path str | Path

Path to the JSON file (string or Path object)

required

Returns:

Type Description
Config

Config instance loaded from the JSON file

Examples:

Load from JSON file:

from mindtrace.core.config import Config

# Load configuration from JSON file
config = Config.load_json("config.json")
print(config.MINDTRACE_DIR_PATHS.TEMP_DIR)

Load with environment overrides:

import os
os.environ["MINDTRACE_DEFAULT_HOST_URLS__SERVICE"] = "http://env-override:8000"

config = Config.load_json("config.json")
# Environment variable will override the value from JSON
print(config.MINDTRACE_DEFAULT_HOST_URLS.SERVICE)  # http://env-override:8000

Note

The JSON file should contain the same structure as expected by Config. Secret fields will be automatically masked when accessed normally.

save_json

save_json(
    path: str | Path, *, reveal_secrets: bool = False, indent: int = 4
) -> None

Save configuration to a JSON file with optional secret revelation.

This method serializes the current configuration to a JSON file. By default, secret fields are masked (shown as **) for security. You can optionally reveal the actual secret values by setting reveal_secrets=True.

Parameters:

Name Type Description Default
path str | Path

Path where to save the JSON file (string or Path object)

required
reveal_secrets bool

If True, writes actual secret values instead of masked ones

False
indent int

JSON indentation level for pretty printing (default: 4)

4

Returns:

Type Description
None

None

Raises:

Type Description
RuntimeError

If file writing or JSON serialization fails

Examples:

Save with masked secrets (default):

config = Config(CoreSettings())
config.save_json("config.json")
# Secret fields will be saved as "********"

Save with revealed secrets:

config.save_json("config.json", reveal_secrets=True)
# Secret fields will be saved with actual values

Save with custom indentation:

config.save_json("config.json", indent=2)

Note

Parent directories are created automatically if they don't exist. Use reveal_secrets=True only when necessary for debugging or migration.

to_revealed_strings

to_revealed_strings() -> Dict[str, Any]

Convert the config to a dictionary with revealed secret values.

clone_with_overrides

clone_with_overrides(*overrides: SettingsLike) -> Config

Return a new Config clone with overrides applied (original remains unchanged).

This method creates a deep copy of the current config and applies the provided overrides without modifying the original configuration. Useful for creating temporary configurations or testing different settings.

Parameters:

Name Type Description Default
*overrides SettingsLike

Configuration overrides as dict, BaseSettings, BaseModel, or lists of these

()

Returns:

Type Description
Config

New Config instance with overrides applied

Examples:

Clone with simple overrides:

original = Config({"API_URL": "http://prod:8000", "DEBUG": False})
cloned = original.clone_with_overrides({"DEBUG": True})

print(original.DEBUG)  # False (unchanged)
print(cloned.DEBUG)   # True (new value)

Clone with nested overrides:

cloned = config.clone_with_overrides({
    "MINDTRACE_DIR_PATHS": {
        "TEMP_DIR": "/tmp/testing",
        "REGISTRY_DIR": "/tmp/test_registry"
    }
})

Clone with multiple overrides:

cloned = config.clone_with_overrides(
    {"DEBUG": True},
    {"API_URL": "http://test:8000"},
    {"LOGGING_LEVEL": "DEBUG"}
)

get_secret

get_secret(*path: str) -> Optional[str]

Retrieve a secret by dotted path components.

This method accesses the real (unmasked) value of secret fields that were defined using pydantic.SecretStr. The secret values are stored internally and can be retrieved using this method.

Parameters:

Name Type Description Default
*path str

Path components to the secret field (e.g., "API_KEYS", "OPENAI")

()

Returns:

Type Description
Optional[str]

The real secret value as string, or None if not found

Examples:

Get OpenAI API key:

config = Config(CoreSettings())
api_key = config.get_secret("MINDTRACE_API_KEYS", "OPENAI")
print(api_key)  # "sk-abc123..." (real value)
Working with custom secret fields:
from pydantic import BaseModel, SecretStr
from mindtrace.core.config import Config

class APIKeys(BaseModel):
    OPENAI: SecretStr
    DISCORD: SecretStr

config = Config(APIKeys(
    OPENAI=SecretStr("sk-abc123"),
    DISCORD=SecretStr("discord-token")
))

# Access masked value
print(config.OPENAI)  # ********

# Get real value
print(config.get_secret("OPENAI"))  # sk-abc123

secret_paths

secret_paths() -> List[str]

Return dotted paths of fields considered secrets.

CoreConfig

CoreConfig(extra_settings: SettingsLike = None)

Bases: Config

Configuration wrapper that automatically includes CoreSettings with environment variable support.

CoreConfig is a convenience class that wraps the base Config class and automatically loads CoreSettings as the default configuration. This includes support for environment variables, .env files, and INI configuration files with automatic path expansion.

Parameters:

Name Type Description Default
extra_settings SettingsLike

Additional configuration overrides (highest precedence)

None
apply_env_overrides

Whether to apply environment variable overrides

required

Examples:

Basic usage with default CoreSettings:

from mindtrace.core.config import CoreConfig

# Loads CoreSettings with env overrides
config = CoreConfig()
print(config.MINDTRACE_DEFAULT_HOST_URLS.SERVICE)

With additional overrides:

config = CoreConfig({
    "MINDTRACE_DIR_PATHS": {
        "TEMP_DIR": "/custom/tmp"
    }
})
# Override takes precedence over CoreSettings defaults

Environment variable overrides:

import os
os.environ["MINDTRACE_DEFAULT_HOST_URLS__SERVICE"] = "http://custom:8000"

config = CoreConfig()
# Environment variable overrides the INI file value
print(config.MINDTRACE_DEFAULT_HOST_URLS.SERVICE)  # http://custom:8000

Note

Environment variables are applied at the CoreSettings level, not at the Config level. Additional overrides passed to CoreConfig take the highest precedence.

Logging

default_formatter

default_formatter(fmt: Optional[str] = None) -> logging.Formatter

Create a logging formatter with a standardized default format.

This function returns a Python logging Formatter instance configured with a default format string that includes timestamp, log level, logger name, and message. If a custom format string is provided, it will be used instead.

Parameters:

Name Type Description Default
fmt Optional[str]

Optional custom format string. If None, uses the default format: "[%(asctime)s] %(levelname)s: %(name)s: %(message)s" - %(asctime)s: Timestamp when the log record was created - %(levelname)s: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) - %(name)s: Name of the logger - %(message)s: The actual log message

None

Returns:

Type Description
Formatter

logging.Formatter: Configured formatter instance ready to use with handlers.

Examples:

Use default format:

formatter = default_formatter()
handler.setFormatter(formatter)
# Output: [2024-01-15 10:30:45,123] INFO: mindtrace.core: Operation completed

Use custom format:

custom_fmt = "%(levelname)s - %(message)s"
formatter = default_formatter(fmt=custom_fmt)
handler.setFormatter(formatter)
# Output: INFO - Operation completed

setup_logger

setup_logger(
    name: str = "mindtrace",
    *,
    log_dir: Optional[Path] = None,
    logger_level: int = logging.DEBUG,
    stream_level: int = logging.ERROR,
    add_stream_handler: bool = True,
    file_level: int = logging.DEBUG,
    file_mode: str = "a",
    add_file_handler: bool = True,
    propagate: bool = False,
    max_bytes: int = 10 * 1024 * 1024,
    backup_count: int = 5,
    use_structlog: Optional[bool] = None,
    structlog_json: Optional[bool] = True,
    structlog_pre_chain: Optional[list] = None,
    structlog_processors: Optional[list] = None,
    structlog_renderer: Optional[object] = None,
    structlog_bind: Optional[object] = None
) -> Logger | structlog.BoundLogger

Configure and initialize logging for Mindtrace components programmatically.

Sets up a rotating file handler and a console handler on the given logger. Log file defaults to ~/.cache/mindtrace/{name}.log.

Parameters:

Name Type Description Default
name str

Logger name, defaults to "mindtrace".

'mindtrace'
log_dir Optional[Path]

Custom directory for log file.

None
logger_level int

Overall logger level.

DEBUG
stream_level int

StreamHandler level (e.g., ERROR).

ERROR
add_stream_handler bool

Whether to add a stream handler.

True
file_level int

FileHandler level (e.g., DEBUG).

DEBUG
file_mode str

Mode for file handler, default is 'a' (append).

'a'
add_file_handler bool

Whether to add a file handler.

True
propagate bool

Whether the logger should propagate messages to ancestor loggers.

False
max_bytes int

Maximum size in bytes before rotating log file.

10 * 1024 * 1024
backup_count int

Number of backup files to retain.

5
use_structlog Optional[bool]

Optional bool. If True, configure and return a structlog BoundLogger.

None
structlog_json Optional[bool]

Optional bool. If True, render JSON; otherwise use console/dev renderer.

True
structlog_pre_chain Optional[list]

Optional list of pre-processors for stdlib log records.

None
structlog_processors Optional[list]

Optional list of processors after pre_chain (before render).

None
structlog_renderer Optional[object]

Optional custom renderer processor. Overrides structlog_json.

None
structlog_bind Optional[object]

Optional dict or callable(name)->dict to bind fields.

None

Returns:

Type Description
Logger | BoundLogger

Logger | structlog.BoundLogger: Configured logger instance.

get_logger

get_logger(
    name: str | None = "mindtrace", use_structlog: bool | None = None, **kwargs
) -> logging.Logger | structlog.BoundLogger

Create or retrieve a named logger instance.

This function wraps Python's built-in logging.getLogger() to provide a standardized logger for Mindtrace components. If the logger with the given name already exists, it returns the existing instance; otherwise, it creates a new one with optional configuration overrides.

Parameters:

Name Type Description Default
name str

The name of the logger. Defaults to "mindtrace".

'mindtrace'
use_structlog bool

Whether to use structured logging. If None, uses config default.

None
**kwargs

Additional keyword arguments to be passed to setup_logger.

{}

Returns:

Type Description
Logger | BoundLogger

logging.Logger | structlog.BoundLogger: A configured logger instance.

Example:

from mindtrace.core.logging.logger import get_logger

logger = get_logger("core.module", stream_level=logging.INFO, propagate=True)
logger.info("Logger configured with custom settings.")

track_operation

track_operation(
    name: str = None,
    timeout: float | None = None,
    logger: Any | None = None,
    logger_name: str | None = None,
    include_args: list[str] | None = None,
    log_level: int = logging.DEBUG,
    include_system_metrics: bool = False,
    system_metrics: list[str] | None = None,
    **context: Any
)

Unified function that works as both context manager and decorator.

This function can be used in two ways: 1. As a context manager: async with track_operation("name") as log: 2. As a decorator: @track_operation("name")

Provides structured logging for operations, automatically logging start, completion, timeout, and errors with duration metrics. Requires structlog to be installed.

Parameters:

Name Type Description Default
name str

The name of the operation being tracked. When used as decorator, defaults to the function name if not provided.

None
timeout float | None

Optional timeout in seconds. If provided, raises asyncio.TimeoutError when exceeded. If FastAPI is available, raises HTTPException(504) instead.

None
logger Any | None

Optional structlog logger instance. If None, creates a new logger.

None
logger_name str | None

Optional logger name. If None, uses "mindtrace.operations.{name}" for context manager or "mindtrace.methods.{name}" for decorator.

None
include_args list[str] | None

List of argument names to include in the log context (decorator only). If None, no arguments are logged. Only works with bound methods (self as first arg).

None
log_level int

Log level for the operation logs. Defaults to logging.DEBUG.

DEBUG
include_system_metrics bool

If True, include system metrics in the log context.

False
system_metrics list[str] | None

Optional list of metric names to include. If None, include all available metrics.

None
**context Any

Additional context fields to bind to the logger for this operation.

{}

Yields (context manager): structlog.BoundLogger: A bound logger with operation context for logging.

Returns (decorator): Callable: The decorated method with automatic logging.

Raises:

Type Description
TimeoutError

If timeout is exceeded and FastAPI is not available.

HTTPException

If timeout is exceeded and FastAPI is available (status_code=504).

Exception

Re-raises any exception that occurs during operation execution.

Examples:

Context manager usage: .. code-block:: python

import asyncio
from mindtrace.core.logging.logger import track_operation

async def fetch_data():
    async with track_operation("fetch_data", user_id="123") as log:
        # Your async operation here
        result = await some_async_operation()
        log.info("Data fetched successfully", records_count=len(result))
        return result

Decorator usage on async function: .. code-block:: python

@track_operation("process_data", batch_id="batch_123", timeout=5.0)
async def process_data(data: list) -> list:
    # Method execution is automatically logged
    return [item.upper() for item in data]

Decorator usage on class method: .. code-block:: python

class DataProcessor:
    def __init__(self):
        self.logger = structlog.get_logger("data_processor")

    @track_operation("process_batch", include_args=["batch_id"])
    async def process_batch(self, batch_id: str, data: list):
        # Logs will include batch_id in context
        return await self._process_data(data)

With timeout: .. code-block:: python

async def fetch_with_timeout():
    try:
        async with track_operation("fetch_data", timeout=30.0, service="api") as log:
            result = await slow_operation()
            return result
    except asyncio.TimeoutError:
        # Operation timed out after 30 seconds
        return None

Types

bounding_box

BoundingBox dataclass

BoundingBox(x: float, y: float, width: float, height: float)

Axis-aligned rectangle in image or world coordinates.

Coordinates follow OpenCV/Pascal VOC convention: (x, y, width, height), where (x, y) is the top-left corner.

to_roi_slices
to_roi_slices() -> Tuple[slice, slice]

Return (rows_slice, cols_slice) for NumPy image indexing: img[rows, cols].

draw_on_pil
draw_on_pil(
    image: Image,
    color: Tuple[int, int, int] = (255, 0, 0),
    width: int = 2,
    fill: Optional[Tuple[int, int, int, int]] = None,
    label: Optional[str] = None,
    label_color: Tuple[int, int, int] = (255, 255, 255),
    label_bg: Tuple[int, int, int] = (255, 0, 0),
    font: Optional[ImageFont] = None,
) -> Image

Draw the bounding box (and optional label) directly onto a PIL Image and return it.

to_corners
to_corners() -> List[Tuple[float, float]]

Return corners as [(x1,y1), (x2,y1), (x2,y2), (x1,y2)] without NumPy.

rotated_rect

RotatedRect dataclass

RotatedRect(
    cx: float, cy: float, width: float, height: float, angle_deg: float = 0.0
)

Rotated rectangle represented by center (cx, cy), size (width, height), and rotation angle (degrees).

Angle follows OpenCV convention in degrees, counter-clockwise, where 0 aligns the rectangle's width along +X axis.

draw_on_pil
draw_on_pil(
    image: Image,
    color: Tuple[int, int, int] = (0, 255, 0),
    width: int = 2,
    fill: Optional[Tuple[int, int, int, int]] = None,
    label: Optional[str] = None,
    label_color: Tuple[int, int, int] = (255, 255, 255),
    label_bg: Tuple[int, int, int] = (0, 128, 0),
    font: Optional[ImageFont] = None,
) -> Image

Draw the rotated rectangle (and optional label) onto a PIL Image and return it.

task_schema

TaskSchema

Bases: BaseModel

A task schema with strongly-typed input and output models.

Utilities

LocalIPError

Bases: NetworkError

Raised when unable to determine local IP address.

NetworkError

Bases: Exception

Base exception for network-related errors.

NoFreePortError

Bases: NetworkError

Raised when no free port is found in the specified range.

PortInUseError

Bases: NetworkError

Raised when a port is already in use.

ServiceTimeoutError

Bases: NetworkError

Raised when waiting for a service times out.

SystemMetricsCollector

SystemMetricsCollector(
    interval: int | None = None, metrics_to_collect: list[str] | None = None
)

Class for collecting various system metrics.

This class allows collection of CPU, memory, disk usage, network I/O, etc. Users can specify which metrics to collect and optionally enable periodic background updates.

Available metrics include
  • "cpu_percent": Overall CPU usage percentage.
  • "per_core_cpu_percent": CPU usage percentage per core.
  • "memory_percent": Memory usage percentage.
  • "disk_usage": Disk usage percentage.
  • "network_io": Network I/O statistics (bytes sent and received).
  • "load_average": System load average (if available).

Example Usage:

from time import sleep
from mindtrace.core.utils import SystemMetricsCollector

with SystemMetricsCollector(interval=3) as collector:
    for _ in range(10):
        print(collector())
        sleep(1)

Alternative (manual stop):

from time import sleep
from mindtrace.core.utils import SystemMetricsCollector

collector = SystemMetricsCollector(interval=3)
try:
    for _ in range(10):
        print(collector())
        sleep(1)
finally:
    collector.stop()

On-demand usage (no background thread):

from mindtrace.core.utils import SystemMetricsCollector

collector = SystemMetricsCollector()  # no interval; collected on demand
print(collector())

Initialize the system metrics collector.

Parameters:

Name Type Description Default
interval int | None

Interval in seconds for periodic metrics collection. If provided, metrics will be updated to a separate cache periodically, instead of being collected on demand. Using a cache in this way can be less resource intensive than collecting metrics on demand. If None, metrics will be collected on demand.

None
metrics_to_collect list[str] | None

List of metrics to collect. If None, all available metrics will be collected.

None

fetch

fetch() -> dict[str, float | list | dict]

Get the current system metrics.

Returns:

Type Description
dict[str, float | list | dict]

A dictionary containing system metrics. If metrics are cached, return them; otherwise, collect new metrics.

stop

stop()

Stop the background collection thread if running.

Prefer using the context manager (with SystemMetricsCollector(...) as collector:) which stops the thread automatically on exit.

check_libs

check_libs(required_libs: str | list[str]) -> list[str]

Check if all required libraries are available.

Parameters:

Name Type Description Default
required_libs str | list[str]

A list of library names to check.

required

Returns:

Type Description
list[str]

A list of missing libraries.

first_not_none

first_not_none(vals: Iterable[T1 | None], default: T2 = None) -> T1 | T2

Returns the first not-None value in the given iterable, else returns the default.

ifnone

ifnone(val: T1 | None, default: T2) -> T1 | T2

Return the given value if it is not None, else return the default.

ifnone_url

ifnone_url(url: str | Url | None, default: str | Url) -> Url

Wraps ifnone to always return a URL.

Parameters:

Name Type Description Default
url str | Url | None

The Url to return. If none, the default value will be returned instead.

required
default str | Url

The default URL to use if url is None.

required

Returns:

Type Description
Url

The Url object.

download_and_extract_tarball

download_and_extract_tarball(
    url: str,
    extract_to: Union[str, Path],
    filename: Optional[str] = None,
    remove_after_extract: bool = True,
) -> Path

Download a tarball (tar.gz, tar.bz2, etc.) from URL and extract it to the specified directory.

Parameters:

Name Type Description Default
url str

URL to download the tarball from

required
extract_to Union[str, Path]

Directory to extract the tarball to

required
filename Optional[str]

Optional filename for the downloaded file (if None, uses URL basename)

None
remove_after_extract bool

Whether to remove the downloaded tarball after extraction

True

Returns:

Type Description
Path

Path to the extracted directory

Raises:

Type Description
Exception

If download or extraction fails

download_and_extract_zip

download_and_extract_zip(
    url: str,
    extract_to: Union[str, Path],
    filename: Optional[str] = None,
    remove_after_extract: bool = True,
) -> Path

Download a ZIP file from URL and extract it to the specified directory.

Parameters:

Name Type Description Default
url str

URL to download the ZIP file from

required
extract_to Union[str, Path]

Directory to extract the ZIP file to

required
filename Optional[str]

Optional filename for the downloaded file (if None, uses URL basename)

None
remove_after_extract bool

Whether to remove the downloaded ZIP file after extraction

True

Returns:

Type Description
Path

Path to the extracted directory

Raises:

Type Description
Exception

If download or extraction fails

instantiate_target

instantiate_target(target: str, **kwargs) -> Any

Instantiates a target object from a string.

The target string should be in the same format as expected from Hydra targets. I.e. 'module_name.class_name'.

Parameters:

Name Type Description Default
target str

A string representing a target object.

required

Example::

from mindtrace.core import instantiate_target

target = 'mindtrace.core.config.Config'
config = instantiate_target(target)

print(type(config))  # <class 'mindtrace.core.config.Config'>

load_ini_as_dict

load_ini_as_dict(ini_path: Path) -> Dict[str, Any]

Load and parse an INI file into a nested dictionary with normalized keys.

  • Section names and keys are uppercased for uniform access
  • Values with leading '~' are expanded to the user home directory
  • Returns an empty dict if the file does not exist

named_lambda

named_lambda(name: str, lambda_func: Callable) -> Callable

Assigns a name to the given lambda function.

This method is useful when passing lambda functions to other functions that require a name attribute. For example, when using the autolog decorator, the wrapped function will be logged according the function name. If the original function is a lambda function, it's name attribute will be set to the generic name ''.

Parameters:

Name Type Description Default
name str

The name to assign to the lambda function.

required
lambda_func Callable

The lambda function to assign the name to.

required

Returns:

Type Description
Callable

The lambda function with the name attribute set to the given name.

Example::

    from mindtrace.core import Mindtrace, named_lambda

    class HyperRunner(Mindtrace):
        def __init__(self):
            super().__init__()

        def run_command(self, command: Callable, data: Any):  # cannot control the name of the command
            return Mindtrace.autolog(command(data))()

    hyper_runner = HyperRunner()
    hyper_runner.run_command(lambda x, y: x + y, data=(1, 2))  # autologs to '<lambda>'
    hyper_runner.run_command(named_lambda("add", lambda x, y: x + y), data=(1, 2))  # autologs to 'add'

check_port_available

check_port_available(host: str, port: int) -> None

Assert that a port is available for binding.

Parameters:

Name Type Description Default
host str

Host address to check.

required
port int

Port number to check.

required

Raises:

Type Description
PortInUseError

If the port is already in use.

get_free_port

get_free_port(
    host: str = "localhost", start_port: int = 8000, end_port: int = 9000
) -> int

Find a free port in the given range.

Parameters:

Name Type Description Default
host str

Host address to check.

'localhost'
start_port int

Starting port number (inclusive).

8000
end_port int

Ending port number (inclusive).

9000

Returns:

Type Description
int

First available port number in the range.

Raises:

Type Description
NoFreePortError

If no free port is found in the range.

get_local_ip

get_local_ip() -> str

Get the local IP address of the machine.

Uses UDP socket connection to determine the local IP address that would be used to reach external networks.

Returns:

Type Description
str

Local IP address string.

Raises:

Type Description
LocalIPError

If unable to determine local IP address.

get_local_ip_safe

get_local_ip_safe(fallback: str = '127.0.0.1') -> str

Get the local IP address with a fallback value.

This is a convenience wrapper around get_local_ip() that returns a fallback value instead of raising an exception.

Parameters:

Name Type Description Default
fallback str

IP address to return if detection fails.

'127.0.0.1'

Returns:

Type Description
str

Local IP address or fallback value.

is_port_available

is_port_available(host: str, port: int) -> bool

Check if a port is available for binding.

Parameters:

Name Type Description Default
host str

Host address to check.

required
port int

Port number to check.

required

Returns:

Type Description
bool

True if port is available, False if port is in use.

wait_for_service

wait_for_service(
    host: str, port: int, timeout: float = 30.0, poll_interval: float = 0.5
) -> None

Wait for a service to become available on the specified host and port.

Note: For services launched via mindtrace.services, prefer using Service.launch(wait_for_launch=True) which provides better integration with the service lifecycle.

Parameters:

Name Type Description Default
host str

Service host address.

required
port int

Service port number.

required
timeout float

Maximum time to wait in seconds.

30.0
poll_interval float

Time between connection attempts in seconds.

0.5

Raises:

Type Description
ServiceTimeoutError

If the service doesn't become available within timeout.

expand_tilde

expand_tilde(obj: Any) -> Any

Recursively expand leading '~' across strings in nested structures.

Supports dict, list/tuple/set, and str. Other types are returned unchanged.

expand_tilde_str

expand_tilde_str(value: str) -> str

Expand leading '~' in a string value; return unchanged if not present.

checks

ifnone

ifnone(val: T1 | None, default: T2) -> T1 | T2

Return the given value if it is not None, else return the default.

first_not_none

first_not_none(vals: Iterable[T1 | None], default: T2 = None) -> T1 | T2

Returns the first not-None value in the given iterable, else returns the default.

ifnone_url

ifnone_url(url: str | Url | None, default: str | Url) -> Url

Wraps ifnone to always return a URL.

Parameters:

Name Type Description Default
url str | Url | None

The Url to return. If none, the default value will be returned instead.

required
default str | Url

The default URL to use if url is None.

required

Returns:

Type Description
Url

The Url object.

check_libs

check_libs(required_libs: str | list[str]) -> list[str]

Check if all required libraries are available.

Parameters:

Name Type Description Default
required_libs str | list[str]

A list of library names to check.

required

Returns:

Type Description
list[str]

A list of missing libraries.

conversions

Utility methods relating to image conversion.

pil_to_ascii

pil_to_ascii(image: Image) -> str

Serialize PIL Image to ascii.

Example
import PIL
from mindtrace.core import pil_to_ascii, ascii_to_pil

image = PIL.Image.open('tests/resources/hopper.png')
ascii_image = pil_to_ascii(image)
decoded_image = ascii_to_pil(ascii_image)

ascii_to_pil

ascii_to_pil(ascii_image: str) -> Image

Convert ascii image to PIL Image.

Example
import PIL
from mindtrace.core import pil_to_ascii, ascii_to_pil

image = PIL.Image.open('tests/resources/hopper.png')
ascii_image = pil_to_ascii(image)
decoded_image = ascii_to_pil(ascii_image)

pil_to_bytes

pil_to_bytes(image: Image) -> bytes

Serialize PIL Image into io.BytesIO stream.

Example
import PIL
from mindtrace.core import pil_to_bytes, bytes_to_pil

image = PIL.Image.open('tests/resources/hopper.png')
bytes_image = pil_to_bytes(image)
decoded_image = bytes_to_pil(bytes_image)

bytes_to_pil

bytes_to_pil(bytes_image: bytes) -> Image

Convert io.BytesIO stream to PIL Image.

Example
import PIL
from mindtrace.core import pil_to_bytes, bytes_to_pil

image = PIL.Image.open('tests/resources/hopper.png')
bytes_image = pil_to_bytes(image)
decoded_image = bytes_to_pil(bytes_image)

pil_to_tensor

pil_to_tensor(image: Image) -> torch.Tensor

Convert PIL Image to Torch Tensor.

Example
from PIL import Image
from mindtrace.core import pil_to_tensor

image = Image.open('tests/resources/hopper.png')
tensor = pil_to_tensor(image)

tensor_to_pil

tensor_to_pil(image: Tensor, mode=None, min_val=None, max_val=None) -> Image

Convert Torch Tensor to PIL Image.

Note that PIL float images must be scaled [0, 1]. It is often the case, however, that torch tensor images may have a different range (e.g. zero mean or [-1, 1]). As such, the input torch tensor will automatically be scaled to fit in the range [0, 1]. If no min / max value is provided, the output range will be identically 0 / 1, respectively. Else you may pass in min / max range values explicitly.

Parameters:

Name Type Description Default
image Tensor

The input image.

required
mode

The mode of the output image. One of {'L', 'RGB', 'RGBA'}.

None
min_val

The minimum value of the input image. If None, it will be inferred from the input image.

None
max_val

The maximum value of the input image. If None, it will be inferred from the input image.

None
Example
from PIL import Image
from mindtrace.core import pil_to_tensor, tensor_to_pil

image = Image.open('tests/resources/hopper.png')
tensor_image = pil_to_tensor(image)
pil_image = tensor_to_pil(tensor_image)

pil_to_ndarray

pil_to_ndarray(image: Image, image_format='RGB') -> np.ndarray

Convert PIL image to numpy ndarray.

If an alpha channel is present, it will automatically be copied over as well.

Parameters:

Name Type Description Default
image Image

The input image.

required
image_format

Determines the number and order of channels in the output image. One of {'L', 'RGB', 'BGR'}.

'RGB'

Returns:

Type Description
ndarray

An np.ndarray image in the specified format.

ndarray_to_pil

ndarray_to_pil(image: ndarray, image_format: str = 'RGB')

Convert numpy ndarray to PIL image.

The input image can either be a float array with values in the range [0, 1], an int array with values in the range [0, 255], or a bool array.

Parameters:

Name Type Description Default
image ndarray

The input image. It should be a numpy array with 1, 3 or 4 channels.

required
image_format str

The format of the input image. One of {'RGB', 'BGR'}

'RGB'

Returns:

Type Description

A PIL image.

Example
from matplotlib import image, pyplot as plt
from mindtrace.core import ndarray_to_pil

ndarray_image = image.imread('tests/resources/hopper.png')
pil_image = ndarray_to_pil(ndarray_image, image_format='RGB')

pil_to_cv2

pil_to_cv2(image: Image) -> np.ndarray

Convert PIL image to cv2 image.

Note that, in addition to cv2 images being numpy arrays, PIL Images follow RGB format while cv2 images follow BGR format.

Parameters:

Name Type Description Default
image Image

The input image.

required

Returns:

Type Description
ndarray

An np.ndarray image in 'BGR' (cv2) format.

Example:

```python
import PIL
from mindtrace.core import pil_to_cv2

pil_image = PIL.Image.open('tests/resources/hopper.png')
cv2_image = pil_to_cv2(pil_image)
```

cv2_to_pil

cv2_to_pil(image: ndarray) -> Image

Convert PIL image to cv2 image.

Note that, in addition to cv2 images being numpy arrays, PIL Images follow RGB format while cv2 images follow BGR format.

Parameters:

Name Type Description Default
image ndarray

The input image. Should be a np.ndarray in 'BGR' (cv2) format.

required

Returns:

Type Description
Image

A PIL image.

Example
import cv2
from mindtrace.core import cv2_to_pil

cv2_image = cv2.imread('tests/resources/hopper.png')
pil_image = cv2_to_pil(cv2_image)

pil_to_base64

pil_to_base64(image: Image) -> str

Convert a PIL Image to a base64-encoded string.

Parameters:

Name Type Description Default
image Image

The image to be converted.

required

Returns:

Name Type Description
str str

The base64-encoded string representing the image.

Example
from PIL import Image
from mindtrace.core import pil_to_base64

image = Image.open("path_to_image.png")
encoded_image = pil_to_base64(image)

base64_to_pil

base64_to_pil(base64_str: str) -> Image

Convert a base64-encoded string back to a PIL Image.

Parameters:

Name Type Description Default
base64_str str

The base64-encoded string.

required

Returns:

Type Description
Image

PIL.Image: The decoded image object.

Example
from mindtrace.core import base64_to_pil

base64_str = "..."  # Base64 string obtained earlier
image = base64_to_pil(base64_str)
image.show()  # Display the decoded image

pil_to_discord_file

pil_to_discord_file(image: Image, filename: str = 'image.png') -> File

Convert a PIL Image to a Discord File object for uploading.

Parameters:

Name Type Description Default
image Image

The PIL image to be sent.

required
filename str

The filename for the image file (default is "image.png").

'image.png'

Returns:

Type Description
File

discord.File: A Discord file object that can be sent in a message.

Example
from PIL import Image
from mindtrace.core import pil_to_discord_file
from discord import File

image = Image.open("path_to_image.png")
discord_file = pil_to_discord_file(image)
await message.reply(file=discord_file)

discord_file_to_pil async

discord_file_to_pil(attachment: Attachment) -> Image

Convert a Discord attachment to a PIL Image.

Parameters:

Name Type Description Default
attachment Attachment

The Discord file attachment to convert.

required

Returns:

Type Description
Image

The resulting PIL Image.

Example
@commands.command(name="process_images")
async def example_command(self, ctx):
    attachments = ctx.message.attachments
    if not attachments:
        await ctx.send("No attachments found in the message.")
        return

    # Process each attachment in the message
    for i, attachment in enumerate(attachments, start=1):
        if attachment.filename.endswith(('png', 'jpg', 'jpeg')):
            image = await discord_file_to_pil(attachment)
            # Do something with the image, e.g., send it back or process it
            await ctx.send(f"Attachment {i} processed as image.")
        else:
            await ctx.send(f"Attachment {i} is not a valid image file.")

tensor_to_ndarray

tensor_to_ndarray(tensor: Tensor) -> np.ndarray

Convert a PyTorch tensor to a numpy array.

Handles both single images (3D tensors) and batches (4D tensors), converting them to the numpy format.

Parameters:

Name Type Description Default
tensor Tensor

PyTorch tensor in format [C,H,W] or [B,C,H,W]

required
Returns

For batched tensors: a list of numpy arrays in HWC format For single image: a numpy array in HWC format

required

ndarray_to_tensor

ndarray_to_tensor(image: ndarray) -> torch.Tensor

Convert a numpy array to a PyTorch tensor.

Parameters:

Name Type Description Default
image ndarray

The numpy array to convert.

required

Returns:

Type Description
Tensor

The PyTorch tensor.

download

download_and_extract_zip

download_and_extract_zip(
    url: str,
    extract_to: Union[str, Path],
    filename: Optional[str] = None,
    remove_after_extract: bool = True,
) -> Path

Download a ZIP file from URL and extract it to the specified directory.

Parameters:

Name Type Description Default
url str

URL to download the ZIP file from

required
extract_to Union[str, Path]

Directory to extract the ZIP file to

required
filename Optional[str]

Optional filename for the downloaded file (if None, uses URL basename)

None
remove_after_extract bool

Whether to remove the downloaded ZIP file after extraction

True

Returns:

Type Description
Path

Path to the extracted directory

Raises:

Type Description
Exception

If download or extraction fails

download_and_extract_tarball

download_and_extract_tarball(
    url: str,
    extract_to: Union[str, Path],
    filename: Optional[str] = None,
    remove_after_extract: bool = True,
) -> Path

Download a tarball (tar.gz, tar.bz2, etc.) from URL and extract it to the specified directory.

Parameters:

Name Type Description Default
url str

URL to download the tarball from

required
extract_to Union[str, Path]

Directory to extract the tarball to

required
filename Optional[str]

Optional filename for the downloaded file (if None, uses URL basename)

None
remove_after_extract bool

Whether to remove the downloaded tarball after extraction

True

Returns:

Type Description
Path

Path to the extracted directory

Raises:

Type Description
Exception

If download or extraction fails

dynamic

Utility methods relating to dynamically generating objects.

dynamic_instantiation

dynamic_instantiation(module_name: str, class_name: str, **kwargs) -> Any

Dynamically instantiates a class from a module.

instantiate_target

instantiate_target(target: str, **kwargs) -> Any

Instantiates a target object from a string.

The target string should be in the same format as expected from Hydra targets. I.e. 'module_name.class_name'.

Parameters:

Name Type Description Default
target str

A string representing a target object.

required

Example::

from mindtrace.core import instantiate_target

target = 'mindtrace.core.config.Config'
config = instantiate_target(target)

print(type(config))  # <class 'mindtrace.core.config.Config'>

get_class

get_class(target: str) -> type

Gets a class from a module path string without instantiating it.

The target string should be in the same format as expected from Hydra targets. I.e. 'module_name.class_name'.

Parameters:

Name Type Description Default
target str

A string representing a target class path.

required

Returns:

Type Description
type

The class object.

Example::

from mindtrace.core import get_class

target = 'mindtrace.core.config.Config'
config_class = get_class(target)

print(config_class)  # <class 'mindtrace.core.config.Config'>

hashing

compute_dir_hash

compute_dir_hash(directory_path: str | Path, chunk_size: int = 2 ** 20) -> str

Compute SHA256 hash of directory contents.

Hash is deterministic: files are sorted by path, then each file's content is hashed and combined. This ensures the same directory always produces the same hash.

Parameters:

Name Type Description Default
directory_path str | Path

Path to the directory to hash

required
chunk_size int

Size of the chunks (in bytes) to read from the file

2 ** 20

Returns: Hexadecimal SHA256 hash string

ini

load_ini_as_dict

load_ini_as_dict(ini_path: Path) -> Dict[str, Any]

Load and parse an INI file into a nested dictionary with normalized keys.

  • Section names and keys are uppercased for uniform access
  • Values with leading '~' are expanded to the user home directory
  • Returns an empty dict if the file does not exist

lambdas

named_lambda

named_lambda(name: str, lambda_func: Callable) -> Callable

Assigns a name to the given lambda function.

This method is useful when passing lambda functions to other functions that require a name attribute. For example, when using the autolog decorator, the wrapped function will be logged according the function name. If the original function is a lambda function, it's name attribute will be set to the generic name ''.

Parameters:

Name Type Description Default
name str

The name to assign to the lambda function.

required
lambda_func Callable

The lambda function to assign the name to.

required

Returns:

Type Description
Callable

The lambda function with the name attribute set to the given name.

Example::

    from mindtrace.core import Mindtrace, named_lambda

    class HyperRunner(Mindtrace):
        def __init__(self):
            super().__init__()

        def run_command(self, command: Callable, data: Any):  # cannot control the name of the command
            return Mindtrace.autolog(command(data))()

    hyper_runner = HyperRunner()
    hyper_runner.run_command(lambda x, y: x + y, data=(1, 2))  # autologs to '<lambda>'
    hyper_runner.run_command(named_lambda("add", lambda x, y: x + y), data=(1, 2))  # autologs to 'add'

network

Network utilities for port management and service connectivity.

This module provides exception-based network utilities for checking port availability, finding free ports, waiting for services, and getting local IP addresses.

All functions raise exceptions on errors rather than returning sentinel values, forcing callers to handle error conditions explicitly.

NetworkError

Bases: Exception

Base exception for network-related errors.

PortInUseError

Bases: NetworkError

Raised when a port is already in use.

NoFreePortError

Bases: NetworkError

Raised when no free port is found in the specified range.

ServiceTimeoutError

Bases: NetworkError

Raised when waiting for a service times out.

LocalIPError

Bases: NetworkError

Raised when unable to determine local IP address.

is_port_available

is_port_available(host: str, port: int) -> bool

Check if a port is available for binding.

Parameters:

Name Type Description Default
host str

Host address to check.

required
port int

Port number to check.

required

Returns:

Type Description
bool

True if port is available, False if port is in use.

check_port_available

check_port_available(host: str, port: int) -> None

Assert that a port is available for binding.

Parameters:

Name Type Description Default
host str

Host address to check.

required
port int

Port number to check.

required

Raises:

Type Description
PortInUseError

If the port is already in use.

get_free_port

get_free_port(
    host: str = "localhost", start_port: int = 8000, end_port: int = 9000
) -> int

Find a free port in the given range.

Parameters:

Name Type Description Default
host str

Host address to check.

'localhost'
start_port int

Starting port number (inclusive).

8000
end_port int

Ending port number (inclusive).

9000

Returns:

Type Description
int

First available port number in the range.

Raises:

Type Description
NoFreePortError

If no free port is found in the range.

wait_for_service

wait_for_service(
    host: str, port: int, timeout: float = 30.0, poll_interval: float = 0.5
) -> None

Wait for a service to become available on the specified host and port.

Note: For services launched via mindtrace.services, prefer using Service.launch(wait_for_launch=True) which provides better integration with the service lifecycle.

Parameters:

Name Type Description Default
host str

Service host address.

required
port int

Service port number.

required
timeout float

Maximum time to wait in seconds.

30.0
poll_interval float

Time between connection attempts in seconds.

0.5

Raises:

Type Description
ServiceTimeoutError

If the service doesn't become available within timeout.

get_local_ip

get_local_ip() -> str

Get the local IP address of the machine.

Uses UDP socket connection to determine the local IP address that would be used to reach external networks.

Returns:

Type Description
str

Local IP address string.

Raises:

Type Description
LocalIPError

If unable to determine local IP address.

get_local_ip_safe

get_local_ip_safe(fallback: str = '127.0.0.1') -> str

Get the local IP address with a fallback value.

This is a convenience wrapper around get_local_ip() that returns a fallback value instead of raising an exception.

Parameters:

Name Type Description Default
fallback str

IP address to return if detection fails.

'127.0.0.1'

Returns:

Type Description
str

Local IP address or fallback value.

paths

expand_tilde_str

expand_tilde_str(value: str) -> str

Expand leading '~' in a string value; return unchanged if not present.

expand_tilde

expand_tilde(obj: Any) -> Any

Recursively expand leading '~' across strings in nested structures.

Supports dict, list/tuple/set, and str. Other types are returned unchanged.

system_metrics_collector

SystemMetricsCollector

SystemMetricsCollector(
    interval: int | None = None, metrics_to_collect: list[str] | None = None
)

Class for collecting various system metrics.

This class allows collection of CPU, memory, disk usage, network I/O, etc. Users can specify which metrics to collect and optionally enable periodic background updates.

Available metrics include
  • "cpu_percent": Overall CPU usage percentage.
  • "per_core_cpu_percent": CPU usage percentage per core.
  • "memory_percent": Memory usage percentage.
  • "disk_usage": Disk usage percentage.
  • "network_io": Network I/O statistics (bytes sent and received).
  • "load_average": System load average (if available).

Example Usage:

from time import sleep
from mindtrace.core.utils import SystemMetricsCollector

with SystemMetricsCollector(interval=3) as collector:
    for _ in range(10):
        print(collector())
        sleep(1)

Alternative (manual stop):

from time import sleep
from mindtrace.core.utils import SystemMetricsCollector

collector = SystemMetricsCollector(interval=3)
try:
    for _ in range(10):
        print(collector())
        sleep(1)
finally:
    collector.stop()

On-demand usage (no background thread):

from mindtrace.core.utils import SystemMetricsCollector

collector = SystemMetricsCollector()  # no interval; collected on demand
print(collector())

Initialize the system metrics collector.

Parameters:

Name Type Description Default
interval int | None

Interval in seconds for periodic metrics collection. If provided, metrics will be updated to a separate cache periodically, instead of being collected on demand. Using a cache in this way can be less resource intensive than collecting metrics on demand. If None, metrics will be collected on demand.

None
metrics_to_collect list[str] | None

List of metrics to collect. If None, all available metrics will be collected.

None
fetch
fetch() -> dict[str, float | list | dict]

Get the current system metrics.

Returns:

Type Description
dict[str, float | list | dict]

A dictionary containing system metrics. If metrics are cached, return them; otherwise, collect new metrics.

stop
stop()

Stop the background collection thread if running.

Prefer using the context manager (with SystemMetricsCollector(...) as collector:) which stops the thread automatically on exit.

timers

Utility class for simple Timer and TimerCollection classes.

Timer

Timer()

Utility timer class.

This class can be used to time operations. It can be started, stopped, and reset. The duration of the timer can be retrieved at any time.

Usage
import time
from mindtrace.core import Timer

timer = Timer()
timer.start()
time.sleep(1)
timer.stop()
print(f'The timer ran for {timer.duration()} seconds.')  # The timer ran for 1.0000000000000002 seconds.
timer.reset()
timer.start()
time.sleep(2)
timer.stop()
print(f'The timer ran for {timer.duration()} seconds.')  # The timer ran for 2.0000000000000004 seconds.
start
start()

Start the timer.

stop
stop()

Stop the timer.

duration
duration() -> float

Get the duration of the timer.

reset
reset()

Reset the timer.

restart
restart()

Reset and start the timer.

TimerContext

TimerContext(timer_collection: TimerCollection, name: str)

Context manager for individual timers in a TimerCollection.

TimerCollection

TimerCollection()

Utility class for timing multiple operations.

This class keeps a collection of named timers. Each timer can be started, stopped, and reset. The duration of each timer can be retrieved at any time. If a timer is stopped and restarted, the duration will be added to the previous duration. The timers can be reset individually, or all at once.

Usage
import time
from mindtrace.core import TimerCollection

tc = TimerCollection()
tc.start('Timer 1')
tc.start('Timer 2')
time.sleep(1)
tc.stop('Timer 1')
time.sleep(1)
tc.stop('Timer 2')
tc.start('Timer 3')
time.sleep(1)
tc.reset('Timer 1')
print(tc)
    # Timer 1: 0.000s
    # Timer 2: 2.000s
    # Timer 3: 1.000s
tc.reset_all()
print(tc)
    # Timer 1: 0.000s
    # Timer 2: 0.000s
    # Timer 3: 0.000s

Context Manager Usage:

import time
from mindtrace.core import TimerCollection

tc = TimerCollection()
with tc.start('Timer 1'):
    with tc.start('Timer 2'):
        time.sleep(1)
    # stops "Timer 2"
    with tc.start('Timer 3'):
        time.sleep(2)
    # stops "Timer 3"
# stops "Timer 1"

print(tc)
    # Timer 1: 3.000s
    # Timer 2: 1.000s
    # Timer 3: 2.000s

add_timer
add_timer(name: str)

Add a timer with the given name. If the timer already exists, it will be replaced.

start
start(name: str)

Start the timer with the given name. If the timer does not exist, it will be created.

stop
stop(name: str)

Stop the timer with the given name.

Raises:

Type Description
KeyError

If the timer with the given name does not exist.

duration
duration(name: str) -> float

Get the duration of the timer with the given name.

Raises:

Type Description
KeyError

If the timer with the given name does not exist.

reset
reset(name: str)

Reset the timer with the given name.

Raises:

Type Description
KeyError

If the timer with the given name does not exist.

restart
restart(name: str)

Reset and start the timer with the given name.

Raises:

Type Description
KeyError

If the timer with the given name does not exist.

reset_all
reset_all()

Reset all timers.

names
names()

Get the names of all timers.

Timeout

Timeout(
    timeout: float = 60.0,
    retry_delay: float = 1.0,
    exceptions: tuple[Type[Exception], ...] = (Exception,),
    progress_bar: bool = False,
    desc: str | None = None,
)

Utility for adding a timeout to a given method.

The given method will be run and rerun until an exception is not raised, or the timeout period is reached.

If the method raises an exception that is in the exceptions tuple, that exception will be caught and ignored. After a retry_delay, the method will be run again. This process will continue until the method runs without raising an exception, or the timeout period is passed.

If the timeout is reached, a TimeoutError will be raised. If the method ever raises an exception that is not in the exceptions tuple, the timeout process will stop and that exception will be reraised.

Parameters:

Name Type Description Default
timeout float

The maximum time in seconds that the method can run before a TimeoutError is raised.

60.0
retry_delay float

The time in seconds to wait between attempts to run the method.

1.0
exceptions tuple[Type[Exception], ...]

A tuple of exceptions that will be caught and ignored. By default, all exceptions are caught.

(Exception,)
progress_bar bool

A boolean indicating whether to display a progress bar while waiting for the timeout.

False
desc str | None

A description to display in the progress bar.

None

Returns:

Type Description

The result of the given method.

Raises:

Type Description
TimeoutError

If the timeout is reached.

Exception

Any raised exception not in the exceptions tuple will be reraised.

Example— Running Timeout Manually:

import requests
from urllib3.util.url import parse_url, Url
from mindtrace.core import Timeout
from mindtrace.services import Service

def get_server_status(url: Url):
    # The following request may fail for two categories of reasons:
    #   1. The server has not launched yet: Will raise a ConnectionError, we should retry.
    #   2. Any other reason: Will raise some other exception, we should break out and reraise it.
    # Both cases will be raised to the Timeout class. We will tell the Timeout object to ignore ConnectionError.
    response = requests.request("POST", str(url) + "status")

    if response.status_code == 200:
        return json.loads(response.content)["status"]  # Server is up and responding
    else:
        raise HTTPException(response.status_code, response.content)  # Request completed but something is wrong

url = parse_url("http://localhost:8080/")
timeout = Timeout(timeout=60.0, exceptions=(ConnectionRefusedError, requests.exceptions.ConnectionError))

Service.launch(url)
status = timeout.run(get_server_status, url)  # Will wait up to 60 seconds for the server to launch.
print(f"Server status: {status}")

Example— Using Timeout as a Decorator:

import requests
from urllib3.util.url import parse_url, Url
from mindtrace.core import Timeout
from mindtrace.services import Service

@Timeout(timeout=60.0, exceptions=(ConnectionRefusedError, requests.exceptions.ConnectionError))
def get_server_status(url: Url):
    response = requests.request("POST", str(url) + "status")
    if response.status_code == 200:
        return json.loads(response.content)["status"]
    else:
        raise HTTPException(response.status_code, response.content)

url = parse_url("http://localhost:8080/")
Service.launch(url)

try:
    status = get_server_status(url)  # Will wait up to 60 seconds for the server to launch.
except TimeoutError as e:
    print(f"The server did not respond within the timeout period: {e}")  # Timeout of 60 seconds reached.
except Exception as e:  # Guaranteed not to be one of the given exceptions in the exceptions tuple.
    print(f"An unexpected error occurred: {e}")
else:
    print(f"Server status: {status}")

run
run(func, *args, **kwargs)

Run the given function with the given args and kwargs.