diff options
Diffstat (limited to 'js/src/Ice/TcpTransceiver.js')
-rw-r--r-- | js/src/Ice/TcpTransceiver.js | 817 |
1 files changed, 409 insertions, 408 deletions
diff --git a/js/src/Ice/TcpTransceiver.js b/js/src/Ice/TcpTransceiver.js index 2473627da95..e885020acf3 100644 --- a/js/src/Ice/TcpTransceiver.js +++ b/js/src/Ice/TcpTransceiver.js @@ -7,496 +7,497 @@ // // ********************************************************************** -(function(global){ - var net = require("net"); - - require("Ice/Class"); - require("Ice/Debug"); - require("Ice/ExUtil"); - require("Ice/SocketOperation"); - require("Ice/Connection"); - require("Ice/Exception"); - require("Ice/LocalException"); - - var Ice = global.Ice || {}; - - var Debug = Ice.Debug; - var ExUtil = Ice.ExUtil; - var Network = Ice.Network; - var SocketOperation = Ice.SocketOperation; - var LocalException = Ice.LocalException; - var SocketException = Ice.SocketException; - - var StateNeedConnect = 0; - var StateConnectPending = 1; - var StateProxyConnectRequest = 2; - var StateProxyConnectRequestPending = 3; - var StateConnected = 4; - var StateClosed = 5; - - var TcpTransceiver = Ice.Class({ - __init__: function(instance) - { - var id = instance.initializationData(); - this._traceLevels = instance.traceLevels(); - this._logger = id.logger; - this._readBuffers = []; - this._readPosition = 0; - this._maxSendPacketSize = id.properties.getPropertyAsIntWithDefault("Ice.TCP.SndSize", 512 * 1204); - }, - setCallbacks: function(connectedCallback, bytesAvailableCallback, bytesWrittenCallback) - { - this._connectedCallback = connectedCallback; - this._bytesAvailableCallback = bytesAvailableCallback; - this._bytesWrittenCallback = bytesWrittenCallback; - }, - // - // Returns SocketOperation.None when initialization is complete. - // - initialize: function(readBuffer, writeBuffer) +var net = require("net"); + +var Ice = require("../Ice/ModuleRegistry").Ice; + +Ice.__M.require(module, "Ice", + [ + "../Ice/Class", + "../Ice/Debug", + "../Ice/ExUtil", + "../Ice/SocketOperation", + "../Ice/Connection", + "../Ice/Exception", + "../Ice/LocalException" + ]); + +var Debug = Ice.Debug; +var ExUtil = Ice.ExUtil; +var Network = Ice.Network; +var SocketOperation = Ice.SocketOperation; +var LocalException = Ice.LocalException; +var SocketException = Ice.SocketException; + +var StateNeedConnect = 0; +var StateConnectPending = 1; +var StateProxyConnectRequest = 2; +var StateProxyConnectRequestPending = 3; +var StateConnected = 4; +var StateClosed = 5; + +var TcpTransceiver = Ice.Class({ + __init__: function(instance) + { + var id = instance.initializationData(); + this._traceLevels = instance.traceLevels(); + this._logger = id.logger; + this._readBuffers = []; + this._readPosition = 0; + this._maxSendPacketSize = id.properties.getPropertyAsIntWithDefault("Ice.TCP.SndSize", 512 * 1204); + }, + setCallbacks: function(connectedCallback, bytesAvailableCallback, bytesWrittenCallback) + { + this._connectedCallback = connectedCallback; + this._bytesAvailableCallback = bytesAvailableCallback; + this._bytesWrittenCallback = bytesWrittenCallback; + }, + // + // Returns SocketOperation.None when initialization is complete. + // + initialize: function(readBuffer, writeBuffer) + { + try { - try - { - if(this._exception) - { - throw this._exception; - } - - if(this._state === StateNeedConnect) - { - this._state = StateConnectPending; - this._fd = net.createConnection({port: this._addr.port, - host: this._addr.host, - localAddress: this._sourceAddr}); - - var self = this; - this._fd.on("connect", function() { self.socketConnected(); }); - this._fd.on("close", function(err) { self.socketClosed(err); }); - this._fd.on("error", function(err) { self.socketError(err); }); - this._fd.on("data", function(buf) { self.socketBytesAvailable(buf); }); - - return SocketOperation.Connect; // Waiting for connect to complete. - } - else if(this._state === StateConnectPending) - { - // - // Socket is connected. - // - this._desc = fdToString(this._fd, this._proxy, this._addr); - this._state = StateConnected; - } - else if(this._state === StateProxyConnectRequest) - { - // - // Write completed. - // - this._proxy.endWriteConnectRequest(writeBuffer); - this._state = StateProxyConnectRequestPending; // Wait for proxy response - return SocketOperation.Read; - } - else if(this._state === StateProxyConnectRequestPending) - { - // - // Read completed. - // - this._proxy.endReadConnectRequestResponse(readBuffer); - this._state = StateConnected; - } - } - catch(err) + if(this._exception) { - if(!this._exception) - { - this._exception = translateError(this._state, err); - } - - if(this._traceLevels.network >= 2) - { - var s = []; - s.push("failed to establish tcp connection\n"); - s.push(fdToString(this._fd, this._proxy, this._addr.host, this._addr.port)); - this._logger.trace(this._traceLevels.networkCat, s.join("")); - } - throw this._exception; } - Debug.assert(this._state === StateConnected); - if(this._traceLevels.network >= 1) + if(this._state === StateNeedConnect) { - var s = "tcp connection established\n" + this._desc; - this._logger.trace(this._traceLevels.networkCat, s); - } + this._state = StateConnectPending; + this._fd = net.createConnection({port: this._addr.port, + host: this._addr.host, + localAddress: this._sourceAddr}); - return SocketOperation.None; - }, - register: function() - { - this._registered = true; - this._fd.resume(); - if(this._exception) - { - this._bytesAvailableCallback(); - } - }, - unregister: function() - { - this._registered = false; - this._fd.pause(); - }, - close: function() - { - if(this._state > StateConnectPending && this._traceLevels.network >= 1) - { - this._logger.trace(this._traceLevels.networkCat, "closing " + this.type() + " connection\n" + - this._desc); - } + var self = this; + this._fd.on("connect", function() { self.socketConnected(); }); + this._fd.on("close", function(err) { self.socketClosed(err); }); + this._fd.on("error", function(err) { self.socketError(err); }); + this._fd.on("data", function(buf) { self.socketBytesAvailable(buf); }); - Debug.assert(this._fd !== null); - try + return SocketOperation.Connect; // Waiting for connect to complete. + } + else if(this._state === StateConnectPending) { - this._fd.destroy(); + // + // Socket is connected. + // + this._desc = fdToString(this._fd, this._proxy, this._addr); + this._state = StateConnected; } - catch(ex) + else if(this._state === StateProxyConnectRequest) { - throw translateError(this._state, ex); + // + // Write completed. + // + this._proxy.endWriteConnectRequest(writeBuffer); + this._state = StateProxyConnectRequestPending; // Wait for proxy response + return SocketOperation.Read; } - finally + else if(this._state === StateProxyConnectRequestPending) { - this._fd = null; + // + // Read completed. + // + this._proxy.endReadConnectRequestResponse(readBuffer); + this._state = StateConnected; } - }, - // - // Returns true if all of the data was flushed to the kernel buffer. - // - write: function(byteBuffer) + } + catch(err) { - if(this._exception) + if(!this._exception) { - throw this._exception; + this._exception = translateError(this._state, err); } - var packetSize = byteBuffer.remaining; - Debug.assert(packetSize > 0); - - if(this._maxSendPacketSize > 0 && packetSize > this._maxSendPacketSize) + if(this._traceLevels.network >= 2) { - packetSize = this._maxSendPacketSize; + var s = []; + s.push("failed to establish tcp connection\n"); + s.push(fdToString(this._fd, this._proxy, this._addr.host, this._addr.port)); + this._logger.trace(this._traceLevels.networkCat, s.join("")); } - while(packetSize > 0) - { - var slice = byteBuffer.b.slice(byteBuffer.position, byteBuffer.position + packetSize); + throw this._exception; + } - var self = this; - var sync = true; - sync = this._fd.write(slice, null, function() { - if(sync) - { - return; - } - - if(self._traceLevels.network >= 3) - { - var msg = "sent " + packetSize + " of " + byteBuffer.remaining + " bytes via " + - self.type() + "\n" + self._desc; - self._logger.trace(self._traceLevels.networkCat, msg); - } - - byteBuffer.position = byteBuffer.position + packetSize; - if(this._maxSendPacketSize > 0 && byteBuffer.remaining > this._maxSendPacketSize) - { - packetSize = this._maxSendPacketSize; - } - else - { - packetSize = byteBuffer.remaining; - } - self._bytesWrittenCallback(); - }); + Debug.assert(this._state === StateConnected); + if(this._traceLevels.network >= 1) + { + var s = "tcp connection established\n" + this._desc; + this._logger.trace(this._traceLevels.networkCat, s); + } + return SocketOperation.None; + }, + register: function() + { + this._registered = true; + this._fd.resume(); + if(this._exception) + { + this._bytesAvailableCallback(); + } + }, + unregister: function() + { + this._registered = false; + this._fd.pause(); + }, + close: function() + { + if(this._state > StateConnectPending && this._traceLevels.network >= 1) + { + this._logger.trace(this._traceLevels.networkCat, "closing " + this.type() + " connection\n" + + this._desc); + } + + Debug.assert(this._fd !== null); + try + { + this._fd.destroy(); + } + catch(ex) + { + throw translateError(this._state, ex); + } + finally + { + this._fd = null; + } + }, + // + // Returns true if all of the data was flushed to the kernel buffer. + // + write: function(byteBuffer) + { + if(this._exception) + { + throw this._exception; + } + + var packetSize = byteBuffer.remaining; + Debug.assert(packetSize > 0); + + if(this._maxSendPacketSize > 0 && packetSize > this._maxSendPacketSize) + { + packetSize = this._maxSendPacketSize; + } + + while(packetSize > 0) + { + var slice = byteBuffer.b.slice(byteBuffer.position, byteBuffer.position + packetSize); + + var self = this; + var sync = true; + sync = this._fd.write(slice, null, function() { if(sync) { - if(self._traceLevels.network >= 3) - { - var msg = "sent " + packetSize + " of " + byteBuffer.remaining + " bytes via " + - self.type() + "\n" + self._desc; - self._logger.trace(self._traceLevels.networkCat, msg); - } - - byteBuffer.position = byteBuffer.position + packetSize; - - if(this._maxSendPacketSize > 0 && byteBuffer.remaining > this._maxSendPacketSize) - { - packetSize = this._maxSendPacketSize; - } - else - { - packetSize = byteBuffer.remaining; - } + return; } - else + + if(self._traceLevels.network >= 3) { - return false; + self._logger.trace(self._traceLevels.networkCat, + "sent " + packetSize + " of " + byteBuffer.remaining + " bytes via " + + self.type() + "\n" + self._desc); } - } - return true; - }, - read: function(byteBuffer, moreData) - { - if(this._exception) - { - throw this._exception; - } - - moreData.value = false; - - if(this._readBuffers.length === 0) - { - return false; // No data available. - } - var avail = this._readBuffers[0].length - this._readPosition; - Debug.assert(avail > 0); - var remaining = byteBuffer.remaining; + byteBuffer.position = byteBuffer.position + packetSize; + if(this._maxSendPacketSize > 0 && byteBuffer.remaining > this._maxSendPacketSize) + { + packetSize = this._maxSendPacketSize; + } + else + { + packetSize = byteBuffer.remaining; + } + self._bytesWrittenCallback(); + }); - while(byteBuffer.remaining > 0) + if(sync) { - if(avail > byteBuffer.remaining) + if(self._traceLevels.network >= 3) { - avail = byteBuffer.remaining; + self._logger.trace(self._traceLevels.networkCat, + "sent " + packetSize + " of " + byteBuffer.remaining + " bytes via " + + self.type() + "\n" + self._desc); } - this._readBuffers[0].copy(byteBuffer.b, byteBuffer.position, this._readPosition, - this._readPosition + avail); + byteBuffer.position = byteBuffer.position + packetSize; - byteBuffer.position += avail; - this._readPosition += avail; - if(this._readPosition === this._readBuffers[0].length) + if(this._maxSendPacketSize > 0 && byteBuffer.remaining > this._maxSendPacketSize) + { + packetSize = this._maxSendPacketSize; + } + else { - // - // We've exhausted the current read buffer. - // - this._readPosition = 0; - this._readBuffers.shift(); - if(this._readBuffers.length === 0) - { - break; // No more data - we're done. - } - else - { - avail = this._readBuffers[0].length; - } + packetSize = byteBuffer.remaining; } } - - var n = remaining - byteBuffer.remaining; - if(n > 0 && this._traceLevels.network >= 3) + else { - var msg = "received " + n + " of " + remaining + " bytes via " + this.type() + "\n" + this._desc; - this._logger.trace(this._traceLevels.networkCat, msg); + return false; } + } + return true; + }, + read: function(byteBuffer, moreData) + { + if(this._exception) + { + throw this._exception; + } - moreData.value = this._readBuffers.length > 0; + moreData.value = false; - return byteBuffer.remaining === 0; - }, - type: function() + if(this._readBuffers.length === 0) { - return "tcp"; - }, - getInfo: function() - { - Debug.assert(this._fd !== null); - var info = this.createInfo(); - info.localAddress = this._fd.localAddress; - info.localPort = this._fd.localPort; - info.remoteAddress = this._fd.remoteAddress; - info.remotePort = this._fd.remotePort; - return info; - }, - createInfo: function() - { - return new Ice.TCPConnectionInfo(); - }, - checkSendSize: function(stream, messageSizeMax) + return false; // No data available. + } + + var avail = this._readBuffers[0].length - this._readPosition; + Debug.assert(avail > 0); + var remaining = byteBuffer.remaining; + + while(byteBuffer.remaining > 0) { - if(stream.size > messageSizeMax) + if(avail > byteBuffer.remaining) { - ExUtil.throwMemoryLimitException(stream.size, messageSizeMax); + avail = byteBuffer.remaining; } - }, - toString: function() - { - return this._desc; - }, - socketConnected: function() - { - Debug.assert(this._connectedCallback !== null); - this._connectedCallback(); - }, - socketBytesAvailable: function(buf) - { - Debug.assert(this._bytesAvailableCallback !== null); - // - // TODO: Should we set a limit on how much data we can read? - // We can call _fd.pause() to temporarily stop reading. - // - if(buf.length > 0) - { - this._readBuffers.push(buf); - this._bytesAvailableCallback(); - } - }, - socketClosed: function(err) - { - // - // Don't call the closed callback if an error occurred; the error callback - // will be called. - // - if(!err) + this._readBuffers[0].copy(byteBuffer.b, byteBuffer.position, this._readPosition, + this._readPosition + avail); + + byteBuffer.position += avail; + this._readPosition += avail; + if(this._readPosition === this._readBuffers[0].length) { - this.socketError(null); + // + // We've exhausted the current read buffer. + // + this._readPosition = 0; + this._readBuffers.shift(); + if(this._readBuffers.length === 0) + { + break; // No more data - we're done. + } + else + { + avail = this._readBuffers[0].length; + } } - }, - socketError: function(err) + } + + var n = remaining - byteBuffer.remaining; + if(n > 0 && this._traceLevels.network >= 3) { - this._exception = translateError(this._state, err) - if(this._state < StateConnected) - { - this._connectedCallback(); - } - else if(this._registered) - { - this._bytesAvailableCallback(); - } + var msg = "received " + n + " of " + remaining + " bytes via " + this.type() + "\n" + this._desc; + this._logger.trace(this._traceLevels.networkCat, msg); } - }); - function fdToString(fd, targetAddr) + moreData.value = this._readBuffers.length > 0; + + return byteBuffer.remaining === 0; + }, + type: function() + { + return "tcp"; + }, + getInfo: function() + { + Debug.assert(this._fd !== null); + var info = this.createInfo(); + info.localAddress = this._fd.localAddress; + info.localPort = this._fd.localPort; + info.remoteAddress = this._fd.remoteAddress; + info.remotePort = this._fd.remotePort; + return info; + }, + createInfo: function() { - if(fd === null) + return new Ice.TCPConnectionInfo(); + }, + checkSendSize: function(stream, messageSizeMax) + { + if(stream.size > messageSizeMax) { - return "<closed>"; + ExUtil.throwMemoryLimitException(stream.size, messageSizeMax); } + }, + toString: function() + { + return this._desc; + }, + socketConnected: function() + { + Debug.assert(this._connectedCallback !== null); + this._connectedCallback(); + }, + socketBytesAvailable: function(buf) + { + Debug.assert(this._bytesAvailableCallback !== null); - return addressesToString(fd.localAddress, fd.localPort, fd.remoteAddress, fd.remotePort, targetAddr); - } - - function translateError(state, err) + // + // TODO: Should we set a limit on how much data we can read? + // We can call _fd.pause() to temporarily stop reading. + // + if(buf.length > 0) + { + this._readBuffers.push(buf); + this._bytesAvailableCallback(); + } + }, + socketClosed: function(err) { + // + // Don't call the closed callback if an error occurred; the error callback + // will be called. + // if(!err) { - return new Ice.ConnectionLostException(); + this.socketError(null); } - else if(state < StateConnected) + }, + socketError: function(err) + { + this._exception = translateError(this._state, err); + if(this._state < StateConnected) { - if(connectionRefused(err.code)) - { - return new Ice.ConnectionRefusedException(err.code, err); - } - else if(connectionFailed(err.code)) - { - return new Ice.ConnectFailedException(err.code, err); - } + this._connectedCallback(); } - else if(connectionLost(err.code)) + else if(this._registered) { - return new Ice.ConnectionLostException(err.code, err); + this._bytesAvailableCallback(); } - return new Ice.SocketException(err.code, err); } +}); - function addressesToString(localHost, localPort, remoteHost, remotePort, targetAddr) +function fdToString(fd, targetAddr) +{ + if(fd === null) { - remoteHost = remoteHost === undefined ? null : remoteHost; - targetAddr = targetAddr === undefined ? null : targetAddr; - - var s = []; - s.push("local address = "); - s.push(localHost + ":" + localPort); + return "<closed>"; + } - if(remoteHost === null && targetAddr !== null) - { - remoteHost = targetAddr.host; - remotePort = targetAddr.port; - } + return addressesToString(fd.localAddress, fd.localPort, fd.remoteAddress, fd.remotePort, targetAddr); +} - if(remoteHost === null) +function translateError(state, err) +{ + if(!err) + { + return new Ice.ConnectionLostException(); + } + else if(state < StateConnected) + { + if(connectionRefused(err.code)) { - s.push("\nremote address = <not connected>"); + return new Ice.ConnectionRefusedException(err.code, err); } - else + else if(connectionFailed(err.code)) { - s.push("\nremote address = "); - s.push(remoteHost + ":" + remotePort); + return new Ice.ConnectFailedException(err.code, err); } - - return s.join(""); - }; - - TcpTransceiver.createOutgoing = function(instance, addr, sourceAddr) + } + else if(connectionLost(err.code)) { - var transceiver = new TcpTransceiver(instance); + return new Ice.ConnectionLostException(err.code, err); + } + return new Ice.SocketException(err.code, err); +} - transceiver._fd = null; - transceiver._addr = addr; - transceiver._sourceAddr = sourceAddr; - transceiver._desc = "remote address: " + addr.host + ":" + addr.port + " <not connected>"; - transceiver._state = StateNeedConnect; - transceiver._registered = false; - transceiver._exception = null; +function addressesToString(localHost, localPort, remoteHost, remotePort, targetAddr) +{ + remoteHost = remoteHost === undefined ? null : remoteHost; + targetAddr = targetAddr === undefined ? null : targetAddr; - return transceiver; - }; + var s = []; + s.push("local address = "); + s.push(localHost + ":" + localPort); - TcpTransceiver.createIncoming = function(instance, fd) + if(remoteHost === null && targetAddr !== null) { - var transceiver = new TcpTransceiver(instance); - - transceiver._fd = fd; - transceiver._addr = null; - transceiver._sourceAddr = null; - transceiver._desc = fdToString(fd); - transceiver._state = StateConnected; - transceiver._registered = false; - transceiver._exception = null; - - return transceiver; - }; - - - var ECONNABORTED = "ECONNABORTED"; - var ECONNREFUSED = "ECONNREFUSED"; - var ECONNRESET = "ECONNRESET" - var EHOSTUNREACH = "EHOSTUNREACH"; - var ENETUNREACH = "ENETUNREACH"; - var ENOTCONN = "ENOTCONN"; - var EPIPE = "EPIPE"; - var ESHUTDOWN = "ESHUTDOWN" - var ETIMEDOUT = "ETIMEDOUT"; - - function connectionRefused(err) - { - return err == ECONNREFUSED; + remoteHost = targetAddr.host; + remotePort = targetAddr.port; } - function connectionFailed(err) + if(remoteHost === null) { - return err == ECONNREFUSED || err == ETIMEDOUT || - err == ENETUNREACH || err == EHOSTUNREACH || - err == ECONNRESET || err == ESHUTDOWN || - err == ECONNABORTED; + s.push("\nremote address = <not connected>"); } - - function connectionLost(err) + else { - return err == ECONNRESET || err == ENOTCONN || - err == ESHUTDOWN || err == ECONNABORTED || - err == EPIPE; + s.push("\nremote address = "); + s.push(remoteHost + ":" + remotePort); } - Ice.TcpTransceiver = TcpTransceiver; - global.Ice = Ice; -}(typeof (global) === "undefined" ? window : global)); + return s.join(""); +} + +TcpTransceiver.createOutgoing = function(instance, addr, sourceAddr) +{ + var transceiver = new TcpTransceiver(instance); + + transceiver._fd = null; + transceiver._addr = addr; + transceiver._sourceAddr = sourceAddr; + transceiver._desc = "remote address: " + addr.host + ":" + addr.port + " <not connected>"; + transceiver._state = StateNeedConnect; + transceiver._registered = false; + transceiver._exception = null; + + return transceiver; +}; + +TcpTransceiver.createIncoming = function(instance, fd) +{ + var transceiver = new TcpTransceiver(instance); + + transceiver._fd = fd; + transceiver._addr = null; + transceiver._sourceAddr = null; + transceiver._desc = fdToString(fd); + transceiver._state = StateConnected; + transceiver._registered = false; + transceiver._exception = null; + + return transceiver; +}; + + +var ECONNABORTED = "ECONNABORTED"; +var ECONNREFUSED = "ECONNREFUSED"; +var ECONNRESET = "ECONNRESET"; +var EHOSTUNREACH = "EHOSTUNREACH"; +var ENETUNREACH = "ENETUNREACH"; +var ENOTCONN = "ENOTCONN"; +var EPIPE = "EPIPE"; +var ESHUTDOWN = "ESHUTDOWN"; +var ETIMEDOUT = "ETIMEDOUT"; + +function connectionRefused(err) +{ + return err == ECONNREFUSED; +} + +function connectionFailed(err) +{ + return err == ECONNREFUSED || err == ETIMEDOUT || + err == ENETUNREACH || err == EHOSTUNREACH || + err == ECONNRESET || err == ESHUTDOWN || + err == ECONNABORTED; +} + +function connectionLost(err) +{ + return err == ECONNRESET || err == ENOTCONN || + err == ESHUTDOWN || err == ECONNABORTED || + err == EPIPE; +} + +Ice.TcpTransceiver = TcpTransceiver; +module.exports.Ice = Ice; |