Real-Time Delivery API

Real-time delivery via WebSocket provides immediate message notification without polling. This is the recommended delivery method for interactive applications such as chat clients, while polling GET /v1/messages serves as a fallback for clients that cannot maintain persistent connections.

  1. Client establishes WSS connection to /v1/stream

  2. Client sends authentication message with JWT token

  3. Server validates token and sends connection status

  4. Server pushes queued messages (one at a time or in batches)

  5. Client processes each message and sends acknowledgement over WSS

  6. Server removes acknowledged messages from queue

  7. Server continues delivering new messages as they arrive

  8. Connection remains open until client disconnects or token expires

If the WebSocket connection fails or drops, clients should fall back to polling (GET /v1/messages) at the next_poll_fallback interval.

Establishes a persistent WebSocket connection for immediate message delivery. The server pushes encrypted messages to connected devices in real-time, eliminating polling latency.

Benefits:

  • Sub-second message delivery

  • Reduced server load (no polling)

  • Lower battery usage on mobile devices

  • Immediate notification of system events

  • Bidirectional communication for acknowledgements

Connection Requirements:

  • TLS 1.3+ (WSS protocol)

  • Valid JWT access token

  • Heartbeat/ping support for connection keepalive

WSS /v1/stream
Rust (tokio-tungstenite)
use anyhow::Result;
use futures_util::{SinkExt, StreamExt, stream::SplitSink};
use serde::{Deserialize, Serialize};
use tokio::net::TcpStream;
use tokio_tungstenite::{
    MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message,
};

// snake_case so that AckBatch serializes as "ack_batch", matching the wire format
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ClientMessage {
    Auth { access_token: String },
    Ping { timestamp: u64 },
    Ack { message_id: String },
    AckBatch { message_ids: Vec<String> },
}

// snake_case so that "ack_confirmed" and "ack_batch_confirmed" deserialize correctly
#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ServerMessage {
    Message {
        data: IncomingMessage,
    },
    System {
        event: String,
        data: serde_json::Value,
    },
    Status {
        status: String,
        server_timestamp: u64,
        next_poll_fallback: u32,
    },
    AckConfirmed {
        message_id: String,
        acknowledged: bool,
    },
    AckBatchConfirmed {
        acknowledged_count: u32,
        failed_count: u32,
    },
    Error {
        error: String,
        message: String,
        code: u32,
    },
}

#[derive(Deserialize)]
struct IncomingMessage {
    message_id: String,
    group_id: String,
    mls_ciphertext: String,
    sender_signature: String,
    timestamp: u64,
    message_type: String,
    received_at: u64,
}

type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;

async fn connect_websocket(access_token: String) -> Result<()> {
    let url = "wss://chat.example.com/v1/stream";
    let (ws_stream, _) = connect_async(url)
        .await
        .map_err(|e| anyhow::anyhow!("WebSocket connection failed: {e}"))?;
    let (mut write, mut read) = ws_stream.split();

    // Authenticate
    let auth_msg = ClientMessage::Auth { access_token };
    let auth_json = serde_json::to_string(&auth_msg)?;
    write
        .send(Message::text(auth_json))
        .await
        .map_err(|e| anyhow::anyhow!("Failed to send auth: {e}"))?;

    // Handle incoming messages
    while let Some(msg_result) = read.next().await {
        let msg = msg_result.map_err(|e| anyhow::anyhow!("WebSocket error: {e}"))?;
        match msg {
            Message::Text(text) => {
                let server_msg: ServerMessage = serde_json::from_str(&text)?;
                handle_server_message(server_msg, &mut write).await?;
            }
            Message::Close(_) => {
                println!("WebSocket closed by server");
                break;
            }
            Message::Ping(data) => {
                write
                    .send(Message::Pong(data))
                    .await
                    .map_err(|e| anyhow::anyhow!("Failed to send pong: {e}"))?;
            }
            _ => {}
        }
    }
    Ok(())
}

async fn handle_server_message(
    msg: ServerMessage,
    ws_write: &mut SplitSink<WsStream, Message>,
) -> Result<()> {
    match msg {
        ServerMessage::Message { data } => {
            // Decrypt MLS ciphertext
            let plaintext = decrypt_mls(&data.mls_ciphertext).await?;
            // Store locally
            store_message(&data.message_id, &plaintext).await?;
            // Display to user
            display_message(&data.message_id, &plaintext).await;
            // Acknowledge over WebSocket
            let ack = ClientMessage::Ack {
                message_id: data.message_id.clone(),
            };
            let ack_json = serde_json::to_string(&ack)?;
            ws_write
                .send(Message::text(ack_json))
                .await
                .map_err(|e| anyhow::anyhow!("Failed to send ack: {e}"))?;
        }
        ServerMessage::System { event, data } => {
            println!("System event: {event}");
            handle_system_event(&event, data).await?;
        }
        ServerMessage::Status {
            status,
            server_timestamp,
            next_poll_fallback,
        } => {
            println!(
                "Connection status: {status} at {server_timestamp} \
                 (poll fallback interval: {next_poll_fallback}s)"
            );
        }
        ServerMessage::AckConfirmed {
            message_id,
            acknowledged,
        } => {
            if acknowledged {
                println!("Message {message_id} acknowledged by server");
            }
        }
        ServerMessage::AckBatchConfirmed {
            acknowledged_count,
            failed_count,
        } => {
            println!(
                "Batch ack: {acknowledged_count} succeeded, {failed_count} failed"
            );
        }
        ServerMessage::Error {
            error,
            message,
            code,
        } => {
            eprintln!("Server error {code}: {error} - {message}");
        }
    }
    Ok(())
}

// Stub functions
async fn decrypt_mls(ciphertext: &str) -> Result<String> {
    // MLS decryption logic here
    todo!()
}

async fn store_message(message_id: &str, plaintext: &str) -> Result<()> {
    // Local storage logic here
    todo!()
}

async fn display_message(message_id: &str, plaintext: &str) {
    // Logic to display messages here
    todo!()
}

async fn handle_system_event(event: &str, data: serde_json::Value) -> Result<()> {
    // System event handling logic here
    todo!()
}

Must be sent immediately after connection establishment:

{
"type": "auth",
"access_token": "jwt_access_token"
}

Send after successfully processing a message (decrypt, validate, store).

{
"type": "ack",
"message_id": "a1b2c3d4e5f61728394a5b6c7d8e9f101234567890abcdef"
}

Use for efficient batch processing of queued messages.

{
"type": "ack_batch",
"message_ids": [
"message_1_id_hex",
"message_2_id_hex",
"message_3_id_hex"
]
}

Send every 30 seconds to maintain a connection. Most WebSocket libraries handle protocol-level pings automatically.

{
"type": "ping",
"timestamp": 1759858431
}
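
The application-level ping above can be sketched with the standard library alone. This is a minimal illustration, not the reference client: `ping_payload`, `unix_now`, and `ping_due` are hypothetical helper names, the JSON is built by hand because the payload contains only a number, and the 30-second interval is taken from the text above.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Keepalive interval described in the text (30 seconds).
const PING_INTERVAL: Duration = Duration::from_secs(30);

/// Builds the application-level ping frame for a Unix timestamp in seconds.
fn ping_payload(unix_secs: u64) -> String {
    format!(r#"{{"type":"ping","timestamp":{unix_secs}}}"#)
}

/// Current Unix time in seconds; returns 0 if the clock is set before the epoch.
fn unix_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}

/// Returns true once a full interval has elapsed since the last ping was sent.
fn ping_due(since_last_ping: Duration) -> bool {
    since_last_ping >= PING_INTERVAL
}
```

A client loop would typically track the time of its last ping, call `ping_due` on each tick, and send `ping_payload(unix_now())` as a text frame when it returns true.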

Sent after successful authentication.

{
"type": "status",
"status": "connected",
"server_timestamp": 1759858431,
"next_poll_fallback": 30
}

Status Values:

  • connected: WebSocket authenticated and ready

  • reconnecting: Server attempting to restore connection

  • disconnected: Connection closed (client should reconnect)
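
One way a client might model these status values is with a small enum. This is a sketch under assumptions: the `ConnectionStatus` type and its methods are hypothetical, and treating `reconnecting` as "wait for the server" is an interpretation of the description above rather than documented behaviour.

```rust
/// Client-side view of the `status` field in a status message.
#[derive(Debug, PartialEq)]
enum ConnectionStatus {
    Connected,
    Reconnecting,
    Disconnected,
    /// Any value not listed in the documentation is preserved for logging.
    Unknown(String),
}

impl ConnectionStatus {
    /// Maps the wire string to a variant.
    fn parse(raw: &str) -> Self {
        match raw {
            "connected" => Self::Connected,
            "reconnecting" => Self::Reconnecting,
            "disconnected" => Self::Disconnected,
            other => Self::Unknown(other.to_string()),
        }
    }

    /// Only `disconnected` is documented as requiring a client reconnect.
    fn should_reconnect(&self) -> bool {
        matches!(self, Self::Disconnected)
    }
}
```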

Encrypted message pushed to client.

{
"type": "message",
"data": {
"message_id": "a1b2c3d4e5f61728394a5b6c7d8e9f101234567890abcdef",
"group_id": "f1e2d3c4b5a6978869504132e1f2d3c4b5a6978869504132",
"mls_ciphertext": "base64_encrypted_content",
"sender_signature": "ed25519_signature",
"timestamp": 1759858431,
"message_type": "Text",
"received_at": 1759858400
}
}

Client Processing:

  1. Validate message format

  2. Decrypt MLS ciphertext locally

  3. Verify sender signature (client-side, not server)

  4. Store in local database

  5. Display to user

  6. Send acknowledgement: { "type": "ack", "message_id": "..." }

Server confirms message removal from queue

{
"type": "ack_confirmed",
"message_id": "a1b2c3d4e5f61728394a5b6c7d8e9f101234567890abcdef",
"acknowledged": true
}
{
"type": "ack_batch_confirmed",
"acknowledged_count": 5,
"failed_count": 0
}

If failed_count > 0, some messages may have already expired or been invalid.

Protocol-level events (MLS operations, group changes)

{
"type": "system",
"event": "group_updated",
"data": {
"group_id": "32_byte_group_id_hex",
"mls_ciphertext": "base64_system_operation"
}
}

Event Types:

  • group_updated: MLS commit processed

  • member_added: New member joined group

  • member_removed: Member left/removed

  • epoch_change: MLS epoch changed (forward secrecy rotation)

System notifications are processed automatically by the MLS library and are typically not shown to users.

Server error notifications:

{
"type": "error",
"error": "DEVICE_NOT_ANNOUNCED",
"message": "Device not registered for delivery",
"code": 4002
}

Common Errors:

  • 4001: Invalid signature
  • 4002: Device not announced (authentication failed or expired)
  • 4004: Rate limit exceeded
  • 4008: Token expired

See Error Handling for complete error code reference.
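
A client could route the numeric codes above to a recovery strategy along these lines. Only the handling of 4008 (refresh the token, then reconnect) is described in this document; the other actions, and the `RecoveryAction` and `recovery_for` names, are assumptions made for illustration.

```rust
/// Hypothetical client reactions to server error codes.
#[derive(Debug, PartialEq)]
enum RecoveryAction {
    /// 4008: refresh the access token and reconnect.
    RefreshTokenAndReconnect,
    /// 4002: authenticate again before expecting delivery (assumed).
    ReauthenticateDevice,
    /// 4004: slow down before sending more requests (assumed).
    BackOff,
    /// 4001 and any unlisted code: log it and keep the connection (assumed).
    ReportAndContinue,
}

/// Maps an error `code` from a server error message to a recovery action.
fn recovery_for(code: u32) -> RecoveryAction {
    match code {
        4008 => RecoveryAction::RefreshTokenAndReconnect,
        4002 => RecoveryAction::ReauthenticateDevice,
        4004 => RecoveryAction::BackOff,
        // 4001 (invalid signature) falls through to the default here.
        _ => RecoveryAction::ReportAndContinue,
    }
}
```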

The server implements windowing to prevent overwhelming clients.

  1. Deliver up to N messages (default: 10) without waiting for acknowledgements

  2. Pause delivery when window is full

  3. Resume delivery when acknowledgements received

  4. If client doesn’t acknowledge within timeout (default: 60s), mark message as undelivered

  5. On reconnection, redeliver unacknowledged messages
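
The windowing steps above can be illustrated with a small in-memory model. This is not the server's implementation: `DeliveryWindow` and its methods are hypothetical, and the acknowledgement timeout from step 4 is left out to keep the sketch short.

```rust
use std::collections::{HashSet, VecDeque};

/// Simplified model of a per-connection delivery window.
struct DeliveryWindow {
    window_size: usize,
    queue: VecDeque<String>,
    in_flight: HashSet<String>,
}

impl DeliveryWindow {
    fn new(window_size: usize) -> Self {
        Self {
            window_size,
            queue: VecDeque::new(),
            in_flight: HashSet::new(),
        }
    }

    /// Adds a message ID to the back of the pending queue.
    fn enqueue(&mut self, message_id: &str) {
        self.queue.push_back(message_id.to_string());
    }

    /// Moves queued messages into flight until the window is full
    /// and returns the IDs that may be sent now.
    fn next_batch(&mut self) -> Vec<String> {
        let mut batch = Vec::new();
        while self.in_flight.len() < self.window_size {
            match self.queue.pop_front() {
                Some(id) => {
                    self.in_flight.insert(id.clone());
                    batch.push(id);
                }
                None => break,
            }
        }
        batch
    }

    /// Frees a window slot; returns false for an ID that was not in flight.
    fn ack(&mut self, message_id: &str) -> bool {
        self.in_flight.remove(message_id)
    }
}
```

With a window of 2 and three queued messages, the first `next_batch` call yields two IDs, the second yields none, and the third message is released only after one of the first two is acknowledged.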

// Good: process and acknowledge immediately
async fn process_message(
    msg: IncomingMessage,
    ws: &mut SplitSink<WsStream, Message>,
) -> Result<()> {
    let plaintext = decrypt_mls(&msg.mls_ciphertext).await?;
    store_message(&msg.message_id, &plaintext).await?;
    // Acknowledge immediately after successful processing.
    // send_ack is assumed to be a helper that sends the "ack" message shown earlier.
    send_ack(ws, &msg.message_id).await?;
    Ok(())
}

// Bad: batch acknowledgements with long delays
async fn process_message_slow(msg: IncomingMessage) {
    // If processing takes 30s, the server's window fills up,
    // the server stops sending more messages,
    // and other messages are delayed unnecessarily
}

Best Practice: Acknowledge messages as soon as they’re successfully stored locally, even if UI processing is still ongoing.

use std::time::Duration;

async fn connect_with_retry(access_token: String) -> Result<()> {
    let mut retry_count: u32 = 0;
    let max_retries: u32 = 5;
    loop {
        // connect_websocket takes ownership of the token, so clone it per attempt
        match connect_websocket(access_token.clone()).await {
            Ok(_) => {
                retry_count = 0; // Reset on successful connection
            }
            Err(e) => {
                retry_count += 1;
                eprintln!("WebSocket connection failed: {e}");
                if retry_count >= max_retries {
                    println!("Max retries reached, falling back to polling.");
                    // start_polling is assumed to implement the GET /v1/messages fallback
                    start_polling().await?;
                    break;
                }
                // Exponential backoff: 1s, 2s, 4s, 8s, ...
                let delay = 2u64.pow(retry_count - 1);
                println!("Retrying in {}s...", delay);
                tokio::time::sleep(Duration::from_secs(delay)).await;
            }
        }
    }
    Ok(())
}
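
The backoff calculation in the retry loop can be pulled out into a pure function, which makes it easy to test and to cap. The sketch below is an illustration only: `backoff_delay` and the `max_delay` cap are additions that do not appear in the retry loop itself.

```rust
use std::time::Duration;

/// Delay before retry number `retry_count` (1-based): 1s, 2s, 4s, 8s, ...
/// The result never exceeds `max_delay`, and overflow is treated as "use the cap".
fn backoff_delay(retry_count: u32, max_delay: Duration) -> Duration {
    let exponent = retry_count.saturating_sub(1);
    let secs = 2u64.checked_pow(exponent).unwrap_or(u64::MAX);
    Duration::from_secs(secs).min(max_delay)
}
```

With a 60-second cap, the first four retries wait 1, 2, 4, and 8 seconds, and any retry whose uncapped delay would exceed the cap waits 60 seconds.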

When the JWT token expires (typically after 24 hours):

  1. Server sends error message:
{
"type": "error",
"error": "INVALID_TOKEN",
"message": "Token expired",
"code": 4008
}
  2. Server closes WebSocket connection

  3. Client refreshes token via POST /v1/auth/refresh

  4. Client reconnects with new token

  5. Server redelivers unacknowledged messages from disconnection period

async fn handle_token_expiration() -> Result<()> {
    // Refresh token
    let new_token = refresh_access_token().await?;
    // Reconnect
    connect_websocket(new_token).await?;
    // Server automatically redelivers any messages
    // that weren't acknowledged
    Ok(())
}

Messages are delivered in the order they were received by the server (received_at timestamp), not sender timestamp. This prevents clock skew issues in distributed systems.

Example:

  1. Alice sends msg1 at 10:00:00 (her local time, clock 5 min fast)
  2. Bob sends msg2 at 09:56:00 (his local time, clock correct)

Server receives:

  • msg2 at server_time 09:56:05 (received_at)
  • msg1 at server_time 09:56:10 (received_at)

Client receives in order: msg2, then msg1

Display order should use sender timestamps: msg2, then msg1
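
The two orderings can be expressed as two sorts over the same data. In this sketch the `Envelope` type and both function names are hypothetical, and only the two timestamp fields from the message format are modelled.

```rust
/// The ordering-relevant fields of a delivered message.
#[derive(Debug, Clone)]
struct Envelope {
    message_id: String,
    /// Sender's clock (may be skewed).
    timestamp: u64,
    /// Server's clock at receipt.
    received_at: u64,
}

impl Envelope {
    fn new(message_id: &str, timestamp: u64, received_at: u64) -> Self {
        Self { message_id: message_id.to_string(), timestamp, received_at }
    }
}

/// Order in which the server delivers: by `received_at`.
fn delivery_order(msgs: &[Envelope]) -> Vec<String> {
    let mut sorted: Vec<&Envelope> = msgs.iter().collect();
    sorted.sort_by_key(|m| m.received_at);
    sorted.into_iter().map(|m| m.message_id.clone()).collect()
}

/// Order in which the client displays: by sender `timestamp`.
fn display_order(msgs: &[Envelope]) -> Vec<String> {
    let mut sorted: Vec<&Envelope> = msgs.iter().collect();
    sorted.sort_by_key(|m| m.timestamp);
    sorted.into_iter().map(|m| m.message_id.clone()).collect()
}
```

Feeding in the example above (times as seconds since midnight: msg1 with timestamp 36000 and received_at 35770, msg2 with timestamp 35760 and received_at 35765) gives msg2 then msg1 from both functions. The two orders diverge only when sender timestamps and server receipt times disagree about which message came first.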

  • Client sends { "type": "ack", "message_id": "..." } after successful processing

  • Server removes message from queue upon receiving acknowledgement

  • Unacknowledged messages remain in queue and are redelivered on reconnection

  • When polling instead, the client acknowledges via the DELETE /v1/messages/{id} REST endpoint

  • Standard HTTP-based acknowledgment flow

  • See Message Routing for details.

  • Ensures message isn’t lost if client crashes during processing

  • Allows client to retry decryption if MLS fails temporarily

  • Enables server to track delivery progress for flow control

  • Supports “at-least-once” delivery semantics
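
Because delivery is at-least-once and unacknowledged messages are redelivered on reconnection, a client may see the same message_id more than once. One common way to handle this, sketched below, is to remember processed IDs and skip reprocessing. The `SeenMessages` type is hypothetical, and a real client would more likely check its local message database than an in-memory set.

```rust
use std::collections::HashSet;

/// Tracks message IDs that have already been processed and stored.
#[derive(Default)]
struct SeenMessages {
    ids: HashSet<String>,
}

impl SeenMessages {
    /// Returns true the first time an ID is seen (process it),
    /// and false for a redelivery (skip processing, but still acknowledge).
    fn first_delivery(&mut self, message_id: &str) -> bool {
        self.ids.insert(message_id.to_string())
    }
}
```

A redelivered message should still be acknowledged so that the server can remove it from the queue.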

Servers may limit:

  • Maximum concurrent WebSocket connections per device (typically 1)

  • Connection duration before requiring reconnection (typically unlimited)

  • Maximum message backlog to deliver on connection (typically 1000)

Check server capabilities via GET /.well-known/cryptid for specific limits.

The server knows:

  • Which device is connected (connection metadata)

  • When messages are delivered (connection timing)

  • Message delivery order and flow

The server does NOT know:

  • Message content (encrypted)

  • Sender identity (signatures verified client-side)

  • Whether messages are read (no read receipts to server)

  • Social relationships (contacts managed client-side)

WebSocket connections reveal device online status to the server, but this metadata is minimal and temporary (expires when connection closes).

WebSocket vs Polling:

  • WebSocket: ~1-2 KB/hour (keepalive only)

  • Polling (30s interval): ~120 requests/hour = 12-24 KB/hour

WebSocket is significantly more efficient for real-time messaging on mobile devices.
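
The polling figures follow from simple arithmetic, shown here as a sketch. The 100 to 200 bytes per empty poll is an assumption inferred from the 12-24 KB/hour range quoted above, not a measured value, and the function names are illustrative.

```rust
/// Number of polls per hour for a given interval in seconds (0 if the interval is 0).
fn polls_per_hour(interval_secs: u64) -> u64 {
    3600u64.checked_div(interval_secs).unwrap_or(0)
}

/// Bytes transferred per hour by polling, given an assumed size per request.
fn polling_bytes_per_hour(interval_secs: u64, bytes_per_request: u64) -> u64 {
    polls_per_hour(interval_secs) * bytes_per_request
}
```

For a 30-second interval this gives 120 polls per hour, or 12,000 to 24,000 bytes per hour at 100 to 200 bytes per request.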