
pub-sub & timers

topics, subscribers, and things that happen on a schedule.

topics are the central nervous system

A topic is a named mailbox. Many publishers can send to it. Many subscribers can listen to it. Zef uses topics everywhere:

topic = identity, nothing else
topic = ET.Topic('๐Ÿƒ-88d4f6a0bbbe12345678') โ”‚ โ–ฒ publishers subscribers โ”‚ โ”‚ FX.Publish โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถ [ any zef code that cares ] FX.StartTimer [ could be an actor ] WebSocket client msgs [ or SubscribeFX ] [ or a server ]

A topic is just an identity value. Routing happens in the Zef runtime — topics don't "exist" as running objects; they're coordinating IDs.
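As a mental model only (plain Python, not the Zef runtime): the topic is a dict key, and the runtime owns a routing table from that key to whoever subscribed. `Router`, `subscribe`, and `publish` here are illustrative names, not Zef API:

```python
# Toy model of topic routing: the topic is just an ID (a dict key);
# the "runtime" (Router) owns the fan-out to subscribers.
from collections import defaultdict

class Router:
    def __init__(self):
        self._subs = defaultdict(list)   # topic id -> list of callbacks

    def subscribe(self, topic_id, callback):
        self._subs[topic_id].append(callback)

    def publish(self, topic_id, content):
        # many publishers, many subscribers: every callback sees the message
        for cb in self._subs[topic_id]:
            cb(content)

router = Router()
seen = []
router.subscribe('🃏-demo-topic', seen.append)
router.subscribe('🃏-demo-topic', lambda m: seen.append(m.upper()))
router.publish('🃏-demo-topic', 'hello')
print(seen)   # ['hello', 'HELLO']
```

The point of the sketch: nothing about the topic itself runs or holds state; all behaviour lives with the router and the subscribers.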

FX.SubscribeFX โ€” react to a topic without an actor

When you want to react but don't need state, use SubscribeFX:

topic = ET.Topic('๐Ÿƒ-abc...def')

# react with any ZefOp
FX.SubscribeFX(
    topic=topic,
    op=log,      # just print
) | run

FX.Publish(target=topic, content='hello') | run
# 🪵: 'hello'

chain ops

FX.SubscribeFX(
    topic=in_topic,
    op=parse_json | F.name | to_upper_case | log,
) | run

# push to another topic
FX.SubscribeFX(
    topic=in_topic,
    op=parse_json | F.payload | insert_into(FX.Publish(target=out_topic), 'content'),
) | run

don't use collect in the op chain

The op passed to SubscribeFX must be a plain ZefOp chain — the runtime applies it to each incoming message. A trailing | collect turns the chain into an eagerly evaluated value instead of an op, so drop it if you have one.

# โŒ wrong
op=parse_json | F.name | collect | log

# โœ… right
op=parse_json | F.name | log

@zef_function handlers in SubscribeFX

@zef_function
def on_msg(msg: String) -> Any:
    print(f'got: {msg}')
    return FX.Log(content=ET.Received(text=msg))

FX.SubscribeFX(topic=in_topic, op=on_msg) | run

If the function returns an FX (or list of FXs), they run automatically.
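A toy sketch of that "returned effects run automatically" contract (plain Python; `dispatch` and `run_effect` are illustrative stand-ins, not Zef internals):

```python
# The dispatcher inspects a handler's return value and executes it as
# zero, one, or many effects.
def run_effect(effect):
    print(f'running {effect}')       # stand-in for the real effect runner

def dispatch(handler, msg):
    result = handler(msg)
    if result is None:
        return []                                # no effect requested
    effects = result if isinstance(result, list) else [result]
    for e in effects:
        run_effect(e)
    return effects

def on_msg(msg):
    # a handler may return a single "effect" or a list of them
    return [f'Log({msg})', f'Publish({msg})']

dispatch(on_msg, 'hi')               # runs both effects
dispatch(lambda m: None, 'hi')       # runs nothing
```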

timers โ€” things that happen on a schedule

topic = ET.Topic(generate_uid())

# simple actor to listen
monitor = FX.StartActor(
    input=topic,
    initial_state=0,
    rules={(Any, Any): constant_func([[FX.Print(content='tick!')], 0])},
) | run

# ticks every second
timer = FX.StartTimer(interval=1, target=topic, content='tick') | run

# ...
# wait a bit...
# stop it
FX.StopTimer(timer=timer) | run
FX.StopActor(actor=monitor) | run

interval options

value       meaning
5           every 5 seconds
0.5         every 500 ms
'hourly'    every 1 hour
'daily'     every 24 hours
'weekly'    every 7 days
'monthly'   every ~30 days
'yearly'    every ~365 days

delayed start

FX.StartTimer(
    interval=60,
    starting_at=Time('2026-01-15 09:00:00 +0000'),
    target=topic,
    content='wake-up',
) | run

timers under the hood

FX.StartTimer ──▶ [ Tokio async task ]
                          │
                          ├── sleep interval
                          ▼
                  publish content to topic
                          │
                          └── loop

One Tokio task per timer. Zero CPU when sleeping. No Python threads. Cancelled cleanly on FX.StopTimer.
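The same shape can be sketched with Python's asyncio standing in for Tokio (illustrative only — this is not the real implementation): one task per timer, asleep between ticks, cancelled cleanly on stop.

```python
import asyncio

async def timer_task(interval, publish, content):
    try:
        while True:                         # └── loop
            await asyncio.sleep(interval)   # zero CPU while sleeping
            publish(content)                # tick: publish to the topic
    except asyncio.CancelledError:
        pass                                # FX.StopTimer analogue: clean exit

async def main():
    ticks = []
    task = asyncio.create_task(timer_task(0.01, ticks.append, 'tick'))
    await asyncio.sleep(0.05)               # let it fire a few times
    task.cancel()                           # stop the timer
    await asyncio.gather(task, return_exceptions=True)
    return ticks

print(asyncio.run(main()))
```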

publish โ€” the base op

FX.Publish(target=topic, content='hi') | run
FX.Publish(target=topic, content=42) | run
FX.Publish(target=topic, content=ET.Order(qty=3)) | run

# publish in bulk by iterating
for i in range(100):
    FX.Publish(target=topic, content=i) | run

a complete pub-sub example

A pipeline: JSON strings flow in โ†’ parse โ†’ validate โ†’ save to DB โ†’ broadcast.

raw = ET.Topic('๐Ÿƒ-raw-input-topic-000000')
valid = ET.Topic('๐Ÿƒ-valid-events-topic00000')
db = FX.CreateDB(type=DictDatabase, persistence='in_memory') | run

# stage 1 โ€” parse + validate, forward valid ones
@zef_function
def parse_and_validate(raw_msg: String) -> Any:
    data = str(raw_msg) | parse_json | collect
    if 'type' in data and 'at' in data:
        return FX.Publish(target=valid, content=data)
    return None

FX.SubscribeFX(topic=raw, op=parse_and_validate) | run

# stage 2 โ€” save every valid event to DB
@zef_function
def save_it(ev: Dict) -> Any:
    key = str(ev['at'])
    return FX.UpdateDB(db=db, insert={key: ev})

FX.SubscribeFX(topic=valid, op=save_it) | run

# pump some data in
FX.Publish(target=raw, content='{"type":"click","at":1}') | run
FX.Publish(target=raw, content='not json :(') | run       # dropped silently
FX.Publish(target=raw, content='{"type":"hover","at":2}') | run

pipelines across topics

Each stage is isolated, composable, observable. You can add a 3rd stage (say, a logger) by subscribing another handler to valid. You can add a 4th topic for rejected messages. Very lego-like.
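A plain-Python sketch of that staging, including the extra "rejected" topic (the `subscribe`/`publish` helpers are illustrative stand-ins, not Zef API):

```python
# Each stage subscribes to one topic and re-publishes to the next.
# Adding a logger or a rejected-messages topic is just another subscribe.
import json
from collections import defaultdict

subs = defaultdict(list)                      # topic id -> callbacks

def subscribe(topic, cb):
    subs[topic].append(cb)

def publish(topic, content):
    for cb in subs[topic]:
        cb(content)

saved, rejected = {}, []

def parse_and_validate(raw_msg):              # stage 1
    try:
        data = json.loads(raw_msg)
    except ValueError:
        publish('rejected', raw_msg)          # extra topic for rejects
        return
    if 'type' in data and 'at' in data:
        publish('valid', data)

subscribe('raw', parse_and_validate)
subscribe('valid', lambda ev: saved.update({str(ev['at']): ev}))  # stage 2
subscribe('rejected', rejected.append)        # added later, no other changes

publish('raw', '{"type":"click","at":1}')
publish('raw', 'not json :(')
publish('raw', '{"type":"hover","at":2}')
print(sorted(saved), rejected)   # ['1', '2'] ['not json :(']
```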

log aggregation pattern

log_events = ET.Topic('๐Ÿƒ-logs-topic-12345678abcd')

# pipe all logs through a filter, save to a signal for in-memory inspection
recent_logs = FX.CreateSignal(value=[], history=50) | run

@zef_function
def collect_log(msg: Any) -> Any:
    return FX.UpdateSignal(
        signal=recent_logs,
        transform=(lambda lst: (list(lst) + [msg])[-100:]),
    )

FX.SubscribeFX(topic=log_events, op=collect_log) | run

# now any code can:
FX.Publish(target=log_events, content={'lvl':'info', 'msg':'hello'}) | run
FX.ReadSignal(signal=recent_logs) | run
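The "keep only the last N" transform in isolation, as plain Python: a bounded buffer via `collections.deque` with `maxlen` does what the slicing lambda above does (illustrative, not the Signal implementation):

```python
from collections import deque

recent = deque(maxlen=3)          # keep only the 3 newest entries
for i in range(5):
    recent.append({'lvl': 'info', 'msg': f'event {i}'})

print([e['msg'] for e in recent])   # ['event 2', 'event 3', 'event 4']
```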

scheduled job pattern

Cron-like jobs are just timer + actor:

nightly_topic = ET.Topic(generate_uid())

@zef_function
def nightly_cleanup(msg: Any, state: Any) -> Array:
    # ... delete old cache entries, etc ...
    return [[FX.Log(content=ET.CleanupRan(at=FX.CurrentTime() | run))], state]

FX.StartActor(
    input=nightly_topic,
    initial_state={},
    handler=nightly_cleanup,
) | run

# fire every 24 hours
FX.StartTimer(
    interval='daily',
    target=nightly_topic,
    content='go',
) | run

monitoring what's running

FX.ListActors() | run         # all active actors
FX.ListTimers() | run         # all active timers
FX.SignalsSnapshot() | run    # all signals with current values

build a counter that ticks once per second

Use a topic + timer + actor. Every second, print the current count.

solution

t = ET.Topic(generate_uid())

@zef_function
def tick(msg: Any, state: Int) -> Array:
    n = state + 1
    return [[FX.Print(content=f'tick #{n}')], n]

FX.StartActor(input=t, initial_state=0, handler=tick) | run
FX.StartTimer(interval=1, target=t, content='.') | run

Next up: HTTP servers โ€” declarative web endpoints. โ†’