Skip to main content
Full, production-ready examples for connecting to the Rhombus WebSocket event stream. Each example handles authentication, the STOMP protocol handshake, heartbeats, reconnection, and event processing.

Complete Examples

Dependencies

pip install websocket-client requests

Complete Example

"""
Rhombus WebSocket Event Monitor
Connects to the Rhombus platform and streams real-time events.
"""

import json
import os
import signal
import threading
import time

import requests
import websocket

# Configuration
# API token is read from the environment at import time; a missing
# RHOMBUS_API_KEY variable raises KeyError before anything else runs.
API_TOKEN = os.environ["RHOMBUS_API_KEY"]
# Host, port and path are interpolated into the wss:// URL by monitor_events().
# The port is kept as a string because it is only ever formatted into that URL.
WS_HOST = "ws.rhombussystems.com"
WS_PORT = "8443"
WS_PATH = "/websocket"
HEARTBEAT_INTERVAL = 10  # seconds between client heartbeats sent by monitor_events()
RECONNECT_DELAY = 5      # seconds main() waits before reconnecting after a failure


def get_org_uuid(api_token):
    """Fetch the organization UUID from the Rhombus REST API.

    Args:
        api_token: Rhombus API token, sent in the ``x-auth-apikey`` header.

    Returns:
        The value found at ``["org"]["uuid"]`` in the JSON response.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
        requests.Timeout: If the API does not answer within the timeout.
    """
    resp = requests.post(
        "https://api2.rhombussystems.com/api/org/getOrgV2",
        headers={"x-auth-apikey": api_token, "Content-Type": "application/json"},
        json={},
        # requests has no default timeout; without one a stalled connection
        # would block startup indefinitely.
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["org"]["uuid"]


def build_stomp_frame(command, headers=None, body=""):
    """Serialize a command, optional headers and a body into a STOMP 1.2 frame.

    The frame is the command line, one ``key:value`` line per header, a blank
    line, the body, and a terminating NUL character.
    """
    header_lines = [f"{name}:{val}\n" for name, val in (headers or {}).items()]
    return "".join([command, "\n", *header_lines, "\n", body, "\x00"])


def parse_stomp_frame(raw):
    """Parse a raw WebSocket message into a STOMP frame dict.

    Args:
        raw: The WebSocket message as ``str``, or as ``bytes``/``bytearray``
            (decoded as UTF-8).

    Returns:
        ``None`` for a heartbeat (a message holding only EOLs and/or the NUL
        terminator), otherwise a dict with keys ``"command"`` (str),
        ``"headers"`` (dict of str to str) and ``"body"`` (str).
    """
    # Binary WebSocket messages arrive as bytes; normalise to text first so
    # the str-based parsing below does not raise TypeError.
    if isinstance(raw, (bytes, bytearray)):
        raw = bytes(raw).decode("utf-8", errors="replace")

    raw = raw.lstrip("\n\r")
    if not raw or raw == "\x00":
        return None  # Heartbeat

    # STOMP allows EOLs after the terminating NUL; drop them together with
    # the NUL so they do not end up glued to the body.
    nul_at = raw.rfind("\x00")
    if nul_at != -1 and not raw[nul_at + 1:].strip("\r\n"):
        raw = raw[:nul_at]
    raw = raw.rstrip("\x00")
    if not raw:
        return None  # Nothing but terminators

    # The header section ends at the first blank line, which may be written
    # with LF or CRLF line endings. Use whichever separator appears first so
    # blank lines inside the body are never mistaken for it.
    lf_at = raw.find("\n\n")
    crlf_at = raw.find("\r\n\r\n")
    if crlf_at != -1 and (lf_at == -1 or crlf_at < lf_at):
        head, body = raw[:crlf_at], raw[crlf_at + 4:]
    elif lf_at != -1:
        head, body = raw[:lf_at], raw[lf_at + 2:]
    else:
        head, body = raw, ""

    lines = [line.rstrip("\r") for line in head.split("\n")]
    command = lines[0]
    headers = {}
    for line in lines[1:]:
        key, sep, value = line.partition(":")
        if sep:
            # STOMP 1.2: when a header is repeated, only the first entry
            # should be used.
            headers.setdefault(key, value)
    return {"command": command, "headers": headers, "body": body}


def monitor_events(api_token, org_uuid, on_event, filter_alerts_only=True):
    """
    Connect to Rhombus WebSocket and stream events.

    Opens the WebSocket, performs the STOMP CONNECT/SUBSCRIBE exchange, starts
    a background heartbeat thread and then blocks, decoding each MESSAGE body
    as JSON and passing it to ``on_event``. The function only leaves its loop
    by raising; the socket and heartbeat thread are always cleaned up.

    Args:
        api_token: Rhombus API token.
        org_uuid: Organization UUID.
        on_event: Callback function invoked with each event payload dict.
        filter_alerts_only: If True, only POLICY_ALERT events are forwarded.

    Raises:
        ConnectionError: If the STOMP handshake fails or the server sends an
            ERROR frame.
        Exception: Any error from the WebSocket library, JSON decoding or
            ``on_event`` propagates to the caller.
    """
    url = f"wss://{WS_HOST}:{WS_PORT}{WS_PATH}?x-auth-scheme=api-token"
    headers = {"x-auth-apikey": api_token}
    # Advertise the same interval the heartbeat thread actually uses.
    heartbeat_ms = int(HEARTBEAT_INTERVAL * 1000)

    ws = websocket.create_connection(url, header=headers, timeout=10)
    stop_event = threading.Event()
    hb_thread = None

    def heartbeat_sender():
        # A STOMP heartbeat is a single EOL; stop quietly once sending fails,
        # the receive loop will surface the broken connection.
        while not stop_event.is_set():
            try:
                ws.send("\n")
            except Exception:
                break
            stop_event.wait(HEARTBEAT_INTERVAL)

    try:
        # STOMP CONNECT
        ws.send(build_stomp_frame("CONNECT", {
            "accept-version": "1.2",
            "heart-beat": f"{heartbeat_ms},{heartbeat_ms}",
        }))
        response = parse_stomp_frame(ws.recv())
        if not response or response["command"] != "CONNECTED":
            raise ConnectionError("STOMP handshake failed")

        # SUBSCRIBE to org change topic
        ws.send(build_stomp_frame("SUBSCRIBE", {
            "id": "sub-0",
            "destination": f"/topic/change/{org_uuid}",
        }))

        # The connect timeout would otherwise remain the read timeout, which
        # equals the requested heartbeat period: one slightly late heartbeat
        # would raise a timeout. Allow a few missed beats before giving up.
        ws.settimeout(HEARTBEAT_INTERVAL * 3)

        hb_thread = threading.Thread(target=heartbeat_sender, daemon=True)
        hb_thread.start()

        while True:
            frame = parse_stomp_frame(ws.recv())
            if frame is None:
                continue  # Heartbeat

            command = frame["command"]
            if command == "ERROR":
                # Surface server-side failures instead of silently dropping them.
                detail = frame["headers"].get("message") or frame["body"] or "no details"
                raise ConnectionError(f"STOMP error from server: {detail}")
            if command != "MESSAGE":
                continue

            payload = json.loads(frame["body"])

            if filter_alerts_only and payload.get("entity") != "POLICY_ALERT":
                continue

            on_event(payload)
    finally:
        stop_event.set()
        if hb_thread is not None:
            hb_thread.join(timeout=2)
        # Best-effort goodbye: the connection may already be dead, so a failed
        # DISCONNECT is ignored, but the socket must be closed either way.
        try:
            ws.send(build_stomp_frame("DISCONNECT"))
        except Exception:
            pass
        try:
            ws.close()
        except Exception:
            pass


def main():
    """Resolve the organization, then stream and print events until Ctrl+C.

    Looks up the organization UUID once, then runs ``monitor_events`` in a
    loop: any failure is reported and followed by a reconnect after
    ``RECONNECT_DELAY`` seconds. Ctrl+C at any point, including during the
    reconnect wait, exits cleanly.
    """
    org_uuid = get_org_uuid(API_TOKEN)
    print(f"Organization: {org_uuid}")
    print("Listening for events... (Ctrl+C to stop)\n")

    def handle_event(payload):
        """Print a short human-readable summary of one event payload."""
        # An exception raised here would propagate through monitor_events and
        # be misreported as a lost connection, so tolerate missing fields.
        ts_ms = payload.get("timestampMs")
        seconds = ts_ms / 1000 if isinstance(ts_ms, (int, float)) else time.time()
        ts = time.strftime("%H:%M:%S", time.localtime(seconds))
        entity = payload.get("entity", "UNKNOWN")
        change = payload.get("type", "UNKNOWN")
        # "or []" also covers an explicit null; str() keeps join from failing
        # on non-string entries.
        triggers = ", ".join(str(t) for t in payload.get("policyAlertTriggers") or [])
        desc = payload.get("textDescription", "")
        print(f"[{ts}] {change} {entity}")
        if triggers:
            print(f"  triggers: {triggers}")
        if desc:
            print(f"  {desc}")
        print(f"  uuid: {payload.get('entityUuid', 'N/A')}\n")

    # Reconnection loop. KeyboardInterrupt is handled outside the loop so that
    # Ctrl+C during the reconnect sleep also exits cleanly.
    try:
        while True:
            try:
                monitor_events(API_TOKEN, org_uuid, handle_event, filter_alerts_only=False)
            except Exception as e:
                print(f"Connection lost: {e}. Reconnecting in {RECONNECT_DELAY}s...")
                time.sleep(RECONNECT_DELAY)
    except KeyboardInterrupt:
        print("\nExiting.")

# Start the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Quick Reference

Environment Variables

All examples use this environment variable:
export RHOMBUS_API_KEY="your-api-token-here"

STOMP Frame Sequence

Client                              Server
  │                                    │
  ├─── CONNECT ──────────────────────► │
  │                                    │
  │ ◄──────────────────── CONNECTED ───┤
  │                                    │
  ├─── SUBSCRIBE ────────────────────► │
  │                                    │
  │ ◄────────────────────── MESSAGE ───┤
  │ ◄────────────────────── MESSAGE ───┤
  ├─── heartbeat (\n) ──────────────► │
  │ ◄──────────────── heartbeat (\n) ──┤
  │ ◄────────────────────── MESSAGE ───┤
  │       ...                          │
  │                                    │
  ├─── DISCONNECT ───────────────────► │
  │                                    │

Connection Parameters at a Glance

Parameter        | Value
-----------------|--------------------------
Protocol         | WSS (TLS)
Host             | ws.rhombussystems.com
Port             | 8443
Path             | /websocket
STOMP Version    | 1.2
Heartbeat        | 10000ms bidirectional
Auth Header      | x-auth-apikey
Auth Query Param | x-auth-scheme=api-token
Topic            | /topic/change/{orgUuid}