This document describes the formal state machine implementation for NIJA’s position management system, implemented on February 15, 2026.
The original position management system had several critical issues:
excess_positions = 0class PositionManagementState(Enum):
"""Position management state machine for NIJA trading bot."""
NORMAL = "normal" # Under cap, entries allowed
DRAIN = "drain" # Over cap, draining excess, entries blocked
FORCED_UNWIND = "forced_unwind" # Emergency exit, closing all
State is determined by two factors:
continuous_exit_enforcer.is_forced_unwind_active()positions_over_cap = len(current_positions) - MAX_POSITIONS_ALLOWEDif forced_unwind_active:
new_state = PositionManagementState.FORCED_UNWIND
elif positions_over_cap > 0: # KEY FIX: Only activate when excess > 0
new_state = PositionManagementState.DRAIN
else:
new_state = PositionManagementState.NORMAL
Valid transitions:
Transitions are validated and logged:
🔄 STATE TRANSITION: NORMAL → DRAIN
Positions: 10/8
Excess: 2
Forced Unwind: False
The StateInvariantValidator class enforces system invariants:
assert num_positions >= 0
calculated_excess = num_positions - max_positions
assert excess_positions == calculated_excess
if state == PositionManagementState.DRAIN:
assert excess_positions > 0 # Drain only when over cap
if state == PositionManagementState.NORMAL:
assert excess_positions <= 0 # Normal only when at or under cap
The position analysis loop runs for both DRAIN and NORMAL modes:
# Position analysis loop (runs for both DRAIN and NORMAL modes)
if new_state in (PositionManagementState.DRAIN, PositionManagementState.NORMAL):
for idx, position in enumerate(current_positions):
# Analyze position for exit criteria
# ...
# Rate limiting between position checks
if idx < len(current_positions) - 1:
time.sleep(POSITION_CHECK_DELAY + jitter)
Key differences between modes:
Before:
for position_idx, position in enumerate(current_positions):
# ...
if position_idx < len(current_positions) - 1: # External reference
time.sleep(POSITION_CHECK_DELAY)
After:
for idx, position in enumerate(current_positions):
# ...
if idx < len(current_positions) - 1: # Scoped to loop
time.sleep(POSITION_CHECK_DELAY)
The variable idx is local to the loop and only used within it, eliminating external references.
Comprehensive test suite validates:
All tests passing ✅
excess > 0bot/trading_strategy.py: Added state machine, invariant validator, state trackingtest_state_machine.py: Comprehensive test suite (new file)All changes are backward compatible. Existing functionality is preserved with additional:
Potential improvements:
test_state_machine.py