server/Sockets.js

import querystring from 'node:querystring';
import {
  Worker,
} from 'node:worker_threads';

import {
  default as WebSocket,
  WebSocketServer,
} from 'ws';

import Socket, {
  kSocketTerminate,
} from './Socket.js';

// @note - fs.readFileSync creates some cwd() issues...
import networkLatencyWorker from './audit-network-latency.worker.js';

export const kSocketsRemoveFromAllRooms = Symbol('soundworks:sockets-remove-from-all-rooms');
export const kSocketsLatencyStatsWorker = Symbol('soundworks:sockets-latency-stats-worker');
export const kSocketsDebugPreventHeartBeat = Symbol('soundworks:sockets-debug-prevent-heartbeat');

/**
 * Manage all {@link server.Socket} instances.
 *
 * _Important: In most cases, you should consider using a {@link client.SharedState}
 * rather than directly using the Socket instance._
 *
 * @memberof server
 */
class Sockets {
  #wsServer = null;
  #rooms = new Map();

  constructor() {
    // Init special `'*'` room which stores all current connections.
    this.#rooms.set('*', new Set());

    this[kSocketsLatencyStatsWorker] = null;
    this[kSocketsDebugPreventHeartBeat] = false;
  }

  /**
   * Initialize sockets, all sockets are added to two rooms by default:
   * - to the room corresponding to the client `role`
   * - to the '*' room that holds all connected sockets
   *
   * @private
   */
  async start(server, config, onConnectionCallback) {
    // Audit for network latency estimation, the worker is written in cjs so that we
    // can make builds for Max, move back to modules once Max support modules
    this[kSocketsLatencyStatsWorker] = new Worker(networkLatencyWorker, { eval: true });

    const auditState = await server.getAuditState();

    auditState.onUpdate(updates => {
      if ('averageNetworkLatencyWindow' in updates || 'averageNetworkLatencyPeriod' in updates) {
        this[kSocketsLatencyStatsWorker].postMessage({
          type: 'config',
          value: {
            averageLatencyWindow: updates.averageNetworkLatencyWindow,
            averageLatencyPeriod: updates.averageNetworkLatencyPeriod,
          },
        });
      }
    }, true);

    this[kSocketsLatencyStatsWorker].on('message', msg => {
      if (msg.type === 'computed-average-latency') {
        auditState.set({ averageNetworkLatency: msg.value });
      }
    });

    // Init ws server
    this.#wsServer = new WebSocketServer({
      noServer: true,
      path: `/${config.path}`,
    });

    this.#wsServer.on('connection', (ws, req) => {
      const { role, token } = querystring.parse(req.url.split('?')[1]);
      const socket = new Socket(ws, this);

      socket.addToRoom('*');
      socket.addToRoom(role);

      onConnectionCallback(role, socket, token);
    });

    // Prevent socket with protected role to connect is token is invalid
    server.httpServer.on('upgrade', async (req, socket, head) => {
      const { role, token } = querystring.parse(req.url.split('?')[1]);

      if (server.isProtected(role)) {
        // we don't have any IP in the upgrade request object,
        // so we just check the connection token is pending and valid
        if (!server.isValidConnectionToken(token)) {
          socket.destroy('not allowed');
        }
      }

      this.#wsServer.handleUpgrade(req, socket, head, (ws) => {
        this.#wsServer.emit('connection', ws, req);
      });
    });
  }

  /**
   * Terminate all existing sockets.
   * @private
   */
  terminate() {
    // terminate stat worker thread
    this[kSocketsLatencyStatsWorker].terminate();
    // clean sockets
    const sockets = this.#rooms.get('*');
    sockets.forEach(socket => socket[kSocketTerminate]());
  }

  /**
   * Remove given socket from all rooms.
   */
  [kSocketsRemoveFromAllRooms](socket) {
    for (let [_, room] of this.#rooms) {
      room.delete(socket);
    }
  }

  /**
   * Add a socket to a room.
   *
   * _Note that in most cases, you should use a shared state instead_
   *
   * @param {server.Socket} socket - Socket to add to the room.
   * @param {String} roomId - Id of the room.
   */
  addToRoom(socket, roomId) {
    if (!this.#rooms.has(roomId)) {
      this.#rooms.set(roomId, new Set());
    }

    const room = this.#rooms.get(roomId);
    room.add(socket);
  }

  /**
   * Remove a socket from a room.
   *
   * _Note that in most cases, you should use a shared state instead_
   *
   * @param {server.Socket} socket - Socket to remove from the room.
   * @param {String} roomId - Id of the room.
   */
  removeFromRoom(socket, roomId) {
    if (this.#rooms.has(roomId)) {
      const room = this.#rooms.get(roomId);
      room.delete(socket);
    }
  }

  /**
   * Send a message to all clients os given room(s). If no room is specified,
   * the message is sent to all clients.
   *
   * _Note that in most cases, you should use a shared state instead_
   *
   * @param {String|Array} roomsIds - Ids of the rooms that must receive
   *  the message. If `null` the message is sent to all clients.
   * @param {server.Socket} excludeSocket - Optionnal socket to ignore when
   *  broadcasting the message, typically the client at the origin of the message.
   * @param {String} channel - Channel name.
   * @param {...*} args - Payload of the message. As many arguments as needed, of
   *  JSON compatible data types (i.e. string, number, boolean, object, array and null).
   */
  broadcast(roomIds, excludeSocket, channel, ...args) {
    const targets = new Set();

    if (typeof roomIds === 'string' || Array.isArray(roomIds)) {
      if (typeof roomIds === 'string') {
        roomIds = [roomIds];
      }

      roomIds.forEach(roomId => {
        if (this.#rooms.has(roomId)) {
          const room = this.#rooms.get(roomId);
          room.forEach(socket => targets.add(socket));
        }
      });
    } else {
      targets = this.#rooms.get('*');
    }

    targets.forEach(socket => {
      if (socket.readyState === WebSocket.OPEN && socket !== excludeSocket) {
        socket.send(channel, ...args);
      }
    });
  }
}

export default Sockets;