Skip to main content

Code Examples

Full, production-ready examples for connecting to the Rhombus WebSocket event stream. Each example handles authentication, STOMP protocol, heartbeats, reconnection, and event processing.

Python

Dependencies

pip install websocket-client requests

Complete Example

"""
Rhombus WebSocket Event Monitor
Connects to the Rhombus platform and streams real-time events.
"""

import json
import os
import signal
import threading
import time

import requests
import websocket

# Configuration
API_TOKEN = os.environ["RHOMBUS_API_KEY"]
WS_HOST = "ws.rhombussystems.com"
WS_PORT = "8443"
WS_PATH = "/websocket"
HEARTBEAT_INTERVAL = 10  # seconds
RECONNECT_DELAY = 5      # seconds


def get_org_uuid(api_token):
    """Fetch the organization UUID from the REST API."""
    resp = requests.post(
        "https://api2.rhombussystems.com/api/org/getOrgV2",
        headers={"x-auth-apikey": api_token, "Content-Type": "application/json"},
        json={},
    )
    resp.raise_for_status()
    return resp.json()["org"]["uuid"]


def build_stomp_frame(command, headers=None, body=""):
    """Construct a STOMP 1.2 protocol frame."""
    frame = command + "\n"
    for key, value in (headers or {}).items():
        frame += f"{key}:{value}\n"
    frame += "\n" + body + "\x00"
    return frame


def parse_stomp_frame(raw):
    """Parse a raw WebSocket message into a STOMP frame dict."""
    raw = raw.lstrip("\n\r")
    if not raw or raw == "\x00":
        return None  # Heartbeat

    raw = raw.rstrip("\x00")
    parts = raw.split("\n\n", 1)
    lines = parts[0].split("\n")
    command = lines[0]
    headers = {}
    for line in lines[1:]:
        if ":" in line:
            key, value = line.split(":", 1)
            headers[key] = value
    body = parts[1] if len(parts) > 1 else ""
    return {"command": command, "headers": headers, "body": body}


def monitor_events(api_token, org_uuid, on_event, filter_alerts_only=True):
    """
    Connect to Rhombus WebSocket and stream events.

    Args:
        api_token: Rhombus API token.
        org_uuid: Organization UUID.
        on_event: Callback function invoked with each event payload dict.
        filter_alerts_only: If True, only POLICY_ALERT events are forwarded.
    """
    url = f"wss://{WS_HOST}:{WS_PORT}{WS_PATH}?x-auth-scheme=api-token"
    headers = {"x-auth-apikey": api_token}

    ws = websocket.create_connection(url, header=headers, timeout=10)

    # STOMP CONNECT
    ws.send(build_stomp_frame("CONNECT", {
        "accept-version": "1.2",
        "heart-beat": "10000,10000",
    }))
    response = parse_stomp_frame(ws.recv())
    if not response or response["command"] != "CONNECTED":
        raise ConnectionError("STOMP handshake failed")

    # SUBSCRIBE to org change topic
    ws.send(build_stomp_frame("SUBSCRIBE", {
        "id": "sub-0",
        "destination": f"/topic/change/{org_uuid}",
    }))

    # Heartbeat thread
    stop_event = threading.Event()

    def heartbeat_sender():
        while not stop_event.is_set():
            try:
                ws.send("\n")
            except Exception:
                break
            stop_event.wait(HEARTBEAT_INTERVAL)

    hb_thread = threading.Thread(target=heartbeat_sender, daemon=True)
    hb_thread.start()

    try:
        while True:
            raw = ws.recv()
            frame = parse_stomp_frame(raw)
            if frame is None:
                continue  # Heartbeat
            if frame["command"] != "MESSAGE":
                continue

            payload = json.loads(frame["body"])

            if filter_alerts_only and payload.get("entity") != "POLICY_ALERT":
                continue

            on_event(payload)
    finally:
        stop_event.set()
        hb_thread.join(timeout=2)
        try:
            ws.send(build_stomp_frame("DISCONNECT"))
            ws.close()
        except Exception:
            pass


def main():
    org_uuid = get_org_uuid(API_TOKEN)
    print(f"Organization: {org_uuid}")
    print("Listening for events... (Ctrl+C to stop)\n")

    def handle_event(payload):
        ts = time.strftime("%H:%M:%S", time.localtime(payload["timestampMs"] / 1000))
        entity = payload.get("entity", "UNKNOWN")
        change = payload.get("type", "UNKNOWN")
        triggers = ", ".join(payload.get("policyAlertTriggers", []))
        desc = payload.get("textDescription", "")
        print(f"[{ts}] {change} {entity}")
        if triggers:
            print(f"  triggers: {triggers}")
        if desc:
            print(f"  {desc}")
        print(f"  uuid: {payload.get('entityUuid', 'N/A')}\n")

    # Reconnection loop
    while True:
        try:
            monitor_events(API_TOKEN, org_uuid, handle_event, filter_alerts_only=False)
        except KeyboardInterrupt:
            print("\nExiting.")
            break
        except Exception as e:
            print(f"Connection lost: {e}. Reconnecting in {RECONNECT_DELAY}s...")
            time.sleep(RECONNECT_DELAY)


if __name__ == "__main__":
    main()

JavaScript (Node.js)

Dependencies

npm install ws

Complete Example

/**
 * Rhombus WebSocket Event Monitor
 * Connects to the Rhombus platform and streams real-time events.
 */

const WebSocket = require("ws");
const https = require("https");

// Configuration
const API_TOKEN = process.env.RHOMBUS_API_KEY;
const WS_HOST = "ws.rhombussystems.com";
const WS_PORT = "8443";
const WS_PATH = "/websocket";
const HEARTBEAT_INTERVAL = 10000; // ms
const RECONNECT_DELAY = 5000; // ms

/**
 * Fetch organization UUID from the REST API.
 */
function getOrgUuid(apiToken) {
  return new Promise((resolve, reject) => {
    const req = https.request(
      {
        hostname: "api2.rhombussystems.com",
        path: "/api/org/getOrgV2",
        method: "POST",
        headers: {
          "x-auth-apikey": apiToken,
          "Content-Type": "application/json",
        },
      },
      (res) => {
        let data = "";
        res.on("data", (chunk) => (data += chunk));
        res.on("end", () => {
          const body = JSON.parse(data);
          resolve(body.org.uuid);
        });
      }
    );
    req.on("error", reject);
    req.write("{}");
    req.end();
  });
}

/**
 * Build a STOMP 1.2 protocol frame.
 */
function buildStompFrame(command, headers = {}, body = "") {
  let frame = command + "\n";
  for (const [key, value] of Object.entries(headers)) {
    frame += `${key}:${value}\n`;
  }
  frame += "\n" + body + "\x00";
  return frame;
}

/**
 * Parse a raw STOMP frame from a WebSocket message.
 */
function parseStompFrame(raw) {
  raw = raw.replace(/^[\n\r]+/, "");
  if (!raw || raw === "\x00") return null; // Heartbeat

  raw = raw.replace(/\x00+$/, "");
  const parts = raw.split("\n\n");
  const headerLines = parts[0].split("\n");
  const command = headerLines[0];

  const headers = {};
  for (const line of headerLines.slice(1)) {
    const idx = line.indexOf(":");
    if (idx > -1) {
      headers[line.slice(0, idx)] = line.slice(idx + 1);
    }
  }

  const body = parts.length > 1 ? parts.slice(1).join("\n\n") : "";
  return { command, headers, body };
}

/**
 * Connect and monitor events with automatic reconnection.
 */
async function monitor(apiToken, onEvent, filterAlertsOnly = true) {
  const orgUuid = await getOrgUuid(apiToken);
  console.log(`Organization: ${orgUuid}`);
  console.log("Listening for events... (Ctrl+C to stop)\n");

  function connect() {
    const url = `wss://${WS_HOST}:${WS_PORT}${WS_PATH}?x-auth-scheme=api-token`;
    const ws = new WebSocket(url, {
      headers: { "x-auth-apikey": apiToken },
    });

    let heartbeatTimer = null;

    ws.on("open", () => {
      // STOMP CONNECT
      ws.send(
        buildStompFrame("CONNECT", {
          "accept-version": "1.2",
          "heart-beat": "10000,10000",
        })
      );
    });

    ws.on("message", (data) => {
      const frame = parseStompFrame(data.toString());
      if (!frame) return; // Heartbeat

      switch (frame.command) {
        case "CONNECTED":
          console.log("STOMP connected");
          // Subscribe to change topic
          ws.send(
            buildStompFrame("SUBSCRIBE", {
              id: "sub-0",
              destination: `/topic/change/${orgUuid}`,
            })
          );
          // Start heartbeats
          heartbeatTimer = setInterval(() => ws.send("\n"), HEARTBEAT_INTERVAL);
          break;

        case "MESSAGE": {
          const payload = JSON.parse(frame.body);
          if (filterAlertsOnly && payload.entity !== "POLICY_ALERT") return;
          onEvent(payload);
          break;
        }
      }
    });

    ws.on("close", () => {
      clearInterval(heartbeatTimer);
      console.log(`Connection closed. Reconnecting in ${RECONNECT_DELAY / 1000}s...`);
      setTimeout(connect, RECONNECT_DELAY);
    });

    ws.on("error", (err) => {
      console.error("WebSocket error:", err.message);
      ws.close();
    });

    // Graceful shutdown on Ctrl+C
    process.on("SIGINT", () => {
      console.log("\nExiting.");
      clearInterval(heartbeatTimer);
      ws.send(buildStompFrame("DISCONNECT"));
      ws.close();
      process.exit(0);
    });
  }

  connect();
}

// Event handler
function handleEvent(payload) {
  const ts = new Date(payload.timestampMs).toLocaleTimeString();
  const entity = payload.entity || "UNKNOWN";
  const change = payload.type || "UNKNOWN";
  const triggers = (payload.policyAlertTriggers || []).join(", ");
  const desc = payload.textDescription || "";

  console.log(`[${ts}] ${change} ${entity}`);
  if (triggers) console.log(`  triggers: ${triggers}`);
  if (desc) console.log(`  ${desc}`);
  console.log(`  uuid: ${payload.entityUuid || "N/A"}\n`);
}

// Run
monitor(API_TOKEN, handleEvent, false);

Go

Dependencies

go get github.com/gorilla/websocket

Complete Example

// Rhombus WebSocket Event Monitor
// Connects to the Rhombus platform and streams real-time events.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

const (
	wsHost             = "ws.rhombussystems.com"
	wsPort             = "8443"
	wsPath             = "/websocket"
	heartbeatInterval  = 10 * time.Second
	reconnectDelay     = 5 * time.Second
	handshakeTimeout   = 10 * time.Second
)

// StompFrame represents a parsed STOMP protocol frame.
type StompFrame struct {
	Command string
	Headers map[string]string
	Body    string
}

// Event represents a Rhombus change event.
type Event struct {
	Entity             string   `json:"entity"`
	EntityUUID         string   `json:"entityUuid"`
	Type               string   `json:"type"`
	DeviceUUID         string   `json:"deviceUuid"`
	TimestampMs        int64    `json:"timestampMs"`
	DurationSec        float64  `json:"durationSec"`
	PolicyAlertTriggers []string `json:"policyAlertTriggers"`
	TextDescription    string   `json:"textDescription"`
}

func buildStompFrame(command string, headers map[string]string, body string) string {
	var b strings.Builder
	b.WriteString(command + "\n")
	for k, v := range headers {
		fmt.Fprintf(&b, "%s:%s\n", k, v)
	}
	b.WriteString("\n" + body)
	b.WriteByte(0x00)
	return b.String()
}

func parseStompFrame(raw string) *StompFrame {
	raw = strings.TrimLeft(raw, "\n\r")
	if raw == "" || raw == "\x00" {
		return nil // Heartbeat
	}
	raw = strings.TrimRight(raw, "\x00")

	parts := strings.SplitN(raw, "\n\n", 2)
	lines := strings.Split(parts[0], "\n")

	frame := &StompFrame{
		Command: lines[0],
		Headers: make(map[string]string),
	}
	for _, line := range lines[1:] {
		idx := strings.Index(line, ":")
		if idx > -1 {
			frame.Headers[line[:idx]] = line[idx+1:]
		}
	}
	if len(parts) > 1 {
		frame.Body = parts[1]
	}
	return frame
}

func getOrgUUID(apiToken string) (string, error) {
	req, _ := http.NewRequest("POST", "https://api2.rhombussystems.com/api/org/getOrgV2", strings.NewReader("{}"))
	req.Header.Set("x-auth-apikey", apiToken)
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	var result struct {
		Org struct {
			UUID string `json:"uuid"`
		} `json:"org"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	return result.Org.UUID, nil
}

func connectAndMonitor(apiToken, orgUUID string, onEvent func(Event)) error {
	// Build URL
	params := url.Values{}
	params.Set("x-auth-scheme", "api-token")
	wsURL := fmt.Sprintf("wss://%s:%s%s?%s", wsHost, wsPort, wsPath, params.Encode())

	// Connect
	headers := http.Header{}
	headers.Set("x-auth-apikey", apiToken)

	dialer := websocket.Dialer{HandshakeTimeout: handshakeTimeout}
	conn, _, err := dialer.Dial(wsURL, headers)
	if err != nil {
		return fmt.Errorf("dial failed: %w", err)
	}
	defer conn.Close()

	// STOMP CONNECT
	conn.WriteMessage(websocket.TextMessage, []byte(buildStompFrame("CONNECT", map[string]string{
		"accept-version": "1.2",
		"heart-beat":     "10000,10000",
	}, "")))

	_, msg, err := conn.ReadMessage()
	if err != nil {
		return fmt.Errorf("read CONNECTED failed: %w", err)
	}
	frame := parseStompFrame(string(msg))
	if frame == nil || frame.Command != "CONNECTED" {
		return fmt.Errorf("expected CONNECTED, got: %v", frame)
	}

	// SUBSCRIBE
	conn.WriteMessage(websocket.TextMessage, []byte(buildStompFrame("SUBSCRIBE", map[string]string{
		"id":          "sub-0",
		"destination": "/topic/change/" + orgUUID,
	}, "")))

	// Heartbeat sender
	stopHeartbeat := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(heartbeatInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				conn.WriteMessage(websocket.TextMessage, []byte("\n"))
			case <-stopHeartbeat:
				return
			}
		}
	}()

	// Message reader
	msgCh := make(chan StompFrame, 16)
	errCh := make(chan error, 1)
	go func() {
		for {
			_, raw, err := conn.ReadMessage()
			if err != nil {
				errCh <- err
				return
			}
			f := parseStompFrame(string(raw))
			if f != nil && f.Command == "MESSAGE" {
				msgCh <- *f
			}
		}
	}()

	// Signal handling
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)

	// Main loop
	for {
		select {
		case f := <-msgCh:
			var event Event
			if err := json.Unmarshal([]byte(f.Body), &event); err == nil {
				onEvent(event)
			}
		case err := <-errCh:
			close(stopHeartbeat)
			wg.Wait()
			return fmt.Errorf("connection lost: %w", err)
		case <-sigCh:
			close(stopHeartbeat)
			wg.Wait()
			conn.WriteMessage(websocket.TextMessage, []byte(buildStompFrame("DISCONNECT", nil, "")))
			return nil
		}
	}
}

func main() {
	apiToken := os.Getenv("RHOMBUS_API_KEY")
	if apiToken == "" {
		log.Fatal("Set RHOMBUS_API_KEY environment variable")
	}

	orgUUID, err := getOrgUUID(apiToken)
	if err != nil {
		log.Fatal("Failed to get org UUID:", err)
	}
	fmt.Printf("Organization: %s\n", orgUUID)
	fmt.Println("Listening for events... (Ctrl+C to stop)\n")

	handler := func(e Event) {
		ts := time.UnixMilli(e.TimestampMs).Format("15:04:05")
		fmt.Printf("[%s] %s %s\n", ts, e.Type, e.Entity)
		if len(e.PolicyAlertTriggers) > 0 {
			fmt.Printf("  triggers: %s\n", strings.Join(e.PolicyAlertTriggers, ", "))
		}
		if e.TextDescription != "" {
			fmt.Printf("  %s\n", e.TextDescription)
		}
		fmt.Printf("  uuid: %s\n\n", e.EntityUUID)
	}

	for {
		err := connectAndMonitor(apiToken, orgUUID, handler)
		if err == nil {
			fmt.Println("\nExiting.")
			return
		}
		fmt.Printf("%v. Reconnecting in %v...\n", err, reconnectDelay)
		time.Sleep(reconnectDelay)
	}
}

Quick Reference

Environment Variables

All examples use this environment variable:
export RHOMBUS_API_KEY="your-api-token-here"

STOMP Frame Sequence

Client                              Server
  │                                    │
  ├─── CONNECT ──────────────────────► │
  │                                    │
  │ ◄──────────────────── CONNECTED ───┤
  │                                    │
  ├─── SUBSCRIBE ────────────────────► │
  │                                    │
  │ ◄────────────────────── MESSAGE ───┤
  │ ◄────────────────────── MESSAGE ───┤
  ├─── heartbeat (\n) ──────────────► │
  │ ◄──────────────── heartbeat (\n) ──┤
  │ ◄────────────────────── MESSAGE ───┤
  │       ...                          │
  │                                    │
  ├─── DISCONNECT ───────────────────► │
  │                                    │

Connection Parameters at a Glance

ParameterValue
ProtocolWSS (TLS)
Hostws.rhombussystems.com
Port8443
Path/websocket
STOMP Version1.2
Heartbeat10000ms bidirectional
Auth Headerx-auth-apikey
Auth Query Paramx-auth-scheme=api-token
Topic/topic/change/{orgUuid}