Custom Orchestration

The orchestrator routes each conversation turn to the right agent and controls the flow of information between agents and users. Implement AbstractOrchestrator to define custom routing logic.

Prerequisites

  • AgenticAI Core SDK installed and configured.
  • At least one agent configured and converted to AgentMeta. See Creating Agents.

Message handling protocol

Incoming messages

The orchestrator’s _handle_message method receives a List[MessageItem]. The last item is either:
  • A user message (role='user') — when the user sends a query.
  • An agent response (role='tool') — when an agent completes its task.

Outgoing messages

The method must return one of:
Return type     When to use
ToolCall        Route to an agent or back to the user.
ErrorMessage    When routing fails or an exception occurs.

ToolCall fields:
  • tool_name — the agent to invoke, or "route_to_user" to respond to the user.
  • message — the content sent to the agent or shown to the user.
  • input, thought, reason — optional context for debugging and tracing.

Special agent: route_to_user

route_to_user is a built-in proxy for the user. Use it to deliver final answers or request clarification. The message field in the ToolCall is exactly what the user sees.
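
Because tool_name must be either a registered agent name or the literal "route_to_user", it can help to check the target before returning a ToolCall. The sketch below is a minimal, framework-free illustration; resolve_target and its fall-back-to-user behavior are hypothetical and not part of the SDK.

```python
from typing import Iterable

ROUTE_TO_USER = "route_to_user"  # built-in target name described above


def resolve_target(tool_name: str, agent_names: Iterable[str]) -> str:
    """Return tool_name if it is a valid target, else fall back to the user.

    Hypothetical helper: a valid target is a known agent name or
    the literal "route_to_user".
    """
    if tool_name == ROUTE_TO_USER or tool_name in set(agent_names):
        return tool_name
    return ROUTE_TO_USER
```

A call such as resolve_target(selected_agent.name, [a.name for a in self._agents]) could then guard the tool_name passed to ToolCall.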

Create a custom orchestrator

Subclass AbstractOrchestrator and override _handle_message:
from agenticai_core.designtime.models.agent import AgentMeta
from agenticai_core.runtime.agents.abstract_orchestrator import AbstractOrchestrator
from agenticai_core.runtime.message_item import MessageItem, ToolCall, ErrorMessage
from typing import List, Optional

class CustomOrchestrator(AbstractOrchestrator):
    """Custom orchestrator with keyword-based routing."""

    def __init__(
        self,
        agents: List[AgentMeta],
        name: str = "custom",
        description: str = "Custom keyword-based orchestrator"
    ):
        super().__init__(name=name, agents=agents, description=description)

    async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
        """
        Override this method to implement your orchestration logic.

        Protocol:
        - Incoming: MessageItem with role='user' or role='tool'
        - Outgoing: ToolCall (next agent) or ErrorMessage (failure)
        """
        last_message = conversation[-1]

        try:
            # Handle user query
            if last_message.role == 'user':
                selected_agent = self._select_agent(last_message)

                if selected_agent:
                    return ToolCall(
                        tool_name=selected_agent.name,
                        message=last_message.content,
                        thought=f"Routing to {selected_agent.name}",
                        reason="Query matches agent capabilities"
                    )
                else:
                    return ToolCall(
                        tool_name="route_to_user",
                        message="I'm not sure how to help. Could you rephrase?",
                        thought="No suitable agent found"
                    )

            # Handle agent response
            elif last_message.role == 'tool':
                if self._is_complete(last_message):
                    return ToolCall(
                        tool_name="route_to_user",
                        message=f"Here are the results: {last_message.content}",
                        thought="Task completed successfully"
                    )
                else:
                    # Route to another agent for follow-up
                    next_agent = self._get_next_agent(last_message)
                    return ToolCall(
                        tool_name=next_agent.name,
                        message=f"Continue with: {last_message.content}"
                    )

            return ErrorMessage(error=RuntimeError(f"Unsupported role: {last_message.role}"))

        except Exception as e:
            return ErrorMessage(error=e)

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        """Select agent based on message content."""
        # Your selection logic here
        return self._agents[0] if self._agents else None

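    def _is_complete(self, message: MessageItem) -> bool:
        """Check if task is complete."""
        content = message.content.lower()
        return any(indicator in content for indicator in [
            "completed", "finished", "done"
        ])

    def _get_next_agent(self, message: MessageItem) -> AgentMeta:
        """Select the agent for a follow-up step (placeholder logic)."""
        # Your follow-up selection logic here
        return self._agents[0]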

Routing strategies

Keyword-based routing

class KeywordOrchestrator(AbstractOrchestrator):
    def __init__(self, agents, **kwargs):
        super().__init__(agents=agents, **kwargs)
        self.keywords = {
            "calculator": ["calculate", "math", "add", "multiply"],
            "weather": ["weather", "temperature", "forecast"],
            "translator": ["translate", "language"]
        }

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        content = message.content.lower()

        for agent_type, keywords in self.keywords.items():
            if any(kw in content for kw in keywords):
                for agent in self._agents:
                    if agent_type in agent.name.lower():
                        return agent

        return self._agents[0] if self._agents else None
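
Note that the substring test above (kw in content) also matches inside longer words, for example "add" inside "address". If that matters for your keywords, one option is to match on word boundaries. The following is a minimal sketch, independent of the SDK; match_agent_type is a hypothetical helper name.

```python
import re
from typing import Dict, List, Optional

KEYWORDS: Dict[str, List[str]] = {
    "calculator": ["calculate", "math", "add", "multiply"],
    "weather": ["weather", "temperature", "forecast"],
    "translator": ["translate", "language"],
}


def match_agent_type(
    content: str, keywords: Dict[str, List[str]] = KEYWORDS
) -> Optional[str]:
    """Return the first agent type with a keyword appearing as a whole word."""
    text = content.lower()
    for agent_type, words in keywords.items():
        # \b anchors each keyword at word boundaries,
        # so "add" does not match inside "address".
        pattern = r"\b(?:" + "|".join(re.escape(w) for w in words) + r")\b"
        if re.search(pattern, text):
            return agent_type
    return None
```

The trade-off is that whole-word matching no longer catches inflected forms such as "calculating", so the keyword lists may need to include those variants explicitly.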

Round-robin routing

class RoundRobinOrchestrator(AbstractOrchestrator):
    def __init__(self, agents, **kwargs):
        super().__init__(agents=agents, **kwargs)
        self.current_index = 0

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        if not self._agents:
            return None

        agent = self._agents[self.current_index]
        self.current_index = (self.current_index + 1) % len(self._agents)
        return agent
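
The modulo update is what makes the index wrap around to the first agent. The rotation can be illustrated in isolation; the class and the agent names below are placeholders, not SDK types.

```python
from typing import List, Optional


class RoundRobin:
    """Framework-free illustration of the index rotation used above."""

    def __init__(self, names: List[str]) -> None:
        self.names = names
        self.current_index = 0

    def next(self) -> Optional[str]:
        if not self.names:
            return None
        name = self.names[self.current_index]
        # Wrap back to 0 after the last entry.
        self.current_index = (self.current_index + 1) % len(self.names)
        return name


rr = RoundRobin(["A", "B", "C"])
picks = [rr.next() for _ in range(5)]  # ["A", "B", "C", "A", "B"]
```

Because the index lives on the instance, the rotation only persists for as long as that instance does. Whether the runtime reuses one orchestrator instance across requests is not stated here, so treat that as something to verify.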

Task-based routing

class TaskOrchestrator(AbstractOrchestrator):
    def __init__(self, agents, **kwargs):
        super().__init__(agents=agents, **kwargs)
        self.task_map = {
            "data_analysis": "AnalystAgent",
            "customer_support": "SupportAgent",
            "billing": "BillingAgent"
        }

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        # Determine the task type. _identify_task is a helper you implement;
        # it should return a key of task_map (or None if nothing matches).
        task_type = self._identify_task(message)
        agent_name = self.task_map.get(task_type)

        if agent_name:
            for agent in self._agents:
                if agent.name == agent_name:
                    return agent

        return None
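
The example calls self._identify_task, which the snippet does not define. One possible approach is a simple keyword-scoring classifier. The sketch below is a hypothetical standalone version: the hint lists are invented for illustration, and the function returns a key of task_map or None.

```python
from typing import Dict, List, Optional

# Hypothetical trigger phrases for each task type in task_map.
TASK_HINTS: Dict[str, List[str]] = {
    "data_analysis": ["report", "trend", "analyze", "chart"],
    "customer_support": ["help", "issue", "problem", "not working"],
    "billing": ["invoice", "refund", "charge", "payment"],
}


def identify_task(
    content: str, hints: Dict[str, List[str]] = TASK_HINTS
) -> Optional[str]:
    """Return the task type whose hints appear most often, or None if none match."""
    text = content.lower()
    scores = {
        task: sum(text.count(hint) for hint in words)
        for task, words in hints.items()
    }
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else None
```

Inside the orchestrator this could be wrapped as a method that passes message.content to identify_task. Ties resolve to the task listed first in the dictionary, which may or may not be the behavior you want.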

Use memory in orchestrators

Use RequestContext to persist and retrieve orchestration state across turns:
from agenticai_core.runtime.sessions.request_context import RequestContext

class StatefulOrchestrator(AbstractOrchestrator):
    async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
        context = RequestContext()
        memory = context.get_memory()

        # Get orchestration state
        state = await memory.get_content('orchestrator_state', {
            'last_agent': None,
            'task_count': 0
        })

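        # Your routing logic
        selected_agent = self._select_agent(conversation[-1])
        if selected_agent is None:
            # No suitable agent: ask the user to rephrase instead of failing
            return ToolCall(
                tool_name="route_to_user",
                message="I'm not sure how to help. Could you rephrase?"
            )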

        # Update state
        if state.success and state.data:
            await memory.set_content('orchestrator_state', {
                'last_agent': selected_agent.name,
                'task_count': state.data.get('task_count', 0) + 1
            })

        return ToolCall(
            tool_name=selected_agent.name,
            message=conversation[-1].content
        )
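
To exercise this read-update-write pattern without the SDK, for example in a unit test, a small in-memory stand-in can be enough. Everything below is an assumption modeled on the calls in the snippet above: FakeMemory and Result only mimic get_content, set_content, and the .success and .data attributes, and they are not the SDK's classes.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class Result:
    """Stand-in for the result object; mirrors .success and .data only."""
    success: bool
    data: Optional[Dict[str, Any]]


class FakeMemory:
    """In-memory stand-in exposing the two calls used in the snippet above."""

    def __init__(self) -> None:
        self._store: Dict[str, Dict[str, Any]] = {}

    async def get_content(self, key: str, default: Dict[str, Any]) -> Result:
        return Result(success=True, data=dict(self._store.get(key, default)))

    async def set_content(self, key: str, value: Dict[str, Any]) -> None:
        self._store[key] = dict(value)


async def record_routing(memory: FakeMemory, agent_name: str) -> Dict[str, Any]:
    """Read the state, bump the counter, and write it back."""
    state = await memory.get_content(
        "orchestrator_state", {"last_agent": None, "task_count": 0}
    )
    data = state.data or {}
    updated = {
        "last_agent": agent_name,
        "task_count": data.get("task_count", 0) + 1,
    }
    await memory.set_content("orchestrator_state", updated)
    return updated


async def demo() -> Dict[str, Any]:
    memory = FakeMemory()
    await record_routing(memory, "SupportAgent")
    return await record_routing(memory, "BillingAgent")
```

Running asyncio.run(demo()) routes twice against the same store, so the second call sees the counter written by the first.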

Add distributed tracing

Decorate _handle_message and _select_agent with @tracer.observe to capture routing spans:
from agenticai_core.runtime.trace._langfuse_tracer import Tracer

tracer = Tracer()

class TracedOrchestrator(AbstractOrchestrator):
    @tracer.observe(span_name="Orchestrator._handle_message", kind="Orchestrator")
    async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
        """Handle message with distributed tracing."""
        last_message = conversation[-1]

        if last_message.role == 'user':
            selected_agent = self._select_agent(last_message)

            return ToolCall(
                tool_name=selected_agent.name,
                message=last_message.content,
                thought=f"Routing to {selected_agent.name}"
            )

        # Handle other cases...

    @tracer.observe(metadata={"operation": "agent_selection"})
    def _select_agent(self, message: MessageItem):
        """Agent selection with tracing."""
        # Selection logic
        return self._agents[0]

Common orchestration flows

User → agent → user (single-agent turn):
# User query → select agent → agent processes → return to user
role='user' → route to agent → role='tool' → route_to_user

User → agent → agent → user (chained agents):
# Chain multiple agents before returning; each agent response arrives as role='tool'
role='user' → Agent1 → role='tool' → Agent2 → role='tool' → route_to_user

User → user (direct response, no agent):
# Direct clarification request
role='user' → route_to_user (no suitable agent)
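
The single-agent flow can be traced end to end with a small simulation. The sketch uses simplified stand-ins (Msg, Call, orchestrate, run_turn) rather than the SDK's MessageItem and ToolCall, plus a fake agent function. It only illustrates how the role of the last message drives the next hop.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Msg:
    """Stand-in for MessageItem: only role and content."""
    role: str  # 'user' or 'tool'
    content: str


@dataclass
class Call:
    """Stand-in for ToolCall: only tool_name and message."""
    tool_name: str
    message: str


def orchestrate(conversation: List[Msg]) -> Call:
    """Route user messages to an agent and agent responses back to the user."""
    last = conversation[-1]
    if last.role == "user":
        return Call(tool_name="EchoAgent", message=last.content)
    return Call(tool_name="route_to_user", message=last.content)


def run_turn(
    query: str, agents: Dict[str, Callable[[str], str]], max_hops: int = 5
) -> str:
    """Drive the loop until the orchestrator routes to the user."""
    conversation = [Msg(role="user", content=query)]
    for _ in range(max_hops):
        call = orchestrate(conversation)
        if call.tool_name == "route_to_user":
            return call.message
        # The agent's output re-enters the conversation with role='tool'.
        reply = agents[call.tool_name](call.message)
        conversation.append(Msg(role="tool", content=reply))
    raise RuntimeError("Hop limit reached without routing to the user")


answer = run_turn("ping", {"EchoAgent": lambda text: f"echo: {text}"})
```

The max_hops limit is a design choice of this sketch: it guards against chained agents that keep handing work to each other without ever reaching route_to_user.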

Register the orchestrator

Pass the orchestrator class to app.start:
from src.orchestrator.custom_orchestrator import CustomOrchestrator

app.start(
    orchestrator_cls=CustomOrchestrator,
    host="0.0.0.0",
    port=8080
)

Best practices

  • message field: This is the most important field in ToolCall. For agents, it contains the task or question to process. For route_to_user, it is the exact response the user sees.
  • Error handling: Handle both success and error cases. Return ErrorMessage for routing failures and always implement a route_to_user fallback.
  • Logging: Use thought and reason fields for debugging routing decisions. Log agent selection outcomes and track agent performance.
  • State management: Use memory stores to track conversation context and agent selection history across turns.
  • Performance: Keep routing logic lightweight. Avoid blocking calls in _handle_message. Use tracing to identify latency bottlenecks.
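
On the performance point: if agent selection depends on a blocking call (for example a synchronous classifier or HTTP client), one common approach is to move it off the event loop with asyncio.to_thread (Python 3.9+). The sketch below is generic asyncio code; slow_classify is a hypothetical blocking function, not an SDK call.

```python
import asyncio
import time
from typing import List


def slow_classify(text: str) -> str:
    """Hypothetical blocking classifier (simulated with a short sleep)."""
    time.sleep(0.05)
    return "billing" if "invoice" in text.lower() else "general"


async def classify(text: str) -> str:
    # Run the blocking function in a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_classify, text)


async def main() -> List[str]:
    # The two classifications overlap instead of running back to back.
    return await asyncio.gather(classify("Invoice 42"), classify("Hello"))


results = asyncio.run(main())
```

Inside an orchestrator, the same pattern would apply to any synchronous helper awaited from _handle_message.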