Signals are thread-safe variables with automatic history.
counter = FX.CreateSignal(value=0) | run
# → Signal('…-a48ba7b311a6cbb2fd6d')
FX.ReadSignal(signal=counter) | run # 0
FX.SetSignal(signal=counter, value=42) | run
FX.ReadSignal(signal=counter) | run # 42
counter is a handle, not the value itself. Every read and write through
the handle is thread-safe.
You've got multiple threads or handlers needing to share state. Options:
| option | fast? | thread-safe? | history? |
|---|---|---|---|
| plain variable | ✓ | ✗ | ✗ |
| variable + threading.Lock | ✓ | you hope | ✗ |
| DictDatabase | meh | ✓ | ✓ (full) |
| Signal | ✓ | ✓ (automatic) | ✓ (ring buffer) |
A Signal is named by its UID and lives in a process-wide registry, so every thread can read and write it safely.
| operation | atomic? | why |
|---|---|---|
| ReadSignal | ✓ | holds read lock for duration |
| SetSignal | ✓ | holds write lock for duration |
| Read, then Write | ✗ | lock released between the two |
| UpdateSignal(transform=f) | ✓ | optimistic CAS + retry |
If two threads both do:
v = FX.ReadSignal(signal=counter) | run
FX.SetSignal(signal=counter, value=v + 1) | run
...one increment will be lost. Between the read and the write, another
thread can slip in and also read the same value. Both then write v+1.
Lost increment.
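The interleaving is easier to see when it is written out step by step. Here is a minimal plain-Python sketch with no Zef calls: `Cell` is a hypothetical stand-in for any shared variable with separate read and write steps, and the two "threads" are simulated in sequence so the outcome is deterministic.

```python
class Cell:
    """Hypothetical stand-in for a shared variable (not a Zef Signal)."""

    def __init__(self, value):
        self.value = value

    def read(self):
        return self.value

    def write(self, value):
        self.value = value


cell = Cell(0)

# Both "threads" read before either one writes.
v_a = cell.read()      # thread A sees 0
v_b = cell.read()      # thread B also sees 0
cell.write(v_a + 1)    # thread A writes 1
cell.write(v_b + 1)    # thread B writes 1, overwriting A's increment

final_value = cell.read()
print(final_value)     # 1, although two increments were attempted
```

Each individual read and write is fine on its own. The bug lives entirely in the gap between them.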
Fix: always use UpdateSignal with a pure transform.
FX.UpdateSignal(signal=counter, transform=add(1)) | run
How it works: UpdateSignal reads the value, releases the lock, runs your transform, then re-acquires the lock to commit, but only if no concurrent write landed in the meantime. Otherwise it retries automatically with the fresh value.
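To make the read, transform, commit-or-retry cycle concrete, here is a rough sketch in plain Python. It is not Zef's implementation: `VersionedCell`, its version counter, and the `worker` helper are all assumptions made for illustration, and the real UpdateSignal may detect conflicts differently.

```python
import threading


class VersionedCell:
    """Hypothetical cell that uses a version counter to detect concurrent writes."""

    def __init__(self, value):
        self._lock = threading.Lock()
        self._value = value
        self._version = 0

    def read(self):
        with self._lock:
            return self._value

    def update(self, transform):
        while True:
            # Step 1: snapshot the value and version, then release the lock.
            with self._lock:
                seen_value, seen_version = self._value, self._version
            # Step 2: run the transform without holding the lock.
            new_value = transform(seen_value)
            # Step 3: commit only if nobody wrote in the meantime.
            with self._lock:
                if self._version == seen_version:
                    self._value = new_value
                    self._version += 1
                    return new_value
            # A concurrent write landed: loop and retry with the fresh value.


def worker(cell, n):
    for _ in range(n):
        cell.update(lambda x: x + 1)


cell = VersionedCell(0)
threads = [threading.Thread(target=worker, args=(cell, 1000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = cell.read()
print(total)  # 4000: every increment commits exactly once
```

The transform runs outside the lock, so a slow transform never blocks readers. The price is that it may run more than once.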
@zef_function
def increment_by_n(x: Int) -> Int:
    # n is fixed at 5 in this example
    return x + 5
FX.UpdateSignal(signal=counter, transform=increment_by_n) | run
Since UpdateSignal retries on conflict, a transform that sends
an email or writes a file may do so multiple times. Keep transforms to
arithmetic and dict/list construction: pure data shuffling.
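A small deterministic sketch shows a side effect firing twice. This is plain Python, not Zef: `VersionedCell` is a hypothetical single-threaded model of a commit-or-retry loop, and the "concurrent write" is injected from inside the transform to force exactly one conflict.

```python
class VersionedCell:
    """Hypothetical single-threaded model of an optimistic update loop."""

    def __init__(self, value):
        self.value = value
        self.version = 0

    def set(self, value):
        self.value = value
        self.version += 1

    def update(self, transform):
        while True:
            seen_value, seen_version = self.value, self.version
            new_value = transform(seen_value)
            if self.version == seen_version:
                self.set(new_value)
                return new_value
            # Version changed while the transform ran: retry.


cell = VersionedCell(0)
calls = []  # records every invocation; stands in for "sent an email"


def impure_increment(x):
    calls.append(x)      # the side effect
    if len(calls) == 1:
        cell.set(10)     # simulate another thread writing mid-transform
    return x + 1


result = cell.update(impure_increment)
print(result)  # 11
print(calls)   # [0, 10]: the side effect ran twice for one logical update
```

The committed value is correct, but anything the transform did to the outside world happened once per attempt.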
sig = FX.CreateSignal(value=0, history=100) | run
# keep last 100 values
# ... many updates ...
hist = FX.ReadSignalHistory(signal=sig, count=10) | run
# [
#   {'time': Time('2026-01-15 10:30:01 +0000'), 'value': 1},
#   {'time': Time('2026-01-15 10:30:02 +0000'), 'value': 2},
#   ...
# ]
Ring buffer: when it fills up, oldest values get overwritten. Perfect for metrics, audit trails, "what changed recently."
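The overwrite behaviour can be sketched with `collections.deque(maxlen=...)` from the standard library. `HistoryCell` is a hypothetical stand-in, not how Zef stores history. Recording the initial value as a history entry and returning entries oldest-first are both assumptions of this sketch.

```python
import time
from collections import deque


class HistoryCell:
    """Hypothetical cell that keeps only the last `history` values."""

    def __init__(self, value, history):
        # A deque with maxlen silently drops the oldest entry when full.
        self._entries = deque(maxlen=history)
        self.set(value)

    def set(self, value):
        self._entries.append({'time': time.time(), 'value': value})

    def read(self):
        return self._entries[-1]['value']

    def read_history(self, count):
        if count <= 0:
            return []
        # Oldest first, at most `count` entries.
        return list(self._entries)[-count:]


cell = HistoryCell(0, history=3)
for v in [1, 2, 3, 4]:
    cell.set(v)

recent = [e['value'] for e in cell.read_history(count=10)]
print(recent)       # [2, 3, 4]: the values 0 and 1 were overwritten
print(cell.read())  # 4
```

Asking for more entries than the buffer holds simply returns everything that is still there.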
sig = FX.CreateSignal(value=0, type=Int) | run
FX.SetSignal(signal=sig, value=42) | run # ok
FX.SetSignal(signal=sig, value='hi') | run # error! type mismatch
# union types
sig = FX.CreateSignal(value=0, type=Int | String) | run
| you write | you read back | works | breaks |
|---|---|---|---|
| int | Int32_ | x+1, x*2 | isinstance(x, int) |
| str | StringShort_ | x == 'foo' | x.upper(), x.split() |
| None | Nil_ | x == None | x is None |
| dict | Dict1_ | x['k'], x.items() | isinstance(x, dict) |
Most operations just work because Zef types implement most Python protocols. isinstance checks and Python-only methods such as upper() or split() need a different approach.
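Why operators work while isinstance and identity checks fail can be shown with two stand-in classes. These are hypothetical wrappers written for this sketch, not Zef's actual types. Whether Zef values support `int()` conversion is not stated here, so `__int__` is part of the stand-in only.

```python
class WrappedInt:
    """Hypothetical wrapper that behaves like an int but is not one."""

    def __init__(self, n):
        self._n = n

    def __int__(self):
        return self._n

    def __add__(self, other):
        return WrappedInt(self._n + int(other))

    def __eq__(self, other):
        return self._n == int(other)


class WrappedNil:
    """Hypothetical wrapper that compares equal to None but is a distinct object."""

    def __eq__(self, other):
        return other is None or isinstance(other, WrappedNil)


x = WrappedInt(41)
sum_matches = (x + 1 == 42)           # True: operator protocols are implemented
is_python_int = isinstance(x, int)    # False: the class does not subclass int

nil = WrappedNil()
equals_none = (nil == None)           # True: __eq__ handles the comparison
is_none = (nil is None)               # False: identity cannot be overridden

print(sum_matches, is_python_int, equals_none, is_none)
# True False True False
```

Protocol-based operations go through methods the wrapper can define. `isinstance` and `is` look at the object's class and identity, which a wrapper cannot fake.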
# Sliding window of request counts (an hour long if one snapshot is written per minute)
metrics = FX.CreateSignal(
    value={'requests': 0, 'errors': 0},
    history=60,  # 60 snapshots
) | run
@zef_function
def inc_requests(m: Dict) -> Dict:
    return {'requests': m['requests'] + 1, 'errors': m['errors']}
@zef_function
def inc_errors(m: Dict) -> Dict:
    return {'requests': m['requests'], 'errors': m['errors'] + 1}
# From any handler thread:
FX.UpdateSignal(signal=metrics, transform=inc_requests) | run
# Read up to 10 snapshots from the history
FX.ReadSignalHistory(signal=metrics, count=10) | run
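Both transforms above spell out every key by hand. Here is a plain-Python sketch of a more general pure transform. The `bump` helper is hypothetical, and whether `@zef_function` accepts a closure like this is not stated here.

```python
def bump(key):
    """Return a pure transform that increments one counter in a metrics dict."""

    def transform(metrics):
        # Build a new dict instead of mutating the argument, so a retried
        # transform always starts from an untouched input.
        return {**metrics, key: metrics[key] + 1}

    return transform


before = {'requests': 0, 'errors': 0}
after = bump('requests')(before)
after = bump('errors')(after)
after = bump('requests')(after)

print(before)  # {'requests': 0, 'errors': 0}
print(after)   # {'requests': 2, 'errors': 1}
```

Returning a fresh dict matters because the input may be read again on a retry or by another thread.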
| need | use |
|---|---|
| thread-safe variable in one process | Signal |
| persistent across restarts | Database |
| cross-process sharing | Database (vault) |
| full history | Database (append log) |
| last N values | Signal (ring buffer) |
| single-threaded state | plain variable |
Signals are in-memory and process-scoped: when the process exits, they vanish. Think of them as the "configuration / current metrics / application state" layer. For "saved for later" data, use databases.
snapshot = FX.SignalsSnapshot() | run
# {Signal('…'): <value>, Signal('…'): <value>, ...}
FX.DeleteSignal(signal=counter) | run
SignalsSnapshot is handy for debugging: "what's the current state of everything?"
Build a signal-based "last 10 requests" log. Every time a request comes in, append its timestamp + path to the stored list. Never grow beyond 10.
reqs = FX.CreateSignal(value=[], history=50) | run
@zef_function
def append_cap10(lst: Array, entry: Any) -> Array:
    return (list(lst) + [entry])[-10:]
# each request:
entry = {'t': FX.CurrentTime() | run, 'p': req.path}
FX.UpdateSignal(signal=reqs, transform=lambda lst: append_cap10(lst, entry)) | run
# (or wrap in a zef_function to avoid lambda restriction)
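The capping logic can be checked without Zef at all. In this plain-Python sketch the `make_appender` factory is a hypothetical way to bind the entry into a one-argument transform without a lambda. Whether Zef accepts such a closure as a transform is an assumption.

```python
def append_cap10(lst, entry):
    """Return a new list with `entry` appended, keeping only the last 10 items."""
    return (list(lst) + [entry])[-10:]


def make_appender(entry):
    """Bind `entry` so the result takes only the current list."""

    def transform(lst):
        return append_cap10(lst, entry)

    return transform


log = []
for i in range(12):
    entry = {'t': i, 'p': f'/page/{i}'}   # integer timestamps for the demo
    log = make_appender(entry)(log)

print(len(log))      # 10
print(log[0]['p'])   # /page/2
print(log[-1]['p'])  # /page/11
```

After twelve requests the two oldest entries have been dropped, and the list never grows past ten.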
Next up: actors, the concurrency model. →