// common/SharedState.js

import { isPlainObject } from '@ircam/sc-utils';

import ParameterBag from './ParameterBag.js';
import PromiseStore from './PromiseStore.js';
import {
  DELETE_REQUEST,
  DELETE_RESPONSE,
  DELETE_ERROR,
  DELETE_NOTIFICATION,
  DETACH_REQUEST,
  DETACH_RESPONSE,
  DETACH_ERROR,
  UPDATE_REQUEST,
  UPDATE_RESPONSE,
  UPDATE_ABORT,
  UPDATE_NOTIFICATION,
} from './constants.js';

import {
  kStateManagerDeleteState,
} from './BaseStateManager.js';

import logger from './logger.js';

// for testing purposes
export const kSharedStatePromiseStore = Symbol('soundworks:shared-state-promise-store');

/**
 * Callback executed when updates are applied on a {@link SharedState}.
 *
 * @callback sharedStateOnUpdateCallback
 * @param {Object} newValues - Key / value pairs of the updates that have been
 *  applied to the state.
 * @param {Object} oldValues - Key / value pairs of the updated params before
 *  the updates have been applied to the state.
 * @param {Mixed} [context=null] - Optional context object that has been passed
 *  with the values updates in the `set` call.
 */

/**
 * Delete the registered {@link sharedStateOnUpdateCallback}.
 *
 * @callback sharedStateDeleteOnUpdateCallback
 */

/**
 * The `SharedState` is one of the most important and versatile abstractions provided
 * by `soundworks`. It represents a set of parameters that are synchronized across
 * all the nodes of the application (clients and server) that declared some interest
 * in the shared state.
 *
 * A `SharedState` instance is created according to a shared state class definition
 * which is composed of a {@link SharedStateClassName} and of a {@link SharedStateClassSchema}
 * registered in the {@link ServerStateManager}. Any number of `SharedState`s
 * can be created from a single class definition.
 *
 * A shared state can be created both by the clients or by the server, in which case
 * it is generally considered as a global state of the application. Similarly any
 * node of the application (clients or server) can declare interest and "attach" to
 * a state created by another node. All nodes attached to a state can modify its values
 * and/or react to the modifications applied by other nodes.
 *
 * Tutorial: {@link https://soundworks.dev/guide/state-manager.html}
 *
 * ```
 * // server-side
 * import { Server } from '@soundworks/server/index.js';
 *
 * const server = new Server(config);
 * // define a shared state class
 * server.stateManager.registerSchema('some-global-state', {
 *   myRandom: {
 *     type: 'float',
 *     default: 0,
 *   }
 * });
 *
 * await server.start();
 *
 * // create a global state server-side
 * const globalState = await server.stateManager.create('some-global-state');
 * // listen and react to the changes made by the clients
 * globalState.onUpdate(updates => console.log(updates));
 * ```
 *
 * ```
 * // client-side
 * import { Client } from '@soundworks/client/index.js';
 *
 * const client = new Client(config);
 * await client.start();
 *
 * // attach to the global state created by the server
 * const globalState = await client.stateManager.attach('some-global-state');
 *
 * // update the value of a `myRandom` parameter every second
 * setInterval(() => {
 *   globalState.set({ myRandom: Math.random() });
 * }, 1000);
 * ```
 */
class SharedState {
  #id = null;
  #remoteId = null;
  #className = null;
  #isOwner = null;
  #client = null;
  #manager = null;
  #filter = null;
  // true once `detach()` (or `delete()`) has been called on this instance
  #detached = false;
  #parameters = null;
  #onUpdateCallbacks = new Set();
  #onDetachCallbacks = new Set();
  #onDeleteCallbacks = new Set();
  // Handlers registered on the `*_ERROR` channels, indexed by channel name.
  // These channels are only scoped by the state id (no `remoteId`), so they are
  // tracked to be able to remove exactly what has been registered.
  #errorListeners = new Map();

  /**
   * @param {Number} id - Id of the state, used to build the transport channel names.
   * @param {Number} remoteId - Id of this instance of the state, used together
   *  with `id` to build the transport channel names.
   * @param {String} className - Name of the shared state class.
   * @param {Object} schema - Schema given to the underlying `ParameterBag`.
   * @param {Object} client - Object exposing a `transport` with `addListener`,
   *  `removeAllListeners` and `emit` methods.
   * @param {Boolean} isOwner - Whether this node created the state.
   * @param {Object} manager - State manager, notified through its
   *  `kStateManagerDeleteState` method when the state is detached or deleted.
   * @param {Object} initValues - Initial values given to the underlying `ParameterBag`.
   * @param {Array<String>|null} filter - If not `null`, list of the only
   *  parameter names that can be read or written through this instance.
   * @throws Throws if the `ParameterBag` cannot be created from `schema` and
   *  `initValues`; the original error is available as `cause`.
   */
  constructor(id, remoteId, className, schema, client, isOwner, manager, initValues, filter) {
    this.#id = id;
    this.#remoteId = remoteId;
    this.#className = className;
    this.#isOwner = isOwner; // may be any node
    this.#client = client;
    this.#manager = manager;
    this.#filter = filter;

    try {
      this.#parameters = new ParameterBag(schema, initValues);
    } catch (err) {
      console.error(err.stack);

      // keep the root error attached so that callers can inspect it
      throw new Error(`Error creating or attaching state "${className}" w/ values:\n
${JSON.stringify(initValues, null, 2)}`, { cause: err });
    }

    /** @private */
    this[kSharedStatePromiseStore] = new PromiseStore(this.constructor.name);

    const transport = this.#client.transport;
    // suffix shared by all the channels dedicated to this instance
    const scope = `${this.#id}-${this.#remoteId}`;

    // this node requested the update: commit, propagate, then fulfill `set`
    transport.addListener(`${UPDATE_RESPONSE}-${scope}`, async (reqId, updates, context) => {
      const updated = await this.#commit(updates, context, true, true);
      this[kSharedStatePromiseStore].resolve(reqId, updated);
    });

    // retrieve values but do not propagate to subscriptions
    transport.addListener(`${UPDATE_ABORT}-${scope}`, async (reqId, updates, context) => {
      const updated = await this.#commit(updates, context, false, true);
      this[kSharedStatePromiseStore].resolve(reqId, updated);
    });

    // another node updated the state
    transport.addListener(`${UPDATE_NOTIFICATION}-${scope}`, async (updates, context) => {
      // https://github.com/collective-soundworks/soundworks/issues/18
      //
      // # note: 2002-10-03
      //
      // `setTimeout(async () => this.#commit(updates, context, true, false));`
      // appears to be the only way to push the update commit in the next event
      // cycle so that `attach` can resolve before the update notification is
      // actually dispatched. The alternative:
      // `Promise.resolve().then(() => this.#commit(updates, context, true, false))``
      // does not behave as expected...
      //
      // However this breaks the reliability of:
      // ```
      // /* given a0, a1 and a2 being 3 similar attached states */
      // await state.set({ int: i });
      //
      // assert.equal(a0.get('int'), i);
      // assert.equal(a1.get('int'), i);
      // assert.equal(a2.get('int'), i);
      // ```
      // which is far more important than the edge case reported in the issue
      // therefore this wont be fixed for now
      this.#commit(updates, context, true, false);
    });

    // ---------------------------------------------
    // state has been deleted by its creator or the schema has been deleted
    // ---------------------------------------------
    // NOTE(review): `#detached` is not set here, so `set()` / `detach()` called
    // after a remote deletion still emit a request - confirm this is intended.
    transport.addListener(`${DELETE_NOTIFICATION}-${scope}`, async () => {
      this.#manager[kStateManagerDeleteState](this.#id);
      this.#clearTransport();

      await this.#executeCallbacks(this.#onDetachCallbacks);

      if (this.#isOwner) {
        await this.#executeCallbacks(this.#onDeleteCallbacks);
      }

      this.#onDetachCallbacks.clear();
      this.#onDeleteCallbacks.clear();
      this[kSharedStatePromiseStore].flush();
    });

    if (this.#isOwner) {
      // ---------------------------------------------
      // the creator has called `.delete()`
      // ---------------------------------------------
      transport.addListener(`${DELETE_RESPONSE}-${scope}`, async (reqId) => {
        this.#manager[kStateManagerDeleteState](this.#id);
        this.#clearTransport();

        await this.#executeCallbacks(this.#onDetachCallbacks);
        await this.#executeCallbacks(this.#onDeleteCallbacks);

        this.#onDetachCallbacks.clear();
        this.#onDeleteCallbacks.clear();
        this[kSharedStatePromiseStore].resolve(reqId, this);
        this[kSharedStatePromiseStore].flush();
      });

      this.#addErrorListener(`${DELETE_ERROR}-${this.#id}`, (reqId, msg) => {
        this[kSharedStatePromiseStore].reject(reqId, msg);
      });
    } else {
      // ---------------------------------------------
      // the attached node has called `.detach()`
      // ---------------------------------------------
      transport.addListener(`${DETACH_RESPONSE}-${scope}`, async (reqId) => {
        this.#manager[kStateManagerDeleteState](this.#id);
        this.#clearTransport();

        await this.#executeCallbacks(this.#onDetachCallbacks);

        this.#onDetachCallbacks.clear();
        this.#onDeleteCallbacks.clear();
        this[kSharedStatePromiseStore].resolve(reqId, this);
        this[kSharedStatePromiseStore].flush();
      });

      // the state does not exists anymore in the server (should not happen)
      this.#addErrorListener(`${DETACH_ERROR}-${this.#id}`, (reqId, msg) => {
        this.#onDetachCallbacks.clear();
        this.#onDeleteCallbacks.clear();
        this[kSharedStatePromiseStore].reject(reqId, msg);
        this[kSharedStatePromiseStore].flush();
      });
    }
  }

  /**
   * Id of the state
   * @type {Number}
   */
  get id() {
    return this.#id;
  }

  /**
   * Name of the underlying {@link SharedState} class.
   * @type {String}
   * @deprecated Use {@link SharedState#className} instead.
   */
  get schemaName() {
    logger.deprecated('SharedState#schemaName', 'SharedState#className');
    return this.#className;
  }

  /**
   * Name of the underlying {@link SharedState} class.
   * @type {String}
   */
  get className() {
    return this.#className;
  }

  /**
   * Indicates if the node is the owner of the state, i.e. if it created the state.
   * @type {Boolean}
   */
  get isOwner() {
    return this.#isOwner;
  }

  /**
   * Register `listener` on an error channel and keep track of it so that
   * `#clearTransport` removes it under the exact same channel name.
   */
  #addErrorListener(channel, listener) {
    this.#errorListeners.set(channel, listener);
    this.#client.transport.addListener(channel, listener);
  }

  /**
   * Sequentially execute and await the given callbacks. An error thrown by a
   * callback is logged and does not prevent the execution of the next ones,
   * so that the pending `detach` / `delete` promise can always be settled.
   */
  async #executeCallbacks(callbacks) {
    for (const callback of callbacks) {
      try {
        await callback();
      } catch (err) {
        console.error(err);
      }
    }
  }

  /**
   * Remove all the transport listeners registered in the constructor.
   */
  #clearTransport() {
    const transport = this.#client.transport;
    const scope = `${this.#id}-${this.#remoteId}`;
    const scopedTypes = [
      UPDATE_RESPONSE,
      UPDATE_NOTIFICATION,
      UPDATE_ABORT,
      DELETE_NOTIFICATION,
      this.#isOwner ? DELETE_RESPONSE : DETACH_RESPONSE,
    ];

    // these channels are dedicated to this instance, we can remove everything
    for (const type of scopedTypes) {
      transport.removeAllListeners(`${type}-${scope}`);
    }

    // Error channels are not scoped by `remoteId`: other instances attached to
    // the same state may listen to them too, so only remove our own handler
    // when the transport allows it.
    for (const [channel, listener] of this.#errorListeners) {
      if (typeof transport.removeListener === 'function') {
        transport.removeListener(channel, listener);
      } else {
        transport.removeAllListeners(channel);
      }
    }

    this.#errorListeners.clear();
  }

  /**
   * Apply updates received from the transport to the local parameters and
   * call the update listeners.
   *
   * @param {Object} updates - Key / value pairs to apply.
   * @param {Mixed} context - Context propagated to the update listeners.
   * @param {Boolean} [propagate=true] - If `false`, values are applied but the
   *  update listeners are not called.
   * @param {Boolean} [initiator=false] - `true` if this instance requested the update.
   * @returns {Promise<Object>} The values that were candidates for propagation,
   *  resolved once all update listeners have resolved.
   */
  async #commit(updates, context, propagate = true, initiator = false) {
    const newValues = {};
    const oldValues = {};

    for (const name in updates) {
      const { immediate, event } = this.#parameters.getSchema(name);
      // @note 20211209 - we had an issue here server-side, because if the value
      // is an object or an array, the reference is shared by everybody, therefore
      // `changed` is always false and the new value is never propagated...
      // FIXED - `state.get` now returns a deep copy when `type` is `any`
      const oldValue = this.#parameters.get(name);
      const [newValue, changed] = this.#parameters.set(name, updates[name]);

      // @note - we don't need to check filterChange here because the value
      // has been updated in parameters on the `set` side so can rely on `changed`
      // to avoid retrigger listeners.
      // If the value has been overridden by the server, `changed` will be true
      // anyway so it should behave correctly.
      if (initiator && immediate && (!changed || event)) {
        continue;
      }

      newValues[name] = newValue;
      oldValues[name] = oldValue;
    }

    // if the `UPDATE_REQUEST` has been aborted by the server, do not propagate
    const promises = [];

    if (propagate && Object.keys(newValues).length > 0) {
      this.#onUpdateCallbacks.forEach((listener) => {
        promises.push(listener(newValues, oldValues, context));
      });
    }

    // on a given client, `await state.set(update)` resolves after all
    // update callbacks have themselves resolved
    await Promise.all(promises);

    // reset events to null after propagation of all listeners
    for (const name in newValues) {
      const { event } = this.#parameters.getSchema(name);

      if (event) {
        this.#parameters.set(name, null);
      }
    }

    return newValues;
  }

  /**
   * Throw if `name` is not a parameter of the state or is excluded by the filter.
   */
  #assertReadable(name) {
    if (!this.#parameters.has(name)) {
      throw new ReferenceError(`[SharedState] State "${this.#className}": Cannot get value of undefined parameter "${name}"`);
    }

    if (this.#filter !== null && !this.#filter.includes(name)) {
      throw new DOMException(`[SharedState] State "${this.#className}": cannot get parameter '${name}', parameter is not in filter list`, 'NotSupportedError');
    }
  }

  /**
   * Remove in place the entries of `values` that are excluded by the filter.
   */
  #applyFilter(values) {
    if (this.#filter !== null) {
      for (const name in values) {
        if (!this.#filter.includes(name)) {
          delete values[name];
        }
      }
    }

    return values;
  }

  /**
   * Update values of the state.
   *
   * The returned `Promise` resolves on an object that contains the applied updates,
   * and resolves after all the `onUpdate` callbacks have resolved themselves, i.e.:
   *
   * ```js
   * server.stateManager.registerSchema('test', {
   *   myBool: { type: 'boolean', default: false },
   * });
   * const a = await server.stateManager.create('test');
   *
   * let asyncCallbackCalled = false;
   *
   * a.onUpdate(updates => {
   *   return new Promise(resolve => {
   *     setTimeout(() => {
   *       asyncCallbackCalled = true;
   *       resolve();
   *     }, 100);
   *   });
   * });
   *
   * const updates = await a.set({ myBool: true });
   * assert.equal(asyncCallbackCalled, true);
   * assert.deepEqual(updates, { myBool: true });
   * ```
   *
   * All the given names and values are validated before anything is applied:
   * if the call throws, the local values are left untouched.
   *
   * If the state has already been detached, the call is ignored and the
   * promise resolves on `undefined`.
   *
   * @param {object} updates - Key / value pairs of updates to apply to the state.
   * @param {mixed} [context=null] - Optional contextual object that will be propagated
   *   alongside the updates of the state. The context is valid only for the
   *   current call and will be passed as third argument to all update listeners.
   * @returns {Promise<Object>} A promise to the (coerced) updates.
   * @throws Throws if `updates` or `context` is not a plain object, if a value
   *  cannot be coerced, or if a name is not in the filter list.
   *
   * @example
   * const state = await client.stateManager.attach('globals');
   * const updates = await state.set({ myParam: Math.random() });
   */
  async set(updates, context = null) {
    if (this.#detached) {
      return;
    }

    if (!isPlainObject(updates)) {
      throw new TypeError(`[SharedState] State "${this.#className}": state.set(updates[, context]) should receive an object as first parameter`);
    }

    if (context !== null && !isPlainObject(context)) {
      throw new TypeError(`[SharedState] State "${this.#className}": state.set(updates[, context]) should receive an object as second parameter`);
    }

    // Validate everything first, so that an invalid entry cannot leave the
    // `immediate` / `local` params listed before it applied locally only.
    for (const name in updates) {
      // Try to coerce value early, so that eventual errors are triggered early
      // on the node requesting the update, and not only on the server side
      // This throws if name does not exists
      this.#parameters.coerceValue(name, updates[name]);

      // Check that name is in filter list, if any
      if (this.#filter !== null && !this.#filter.includes(name)) {
        throw new DOMException(`[SharedState] State "${this.#className}": cannot set parameter '${name}', parameter is not in filter list`, 'NotSupportedError');
      }
    }

    const newValues = {};
    const oldValues = {};
    const localParams = {};
    const sharedParams = {};
    let hasLocalParam = false;
    let hasSharedParam = false;
    let propagateNow = false;

    for (const name in updates) {
      // `immediate` option behavior
      //
      // If immediate=true
      //  - call listeners if value changed
      //  - go through normal server path
      //  - retrigger only if response from server is different from current value
      // If immediate=true && (filterChange=false || event=true)
      //  - call listeners with value regardless it changed
      //  - go through normal server path
      //  - if the node is initiator of the update (UPDATE_RESPONSE), (re-)check
      //    to prevent execute the listeners twice

      // `local` option behavior
      //
      // - If the `updates` object only contains local variables, we can call the
      // update listeners and return a fulfilled promise immediately
      // - If parameters that require network communication are present, we call the
      // update callback once with the local payload, then we need to wait for the server
      // response, call update listeners with server response and resolve promise with
      // the full payload, i.e. reintegrating the local params in the resolve payload
      const { local, immediate, filterChange, event } = this.#parameters.getSchema(name);

      if (immediate || local) {
        const oldValue = this.#parameters.get(name);
        const [newValue, changed] = this.#parameters.set(name, updates[name]);

        // prepare data for immediate propagation of listeners
        if (changed || filterChange === false || event) {
          oldValues[name] = oldValue;
          newValues[name] = newValue;
          propagateNow = true;
        }
      }

      // define params that must go through network or not
      if (local) {
        hasLocalParam = true;
        // get sanitized value for fulfilling promise
        localParams[name] = this.#parameters.get(name);
      } else {
        // note that immediate are shared params too
        hasSharedParam = true;
        sharedParams[name] = updates[name];
      }
    }

    // propagate immediate and local params, listeners are deliberately not awaited
    if (propagateNow) {
      this.#onUpdateCallbacks.forEach((listener) => listener(newValues, oldValues, context));
    }

    // only local params, no need to go through network
    if (hasLocalParam && !hasSharedParam) {
      return localParams;
    }

    // store local params to fulfill promise with all values, see PromiseStore
    const forwardParams = hasLocalParam ? localParams : undefined;

    // go through server-side normal behavior, without the local params
    return new Promise((resolve, reject) => {
      const reqId = this[kSharedStatePromiseStore].add(resolve, reject, 'update-request', forwardParams);
      this.#client.transport.emit(`${UPDATE_REQUEST}-${this.#id}-${this.#remoteId}`, reqId, sharedParams, context);
    });
  }

  /**
   * Get the value of a parameter of the state.
   *
   * Be aware that in case of 'any' type the returned value is deeply copied.
   * While this prevents from pollution of the state by mutating the reference,
   * this can also lead to performance issues when the parameter contains large
   * data. In such cases you should use the {@link SharedState#getUnsafe} method
   * and make sure to treat the returned object as readonly.
   *
   * @param {SharedStateParameterName} name - Name of the param.
   * @return {any}
   * @throws Throws if `name` does not exists or is not in the filter list.
   * @example
   * const value = state.get('paramName');
   */
  get(name) {
    this.#assertReadable(name);
    return this.#parameters.get(name);
  }

  /**
   * Get an unsafe reference to the value of a parameter of the state.
   *
   * Similar to `get` but returns a reference to the underlying value in case of
   * `any` type. Can be useful if the underlying value is large (e.g. sensors
   * recordings, etc.) and deep cloning expensive. Be aware that if changes are
   * made on the returned object, the state of your application will become
   * inconsistent.
   *
   * @param {SharedStateParameterName} name - Name of the param.
   * @return {any}
   * @throws Throws if `name` does not exists or is not in the filter list.
   * @example
   * const value = state.getUnsafe('paramName');
   */
  getUnsafe(name) {
    this.#assertReadable(name);
    return this.#parameters.getUnsafe(name);
  }

  /**
   * Get all the key / value pairs of the state. Parameters that are not in the
   * filter list, if any, are left out.
   *
   * If a parameter is of `any` type, a deep copy is made.
   *
   * @return {object}
   * @example
   * const values = state.getValues();
   */
  getValues() {
    return this.#applyFilter(this.#parameters.getValues());
  }

  /**
   * Get all the key / value pairs of the state. Parameters that are not in the
   * filter list, if any, are left out.
   *
   * Similar to `getValues` but returns a reference to the underlying value in
   * case of `any` type. Can be useful if the underlying value is big (e.g.
   * sensors recordings, etc.) and deep cloning expensive. Be aware that if
   * changes are made on the returned object, the state of your application will
   * become inconsistent.
   *
   * @return {object}
   * @example
   * const values = state.getValuesUnsafe();
   */
  getValuesUnsafe() {
    return this.#applyFilter(this.#parameters.getValuesUnsafe());
  }

  /**
   * Return the underlying {@link SharedStateClassSchema} or the
   * {@link SharedStateParameterDescription} if name is given.
   *
   * @param {string} [name] - If defined, returns only the parameter description
   *  of the given param name.
   * @return {SharedStateClassSchema|SharedStateParameterDescription}
   * @throws Throws if `name` does not exists.
   * @example
   * const schema = state.getSchema();
   */
  getSchema(name = null) {
    return this.#parameters.getSchema(name);
  }

  /**
   * Get the values with which the state has been created. May differ from the
   * default values declared in the schema.
   *
   * @return {object}
   * @example
   * const initValues = state.getInitValues();
   */
  getInitValues() {
    return this.#parameters.getInitValues();
  }

  /**
   * Get the default values as declared in the schema.
   *
   * @return {object}
   * @example
   * const defaults = state.getDefaults();
   */
  getDefaults() {
    return this.#parameters.getDefaults();
  }

  /**
   * Detach from the state. If the client is the creator of the state, the state
   * is deleted and all attached nodes get notified.
   *
   * The update listeners are removed as soon as the method is called, the
   * `onDetach` (and `onDelete` for the owner) callbacks are executed before the
   * returned promise resolves.
   *
   * @returns {Promise<SharedState>} A promise that resolves on the state itself.
   * @throws Throws if the state has already been detached.
   * @example
   * const state = await client.state.attach('globals');
   * // later
   * await state.detach();
   */
  async detach() {
    if (this.#detached) {
      throw new Error(`[SharedState] State "${this.#className} (${this.#id})" already detached, cannot detach twice`);
    }

    this.#detached = true; // mark detached early
    this.#onUpdateCallbacks.clear();

    // the owner deletes the state, other nodes just detach from it
    const request = this.#isOwner ? DELETE_REQUEST : DETACH_REQUEST;
    const type = this.#isOwner ? 'delete-request' : 'detach-request';

    return new Promise((resolve, reject) => {
      const reqId = this[kSharedStatePromiseStore].add(resolve, reject, type);
      this.#client.transport.emit(`${request}-${this.#id}-${this.#remoteId}`, reqId);
    });
  }

  /**
   * Delete the state. Only the creator/owner of the state can use this method.
   *
   * All nodes attached to the state will be detached, triggering any registered
   * `onDetach` callbacks. The creator of the state will also have its own `onDelete`
   * callback triggered. The local `onDetach` and `onDelete` callbacks will be
   * executed *before* the returned Promise resolves
   *
   * @returns {Promise<SharedState>} A promise that resolves on the state itself.
   * @throws Throws if the method is called by a node which is not the owner of
   * the state, or if the state has already been deleted.
   * @example
   * const state = await client.state.create('my-schema-name');
   * // later
   * await state.delete();
   */
  async delete() {
    if (!this.#isOwner) {
      throw new Error(`[SharedState] Cannot delete state "${this.#className}", only the owner of the state (i.e. the node that created it) can delete the state. Use "detach" instead.`);
    }

    if (this.#detached) {
      throw new Error(`[SharedState] State "${this.#className} (${this.#id})" already deleted, cannot delete twice`);
    }

    return this.detach();
  }

  /**
   * Subscribe to state updates.
   *
   * @param {sharedStateOnUpdateCallback} listener
   *  Callback to execute when an update is applied on the state.
   * @param {Boolean} [executeListener=false] - Execute the callback immediately
   *  with current state values. (`oldValues` will be set to `{}`, and `context` to `null`)
   * @returns {sharedStateDeleteOnUpdateCallback}
   * @example
   * const unsubscribe = state.onUpdate(async (newValues, oldValues, context) =>  {
   *   for (let [key, value] of Object.entries(newValues)) {
   *      switch (key) {
   *        // do something
   *      }
   *   }
   * });
   *
   * // later
   * unsubscribe();
   */
  onUpdate(listener, executeListener = false) {
    this.#onUpdateCallbacks.add(listener);

    if (executeListener === true) {
      const currentValues = this.getValues();
      const oldValues = {};
      const context = null;
      listener(currentValues, oldValues, context);
    }

    return () => {
      this.#onUpdateCallbacks.delete(listener);
    };
  }

  /**
   * Register a function to execute when detaching from the state. The function
   * will be executed before the `detach` promise resolves.
   *
   * @param {Function} callback - Callback to execute when detaching from the state.
   *   Whether the client has called `detach`, or the state has been deleted by its
   *   creator.
   * @returns {Function} Function that unregisters the callback.
   */
  onDetach(callback) {
    this.#onDetachCallbacks.add(callback);
    return () => this.#onDetachCallbacks.delete(callback);
  }

  /**
   * Register a function to execute when the state is deleted. Only called if the
   * node was the creator of the state. Is called after `onDetach` and executed
   * before the `delete` Promise resolves.
   *
   * @param {Function} callback - Callback to execute when the state is deleted.
   * @returns {Function} Function that unregisters the callback.
   */
  onDelete(callback) {
    this.#onDeleteCallbacks.add(callback);
    return () => this.#onDeleteCallbacks.delete(callback);
  }
}

export default SharedState;