server/Socket.js

import { getTime } from '@ircam/sc-utils';
import {
  PING_INTERVAL,
  PING_MESSAGE,
  PONG_MESSAGE,
} from '../common/constants.js';
import {
  packBinaryMessage,
  unpackBinaryMessage,
  packStringMessage,
  unpackStringMessage,
} from '../common/sockets-utils.js';

// Status codes:
//
// CONNECTING = 0;
// OPEN = 1;
// CLOSING = 2;
// CLOSED = 3;
// READY_STATES = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];

/**
 * The Socket class is a simple publish / subscribe wrapper built on top of the
 * [ws](https://github.com/websockets/ws) library. An instance of {@link server.Socket}
 * is automatically created per client when it connects (see {@link server.Client#socket}).
 *
 * _Important: In most cases, you should consider using a {@link client.SharedState}
 * rather than directly using the sockets._
 *
 * The Socket class contains two different WebSockets:
 * - a socket configured with `binaryType = 'blob'` for JSON compatible data
 *  types (i.e. string, number, boolean, object, array and null).
 * - a socket configured with `binaryType= 'arraybuffer'` for efficient streaming
 *  of binary data.
 *
 * @memberof server
 * @hideconstructor
 */
class Socket {
  constructor(ws, binaryWs, rooms, sockets) {
    /**
     * `ws` socket instance configured with `binaryType=blob` (string)
     *
     * @type {object}
     * @private
     */
    this.ws = ws;

    /**
     * `ws` socket instance configured with `binaryType=arraybuffer` (TypedArray)
     *
     * @type {object}
     * @private
     */
    this.binaryWs = binaryWs;

    /**
     * Reference to the sockets object, is mainly dedicated to allow
     * broadcasting from a given socket instance.
     *
     * @type {server.Sockets}
     * @example
     * socket.sockets.broadcast('my-room', this, 'update-value', 1);
     */
    this.sockets = sockets;

    /**
     * Reference to the rooms object
     *
     * @type {Map}
     * @private
     */
    this.rooms = rooms;

    /** @private */
    this._stringListeners = new Map();
    /** @private */
    this._binaryListeners = new Map();
    /** @private */
    this._heartbeatId = null;

    // heartbeat system (run only on string socket), adapted from:
    // https://github.com/websockets/ws#how-to-detect-and-close-broken-connections
    /** @private */
    this._isAlive = true;
    let msg = {
      type: 'add-measurement',
      value: {
        ping: 0,
        pong: 0,
      },
    };

    // ----------------------------------------------------------
    // String socket
    // implements ping/pong behavior
    // ----------------------------------------------------------
    this.ws.addEventListener('message', e => {
      if (e.data === PONG_MESSAGE) {
        this._isAlive = true;

        msg.value.pong = getTime();
        this.sockets._latencyStatsWorker.postMessage(msg);
        // do not propagate ping / pong messages
        return;
      }

      const [channel, args] = unpackStringMessage(e.data);
      this._emit(false, channel, ...args);
    });

    this._heartbeatId = setInterval(() => {
      if (this._isAlive === false) {
        // emit a 'close' event to go trough all the disconnection pipeline
        this._emit(false, 'close');
        return;
      }

      this._isAlive = false;
      msg.value.ping = getTime();

      this.ws.send(PING_MESSAGE);
    }, PING_INTERVAL);

    // broadcast all "native" events
    [
      'close',
      'error',
      'message',
      'open',
      'ping',
      'pong',
      'unexpected-response',
      'upgrade',
    ].forEach(eventName => {
      this.ws.addEventListener(eventName, e => {
        this._emit(false, eventName, e.data);
      });
    });

    // ----------------------------------------------------------
    // Binary socket
    // ----------------------------------------------------------
    this.binaryWs.addEventListener('message', e => {
      const [channel, data] = unpackBinaryMessage(e.data);
      this._emit(true, channel, data);
    });

    // broadcast all "native" events
    [
      'close',
      'error',
      'message',
      'open',
      'ping',
      'pong',
      'unexpected-response',
      'upgrade',
    ].forEach(eventName => {
      this.binaryWs.addEventListener(eventName, e => {
        this._emit(true, eventName, e.data);
      });
    });
  }

  /**
   * Removes all listeners and immediately close the two sockets. Is automatically
   * called on `server.stop()`
   *
   * @private
   */
  terminate() {
    // clear ping/pong check
    clearInterval(this._heartbeatId);

    // clean rooms
    for (let [_key, room] of this.rooms) {
      room.delete(this);
    }

    // clear references to sockets and rooms
    this.sockets = null;
    this.rooms = null;

    // clear all listeners
    this._stringListeners.clear();
    this._binaryListeners.clear();

    // clear "native" listeners
    [this.binaryWs, this.ws].forEach((socket) => {
      [
        'close',
        'error',
        'message',
        'open',
        'ping',
        'pong',
        'unexpected-response',
        'upgrade',
      ].forEach(eventName => {
        socket.removeAllListeners(eventName);
      });
    });

    // clear binarySocket as this is called from the string one.
    this.binaryWs.terminate();
    this.ws.terminate();
  }

  /**
   * @param {boolean} binary - Emit to either the string or binary socket.
   * @param {string} channel - Channel name.
   * @param {...*} args - Content of the message.
   * @private
   */
  _emit(binary, channel, ...args) {
    const listeners = binary ? this._binaryListeners : this._stringListeners;

    if (listeners.has(channel)) {
      const callbacks = listeners.get(channel);
      callbacks.forEach(callback => callback(...args));
    }
  }

  /**
   * @param {Function[]} listeners - List of listeners, either for the string or binary socket.
   * @param {string} channel - Channel name.
   * @param {Function} callback - The function to be added to the listeners.
   * @private
   */
  _addListener(listeners, channel, callback) {
    if (!listeners.has(channel)) {
      listeners.set(channel, new Set());
    }

    const callbacks = listeners.get(channel);
    callbacks.add(callback);
  }

  /**
   * @param {Function[]} listeners - List of listeners, either for the string or binary socket.
   * @param {string} channel - Channel name.
   * @param {Function} callback - The function to be removed from the listeners.
   * @private
   */
  _removeListener(listeners, channel, callback) {
    if (listeners.has(channel)) {
      const callbacks = listeners.get(channel);
      callbacks.delete(callback);

      if (callbacks.size === 0) {
        listeners.delete(channel);
      }
    }
  }

  /**
   * @param {Function[]} listeners - List of listeners, either for the string or binary socket.
   * @param {string} [channel=null] - Channel name of the listeners to remove. If null
   *  all the listeners are cleared.
   * @private
   */
  _removeAllListeners(listeners, channel) {
    if (channel === null) {
      listeners.clear();
    } else if (listeners.has(channel)) {
      listeners.delete(channel);
    }
  }

  /**
   * Add the socket to a room
   *
   * @param {string} roomId - Id of the room.
   */
  addToRoom(roomId) {
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }

    const room = this.rooms.get(roomId);
    room.add(this);
  }

  /**
   * Remove the socket from a room
   *
   * @param {string} roomId - Id of the room.
   */
  removeFromRoom(roomId) {
    if (this.rooms.has(roomId)) {
      const room = this.rooms.get(roomId);
      room.delete(this);
    }
  }

  /**
   * Send messages with JSON compatible data types on a given channel.
   *
   * @param {string} channel - Channel name.
   * @param {...*} args - Payload of the message. As many arguments as needed, of
   *  JSON compatible data types (i.e. string, number, boolean, object, array and null).
   */
  send(channel, ...args) {
    const msg = packStringMessage(channel, ...args);

    // 0 CONNECTING  Socket has been created. The connection is not yet open.
    // 1 OPEN  The connection is open and ready to communicate.
    // 2 CLOSING The connection is in the process of closing.
    // 3 CLOSED  The connection is closed or couldn't be opened.
    if (this.ws.readyState === 1) {
      this.ws.send(msg, (err) => {
        if (err) {
          console.error('error sending msg:', channel, args, err);
        }
      });
    }
  }

  /**
   * Listen messages with JSON compatible data types on a given channel.
   *
   * @param {string} channel - Channel name.
   * @param {Function} callback - Callback to execute when a message is received,
   *  arguments of the callback function will match the arguments sent using the
   *  {@link server.Socket#send} method.
   */
  addListener(channel, callback) {
    this._addListener(this._stringListeners, channel, callback);
  }

  /**
   * Remove a listener of messages with JSON compatible data types from a given channel.
   *
   * @param {string} channel - Channel name.
   * @param {Function} callback - Callback to remove.
   */
  removeListener(channel, callback) {
    this._removeListener(this._stringListeners, channel, callback);
  }

  /**
   * Remove all listeners of messages with JSON compatible data types.
   *
   * @param {string} channel - Channel name.
   */
  removeAllListeners(channel = null) {
    this._removeAllListeners(this._stringListeners, channel);
  }

  /**
   * Send binary messages on a given channel.
   *
   * @param {string} channel - Channel name.
   * @param {TypedArray} typedArray - Binary data to be sent.
   */
  sendBinary(channel, typedArray) {
    const msg = packBinaryMessage(channel, typedArray);

    this.binaryWs.send(msg, (err) => {
      if (err) {
        console.error('error sending msg:', channel, typedArray);
      }
    });
  }

  /**
   * Listen binary messages on a given channel
   *
   * @param {string} channel - Channel name.
   * @param {Function} callback - Callback to execute when a message is received.
   */
  addBinaryListener(channel, callback) {
    this._addListener(this._binaryListeners, channel, callback);
  }

  /**
   * Remove a listener of binary compatible messages from a given channel
   *
   * @param {string} channel - Channel name.
   * @param {Function} callback - Callback to remove.
   */
  removeBinaryListener(channel, callback) {
    this._removeListener(this._binaryListeners, channel, callback);
  }

  /**
   * Remove all listeners of binary compatible messages on a given channel
   *
   * @param {string} channel - Channel of the message.
   */
  removeAllBinaryListeners(channel = null) {
    this._removeAllListeners(this._binaryListeners, channel);
  }
}

export default Socket;