12. Re-planning
Chapter 12 of 24 · 20 min
Plans fail. Networks time out. APIs change. A step that seemed valid produces unexpected output. Re-planning is how agents adapt when reality diverges from the plan.
Re-plan triggers:
class ReplanTrigger:
    """Decide whether the result of one step means the plan must be rebuilt."""

    @staticmethod
    def should_replan(plan: TaskPlan, result: StepResult) -> tuple[bool, str]:
        """Inspect a single step result and report whether to re-plan.

        Checks are applied in order: explicit failure, mismatch against the
        step's expected outcome, then an "error" marker in string output.

        Args:
            plan: Plan being executed; ``plan.steps[i].expected_outcome`` is
                the textual expectation for step ``i``.
            result: Outcome of the step that just ran (``success``, ``step``,
                ``output`` and ``error`` are read).

        Returns:
            ``(True, reason)`` when re-planning is needed, else ``(False, "")``.
        """
        # Tool execution failed
        if not result.success:
            return True, f"Step {result.step} failed: {result.error}"

        # Output doesn't match expected outcome pattern.
        # The lower bound stops a negative step index from silently wrapping
        # around and being compared against a step at the end of the plan.
        if 0 <= result.step < len(plan.steps):
            expected = plan.steps[result.step].expected_outcome
            if not ReplanTrigger._matches_pattern(result.output, expected):
                return True, f"Step {result.step} output doesn't match expected: {expected}"

        # Tool returned error indicator
        if isinstance(result.output, str) and "error" in result.output.lower():
            return True, f"Step {result.step} returned error: {result.output}"

        return False, ""

    @staticmethod
    def _matches_pattern(output: Any, expected: str) -> bool:
        """Return True when every keyword of *expected* occurs in *output*.

        Simple keyword check—expand with semantic matching. Comparison is
        case-insensitive and made on word tokens, so punctuation attached to
        a word ("saved." vs "saved") does not cause a spurious mismatch.
        An empty or missing expectation is treated as satisfied.
        """
        import re  # local import: keeps the snippet self-contained

        if not expected:
            return True

        expected_keywords = set(re.findall(r"\w+", expected.lower()))
        output_words = set(re.findall(r"\w+", str(output).lower()))
        return expected_keywords.issubset(output_words)
Re-planning loop:
class ReplanningExecutor:
    """Execute a task plan, asking the planner for a new plan after failures.

    The initial plan is executed once; each failure that
    ``ReplanTrigger.should_replan`` flags leads to a fresh plan, up to
    ``max_replans`` times. Every plan that is produced is also executed.
    """

    def __init__(
        self,
        planner: TaskPlanner,
        executor: TaskExecutor,
        max_replans: int = 3
    ):
        """Store the collaborators and the re-plan cap.

        Args:
            planner: Object whose awaitable ``decompose(task, context)``
                produces the steps of a plan.
            executor: Object whose awaitable ``execute(plan)`` runs a plan.
            max_replans: Maximum number of re-plans after the initial attempt.
        """
        self.planner = planner
        self.executor = executor
        self.max_replans = max_replans

    async def execute_with_replan(
        self,
        initial_task: str,
        context: dict[str, Any] = None
    ) -> ExecutionResult:
        """Run *initial_task*, re-planning when a step fails.

        Args:
            initial_task: Task description handed to the planner.
            context: Optional planner context. It is passed unchanged to the
                first ``decompose`` call and is never mutated; re-planning
                works on a private copy.

        Returns:
            The executor's result when it succeeds, when it produced no step
            results, or when the failure is not one that warrants
            re-planning. Otherwise an unsuccessful ``ExecutionResult``
            carrying the last attempt's step results once the re-plan budget
            is used up.
        """
        remaining_steps = await self.planner.decompose(initial_task, context)

        # Private copy: failure details are recorded here, and the caller's
        # dict must not pick up "failed_step" / "execution_history" keys.
        context = dict(context) if context else {}

        # One initial attempt plus up to max_replans re-planned attempts, so
        # the loop body always runs at least once and `result` is bound.
        for attempt in range(max(self.max_replans, 0) + 1):
            # Build plan from remaining steps
            plan = TaskPlan(steps=remaining_steps)

            # Execute
            result = await self.executor.execute(plan)
            if result.success:
                return result

            # Determine if replan is needed
            if not result.results:
                return result
            last_result = result.results[-1]
            should_replan, reason = ReplanTrigger.should_replan(plan, last_result)
            if not should_replan:
                return result

            # Budget exhausted: do not request a plan that would never run.
            if attempt >= self.max_replans:
                break

            # Re-plan with context from failed execution
            context["failed_step"] = reason
            context["execution_history"] = [
                {"step": r.step, "output": str(r.output)[:500]}
                for r in result.results
            ]
            remaining_steps = await self.planner.decompose(
                f"Continue from failed step: {reason}. Previous steps: {context['execution_history']}",
                context
            )

        return ExecutionResult(
            success=False,
            error=f"Failed after {self.max_replans} replanning attempts",
            partial_results=result.results if result.results else []
        )
Failure mode: infinite replan loops. The agent can get stuck re-planning the same failed step indefinitely if the underlying issue isn't resolvable. Cap replans and escalate to human review.
@dataclass
class EscalationResult:
    """Record handed to a human reviewer when re-planning cannot recover."""

    # Short explanation of why the run was escalated.
    reason: str
    # Index of the step that was failing when the run was escalated.
    failed_step: int
    # Per-step records of what ran before the escalation.
    execution_history: list[dict]
    # The agent's own explanation of what went wrong.
    agent_diagnosis: str
# After max replans, create escalation.
# NOTE: this is a fragment, not a standalone statement. It uses `await`,
# `self`, `last_result` and `context`, so it presumably belongs inside
# ReplanningExecutor.execute_with_replan, at the point where the re-plan
# loop has given up.
# NOTE(review): `planner.diagnose` is not defined in this section; confirm
# the planner exposes it as an awaitable that accepts the context dict.
escalation = EscalationResult(
reason="Max replan attempts exceeded",
failed_step=last_result.step,
execution_history=context["execution_history"],
agent_diagnosis=await self.planner.diagnose(context)
)
EXERCISE
Introduce three failure modes into your task execution (API returns unexpected format, step produces no output, step succeeds but output suggests next step won't work). Implement re-planning logic that handles each case differently.