Topics, subscribers, and things that happen on a schedule.
A topic is a named mailbox. Many publishers can send to it. Many subscribers can listen to it. Zef uses topics everywhere:
FX.SubscribeFX handlers react to topics.

A topic is just an identity value. Routing happens in the Zef runtime: topics don't "exist" as running objects, they're coordinating IDs.
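To make the "topics are just IDs" idea concrete, here is a minimal plain-Python model of a router. It is not Zef's implementation or API: `MiniRouter`, `subscribe`, and `publish` are hypothetical names used only for illustration. What it shows is that a topic needs no object of its own, since the router only keeps a table keyed by topic ID.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List


class MiniRouter:
    """Toy stand-in (hypothetical) for a runtime that routes messages by topic ID."""

    def __init__(self) -> None:
        # topic ID -> handlers; a topic "exists" only as a key in this table
        self._handlers: Dict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, topic_id: str, handler: Callable[[Any], None]) -> None:
        self._handlers[topic_id].append(handler)

    def publish(self, topic_id: str, content: Any) -> int:
        """Deliver content to every handler on the topic; return how many ran."""
        handlers = self._handlers.get(topic_id, [])
        for handler in handlers:
            handler(content)
        return len(handlers)


router = MiniRouter()
received = []
router.subscribe("topic-a", received.append)
router.subscribe("topic-a", lambda msg: received.append(str(msg).upper()))

router.publish("topic-a", "hello")    # both subscribers see the message
router.publish("topic-b", "ignored")  # no subscribers, so nothing happens
```

Publishing to a topic nobody listens to is not an error in this model; the message simply has no recipients.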
When you want to react but don't need state, use SubscribeFX:
topic = ET.Topic('๐-abc...def')
# react with any ZefOp
FX.SubscribeFX(
    topic=topic,
    op=log,  # just print
) | run

FX.Publish(target=topic, content='hello') | run
# 🪵: 'hello'
# chain ops: parse, pick a field, upper-case it, log it
FX.SubscribeFX(
    topic=in_topic,
    op=parse_json | F.name | to_upper_case | log,
) | run

# push to another topic
FX.SubscribeFX(
    topic=in_topic,
    op=parse_json | F.payload | insert_into(FX.Publish(target=out_topic), 'content'),
) | run
No collect in the op chain: the op passed to SubscribeFX must be a ZefOp chain, not a collecting pipeline. Drop the final | collect if you have one.
# ❌ wrong
op=parse_json | F.name | collect | log

# ✅ right
op=parse_json | F.name | log
@zef_function
def on_msg(msg: String) -> Any:
    print(f'got: {msg}')
    return FX.Log(content=ET.Received(text=msg))

FX.SubscribeFX(topic=in_topic, op=on_msg) | run
If the function returns an FX (or list of FXs), they run automatically.
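One way to picture that rule is a small normalization step sitting between the handler and the effect runner. The sketch below is a plain-Python model, not Zef's code: `Effect` and `normalize_effects` are hypothetical. Two parts are assumptions of this model: that `None` means "no effects" (the pipeline example in this section has a handler that returns None to drop a message), and that any other return value is ignored.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass(frozen=True)
class Effect:
    """Hypothetical stand-in for an FX value: a description of work, not the work itself."""
    kind: str
    content: Any = None


def normalize_effects(result: Any) -> List[Effect]:
    """Turn a handler's return value into a flat list of effects to run."""
    if result is None:
        return []                      # assumption: nothing to run
    if isinstance(result, Effect):
        return [result]                # a single effect
    if isinstance(result, (list, tuple)):
        return [r for r in result if isinstance(r, Effect)]
    return []                          # assumption: other values are ignored


def on_msg(msg: str) -> Any:
    # a handler that describes two effects instead of performing them
    return [Effect("Log", msg), Effect("Publish", msg.upper())]


effects = normalize_effects(on_msg("hi"))
```

The handler stays a plain function that returns data; only the runner decides when and how the described effects execute.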
topic = ET.Topic(generate_uid())
# simple actor to listen
monitor = FX.StartActor(
    input=topic,
    initial_state=0,
    rules={(Any, Any): constant_func([[FX.Print(content='tick!')], 0])},
) | run
# ticks every second
timer = FX.StartTimer(interval=1, target=topic, content='tick') | run
# ...
# wait a bit...
# stop it
FX.StopTimer(timer=timer) | run
FX.StopActor(actor=monitor) | run
| value | meaning |
|---|---|
| 5 | every 5 seconds |
| 0.5 | every 500 ms |
| 'hourly' | every 1 hour |
| 'daily' | every 24 hours |
| 'weekly' | every 7 days |
| 'monthly' | ~30 days |
| 'yearly' | ~365 days |
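
Read as durations, the table amounts to the mapping below. This is a plain-Python restatement of the table for reference, not Zef's internal representation. `NAMED_INTERVALS` and `interval_seconds` are hypothetical names, the monthly and yearly values use the approximate 30 and 365 day figures from the table, and rejecting non-positive numbers is a choice of this sketch.

```python
from typing import Union

# seconds per named interval, taken from the table
NAMED_INTERVALS = {
    "hourly": 60 * 60,
    "daily": 24 * 60 * 60,
    "weekly": 7 * 24 * 60 * 60,
    "monthly": 30 * 24 * 60 * 60,   # approximate
    "yearly": 365 * 24 * 60 * 60,   # approximate
}


def interval_seconds(interval: Union[int, float, str]) -> float:
    """Convert a numeric or named interval to seconds."""
    if isinstance(interval, str):
        return float(NAMED_INTERVALS[interval])  # KeyError for unknown names
    if interval <= 0:
        raise ValueError("interval must be positive")
    return float(interval)
```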
FX.StartTimer(
    interval=60,
    starting_at=Time('2026-01-15 09:00:00 +0000'),
    target=topic,
    content='wake-up',
) | run
One Tokio task per timer. Zero CPU when sleeping. No Python threads.
Cancelled cleanly on FX.StopTimer.
FX.Publish(target=topic, content='hi') | run
FX.Publish(target=topic, content=42) | run
FX.Publish(target=topic, content=ET.Order(qty=3)) | run
# publish in bulk by iterating
for i in range(100):
    FX.Publish(target=topic, content=i) | run
A pipeline: JSON strings flow in → parse → validate → save to DB → broadcast.
raw = ET.Topic('๐-raw-input-topic-000000')
valid = ET.Topic('๐-valid-events-topic00000')
db = FX.CreateDB(type=DictDatabase, persistence='in_memory') | run
# stage 1: parse + validate, forward valid ones
@zef_function
def parse_and_validate(raw_msg: String) -> Any:
    data = str(raw_msg) | parse_json | collect
    if 'type' in data and 'at' in data:
        return FX.Publish(target=valid, content=data)
    return None

FX.SubscribeFX(topic=raw, op=parse_and_validate) | run
# stage 2: save every valid event to DB
@zef_function
def save_it(ev: Dict) -> Any:
    key = str(ev['at'])
    return FX.UpdateDB(db=db, insert={key: ev})

FX.SubscribeFX(topic=valid, op=save_it) | run
# pump some data in
FX.Publish(target=raw, content='{"type":"click","at":1}') | run
FX.Publish(target=raw, content='not json :(') | run # dropped silently
FX.Publish(target=raw, content='{"type":"hover","at":2}') | run
Each stage is isolated, composable, and observable. You can add a third stage
(say, a logger) by subscribing another handler to valid, or add
another topic for rejected messages. Very Lego-like.
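The validation rule from stage 1, and the idea of sending rejects somewhere of their own, can be checked in isolation with plain Python. The sketch below uses the standard `json` module rather than Zef's `parse_json`; `classify` and the "valid"/"rejected" labels are hypothetical names for illustration. A real handler would turn each label into a publish to the matching topic.

```python
import json
from typing import Any, Tuple


def classify(raw_msg: str) -> Tuple[str, Any]:
    """Return ("valid", data) or ("rejected", reason) for one raw message."""
    try:
        data = json.loads(raw_msg)
    except json.JSONDecodeError as err:
        return ("rejected", f"invalid JSON: {err.msg}")
    if not isinstance(data, dict):
        return ("rejected", "expected a JSON object")
    missing = [key for key in ("type", "at") if key not in data]
    if missing:
        return ("rejected", f"missing fields: {', '.join(missing)}")
    return ("valid", data)


results = [
    classify(msg)
    for msg in ['{"type":"click","at":1}', 'not json :(', '{"type":"hover"}']
]
```

Keeping the reason alongside the reject makes a rejected-messages topic useful for debugging rather than just a discard bin.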
log_events = ET.Topic('๐-logs-topic-12345678abcd')
# pipe all logs through a filter, save to a signal for in-memory inspection
recent_logs = FX.CreateSignal(value=[], history=50) | run
@zef_function
def collect_log(msg: Any) -> Any:
    return FX.UpdateSignal(
        signal=recent_logs,
        transform=(lambda lst: (list(lst) + [msg])[-100:]),
    )
FX.SubscribeFX(topic=log_events, op=collect_log) | run
# now any code can:
FX.Publish(target=log_events, content={'lvl':'info', 'msg':'hello'}) | run
FX.ReadSignal(signal=recent_logs) | run
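The transform in `collect_log` keeps the list bounded by appending and then slicing off everything but the last 100 entries. Here is a quick plain-Python check of that expression, next to `collections.deque(maxlen=...)` as a standard-library way to get the same behavior outside a signal. `append_bounded` is a hypothetical helper name; nothing here touches Zef.

```python
from collections import deque


def append_bounded(lst, msg, limit=100):
    """Same expression as the transform: append, then keep the last `limit` items."""
    return (list(lst) + [msg])[-limit:]


logs = []
for i in range(250):
    logs = append_bounded(logs, i)

# deque with maxlen drops the oldest item automatically on append
ring = deque(maxlen=100)
for i in range(250):
    ring.append(i)
```

Both end up holding entries 150 through 249. The slicing version returns a new list on every call, which suits a pure transform function; the deque mutates in place.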
Cron-like jobs are just timer + actor:
nightly_topic = ET.Topic(generate_uid())
@zef_function
def nightly_cleanup(msg: Any, state: Any) -> Array:
    # ... delete old cache entries, etc ...
    return [[FX.Log(content=ET.CleanupRan(at=FX.CurrentTime() | run))], state]

FX.StartActor(
    input=nightly_topic,
    initial_state={},
    handler=nightly_cleanup,
) | run
# fire every 24 hours
FX.StartTimer(
    interval='daily',
    target=nightly_topic,
    content='go',
) | run
FX.ListActors() | run # all active actors
FX.ListTimers() | run # all active timers
FX.SignalsSnapshot() | run # all signals with current values
Use a topic + timer + actor. Every second, print the current count.
t = ET.Topic(generate_uid())
@zef_function
def tick(msg: Any, state: Int) -> Array:
    n = state + 1
    return [[FX.Print(content=f'tick #{n}')], n]

FX.StartActor(input=t, initial_state=0, handler=tick) | run
FX.StartTimer(interval=1, target=t, content='.') | run
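To see what the handler computes without starting a runtime, the same (msg, state) -> [effects, new_state] shape can be folded over a few messages in plain Python. This is a model of the pattern, not of Zef's scheduler: `run_actor` is hypothetical, and effects are represented as plain strings instead of FX values.

```python
from typing import Any, Callable, List, Tuple


def tick(msg: Any, state: int) -> list:
    """Same logic as the actor handler, with a string standing in for FX.Print."""
    n = state + 1
    return [[f"tick #{n}"], n]


def run_actor(
    handler: Callable[[Any, Any], list],
    initial_state: Any,
    messages: List[Any],
) -> Tuple[List[Any], Any]:
    """Feed messages through the handler one by one, threading the state along."""
    state = initial_state
    emitted: List[Any] = []
    for msg in messages:
        effects, state = handler(msg, state)
        emitted.extend(effects)
    return emitted, state


# three timer messages in a row
emitted, final_state = run_actor(tick, 0, [".", ".", "."])
```

The message content is never inspected by `tick`; the timer only supplies the rhythm, and the count lives entirely in the actor's state.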
Next up: HTTP servers, declarative web endpoints.