Jobs Package API Reference
LocalClient
LocalClient(
client_dir: str | Path | None = None,
broker_id: str | None = None,
backend: Registry | None = None,
)
Bases: OrchestratorBackend
A registry-backed local job client.
The client maintains a registry of declared queues and a store for job results; job results can also be written to a separate internal registry.
Initialize the LocalClient.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client_dir | str \| Path \| None | The directory to store the client. If None, uses the default from config. | None |
| broker_id | str \| None | The ID of the broker. | None |
| backend | Registry \| None | The backend to use for storage. If None, uses the default from config. | None |
declare_queue
Declare a queue of type 'fifo', 'stack', or 'priority'.
publish
Publish a message (as a pydantic model) to the specified queue. If the target queue is a priority queue, accepts an extra 'priority' parameter.
receive_message
Retrieve a message from the specified queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | The name of the queue to receive a message from. | required |
| **kwargs | | Additional parameters passed to the queue instance. | {} |

Returns:

| Type | Description |
|---|---|
| Optional[dict] | The message as a dict, or None if the queue is empty. |
clean_queue
Remove all messages from the specified queue.
count_queue_messages
Return the number of messages in the specified queue.
store_job_result
Save the job result (JSON-serializable) keyed by job_id.
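The queue types accepted by declare_queue ('fifo', 'stack', 'priority') differ only in retrieval order. A standalone sketch of those semantics using the standard library (illustrative only, not the LocalClient implementation):

```python
import heapq
from collections import deque

def drain_fifo(items):
    """'fifo' queues return messages in insertion order."""
    q = deque(items)
    return [q.popleft() for _ in range(len(q))]

def drain_stack(items):
    """'stack' queues return the most recently pushed message first."""
    s = list(items)
    return [s.pop() for _ in range(len(s))]

def drain_priority(items):
    """'priority' queues return the highest-priority message first.
    items are (priority, payload) pairs; heapq is a min-heap, so the
    priority is negated to pop the largest value first."""
    heap = [(-p, m) for p, m in items]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[1] for _ in range(len(heap))]

print(drain_fifo(["a", "b", "c"]))    # ['a', 'b', 'c']
print(drain_stack(["a", "b", "c"]))   # ['c', 'b', 'a']
print(drain_priority([(1, "low"), (5, "hi"), (3, "mid")]))  # ['hi', 'mid', 'low']
```

The extra 'priority' parameter accepted by publish corresponds to the priority value in the (priority, payload) pairs above.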
LocalConsumerBackend
LocalConsumerBackend(
queue_name: str,
consumer_frontend,
orchestrator: LocalClient,
poll_timeout: float = 1,
)
Bases: ConsumerBackendBase
Local in-memory consumer backend.
consume
consume(
num_messages: int = 0,
*,
queues: str | list[str] | None = None,
block: bool = True,
**kwargs
) -> None
Consume messages from the local queue(s).
LocalQueue
LocalPriorityQueue
LocalStack
RabbitMQClient
RabbitMQClient(
host: str | None = None,
port: int | None = None,
username: str | None = None,
password: str | None = None,
)
Bases: OrchestratorBackend
Initialize the RabbitMQ client with connection parameters.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| host | str \| None | RabbitMQ server hostname. | None |
| port | int \| None | RabbitMQ server port. | None |
| username | str \| None | Username for RabbitMQ authentication. | None |
| password | str \| None | Password for RabbitMQ authentication. | None |
declare_exchange
declare_exchange(
*,
exchange: str,
exchange_type: str = "direct",
durable: bool = True,
auto_delete: bool = False,
**kwargs
)
Declare a RabbitMQ exchange.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| exchange | str | Name of the exchange to declare. | required |
| exchange_type | str | Type of the exchange (e.g., 'direct', 'topic', 'fanout'). | 'direct' |
| durable | bool | Make the exchange durable. | True |
| auto_delete | bool | Automatically delete the exchange when no queues are bound. | False |
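The exchange_type argument determines how the broker routes a message to bound queues. A standalone simulation of the three common types (this mimics standard AMQP routing behavior for illustration; it is not part of the client):

```python
def topic_matches(pattern, key):
    """AMQP topic matching: '*' matches exactly one dot-separated word,
    '#' matches zero or more words."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' can absorb any number of words (including none)
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if p[0] == "*" or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False
    return match(pattern.split("."), key.split("."))

def route(exchange_type, binding_key, routing_key):
    """Return True if a message with routing_key reaches a queue bound
    with binding_key on an exchange of the given type."""
    if exchange_type == "fanout":
        return True                        # delivered to every bound queue
    if exchange_type == "direct":
        return binding_key == routing_key  # exact match only
    if exchange_type == "topic":
        return topic_matches(binding_key, routing_key)
    raise ValueError(exchange_type)

print(route("direct", "jobs", "jobs"))                 # True
print(route("topic", "jobs.*.done", "jobs.ocr.done"))  # True
print(route("topic", "jobs.#", "jobs"))                # True
```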
declare_queue
Declare a RabbitMQ queue.

Parameters:
- queue: Name of the queue to declare.
- exchange: Name of the exchange to bind the queue to.
- durable: Make the queue durable.
- exclusive: Make the queue exclusive to the connection.
- auto_delete: Automatically delete the queue when no consumers are connected.
- routing_key: Routing key for binding the queue to the exchange.
- force: Force exchange creation if it doesn't exist.
- max_priority: Maximum priority for a priority queue (0-255).
publish
Publish a message to the specified exchange using RabbitMQ.

Parameters:
- queue_name: The queue name, used as the default routing key.
- message: A Pydantic BaseModel payload.
- exchange: The RabbitMQ exchange to use (from kwargs).
- routing_key: The routing key to use (from kwargs; defaults to queue_name).
- durable: Messages that are not durable are discarded if they cannot be routed to an existing consumer (from kwargs).
- delivery_mode: Use DeliveryMode.Persistent to save messages to disk (from kwargs).
- mandatory: If True, unroutable messages are returned (from kwargs).

Returns:

| Type | Description |
|---|---|
| str | The generated job ID for the message. |
clean_queue
Remove all messages from a queue.
count_exchanges
Get the number of exchanges in the RabbitMQ server.

Parameters:
- exchange: Name of the exchange to check.
RabbitMQConsumerBackend
RabbitMQConsumerBackend(
queue_name: str,
consumer_frontend,
prefetch_count: int = 1,
auto_ack: bool = False,
durable: bool = True,
host: str | None = None,
port: int | None = None,
username: str | None = None,
password: str | None = None,
)
Bases: ConsumerBackendBase
RabbitMQ consumer backend with improved consumption logic.
consume
consume(
num_messages: int = 0,
*,
queues: str | list[str] | None = None,
block: bool = True,
**kwargs
) -> None
Consume messages from RabbitMQ queue(s) with robust error handling.
process_message
Process a single message and return success status.
consume_until_empty
consume_until_empty(
*, queues: str | list[str] | None = None, block: bool = True, **kwargs
) -> None
Consume messages from the queue(s) until empty.
receive_message
Retrieve a message from a specified RabbitMQ queue.

This method uses RabbitMQ's basic_get to fetch a message. It supports blocking behavior by polling until a message is available or the timeout is reached.

Parameters:
- queue_name: The name of the queue from which to receive the message.
- block: Whether to block until a message is available.
- timeout: Maximum time in seconds to block if no message is available.
- auto_ack: Whether to automatically acknowledge the message upon retrieval.
- **kwargs: Additional keyword arguments to pass to basic_get (if any).

Returns:

| Type | Description |
|---|---|
| dict | The message content as a dictionary, or None if no message is available. |
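The blocking mode described above polls until a message arrives or the timeout elapses. A minimal standalone sketch of that polling loop (function and parameter names here are illustrative, not the client's internals):

```python
import time

def poll_until(get_message, block=True, timeout=5.0, interval=0.05):
    """Repeatedly call get_message() until it returns a message or,
    in blocking mode, until `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        msg = get_message()          # stands in for a basic_get-style fetch
        if msg is not None:
            return msg
        if not block or time.monotonic() >= deadline:
            return None              # mirrors the documented None return
        time.sleep(interval)

# Simulate a message that appears on the third poll.
attempts = iter([None, None, {"job_id": "123"}])
print(poll_until(lambda: next(attempts)))  # {'job_id': '123'}
```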
RedisClient
Bases: OrchestratorBackend
Initialize the Redis client and connect to the Redis server.

Parameters:
- host: Redis server hostname.
- port: Redis server port.
- db: Redis database number.
declare_queue
Declare a Redis-backed queue of type 'fifo', 'stack', or 'priority'.
delete_queue
Delete a declared queue. Uses distributed locking and transactions to remove the queue from the centralized metadata, and publishes an event to notify other clients.
publish
Publish a message (a pydantic model) to the specified Redis queue.
clean_queue
Clean (purge) a specified Redis queue by deleting its underlying key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | The name of the declared queue to be cleaned. | required |
move_to_dlq
Move a failed message to a dead-letter queue.
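move_to_dlq captures the dead-letter pattern: a message that repeatedly fails processing is set aside instead of being retried forever. A standalone sketch of that flow (field names and the helper are hypothetical, not the RedisClient implementation):

```python
from collections import deque

def process_with_dlq(queue, dlq, handler, max_retries=3):
    """Drain `queue`; a message whose handler keeps raising is moved to
    `dlq` with its error attached, instead of blocking the queue."""
    while queue:
        msg = queue.popleft()
        for attempt in range(1, max_retries + 1):
            try:
                handler(msg)
                break
            except Exception as exc:
                if attempt == max_retries:
                    dlq.append({"message": msg, "error": str(exc)})

def handler(msg):
    if msg == "bad":
        raise ValueError(msg)

queue, dlq = deque(["ok-1", "bad", "ok-2"]), deque()
process_with_dlq(queue, dlq, handler)
print(list(dlq))  # [{'message': 'bad', 'error': 'bad'}]
```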
RedisConsumerBackend
RedisConsumerBackend(
queue_name: str,
consumer_frontend,
host: str,
port: int,
db: int,
poll_timeout: int = 5,
)
Bases: ConsumerBackendBase
Redis consumer backend with blocking operations.
consume
consume(
num_messages: int = 0,
*,
queues: str | list[str] | None = None,
block: bool = True,
**kwargs
) -> None
Consume messages from Redis queue(s).
consume_until_empty
consume_until_empty(
*, queues: str | list[str] | None = None, block: bool = True, **kwargs
) -> None
Consume messages from the queue(s) until empty.
set_poll_timeout
Set the polling timeout for Redis operations.
Job
Bases: BaseModel
A job instance ready for execution; the system routes it based on schema_name.
job_from_schema
Create a Job from a JobSchema and input data.
This function automatically adds metadata such as the job ID and creation timestamp.

Parameters:
- schema: The JobSchema to use for the job.
- input_data: The input data for the job.

Returns:

| Type | Description |
|---|---|
| Job | A complete Job instance ready for submission. |
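The metadata step (attaching a generated job ID and a creation timestamp) can be illustrated standalone; the function and field names below are illustrative stand-ins, not the package's actual Job model:

```python
import uuid
from datetime import datetime, timezone

def make_job(schema_name: str, input_data: dict) -> dict:
    """Attach the metadata job_from_schema is documented to add:
    a generated job ID and a creation timestamp."""
    return {
        "id": str(uuid.uuid4()),
        "schema_name": schema_name,  # used by the system for routing
        "created_at": datetime.now(timezone.utc).isoformat(),
        "payload": input_data,
    }

job = make_job("ocr", {"path": "scan.png"})
print(job["schema_name"])  # ocr
```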
base
connection_base
BrokerConnectionBase
consumer_base
ConsumerBackendBase
Bases: MindtraceABC
Base class for consumer backends that handle message consumption.
abstractmethod
Consume messages from the queue and process them.
abstractmethod
Consume messages until the queue is empty and process them.
orchestrator_backend
OrchestratorBackend
Bases: MindtraceABC
Abstract base class for orchestrator backends.
Defines the interface that all backend implementations must follow for queue management operations.
Create a consumer backend for the given schema and consumer frontend.
abstractmethod
Declare a queue.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | Name of the queue to declare | required |
abstractmethod
Publish a message to the specified queue.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | Name of the queue to publish to | required |
| message | BaseModel | Pydantic model to publish | required |
abstractmethod
Remove all messages from the specified queue.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | Name of the queue to clean | required |
abstractmethod
Delete the specified queue.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | Name of the queue to delete | required |
abstractmethod
Count the number of messages in the specified queue.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | Name of the queue to count | required |

Returns:

| Type | Description |
|---|---|
| int | Number of messages in the queue |
abstractmethod
Move a failed message to a dead-letter queue.
Declare an exchange. Only implemented in the RabbitMQ backend.
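The abstract interface above can be satisfied by a small in-memory backend. A standalone sketch of the contract (it does not import the package; method names follow the abstract methods documented above, everything else is illustrative):

```python
from collections import deque

class InMemoryBackend:
    """Toy backend implementing the OrchestratorBackend contract:
    declare, publish, receive, clean, delete, count, and dead-letter."""

    def __init__(self):
        self.queues: dict[str, deque] = {}

    def declare_queue(self, queue_name):
        self.queues.setdefault(queue_name, deque())

    def publish(self, queue_name, message):
        self.queues[queue_name].append(message)

    def receive_message(self, queue_name):
        q = self.queues[queue_name]
        return q.popleft() if q else None

    def clean_queue(self, queue_name):
        self.queues[queue_name].clear()

    def delete_queue(self, queue_name):
        del self.queues[queue_name]

    def count_queue_messages(self, queue_name):
        return len(self.queues[queue_name])

    def move_to_dlq(self, queue_name, message):
        self.declare_queue(f"{queue_name}.dlq")
        self.queues[f"{queue_name}.dlq"].append(message)

backend = InMemoryBackend()
backend.declare_queue("jobs")
backend.publish("jobs", {"task": "resize"})
print(backend.count_queue_messages("jobs"))  # 1
```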
local
client
LocalClient
consumer_backend
LocalConsumerBackend
fifo_queue
LocalQueue
priority_queue
LocalPriorityQueue
PriorityQueueArchiver
stack
LocalStack
rabbitmq
client
RabbitMQClient
connection
RabbitMQConnection
RabbitMQConnection(
host: str | None = None,
port: int | None = None,
username: str | None = None,
password: str | None = None,
)
Bases: BrokerConnectionBase
Singleton class for RabbitMQ connection. The use of a singleton class ensures that only one connection is established throughout an application.
Initialize the RabbitMQ connection.

Parameters:
- host: The host address of the RabbitMQ server.
- port: The port number of the RabbitMQ server.
- username: The username for the RabbitMQ server.
- password: The password for the RabbitMQ server.
consumer_backend
RabbitMQConsumerBackend
redis
client
RedisClient
connection
RedisConnection
RedisConnection(
host: str | None = None,
port: int | None = None,
db: int | None = None,
password: str | None = None,
socket_timeout: float | None = None,
socket_connect_timeout: float | None = None,
)
Bases: BrokerConnectionBase
Singleton class for Redis connection. This class establishes and maintains a connection to the Redis server. It uses a retry loop and a PING command to verify connectivity.

Initialize the Redis connection.

Parameters:
- host: The Redis server host address.
- port: The Redis server port.
- db: The Redis database number.
- password: The password for the Redis server (if any).
- socket_timeout: Timeout for socket operations, in seconds.
- socket_connect_timeout: Timeout for socket connect, in seconds.

Return True if the connection to Redis is active (verified via PING).
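The retry-loop-plus-PING connectivity check described above can be sketched standalone (the helper and its parameters are hypothetical, not the RedisConnection implementation):

```python
import time

def wait_until_connected(ping, retries=5, delay=0.1):
    """Retry `ping` (which returns True on success, or raises/returns
    False) until it succeeds or retries are exhausted."""
    for attempt in range(retries):
        try:
            if ping():
                return True
        except ConnectionError:
            pass  # server not reachable yet; retry after a short delay
        time.sleep(delay)
    return False

# Simulate a server that only answers from the third attempt on.
calls = {"n": 0}
def flaky_ping():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("not up yet")
    return True

print(wait_until_connected(flaky_ping))  # True
```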
consumer_backend
RedisConsumerBackend
fifo_queue
RedisQueue
A FIFO (first-in, first-out) message queue backed by Redis.
This class uses a Redis list to store serialized messages. The put method pushes items to the tail of the list,
while the get method pops from the head. Blocking retrieval is implemented using Redis' BLPOP command.
Initialize a RedisQueue object.

Parameters:
- name: Name of the queue.
- namespace: Namespace prefix for the Redis key.
- redis_kwargs: Additional keyword arguments for redis.Redis.

Remove and return an item from the queue.

Parameters:
- block: If True, block until an item is available.
- timeout: Maximum time to block, in seconds (if block=True).

Raises:
- queue.Empty: If no item is available (in non-blocking mode or if the timeout expires).
priority
RedisPriorityQueue
A priority message queue backed by Redis. This class uses a Redis sorted set to store messages with priorities. Higher numerical priority values are retrieved first.
Serialize and add an item to the priority queue.

Parameters:
- item: The item to add to the queue.
- priority: Priority value (higher numbers = higher priority).

Remove and return the highest-priority item from the queue.

Parameters:
- block: If True, block until an item is available.
- timeout: Maximum time to block, in seconds (if block=True).

Raises:
- queue.Empty: If no item is available (in non-blocking mode or if the timeout expires).
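The sorted-set behavior (highest score retrieved first) can be simulated standalone with an ordered list; this mimics the retrieval order only, not the Redis-backed implementation:

```python
import bisect

class ScoredSet:
    """Minimal stand-in for a Redis sorted set: members kept ordered by
    score; pop_max mirrors retrieving the highest-priority message."""
    def __init__(self):
        self._items = []  # kept sorted ascending by (score, member)

    def add(self, score, member):
        bisect.insort(self._items, (score, member))

    def pop_max(self):
        # The last element has the highest score, i.e. highest priority.
        return self._items.pop()[1] if self._items else None

q = ScoredSet()
q.add(1, "cleanup"); q.add(10, "alert"); q.add(5, "report")
print(q.pop_max())  # alert
```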
stack
RedisStack
A LIFO (last-in, first-out) message stack backed by Redis.
This class uses a Redis list to store serialized messages. The push method pushes items to the head of the list,
while the pop method pops from the head. Blocking retrieval is implemented using Redis' BLPOP command.
Remove and return the top item from the stack.

Parameters:
- block: If True, block until an item is available.
- timeout: Maximum time to block, in seconds (if block=True).

Raises:
- queue.Empty: If no item is available (in non-blocking mode or if the timeout expires).
types
job_specs
Job
utils
schemas
job_from_schema