server/ServerSockets.js

import querystring from 'node:querystring';
import {
  Worker,
} from 'node:worker_threads';

import {
  WebSocketServer,
} from 'ws';

import {
  kServerOnSocketConnection,
  kServerIsValidConnectionToken,
} from './Server.js';
import ServerSocket, {
  kSocketTerminate,
} from './ServerSocket.js';

// @note - fs.readFileSync creates some cwd() issues...
import networkLatencyWorker from './audit-network-latency.worker.js';

export const kSocketsStart = Symbol('soundworks:sockets-start');
export const kSocketsStop = Symbol('soundworks:sockets-stop');

export const kSocketsDeleteSocket = Symbol('soundworks:sockets-delete-socket');
export const kSocketsLatencyStatsWorker = Symbol('soundworks:sockets-latency-stats-worker');
export const kSocketsDebugPreventHeartBeat = Symbol('soundworks:sockets-debug-prevent-heartbeat');

/**
 * Manage all {@link ServerSocket} instances.
 *
 * _Important: In most cases, you should consider using a {@link SharedState}
 * rather than directly using the ServerSocket instance._
 */
class ServerSockets {
  #server = null;
  #config = null;
  #wsServer = null;
  #sockets = new Set();

  constructor(server, config) {
    this.#server = server;
    this.#config = config;

    this[kSocketsLatencyStatsWorker] = null;
    this[kSocketsDebugPreventHeartBeat] = false;
  }

  /**
   * Initialize sockets, all sockets are added to two rooms by default:
   * - to the room corresponding to the client `role`
   * - to the '*' room that holds all connected sockets
   * @private
   */
  async [kSocketsStart]() {
    // Audit for network latency estimation, the worker is written in cjs so that we
    // can make builds for Max, move back to modules once Max support modules
    this[kSocketsLatencyStatsWorker] = new Worker(networkLatencyWorker, { eval: true });

    const auditState = await this.#server.getAuditState();

    auditState.onUpdate(updates => {
      if ('averageNetworkLatencyWindow' in updates || 'averageNetworkLatencyPeriod' in updates) {
        this[kSocketsLatencyStatsWorker].postMessage({
          type: 'config',
          value: {
            averageLatencyWindow: updates.averageNetworkLatencyWindow,
            averageLatencyPeriod: updates.averageNetworkLatencyPeriod,
          },
        });
      }
    }, true);

    this[kSocketsLatencyStatsWorker].on('message', msg => {
      if (msg.type === 'computed-average-latency') {
        auditState.set({ averageNetworkLatency: msg.value });
      }
    });

    // Init ws server
    this.#wsServer = new WebSocketServer({
      noServer: true,
      path: `/${this.#config.path}`,
    });

    this.#wsServer.on('connection', (ws, req) => {
      const { role, token } = querystring.parse(req.url.split('?')[1]);
      const socket = new ServerSocket(ws, this);

      this.#sockets.add(socket);
      this.#server[kServerOnSocketConnection](role, socket, token);
    });

    // Prevent socket with protected role to connect if token is invalid
    this.#server.httpServer.on('upgrade', async (req, socket, head) => {
      const { role, token } = querystring.parse(req.url.split('?')[1]);

      if (this.#server.isProtectedClientRole(role)) {
        // we don't have any IP in the upgrade request object,
        // so we just check the connection token is pending and valid
        if (!this.#server[kServerIsValidConnectionToken](token)) {
          socket.destroy('not allowed');
        }
      }

      this.#wsServer.handleUpgrade(req, socket, head, (ws) => {
        this.#wsServer.emit('connection', ws, req);
      });
    });
  }

  /**
   * Terminate all existing sockets.
   * @private
   */
  [kSocketsStop]() {
    // terminate stat worker thread
    this[kSocketsLatencyStatsWorker].terminate();
    // terminate sockets
    this.#sockets.forEach(socket => socket[kSocketTerminate]());
  }

  /**
   * Delete given socket.
   * @private
   */
  [kSocketsDeleteSocket](socket) {
    this.#sockets.delete(socket);
  }

  /** @private */
  entries() {
    return this.#sockets.entries();
  }

  /** @private */
  keys() {
    return this.#sockets.keys();
  }

  /** @private */
  values() {
    return this.#sockets.values();
  }

  forEach(func) {
    return this.#sockets.forEach(func);
  }
}

export default ServerSockets;