Skip to content

Worker Integration

Sol orchestrates external workers through a ZMQ DEALER-ROUTER protocol. Any language that supports ZMQ and msgpack can implement a worker that connects to Sol, receives tasks, and reports results.

Architecture Overview

Sol ROUTER (5555) <--> Worker DEALER
Sol PUB    (5556) ---> Event subscribers

Workers connect as DEALER sockets to Sol's ROUTER gateway. Tasks are dispatched in msgpack format. Workers stream partial output, report results, and signal readiness for the next task.

Connection Lifecycle

  1. Open a DEALER socket, optionally set ZMQ.IDENTITY
  2. Connect to tcp://<host>:5555
  3. Send a ready message with capabilities
  4. Receive task messages from Sol
  5. Stream token messages during execution
  6. Send result or error when complete
  7. Send ready again for the next task

Protocol Reference

See ZMQ Protocol Reference for the full wire specification.

Message Format

All messages are single-frame msgpack maps with binary string keys. The DEALER socket adds an empty delimiter frame automatically.

Worker-to-Sol Messages

TypePurpose
readyAnnounce availability and capabilities
tokenStream partial output
resultReport task completion
errorReport task failure
logDiagnostic log message

Sol-to-Worker Messages

TypePurpose
taskDispatch work
cancelAbort running task
spawnCreate child task

Python Worker

python
import msgpack
import zmq
import uuid

def run_worker(server_url="tcp://localhost:5555"):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.DEALER)
    worker_id = f"python-{uuid.uuid4().hex[:8]}"
    sock.identity = worker_id.encode()
    sock.connect(server_url)

    sock.send(msgpack.packb({
        b"type": b"ready",
        b"worker_id": worker_id.encode(),
        b"capabilities": [b"tools", b"streaming"]
    }, use_bin_type=True))

    while True:
        msg = sock.recv()
        task = msgpack.unpackb(msg, raw=False)

        if task["type"] == "task":
            task_id = task["task_id"]
            prompt = task["prompt"]

            for word in prompt.split()[:5]:
                sock.send(msgpack.packb({
                    b"type": b"token",
                    b"task_id": task_id.encode(),
                    b"content": (word + " ").encode()
                }, use_bin_type=True))

            sock.send(msgpack.packb({
                b"type": b"result",
                b"task_id": task_id.encode(),
                b"status": b"ok",
                b"content": f"Processed: {prompt}".encode()
            }, use_bin_type=True))

            sock.send(msgpack.packb({
                b"type": b"ready",
                b"worker_id": worker_id.encode(),
                b"capabilities": [b"tools", b"streaming"]
            }, use_bin_type=True))

        elif task["type"] == "cancel":
            task_id = task["task_id"]
            sock.send(msgpack.packb({
                b"type": b"result",
                b"task_id": task_id.encode(),
                b"status": b"cancelled",
                b"content": b""
            }, use_bin_type=True))

            sock.send(msgpack.packb({
                b"type": b"ready",
                b"worker_id": worker_id.encode(),
                b"capabilities": [b"tools", b"streaming"]
            }, use_bin_type=True))

    sock.close()
    ctx.term()

if __name__ == "__main__":
    run_worker()

Go Worker

go
package main

import (
	"fmt"
	"log"

	"github.com/pebbe/zmq4"
	"github.com/vmihailenco/msgpack/v5"
)

type Ready struct {
	Type         string   `msgpack:"type"`
	WorkerID     string   `msgpack:"worker_id"`
	Capabilities []string `msgpack:"capabilities"`
}

type Task struct {
	Type   string `msgpack:"type"`
	TaskID string `msgpack:"task_id"`
	Prompt string `msgpack:"prompt"`
}

type Result struct {
	Type   string `msgpack:"type"`
	TaskID string `msgpack:"task_id"`
	Status string `msgpack:"status"`
	Content string `msgpack:"content"`
}

func main() {
	sock, _ := zmq4.NewSocket(zmq4.DEALER)
	defer sock.Close()
	sock.SetIdentity("go-worker-1")
	sock.Connect("tcp://localhost:5555")

	ready := Ready{
		Type:         "ready",
		WorkerID:     "go-worker-1",
		Capabilities: []string{"tools"},
	}
	data, _ := msgpack.Marshal(&ready)
	sock.Send(string(data), 0)

	for {
		msg, _ := sock.Recv(0)
		var task Task
		msgpack.Unmarshal([]byte(msg), &task)

		if task.Type == "task" {
			result := Result{
				Type:    "result",
				TaskID:  task.TaskID,
				Status:  "ok",
				Content: fmt.Sprintf("Go processed: %s", task.Prompt),
			}
			data, _ := msgpack.Marshal(&result)
			sock.Send(string(data), 0)

			data, _ = msgpack.Marshal(&ready)
			sock.Send(string(data), 0)
		}
	}
}

Rust Worker

rust
use zmq;
use serde::{Serialize, Deserialize};
use serde_json::json;

fn main() {
    let ctx = zmq::Context::new();
    let sock = ctx.socket(zmq::DEALER).unwrap();
    sock.set_identity(b"rust-worker-1").unwrap();
    sock.connect("tcp://localhost:5555").unwrap();

    let ready = json!({
        "type": "ready",
        "worker_id": "rust-worker-1",
        "capabilities": ["tools"]
    });
    sock.send(&serde_json::to_vec(&ready).unwrap(), 0).unwrap();

    loop {
        let msg = sock.recv_bytes(0).unwrap();
        let task: serde_json::Value = serde_json::from_slice(&msg).unwrap();

        if task["type"] == "task" {
            let task_id = task["task_id"].as_str().unwrap();
            let prompt = task["prompt"].as_str().unwrap();

            let result = json!({
                "type": "result",
                "task_id": task_id,
                "status": "ok",
                "content": format!("Rust processed: {}", prompt)
            });
            sock.send(&serde_json::to_vec(&result).unwrap(), 0).unwrap();

            sock.send(&serde_json::to_vec(&ready).unwrap(), 0).unwrap();
        }
    }
}

Error Handling

ScenarioWorker Action
Invalid task payloadSend error with description, then ready
Task execution failsSend error with failure reason, then ready
Cancel receivedAbort work, send result with "cancelled", then ready
Connection lostZMQ DEALER reconnects automatically; send ready on reconnect
Malformed msgpackLog and drop; Sol also logs warnings for bad messages

PUB/SUB Event Bridge

Workers can subscribe to Sol's PUB socket on port 5556 for real-time events:

python
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://localhost:5556")
sub.setsockopt_string(zmq.SUBSCRIBE, "")

while True:
    msg = sub.recv()
    event = msgpack.unpackb(msg, raw=False)
    print(f"Event: {event['type']}")

Reference Implementations

LanguageFileDescription
Lua (Luna)client/core/zmq_worker.luaFull agent with streaming, cancel, IOLoop
Pythonexample/python/zmq_worker.pyReference worker
Erlang (Sol)server/src/sol_zmq_gateway.erlROUTER gateway
Erlang (Protocol)server/src/sol_zmq_protocol.erlmsgpack encode/decode

Released under the MIT License.