Implementing AsyncGenerator Workflows for Streamed AI Dialogue Systems
The QueryEngine class serves as the central processing unit for a CLI-based AI dialogue system, managing session state, streaming responses, and resource controls. Each instance encapsulates a complete conversation lifecycle.
export class QueryEngine {
  private settings: EngineConfiguration;
  private sessionHistory: DialogueEvent[] = [];
  private cancellationSignal: AbortController;
  private blockedOperations: PermissionViolation[] = [];
  private aggregatedMetrics: ResourceConsumption;
  private identifiedCapabilities = new Set<string>();
  private accessedResourcePaths = new Set<string>();

  async *processInput(userInput: string | StructuredContent[], opts?: ProcessingOptions): AsyncGenerator<SystemMessage> {
    // Core streaming logic
  }
}
Input Processing Method
The primary method employs an AsyncGenerator pattern for incremental response delivery.
async *processInput(
  userInput: string | StructuredContent[],
  opts?: { identifier?: string; metadata?: boolean }
): AsyncGenerator<SystemMessage> {
  // Implementation details
}
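Before walking through the phases, it helps to see how a caller consumes this generator. The sketch below is illustrative: `SystemMessage` is reduced to a two-member union and `mockProcessInput` stands in for the real engine, which is not shown in full here.

```typescript
// Minimal sketch of consuming an AsyncGenerator-based engine.
// The types and the mock generator are stand-ins, not the real QueryEngine API.
type SystemMessage =
  | { type: 'assistant'; text: string }
  | { type: 'outcome'; category: string };

async function* mockProcessInput(userInput: string): AsyncGenerator<SystemMessage> {
  // A real engine would stream model events; here we fake two of them.
  yield { type: 'assistant', text: `echo: ${userInput}` };
  yield { type: 'outcome', category: 'completed' };
}

export async function consume(input: string): Promise<SystemMessage[]> {
  const received: SystemMessage[] = [];
  for await (const message of mockProcessInput(input)) {
    received.push(message); // a real CLI would render each message incrementally
  }
  return received;
}
```

The caller never waits for the whole response; each `yield` reaches the `for await` loop as soon as it is produced.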
Execution Phases
Phase 1: Configuration Initialization
Engine parameters are unpacked and validators are established.
const {
  workingDirectory,
  availableCommands,
  accessibleUtilities,
  externalConnectors,
  reasoningParameters,
  maximumIterations,
  spendingLimit,
  taskAllocation,
  authorizationDelegate,
  customInstructions,
  supplementalPrompt,
} = this.settings;

const instrumentedAuthorizer: AuthorizationDelegate = async (
  utility, parameters, context, assistantEvent, invocationId, override
) => {
  const decision = await authorizationDelegate(
    utility, parameters, context, assistantEvent, invocationId, override
  );
  if (decision.outcome !== 'approved') {
    this.blockedOperations.push({
      utility_name: utility.name,
      invocation_identifier: invocationId,
      supplied_parameters: parameters,
    });
  }
  return decision;
};
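The wrapping technique above can be isolated into a small, testable form. Everything in this sketch is an assumption about the shapes involved: `AuthorizationDecision`, `PermissionViolation`, and the two-argument `Delegate` are simplified stand-ins for the real types.

```typescript
// Hedged sketch of the delegate-wrapping pattern: the wrapper observes
// denials without altering the decision. Type names are assumptions.
type AuthorizationDecision = { outcome: 'approved' | 'denied'; reason?: string };
type PermissionViolation = { utility_name: string; supplied_parameters: unknown };
type Delegate = (utilityName: string, parameters: unknown) => Promise<AuthorizationDecision>;

export function instrumentDelegate(inner: Delegate, log: PermissionViolation[]): Delegate {
  return async (utilityName, parameters) => {
    const decision = await inner(utilityName, parameters);
    if (decision.outcome !== 'approved') {
      // Record the denial for the audit trail; the decision itself is unchanged.
      log.push({ utility_name: utilityName, supplied_parameters: parameters });
    }
    return decision;
  };
}
```

Because the wrapper only observes, it can be layered onto any existing delegate without changing authorization behavior.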
Phase 2: Instruction Assembly
System directives are constructed from multiple sources.
const { baseInstructions, userEnvironment, systemEnvironment } =
  await assembleSystemDirectives({
    accessibleUtilities,
    primaryModel: selectedPrimaryModel,
    supplementaryDirectories: Array.from(
      initialState.authorizationContext.supplementaryDirectories.keys()
    ),
    externalConnectors,
    customInstructions,
  });

const completeInstructions = formatSystemDirectives([
  ...(customInstructions ? [customInstructions] : baseInstructions),
  ...(memoryManagementPrompt ? [memoryManagementPrompt] : []),
  ...(supplementalPrompt ? [supplementalPrompt] : []),
]);
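The article does not show what `formatSystemDirectives` does internally; a plausible minimal behavior, assumed here, is filtering out empty sections and joining the rest into one prompt string.

```typescript
// Hypothetical sketch of directive assembly. The real formatSystemDirectives
// is not shown in this article; this assumes a filter-and-join shape.
export function formatDirectives(sections: (string | undefined)[]): string {
  return sections
    .filter((s): s is string => Boolean(s && s.trim())) // drop absent/blank sections
    .join('\n\n');                                      // one blank line between sections
}
```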
Phase 3: User Command Handling
Local commands are processed without LLM invocation.
const {
  processedEvents,
  requiresModelQuery,
  permittedUtilities,
  selectedModel,
  commandOutput,
} = await interpretUserCommand({
  rawInput: userInput,
  processingMode: 'standard',
});
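The `requiresModelQuery` flag drives the short-circuit shown in the flow diagram at the end of the article: a locally resolved command yields its output directly and never reaches the model. The sketch below uses illustrative names, not the engine's real ones.

```typescript
// Sketch of the local-command short-circuit. When the command is resolved
// locally, the engine yields the output and skips the LLM round-trip entirely.
type CommandResult = { requiresModelQuery: boolean; commandOutput?: string };

export async function* handleInput(
  result: CommandResult,
  queryModel: () => AsyncGenerator<string>,
): AsyncGenerator<string> {
  if (!result.requiresModelQuery) {
    if (result.commandOutput) yield result.commandOutput; // e.g. "/help" text
    return; // no model invocation
  }
  yield* queryModel(); // delegate to the streaming model call
}
```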
Phase 4: Session Persistence
Conversation state is saved before model interaction to enable recovery.
if (enablePersistence && processedEvents.length > 0) {
  const storageOperation = archiveSession(processedEvents);
  if (isMinimalMode()) {
    void storageOperation;
  } else {
    await storageOperation;
    if (shouldForceWrite()) {
      await commitSessionArchive();
    }
  }
}
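The `void storageOperation` line is the interesting detail: in minimal mode the write is fire-and-forget (latency over durability), while the normal path awaits it (durability over latency). A reduced sketch of that trade-off, with assumed helper names:

```typescript
// Illustrative sketch of the persistence branch. `archive` stands in for
// archiveSession; the minimal/full split mirrors isMinimalMode() above.
export async function persist(
  archive: () => Promise<void>,
  minimalMode: boolean,
): Promise<'deferred' | 'awaited'> {
  const op = archive();
  if (minimalMode) {
    void op; // deliberately unawaited: don't block the response stream
    return 'deferred';
  }
  await op; // session is durably stored before the model call proceeds
  return 'awaited';
}
```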
Phase 5: Model Interaction Loop
The generator processes streaming responses from the language model.
for await (const event of executeQuery({
  dialogueEvents: processedEvents,
  systemDirectives: completeInstructions,
  userEnvironment,
  systemEnvironment,
  authorizationDelegate: instrumentedAuthorizer,
  utilityContext: commandProcessingContext,
  backupModel,
  maximumIterations,
  taskAllocation,
})) {
  // Event type handling
}
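The loop body dispatches on event type. The sketch below mirrors the event categories from the flow diagram at the end of the article, but the exact union and field names are assumptions, not the real event schema.

```typescript
// Hedged sketch of per-event dispatch inside the loop body.
type EngineEvent =
  | { type: 'assistant'; text: string }
  | { type: 'progress'; note: string }
  | { type: 'stream_event'; tokens: number }
  | { type: 'system'; subcategory: string };

export function dispatch(event: EngineEvent, metrics: { tokens: number }): string | undefined {
  switch (event.type) {
    case 'assistant':
      return event.text;              // forwarded to the caller via yield
    case 'progress':
      return event.note;              // also forwarded
    case 'stream_event':
      metrics.tokens += event.tokens; // updates consumption; nothing yielded
      return undefined;
    case 'system':
      return undefined;               // e.g. compaction markers, handled separately
  }
}
```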
Phase 6: Resource Enforcement
Multiple guardrails prevent excessive consumption.
// Financial limit enforcement
if (spendingLimit !== undefined && calculateTotalExpense() >= spendingLimit) {
  yield { type: 'outcome', category: 'budget_exceeded' };
  return;
}

// Iteration limit enforcement
if (event.attachment?.type === 'iteration_limit_reached') {
  yield { type: 'outcome', category: 'maximum_iterations' };
  return;
}

// Structured output retry limit
if (outputSchema && attemptsThisCycle >= maximumAttempts) {
  yield { type: 'outcome', category: 'schema_retry_exhausted' };
  return;
}
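The budget guard is the only one of the three that depends on accumulated state. The article does not specify how `calculateTotalExpense` does its accounting; the sketch below assumes a simple sum of per-event costs, with an undefined limit meaning "unlimited".

```typescript
// Assumed shape of the spending-limit check; the real cost accounting
// behind calculateTotalExpense is not shown in the article.
export function budgetExceeded(costsUsd: number[], spendingLimit?: number): boolean {
  if (spendingLimit === undefined) return false; // no limit configured
  const total = costsUsd.reduce((sum, cost) => sum + cost, 0);
  return total >= spendingLimit; // inclusive, matching the >= guard above
}
```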
Phase 7: Result Delivery
Final outcomes are packaged with comprehensive metadata.
let textualOutput = '';
if (outcome.type === 'assistant') {
  const finalContent = last(outcome.message.content);
  if (finalContent?.type === 'text') {
    textualOutput = finalContent.text;
  }
}

yield {
  type: 'outcome',
  category: 'completed',
  output: textualOutput,
  elapsed_ms: Date.now() - cycleStart,
  api_duration_ms: calculateAPILatency(),
  iteration_count: cycleCount,
  total_expense: calculateTotalExpense(),
  consumption: this.aggregatedMetrics,
  authorization_violations: this.blockedOperations,
};
Architectural Advantages
Streaming Response Pattern
async *processInput(...) {
  // Setup
  for await (const event of executeQuery(...)) {
    yield* standardizeEvent(event);
  }
  // Finalization
  yield { type: 'outcome', ... };
}
Benefits include real-time feedback, efficient memory utilization, and built-in interruption support.
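The built-in interruption support deserves a concrete illustration. An AsyncGenerator can be cancelled cooperatively by its consumer; the generator below is a stand-in for the real engine, checking an `AbortSignal` between yields.

```typescript
// Sketch of cooperative cancellation: the consumer aborts mid-stream and
// the generator exits on its next signal check. The generator is a stand-in.
async function* stream(signal: AbortSignal): AsyncGenerator<number> {
  for (let i = 0; i < 100; i++) {
    if (signal.aborted) return; // cooperative cancellation point
    yield i;
  }
}

export async function takeUntilAbort(limit: number): Promise<number[]> {
  const controller = new AbortController();
  const seen: number[] = [];
  for await (const n of stream(controller.signal)) {
    seen.push(n);
    if (seen.length >= limit) controller.abort(); // next check ends the stream
  }
  return seen;
}
```

This is presumably why `QueryEngine` holds a `cancellationSignal: AbortController` field: interruption is wired into the same iteration protocol as normal delivery.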
Authorization Monitoring
The wrapped delegate automatically logs permission denials for audit trails.
Proactive State Preservation
Saving conversation state before model interaction ensures session recoverability even if the process terminates prematurely.
Memory Optimization
History compression is triggered via boundary markers to prevent unbounded memory growth.
if (event.subcategory === 'compaction_marker') {
  const boundaryIndex = this.sessionHistory.length - 1;
  if (boundaryIndex > 0) {
    this.sessionHistory.splice(0, boundaryIndex);
  }
}
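The `splice(0, boundaryIndex)` call drops every event before the boundary in place, keeping only the latest one. A minimal demonstration of that exact semantics:

```typescript
// Demonstrates the splice-based compaction above: everything before the
// boundary (the last event) is removed in place.
export function compact<T>(history: T[]): T[] {
  const boundaryIndex = history.length - 1;
  if (boundaryIndex > 0) {
    history.splice(0, boundaryIndex); // keep only the boundary event
  }
  return history;
}
```

Mutating in place matters here: `sessionHistory` is a private field referenced elsewhere in the class, so compaction must not replace the array object.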
Processing Flow
User Input
↓
interpretUserCommand() → requiresModelQuery?
↓
┌──────────────────────────────────────────┐
│ TRUE → executeQuery() calls LLM │
│ FALSE → local execution, direct return │
└──────────────────────────────────────────┘
↓
for await event of executeQuery():
↓
├→ assistant → standardizeEvent → yield
├→ progress → standardizeEvent → yield
├→ stream_event → update consumption
├→ system (compaction_marker) → compress history
└→ ...additional types
↓
Resource enforcement checks
↓
Return final outcome