Interface StreamableAgent<TInput, TOutput>

Interface for AI Agents that support streaming execution.

Streaming allows agents to emit partial results as they become available, providing a better user experience for long-running operations.

class StreamingChatAgent
extends AIAgent<ChatInput, ChatOutput>
implements StreamableAgent<ChatInput, ChatOutput>
{
async *executeStream(input: ChatInput): AsyncIterable<StreamChunk<ChatOutput>> {
const stream = await this.llmProvider.streamChat(input.messages);

let accumulated = '';
for await (const chunk of stream) {
accumulated += chunk.content;

yield {
data: { message: accumulated },
done: false,
metadata: { tokens: chunk.tokens }
};
}

yield {
data: { message: accumulated },
done: true,
metadata: { tokens: totalTokens }
};
}
}

// Use the streaming agent
const agent = new StreamingChatAgent(...);

for await (const chunk of agent.executeStream(input)) {
console.log(chunk.data.message);
if (chunk.done) {
console.log('Streaming complete');
}
}
interface StreamableAgent<TInput, TOutput> {
    name: string;
    description: string;
    version: AgentVersion;
    capabilities: string[];
    model: ModelConfig;
    _context?: AgentContext;
    _memory?: AgentMemory;
    retryConfig?: RetryConfig;
    timeoutMs?: number;
    execute(input: TInput): Promise<AgentResult<TOutput>>;
    executeWithEvents(input: TInput): Promise<AgentResult<TOutput>>;
    beforeExecute(input: TInput): Promise<void>;
    afterExecute(result: AgentResult<TOutput>): Promise<void>;
    onError(error: Error): Promise<void>;
    setContext(context: AgentContext): void;
    setMemory(memory: AgentMemory): void;
    setRetryConfig(config: RetryConfig): void;
    setTimeout(ms: number): void;
    executeWithRetry(input: TInput): Promise<AgentResult<TOutput>>;
    remember(
        key: string,
        value: unknown,
        type?: "short" | "long",
    ): Promise<void>;
    recall(key: string): Promise<unknown>;
    searchMemory(query: string, limit?: number): Promise<unknown[]>;
    forget(type?: "short" | "long" | "all"): Promise<void>;
    getAgentId(): AgentId;
    hasCapability(capability: string): boolean;
    toMetadata(): {
        id: string;
        name: string;
        description: string;
        version: string;
        capabilities: string[];
        model: ModelConfig;
    };
    recordToolUsage(
        toolName: string,
        toolArguments: Record<string, unknown>,
        toolResult?: unknown,
    ): void;
    executeStream(input: TInput): AsyncIterable<StreamChunk<TOutput>>;
    executeStreamWithCallbacks(
        input: TInput,
        options: StreamOptions,
    ): Promise<AgentResult<TOutput>>;
    record(event: DomainEvent): void;
    pullDomainEvents(): DomainEvent[];
    hasDomainEvents(): boolean;
    get id(): EntityId<T>;
    get createdAt(): Date;
    get updatedAt(): Date;
    touch(): void;
    equals(other: Entity<"AIAgent">): boolean;
}

Type Parameters

  • TInput

    The type of input the agent accepts

  • TOutput

    The type of output the agent produces

Hierarchy (View Summary)

Accessors

  • get createdAt(): Date

    Gets the timestamp when this entity was created.

    Returns Date

  • get updatedAt(): Date

    Gets the timestamp when this entity was last updated.

    Returns Date

Methods

  • Protected

    Main execution method for the agent. Must be implemented by concrete agent classes.

    This method is called internally by executeWithEvents() and should contain the core agent logic. Domain events are automatically recorded by the wrapper.

    Parameters

    • input: TInput

      The input data for the agent

    Returns Promise<AgentResult<TOutput>>

    A promise resolving to the agent result

  • Executes the agent with automatic domain event recording. This is the public method that should be called by orchestrators and clients.

    Records the following events:

    • AgentExecutionStarted: When execution begins
    • AgentExecutionCompleted: When execution succeeds
    • AgentExecutionFailed: When execution fails

    Parameters

    • input: TInput

      The input data for the agent

    Returns Promise<AgentResult<TOutput>>

    A promise resolving to the agent result

  • Stores a value in agent memory

    Parameters

    • key: string

      The key to store under

    • value: unknown

      The value to store

    • type: "short" | "long" = 'short'

      Memory type: 'short' for session, 'long' for persistent

    Returns Promise<void>

  • Retrieves a value from agent memory

    Parameters

    • key: string

      The key to retrieve

    Returns Promise<unknown>

    The stored value, or null if not found

  • Searches memory semantically

    Parameters

    • query: string

      The search query

    • limit: number = 5

      Maximum results to return

    Returns Promise<unknown[]>

    Array of relevant values

  • Removes a value from memory

    Parameters

    • type: "short" | "long" | "all" = 'short'

      Type of memory to clear

    Returns Promise<void>

  • Gets agent metadata as a plain object

    Returns {
        id: string;
        name: string;
        description: string;
        version: string;
        capabilities: string[];
        model: ModelConfig;
    }

  • Protected

    Records a tool used event Should be called by subclasses when they use tools

    Parameters

    • toolName: string

      Name of the tool used

    • toolArguments: Record<string, unknown>

      Arguments passed to the tool

    • OptionaltoolResult: unknown

      Result from the tool execution

    Returns void

  • Executes the agent with streaming, emitting partial results as they become available.

    Parameters

    • input: TInput

      The input data for the agent

    Returns AsyncIterable<StreamChunk<TOutput>>

    An async iterable of stream chunks

    for await (const chunk of agent.executeStream(input)) {
    if (!chunk.done) {
    console.log('Partial:', chunk.data);
    } else {
    console.log('Complete:', chunk.data);
    }
    }
  • Executes the agent with streaming and invokes callbacks for each chunk.

    This is a convenience method for when you want callback-based streaming instead of async iteration.

    Parameters

    • input: TInput

      The input data for the agent

    • options: StreamOptions

      Streaming options with callbacks

    Returns Promise<AgentResult<TOutput>>

    A promise that resolves with the final result

    const result = await agent.executeStreamWithCallbacks(input, {
    onChunk: (chunk) => console.log('Chunk:', chunk.data),
    onComplete: (result) => console.log('Done:', result.data),
    onError: (error) => console.error('Error:', error),
    signal: abortController.signal
    });
  • Protected

    Records a domain event that occurred within this aggregate. Events are stored internally and can be retrieved later for publishing.

    Parameters

    Returns void

    this.record({
    occurredAt: new Date(),
    userId: this.id.value,
    email: 'user@example.com'
    });
  • Retrieves and clears all domain events recorded by this aggregate. This is typically called after the aggregate is persisted, to publish the events.

    Returns DomainEvent[]

    An array of domain events

    await repository.save(order);
    const events = order.pullDomainEvents();
    await eventBus.publish(events);
  • Protected

    Updates the updatedAt timestamp to the current time. Should be called whenever the entity's state changes.

    Returns void

  • Compares this entity with another for equality based on identity. Two entities are equal if they have the same ID and are of the same type.

    Parameters

    • other: Entity<"AIAgent">

      The other entity to compare with

    Returns boolean

    true if the entities are equal, false otherwise

Properties

name: string

Human-readable name of the agent

description: string

Description of what the agent does

version: AgentVersion
capabilities: string[]

Capabilities this agent has

LLM model configuration

_context?: AgentContext

Current execution context

_memory?: AgentMemory
retryConfig?: RetryConfig
timeoutMs?: number