Source: Sock.js

'use strict';

var Writable = require('stream').Writable;
var Readable = require('stream').Readable;
var net = require('net');
var util = require('util');
var EventEmitter = require('events');
var ut = require('utjs');

var Serializer = require('./Serializer');
var Reader = require('./Reader');
var Constants = require('./Constants');

var MAX_MESSAGE_ID = Math.pow(2, 32) - 1;

/**
 * The super class of Socket.
 *
 * @constructor
 * @abstract
 * @fires Sock#reconnecting
 * @fires Sock#socket_connect
 * @fires Sock#socket_drain
 * @fires Sock#end
 * @fires Sock#close
 * @fires Sock#error
 */
function Sock(serializer, opts) {
  opts = opts || {};
  Sock.super_.call(this);

  this._serializer = serializer;
  this._reader = new Reader();
  this._opts = opts;

  this._shouldReconnect = ut.isBoolean(opts.reconnect) ? opts.reconnect : Constants.RECONNECT;

  this._reconnectInterval = ut.isNumber(opts.reconnectInterval) ? opts.reconnectInterval :
      Constants.RECONNECT_INTERVAL;

  this._autoConnect = ut.isBoolean(opts.autoConnect) ? opts.autoConnect : Constants.AUTO_CONNECT;

  this._useQueue = ut.isBoolean(opts.useQueue) ? opts.useQueue : Constants.USE_QUEUE;

  this._queueSize = ut.isNumber(opts.queueSize) ? opts.queueSize : Constants.QUEUE_SIZE;

  this._messageListener = ut.isFunction(opts.messageListener) ? opts.messageListener : null;

  /**
   * A unique identifier. It will be set up asynchronously from the server in the event 'connect'.
   * @type {String}
   */
  this.id = null;

  this._messageId = 1;
  this._acks = {};

  this._socket = null;
  this._socketConfig = {};
  this._connected = false;
  this._manuallyClosed = false;

  this._queue = [];

  this._streams = {};

  if (this._autoConnect) {
    this.connect();
  }
}

util.inherits(Sock, EventEmitter);

Sock.prototype._superEmit = Sock.prototype.emit;

/**
 * Send a FIN packet.
 */
Sock.prototype.end = function () {
  if (this._connected) {
    this._manuallyClosed = true;
    this._socket.end();
  }
};

/**
 * Close the socket.
 */
Sock.prototype.destroy = function () {
  if (this._connected) {
    this._manuallyClosed = true;
    this._socket.destroy();
  }
};

/**
 * Connect the socket. The socket will be connected automatically by default
 * so this method is only useful when you use otps.autoConnect: false,
 * in the constructor.
 */
Sock.prototype.connect = function () {
  if (!this._connected) {
    this._manuallyClosed = false;
    this._socket = net.createConnection(this._opts);
    this._bindEvents();
  }
};

/**
 * Sets the socket to timeout after 'timeout' milliseconds of inactivity on the socket.
 *
 * @param {Number} timeout The timeout in milliseconds or 0 to disable it.
 */
Sock.prototype.setTimeout = function (timeout) {
  this._socketConfig.timeout = timeout;

  if (this._connected) {
    this._socket.setTimeout(timeout);
  }

  return this;
};

/**
 * Disables the Nagle algorithm.
 *
 * @param {Boolean} [noDelay=true] True to disable de Nagle algorithm, false to enable it again.
 */
Sock.prototype.setNoDelay = function (noDelay) {
  this._socketConfig.noDelay = noDelay;

  if (this._connected) {
    this._socket.setNoDelay(noDelay);
  }

  return this;
};

/**
 * Enable/disable keep-alive functionality, and optionally set the initial delay before the first
 * keepalive probe is sent on an idle socket.
 *
 * @param {Boolean} [enable=false] True to enable the TCP keep-alive, false to disable it.
 * @param {Number} [initialDelay=0] Set the delay in milliseconds between the last data packet
 *                                  received and the first keepalive probe.
 */
Sock.prototype.setKeepAlive = function (enable, initialDelay) {
  this._socketConfig.keepAliveEnable = enable;
  this._socketConfig.keepAliveInitialDelay = initialDelay;

  if (this._connected) {
    this._socket.setKeepAlive(enable, initialDelay);
  }

  return this;
};

/**
 * Get the version of the underlying serializer.
 *
 * @return {Number} The serializer version.
 */
Sock.prototype.getSerializerVersion = function () {
  return Serializer.VERSION;
};

Sock.prototype._send = function (event, data, mt, opts) {
  opts = opts || {};
  var messageId = opts.messageId || this._nextMessageId();

  if (opts.cb !== undefined) {
    this._acks[messageId] = opts.cb;
  }

  var buff = this._serializer.serialize(event, data, mt, messageId);

  if (this._connected) {
    return this._socket.write(buff);
  } else if (this._useQueue) {
    if (this._queue.length + 1 > this._queueSize) {
      this._queue.shift();
    }

    this._queue.push(buff);
    return false;
  }
};

Sock.prototype._sendStream = function (event, data, mt, cb) {
  var opts = { messageId: this._nextMessageId() };
  var _this = this;

  if (cb !== undefined) {
    opts.cb = cb;
  }

  this._send(event, data, mt, opts);

  var writeStream = new Writable({
    write: function (chunk, encoding, cb) {
      if (_this._send(event, chunk, Serializer.MT_DATA_STREAM, opts)) {
        cb();
      } else {
        _this._socket.once('drain', cb);
      }
    }
  });

  writeStream.on('finish', function () {
    _this._send(event, '', Serializer.MT_DATA_STREAM_CLOSE, opts);
  });

  return writeStream;
};

Sock.prototype._reconnect = function () {
  var _this = this;
  setTimeout(function () {
    /**
     * The socket is trying to reconnect.
     *
     * @event Sock#reconnecting
     */
    _this._superEmit('reconnecting');
    _this.connect();
  }, this._reconnectInterval);
};

Sock.prototype._configure = function () {
  var config = this._socketConfig;

  if (config.timeout !== undefined) {
    this.setTimeout(config.timeout);
  }

  if (config.noDelay !== undefined) {
    this.setNoDelay(config.noDelay);
  }

  if (config.keepAliveEnable !== undefined || config.keepAliveInitialDelay !== undefined) {
    this.setKeepAlive(config.keepAliveEnable, config.keepAliveInitialDelay);
  }
};

Sock.prototype._bindEvents = function () {
  var _this = this;

  this._socket.on('connect', function () {
    _this._connected = true;
    _this._configure();

    // Send all queued events
    _this._flushQueue();

    // Resume all waiting streams
    _this._socket.emit('drain');

    /**
     * Connected underlying net.Socket, all messages in queue will
     * be sent and new messages will be sent directly.
     *
     * @event Sock#socket_connect
     */
    _this._superEmit('socket_connect');
  });

  this._socket.on('data', function (chunk) {
    var buffers = _this._reader.read(chunk);

    for (var i = 0; i < buffers.length; i++) {
      _this._onMessage(_this._serializer.deserialize(buffers[i]));
    }
  });

  this._socket.on('end', function () {
    /**
     * End event from net.Socket.
     *
     * @event Sock#end
     */
    _this._superEmit('end');
  });

  this._socket.on('close', function (isError) {
    _this._connected = false;
    _this._socket = null;

    if (_this._shouldReconnect && !_this._manuallyClosed) {
      _this._reconnect();
    }

    /**
     * Close event from net.Socket.
     *
     * @event Sock#close
     */
    _this._superEmit('close');
  });

  this._socket.on('error', function (err) {
    _this._onError(err);
  });

  this._socket.on('timeout', function () {
    _this._socket.destroy();
    _this._onError(ut.error('connect TIMEOUT'));
  });

  this._socket.on('drain', function () {
    /**
     * Emitted when the write buffer of the internal socket becomes empty.
     * Can be used to throttle uploads.
     *
     * @event Sock#socket_drain
     */
    _this._superEmit('socket_drain');
  });
};

Sock.prototype._onMessage = function (msg) {
  var readStream;

  switch (msg.mt) {
    case Serializer.MT_DATA:
      this._superEmit(msg.event, msg.data);
      break;
    case Serializer.MT_DATA_STREAM_OPEN:
      readStream = this._openDataStream(msg);
      this._superEmit(msg.event, readStream, msg.data);
      break;
    case Serializer.MT_DATA_STREAM_OPEN_WITH_ACK:
      readStream = this._openDataStream(msg);
      this._superEmit(msg.event, readStream, msg.data, this._ackCallback(msg.messageId));
      break;
    case Serializer.MT_DATA_STREAM:
      this._transmitDataStream(msg);
      break;
    case Serializer.MT_DATA_STREAM_CLOSE:
      this._closeDataStream(msg);
      break;
    case Serializer.MT_DATA_WITH_ACK:
      this._superEmit(msg.event, msg.data, this._ackCallback(msg.messageId));
      break;
    case Serializer.MT_ACK:
      this._acks[msg.messageId](msg.data);
      delete this._acks[msg.messageId];
      break;
    default:
      if (this._messageListener) {
        this._messageListener(msg);
      }
  }
};

Sock.prototype._openDataStream = function (msg) {
  var _this = this;
  var readStream = new Readable({
    read: function (size) {
      if (_this._socket.isPaused()) {
        _this._socket.resume();
      }
    }
  });

  this._streams[msg.messageId] = readStream;
  return readStream;
};

Sock.prototype._transmitDataStream = function (msg) {
  var readStream = this._streams[msg.messageId];

  if (!readStream.push(msg.data)) {
    this._socket.pause();
  }
};

Sock.prototype._closeDataStream = function (msg) {
  var readStream = this._streams[msg.messageId];

  readStream.push(null);
  delete this._streams[msg.messageId];
};

Sock.prototype._ackCallback = function (messageId) {
  var _this = this;

  return function ackCallback(data) {
    _this._send('', data, Serializer.MT_ACK, { messageId: messageId });
  };
};

Sock.prototype._nextMessageId = function () {
  if (++this._messageId > MAX_MESSAGE_ID) {
    this._messageId = 1;
  }

  return this._messageId;
};

Sock.prototype._flushQueue = function () {
  if (this._queue.length > 0) {
    for (var i = 0; i < this._queue.length; i++) {
      this._socket.write(this._queue[i]);
    }

    this._queue.length = 0;
  }
};

Sock.prototype._onError = function (err) {
  if (this.listenerCount('error') > 0) {
    /**
     * Error event from net.Socket or Socket.
     *
     * @event Sock#error
     */
    this._superEmit('error', err);
  } else {
    console.error('Missing error handler on `Socket`.');
    console.error(err.stack);
  }
};

module.exports = Sock;