=== Overview ===

llama_worker is a Python 3 module (fully typed with PEP 484 + typing_extensions) that supervises a single <code>llama-server</code> subprocess from llama.cpp and provides an asyncio-native, slot-limited, resilient interface for running chat-style inference requests against it. It is intended to be used by a higher-level "hivemind" orchestrator that runs multiple workers (different models / GPUs / profiles), routes work between them, and prefers nuke & repave (restart broken workers) over fragile replay.

==== This module provides: ====

* Subprocess supervision for llama-server:
** start/stop/restart
** hard guarantees against orphan processes
* Async request execution:
** submit() returns immediately (with a request id)
** caller polls for status/result
* Admission control via concurrency slots:
** no internal queue by default
* Robust failure handling:
** detect a dead/stalled/unreachable server
** restart the subprocess
** fail in-flight requests with explicit reasons
* Prompting with a worker-owned BIOS system prompt layer:
** hivemind/cooperation guidance
** runtime metadata (date/time, tool budgets)
** stable, testable, separately generated
* Tools:
** normal OpenAI-format tool calling with a pluggable ToolRunner (round-trip)
** a fallback tool-call parsing method when needed
* Exit tools (one-way control signals):
** provided at init
** recorded when emitted by the model
** never alter control flow (beyond whatever the model does)
* Internal streaming:
** used for monitoring, progress detection, loop detection
** progress = any data flowing after headers
* Loop mitigation:
** rely on max_tokens
** plus a simple repeated-line early-kill detector

==== Design goals ====

# One worker = one <code>llama-server</code> process (one model per process; may span multiple GPUs if configured).
# Async-first API: explicitly asyncio-native.
# Slot-based concurrency: accept up to <code>slots</code> concurrent in-flight requests; otherwise reject immediately.
# Nuke & repave reliability:
#* detect a dead/stalled/unreachable server
#* restart the subprocess
#* fail in-flight requests with explicit reasons (no replay)
# Long-prefill-friendly operation:
#* time-to-first-token can be minutes to tens of minutes
#* avoid false restarts by using liveness/progress-based timeouts
# OpenAI-format tool calling:
#* ToolRunner is pluggable
#* the tool mechanism must not hardcode assumptions about tool "weight"
# BIOS prompt layer:
#* inject stable platform/hivemind guidance + runtime metadata
#* generated via a separate, testable method/component
# Exit tools / control signals:
#* model emits structured signals upward
#* worker records them but does not change behavior
# Simple early loop kill:
#* repeated-line detector to stop the worst degenerate loops early
# Forward-compatible parameters:
#* accept an open params mapping and pass unknown keys through unchanged
# Engineering style:
#* simplicity, clarity, robustness, strong tests > clever tricks
#* avoid wasteful compute in CPU/RAM-constrained environments

==== Non-goals ====

* In-place model swapping or reconfiguration within a running worker.
* Replay/resume of in-flight requests after restart.
* Global scheduling across workers (belongs in the orchestrator).
* Complex token analytics or heavyweight monitoring agents.

==== Async model ====

* The module is asyncio-native.
* Public APIs are <code>async def</code>, intended to run in an asyncio event loop.
* Thread-safety is not a v1 requirement; call methods from a consistent async context.
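For orientation, here is a minimal caller-side sketch of the submit/poll flow described above, assuming the API surface and result shapes from Appendix A. The poll interval, example params, and error handling are illustrative assumptions, not part of the contract.

<syntaxhighlight lang="python">
# Caller-side sketch: submit, poll for a terminal state, then fetch the result.
# `worker` is assumed to be a LlamaWorker-like object with the API above.
import asyncio
from typing import Any


async def run_one_request(worker: Any, prompt: str) -> str:
    submitted = await worker.submit(
        job_name="example-job",
        system_prompt="You are a helpful summarizer.",
        user_prompt=prompt,
        params={"max_tokens": 512, "temperature": 0.2},  # passed through to llama-server
    )
    if not submitted["ok"]:
        # NO_SLOT_AVAILABLE / WORKER_NOT_READY / WORKER_FAILED: the caller decides
        # whether to retry elsewhere or back off; the worker does not queue.
        raise RuntimeError(f"submit rejected: {submitted['error']}")

    request_id = submitted["request_id"]

    # Poll until the request reaches a terminal state.
    while True:
        status = await worker.get_status(request_id)
        if "error" in status:  # NOT_FOUND, e.g. state already released
            raise RuntimeError("request disappeared")
        if status["state"] in ("completed", "failed", "canceled"):
            break
        await asyncio.sleep(1.0)  # assumed poll interval

    # get_result releases all stored state/output for this request id.
    result = await worker.get_result(request_id)
    if result.get("state") != "completed":
        raise RuntimeError(f"request failed: {result.get('fail_reason')}")
    return result["text"]
</syntaxhighlight>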
==== Key components and responsibilities ====

* <code>LlamaWorker</code>
** owns the subprocess and HTTP transport
** manages slots and the request lifecycle
** assembles prompts (BIOS + caller system + conversation)
** streams responses internally and accumulates full output
** runs the tool loop and parses/records exit tools
** supervises health and restarts
* <code>ToolRunner</code> (plugin)
** executes normal round-trip tools
** may be lightweight or heavy; the worker must not assume either
* Request records
** store full output until retrieved
** store minimal status and optional nice-to-haves

===== Separation of concerns =====

BIOS generation and prompt/message assembly must be separate methods/components, not inline inside transport or request execution. This is both a rule and a guiding approach:
* When changing behavior, identify the layer (supervision/transport/prompting/tool loop/parsing/state).
* Prefer small, pure-ish, unit-testable functions over clever shared logic.
* Keep policy (timeouts, budgets, prompts) data/config-driven, separate from mechanics.

==== Subprocess supervision ====

* llama-server runs in its own process group/session.
* stop() must ensure no orphaned processes:
** SIGTERM the process group → wait → SIGKILL the process group if needed
* Capture stdout/stderr into a bounded ring buffer for debug info.
* The port is assigned externally and passed in configuration.

==== Slots and admission control ====

* The worker has <code>slots: int</code>.
* Slots represent "whatever maps best to having multiple queries in flight at once."
** The implementation will treat a slot as permission to dispatch one concurrent HTTP streaming request.
** Slots are admission control, not a guarantee of linear throughput.
* If slots are full, submit() returns immediately with NO_SLOT_AVAILABLE.
* No internal queue by default.

==== Request IDs and job names ====

* Request IDs are incrementing integers per worker lifetime (1, 2, 3…).
* The caller supplies a <code>job_name</code> string per request for correlation/logging.

==== Public API ====

* async start() -> None
* async stop() -> None
* (internal) async restart(reason: str) -> None

===== Requests =====

* async submit(job_name: str, system_prompt: str, user_prompt: str, *, params: Mapping[str, Any] | None = None) -> SubmitResult
** returns immediately:
*** success: request_id
*** failure: NO_SLOT_AVAILABLE / WORKER_NOT_READY / WORKER_FAILED
* async get_status(request_id: int) -> RequestStatus | NOT_FOUND
* async get_result(request_id: int) -> RequestResult | NOT_FOUND
* async cancel(request_id: int) -> bool (best effort)

Result retrieval releases resources (locked in):
* get_result(request_id) on a terminal request returns the result and releases all stored state/output for that request id.
* After release, get_status/get_result returns NOT_FOUND (or an equivalent stable "released" code).

===== Introspection =====

* async get_worker_status() -> WorkerStatus
* async get_debug_info() -> WorkerDebugInfo

==== Generation parameters ====

* params is an open mapping passed into the OpenAI-compatible llama-server request payload.
* The worker:
** merges in the required fields it controls (messages/tools/stream flags),
** passes unknown keys through unchanged,
** does not require module edits when llama.cpp adds new parameters.

==== BIOS prompt layer ====

The BIOS prompt includes:
* stable platform/hivemind guidance (cooperating agent context)
* current date/time + timezone
* tool iteration budgets and constraints
* instructions for normal tools and exit tools

BIOS is regenerated:
* at request start
* before each post-tool continuation
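As a point of reference, here is a minimal sketch of a BIOS provider compatible with the <code>BiosProvider</code> protocol in Appendix A. The exact wording and layout of the BIOS text are illustrative assumptions; only the ingredients (hivemind guidance, date/time and timezone, tool budget, tool listings) come from the spec above.

<syntaxhighlight lang="python">
# Sketch of a BiosProvider-compatible callable. BiosContext is the frozen
# dataclass defined in Appendix A and is assumed to be in scope here.
from __future__ import annotations


def default_bios_provider(ctx: "BiosContext") -> str:
    # Render tool names from the OpenAI-format tool definitions.
    normal_names = ", ".join(
        t.get("function", {}).get("name", "?") for t in ctx.normal_tools
    ) or "none"
    exit_names = ", ".join(
        t.get("function", {}).get("name", "?") for t in ctx.exit_tools
    ) or "none"
    return "\n".join(
        [
            f"[{ctx.bios_version}] You are '{ctx.worker_name}', one worker in a "
            "cooperating hivemind of agents. Work within your budgets and report "
            "status honestly.",
            f"Current time: {ctx.now.isoformat()} ({ctx.timezone_name}).",
            f"Tool iterations remaining: {ctx.tool_iters_remaining}.",
            f"Normal (round-trip) tools available: {normal_names}.",
            f"Exit tools (one-way signals, do not wait for results): {exit_names}.",
        ]
    )
</syntaxhighlight>

Because the provider is a pure function of <code>BiosContext</code>, it can be unit-tested in isolation and regenerated cheaply at request start and before each post-tool continuation.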
===== Message stack order =====

# BIOS system prompt
# caller's system prompt
# conversation messages (user/assistant/tool)

The implementation may emit multiple system messages or a combined message with delimiters if needed for backend compatibility.

==== Normal tool calling ====

* Tools follow the OpenAI function-calling schema.
* Worker responsibilities:
** expose tool definitions to the model
** detect tool calls
** execute via ToolRunner
** append tool result message(s)
** continue generation until completion or the tool-iteration budget is exhausted

Fallback tool parsing (acceptable):
* If native structured tool calls are unreliable, the worker may use a BIOS-enforced structured JSON convention and strict parsing.
* Tool parsing failure is a request-level terminal error (or can be signaled via exit tools if configured).

==== Exit tools (control signals) ====

Exit tools never terminate output or change control flow beyond what the model does. They exist to convey structured information upward.
* Exit tools are provided at worker init as OpenAI-format tool definitions.
* The worker includes them so the model knows its signaling options.
* If the model emits an exit-tool call, the worker records a structured signal entry.
* The orchestrator may react (route, cancel, escalate), but the worker does not auto-stop/restart based on signals.

Priority behavior (locked in):
* Normal tool calls are processed normally.
* Exit-tool calls are recorded whenever they appear.
* Models will be encouraged via BIOS to emit exit tools near completion, but correctness does not depend on that.

==== Internal streaming and progress tracking ====

* The worker uses streaming internally for all requests.
* Progress definition (locked in): any response data flowing after headers counts as progress.
* Track timestamps:
** last_stream_byte_at
** last_liveness_at (prefill probes)
** last_progress_at = max(last_stream_byte_at, last_liveness_at)

Prefill liveness baseline:
* process alive
* /proc/<pid> CPU time delta (Linux)

==== Timeouts and recovery ====

* No per-request timeout overrides.
* Timeouts must tolerate long prefills (minutes to tens of minutes).
* TimeoutProfile includes:
** connect timeout (short)
** headers timeout (moderate)
** TTFT timeout, usually disabled/None
** prefill liveness timeout (large/None)
** idle stream timeout (no bytes once streaming)
** optional absolute timeout (often None)
** liveness probe interval
** restart backoff + crash-loop limits

Recovery is nuke & repave:
* restart the subprocess on unhealthy/stalled/disconnected
* fail in-flight requests (no replay)

==== Loop mitigation ====

* Primary: max_tokens (max new tokens) via params (default per worker).
* Secondary: a repeated-line detector cancels the request when the same sufficiently long line repeats N times consecutively.

==== Output handling ====

* Full output is accumulated in memory while running.
* On successful get_result(), all stored output and state for that request is released.

==== Status reporting ====

At minimum, status exposes:
* READY
* RUNNING
* FAILED

Optional nice-to-haves:
* tokens received
* tokens/sec
* partial output size

===== Debug info =====

* bounded ring buffer of recent subprocess logs
* recent restart reasons/timestamps

==== Default readiness/health probe ====

* GET /v1/models must return HTTP 200 and parse as JSON → worker is READY.
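A minimal sketch of that readiness probe follows. The use of httpx and the retry-loop shape are illustrative assumptions (the spec does not mandate an HTTP client); only the endpoint and the 200-plus-valid-JSON condition come from the spec.

<syntaxhighlight lang="python">
# Readiness probe sketch: poll GET /v1/models until it returns HTTP 200 with a
# JSON body, or until the deadline passes. Interval and deadline are assumptions.
import asyncio

import httpx


async def wait_until_ready(host: str, port: int, *, interval_s: float = 2.0,
                           max_wait_s: float = 300.0) -> bool:
    url = f"http://{host}:{port}/v1/models"
    deadline = asyncio.get_running_loop().time() + max_wait_s
    async with httpx.AsyncClient() as client:
        while asyncio.get_running_loop().time() < deadline:
            try:
                resp = await client.get(url, timeout=5.0)
                if resp.status_code == 200:
                    resp.json()  # raises if the body is not valid JSON
                    return True
            except (httpx.HTTPError, ValueError):
                pass  # server not up yet, or body not JSON; keep probing
            await asyncio.sleep(interval_s)
    return False
</syntaxhighlight>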
==== Prioritize: ====

* simplicity and correctness
* clear invariants
* strong tests over cleverness

===== Unit tests =====

* slot accounting and request state transitions
* BIOS prompt generation and message ordering (isolated, pure-ish)
* normal tool loop (native + fallback parsing)
* exit-tool recording (ensures non-terminating behavior)
* repeated-line detector behavior
* timeout bookkeeping with simulated progress/liveness

===== Integration tests =====

* llama-server subprocess process-group teardown (no orphans)
* readiness probe behavior
* long prefill does not false-timeout
* disconnect/stall triggers restart and fails in-flight requests
* malformed tool calls and parsing failures
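As an illustration of the intended unit-test style, here is a pytest-style sketch of the message-ordering test. It assumes <code>build_message_stack</code> is implemented per Appendix A, that it emits two separate system messages (the spec also allows a combined message), and that it is importable from the placeholder module path shown.

<syntaxhighlight lang="python">
# Unit-test sketch for BIOS prompt ordering: BIOS system message first, caller
# system prompt second, then the conversation messages unchanged.
from llama_worker.prompting import build_message_stack  # placeholder module path


def test_message_stack_order() -> None:
    conversation = [
        {"role": "user", "content": "Summarize the report."},
        {"role": "assistant", "content": "Working on it."},
    ]
    stack = build_message_stack(
        bios_text="BIOS: cooperate with the hivemind.",
        caller_system_prompt="You are a terse summarizer.",
        conversation=conversation,
    )
    assert stack[0]["role"] == "system"
    assert "BIOS" in stack[0]["content"]
    assert stack[1]["role"] == "system"
    assert stack[1]["content"] == "You are a terse summarizer."
    assert [m["role"] for m in stack[2:]] == ["user", "assistant"]
    assert stack[2]["content"] == "Summarize the report."
</syntaxhighlight>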
=== Appendix A: type definitions (sketch) ===

<syntaxhighlight lang="python">
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Mapping, Sequence

from typing_extensions import (
    Literal,
    NotRequired,
    Protocol,
    TypedDict,
    runtime_checkable,
)


class WorkerState(str, Enum):
    RUNNING = "running"
    READY = "ready"
    FAILED = "failed"
    STOPPED = "stopped"


class RequestState(str, Enum):
    RUNNING = "running"
    TOOL_RUNNING = "tool_running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"


RequestFailReason = Literal[
    "worker_restarted",
    "server_died",
    "connect_failed",
    "headers_timeout",
    "stall_timeout",
    "tool_parse_error",
    "tool_execution_error",
    "repeated_line_loop",
    "canceled",
    "unknown_error",
]

RequestFinishReason = Literal[
    "stop",
    "max_tokens",
    "canceled",
    "failed",
]


class ChatMessage(TypedDict, total=False):
    role: Literal["system", "user", "assistant", "tool"]
    content: str
    name: str
    tool_call_id: str
    tool_calls: Any


class ToolFunctionDef(TypedDict, total=False):
    name: str
    description: str
    parameters: dict[str, Any]


class ToolDef(TypedDict, total=False):
    type: Literal["function"]
    function: ToolFunctionDef


class ToolCall(TypedDict, total=False):
    id: str
    type: Literal["function"]
    function: dict[str, Any]  # expects {"name": str, "arguments": str|dict}


class ExitSignal(TypedDict, total=False):
    tool_name: str
    arguments: dict[str, Any]
    emitted_at: float


class SubmitOk(TypedDict):
    ok: Literal[True]
    request_id: int


class SubmitErr(TypedDict):
    ok: Literal[False]
    error: Literal["NO_SLOT_AVAILABLE", "WORKER_NOT_READY", "WORKER_FAILED"]


SubmitResult = SubmitOk | SubmitErr


class RequestStatus(TypedDict, total=False):
    request_id: int
    job_name: str
    state: RequestState
    created_at: float
    dispatched_at: NotRequired[float]
    completed_at: NotRequired[float]
    last_progress_at: NotRequired[float]
    output_chars: NotRequired[int]
    # optional nice-to-haves
    tokens_received: NotRequired[int]
    tokens_per_second: NotRequired[float]
    tool_iters_remaining: NotRequired[int]
    signals: NotRequired[list[ExitSignal]]
    fail_reason: NotRequired[RequestFailReason]
    fail_detail: NotRequired[str]


class RequestResult(TypedDict, total=False):
    request_id: int
    job_name: str
    state: Literal["completed", "failed", "canceled"]
    finish_reason: RequestFinishReason
    text: str
    signals: NotRequired[list[ExitSignal]]
    fail_reason: NotRequired[RequestFailReason]
    fail_detail: NotRequired[str]


class NotFound(TypedDict):
    ok: Literal[False]
    error: Literal["NOT_FOUND"]


GetResultResponse = RequestResult | NotFound
GetStatusResponse = RequestStatus | NotFound


class WorkerStatus(TypedDict, total=False):
    state: WorkerState
    slots_total: int
    slots_used: int
    active_request_ids: list[int]
    restart_count: int
    last_error: NotRequired[str]
    last_ready_at: NotRequired[float]


class WorkerDebugInfo(TypedDict, total=False):
    recent_logs: list[str]
    recent_restart_reasons: list[str]


@dataclass(frozen=True, slots=True)
class TimeoutProfile:
    connect_timeout_s: float
    headers_timeout_s: float
    ttft_timeout_s: float | None
    prefill_liveness_timeout_s: float | None
    idle_stream_timeout_s: float | None
    absolute_timeout_s: float | None
    liveness_probe_interval_s: float
    restart_backoff_s: float
    restart_window_s: float
    max_restarts_per_window: int


@dataclass(frozen=True, slots=True)
class BiosContext:
    now: datetime
    timezone_name: str
    worker_name: str
    tool_iters_remaining: int
    normal_tools: Sequence[ToolDef]
    exit_tools: Sequence[ToolDef]
    bios_version: str = "bios-v1"


@runtime_checkable
class BiosProvider(Protocol):
    def __call__(self, ctx: BiosContext) -> str: ...


def build_message_stack(
    *,
    bios_text: str,
    caller_system_prompt: str,
    conversation: Sequence[ChatMessage],
) -> list[ChatMessage]:
    """Pure function: returns full message list in required order."""
    raise NotImplementedError


@runtime_checkable
class ToolRunner(Protocol):
    async def run_tool(
        self,
        *,
        name: str,
        arguments: dict[str, Any],
        request_id: int,
        job_name: str,
    ) -> Any: ...


GenerationParams = Mapping[str, Any]


@dataclass(frozen=True, slots=True)
class WorkerConfig:
    name: str
    host: str
    port: int
    server_cmd: Sequence[str]
    env: Mapping[str, str]
    slots: int
    timeouts: TimeoutProfile
    normal_tools: Sequence[ToolDef]
    tool_runner: ToolRunner | None
    exit_tools: Sequence[ToolDef]
    bios_provider: BiosProvider
    timezone_name: str


@dataclass(frozen=True, slots=True)
class ReadinessProbe:
    method: Literal["GET"] = "GET"
    path: str = "/v1/models"
</syntaxhighlight>

If you want to proceed, the natural next section is Appendix B: internal module layout (classes/files and responsibilities) so the code stays clean and test-driven from day one.
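For concreteness, here is an illustrative instantiation of the configuration types above. Every concrete value (timeouts, server command line, port, tool lists, and the placeholder module path) is an assumption chosen to match the guidance in the spec (short connect timeout, TTFT disabled, generous prefill liveness), not a default mandated by it.

<syntaxhighlight lang="python">
# Configuration sketch: a long-prefill-tolerant TimeoutProfile and a minimal
# WorkerConfig. Types are assumed importable from a placeholder module path.
from llama_worker.types import BiosContext, TimeoutProfile, WorkerConfig


def tiny_bios(ctx: BiosContext) -> str:
    # Trivial BiosProvider stand-in for the example.
    return f"[{ctx.bios_version}] Worker {ctx.worker_name}, {ctx.now.isoformat()}."


config = WorkerConfig(
    name="worker-a",
    host="127.0.0.1",
    port=18080,  # assigned externally by the orchestrator
    server_cmd=["llama-server", "--model", "/models/example.gguf", "--port", "18080"],
    env={},
    slots=2,
    timeouts=TimeoutProfile(
        connect_timeout_s=5.0,            # short: the server should accept quickly
        headers_timeout_s=120.0,          # moderate
        ttft_timeout_s=None,              # disabled: prefill may take very long
        prefill_liveness_timeout_s=1800.0,
        idle_stream_timeout_s=300.0,      # no bytes once streaming has started
        absolute_timeout_s=None,
        liveness_probe_interval_s=30.0,
        restart_backoff_s=10.0,
        restart_window_s=600.0,
        max_restarts_per_window=3,
    ),
    normal_tools=[],
    tool_runner=None,
    exit_tools=[],
    bios_provider=tiny_bios,
    timezone_name="UTC",
)
</syntaxhighlight>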