RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /Custom Agent Frameworks
  6. /Ch. 12
Custom Agent Frameworks

12. Re-planning

Chapter 12 of 24 · 20 min
KEY INSIGHT

Re-planning is recovery. The trigger determines when to adapt; the replanner uses failure context to generate a new approach. Capping replans prevents infinite loops and enables graceful escalation.

Plans fail. Networks timeout. APIs change. A step that seemed valid produces unexpected output. Re-planning is how agents adapt when reality diverges from the plan.

Re-plan triggers:

class ReplanTrigger:
    @staticmethod
    def should_replan(plan: TaskPlan, result: StepResult) -> tuple[bool, str]:
        # Tool execution failed
        if not result.success:
            return True, f"Step {result.step} failed: {result.error}"
        
        # Output doesn't match expected outcome pattern
        if result.step < len(plan.steps):
            expected = plan.steps[result.step].expected_outcome
            if not ReplanTrigger._matches_pattern(result.output, expected):
                return True, f"Step {result.step} output doesn't match expected: {expected}"
        
        # Tool returned error indicator
        if isinstance(result.output, str) and "error" in result.output.lower():
            return True, f"Step {result.step} returned error: {result.output}"
        
        return False, ""
    
    @staticmethod
    def _matches_pattern(output: Any, expected: str) -> bool:
        # Simple keyword check—expand with semantic matching
        expected_keywords = set(expected.lower().split())
        output_text = str(output).lower()
        return expected_keywords.issubset(set(output_text.split()))

Re-planning loop:

class ReplanningExecutor:
    def __init__(
        self,
        planner: TaskPlanner,
        executor: TaskExecutor,
        max_replans: int = 3
    ):
        self.planner = planner
        self.executor = executor
        self.max_replans = max_replans
    
    async def execute_with_replan(
        self,
        initial_task: str,
        context: dict[str, Any] = None
    ) -> ExecutionResult:
        remaining_steps = await self.planner.decompose(initial_task, context)
        replan_count = 0
        
        while replan_count < self.max_replans:
            # Build plan from remaining steps
            plan = TaskPlan(steps=remaining_steps)
            
            # Execute
            result = await self.executor.execute(plan)
            
            if result.success:
                return result
            
            # Determine if replan is needed
            if not result.results:
                return result
            
            last_result = result.results[-1]
            should_replan, reason = ReplanTrigger.should_replan(plan, last_result)
            
            if not should_replan:
                return result
            
            replan_count += 1
            
            # Re-plan with context from failed execution
            context = context or {}
            context["failed_step"] = reason
            context["execution_history"] = [
                {"step": r.step, "output": str(r.output)[:500]}
                for r in result.results
            ]
            
            remaining_steps = await self.planner.decompose(
                f"Continue from failed step: {reason}. Previous steps: {context['execution_history']}",
                context
            )
        
        return ExecutionResult(
            success=False,
            error=f"Failed after {self.max_replans} replanning attempts",
            partial_results=result.results if result.results else []
        )

Failure mode: infinite replan loops. The agent can get stuck re-planning the same failed step indefinitely if the underlying issue isn't resolvable. Cap replans and escalate to human review.

@dataclass
class EscalationResult:
    reason: str
    failed_step: int
    execution_history: list[dict]
    agent_diagnosis: str

# After max replans, create escalation
escalation = EscalationResult(
    reason="Max replan attempts exceeded",
    failed_step=last_result.step,
    execution_history=context["execution_history"],
    agent_diagnosis=await self.planner.diagnose(context)
)
EXERCISE

Introduce three failure modes into your task execution (API returns unexpected format, step produces no output, step succeeds but output suggests next step won't work). Implement re-planning logic that handles each case differently.

← Chapter 11
Task Planner
Chapter 13 →
Multi-Agent Protocols