
Jobs Package API Reference

LocalClient

LocalClient(
    client_dir: str | Path | None = None,
    broker_id: str | None = None,
    backend: Registry | None = None,
)

Bases: OrchestratorBackend

A registry-backed local job client.

The client maintains a registry of declared queues; job results can be stored in a separate internal registry as well.

Initialize the LocalClient.

Parameters:

- client_dir (str | Path | None, default None): The directory to store the client. If None, uses the default from config.
- broker_id (str | None, default None): The ID of the broker.
- backend (Registry | None, default None): The backend to use for storage. If None, uses the default from config.
declare_queue
declare_queue(
    queue_name: str, queue_type: str = "fifo", **kwargs
) -> dict[str, str]

Declare a queue of type 'fifo', 'stack', or 'priority'.

publish
publish(queue_name: str, message: BaseModel, **kwargs)

Publish a message (as a pydantic model) to the specified queue. If the target queue is a priority queue, accepts an extra 'priority' parameter.

receive_message
receive_message(queue_name: str, **kwargs) -> Optional[dict]

Retrieve a message from the specified queue.

Parameters:

- queue_name (str, required): The name of the queue to receive a message from.
- **kwargs: Additional parameters passed to the queue instance.

Returns:

- Optional[dict]: The message as a dict, or None if the queue is empty.

clean_queue
clean_queue(queue_name: str, **kwargs) -> dict[str, str]

Remove all messages from the specified queue.

count_queue_messages
count_queue_messages(queue_name: str, **kwargs) -> int

Return the number of messages in the specified queue.

store_job_result
store_job_result(job_id: str, result: Any)

Save the job result (JSON-serializable) keyed by job_id.

get_job_result
get_job_result(job_id: str) -> Any

Retrieve the stored result for the given job_id.

move_to_dlq
move_to_dlq(
    source_queue: str,
    dlq_name: str,
    message: BaseModel,
    error_details: str,
    **kwargs
)

Move a failed message to a dead letter queue.
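The dead letter queue contract can be illustrated with a stdlib-only sketch (plain dicts and deques stand in for the backend's queues and BaseModel messages; the envelope field names here are assumptions, not the package's wire format):

```python
from collections import deque

def move_to_dlq(queues: dict, source_queue: str, dlq_name: str,
                message: dict, error_details: str) -> None:
    """Wrap a failed message with failure context and park it on a DLQ."""
    queues.setdefault(dlq_name, deque())
    # The envelope keeps the original payload plus enough context to retry later.
    queues[dlq_name].append({
        "source_queue": source_queue,
        "error_details": error_details,
        "payload": message,
    })

queues = {"jobs": deque()}
move_to_dlq(queues, "jobs", "jobs.dlq", {"job_id": "abc"}, "timeout")
```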

LocalConsumerBackend

LocalConsumerBackend(
    queue_name: str,
    consumer_frontend,
    orchestrator: LocalClient,
    poll_timeout: float = 1,
)

Bases: ConsumerBackendBase

Local in-memory consumer backend.

consume
consume(
    num_messages: int = 0,
    *,
    queues: str | list[str] | None = None,
    block: bool = True,
    **kwargs
) -> None

Consume messages from the local queue(s).

consume_until_empty
consume_until_empty(
    *, queues: str | list[str] | None = None, block: bool = True, **kwargs
) -> None

Consume messages from the queue(s) until empty.

process_message
process_message(message) -> bool

Process a single message.
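The consume-until-empty loop can be sketched with stdlib stand-ins (a deque plays the local queue, and the function names mirror the API above; this is an illustration of the loop, not the package's implementation):

```python
from collections import deque
from typing import Optional

queue = deque([{"job_id": 1}, {"job_id": 2}])
processed = []

def receive_message() -> Optional[dict]:
    # Non-blocking receive: None signals an empty queue, mirroring Optional[dict].
    return queue.popleft() if queue else None

def process_message(message: dict) -> bool:
    processed.append(message["job_id"])
    return True  # success status

def consume_until_empty() -> None:
    # Drain until receive_message reports the queue is empty.
    while (message := receive_message()) is not None:
        process_message(message)

consume_until_empty()
```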

LocalQueue

LocalQueue()
to_dict
to_dict()

Convert queue contents to a JSON-serializable dictionary.

from_dict classmethod
from_dict(data)

Create a LocalQueue from a dictionary.

LocalPriorityQueue

LocalPriorityQueue()
to_dict
to_dict()

Convert priority queue contents to a JSON-serializable dictionary.

from_dict classmethod
from_dict(data)

Create a LocalPriorityQueue from a dictionary.

LocalStack

LocalStack()
to_dict
to_dict()

Convert stack contents to a JSON-serializable dictionary.

from_dict classmethod
from_dict(data)

Create a LocalStack from a dictionary.
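The to_dict/from_dict round-trip that makes these queues persistable can be sketched with a minimal stdlib FIFO (the "items" key is an assumption; the real serialized layout may differ):

```python
import json
from collections import deque

class FifoQueue:
    """Minimal FIFO with the to_dict/from_dict round-trip described above."""

    def __init__(self, items=None):
        self._items = deque(items or [])

    def to_dict(self) -> dict:
        # Only JSON-serializable types, so a registry can persist it as JSON.
        return {"items": list(self._items)}

    @classmethod
    def from_dict(cls, data: dict) -> "FifoQueue":
        return cls(data["items"])

q = FifoQueue(["a", "b"])
# Round-trip through an actual JSON string to prove serializability.
restored = FifoQueue.from_dict(json.loads(json.dumps(q.to_dict())))
```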

RabbitMQClient

RabbitMQClient(
    host: str | None = None,
    port: int | None = None,
    username: str | None = None,
    password: str | None = None,
)

Bases: OrchestratorBackend

Initialize the RabbitMQ client with connection parameters.

Args:
    host: RabbitMQ server hostname.
    port: RabbitMQ server port.
    username: Username for RabbitMQ authentication.
    password: Password for RabbitMQ authentication.

declare_exchange
declare_exchange(
    *,
    exchange: str,
    exchange_type: str = "direct",
    durable: bool = True,
    auto_delete: bool = False,
    **kwargs
)

Declare a RabbitMQ exchange.

Args:
    exchange: Name of the exchange to declare.
    exchange_type: Type of the exchange (e.g., 'direct', 'topic', 'fanout').
    durable: Make the exchange durable.
    auto_delete: Automatically delete the exchange when no queues are bound.

declare_queue
declare_queue(queue_name: str, **kwargs) -> dict[str, str]

Declare a RabbitMQ queue.

Args:
    queue_name: Name of the queue to declare.
    exchange: Name of the exchange to bind the queue to.
    durable: Make the queue durable.
    exclusive: Make the queue exclusive to the connection.
    auto_delete: Automatically delete the queue when no consumers are connected.
    routing_key: Routing key for binding the queue to the exchange.
    force: Force exchange creation if it doesn't exist.
    max_priority: Maximum priority for a priority queue (0-255).

publish
publish(queue_name: str, message: BaseModel, **kwargs)

Publish a message to the specified exchange using RabbitMQ.

Args:
    queue_name: The queue name to use as the default routing key.
    message: A Pydantic BaseModel payload.
    exchange: The RabbitMQ exchange to use (from kwargs).
    routing_key: The routing key to use (from kwargs; defaults to queue_name).
    durable: Messages that are not durable are discarded if they cannot be routed to an existing consumer (from kwargs).
    delivery_mode: Use DeliveryMode.Persistent to save messages to disk (from kwargs).
    mandatory: If True, unroutable messages are returned (from kwargs).

Returns:
    str: The generated job ID for the message.
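The exchange/routing-key behavior described above can be sketched without a broker (a dict of bindings stands in for a direct exchange; this illustrates the routing rule only, not pika or the package's code):

```python
from collections import defaultdict, deque

# bindings: (exchange, routing_key) -> list of bound queue names
bindings = defaultdict(list)
queues = defaultdict(deque)

def bind(exchange, queue_name, routing_key):
    bindings[(exchange, routing_key)].append(queue_name)

def publish(queue_name, message, *, exchange="", routing_key=None):
    # As in the docstring above, the routing key defaults to the queue name.
    key = routing_key if routing_key is not None else queue_name
    for target in bindings[(exchange, key)]:
        queues[target].append(message)

bind("jobs_ex", "jobs", routing_key="jobs")
publish("jobs", {"job_id": "j1"}, exchange="jobs_ex")
```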

clean_queue
clean_queue(queue_name: str, **kwargs) -> dict[str, str]

Remove all messages from a queue.

delete_queue
delete_queue(queue_name: str, **kwargs) -> dict[str, str]

Delete a queue.

count_exchanges
count_exchanges(*, exchange: str, **kwargs)

Get the number of exchanges in the RabbitMQ server.

Args:
    exchange: Name of the exchange to check.

delete_exchange
delete_exchange(*, exchange: str, **kwargs)

Delete an exchange.

move_to_dlq
move_to_dlq(
    source_queue: str,
    dlq_name: str,
    message: BaseModel,
    error_details: str,
    **kwargs
)

Move a failed message to a dead letter queue.

RabbitMQConsumerBackend

RabbitMQConsumerBackend(
    queue_name: str,
    consumer_frontend,
    prefetch_count: int = 1,
    auto_ack: bool = False,
    durable: bool = True,
    host: str | None = None,
    port: int | None = None,
    username: str | None = None,
    password: str | None = None,
)

Bases: ConsumerBackendBase

RabbitMQ consumer backend with improved consumption logic.

consume
consume(
    num_messages: int = 0,
    *,
    queues: str | list[str] | None = None,
    block: bool = True,
    **kwargs
) -> None

Consume messages from RabbitMQ queue(s) with robust error handling.

process_message
process_message(message) -> bool

Process a single message and return success status.

consume_until_empty
consume_until_empty(
    *, queues: str | list[str] | None = None, block: bool = True, **kwargs
) -> None

Consume messages from the queue(s) until empty.

receive_message
receive_message(channel, queue_name: str, **kwargs) -> Optional[dict]

Retrieve a message from a specified RabbitMQ queue.

This method uses RabbitMQ's basic_get method to fetch a message. It supports blocking behavior by polling until a message is available or the timeout is reached.

Args:
    channel: The channel used to fetch the message.
    queue_name: The name of the queue from which to receive the message.
    block: Whether to block until a message is available.
    timeout: Maximum time in seconds to block if no message is available.
    auto_ack: Whether to automatically acknowledge the message upon retrieval.
    **kwargs: Additional keyword arguments to pass to basic_get (if any).

Returns:
    dict: The message content as a dictionary, or None if no message is available.

RedisClient

RedisClient(host: str = 'localhost', port: int = 6379, db: int = 0)

Bases: OrchestratorBackend

Initialize the Redis client and connect to the Redis server.

Args:
    host: Redis server hostname.
    port: Redis server port.
    db: Redis database number.

declare_queue
declare_queue(
    queue_name: str, queue_type: str = "fifo", **kwargs
) -> dict[str, str]

Declare a Redis-backed queue of type 'fifo', 'stack', or 'priority'.

delete_queue
delete_queue(queue_name: str, **kwargs) -> dict

Delete a declared queue. Uses distributed locking and transactions to remove the queue from the centralized metadata, and publishes an event to notify other clients.

publish
publish(queue_name: str, message: BaseModel, **kwargs) -> str

Publish a message (a pydantic model) to the specified Redis queue.

clean_queue
clean_queue(queue_name: str, **kwargs) -> dict[str, str]

Clean (purge) a specified Redis queue by deleting its underlying key.

Parameters:

- queue_name (str, required): The name of the declared queue to be cleaned.
move_to_dlq
move_to_dlq(
    source_queue: str,
    dlq_name: str,
    message: BaseModel,
    error_details: str,
    **kwargs
)

Move a failed message to a dead letter queue.

close
close()

Close the Redis connection and clean up resources.

RedisConsumerBackend

RedisConsumerBackend(
    queue_name: str,
    consumer_frontend,
    host: str,
    port: int,
    db: int,
    poll_timeout: int = 5,
)

Bases: ConsumerBackendBase

Redis consumer backend with blocking operations.

consume
consume(
    num_messages: int = 0,
    *,
    queues: str | list[str] | None = None,
    block: bool = True,
    **kwargs
) -> None

Consume messages from Redis queue(s).

process_message
process_message(message) -> bool

Process a single message.

consume_until_empty
consume_until_empty(
    *, queues: str | list[str] | None = None, block: bool = True, **kwargs
) -> None

Consume messages from the queue(s) until empty.

close
close()

Close the Redis connection and clean up resources.

set_poll_timeout
set_poll_timeout(timeout: int) -> None

Set the polling timeout for Redis operations.

receive_message
receive_message(queue_name: str, **kwargs) -> Optional[dict]

Retrieve a message from a specified Redis queue.

Returns the message as a dict.

Job

Bases: BaseModel

A job instance ready for execution; the system routes it based on schema_name.

job_from_schema

job_from_schema(schema: JobSchema, input_data) -> Job

Create a Job from a JobSchema and input data.

This function automatically adds metadata like the job ID and creation timestamp.

Args:
    schema: The JobSchema to use for the job.
    input_data: The input data for the job.

Returns:
    Job: A complete Job instance ready for submission.
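The metadata step can be sketched with the stdlib (field names such as "id" and "created_at" are assumptions for illustration; the real Job model defines its own fields):

```python
import uuid
from datetime import datetime, timezone

def job_from_schema(schema_name: str, input_data: dict) -> dict:
    """Attach a job ID and creation timestamp, as job_from_schema does."""
    return {
        "id": str(uuid.uuid4()),
        "schema_name": schema_name,  # the system routes on this field
        "created_at": datetime.now(timezone.utc).isoformat(),
        "input": input_data,
    }

job = job_from_schema("resize_image", {"width": 64})
```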

base

connection_base
BrokerConnectionBase
BrokerConnectionBase(*args, **kwargs)

Bases: MindtraceABC

Abstract base class for broker connections.

consumer_base
ConsumerBackendBase
ConsumerBackendBase(queue_name: str, consumer_frontend: 'Consumer')

Bases: MindtraceABC

Base class for consumer backends that handle message consumption.

consume abstractmethod
consume(num_messages: int = 0, **kwargs) -> None

Consume messages from the queue and process them.

consume_until_empty abstractmethod
consume_until_empty(**kwargs) -> None

Consume messages until the queue is empty and process them.

process_message abstractmethod
process_message(message) -> bool

Process a single message using the stored run method.

orchestrator_backend
OrchestratorBackend
OrchestratorBackend()

Bases: MindtraceABC

Abstract base class for orchestrator backends.

Defines the interface that all backend implementations must follow for queue management operations.

create_consumer_backend
create_consumer_backend(
    consumer_frontend: Consumer, queue_name: str
) -> ConsumerBackendBase

Create a consumer backend for the given queue and consumer frontend.

declare_queue abstractmethod
declare_queue(queue_name: str, **kwargs) -> dict[str, str]

Declare a queue.

Parameters:

- queue_name (str, required): Name of the queue to declare.
publish abstractmethod
publish(queue_name: str, message: BaseModel, **kwargs) -> str

Publish a message to the specified queue.

Parameters:

- queue_name (str, required): Name of the queue to publish to.
- message (BaseModel, required): Pydantic model to publish.
clean_queue abstractmethod
clean_queue(queue_name: str, **kwargs) -> dict[str, str]

Remove all messages from the specified queue.

Parameters:

- queue_name (str, required): Name of the queue to clean.
delete_queue abstractmethod
delete_queue(queue_name: str, **kwargs) -> dict[str, str]

Delete the specified queue.

Parameters:

- queue_name (str, required): Name of the queue to delete.
count_queue_messages abstractmethod
count_queue_messages(queue_name: str, **kwargs) -> int

Count the number of messages in the specified queue.

Parameters:

- queue_name (str, required): Name of the queue to count.

Returns:

- int: Number of messages in the queue.

move_to_dlq abstractmethod
move_to_dlq(
    source_queue: str,
    dlq_name: str,
    message: BaseModel,
    error_details: str,
    **kwargs
)

Move a failed message to a dead letter queue.

declare_exchange
declare_exchange(**kwargs)

Declare an exchange. Only implemented in RabbitMQ backend.

delete_exchange
delete_exchange(**kwargs)

Delete an exchange. Only implemented in RabbitMQ backend.

count_exchanges
count_exchanges(**kwargs)

Count the number of exchanges. Only implemented in RabbitMQ backend.
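Implementing this interface can be sketched with a cut-down ABC and an in-memory backend (only three of the abstract methods are shown, and the stand-in job ID format is an assumption; the real base class defines more methods):

```python
from abc import ABC, abstractmethod
from collections import deque

class OrchestratorBackend(ABC):
    """Cut-down sketch of the interface described above."""

    @abstractmethod
    def declare_queue(self, queue_name: str, **kwargs) -> dict: ...

    @abstractmethod
    def publish(self, queue_name: str, message, **kwargs) -> str: ...

    @abstractmethod
    def count_queue_messages(self, queue_name: str, **kwargs) -> int: ...

class InMemoryBackend(OrchestratorBackend):
    """Minimal concrete backend: every abstract method must be implemented."""

    def __init__(self):
        self._queues = {}

    def declare_queue(self, queue_name, **kwargs):
        self._queues.setdefault(queue_name, deque())
        return {"status": "declared", "queue": queue_name}

    def publish(self, queue_name, message, **kwargs):
        self._queues[queue_name].append(message)
        return f"job-{len(self._queues[queue_name])}"  # stand-in job ID

    def count_queue_messages(self, queue_name, **kwargs):
        return len(self._queues[queue_name])

backend = InMemoryBackend()
backend.declare_queue("jobs")
job_id = backend.publish("jobs", {"n": 1})
```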

local

client
LocalClient
LocalClient(
    client_dir: str | Path | None = None,
    broker_id: str | None = None,
    backend: Registry | None = None,
)

Bases: OrchestratorBackend

A registry-backed local job client.

The client maintains a registry of declared queues; job results can be stored in a separate internal registry as well.

Initialize the LocalClient.

Parameters:

- client_dir (str | Path | None, default None): The directory to store the client. If None, uses the default from config.
- broker_id (str | None, default None): The ID of the broker.
- backend (Registry | None, default None): The backend to use for storage. If None, uses the default from config.
declare_queue
declare_queue(
    queue_name: str, queue_type: str = "fifo", **kwargs
) -> dict[str, str]

Declare a queue of type 'fifo', 'stack', or 'priority'.

publish
publish(queue_name: str, message: BaseModel, **kwargs)

Publish a message (as a pydantic model) to the specified queue. If the target queue is a priority queue, accepts an extra 'priority' parameter.

receive_message
receive_message(queue_name: str, **kwargs) -> Optional[dict]

Retrieve a message from the specified queue.

Parameters:

- queue_name (str, required): The name of the queue to receive a message from.
- **kwargs: Additional parameters passed to the queue instance.

Returns:

- Optional[dict]: The message as a dict, or None if the queue is empty.

clean_queue
clean_queue(queue_name: str, **kwargs) -> dict[str, str]

Remove all messages from the specified queue.

count_queue_messages
count_queue_messages(queue_name: str, **kwargs) -> int

Return the number of messages in the specified queue.

store_job_result
store_job_result(job_id: str, result: Any)

Save the job result (JSON-serializable) keyed by job_id.

get_job_result
get_job_result(job_id: str) -> Any

Retrieve the stored result for the given job_id.

move_to_dlq
move_to_dlq(
    source_queue: str,
    dlq_name: str,
    message: BaseModel,
    error_details: str,
    **kwargs
)

Move a failed message to a dead letter queue.

consumer_backend
LocalConsumerBackend
LocalConsumerBackend(
    queue_name: str,
    consumer_frontend,
    orchestrator: LocalClient,
    poll_timeout: float = 1,
)

Bases: ConsumerBackendBase

Local in-memory consumer backend.

consume
consume(
    num_messages: int = 0,
    *,
    queues: str | list[str] | None = None,
    block: bool = True,
    **kwargs
) -> None

Consume messages from the local queue(s).

consume_until_empty
consume_until_empty(
    *, queues: str | list[str] | None = None, block: bool = True, **kwargs
) -> None

Consume messages from the queue(s) until empty.

process_message
process_message(message) -> bool

Process a single message.

fifo_queue
LocalQueue
LocalQueue()
to_dict
to_dict()

Convert queue contents to a JSON-serializable dictionary.

from_dict classmethod
from_dict(data)

Create a LocalQueue from a dictionary.

LocalQueueArchiver
LocalQueueArchiver(uri: str, **kwargs)

Bases: Archiver

Archiver for LocalQueue objects using JSON serialization.

save
save(item: LocalQueue)

Save a LocalQueue object to JSON.

load
load(data_type: Type[Any]) -> LocalQueue

Load a LocalQueue object from JSON.

priority_queue
LocalPriorityQueue
LocalPriorityQueue()
to_dict
to_dict()

Convert priority queue contents to a JSON-serializable dictionary.

from_dict classmethod
from_dict(data)

Create a LocalPriorityQueue from a dictionary.

PriorityQueueArchiver
PriorityQueueArchiver(uri: str, **kwargs)

Bases: Archiver

Archiver for LocalPriorityQueue objects using JSON serialization.

save
save(item: LocalPriorityQueue)

Save a LocalPriorityQueue object to JSON.

load
load(data_type: Type[Any]) -> LocalPriorityQueue

Load a LocalPriorityQueue object from JSON.

stack
LocalStack
LocalStack()
to_dict
to_dict()

Convert stack contents to a JSON-serializable dictionary.

from_dict classmethod
from_dict(data)

Create a LocalStack from a dictionary.

StackArchiver
StackArchiver(uri: str, **kwargs)

Bases: Archiver

Archiver for LocalStack objects using JSON serialization.

save
save(item: LocalStack)

Save a LocalStack object to JSON.

load
load(data_type: Type[Any]) -> LocalStack

Load a LocalStack object from JSON.

rabbitmq

client
RabbitMQClient
RabbitMQClient(
    host: str | None = None,
    port: int | None = None,
    username: str | None = None,
    password: str | None = None,
)

Bases: OrchestratorBackend

Initialize the RabbitMQ client with connection parameters.

Args:
    host: RabbitMQ server hostname.
    port: RabbitMQ server port.
    username: Username for RabbitMQ authentication.
    password: Password for RabbitMQ authentication.

declare_exchange
declare_exchange(
    *,
    exchange: str,
    exchange_type: str = "direct",
    durable: bool = True,
    auto_delete: bool = False,
    **kwargs
)

Declare a RabbitMQ exchange.

Args:
    exchange: Name of the exchange to declare.
    exchange_type: Type of the exchange (e.g., 'direct', 'topic', 'fanout').
    durable: Make the exchange durable.
    auto_delete: Automatically delete the exchange when no queues are bound.

declare_queue
declare_queue(queue_name: str, **kwargs) -> dict[str, str]

Declare a RabbitMQ queue.

Args:
    queue_name: Name of the queue to declare.
    exchange: Name of the exchange to bind the queue to.
    durable: Make the queue durable.
    exclusive: Make the queue exclusive to the connection.
    auto_delete: Automatically delete the queue when no consumers are connected.
    routing_key: Routing key for binding the queue to the exchange.
    force: Force exchange creation if it doesn't exist.
    max_priority: Maximum priority for a priority queue (0-255).

publish
publish(queue_name: str, message: BaseModel, **kwargs)

Publish a message to the specified exchange using RabbitMQ.

Args:
    queue_name: The queue name to use as the default routing key.
    message: A Pydantic BaseModel payload.
    exchange: The RabbitMQ exchange to use (from kwargs).
    routing_key: The routing key to use (from kwargs; defaults to queue_name).
    durable: Messages that are not durable are discarded if they cannot be routed to an existing consumer (from kwargs).
    delivery_mode: Use DeliveryMode.Persistent to save messages to disk (from kwargs).
    mandatory: If True, unroutable messages are returned (from kwargs).

Returns:
    str: The generated job ID for the message.

clean_queue
clean_queue(queue_name: str, **kwargs) -> dict[str, str]

Remove all messages from a queue.

delete_queue
delete_queue(queue_name: str, **kwargs) -> dict[str, str]

Delete a queue.

count_exchanges
count_exchanges(*, exchange: str, **kwargs)

Get the number of exchanges in the RabbitMQ server.

Args:
    exchange: Name of the exchange to check.

delete_exchange
delete_exchange(*, exchange: str, **kwargs)

Delete an exchange.

move_to_dlq
move_to_dlq(
    source_queue: str,
    dlq_name: str,
    message: BaseModel,
    error_details: str,
    **kwargs
)

Move a failed message to a dead letter queue.

connection
RabbitMQConnection
RabbitMQConnection(
    host: str | None = None,
    port: int | None = None,
    username: str | None = None,
    password: str | None = None,
)

Bases: BrokerConnectionBase

Singleton class for RabbitMQ connection. The use of a singleton class ensures that only one connection is established throughout an application.

Initialize the RabbitMQ connection.

Args:
    host: The host address of the RabbitMQ server.
    port: The port number of the RabbitMQ server.
    username: The username for the RabbitMQ server.
    password: The password for the RabbitMQ server.

connect
connect()

Connect to the RabbitMQ server.

is_connected
is_connected()

Check if the connection to the RabbitMQ server is open.

close
close()

Close the connection to the RabbitMQ server.

get_channel
get_channel() -> BlockingChannel

Get a channel from the RabbitMQ connection.

count_queue_messages
count_queue_messages(queue_name: str, **kwargs) -> int

Get the number of messages in a queue.
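The singleton guarantee can be sketched with `__new__` (a generic Python pattern for illustration; RabbitMQConnection's actual mechanism may differ):

```python
class SingletonConnection:
    """One shared connection object per process, as described above."""

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Construct the object once; every later call returns the same instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.connected = False
        return cls._instance

    def connect(self):
        self.connected = True

a = SingletonConnection()
b = SingletonConnection()
a.connect()
```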

consumer_backend
RabbitMQConsumerBackend
RabbitMQConsumerBackend(
    queue_name: str,
    consumer_frontend,
    prefetch_count: int = 1,
    auto_ack: bool = False,
    durable: bool = True,
    host: str | None = None,
    port: int | None = None,
    username: str | None = None,
    password: str | None = None,
)

Bases: ConsumerBackendBase

RabbitMQ consumer backend with improved consumption logic.

consume
consume(
    num_messages: int = 0,
    *,
    queues: str | list[str] | None = None,
    block: bool = True,
    **kwargs
) -> None

Consume messages from RabbitMQ queue(s) with robust error handling.

process_message
process_message(message) -> bool

Process a single message and return success status.

consume_until_empty
consume_until_empty(
    *, queues: str | list[str] | None = None, block: bool = True, **kwargs
) -> None

Consume messages from the queue(s) until empty.

receive_message
receive_message(channel, queue_name: str, **kwargs) -> Optional[dict]

Retrieve a message from a specified RabbitMQ queue.

This method uses RabbitMQ's basic_get method to fetch a message. It supports blocking behavior by polling until a message is available or the timeout is reached.

Args:
    channel: The channel used to fetch the message.
    queue_name: The name of the queue from which to receive the message.
    block: Whether to block until a message is available.
    timeout: Maximum time in seconds to block if no message is available.
    auto_ack: Whether to automatically acknowledge the message upon retrieval.
    **kwargs: Additional keyword arguments to pass to basic_get (if any).

Returns:
    dict: The message content as a dictionary, or None if no message is available.

redis

client
RedisClient
RedisClient(host: str = 'localhost', port: int = 6379, db: int = 0)

Bases: OrchestratorBackend

Initialize the Redis client and connect to the Redis server.

Args:
    host: Redis server hostname.
    port: Redis server port.
    db: Redis database number.

declare_queue
declare_queue(
    queue_name: str, queue_type: str = "fifo", **kwargs
) -> dict[str, str]

Declare a Redis-backed queue of type 'fifo', 'stack', or 'priority'.

delete_queue
delete_queue(queue_name: str, **kwargs) -> dict

Delete a declared queue. Uses distributed locking and transactions to remove the queue from the centralized metadata, and publishes an event to notify other clients.

publish
publish(queue_name: str, message: BaseModel, **kwargs) -> str

Publish a message (a pydantic model) to the specified Redis queue.

clean_queue
clean_queue(queue_name: str, **kwargs) -> dict[str, str]

Clean (purge) a specified Redis queue by deleting its underlying key.

Parameters:

- queue_name (str, required): The name of the declared queue to be cleaned.
move_to_dlq
move_to_dlq(
    source_queue: str,
    dlq_name: str,
    message: BaseModel,
    error_details: str,
    **kwargs
)

Move a failed message to a dead letter queue.

close
close()

Close the Redis connection and clean up resources.

connection
RedisConnection
RedisConnection(
    host: str | None = None,
    port: int | None = None,
    db: int | None = None,
    password: str | None = None,
    socket_timeout: float | None = None,
    socket_connect_timeout: float | None = None,
)

Bases: BrokerConnectionBase

Singleton class for Redis connection. This class establishes and maintains a connection to the Redis server. It uses a retry loop and a PING command to verify connectivity.

Initialize the Redis connection.

Args:
    host: The Redis server host address.
    port: The Redis server port.
    db: The Redis database number.
    password: The password for the Redis server (if any).
    socket_timeout: Timeout for socket operations (in seconds).
    socket_connect_timeout: Timeout for socket connect (in seconds).

EVENTS_CHANNEL class-attribute instance-attribute
EVENTS_CHANNEL = 'mindtrace:queue_events'

connect
connect(max_tries: int = 10)

Connect to the Redis server using a retry loop.
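The retry-loop-plus-PING idea can be sketched as follows (the ping callable is a stub standing in for a Redis PING round-trip; a real client would also sleep between attempts):

```python
def connect(ping, max_tries: int = 10) -> bool:
    """Retry until a PING round-trip succeeds or attempts run out."""
    for _ in range(max_tries):
        try:
            if ping():  # a healthy server answers the PING
                return True
        except ConnectionError:
            pass  # server not up yet; try again (a real client would sleep here)
    return False

# A ping stub that fails twice and then succeeds.
attempts = iter([False, False, True])
ok = connect(lambda: next(attempts))
```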

is_connected
is_connected() -> bool

Return True if the connection to Redis is active (verified via PING).

close
close()

Close the connection to the Redis server and shutdown background thread.

count_queue_messages
count_queue_messages(queue_name: str, **kwargs) -> int

Count the number of messages in a specified Redis queue.

Parameters:

- queue_name (str, required): The name of the declared queue.

Returns:

- int: Number of messages in the given queue.

consumer_backend
RedisConsumerBackend
RedisConsumerBackend(
    queue_name: str,
    consumer_frontend,
    host: str,
    port: int,
    db: int,
    poll_timeout: int = 5,
)

Bases: ConsumerBackendBase

Redis consumer backend with blocking operations.

consume
consume(
    num_messages: int = 0,
    *,
    queues: str | list[str] | None = None,
    block: bool = True,
    **kwargs
) -> None

Consume messages from Redis queue(s).

process_message
process_message(message) -> bool

Process a single message.

consume_until_empty
consume_until_empty(
    *, queues: str | list[str] | None = None, block: bool = True, **kwargs
) -> None

Consume messages from the queue(s) until empty.

close
close()

Close the Redis connection and clean up resources.

set_poll_timeout
set_poll_timeout(timeout: int) -> None

Set the polling timeout for Redis operations.

receive_message
receive_message(queue_name: str, **kwargs) -> Optional[dict]

Retrieve a message from a specified Redis queue.

Returns the message as a dict.

fifo_queue
RedisQueue
RedisQueue(name, namespace='queue', **redis_kwargs)

A FIFO (first-in, first-out) message queue backed by Redis. This class uses a Redis list to store serialized messages. The push method appends items to the tail of the list, while the pop method removes from the head. Blocking retrieval is implemented using Redis' BLPOP command.

Initialize a RedisQueue object.

Args:
    name: Name of the queue.
    namespace: Namespace prefix for the Redis key.
    redis_kwargs: Additional keyword arguments for redis.Redis.

push
push(item)

Serialize and add an item to the queue.

pop
pop(block=True, timeout=None)

Remove and return an item from the queue.

Args:
    block: If True, block until an item is available.
    timeout: Maximum time to block in seconds (if block=True).

Raises:
    queue.Empty: If no item is available (in non-blocking mode or if the timeout expires).

qsize
qsize()

Return the approximate size of the queue.

empty
empty()

Return True if the queue is empty, False otherwise.
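The FIFO semantics (push to the tail, pop from the head) can be emulated with a stdlib deque; raising queue.Empty on a drained queue matches the documented non-blocking behavior (this is an illustration, not the Redis-backed implementation):

```python
import queue
from collections import deque

class FifoList:
    """Emulates the Redis-list FIFO: append at the tail, pop from the head."""

    def __init__(self):
        self._items = deque()

    def push(self, item):
        self._items.append(item)       # like RPUSH: item goes to the tail

    def pop(self):
        if not self._items:
            raise queue.Empty          # documented non-blocking behavior
        return self._items.popleft()   # like LPOP: take from the head

    def qsize(self):
        return len(self._items)

    def empty(self):
        return not self._items

q = FifoList()
q.push("first")
q.push("second")
order = [q.pop(), q.pop()]
```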

priority
RedisPriorityQueue
RedisPriorityQueue(name, namespace='priority_queue', **redis_kwargs)

A priority message queue backed by Redis. This class uses a Redis sorted set to store messages with priorities. Higher numerical priority values are retrieved first.

push
push(item, priority=0)

Serialize and add an item to the priority queue.

Args:
    item: The item to add to the queue.
    priority: Priority value (higher numbers = higher priority).

pop
pop(block=True, timeout=None)

Remove and return the highest priority item from the queue.

Args:
    block: If True, block until an item is available.
    timeout: Maximum time to block in seconds (if block=True).

Raises:
    queue.Empty: If no item is available (in non-blocking mode or if the timeout expires).

qsize
qsize()

Return the approximate size of the priority queue.

empty
empty()

Return True if the priority queue is empty, False otherwise.
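The retrieve-highest-score-first rule of the sorted set can be emulated in plain Python (tie-breaking by insertion order is a choice of this sketch, not a documented guarantee of the Redis-backed class):

```python
import itertools

class PrioritySet:
    """Sorted-set emulation: the entry with the highest score pops first."""

    def __init__(self):
        self._entries = []
        self._seq = itertools.count()  # insertion counter for tie-breaking

    def push(self, item, priority=0):
        self._entries.append((priority, next(self._seq), item))

    def pop(self):
        # Highest priority wins; among equal priorities, earliest insertion wins.
        best = max(self._entries, key=lambda e: (e[0], -e[1]))
        self._entries.remove(best)
        return best[2]

pq = PrioritySet()
pq.push("low", priority=1)
pq.push("high", priority=5)
pq.push("mid", priority=3)
order = [pq.pop(), pq.pop(), pq.pop()]
```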

stack
RedisStack
RedisStack(name, namespace='stack', **redis_kwargs)

A LIFO (last-in, first-out) message stack backed by Redis. This class uses a Redis list to store serialized messages. The push method adds items to the head of the list, while the pop method also removes from the head. Blocking retrieval is implemented using Redis' BLPOP command.

push
push(item)

Serialize and add an item to the stack.

pop
pop(block=True, timeout=None)

Remove and return the top item from the stack.

Args:
    block: If True, block until an item is available.
    timeout: Maximum time to block in seconds (if block=True).

Raises:
    queue.Empty: If no item is available (non-blocking or timeout reached).

qsize
qsize()

Return the approximate size of the stack.

empty
empty()

Return True if the stack is empty, False otherwise.
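The LIFO order follows from pushing and popping at the same end of the list; a deque emulation of that rule (an illustration only, not the Redis-backed implementation):

```python
from collections import deque

class LifoList:
    """Emulates the Redis-list stack: push and pop both act on the head."""

    def __init__(self):
        self._items = deque()

    def push(self, item):
        self._items.appendleft(item)   # like LPUSH: newest item sits at the head

    def pop(self):
        return self._items.popleft()   # head read, so the newest item comes out first

s = LifoList()
s.push("first")
s.push("second")
order = [s.pop(), s.pop()]
```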

types

job_specs
Job

Bases: BaseModel

A job instance ready for execution - system routes based on schema_name.

utils

schemas
job_from_schema
job_from_schema(schema: JobSchema, input_data) -> Job

Create a Job from a JobSchema and input data.

This function automatically adds metadata like the job ID and creation timestamp.

Args:
    schema: The JobSchema to use for the job.
    input_data: The input data for the job.

Returns:
    Job: A complete Job instance ready for submission.