|
"use strict"; |
|
Object.defineProperty(exports, "__esModule", { value: true }); |
|
exports.Socket = void 0; |
|
const events_1 = require("events"); |
|
const debug_1 = require("debug"); |
|
const timers_1 = require("timers"); |
|
const debug = (0, debug_1.default)("engine:socket"); |
|
class Socket extends events_1.EventEmitter { |
|
|
|
|
|
|
|
|
|
|
|
constructor(id, server, transport, req, protocol) { |
|
super(); |
|
this.id = id; |
|
this.server = server; |
|
this.upgrading = false; |
|
this.upgraded = false; |
|
this.readyState = "opening"; |
|
this.writeBuffer = []; |
|
this.packetsFn = []; |
|
this.sentCallbackFn = []; |
|
this.cleanupFn = []; |
|
this.request = req; |
|
this.protocol = protocol; |
|
|
|
if (req.websocket && req.websocket._socket) { |
|
this.remoteAddress = req.websocket._socket.remoteAddress; |
|
} |
|
else { |
|
this.remoteAddress = req.connection.remoteAddress; |
|
} |
|
this.checkIntervalTimer = null; |
|
this.upgradeTimeoutTimer = null; |
|
this.pingTimeoutTimer = null; |
|
this.pingIntervalTimer = null; |
|
this.setTransport(transport); |
|
this.onOpen(); |
|
} |
|
get readyState() { |
|
return this._readyState; |
|
} |
|
set readyState(state) { |
|
debug("readyState updated from %s to %s", this._readyState, state); |
|
this._readyState = state; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
onOpen() { |
|
this.readyState = "open"; |
|
|
|
this.transport.sid = this.id; |
|
this.sendPacket("open", JSON.stringify({ |
|
sid: this.id, |
|
upgrades: this.getAvailableUpgrades(), |
|
pingInterval: this.server.opts.pingInterval, |
|
pingTimeout: this.server.opts.pingTimeout, |
|
maxPayload: this.server.opts.maxHttpBufferSize, |
|
})); |
|
if (this.server.opts.initialPacket) { |
|
this.sendPacket("message", this.server.opts.initialPacket); |
|
} |
|
this.emit("open"); |
|
if (this.protocol === 3) { |
|
|
|
this.resetPingTimeout(this.server.opts.pingInterval + this.server.opts.pingTimeout); |
|
} |
|
else { |
|
|
|
this.schedulePing(); |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
onPacket(packet) { |
|
if ("open" !== this.readyState) { |
|
return debug("packet received with closed socket"); |
|
} |
|
|
|
debug(`received packet ${packet.type}`); |
|
this.emit("packet", packet); |
|
|
|
|
|
this.resetPingTimeout(this.server.opts.pingInterval + this.server.opts.pingTimeout); |
|
switch (packet.type) { |
|
case "ping": |
|
if (this.transport.protocol !== 3) { |
|
this.onError("invalid heartbeat direction"); |
|
return; |
|
} |
|
debug("got ping"); |
|
this.sendPacket("pong"); |
|
this.emit("heartbeat"); |
|
break; |
|
case "pong": |
|
if (this.transport.protocol === 3) { |
|
this.onError("invalid heartbeat direction"); |
|
return; |
|
} |
|
debug("got pong"); |
|
this.pingIntervalTimer.refresh(); |
|
this.emit("heartbeat"); |
|
break; |
|
case "error": |
|
this.onClose("parse error"); |
|
break; |
|
case "message": |
|
this.emit("data", packet.data); |
|
this.emit("message", packet.data); |
|
break; |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
onError(err) { |
|
debug("transport error"); |
|
this.onClose("transport error", err); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
schedulePing() { |
|
this.pingIntervalTimer = (0, timers_1.setTimeout)(() => { |
|
debug("writing ping packet - expecting pong within %sms", this.server.opts.pingTimeout); |
|
this.sendPacket("ping"); |
|
this.resetPingTimeout(this.server.opts.pingTimeout); |
|
}, this.server.opts.pingInterval); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
resetPingTimeout(timeout) { |
|
(0, timers_1.clearTimeout)(this.pingTimeoutTimer); |
|
this.pingTimeoutTimer = (0, timers_1.setTimeout)(() => { |
|
if (this.readyState === "closed") |
|
return; |
|
this.onClose("ping timeout"); |
|
}, timeout); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
setTransport(transport) { |
|
const onError = this.onError.bind(this); |
|
const onPacket = this.onPacket.bind(this); |
|
const flush = this.flush.bind(this); |
|
const onClose = this.onClose.bind(this, "transport close"); |
|
this.transport = transport; |
|
this.transport.once("error", onError); |
|
this.transport.on("packet", onPacket); |
|
this.transport.on("drain", flush); |
|
this.transport.once("close", onClose); |
|
|
|
this.setupSendCallback(); |
|
this.cleanupFn.push(function () { |
|
transport.removeListener("error", onError); |
|
transport.removeListener("packet", onPacket); |
|
transport.removeListener("drain", flush); |
|
transport.removeListener("close", onClose); |
|
}); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
maybeUpgrade(transport) { |
|
debug('might upgrade socket transport from "%s" to "%s"', this.transport.name, transport.name); |
|
this.upgrading = true; |
|
|
|
this.upgradeTimeoutTimer = (0, timers_1.setTimeout)(() => { |
|
debug("client did not complete upgrade - closing transport"); |
|
cleanup(); |
|
if ("open" === transport.readyState) { |
|
transport.close(); |
|
} |
|
}, this.server.opts.upgradeTimeout); |
|
const onPacket = (packet) => { |
|
if ("ping" === packet.type && "probe" === packet.data) { |
|
debug("got probe ping packet, sending pong"); |
|
transport.send([{ type: "pong", data: "probe" }]); |
|
this.emit("upgrading", transport); |
|
clearInterval(this.checkIntervalTimer); |
|
this.checkIntervalTimer = setInterval(check, 100); |
|
} |
|
else if ("upgrade" === packet.type && this.readyState !== "closed") { |
|
debug("got upgrade packet - upgrading"); |
|
cleanup(); |
|
this.transport.discard(); |
|
this.upgraded = true; |
|
this.clearTransport(); |
|
this.setTransport(transport); |
|
this.emit("upgrade", transport); |
|
this.flush(); |
|
if (this.readyState === "closing") { |
|
transport.close(() => { |
|
this.onClose("forced close"); |
|
}); |
|
} |
|
} |
|
else { |
|
cleanup(); |
|
transport.close(); |
|
} |
|
}; |
|
|
|
const check = () => { |
|
if ("polling" === this.transport.name && this.transport.writable) { |
|
debug("writing a noop packet to polling for fast upgrade"); |
|
this.transport.send([{ type: "noop" }]); |
|
} |
|
}; |
|
const cleanup = () => { |
|
this.upgrading = false; |
|
clearInterval(this.checkIntervalTimer); |
|
this.checkIntervalTimer = null; |
|
(0, timers_1.clearTimeout)(this.upgradeTimeoutTimer); |
|
this.upgradeTimeoutTimer = null; |
|
transport.removeListener("packet", onPacket); |
|
transport.removeListener("close", onTransportClose); |
|
transport.removeListener("error", onError); |
|
this.removeListener("close", onClose); |
|
}; |
|
const onError = (err) => { |
|
debug("client did not complete upgrade - %s", err); |
|
cleanup(); |
|
transport.close(); |
|
transport = null; |
|
}; |
|
const onTransportClose = () => { |
|
onError("transport closed"); |
|
}; |
|
const onClose = () => { |
|
onError("socket closed"); |
|
}; |
|
transport.on("packet", onPacket); |
|
transport.once("close", onTransportClose); |
|
transport.once("error", onError); |
|
this.once("close", onClose); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
clearTransport() { |
|
let cleanup; |
|
const toCleanUp = this.cleanupFn.length; |
|
for (let i = 0; i < toCleanUp; i++) { |
|
cleanup = this.cleanupFn.shift(); |
|
cleanup(); |
|
} |
|
|
|
this.transport.on("error", function () { |
|
debug("error triggered by discarded transport"); |
|
}); |
|
|
|
this.transport.close(); |
|
(0, timers_1.clearTimeout)(this.pingTimeoutTimer); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
onClose(reason, description) { |
|
if ("closed" !== this.readyState) { |
|
this.readyState = "closed"; |
|
|
|
(0, timers_1.clearTimeout)(this.pingIntervalTimer); |
|
(0, timers_1.clearTimeout)(this.pingTimeoutTimer); |
|
clearInterval(this.checkIntervalTimer); |
|
this.checkIntervalTimer = null; |
|
(0, timers_1.clearTimeout)(this.upgradeTimeoutTimer); |
|
|
|
|
|
process.nextTick(() => { |
|
this.writeBuffer = []; |
|
}); |
|
this.packetsFn = []; |
|
this.sentCallbackFn = []; |
|
this.clearTransport(); |
|
this.emit("close", reason, description); |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
setupSendCallback() { |
|
|
|
const onDrain = () => { |
|
if (this.sentCallbackFn.length > 0) { |
|
const seqFn = this.sentCallbackFn.splice(0, 1)[0]; |
|
if ("function" === typeof seqFn) { |
|
debug("executing send callback"); |
|
seqFn(this.transport); |
|
} |
|
else if (Array.isArray(seqFn)) { |
|
debug("executing batch send callback"); |
|
const l = seqFn.length; |
|
let i = 0; |
|
for (; i < l; i++) { |
|
if ("function" === typeof seqFn[i]) { |
|
seqFn[i](this.transport); |
|
} |
|
} |
|
} |
|
} |
|
}; |
|
this.transport.on("drain", onDrain); |
|
this.cleanupFn.push(() => { |
|
this.transport.removeListener("drain", onDrain); |
|
}); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
send(data, options, callback) { |
|
this.sendPacket("message", data, options, callback); |
|
return this; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
write(data, options, callback) { |
|
this.sendPacket("message", data, options, callback); |
|
return this; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sendPacket(type, data, options = {}, callback) { |
|
if ("function" === typeof options) { |
|
callback = options; |
|
options = {}; |
|
} |
|
if ("closing" !== this.readyState && "closed" !== this.readyState) { |
|
debug('sending packet "%s" (%s)', type, data); |
|
|
|
options.compress = options.compress !== false; |
|
const packet = { |
|
type, |
|
options: options, |
|
}; |
|
if (data) |
|
packet.data = data; |
|
|
|
this.emit("packetCreate", packet); |
|
this.writeBuffer.push(packet); |
|
|
|
if (callback) |
|
this.packetsFn.push(callback); |
|
this.flush(); |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
flush() { |
|
if ("closed" !== this.readyState && |
|
this.transport.writable && |
|
this.writeBuffer.length) { |
|
debug("flushing buffer to transport"); |
|
this.emit("flush", this.writeBuffer); |
|
this.server.emit("flush", this, this.writeBuffer); |
|
const wbuf = this.writeBuffer; |
|
this.writeBuffer = []; |
|
if (!this.transport.supportsFraming) { |
|
this.sentCallbackFn.push(this.packetsFn); |
|
} |
|
else { |
|
this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn); |
|
} |
|
this.packetsFn = []; |
|
this.transport.send(wbuf); |
|
this.emit("drain"); |
|
this.server.emit("drain", this); |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
|
|
getAvailableUpgrades() { |
|
const availableUpgrades = []; |
|
const allUpgrades = this.server.upgrades(this.transport.name); |
|
let i = 0; |
|
const l = allUpgrades.length; |
|
for (; i < l; ++i) { |
|
const upg = allUpgrades[i]; |
|
if (this.server.opts.transports.indexOf(upg) !== -1) { |
|
availableUpgrades.push(upg); |
|
} |
|
} |
|
return availableUpgrades; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
close(discard) { |
|
if ("open" !== this.readyState) |
|
return; |
|
this.readyState = "closing"; |
|
if (this.writeBuffer.length) { |
|
debug("there are %d remaining packets in the buffer, waiting for the 'drain' event", this.writeBuffer.length); |
|
this.once("drain", () => { |
|
debug("all packets have been sent, closing the transport"); |
|
this.closeTransport(discard); |
|
}); |
|
return; |
|
} |
|
debug("the buffer is empty, closing the transport right away", discard); |
|
this.closeTransport(discard); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
closeTransport(discard) { |
|
debug("closing the transport (discard? %s)", discard); |
|
if (discard) |
|
this.transport.discard(); |
|
this.transport.close(this.onClose.bind(this, "forced close")); |
|
} |
|
} |
|
exports.Socket = Socket; |
|
|