broadcast mode and actor-per-connection mode.
| mode | use case |
|---|---|
| topic mode | broadcast/chatroom: everyone sees everything |
| actor mode | per-connection state: auth, sessions, stateful protocols |
All clients subscribe to one topic AND push into the same topic, which gives you instant broadcast.
chat = ET.Topic(generate_uid())
FX.StartHTTPServer(
    routes={
        '/ws': ET.WebSocket(
            clients_subscribe_to=[chat],  # server → all clients
            clients_push_into_=[chat],    # any client → the topic
        ),
    },
    port=8000,
) | run
That's a complete chatroom: connect, and anything you send is broadcast to everyone.
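To make the broadcast semantics concrete, here is a minimal plain-Python sketch of one topic that every client both subscribes to and pushes into. `ToyTopic` and the list-based inboxes are hypothetical stand-ins, not Zef's implementation. Note that in this model the sender also receives its own message because it is subscribed to the same topic; the text above does not say whether Zef behaves the same way.

```python
from typing import Callable, Dict, List


class ToyTopic:
    """Hypothetical stand-in for a topic: a list of subscriber callbacks."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[str], None]] = []

    def subscribe(self, deliver: Callable[[str], None]) -> None:
        self._subscribers.append(deliver)

    def publish(self, content: str) -> None:
        # Deliver to every subscriber, in subscription order.
        for deliver in list(self._subscribers):
            deliver(content)


chat = ToyTopic()

# Each "client" is modelled as a list that collects what it receives.
inboxes: Dict[str, List[str]] = {name: [] for name in ("alice", "bob", "carol")}
for inbox in inboxes.values():
    chat.subscribe(inbox.append)

# Any client pushing into the topic reaches all subscribers.
chat.publish("hi all")
```
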
The server pushes updates; clients don't push back:
news_feed = ET.Topic(generate_uid())
FX.StartHTTPServer(
    routes={
        '/feed': ET.WebSocket(
            clients_subscribe_to=[news_feed],
            # no clients_push_into_, so the feed is one-way
        ),
    },
    port=8000,
) | run
# elsewhere: emit updates
FX.Publish(target=news_feed, content='breaking news!') | run
Each client gets its own private actor with its own state:
FX.StartHTTPServer(
    routes={
        '/ws': ET.WebSocket(
            actor=FX.StartActor(
                initial_state=ET.Guest(),
                rules={
                    (ET.WSClientConnected, ET.Guest): on_connect,
                    (String, ET.Connected): on_message,
                    (ET.WSClientDisconnected, Any): on_disconnect,
                },
            ),
        ),
    },
    port=8000,
) | run
When a client connects, Zef spawns a fresh actor for that connection. When the client leaves, the actor stops.
Every per-connection actor sees (at minimum) three kinds of message: a connect event, data frames (text or binary), and a disconnect event:
ET.WSClientConnected(
    client_ip='127.0.0.1',
    path='/ws',
    time=Time(...),
    topic_in=EntityUid(...),   # actor's input topic
    topic_out=EntityUid(...),  # publish here → sends to THIS client
    origin='http://...',
)
# regular text frames
String('hello from client')
# regular binary frames
Bytes(b'...')
ET.WSClientDisconnected(
    client_ip='127.0.0.1',
    path='/ws',
    time=Time(...),
    origin='http://...',
)
topic_out trick

On connect, you get a topic_out that's unique to THIS client.
Stash it in the actor's state so subsequent handlers know where to
publish replies. This is how you "reply to the sender."
@zef_function
def on_connect(input: Any) -> Any:
    msg, state = input[0], input[1]
    return [
        [FX.Publish(target=msg.topic_out, content='welcome!')],
        ET.Connected(topic_out=msg.topic_out),
    ]

@zef_function
def on_message(input: Any) -> Any:
    msg, state = input[0], input[1]
    return [
        [FX.Publish(target=state.topic_out, content=f'you said: {msg}')],
        state,
    ]

@zef_function
def on_disconnect(input: Any) -> Any:
    msg, state = input[0], input[1]
    print(f'client {msg.client_ip} left')
    return [[], ET.Done()]

FX.StartHTTPServer(
    routes={
        '/ws': ET.WebSocket(
            actor=FX.StartActor(
                initial_state=ET.Guest(),
                rules={
                    (ET.WSClientConnected, ET.Guest): on_connect,
                    (String, ET.Connected): on_message,
                    (ET.WSClientDisconnected, Any): on_disconnect,
                },
            ),
        ),
    },
    port=8000,
) | run
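The handlers above all share one shape: they receive `[message, state]` and return `[effects, new_state]`, and the rules table picks a handler from the (message type, state type) pair. The following plain-Python sketch models that dispatch loop so the flow can be traced without a server. Every class here (`ClientConnected`, `Guest`, and so on) is a hypothetical stand-in, effects are plain tuples rather than `FX.Publish` values, and ignoring unmatched messages is an assumption rather than documented Zef behavior.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the message and state types used above.
@dataclass
class ClientConnected:
    topic_out: str


@dataclass
class ClientDisconnected:
    pass


@dataclass
class Guest:
    pass


@dataclass
class Connected:
    topic_out: str


@dataclass
class Done:
    pass


def on_connect(inp):
    msg, state = inp
    # Stash topic_out in the new state so later handlers can reply.
    return [[("publish", msg.topic_out, "welcome!")], Connected(msg.topic_out)]


def on_message(inp):
    msg, state = inp
    return [[("publish", state.topic_out, f"you said: {msg}")], state]


def on_disconnect(inp):
    return [[], Done()]


RULES = {
    (ClientConnected, Guest): on_connect,
    (str, Connected): on_message,
    (ClientDisconnected, object): on_disconnect,  # object plays the role of Any
}


def step(state, msg):
    """Find the first rule matching (type of msg, type of state) and run it."""
    for (msg_type, state_type), handler in RULES.items():
        if isinstance(msg, msg_type) and isinstance(state, state_type):
            return handler([msg, state])
    return [[], state]  # assumption: unmatched messages are ignored


def run_connection(messages):
    """Feed one connection's messages through the rules, collecting effects."""
    state, log = Guest(), []
    for msg in messages:
        effects, state = step(state, msg)
        log.extend(effects)
    return log, state


effects, final_state = run_connection(
    [ClientConnected("out-1"), "hello", ClientDisconnected()]
)
```

Because the state type is part of the rule key, a text frame is only handled once the actor has moved from `Guest` to `Connected`.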
Text frames arrive as String, binary frames as Bytes. In topic mode, however, binary arrives as Array1[u8], so you have to convert it:
FX.SubscribeFX(
    topic=from_clients,
    op=array_to_bytes | insert_into(FX.Publish(target=to_clients), 'content'),
) | run
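Conceptually, the conversion turns a sequence of unsigned 8-bit integers into a byte string. The plain-Python sketch below illustrates that idea with the standard `array` module; it is not Zef's `array_to_bytes` operator, and the sample values are made up.

```python
from array import array

# A binary frame represented as an array of unsigned 8-bit integers
# (typecode "B"), analogous in spirit to Array1[u8].
frame = array("B", [104, 105, 0, 255])

# Convert to an immutable byte string.
payload = frame.tobytes()

# The conversion is lossless: the original integers can be recovered.
round_trip = list(payload)
```
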
Thread shared resources through the initial state:
db = FX.CreateDB(type=DictDatabase, persistence='in_memory') | run
FX.StartHTTPServer(
    routes={
        '/ws': ET.WebSocket(
            actor=FX.StartActor(
                initial_state=ET.Guest(db=db),  # pass db through state
                rules={
                    (ET.WSClientConnected, ET.Guest): on_connect,
                    (ET.StoreRequest, ET.Connected): on_store,
                },
            ),
        ),
    },
    port=8000,
) | run
Every connection's actor starts with a reference to the shared db handle. They all write to the same store.
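The distinction that matters here is between per-connection state and the shared handle it points to. A small plain-Python sketch, with a dict standing in for the db handle and a hypothetical `spawn_state` helper standing in for whatever Zef does when it starts an actor for a new connection (that mechanism is an assumption):

```python
from dataclasses import dataclass


@dataclass
class Guest:
    db: dict                # stand-in for the shared db handle
    messages_seen: int = 0  # purely per-connection data


shared_db: dict = {}
initial_state = Guest(db=shared_db)


def spawn_state(template: Guest) -> Guest:
    """Create a fresh state object for a new connection.

    The db reference is copied, not the db itself, so every
    connection ends up pointing at the same store.
    """
    return Guest(db=template.db, messages_seen=template.messages_seen)


conn_a = spawn_state(initial_state)
conn_b = spawn_state(initial_state)

conn_a.db["greeting"] = "hello"  # visible through conn_b as well
conn_a.messages_seen += 1        # private to conn_a
```

Since every connection writes to the same store, anything that needs isolation (counters, auth status, the client's topic_out) belongs in the state fields, not in the shared handle.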
FX.QueryAllActiveHTTPServers() | run
FX.QueryAllWebsocketEndpoints(http_server=server) | run
FX.QueryAllConnectedWebsocketClients(http_server=server, path='/ws') | run
# [
# {'channel_id': 0, 'client_ip': '127.0.0.1', 'connect_time': Time(...)},
# ...
# ]
Built-in introspection: great for debugging or building an admin panel.
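As a rough idea of what an admin panel might do with that listing, the sketch below summarizes a list of client records shaped like the commented output above. The sample data and the `summarize` helper are hypothetical, and `connect_time` is a plain float here rather than a Zef `Time` value.

```python
from collections import Counter
from typing import Dict, List

# Hypothetical records with the same keys as the listing above.
clients: List[Dict] = [
    {"channel_id": 0, "client_ip": "127.0.0.1", "connect_time": 100.0},
    {"channel_id": 1, "client_ip": "10.0.0.7", "connect_time": 130.0},
    {"channel_id": 2, "client_ip": "127.0.0.1", "connect_time": 160.0},
]


def summarize(clients: List[Dict], now: float) -> Dict:
    """Count connections per IP and find the longest-lived connection."""
    per_ip = Counter(c["client_ip"] for c in clients)
    longest = max((now - c["connect_time"] for c in clients), default=0.0)
    return {
        "total": len(clients),
        "per_ip": dict(per_ip),
        "longest_open_seconds": longest,
    }


summary = summarize(clients, now=200.0)
```
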
Need to talk to an external WS server? Classic mode:
inbox = ET.Topic(generate_uid())
outbox = ET.Topic(generate_uid())
client = FX.StartWebSocketClient(
    url='wss://echo.example.com/ws',
    in_stream_into=[inbox],
    out_stream_from=[outbox],
) | run
# subscribe to see what comes back
FX.SubscribeFX(topic=inbox, op=log) | run
# send a message
FX.Publish(target=outbox, content='hello remote!') | run
# cleanup
FX.StopWebSocketClient(client=client) | run
The first message from a client is their username. All subsequent messages are broadcast as "[username]: ...".
chat = ET.Topic(generate_uid())

@zef_function
def on_connect(input: Any) -> Any:
    msg, state = input[0], input[1]
    return [
        [FX.Publish(target=msg.topic_out, content='send your name')],
        ET.AwaitingName(topic_out=msg.topic_out),
    ]

@zef_function
def on_name(input: Any) -> Any:
    name, state = input[0], input[1]
    # subscribe client to chat
    return [
        [FX.Publish(target=chat, content=f'* {name} joined'),
         FX.SubscribeFX(topic=chat, op=insert_into(FX.Publish(target=state.topic_out), 'content'))],
        ET.Chatting(name=name, topic_out=state.topic_out),
    ]

@zef_function
def on_msg(input: Any) -> Any:
    msg, state = input[0], input[1]
    return [[FX.Publish(target=chat, content=f'[{state.name}]: {msg}')], state]
# wire up rules (pass this dict as rules= to FX.StartActor)
rules = {
    (ET.WSClientConnected, Any): on_connect,
    (String, ET.AwaitingName): on_name,
    (String, ET.Chatting): on_msg,
}
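One detail of `on_name` is easy to miss: it publishes the join notice before it subscribes the client. The plain-Python simulation below traces two clients through the AwaitingName and Chatting states to show the consequence. It assumes effects run in list order, which this section does not state; `ToyTopic`, the list outboxes, and the simplified handler signatures are all hypothetical stand-ins.

```python
from dataclasses import dataclass
from typing import List


class ToyTopic:
    """Hypothetical stand-in for the shared chat topic."""

    def __init__(self) -> None:
        self.subscribers = []

    def publish(self, content: str) -> None:
        for deliver in list(self.subscribers):
            deliver(content)


@dataclass
class AwaitingName:
    outbox: List[str]  # stands in for this client's topic_out


@dataclass
class Chatting:
    name: str
    outbox: List[str]


chat = ToyTopic()


def on_name(name, state):
    effects = [
        lambda: chat.publish(f"* {name} joined"),               # 1. announce
        lambda: chat.subscribers.append(state.outbox.append),   # 2. subscribe
    ]
    return effects, Chatting(name=name, outbox=state.outbox)


def on_msg(msg, state):
    return [lambda: chat.publish(f"[{state.name}]: {msg}")], state


def receive(state, text):
    """Dispatch on the state type, then run effects in list order (assumed)."""
    handler = on_name if isinstance(state, AwaitingName) else on_msg
    effects, new_state = handler(text, state)
    for effect in effects:
        effect()
    return new_state


alice = AwaitingName(outbox=[])
bob = AwaitingName(outbox=[])

alice = receive(alice, "alice")   # first message is the username
bob = receive(bob, "bob")
alice = receive(alice, "hi bob")  # later messages are chat lines
```

Under that ordering assumption, a client never sees its own join notice, while clients already in the room do. Swapping the two effects in `on_name` would change that.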
Next up: auth + HTTPS, with OAuth in one parameter and Let's Encrypt in two lines.