diff options
Diffstat (limited to 'js/src/Ice/ConnectionI.js')
-rw-r--r-- | js/src/Ice/ConnectionI.js | 846 |
1 files changed, 431 insertions, 415 deletions
diff --git a/js/src/Ice/ConnectionI.js b/js/src/Ice/ConnectionI.js index 04dd1a0c4f3..1bf5c7d0b28 100644 --- a/js/src/Ice/ConnectionI.js +++ b/js/src/Ice/ConnectionI.js @@ -7,12 +7,11 @@ // // ********************************************************************** -var Ice = require("../Ice/ModuleRegistry").Ice; -Ice.__M.require(module, +const Ice = require("../Ice/ModuleRegistry").Ice; +Ice._ModuleRegistry.require(module, [ - "../Ice/Class", "../Ice/AsyncStatus", - "../Ice/BasicStream", + "../Ice/Stream", "../Ice/OutgoingAsync", "../Ice/Debug", "../Ice/ExUtil", @@ -30,52 +29,54 @@ Ice.__M.require(module, "../Ice/BatchRequestQueue", ]); -var AsyncStatus = Ice.AsyncStatus; -var AsyncResultBase = Ice.AsyncResultBase; -var BasicStream = Ice.BasicStream; -var BatchRequestQueue = Ice.BatchRequestQueue; -var ConnectionFlushBatch = Ice.ConnectionFlushBatch; -var Debug = Ice.Debug; -var ExUtil = Ice.ExUtil; -var HashMap = Ice.HashMap; -var IncomingAsync = Ice.IncomingAsync; -var RetryException = Ice.RetryException; -var Promise = Ice.Promise; -var Protocol = Ice.Protocol; -var SocketOperation = Ice.SocketOperation; -var Timer = Ice.Timer; -var TraceUtil = Ice.TraceUtil; -var ProtocolVersion = Ice.ProtocolVersion; -var EncodingVersion = Ice.EncodingVersion; -var ACM = Ice.ACM; -var ACMClose = Ice.ACMClose; -var ACMHeartbeat = Ice.ACMHeartbeat; - -var StateNotInitialized = 0; -var StateNotValidated = 1; -var StateActive = 2; -var StateHolding = 3; -var StateClosing = 4; -var StateClosed = 5; -var StateFinished = 6; - -var MessageInfo = function(instance) +const AsyncStatus = Ice.AsyncStatus; +const AsyncResultBase = Ice.AsyncResultBase; +const InputStream = Ice.InputStream; +const OutputStream = Ice.OutputStream; +const BatchRequestQueue = Ice.BatchRequestQueue; +const ConnectionFlushBatch = Ice.ConnectionFlushBatch; +const HeartbeatAsync = Ice.HeartbeatAsync; +const Debug = Ice.Debug; +const ExUtil = Ice.ExUtil; +const HashMap = Ice.HashMap; +const IncomingAsync = Ice.IncomingAsync; +const RetryException = Ice.RetryException; +const Protocol = Ice.Protocol; +const SocketOperation = Ice.SocketOperation; +const Timer = Ice.Timer; +const TraceUtil = Ice.TraceUtil; +const ProtocolVersion = Ice.ProtocolVersion; +const EncodingVersion = Ice.EncodingVersion; +const ACM = Ice.ACM; +const ACMClose = Ice.ACMClose; +const ACMHeartbeat = Ice.ACMHeartbeat; +const ConnectionClose = Ice.ConnectionClose; + +const StateNotInitialized = 0; +const StateNotValidated = 1; +const StateActive = 2; +const StateHolding = 3; +const StateClosing = 4; +const StateClosed = 5; +const StateFinished = 6; + +class MessageInfo { - this.stream = new BasicStream(instance, Protocol.currentProtocolEncoding); - - this.invokeNum = 0; - this.requestId = 0; - this.compress = false; - this.servantManager = null; - this.adapter = null; - this.outAsync = null; - this.heartbeatCallback = null; -}; - -var Class = Ice.Class; + constructor(instance) + { + this.stream = new InputStream(instance, Protocol.currentProtocolEncoding); + this.invokeNum = 0; + this.requestId = 0; + this.servantManager = null; + this.adapter = null; + this.outAsync = null; + this.heartbeatCallback = null; + } +} -var ConnectionI = Class({ - __init__: function(communicator, instance, monitor, transceiver, endpoint, incoming, adapter) +class ConnectionI +{ + constructor(communicator, instance, monitor, transceiver, endpoint, incoming, adapter) { this._communicator = communicator; this._instance = instance; @@ -86,7 +87,7 @@ var ConnectionI = Class({ this._endpoint = endpoint; this._incoming = incoming; this._adapter = adapter; - var initData = instance.initializationData(); + const initData = instance.initializationData(); this._logger = initData.logger; // Cached for better performance. this._traceLevels = instance.traceLevels(); // Cached for better performance. this._timer = instance.timer(); @@ -106,9 +107,9 @@ var ConnectionI = Class({ this._sendStreams = []; - this._readStream = new BasicStream(instance, Protocol.currentProtocolEncoding); + this._readStream = new InputStream(instance, Protocol.currentProtocolEncoding); this._readHeader = false; - this._writeStream = new BasicStream(instance, Protocol.currentProtocolEncoding); + this._writeStream = new OutputStream(instance, Protocol.currentProtocolEncoding); this._readStreamPos = -1; this._writeStreamPos = -1; @@ -140,9 +141,11 @@ var ConnectionI = Class({ { this._servantManager = null; } - this._callback = null; - }, - start: function() + this._closeCallback = null; + this._heartbeatCallback = null; + } + + start() { Debug.assert(this._startPromise === null); @@ -152,34 +155,14 @@ var ConnectionI = Class({ if(this._state >= StateClosed) { Debug.assert(this._exception !== null); - return new Promise().fail(this._exception); + return Ice.Promise.reject(this._exception); } - this._startPromise = new Promise(); - var self = this; + this._startPromise = new Ice.Promise(); this._transceiver.setCallbacks( - function() { self.message(SocketOperation.Write); }, // connected callback - function() { self.message(SocketOperation.Read); }, // read callback - function(bytesSent, bytesTotal) { - self.message(SocketOperation.Write); - if(self._instance.traceLevels().network >= 3 && bytesSent > 0) - { - var s = []; - s.push("sent "); - s.push(bytesSent); - if(!self._endpoint.datagram()) - { - s.push(" of "); - s.push(bytesTotal); - } - s.push(" bytes via "); - s.push(self._endpoint.protocol()); - s.push("\n"); - s.push(this.toString()); - self._instance.initializationData().logger.trace(self._instance.traceLevels().networkCat, - s.join("")); - } - } // write callback + () => { this.message(SocketOperation.Write); }, // connected callback + () => { this.message(SocketOperation.Read); }, // read callback + () => { this.message(SocketOperation.Write); } // write callback ); this.initialize(); } @@ -189,12 +172,12 @@ var ConnectionI = Class({ { this.exception(ex); } - return new Promise().fail(ex); + return Ice.Promise.reject(ex); } - return this._startPromise; - }, - activate: function() + } + + activate() { if(this._state <= StateNotValidated) { @@ -206,8 +189,9 @@ var ConnectionI = Class({ this._acmLastActivity = Date.now(); } this.setState(StateActive); - }, - hold: function() + } + + hold() { if(this._state <= StateNotValidated) { @@ -215,8 +199,9 @@ var ConnectionI = Class({ } this.setState(StateHolding); - }, - destroy: function(reason) + } + + destroy(reason) { switch(reason) { @@ -232,53 +217,57 @@ var ConnectionI = Class({ break; } } - }, - close: function(force) + } + + close(mode) { - var __r = new AsyncResultBase(this._communicator, "close", this, null, null); + const r = new AsyncResultBase(this._communicator, "close", this, null, null); - if(force) + if(mode == ConnectionClose.Forcefully) { - this.setState(StateClosed, new Ice.ForcedCloseConnectionException()); - __r.succeed(__r); + this.setState(StateClosed, new Ice.ConnectionManuallyClosedException(false)); + r.resolve(); + } + else if(mode == ConnectionClose.Gracefully) + { + this.setState(StateClosing, new Ice.ConnectionManuallyClosedException(true)); + r.resolve(); } else { + Debug.assert(mode == ConnectionClose.GracefullyWithWait); + // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, - // the CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the - // server has processed them or not. + // Wait until all outstanding requests have been completed. // - this._closePromises.push(__r); + this._closePromises.push(r); this.checkClose(); } - return __r; - }, - checkClose: function() + return r; + } + + checkClose() { // - // If close(false) has been called, then we need to check if all + // If close(GracefullyWithWait) has been called, then we need to check if all // requests have completed and we can transition to StateClosing. // We also complete outstanding promises. // if(this._asyncRequests.size === 0 && this._closePromises.length > 0) { - this.setState(StateClosing, new Ice.CloseConnectionException()); - for(var i = 0; i < this._closePromises.length; ++i) - { - this._closePromises[i].succeed(this._closePromises[i]); - } + this.setState(StateClosing, new Ice.ConnectionManuallyClosedException(true)); + this._closePromises.forEach(p => p.resolve()); this._closePromises = []; } - }, - isActiveOrHolding: function() + } + + isActiveOrHolding() { return this._state > StateNotValidated && this._state < StateClosing; - }, - isFinished: function() + } + + isFinished() { if(this._state !== StateFinished || this._dispatchCount !== 0) { @@ -287,30 +276,34 @@ var ConnectionI = Class({ Debug.assert(this._state === StateFinished); return true; - }, - throwException: function() + } + + throwException() { if(this._exception !== null) { Debug.assert(this._state >= StateClosing); throw this._exception; } - }, - waitUntilHolding: function() + } + + waitUntilHolding() { - var promise = new Promise(); + const promise = new Ice.Promise(); this._holdPromises.push(promise); this.checkState(); return promise; - }, - waitUntilFinished: function() + } + + waitUntilFinished() { - var promise = new Promise(); + const promise = new Ice.Promise(); this._finishedPromises.push(promise); this.checkState(); return promise; - }, - monitor: function(now, acm) + } + + monitor(now, acm) { if(this._state !== StateActive) { @@ -321,22 +314,22 @@ var ConnectionI = Class({ // We send a heartbeat if there was no activity in the last // (timeout / 4) period. Sending a heartbeat sooner than // really needed is safer to ensure that the receiver will - // receive in time the heartbeat. Sending the heartbeat if + // receive the heartbeat in time. Sending the heartbeat if // there was no activity in the last (timeout / 2) period // isn't enough since monitor() is called only every (timeout // / 2) period. // // Note that this doesn't imply that we are sending 4 heartbeats - // per timeout period because the monitor() method is sill only + // per timeout period because the monitor() method is still only // called every (timeout / 2) period. // if(acm.heartbeat == Ice.ACMHeartbeat.HeartbeatAlways || - (acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOff && this._writeStream.isEmpty() && - now >= (this._acmLastActivity + acm.timeout / 4))) + (acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOff && this._writeStream.isEmpty() && + now >= (this._acmLastActivity + acm.timeout / 4))) { if(acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOnInvocation || this._dispatchCount > 0) { - this.heartbeat(); // Send heartbeat if idle in the last timeout / 2 period. + this.sendHeartbeatNow(); // Send heartbeat if idle in the last timeout / 2 period. } } @@ -354,7 +347,7 @@ var ConnectionI = Class({ if(acm.close != Ice.ACMClose.CloseOff && now >= (this._acmLastActivity + acm.timeout)) { if(acm.close == Ice.ACMClose.CloseOnIdleForceful || - (acm.close != Ice.ACMClose.CloseOnIdle && this._asyncRequests.size > 0)) + (acm.close != Ice.ACMClose.CloseOnIdle && this._asyncRequests.size > 0)) { // // Close the connection if we didn't receive a heartbeat in @@ -371,11 +364,12 @@ var ConnectionI = Class({ this.setState(StateClosing, new Ice.ConnectionTimeoutException()); } } - }, - sendAsyncRequest: function(out, compress, response, batchRequestNum) + } + + sendAsyncRequest(out, response, batchRequestNum) { - var requestId = 0; - var os = out.__os(); + let requestId = 0; + const ostr = out.getOs(); if(this._exception !== null) { @@ -394,13 +388,13 @@ var ConnectionI = Class({ // Ensure the message isn't bigger than what we can send with the // transport. // - this._transceiver.checkSendSize(os); + this._transceiver.checkSendSize(ostr); // // Notify the request that it's cancelable with this connection. // This will throw if the request is canceled. // - out.__cancelable(this); // Notify the request that it's cancelable + out.cancelable(this); // Notify the request that it's cancelable if(response) { @@ -417,19 +411,19 @@ var ConnectionI = Class({ // // Fill in the request ID. // - os.pos = Protocol.headerSize; - os.writeInt(requestId); + ostr.pos = Protocol.headerSize; + ostr.writeInt(requestId); } else if(batchRequestNum > 0) { - os.pos = Protocol.headerSize; - os.writeInt(batchRequestNum); + ostr.pos = Protocol.headerSize; + ostr.writeInt(batchRequestNum); } - var status; + let status; try { - status = this.sendMessage(OutgoingMessage.create(out, out.__os(), compress, requestId)); + status = this.sendMessage(OutgoingMessage.create(out, out.getOs(), requestId)); } catch(ex) { @@ -454,42 +448,58 @@ var ConnectionI = Class({ } return status; - }, - getBatchRequestQueue: function() + } + + getBatchRequestQueue() { return this._batchRequestQueue; - }, - flushBatchRequests: function() + } + + flushBatchRequests() { - var result = new ConnectionFlushBatch(this, this._communicator, "flushBatchRequests"); - result.__invoke(); + const result = new ConnectionFlushBatch(this, this._communicator, "flushBatchRequests"); + result.invoke(); return result; - }, - setCallback: function(callback) + } + + setCloseCallback(callback) { if(this._state >= StateClosed) { if(callback !== null) { - var self = this; - Timer.setImmediate(function() { + Timer.setImmediate(() => + { try { - callback.closed(this); + callback(this); } catch(ex) { - self._logger.error("connection callback exception:\n" + ex + '\n' + self._desc); + this._logger.error("connection callback exception:\n" + ex + '\n' + this._desc); } }); } } else { - this._callback = callback; + this._closeCallback = callback; } - }, - setACM: function(timeout, close, heartbeat) + } + + setHeartbeatCallback(callback) + { + this._heartbeatCallback = callback; + } + + heartbeat() + { + const result = new HeartbeatAsync(this, this._communicator); + result.invoke(); + return result; + } + + setACM(timeout, close, heartbeat) { if(this._monitor === null || this._state >= StateClosed) { @@ -513,17 +523,19 @@ var ConnectionI = Class({ { this._acmLastActivity = Date.now(); } - }, - getACM: function() + } + + getACM() { return this._monitor !== null ? this._monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff); - }, - asyncRequestCanceled: function(outAsync, ex) + } + + asyncRequestCanceled(outAsync, ex) { - for(var i = 0; i < this._sendStreams.length; i++) + for(let i = 0; i < this._sendStreams.length; i++) { - var o = this._sendStreams[i]; + let o = this._sendStreams[i]; if(o.outAsync === outAsync) { if(o.requestId > 0) @@ -540,25 +552,26 @@ var ConnectionI = Class({ { this._sendStreams.splice(i, 1); } - outAsync.__completedEx(ex); + outAsync.completedEx(ex); return; // We're done. } } if(outAsync instanceof Ice.OutgoingAsync) { - for(var e = this._asyncRequests.entries; e !== null; e = e.next) + for(let [key, value] of this._asyncRequests) { - if(e.value === outAsync) + if(value === outAsync) { - this._asyncRequests.delete(e.key); - outAsync.__completedEx(ex); + this._asyncRequests.delete(key); + outAsync.completedEx(ex); return; // We're done. } } } - }, - sendResponse: function(os, compressFlag) + } + + sendResponse(os) { Debug.assert(this._state > StateNotValidated); @@ -579,7 +592,7 @@ var ConnectionI = Class({ throw this._exception; } - this.sendMessage(OutgoingMessage.createForStream(os, compressFlag !== 0, true)); + this.sendMessage(OutgoingMessage.createForStream(os, true)); if(this._state === StateClosing && this._dispatchCount === 0) { @@ -597,8 +610,9 @@ var ConnectionI = Class({ throw ex; } } - }, - sendNoResponse: function() + } + + sendNoResponse() { Debug.assert(this._state > StateNotValidated); try @@ -634,12 +648,14 @@ var ConnectionI = Class({ throw ex; } } - }, - endpoint: function() + } + + endpoint() { return this._endpoint; - }, - setAdapter: function(adapter) + } + + setAdapter(adapter) { if(this._state <= StateNotValidated || this._state >= StateClosing) { @@ -661,16 +677,19 @@ var ConnectionI = Class({ { this._servantManager = null; } - }, - getAdapter: function() + } + + getAdapter() { return this._adapter; - }, - getEndpoint: function() + } + + getEndpoint() { return this._endpoint; - }, - createProxy: function(ident) + } + + createProxy(ident) { // // Create a reference and return a reverse proxy for this @@ -678,8 +697,9 @@ var ConnectionI = Class({ // return this._instance.proxyFactory().referenceToProxy( this._instance.referenceFactory().createFixed(ident, this)); - }, - message: function(operation) + } + + message(operation) { if(this._state >= StateClosed) { @@ -693,7 +713,7 @@ var ConnectionI = Class({ // this._hasMoreData.value = (operation & SocketOperation.Read) !== 0; - var info = null; + let info = null; try { if((operation & SocketOperation.Write) !== 0 && this._writeStream.buffer.remaining > 0) @@ -721,7 +741,7 @@ var ConnectionI = Class({ Debug.assert(this._readStream.buffer.remaining === 0); this._readHeader = false; - var pos = this._readStream.pos; + const pos = this._readStream.pos; if(pos < Protocol.headerSize) { // @@ -731,27 +751,25 @@ var ConnectionI = Class({ } this._readStream.pos = 0; - var magic0 = this._readStream.readByte(); - var magic1 = this._readStream.readByte(); - var magic2 = this._readStream.readByte(); - var magic3 = this._readStream.readByte(); + const magic0 = this._readStream.readByte(); + const magic1 = this._readStream.readByte(); + const magic2 = this._readStream.readByte(); + const magic3 = this._readStream.readByte(); if(magic0 !== Protocol.magic[0] || magic1 !== Protocol.magic[1] || - magic2 !== Protocol.magic[2] || magic3 !== Protocol.magic[3]) + magic2 !== Protocol.magic[2] || magic3 !== Protocol.magic[3]) { - var bme = new Ice.BadMagicException(); - bme.badMagic = Ice.Buffer.createNative([magic0, magic1, magic2, magic3]); - throw bme; + throw new Ice.BadMagicException("", Ice.Buffer.createNative([magic0, magic1, magic2, magic3])); } - this._readProtocol.__read(this._readStream); + this._readProtocol._read(this._readStream); Protocol.checkSupportedProtocol(this._readProtocol); - this._readProtocolEncoding.__read(this._readStream); + this._readProtocolEncoding._read(this._readStream); Protocol.checkSupportedProtocolEncoding(this._readProtocolEncoding); this._readStream.readByte(); // messageType this._readStream.readByte(); // compress - var size = this._readStream.readInt(); + const size = this._readStream.readInt(); if(size < Protocol.headerSize) { throw new Ice.IllegalMessageSizeException(); @@ -879,20 +897,21 @@ var ConnectionI = Class({ if(this._hasMoreData.value) { - var self = this; - Timer.setImmediate(function() { self.message(SocketOperation.Read); }); // Don't tie up the thread. + Timer.setImmediate(() => { this.message(SocketOperation.Read); }); // Don't tie up the thread. } - }, - dispatch: function(info) + } + + dispatch(info) { - var count = 0; + let count = 0; // // Notify the factory that the connection establishment and // validation has completed. // if(this._startPromise !== null) { - this._startPromise.succeed(); + this._startPromise.resolve(); + this._startPromise = null; ++count; } @@ -901,14 +920,13 @@ var ConnectionI = Class({ { if(info.outAsync !== null) { - info.outAsync.__completed(info.stream); + info.outAsync.completed(info.stream); ++count; } if(info.invokeNum > 0) { - this.invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, - info.adapter); + this.invokeAll(info.stream, info.invokeNum, info.requestId, info.servantManager, info.adapter); // // Don't increase count, the dispatch count is @@ -920,7 +938,7 @@ var ConnectionI = Class({ { try { - info.heartbeatCallback.heartbeat(this); + info.heartbeatCallback(this); } catch(ex) { @@ -964,19 +982,19 @@ var ConnectionI = Class({ this.checkState(); } } - }, - finish: function() + } + + finish() { Debug.assert(this._state === StateClosed); this.unscheduleTimeout(SocketOperation.Read | SocketOperation.Write | SocketOperation.Connect); - var s; - var traceLevels = this._instance.traceLevels(); + const traceLevels = this._instance.traceLevels(); if(!this._initialized) { if(traceLevels.network >= 2) { - s = []; + let s = []; s.push("failed to establish "); s.push(this._endpoint.protocol()); s.push(" connection\n"); @@ -990,7 +1008,7 @@ var ConnectionI = Class({ { if(traceLevels.network >= 1) { - s = []; + let s = []; s.push("closed "); s.push(this._endpoint.protocol()); s.push(" connection\n"); @@ -1000,7 +1018,7 @@ var ConnectionI = Class({ // Trace the cause of unexpected connection closures // if(!(this._exception instanceof Ice.CloseConnectionException || - this._exception instanceof Ice.ForcedCloseConnectionException || + this._exception instanceof Ice.ConnectionManuallyClosedException || this._exception instanceof Ice.ConnectionTimeoutException || this._exception instanceof Ice.CommunicatorDestroyedException || this._exception instanceof Ice.ObjectAdapterDeactivatedException)) @@ -1015,7 +1033,7 @@ var ConnectionI = Class({ if(this._startPromise !== null) { - this._startPromise.fail(this._exception); + this._startPromise.reject(this._exception); this._startPromise = null; } @@ -1027,8 +1045,7 @@ var ConnectionI = Class({ // Return the stream to the outgoing call. This is important for // retriable AMI calls which are not marshalled again. // - var message = this._sendStreams[0]; - this._writeStream.swap(message.stream); + this._writeStream.swap(this._sendStreams[0].stream); } // @@ -1037,9 +1054,9 @@ var ConnectionI = Class({ // because it's either in the _requests/_asyncRequests set. This is fine, only the // first call should be taken into account by the implementation of finished. // - for(var i = 0; i < this._sendStreams.length; ++i) + for(let i = 0; i < this._sendStreams.length; ++i) { - var p = this._sendStreams[i]; + let p = this._sendStreams[i]; if(p.requestId > 0) { this._asyncRequests.delete(p.requestId); @@ -1049,9 +1066,9 @@ var ConnectionI = Class({ this._sendStreams = []; } - for(var e = this._asyncRequests.entries; e !== null; e = e.next) + for(let value of this._asyncRequests.values()) { - e.value.__completedEx(this._exception); + value.completedEx(this._exception); } this._asyncRequests.clear(); @@ -1063,19 +1080,21 @@ var ConnectionI = Class({ this._writeStream.clear(); this._writeStream.buffer.clear(); - if(this._callback !== null) + if(this._closeCallback !== null) { try { - this._callback.closed(this); + this._closeCallback(this); } catch(ex) { this._logger.error("connection callback exception:\n" + ex + '\n' + this._desc); } - this._callback = null; + this._closeCallback = null; } + this._heartbeatCallback = null; + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -1085,12 +1104,14 @@ var ConnectionI = Class({ this.reap(); } this.setState(StateFinished); - }, - toString: function() + } + + toString() { return this._desc; - }, - timedOut: function(event) + } + + timedOut(event) { if(this._state <= StateNotValidated) { @@ -1104,39 +1125,48 @@ var ConnectionI = Class({ { this.setState(StateClosed, new Ice.CloseTimeoutException()); } - }, - type: function() + } + + type() { return this._type; - }, - timeout: function() + } + + timeout() { return this._endpoint.timeout(); - }, - getInfo: function() + } + + getInfo() { if(this._state >= StateClosed) { throw this._exception; } - var info = this._transceiver.getInfo(); - info.adapterName = this._adapter !== null ? this._adapter.getName() : ""; - info.incoming = this._incoming; + let info = this._transceiver.getInfo(); + for(let p = info; p !== null; p = p.underlying) + { + p.adapterName = this._adapter !== null ? this._adapter.getName() : ""; + p.incoming = this._incoming; + } return info; - }, - setBufferSize: function(rcvSize, sndSize) + } + + setBufferSize(rcvSize, sndSize) { if(this._state >= StateClosed) { throw this._exception; } this._transceiver.setBufferSize(rcvSize, sndSize); - }, - exception: function(ex) + } + + exception(ex) { this.setState(StateClosed, ex); - }, - invokeException: function(ex, invokeNum) + } + + invokeException(ex, invokeNum) { // // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't @@ -1159,8 +1189,9 @@ var ConnectionI = Class({ this.checkState(); } } - }, - setState: function(state, ex) + } + + setState(state, ex) { if(ex !== undefined) { @@ -1190,7 +1221,7 @@ var ConnectionI = Class({ // Don't warn about certain expected exceptions. // if(!(this._exception instanceof Ice.CloseConnectionException || - this._exception instanceof Ice.ForcedCloseConnectionException || + this._exception instanceof Ice.ConnectionManuallyClosedException || this._exception instanceof Ice.ConnectionTimeoutException || this._exception instanceof Ice.CommunicatorDestroyedException || this._exception instanceof Ice.ObjectAdapterDeactivatedException || @@ -1334,8 +1365,8 @@ var ConnectionI = Class({ { if(ex instanceof Ice.LocalException) { - var msg = "unexpected connection exception:\n " + this._desc + "\n" + ex.toString(); - this._instance.initializationData().logger.error(msg); + this._instance.initializationData().logger.error( + `unexpected connection exception:\n${this._desc}\n${ex.toString()}`); } else { @@ -1391,29 +1422,27 @@ var ConnectionI = Class({ } this.checkState(); - }, - initiateShutdown: function() + } + + initiateShutdown() { - Debug.assert(this._state === StateClosing); - Debug.assert(this._dispatchCount === 0); + Debug.assert(this._state === StateClosing && this._dispatchCount === 0); Debug.assert(!this._shutdownInitiated); if(!this._endpoint.datagram()) { // - // Before we shut down, we send a close connection - // message. + // Before we shut down, we send a close connection message. // - var os = new BasicStream(this._instance, Protocol.currentProtocolEncoding); + const os = new OutputStream(this._instance, Protocol.currentProtocolEncoding); os.writeBlob(Protocol.magic); - Protocol.currentProtocol.__write(os); - Protocol.currentProtocolEncoding.__write(os); + Protocol.currentProtocol._write(os); + Protocol.currentProtocolEncoding._write(os); os.writeByte(Protocol.closeConnectionMsg); os.writeByte(0); // compression status: always report 0 for CloseConnection. os.writeInt(Protocol.headerSize); // Message size. - var status = this.sendMessage(OutgoingMessage.createForStream(os, false, false)); - if((status & AsyncStatus.Sent) > 0) + if((this.sendMessage(OutgoingMessage.createForStream(os, false)) & AsyncStatus.Sent) > 0) { // // Schedule the close timeout to wait for the peer to close the connection. @@ -1431,23 +1460,24 @@ var ConnectionI = Class({ // //this._transceiver.shutdownWrite(); } - }, - heartbeat: function() + } + + sendHeartbeatNow() { Debug.assert(this._state === StateActive); if(!this._endpoint.datagram()) { - var os = new BasicStream(this._instance, Protocol.currentProtocolEncoding); + const os = new OutputStream(this._instance, Protocol.currentProtocolEncoding); os.writeBlob(Protocol.magic); - Protocol.currentProtocol.__write(os); - Protocol.currentProtocolEncoding.__write(os); + Protocol.currentProtocol._write(os); + Protocol.currentProtocolEncoding._write(os); os.writeByte(Protocol.validateConnectionMsg); os.writeByte(0); os.writeInt(Protocol.headerSize); // Message size. try { - this.sendMessage(OutgoingMessage.createForStream(os, false, false)); + this.sendMessage(OutgoingMessage.createForStream(os, false)); } catch(ex) { @@ -1455,10 +1485,11 @@ var ConnectionI = Class({ Debug.assert(this._exception !== null); } } - }, - initialize: function() + } + + initialize() { - var s = this._transceiver.initialize(this._readStream.buffer, this._writeStream.buffer); + const s = this._transceiver.initialize(this._readStream.buffer, this._writeStream.buffer); if(s != SocketOperation.None) { this.scheduleTimeout(s, this.connectTimeout()); @@ -1472,8 +1503,9 @@ var ConnectionI = Class({ this._initialized = true; this.setState(StateNotValidated); return true; - }, - validate: function() + } + + validate() { if(!this._endpoint.datagram()) // Datagram connections are always implicitly validated. { @@ -1482,8 +1514,8 @@ var ConnectionI = Class({ if(this._writeStream.size === 0) { this._writeStream.writeBlob(Protocol.magic); - Protocol.currentProtocol.__write(this._writeStream); - Protocol.currentProtocolEncoding.__write(this._writeStream); + Protocol.currentProtocol._write(this._writeStream); + Protocol.currentProtocolEncoding._write(this._writeStream); this._writeStream.writeByte(Protocol.validateConnectionMsg); this._writeStream.writeByte(0); // Compression status (always zero for validate connection). this._writeStream.writeInt(Protocol.headerSize); // Message size. @@ -1514,29 +1546,26 @@ var ConnectionI = Class({ Debug.assert(this._readStream.pos === Protocol.headerSize); this._readStream.pos = 0; - var m = this._readStream.readBlob(4); + const m = this._readStream.readBlob(4); if(m[0] !== Protocol.magic[0] || m[1] !== Protocol.magic[1] || m[2] !== Protocol.magic[2] || m[3] !== Protocol.magic[3]) { - var bme = new Ice.BadMagicException(); - bme.badMagic = m; - throw bme; + throw new Ice.BadMagicException("", m); } - this._readProtocol.__read(this._readStream); + this._readProtocol._read(this._readStream); Protocol.checkSupportedProtocol(this._readProtocol); - this._readProtocolEncoding.__read(this._readStream); + this._readProtocolEncoding._read(this._readStream); Protocol.checkSupportedProtocolEncoding(this._readProtocolEncoding); - var messageType = this._readStream.readByte(); + const messageType = this._readStream.readByte(); if(messageType !== Protocol.validateConnectionMsg) { throw new Ice.ConnectionNotValidatedException(); } this._readStream.readByte(); // Ignore compression status for validate connection. - var size = this._readStream.readInt(); - if(size !== Protocol.headerSize) + if( this._readStream.readInt() !== Protocol.headerSize) { throw new Ice.IllegalMessageSizeException(); } @@ -1552,10 +1581,10 @@ var ConnectionI = Class({ this._readHeader = true; this._readStream.pos = 0; - var traceLevels = this._instance.traceLevels(); + const traceLevels = this._instance.traceLevels(); if(traceLevels.network >= 1) { - var s = []; + let s = []; if(this._endpoint.datagram()) { s.push("starting to send "); @@ -1565,7 +1594,6 @@ var ConnectionI = Class({ } else { - s = []; s.push("established "); s.push(this._endpoint.protocol()); s.push(" connection\n"); @@ -1575,8 +1603,9 @@ var ConnectionI = Class({ } return true; - }, - sendNextMessage: function() + } + + sendNextMessage() { if(this._sendStreams.length === 0) { @@ -1591,7 +1620,7 @@ var ConnectionI = Class({ // // Notify the message that it was sent. // - var message = this._sendStreams.shift(); + let message = this._sendStreams.shift(); this._writeStream.swap(message.stream); message.sent(); @@ -1620,21 +1649,15 @@ var ConnectionI = Class({ // message = this._sendStreams[0]; Debug.assert(!message.prepared); - var stream = message.stream; + let stream = message.stream; stream.pos = 10; stream.writeInt(stream.size); stream.prepareWrite(); message.prepared = true; - if(message.outAsync !== null) - { - TraceUtil.trace("sending asynchronous request", stream, this._logger, this._traceLevels); - } - else - { - TraceUtil.traceSend(stream, this._logger, this._traceLevels); - } + TraceUtil.traceSend(stream, this._logger, this._traceLevels); + this._writeStream.swap(message.stream); // @@ -1671,8 +1694,9 @@ var ConnectionI = Class({ { this.scheduleTimeout(SocketOperation.Write, this.closeTimeout()); } - }, - sendMessage: function(message) + } + + sendMessage(message) { if(this._sendStreams.length > 0) { @@ -1684,22 +1708,15 @@ var ConnectionI = Class({ Debug.assert(!message.prepared); - var stream = message.stream; + let stream = message.stream; stream.pos = 10; stream.writeInt(stream.size); stream.prepareWrite(); message.prepared = true; - if(message.outAsync) - { - TraceUtil.trace("sending asynchronous request", message.stream, this._logger, this._traceLevels); - } - else - { - TraceUtil.traceSend(message.stream, this._logger, this._traceLevels); - } + TraceUtil.traceSend(stream, this._logger, this._traceLevels); - if(this.write(message.stream.buffer)) + if(this.write(stream.buffer)) { // // Entire buffer was written immediately. @@ -1714,17 +1731,18 @@ var ConnectionI = Class({ } message.doAdopt(); - this._writeStream.swap(message.stream); + this._writeStream.swap(stream); this._sendStreams.push(message); this.scheduleTimeout(SocketOperation.Write, this._endpoint.timeout()); return AsyncStatus.Queued; - }, - parseMessage: function() + } + + parseMessage() { Debug.assert(this._state > StateNotValidated && this._state < StateClosed); - var info = new MessageInfo(this._instance); + let info = new MessageInfo(this._instance); this._readStream.swap(info.stream); this._readStream.resize(Protocol.headerSize); @@ -1748,13 +1766,11 @@ var ConnectionI = Class({ // been done by the caller. // info.stream.pos = 8; - var messageType = info.stream.readByte(); - info.compress = info.stream.readByte(); - if(info.compress === 2) + const messageType = info.stream.readByte(); + const compress = info.stream.readByte(); + if(compress === 2) { - var ex = new Ice.FeatureNotSupportedException(); - ex.unsupportedFeature = "Cannot uncompress compressed message"; - throw ex; + throw new Ice.FeatureNotSupportedException("Cannot uncompress compressed message"); } info.stream.pos = Protocol.headerSize; @@ -1782,9 +1798,9 @@ var ConnectionI = Class({ { if(this._state === StateClosing) { - TraceUtil.trace("received request during closing\n" + - "(ignored by server, client will retry)", - info.stream, this._logger, this._traceLevels); + TraceUtil.traceIn("received request during closing\n" + + "(ignored by server, client will retry)", + info.stream, this._logger, this._traceLevels); } else { @@ -1802,9 +1818,9 @@ var ConnectionI = Class({ { if(this._state === StateClosing) { - TraceUtil.trace("received batch request during closing\n" + - "(ignored by server, client will retry)", - info.stream, this._logger, this._traceLevels); + TraceUtil.traceIn("received batch request during closing\n" + + "(ignored by server, client will retry)", + info.stream, this._logger, this._traceLevels); } else { @@ -1843,9 +1859,9 @@ var ConnectionI = Class({ case Protocol.validateConnectionMsg: { TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels); - if(this._callback !== null) + if(this._heartbeatCallback !== null) { - info.heartbeatCallback = this._callback; + info.heartbeatCallback = this._heartbeatCallback; ++this._dispatchCount; } break; @@ -1853,8 +1869,8 @@ var ConnectionI = Class({ default: { - TraceUtil.trace("received unknown message\n(invalid, closing connection)", - info.stream, this._logger, this._traceLevels); + TraceUtil.traceIn("received unknown message\n(invalid, closing connection)", + info.stream, this._logger, this._traceLevels); throw new Ice.UnknownMessageException(); } } @@ -1882,10 +1898,10 @@ var ConnectionI = Class({ } return info; - }, - invokeAll: function(stream, invokeNum, requestId, compress, servantManager, adapter) + } + + invokeAll(stream, invokeNum, requestId, servantManager, adapter) { - var inc = null; try { while(invokeNum > 0) @@ -1893,8 +1909,10 @@ var ConnectionI = Class({ // // Prepare the invocation. // - var response = !this._endpoint.datagram() && requestId !== 0; - inc = new IncomingAsync(this._instance, this, adapter, response, compress, requestId); + let inc = new IncomingAsync(this._instance, this, + adapter, + !this._endpoint.datagram() && requestId !== 0, // response + requestId); // // Dispatch the invocation. @@ -1902,7 +1920,6 @@ var ConnectionI = Class({ inc.invoke(servantManager, stream); --invokeNum; - inc = null; } stream.clear(); @@ -1918,27 +1935,28 @@ var ConnectionI = Class({ throw ex; } } - }, - scheduleTimeout: function(op, timeout) + } + + scheduleTimeout(op, timeout) { if(timeout < 0) { return; } - var self = this; if((op & SocketOperation.Read) !== 0) { - this._readTimeoutId = this._timer.schedule(function() { self.timedOut(); }, timeout); + this._readTimeoutId = this._timer.schedule(() => this.timedOut(), timeout); this._readTimeoutScheduled = true; } if((op & (SocketOperation.Write | SocketOperation.Connect)) !== 0) { - this._writeTimeoutId = this._timer.schedule(function() { self.timedOut(); }, timeout); + this._writeTimeoutId = this._timer.schedule(() => this.timedOut(), timeout); this._writeTimeoutScheduled = true; } - }, - unscheduleTimeout: function(op) + } + + unscheduleTimeout(op) { if((op & SocketOperation.Read) !== 0 && this._readTimeoutScheduled) { @@ -1950,10 +1968,11 @@ var ConnectionI = Class({ this._timer.cancel(this._writeTimeoutId); this._writeTimeoutScheduled = false; } - }, - connectTimeout: function() + } + + connectTimeout() { - var defaultsAndOverrides = this._instance.defaultsAndOverrides(); + const defaultsAndOverrides = this._instance.defaultsAndOverrides(); if(defaultsAndOverrides.overrideConnectTimeout) { return defaultsAndOverrides.overrideConnectTimeoutValue; @@ -1962,10 +1981,11 @@ var ConnectionI = Class({ { return this._endpoint.timeout(); } - }, - closeTimeout: function() + } + + closeTimeout() { - var defaultsAndOverrides = this._instance.defaultsAndOverrides(); + const defaultsAndOverrides = this._instance.defaultsAndOverrides(); if(defaultsAndOverrides.overrideCloseTimeout) { return defaultsAndOverrides.overrideCloseTimeoutValue; @@ -1974,27 +1994,22 @@ var ConnectionI = Class({ { return this._endpoint.timeout(); } - }, - warning: function(msg, ex) + } + + warning(msg, ex) { this._logger.warning(msg + ":\n" + this._desc + "\n" + ex.toString()); - }, - checkState: function() + } + + checkState() { if(this._state < StateHolding || this._dispatchCount > 0) { return; } - var i; - if(this._holdPromises.length > 0) - { - for(i = 0; i < this._holdPromises.length; ++i) - { - this._holdPromises[i].succeed(); - } - this._holdPromises = []; - } + this._holdPromises.forEach(p => p.resolve()); + this._holdPromises = []; // // We aren't finished until the state is finished and all @@ -2008,28 +2023,26 @@ var ConnectionI = Class({ // Clear the OA. See bug 1673 for the details of why this is necessary. // this._adapter = null; - - for(i = 0; i < this._finishedPromises.length; ++i) - { - this._finishedPromises[i].succeed(); - } + this._finishedPromises.forEach(p => p.resolve()); this._finishedPromises = []; } - }, - reap: function() + } + + reap() { if(this._monitor !== null) { this._monitor.reap(this); } - }, - read: function(buf) + } + + read(buf) { - var start = buf.position; - var ret = this._transceiver.read(buf, this._hasMoreData); + const start = buf.position; + const ret = this._transceiver.read(buf, this._hasMoreData); if(this._instance.traceLevels().network >= 3 && buf.position != start) { - var s = []; + let s = []; s.push("received "); if(this._endpoint.datagram()) { @@ -2048,14 +2061,15 @@ var ConnectionI = Class({ this._instance.initializationData().logger.trace(this._instance.traceLevels().networkCat, s.join("")); } return ret; - }, - write: function(buf) + } + + write(buf) { - var start = buf.position; - var ret = this._transceiver.write(buf); + const start = buf.position; + const ret = this._transceiver.write(buf); if(this._instance.traceLevels().network >= 3 && buf.position != start) { - var s = []; + let s = []; s.push("sent "); s.push(buf.position - start); if(!this._endpoint.datagram()) @@ -2071,7 +2085,7 @@ var ConnectionI = Class({ } return ret; } -}); +} // DestructionReason. ConnectionI.ObjectAdapterDeactivated = 0; @@ -2079,68 +2093,70 @@ ConnectionI.CommunicatorDestroyed = 1; Ice.ConnectionI = ConnectionI; -var OutgoingMessage = Class({ - __init__: function() +class OutgoingMessage +{ + constructor() { this.stream = null; this.outAsync = null; - this.compress = false; this.requestId = 0; this.prepared = false; - }, - canceled: function() + } + + canceled() { Debug.assert(this.outAsync !== null); this.outAsync = null; - }, - doAdopt: function() + } + + doAdopt() { if(this.adopt) { - var stream = new BasicStream(this.stream.instance, Protocol.currentProtocolEncoding); + const stream = new OutputStream(this.stream.instance, Protocol.currentProtocolEncoding); stream.swap(this.stream); this.stream = stream; this.adopt = false; } - }, - sent: function() + } + + sent() { if(this.outAsync !== null) { - this.outAsync.__sent(); + this.outAsync.sent(); } - }, - completed: function(ex) + } + + completed(ex) { if(this.outAsync !== null) { - this.outAsync.__completedEx(ex); + this.outAsync.completedEx(ex); } } -}); -OutgoingMessage.createForStream = function(stream, compress, adopt) -{ - var m = new OutgoingMessage(); - m.stream = stream; - m.compress = compress; - m.adopt = adopt; - m.isSent = false; - m.requestId = 0; - m.outAsync = null; - return m; -}; - -OutgoingMessage.create = function(out, stream, compress, requestId) -{ - var m = new OutgoingMessage(); - m.stream = stream; - m.compress = compress; - m.outAsync = out; - m.requestId = requestId; - m.isSent = false; - m.adopt = false; - return m; -}; + static createForStream(stream, adopt) + { + const m = new OutgoingMessage(); + m.stream = stream; + m.adopt = adopt; + m.isSent = false; + m.requestId = 0; + m.outAsync = null; + return m; + } + + static create(out, stream, requestId) + { + const m = new OutgoingMessage(); + m.stream = stream; + m.outAsync = out; + m.requestId = requestId; + m.isSent = false; + m.adopt = false; + return m; + } +} module.exports.Ice = Ice; |