Code Examples
Full, production-ready examples for connecting to the Rhombus WebSocket event stream. Each example handles authentication, the STOMP protocol, heartbeats, reconnection, and event processing.
Python
Dependencies
pip install websocket-client requests
Complete Example
"""
Rhombus WebSocket Event Monitor
Connects to the Rhombus platform and streams real-time events.
"""
import json
import os
import signal
import threading
import time
import requests
import websocket
# Configuration
# API token taken from the RHOMBUS_API_KEY environment variable.
# Indexing os.environ raises KeyError at import time when the variable is unset.
API_TOKEN = os.environ["RHOMBUS_API_KEY"]
# Host, port and path are combined into the wss:// URL in monitor_events().
WS_HOST = "ws.rhombussystems.com"
WS_PORT = "8443"
WS_PATH = "/websocket"
HEARTBEAT_INTERVAL = 10 # seconds between client heartbeats
RECONNECT_DELAY = 5 # seconds to wait before reconnecting after a failure
def get_org_uuid(api_token, timeout=10):
    """Fetch the organization UUID from the REST API.

    Args:
        api_token: Rhombus API token, sent in the ``x-auth-apikey`` header.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``org.uuid`` value from the JSON response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
        KeyError: If the response JSON has no ``org`` / ``uuid`` field.
    """
    resp = requests.post(
        "https://api2.rhombussystems.com/api/org/getOrgV2",
        headers={"x-auth-apikey": api_token, "Content-Type": "application/json"},
        json={},
        # requests applies no timeout by default; without one a stalled
        # server would block the caller forever.
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()["org"]["uuid"]
def build_stomp_frame(command, headers=None, body=""):
    """Construct a STOMP 1.2 protocol frame.

    Args:
        command: STOMP command name, e.g. ``"CONNECT"`` or ``"SUBSCRIBE"``.
        headers: Optional mapping of header names to values; emitted in
            mapping order as ``key:value`` lines.
        body: Optional frame body.

    Returns:
        The frame as a string: command line, header lines, a blank line,
        the body, and the terminating NUL character.
    """
    header_lines = "".join(
        f"{key}:{value}\n" for key, value in (headers or {}).items()
    )
    return f"{command}\n{header_lines}\n{body}\x00"
def parse_stomp_frame(raw):
    """Parse a raw WebSocket message into a STOMP frame dict.

    Args:
        raw: Text of one WebSocket message.

    Returns:
        ``None`` for a heartbeat (only newlines, or a lone NUL); otherwise a
        dict with ``command`` (str), ``headers`` (dict) and ``body`` (str).
    """
    import re  # local import: only this function needs it

    raw = raw.lstrip("\n\r")
    if not raw or raw == "\x00":
        return None  # Heartbeat
    raw = raw.rstrip("\x00")
    # STOMP 1.2 permits CRLF as well as LF line endings, so the blank line
    # separating headers from body may be "\n\n" or "\r\n\r\n".
    parts = re.split(r"\r?\n\r?\n", raw, maxsplit=1)
    lines = re.split(r"\r?\n", parts[0])
    command = lines[0]
    headers = {}
    for line in lines[1:]:
        if ":" in line:
            key, value = line.split(":", 1)
            # On repeated header entries the first one is authoritative.
            headers.setdefault(key, value)
    body = parts[1] if len(parts) > 1 else ""
    return {"command": command, "headers": headers, "body": body}
def monitor_events(api_token, org_uuid, on_event, filter_alerts_only=True):
    """
    Connect to Rhombus WebSocket and stream events.

    Blocks until the connection fails or the caller is interrupted; the
    socket and the heartbeat thread are always cleaned up on the way out.

    Args:
        api_token: Rhombus API token.
        org_uuid: Organization UUID.
        on_event: Callback function invoked with each event payload dict.
        filter_alerts_only: If True, only POLICY_ALERT events are forwarded.

    Raises:
        ConnectionError: If the STOMP handshake fails or the server sends an
            ERROR frame. Socket and JSON errors propagate unchanged.
    """
    url = f"wss://{WS_HOST}:{WS_PORT}{WS_PATH}?x-auth-scheme=api-token"
    headers = {"x-auth-apikey": api_token}
    ws = websocket.create_connection(url, header=headers, timeout=10)

    stop_event = threading.Event()
    hb_thread = None

    def heartbeat_sender():
        # Send a bare newline every HEARTBEAT_INTERVAL seconds until stopped
        # or until the socket refuses the write.
        while not stop_event.is_set():
            try:
                ws.send("\n")
            except Exception:
                break
            stop_event.wait(HEARTBEAT_INTERVAL)

    # Everything after the socket is open runs under try/finally so a failed
    # handshake no longer leaks the connection.
    try:
        # STOMP CONNECT; advertise the same interval the heartbeat thread uses.
        heartbeat_ms = int(HEARTBEAT_INTERVAL * 1000)
        ws.send(build_stomp_frame("CONNECT", {
            "accept-version": "1.2",
            "heart-beat": f"{heartbeat_ms},{heartbeat_ms}",
        }))
        response = parse_stomp_frame(ws.recv())
        if not response or response["command"] != "CONNECTED":
            raise ConnectionError("STOMP handshake failed")

        # SUBSCRIBE to org change topic
        ws.send(build_stomp_frame("SUBSCRIBE", {
            "id": "sub-0",
            "destination": f"/topic/change/{org_uuid}",
        }))

        # The connect timeout also applies to recv(). Server heartbeats arrive
        # about once per interval, so allow several intervals of silence
        # before treating the link as dead instead of timing out spuriously.
        ws.settimeout(HEARTBEAT_INTERVAL * 3)

        hb_thread = threading.Thread(target=heartbeat_sender, daemon=True)
        hb_thread.start()

        while True:
            frame = parse_stomp_frame(ws.recv())
            if frame is None:
                continue  # Heartbeat
            if frame["command"] == "ERROR":
                detail = frame["headers"].get("message") or frame["body"]
                raise ConnectionError(f"STOMP error from server: {detail}")
            if frame["command"] != "MESSAGE":
                continue
            payload = json.loads(frame["body"])
            if filter_alerts_only and payload.get("entity") != "POLICY_ALERT":
                continue
            on_event(payload)
    finally:
        stop_event.set()
        if hb_thread is not None:
            hb_thread.join(timeout=2)
        # DISCONNECT is best effort: the socket may already be gone.
        try:
            ws.send(build_stomp_frame("DISCONNECT"))
        except Exception:
            pass
        # Close even when the DISCONNECT send failed.
        try:
            ws.close()
        except Exception:
            pass
def main():
    """Look up the organization, then stream events until Ctrl+C.

    Reconnects after RECONNECT_DELAY seconds whenever the connection is lost.
    """
    org_uuid = get_org_uuid(API_TOKEN)
    print(f"Organization: {org_uuid}")
    print("Listening for events... (Ctrl+C to stop)\n")

    def handle_event(payload):
        """Print a short summary of one event payload."""
        # Missing or null fields must not raise: an exception here would
        # propagate out of monitor_events and drop the connection.
        ts_ms = payload.get("timestampMs")
        if ts_ms is None:
            ts = "--:--:--"
        else:
            ts = time.strftime("%H:%M:%S", time.localtime(ts_ms / 1000))
        entity = payload.get("entity", "UNKNOWN")
        change = payload.get("type", "UNKNOWN")
        triggers = ", ".join(payload.get("policyAlertTriggers") or [])
        desc = payload.get("textDescription", "")
        print(f"[{ts}] {change} {entity}")
        if triggers:
            print(f" triggers: {triggers}")
        if desc:
            print(f" {desc}")
        print(f" uuid: {payload.get('entityUuid', 'N/A')}\n")

    # Reconnection loop. KeyboardInterrupt is caught around the whole loop so
    # Ctrl+C during the reconnect delay also exits cleanly.
    try:
        while True:
            try:
                monitor_events(API_TOKEN, org_uuid, handle_event, filter_alerts_only=False)
            except Exception as e:
                print(f"Connection lost: {e}. Reconnecting in {RECONNECT_DELAY}s...")
                time.sleep(RECONNECT_DELAY)
    except KeyboardInterrupt:
        print("\nExiting.")


if __name__ == "__main__":
    main()
JavaScript (Node.js)
Dependencies
npm install ws
Complete Example
/**
* Rhombus WebSocket Event Monitor
* Connects to the Rhombus platform and streams real-time events.
*/
const WebSocket = require("ws");
const https = require("https");
// Configuration
// API token from the RHOMBUS_API_KEY environment variable (undefined when unset).
const API_TOKEN = process.env.RHOMBUS_API_KEY;
// Host, port and path are combined into the wss:// URL inside monitor().
const WS_HOST = "ws.rhombussystems.com";
const WS_PORT = "8443";
const WS_PATH = "/websocket";
const HEARTBEAT_INTERVAL = 10000; // ms between client heartbeats
const RECONNECT_DELAY = 5000; // ms to wait before reconnecting after a close
/**
 * Fetch organization UUID from the REST API.
 *
 * @param {string} apiToken - Rhombus API token, sent as `x-auth-apikey`.
 * @returns {Promise<string>} Resolves with `org.uuid` from the response.
 *   Rejects on network errors, non-2xx responses, or a response body that
 *   is not JSON or carries no `org.uuid`.
 */
function getOrgUuid(apiToken) {
  return new Promise((resolve, reject) => {
    const req = https.request(
      {
        hostname: "api2.rhombussystems.com",
        path: "/api/org/getOrgV2",
        method: "POST",
        headers: {
          "x-auth-apikey": apiToken,
          "Content-Type": "application/json",
        },
      },
      (res) => {
        let data = "";
        res.setEncoding("utf8");
        res.on("data", (chunk) => {
          data += chunk;
        });
        res.on("error", reject);
        res.on("end", () => {
          if (res.statusCode < 200 || res.statusCode >= 300) {
            reject(new Error(`getOrgV2 failed with HTTP ${res.statusCode}`));
            return;
          }
          // A throw inside this callback would be an uncaught exception and
          // leave the promise pending, so turn every failure into a rejection.
          try {
            const body = JSON.parse(data);
            if (!body || !body.org || !body.org.uuid) {
              reject(new Error("getOrgV2 response has no org.uuid"));
              return;
            }
            resolve(body.org.uuid);
          } catch (err) {
            reject(err);
          }
        });
      }
    );
    req.on("error", reject);
    req.write("{}");
    req.end();
  });
}
/**
 * Serialize a STOMP 1.2 frame.
 *
 * @param {string} command - STOMP command, e.g. "CONNECT".
 * @param {Object} [headers] - Header name/value pairs, emitted in key order.
 * @param {string} [body] - Frame body.
 * @returns {string} Command line, header lines, blank line, body, NUL terminator.
 */
function buildStompFrame(command, headers = {}, body = "") {
  const headerBlock = Object.entries(headers)
    .map(([name, val]) => `${name}:${val}\n`)
    .join("");
  return `${command}\n${headerBlock}\n${body}\x00`;
}
/**
 * Parse a raw STOMP frame from a WebSocket message.
 *
 * @param {string} raw - Text of one WebSocket message.
 * @returns {{command: string, headers: Object, body: string} | null}
 *   `null` for a heartbeat (only newlines, or a lone NUL), else the frame.
 */
function parseStompFrame(raw) {
  const trimmed = raw.replace(/^[\n\r]+/, "");
  if (!trimmed || trimmed === "\x00") return null; // Heartbeat
  const text = trimmed.replace(/\x00+$/, "");

  // STOMP 1.2 permits CRLF as well as LF line endings, so the blank line
  // that ends the headers may be "\n\n" or "\r\n\r\n".
  const separator = /\r?\n\r?\n/.exec(text);
  const head = separator ? text.slice(0, separator.index) : text;
  const body = separator ? text.slice(separator.index + separator[0].length) : "";

  const headerLines = head.split(/\r?\n/);
  const command = headerLines[0];
  const headers = {};
  for (const line of headerLines.slice(1)) {
    const idx = line.indexOf(":");
    if (idx === -1) continue;
    const key = line.slice(0, idx);
    // On repeated header entries the first one is authoritative.
    if (!Object.prototype.hasOwnProperty.call(headers, key)) {
      headers[key] = line.slice(idx + 1);
    }
  }
  return { command, headers, body };
}
/**
 * Connect and monitor events with automatic reconnection.
 *
 * @param {string} apiToken - Rhombus API token.
 * @param {(payload: Object) => void} onEvent - Called with each decoded event.
 * @param {boolean} [filterAlertsOnly=true] - Forward only POLICY_ALERT events.
 * @returns {Promise<void>} Resolves once the first connection attempt has
 *   been started; rejects if the organization lookup fails.
 */
async function monitor(apiToken, onEvent, filterAlertsOnly = true) {
  const orgUuid = await getOrgUuid(apiToken);
  console.log(`Organization: ${orgUuid}`);
  console.log("Listening for events... (Ctrl+C to stop)\n");

  // The live connection, shared with the single SIGINT handler below.
  let current = null;

  function connect() {
    const url = `wss://${WS_HOST}:${WS_PORT}${WS_PATH}?x-auth-scheme=api-token`;
    const ws = new WebSocket(url, {
      headers: { "x-auth-apikey": apiToken },
    });
    let heartbeatTimer = null;
    const stopHeartbeat = () => {
      clearInterval(heartbeatTimer);
      heartbeatTimer = null;
    };
    current = { ws, stopHeartbeat };

    ws.on("open", () => {
      // STOMP CONNECT; advertise the same interval the heartbeat timer uses.
      ws.send(
        buildStompFrame("CONNECT", {
          "accept-version": "1.2",
          "heart-beat": `${HEARTBEAT_INTERVAL},${HEARTBEAT_INTERVAL}`,
        })
      );
    });

    ws.on("message", (data) => {
      const frame = parseStompFrame(data.toString());
      if (!frame) return; // Heartbeat
      switch (frame.command) {
        case "CONNECTED":
          console.log("STOMP connected");
          // Subscribe to change topic
          ws.send(
            buildStompFrame("SUBSCRIBE", {
              id: "sub-0",
              destination: `/topic/change/${orgUuid}`,
            })
          );
          // Start heartbeats; skip a tick if the socket is no longer open.
          heartbeatTimer = setInterval(() => {
            if (ws.readyState === WebSocket.OPEN) ws.send("\n");
          }, HEARTBEAT_INTERVAL);
          break;
        case "MESSAGE": {
          let payload;
          // A throw inside this listener would crash the process, so a
          // malformed body is reported and skipped instead.
          try {
            payload = JSON.parse(frame.body);
          } catch (err) {
            console.error("Skipping malformed event body:", err.message);
            return;
          }
          if (filterAlertsOnly && payload.entity !== "POLICY_ALERT") return;
          onEvent(payload);
          break;
        }
        case "ERROR":
          // Surface server-side failures; the close handler reconnects.
          console.error("STOMP error:", frame.headers.message || frame.body);
          break;
      }
    });

    ws.on("close", () => {
      stopHeartbeat();
      console.log(`Connection closed. Reconnecting in ${RECONNECT_DELAY / 1000}s...`);
      setTimeout(connect, RECONNECT_DELAY);
    });

    ws.on("error", (err) => {
      console.error("WebSocket error:", err.message);
      ws.close();
    });
  }

  // Graceful shutdown on Ctrl+C. Registered once, outside connect(), so
  // reconnects do not pile up listeners that point at dead sockets.
  process.on("SIGINT", () => {
    console.log("\nExiting.");
    if (current) {
      current.stopHeartbeat();
      if (current.ws.readyState === WebSocket.OPEN) {
        current.ws.send(buildStompFrame("DISCONNECT"));
      }
      current.ws.close();
    }
    process.exit(0);
  });

  connect();
}
/**
 * Event handler: print a short summary of one event payload to stdout.
 *
 * @param {Object} payload - Decoded event; absent fields fall back to
 *   "UNKNOWN" / "N/A" or are omitted from the output.
 */
function handleEvent(payload) {
  const { timestampMs, entity, type, policyAlertTriggers, textDescription, entityUuid } = payload;
  const clock = new Date(timestampMs).toLocaleTimeString();
  const triggerList = (policyAlertTriggers || []).join(", ");
  const description = textDescription || "";

  const output = [`[${clock}] ${type || "UNKNOWN"} ${entity || "UNKNOWN"}`];
  if (triggerList) {
    output.push(` triggers: ${triggerList}`);
  }
  if (description) {
    output.push(` ${description}`);
  }
  output.push(` uuid: ${entityUuid || "N/A"}\n`);

  for (const line of output) {
    console.log(line);
  }
}
// Run. Fail fast when the token is missing, and report a failed startup
// (e.g. the organization lookup) instead of leaving an unhandled rejection.
if (!API_TOKEN) {
  console.error("Set RHOMBUS_API_KEY environment variable");
  process.exit(1);
}
monitor(API_TOKEN, handleEvent, false).catch((err) => {
  console.error("Failed to start monitor:", err.message);
  process.exit(1);
});
Go
Dependencies
go get github.com/gorilla/websocket
Complete Example
// Rhombus WebSocket Event Monitor
// Connects to the Rhombus platform and streams real-time events.
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Connection settings for the WebSocket endpoint, plus timing values.
const (
// wsHost, wsPort and wsPath are joined into the wss:// URL in connectAndMonitor.
wsHost = "ws.rhombussystems.com"
wsPort = "8443"
wsPath = "/websocket"
// heartbeatInterval is the period of the client heartbeat ticker.
heartbeatInterval = 10 * time.Second
// reconnectDelay is how long main sleeps before reconnecting after an error.
reconnectDelay = 5 * time.Second
// handshakeTimeout bounds the WebSocket dial handshake.
handshakeTimeout = 10 * time.Second
)
// StompFrame represents a parsed STOMP protocol frame.
type StompFrame struct {
// Command is the first line of the frame (e.g. "CONNECTED", "MESSAGE").
Command string
// Headers holds the "key:value" lines that follow the command line.
Headers map[string]string
// Body is everything after the blank line that ends the headers.
Body string
}
// Event represents a Rhombus change event.
// Each field is decoded from the JSON key named in its struct tag; keys
// absent from a message leave the field at its zero value.
type Event struct {
Entity string `json:"entity"`
EntityUUID string `json:"entityUuid"`
Type string `json:"type"`
DeviceUUID string `json:"deviceUuid"`
// TimestampMs is formatted via time.UnixMilli in main, i.e. treated as Unix milliseconds.
TimestampMs int64 `json:"timestampMs"`
DurationSec float64 `json:"durationSec"`
PolicyAlertTriggers []string `json:"policyAlertTriggers"`
TextDescription string `json:"textDescription"`
}
// buildStompFrame serializes a STOMP frame: the command line, one "key:value"
// line per header (in map iteration order), a blank line, the body, and the
// terminating NUL byte.
func buildStompFrame(command string, headers map[string]string, body string) string {
	var frame strings.Builder
	fmt.Fprintf(&frame, "%s\n", command)
	for name, val := range headers {
		frame.WriteString(name)
		frame.WriteByte(':')
		frame.WriteString(val)
		frame.WriteByte('\n')
	}
	fmt.Fprintf(&frame, "\n%s\x00", body)
	return frame.String()
}
// parseStompFrame parses one WebSocket text message into a StompFrame.
// It returns nil for a heartbeat (only newlines, or a lone NUL byte).
// Both LF and CRLF line endings are accepted, and when a header name is
// repeated the first occurrence is kept.
func parseStompFrame(raw string) *StompFrame {
	raw = strings.TrimLeft(raw, "\n\r")
	if raw == "" || raw == "\x00" {
		return nil // Heartbeat
	}
	raw = strings.TrimRight(raw, "\x00")

	frame := &StompFrame{Headers: make(map[string]string)}
	rest := raw
	for lineNo := 0; ; lineNo++ {
		// Cut the next line off the front of rest.
		line := rest
		nl := strings.IndexByte(rest, '\n')
		if nl >= 0 {
			line, rest = rest[:nl], rest[nl+1:]
		} else {
			rest = ""
		}
		// Tolerate CRLF line endings in the command and header section.
		line = strings.TrimSuffix(line, "\r")

		if lineNo == 0 {
			frame.Command = line
		} else if line == "" {
			// The blank line ends the headers; the remainder is the body,
			// kept verbatim (including any blank lines inside it).
			frame.Body = rest
			break
		} else if idx := strings.Index(line, ":"); idx > -1 {
			key := line[:idx]
			if _, seen := frame.Headers[key]; !seen {
				frame.Headers[key] = line[idx+1:]
			}
		}
		if nl < 0 {
			break // input exhausted without a body
		}
	}
	return frame
}
// getOrgUUID fetches the organization UUID from the REST API.
// It returns an error when the request cannot be built or sent, when the
// server answers with a non-2xx status, when the body is not valid JSON, or
// when the response carries no org.uuid value.
func getOrgUUID(apiToken string) (string, error) {
	// http.DefaultClient has no timeout; bound the call so a stalled server
	// cannot hang startup forever.
	const requestTimeout = 10 * time.Second

	req, err := http.NewRequest(http.MethodPost, "https://api2.rhombussystems.com/api/org/getOrgV2", strings.NewReader("{}"))
	if err != nil {
		return "", fmt.Errorf("build getOrgV2 request: %w", err)
	}
	req.Header.Set("x-auth-apikey", apiToken)
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: requestTimeout}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return "", fmt.Errorf("getOrgV2: unexpected HTTP status %s", resp.Status)
	}

	var result struct {
		Org struct {
			UUID string `json:"uuid"`
		} `json:"org"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	if result.Org.UUID == "" {
		return "", fmt.Errorf("getOrgV2: response has no org.uuid")
	}
	return result.Org.UUID, nil
}
// connectAndMonitor opens one WebSocket connection, performs the STOMP
// CONNECT/SUBSCRIBE handshake, and delivers decoded events to onEvent until
// the connection fails or the process receives an interrupt.
//
// It returns nil after an interrupt (clean shutdown) and a non-nil error for
// every other exit, so the caller can decide whether to reconnect.
func connectAndMonitor(apiToken, orgUUID string, onEvent func(Event)) error {
	// Build URL
	params := url.Values{}
	params.Set("x-auth-scheme", "api-token")
	wsURL := fmt.Sprintf("wss://%s:%s%s?%s", wsHost, wsPort, wsPath, params.Encode())

	// Connect
	headers := http.Header{}
	headers.Set("x-auth-apikey", apiToken)
	dialer := websocket.Dialer{HandshakeTimeout: handshakeTimeout}
	conn, _, err := dialer.Dial(wsURL, headers)
	if err != nil {
		return fmt.Errorf("dial failed: %w", err)
	}
	defer conn.Close()

	// STOMP CONNECT; advertise the same interval the heartbeat ticker uses.
	hbMillis := heartbeatInterval.Milliseconds()
	connectFrame := buildStompFrame("CONNECT", map[string]string{
		"accept-version": "1.2",
		"heart-beat":     fmt.Sprintf("%d,%d", hbMillis, hbMillis),
	}, "")
	if err := conn.WriteMessage(websocket.TextMessage, []byte(connectFrame)); err != nil {
		return fmt.Errorf("send CONNECT failed: %w", err)
	}

	// Bound the wait for CONNECTED so a silent server cannot block forever,
	// then clear the deadline for the long-lived read loop.
	conn.SetReadDeadline(time.Now().Add(handshakeTimeout))
	_, msg, err := conn.ReadMessage()
	if err != nil {
		return fmt.Errorf("read CONNECTED failed: %w", err)
	}
	conn.SetReadDeadline(time.Time{})
	frame := parseStompFrame(string(msg))
	if frame == nil || frame.Command != "CONNECTED" {
		return fmt.Errorf("expected CONNECTED, got: %v", frame)
	}

	// SUBSCRIBE
	subscribeFrame := buildStompFrame("SUBSCRIBE", map[string]string{
		"id":          "sub-0",
		"destination": "/topic/change/" + orgUUID,
	}, "")
	if err := conn.WriteMessage(websocket.TextMessage, []byte(subscribeFrame)); err != nil {
		return fmt.Errorf("send SUBSCRIBE failed: %w", err)
	}

	// Signal handling. Stop the notification on return; otherwise the
	// registration outlives this call and later interrupts are delivered to
	// a channel nobody reads.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)
	defer signal.Stop(sigCh)

	// stop is closed exactly once on the way out and ends both goroutines.
	stop := make(chan struct{})

	// Heartbeat sender
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(heartbeatInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := conn.WriteMessage(websocket.TextMessage, []byte("\n")); err != nil {
					return // the reader reports the broken connection
				}
			case <-stop:
				return
			}
		}
	}()

	// Message reader
	msgCh := make(chan StompFrame, 16)
	errCh := make(chan error, 1) // buffered: the reader sends at most once
	go func() {
		for {
			_, raw, err := conn.ReadMessage()
			if err != nil {
				errCh <- err
				return
			}
			f := parseStompFrame(string(raw))
			if f == nil {
				continue // Heartbeat
			}
			switch f.Command {
			case "ERROR":
				errCh <- fmt.Errorf("STOMP ERROR frame: %s", f.Headers["message"])
				return
			case "MESSAGE":
				// Do not block forever on a full channel once the main
				// loop has returned.
				select {
				case msgCh <- *f:
				case <-stop:
					return
				}
			}
		}
	}()

	shutdown := func() {
		close(stop)
		wg.Wait()
	}

	// Main loop
	for {
		select {
		case f := <-msgCh:
			var event Event
			if err := json.Unmarshal([]byte(f.Body), &event); err != nil {
				log.Printf("skipping undecodable event: %v", err)
				continue
			}
			onEvent(event)
		case err := <-errCh:
			shutdown()
			return fmt.Errorf("connection lost: %w", err)
		case <-sigCh:
			shutdown()
			// Best effort: the heartbeat goroutine has exited, so this is
			// the only writer; a failure here does not matter on shutdown.
			_ = conn.WriteMessage(websocket.TextMessage, []byte(buildStompFrame("DISCONNECT", nil, "")))
			return nil
		}
	}
}
// main reads the API token from RHOMBUS_API_KEY, looks up the organization,
// and then monitors events, reconnecting after reconnectDelay whenever the
// connection is lost. It exits on Ctrl+C, including during the reconnect wait.
func main() {
	apiToken := os.Getenv("RHOMBUS_API_KEY")
	if apiToken == "" {
		log.Fatal("Set RHOMBUS_API_KEY environment variable")
	}
	orgUUID, err := getOrgUUID(apiToken)
	if err != nil {
		// Fatalf rather than Fatal: Fatal inserts no space between a string
		// operand and the error that follows it.
		log.Fatalf("Failed to get org UUID: %v", err)
	}
	fmt.Printf("Organization: %s\n", orgUUID)
	// Print, not Println: a Println argument ending in "\n" is rejected by go vet.
	fmt.Print("Listening for events... (Ctrl+C to stop)\n\n")

	handler := func(e Event) {
		ts := time.UnixMilli(e.TimestampMs).Format("15:04:05")
		fmt.Printf("[%s] %s %s\n", ts, e.Type, e.Entity)
		if len(e.PolicyAlertTriggers) > 0 {
			fmt.Printf(" triggers: %s\n", strings.Join(e.PolicyAlertTriggers, ", "))
		}
		if e.TextDescription != "" {
			fmt.Printf(" %s\n", e.TextDescription)
		}
		fmt.Printf(" uuid: %s\n\n", e.EntityUUID)
	}

	// Watch for Ctrl+C here as well so the reconnect wait can be interrupted.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)
	defer signal.Stop(sigCh)

	for {
		err := connectAndMonitor(apiToken, orgUUID, handler)
		if err == nil {
			fmt.Println("\nExiting.")
			return
		}
		fmt.Printf("%v. Reconnecting in %v...\n", err, reconnectDelay)
		select {
		case <-sigCh:
			fmt.Println("\nExiting.")
			return
		case <-time.After(reconnectDelay):
		}
	}
}
Quick Reference
Environment Variables
All examples use this environment variable:
export RHOMBUS_API_KEY="your-api-token-here"
STOMP Frame Sequence
Client Server
│ │
├─── CONNECT ──────────────────────► │
│ │
│ ◄──────────────────── CONNECTED ───┤
│ │
├─── SUBSCRIBE ────────────────────► │
│ │
│ ◄────────────────────── MESSAGE ───┤
│ ◄────────────────────── MESSAGE ───┤
├─── heartbeat (\n) ──────────────► │
│ ◄──────────────── heartbeat (\n) ──┤
│ ◄────────────────────── MESSAGE ───┤
│ ... │
│ │
├─── DISCONNECT ───────────────────► │
│ │
Connection Parameters at a Glance
| Parameter | Value |
|---|---|
| Protocol | WSS (TLS) |
| Host | ws.rhombussystems.com |
| Port | 8443 |
| Path | /websocket |
| STOMP Version | 1.2 |
| Heartbeat | 10000ms bidirectional |
| Auth Header | x-auth-apikey |
| Auth Query Param | x-auth-scheme=api-token |
| Topic | /topic/change/{orgUuid} |