Skip to content

Bulkhead sync

🤖 AI-Generated Content

This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.

provide.foundation.resilience.bulkhead_sync

Classes

SyncResourcePool

Synchronous resource pool with limited capacity for isolation.

Thread-safe implementation using threading.Lock and threading.Event. For async contexts, use AsyncResourcePool instead.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Initialize internal state.

Source code in provide/foundation/resilience/bulkhead_sync.py
def __attrs_post_init__(self) -> None:
    """Initialize internal state.

    NOTE(review): only the docstring is visible in this excerpt. The sibling
    methods read ``_counter_lock``, ``_active_count``, ``_waiting_count`` and
    ``_waiters``; presumably those attributes are created here -- confirm
    against the full source.
    """
acquire
acquire(timeout: float | None = None) -> bool

Acquire a resource slot (blocking).

Parameters:

Name Type Description Default
timeout float | None

Maximum time to wait (defaults to pool timeout)

None

Returns:

Type Description
bool

True if acquired, False if timeout

Raises:

Type Description
RuntimeError

If queue is full

Source code in provide/foundation/resilience/bulkhead_sync.py
def acquire(self, timeout: float | None = None) -> bool:
    """Acquire a resource slot (blocking).

    Args:
        timeout: Maximum time to wait (defaults to pool timeout)

    Returns:
        True if acquired, False if timeout

    Raises:
        RuntimeError: If queue is full
    """
    actual_timeout = timeout if timeout is not None else self.timeout

    # Try to acquire immediately
    with self._counter_lock:
        if self._active_count < self.max_concurrent:
            self._active_count += 1
            return True

        # Check queue limit
        if self._waiting_count >= self.max_queue_size:
            raise RuntimeError(f"Queue is full (max: {self.max_queue_size})")

        # Add to wait queue
        self._waiting_count += 1
        waiter = threading.Event()
        self._waiters.append(waiter)

    # Wait for signal from release
    try:
        if waiter.wait(timeout=actual_timeout):
            # Successfully signaled, we now have the slot
            return True
        # Timeout - remove from queue
        with self._counter_lock, contextlib.suppress(ValueError):
            # Remove from queue if still present (already removed by signal if not found)
            self._waiters.remove(waiter)
        return False
    finally:
        with self._counter_lock:
            self._waiting_count -= 1
active_count
active_count() -> int

Number of currently active operations.

Source code in provide/foundation/resilience/bulkhead_sync.py
def active_count(self) -> int:
    """Return how many operations currently hold a slot."""
    # Snapshot under the lock, return after releasing it.
    with self._counter_lock:
        in_flight = self._active_count
    return in_flight
available_capacity
available_capacity() -> int

Number of available slots.

Source code in provide/foundation/resilience/bulkhead_sync.py
def available_capacity(self) -> int:
    """Return the number of free slots, never less than zero."""
    with self._counter_lock:
        free_slots = self.max_concurrent - self._active_count
    # Clamp so an over-committed pool reports zero instead of a negative.
    return free_slots if free_slots > 0 else 0
get_stats
get_stats() -> dict[str, Any]

Get pool statistics.

Source code in provide/foundation/resilience/bulkhead_sync.py
def get_stats(self) -> dict[str, Any]:
    """Get pool statistics.

    All values are read under the counter lock, so they form one consistent
    snapshot.

    Returns:
        A dict with the keys ``max_concurrent``, ``active_count``,
        ``available_capacity``, ``waiting_count``, ``max_queue_size`` and
        ``utilization`` (active / max_concurrent, or 0.0 when
        ``max_concurrent`` is not positive).
    """
    with self._counter_lock:
        return {
            "max_concurrent": self.max_concurrent,
            "active_count": self._active_count,
            # Clamped at zero so this agrees with available_capacity().
            "available_capacity": max(0, self.max_concurrent - self._active_count),
            "waiting_count": self._waiting_count,
            "max_queue_size": self.max_queue_size,
            # Guard against division by zero for a zero-capacity pool.
            "utilization": self._active_count / self.max_concurrent if self.max_concurrent > 0 else 0.0,
        }
queue_size
queue_size() -> int

Current number of waiting operations.

Source code in provide/foundation/resilience/bulkhead_sync.py
def queue_size(self) -> int:
    """Return how many operations are currently waiting for a slot."""
    # Snapshot under the lock, return after releasing it.
    with self._counter_lock:
        pending = self._waiting_count
    return pending
release
release() -> None

Release a resource slot.

Source code in provide/foundation/resilience/bulkhead_sync.py
def release(self) -> None:
    """Give back a slot and, if anyone is queued, hand it to the oldest waiter."""
    with self._counter_lock:
        # Never let the active counter drop below zero.
        if self._active_count > 0:
            self._active_count -= 1

        # Nobody queued: the slot simply becomes free.
        if not self._waiters:
            return

        # Transfer the slot to the head of the FIFO queue. The counter is
        # bumped on the waiter's behalf before it is woken.
        next_in_line = self._waiters.popleft()
        self._active_count += 1
        next_in_line.set()