> ## Documentation Index
> Fetch the complete documentation index at: https://koreai.mintlify.app/llms.txt
> Use this file to discover all available pages before exploring further.

# Custom Orchestration

> Implement custom orchestration logic to route messages between agents and users in the AgenticAI Core SDK.

# Custom Orchestration

The orchestrator routes each conversation turn to the right agent and controls the flow of information between agents and users. Implement `AbstractOrchestrator` to define custom routing logic.

## Prerequisites

* AgenticAI Core SDK installed and configured.
* At least one agent configured and converted to `AgentMeta`. See [Creating Agents](/agent-platform/sdk/guide/creating-agents).

## Message handling protocol

### Incoming messages

The orchestrator's `_handle_message` method receives a `List[MessageItem]`. The last item is either:

* A user message (`role='user'`) — when the user sends a query.
* An agent response (`role='tool'`) — when an agent completes its task.

### Outgoing messages

The method must return one of:

| Return type    | When to use                                |
| -------------- | ------------------------------------------ |
| `ToolCall`     | Route to an agent or back to the user.     |
| `ErrorMessage` | When routing fails or an exception occurs. |

`ToolCall` fields:

* `tool_name` — the agent to invoke, or `"route_to_user"` to respond to the user.
* `message` — the content sent to the agent or shown to the user.
* `input`, `thought`, `reason` — optional context for debugging and tracing.

### Special agent: route\_to\_user

`route_to_user` is a built-in proxy for the user. Use it to deliver final answers or request clarification. The `message` field in the `ToolCall` is exactly what the user sees.

## Create a custom orchestrator

Subclass `AbstractOrchestrator` and override `_handle_message`:

```python theme={null}
from agenticai_core.designtime.models.agent import AgentMeta
from agenticai_core.runtime.agents.abstract_orchestrator import AbstractOrchestrator
from agenticai_core.runtime.message_item import MessageItem, ToolCall, ErrorMessage
from typing import List, Optional

class CustomOrchestrator(AbstractOrchestrator):
    """Custom orchestrator with keyword-based routing."""

    def __init__(
        self,
        agents: List[AgentMeta],
        name: str = "custom",
        description: str = "Custom keyword-based orchestrator"
    ):
        super().__init__(name=name, agents=agents, description=description)

    async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
        """
        Override this method to implement your orchestration logic.

        Protocol:
        - Incoming: MessageItem with role='user' or role='tool'
        - Outgoing: ToolCall (next agent) or ErrorMessage (failure)
        """
        last_message = conversation[-1]

        try:
            # Handle user query
            if last_message.role == 'user':
                selected_agent = self._select_agent(last_message)

                if selected_agent:
                    return ToolCall(
                        tool_name=selected_agent.name,
                        message=last_message.content,
                        thought=f"Routing to {selected_agent.name}",
                        reason="Query matches agent capabilities"
                    )
                else:
                    return ToolCall(
                        tool_name="route_to_user",
                        message="I'm not sure how to help. Could you rephrase?",
                        thought="No suitable agent found"
                    )

            # Handle agent response
            elif last_message.role == 'tool':
                if self._is_complete(last_message):
                    return ToolCall(
                        tool_name="route_to_user",
                        message=f"Here are the results: {last_message.content}",
                        thought="Task completed successfully"
                    )
                else:
                    # Route to another agent for follow-up
                    next_agent = self._get_next_agent(last_message)
                    return ToolCall(
                        tool_name=next_agent.name,
                        message=f"Continue with: {last_message.content}"
                    )

            return ErrorMessage(error=RuntimeError(f"Unsupported role: {last_message.role}"))

        except Exception as e:
            return ErrorMessage(error=e)

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        """Select agent based on message content."""
        # Your selection logic here
        return self._agents[0] if self._agents else None

    def _is_complete(self, message: MessageItem) -> bool:
        """Check if task is complete."""
        content = message.content.lower()
        return any(indicator in content for indicator in [
            "completed", "finished", "done"
        ])
```

## Routing strategies

### Keyword-based routing

```python theme={null}
class KeywordOrchestrator(AbstractOrchestrator):
    def __init__(self, agents, **kwargs):
        super().__init__(agents=agents, **kwargs)
        self.keywords = {
            "calculator": ["calculate", "math", "add", "multiply"],
            "weather": ["weather", "temperature", "forecast"],
            "translator": ["translate", "language"]
        }

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        content = message.content.lower()

        for agent_type, keywords in self.keywords.items():
            if any(kw in content for kw in keywords):
                for agent in self._agents:
                    if agent_type in agent.name.lower():
                        return agent

        return self._agents[0] if self._agents else None
```

### Round-robin routing

```python theme={null}
class RoundRobinOrchestrator(AbstractOrchestrator):
    def __init__(self, agents, **kwargs):
        super().__init__(agents=agents, **kwargs)
        self.current_index = 0

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        if not self._agents:
            return None

        agent = self._agents[self.current_index]
        self.current_index = (self.current_index + 1) % len(self._agents)
        return agent
```

### Task-based routing

```python theme={null}
class TaskOrchestrator(AbstractOrchestrator):
    def __init__(self, agents, **kwargs):
        super().__init__(agents=agents, **kwargs)
        self.task_map = {
            "data_analysis": "AnalystAgent",
            "customer_support": "SupportAgent",
            "billing": "BillingAgent"
        }

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        # Analyze message to determine task type
        task_type = self._identify_task(message)
        agent_name = self.task_map.get(task_type)

        if agent_name:
            for agent in self._agents:
                if agent.name == agent_name:
                    return agent

        return None
```

## Use memory in orchestrators

Use `RequestContext` to persist and retrieve orchestration state across turns:

```python theme={null}
from agenticai_core.runtime.sessions.request_context import RequestContext

class StatefulOrchestrator(AbstractOrchestrator):
    async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
        context = RequestContext()
        memory = context.get_memory()

        # Get orchestration state
        state = await memory.get_content('orchestrator_state', {
            'last_agent': 1,
            'task_count': 1
        })

        # Your routing logic
        selected_agent = self._select_agent(conversation[-1])

        # Update state
        if state.success and state.data:
            await memory.set_content('orchestrator_state', {
                'last_agent': selected_agent.name,
                'task_count': state.data.get('task_count', 0) + 1
            })

        return ToolCall(
            tool_name=selected_agent.name,
            message=conversation[-1].content
        )
```

## Add distributed tracing

Decorate `_handle_message` and `_select_agent` with `@tracer.observe` to capture routing spans:

```python theme={null}
from agenticai_core.runtime.trace._langfuse_tracer import Tracer

tracer = Tracer()

class TracedOrchestrator(AbstractOrchestrator):
    @tracer.observe(span_name="Orchestrator._handle_message", kind="Orchestrator")
    async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
        """Handle message with distributed tracing."""
        last_message = conversation[-1]

        if last_message.role == 'user':
            selected_agent = self._select_agent(last_message)

            return ToolCall(
                tool_name=selected_agent.name,
                message=last_message.content,
                thought=f"Routing to {selected_agent.name}"
            )

        # Handle other cases...

    @tracer.observe(metadata={"operation": "agent_selection"})
    def _select_agent(self, message: MessageItem):
        """Agent selection with tracing."""
        # Selection logic
        return self._agents[0]
```

## Common orchestration flows

**User → agent → user** (single-agent turn):

```python theme={null}
# User query → select agent → agent processes → return to user
role='user' → route to agent → role='tool' → route_to_user
```

**User → agent → agent → user** (chained agents):

```python theme={null}
# Chain multiple agents before returning
role='user' → Agent1 → Agent2 → route_to_user
```

**User → user** (direct response, no agent):

```python theme={null}
# Direct clarification request
role='user' → route_to_user (no suitable agent)
```

## Register the orchestrator

Pass the orchestrator class to `app.start`:

```python theme={null}
from src.orchestrator.custom_orchestrator import CustomOrchestrator

app.start(
    orchestrator_cls=CustomOrchestrator,
    host="0.0.0.0",
    port=8080
)
```

## Best practices

* **`message` field**: This is the most important field in `ToolCall`. For agents, it contains the task or question to process. For `route_to_user`, it is the exact response the user sees.
* **Error handling**: Handle both success and error cases. Return `ErrorMessage` for routing failures and always implement a `route_to_user` fallback.
* **Logging**: Use `thought` and `reason` fields for debugging routing decisions. Log agent selection outcomes and track agent performance.
* **State management**: Use memory stores to track conversation context and agent selection history across turns.
* **Performance**: Keep routing logic lightweight. Avoid blocking calls in `_handle_message`. Use tracing to identify latency bottlenecks.

## Related resources

* [Building Applications](/agent-platform/sdk/guide/building-apps)
* [Memory Stores](/agent-platform/sdk/guide/memory-stores)
