Worker Integration
Sol orchestrates external workers through a ZMQ DEALER-ROUTER protocol. Any language that supports ZMQ and msgpack can implement a worker that connects to Sol, receives tasks, and reports results.
Architecture Overview
Sol ROUTER (5555) <--> Worker DEALER
Sol PUB (5556) ---> Event subscribersWorkers connect as DEALER sockets to Sol's ROUTER gateway. Tasks are dispatched in msgpack format. Workers stream partial output, report results, and signal readiness for the next task.
Connection Lifecycle
- Open a DEALER socket, optionally set
ZMQ.IDENTITY - Connect to
tcp://<host>:5555 - Send a
readymessage with capabilities - Receive
taskmessages from Sol - Stream
tokenmessages during execution - Send
resultorerrorwhen complete - Send
readyagain for the next task
Protocol Reference
See ZMQ Protocol Reference for the full wire specification.
Message Format
All messages are single-frame msgpack maps with binary string keys. The DEALER socket adds an empty delimiter frame automatically.
Worker-to-Sol Messages
| Type | Purpose |
|---|---|
ready | Announce availability and capabilities |
token | Stream partial output |
result | Report task completion |
error | Report task failure |
log | Diagnostic log message |
Sol-to-Worker Messages
| Type | Purpose |
|---|---|
task | Dispatch work |
cancel | Abort running task |
spawn | Create child task |
Python Worker
python
import msgpack
import zmq
import uuid
def run_worker(server_url="tcp://localhost:5555"):
ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
worker_id = f"python-{uuid.uuid4().hex[:8]}"
sock.identity = worker_id.encode()
sock.connect(server_url)
sock.send(msgpack.packb({
b"type": b"ready",
b"worker_id": worker_id.encode(),
b"capabilities": [b"tools", b"streaming"]
}, use_bin_type=True))
while True:
msg = sock.recv()
task = msgpack.unpackb(msg, raw=False)
if task["type"] == "task":
task_id = task["task_id"]
prompt = task["prompt"]
for word in prompt.split()[:5]:
sock.send(msgpack.packb({
b"type": b"token",
b"task_id": task_id.encode(),
b"content": (word + " ").encode()
}, use_bin_type=True))
sock.send(msgpack.packb({
b"type": b"result",
b"task_id": task_id.encode(),
b"status": b"ok",
b"content": f"Processed: {prompt}".encode()
}, use_bin_type=True))
sock.send(msgpack.packb({
b"type": b"ready",
b"worker_id": worker_id.encode(),
b"capabilities": [b"tools", b"streaming"]
}, use_bin_type=True))
elif task["type"] == "cancel":
task_id = task["task_id"]
sock.send(msgpack.packb({
b"type": b"result",
b"task_id": task_id.encode(),
b"status": b"cancelled",
b"content": b""
}, use_bin_type=True))
sock.send(msgpack.packb({
b"type": b"ready",
b"worker_id": worker_id.encode(),
b"capabilities": [b"tools", b"streaming"]
}, use_bin_type=True))
sock.close()
ctx.term()
if __name__ == "__main__":
run_worker()Go Worker
go
package main
import (
"fmt"
"log"
"github.com/pebbe/zmq4"
"github.com/vmihailenco/msgpack/v5"
)
type Ready struct {
Type string `msgpack:"type"`
WorkerID string `msgpack:"worker_id"`
Capabilities []string `msgpack:"capabilities"`
}
type Task struct {
Type string `msgpack:"type"`
TaskID string `msgpack:"task_id"`
Prompt string `msgpack:"prompt"`
}
type Result struct {
Type string `msgpack:"type"`
TaskID string `msgpack:"task_id"`
Status string `msgpack:"status"`
Content string `msgpack:"content"`
}
func main() {
sock, _ := zmq4.NewSocket(zmq4.DEALER)
defer sock.Close()
sock.SetIdentity("go-worker-1")
sock.Connect("tcp://localhost:5555")
ready := Ready{
Type: "ready",
WorkerID: "go-worker-1",
Capabilities: []string{"tools"},
}
data, _ := msgpack.Marshal(&ready)
sock.Send(string(data), 0)
for {
msg, _ := sock.Recv(0)
var task Task
msgpack.Unmarshal([]byte(msg), &task)
if task.Type == "task" {
result := Result{
Type: "result",
TaskID: task.TaskID,
Status: "ok",
Content: fmt.Sprintf("Go processed: %s", task.Prompt),
}
data, _ := msgpack.Marshal(&result)
sock.Send(string(data), 0)
data, _ = msgpack.Marshal(&ready)
sock.Send(string(data), 0)
}
}
}Rust Worker
rust
use zmq;
use serde::{Serialize, Deserialize};
use serde_json::json;
fn main() {
let ctx = zmq::Context::new();
let sock = ctx.socket(zmq::DEALER).unwrap();
sock.set_identity(b"rust-worker-1").unwrap();
sock.connect("tcp://localhost:5555").unwrap();
let ready = json!({
"type": "ready",
"worker_id": "rust-worker-1",
"capabilities": ["tools"]
});
sock.send(&serde_json::to_vec(&ready).unwrap(), 0).unwrap();
loop {
let msg = sock.recv_bytes(0).unwrap();
let task: serde_json::Value = serde_json::from_slice(&msg).unwrap();
if task["type"] == "task" {
let task_id = task["task_id"].as_str().unwrap();
let prompt = task["prompt"].as_str().unwrap();
let result = json!({
"type": "result",
"task_id": task_id,
"status": "ok",
"content": format!("Rust processed: {}", prompt)
});
sock.send(&serde_json::to_vec(&result).unwrap(), 0).unwrap();
sock.send(&serde_json::to_vec(&ready).unwrap(), 0).unwrap();
}
}
}Error Handling
| Scenario | Worker Action |
|---|---|
| Invalid task payload | Send error with description, then ready |
| Task execution fails | Send error with failure reason, then ready |
| Cancel received | Abort work, send result with "cancelled", then ready |
| Connection lost | ZMQ DEALER reconnects automatically; send ready on reconnect |
| Malformed msgpack | Log and drop; Sol also logs warnings for bad messages |
PUB/SUB Event Bridge
Workers can subscribe to Sol's PUB socket on port 5556 for real-time events:
python
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://localhost:5556")
sub.setsockopt_string(zmq.SUBSCRIBE, "")
while True:
msg = sub.recv()
event = msgpack.unpackb(msg, raw=False)
print(f"Event: {event['type']}")Reference Implementations
| Language | File | Description |
|---|---|---|
| Lua (Luna) | client/core/zmq_worker.lua | Full agent with streaming, cancel, IOLoop |
| Python | example/python/zmq_worker.py | Reference worker |
| Erlang (Sol) | server/src/sol_zmq_gateway.erl | ROUTER gateway |
| Erlang (Protocol) | server/src/sol_zmq_protocol.erl | msgpack encode/decode |