summaryrefslogtreecommitdiff
path: root/js/src/Ice/ConnectionI.js
diff options
context:
space:
mode:
Diffstat (limited to 'js/src/Ice/ConnectionI.js')
-rw-r--r--js/src/Ice/ConnectionI.js846
1 files changed, 431 insertions, 415 deletions
diff --git a/js/src/Ice/ConnectionI.js b/js/src/Ice/ConnectionI.js
index 04dd1a0c4f3..1bf5c7d0b28 100644
--- a/js/src/Ice/ConnectionI.js
+++ b/js/src/Ice/ConnectionI.js
@@ -7,12 +7,11 @@
//
// **********************************************************************
-var Ice = require("../Ice/ModuleRegistry").Ice;
-Ice.__M.require(module,
+const Ice = require("../Ice/ModuleRegistry").Ice;
+Ice._ModuleRegistry.require(module,
[
- "../Ice/Class",
"../Ice/AsyncStatus",
- "../Ice/BasicStream",
+ "../Ice/Stream",
"../Ice/OutgoingAsync",
"../Ice/Debug",
"../Ice/ExUtil",
@@ -30,52 +29,54 @@ Ice.__M.require(module,
"../Ice/BatchRequestQueue",
]);
-var AsyncStatus = Ice.AsyncStatus;
-var AsyncResultBase = Ice.AsyncResultBase;
-var BasicStream = Ice.BasicStream;
-var BatchRequestQueue = Ice.BatchRequestQueue;
-var ConnectionFlushBatch = Ice.ConnectionFlushBatch;
-var Debug = Ice.Debug;
-var ExUtil = Ice.ExUtil;
-var HashMap = Ice.HashMap;
-var IncomingAsync = Ice.IncomingAsync;
-var RetryException = Ice.RetryException;
-var Promise = Ice.Promise;
-var Protocol = Ice.Protocol;
-var SocketOperation = Ice.SocketOperation;
-var Timer = Ice.Timer;
-var TraceUtil = Ice.TraceUtil;
-var ProtocolVersion = Ice.ProtocolVersion;
-var EncodingVersion = Ice.EncodingVersion;
-var ACM = Ice.ACM;
-var ACMClose = Ice.ACMClose;
-var ACMHeartbeat = Ice.ACMHeartbeat;
-
-var StateNotInitialized = 0;
-var StateNotValidated = 1;
-var StateActive = 2;
-var StateHolding = 3;
-var StateClosing = 4;
-var StateClosed = 5;
-var StateFinished = 6;
-
-var MessageInfo = function(instance)
+const AsyncStatus = Ice.AsyncStatus;
+const AsyncResultBase = Ice.AsyncResultBase;
+const InputStream = Ice.InputStream;
+const OutputStream = Ice.OutputStream;
+const BatchRequestQueue = Ice.BatchRequestQueue;
+const ConnectionFlushBatch = Ice.ConnectionFlushBatch;
+const HeartbeatAsync = Ice.HeartbeatAsync;
+const Debug = Ice.Debug;
+const ExUtil = Ice.ExUtil;
+const HashMap = Ice.HashMap;
+const IncomingAsync = Ice.IncomingAsync;
+const RetryException = Ice.RetryException;
+const Protocol = Ice.Protocol;
+const SocketOperation = Ice.SocketOperation;
+const Timer = Ice.Timer;
+const TraceUtil = Ice.TraceUtil;
+const ProtocolVersion = Ice.ProtocolVersion;
+const EncodingVersion = Ice.EncodingVersion;
+const ACM = Ice.ACM;
+const ACMClose = Ice.ACMClose;
+const ACMHeartbeat = Ice.ACMHeartbeat;
+const ConnectionClose = Ice.ConnectionClose;
+
+const StateNotInitialized = 0;
+const StateNotValidated = 1;
+const StateActive = 2;
+const StateHolding = 3;
+const StateClosing = 4;
+const StateClosed = 5;
+const StateFinished = 6;
+
+class MessageInfo
{
- this.stream = new BasicStream(instance, Protocol.currentProtocolEncoding);
-
- this.invokeNum = 0;
- this.requestId = 0;
- this.compress = false;
- this.servantManager = null;
- this.adapter = null;
- this.outAsync = null;
- this.heartbeatCallback = null;
-};
-
-var Class = Ice.Class;
+ constructor(instance)
+ {
+ this.stream = new InputStream(instance, Protocol.currentProtocolEncoding);
+ this.invokeNum = 0;
+ this.requestId = 0;
+ this.servantManager = null;
+ this.adapter = null;
+ this.outAsync = null;
+ this.heartbeatCallback = null;
+ }
+}
-var ConnectionI = Class({
- __init__: function(communicator, instance, monitor, transceiver, endpoint, incoming, adapter)
+class ConnectionI
+{
+ constructor(communicator, instance, monitor, transceiver, endpoint, incoming, adapter)
{
this._communicator = communicator;
this._instance = instance;
@@ -86,7 +87,7 @@ var ConnectionI = Class({
this._endpoint = endpoint;
this._incoming = incoming;
this._adapter = adapter;
- var initData = instance.initializationData();
+ const initData = instance.initializationData();
this._logger = initData.logger; // Cached for better performance.
this._traceLevels = instance.traceLevels(); // Cached for better performance.
this._timer = instance.timer();
@@ -106,9 +107,9 @@ var ConnectionI = Class({
this._sendStreams = [];
- this._readStream = new BasicStream(instance, Protocol.currentProtocolEncoding);
+ this._readStream = new InputStream(instance, Protocol.currentProtocolEncoding);
this._readHeader = false;
- this._writeStream = new BasicStream(instance, Protocol.currentProtocolEncoding);
+ this._writeStream = new OutputStream(instance, Protocol.currentProtocolEncoding);
this._readStreamPos = -1;
this._writeStreamPos = -1;
@@ -140,9 +141,11 @@ var ConnectionI = Class({
{
this._servantManager = null;
}
- this._callback = null;
- },
- start: function()
+ this._closeCallback = null;
+ this._heartbeatCallback = null;
+ }
+
+ start()
{
Debug.assert(this._startPromise === null);
@@ -152,34 +155,14 @@ var ConnectionI = Class({
if(this._state >= StateClosed)
{
Debug.assert(this._exception !== null);
- return new Promise().fail(this._exception);
+ return Ice.Promise.reject(this._exception);
}
- this._startPromise = new Promise();
- var self = this;
+ this._startPromise = new Ice.Promise();
this._transceiver.setCallbacks(
- function() { self.message(SocketOperation.Write); }, // connected callback
- function() { self.message(SocketOperation.Read); }, // read callback
- function(bytesSent, bytesTotal) {
- self.message(SocketOperation.Write);
- if(self._instance.traceLevels().network >= 3 && bytesSent > 0)
- {
- var s = [];
- s.push("sent ");
- s.push(bytesSent);
- if(!self._endpoint.datagram())
- {
- s.push(" of ");
- s.push(bytesTotal);
- }
- s.push(" bytes via ");
- s.push(self._endpoint.protocol());
- s.push("\n");
- s.push(this.toString());
- self._instance.initializationData().logger.trace(self._instance.traceLevels().networkCat,
- s.join(""));
- }
- } // write callback
+ () => { this.message(SocketOperation.Write); }, // connected callback
+ () => { this.message(SocketOperation.Read); }, // read callback
+ () => { this.message(SocketOperation.Write); } // write callback
);
this.initialize();
}
@@ -189,12 +172,12 @@ var ConnectionI = Class({
{
this.exception(ex);
}
- return new Promise().fail(ex);
+ return Ice.Promise.reject(ex);
}
-
return this._startPromise;
- },
- activate: function()
+ }
+
+ activate()
{
if(this._state <= StateNotValidated)
{
@@ -206,8 +189,9 @@ var ConnectionI = Class({
this._acmLastActivity = Date.now();
}
this.setState(StateActive);
- },
- hold: function()
+ }
+
+ hold()
{
if(this._state <= StateNotValidated)
{
@@ -215,8 +199,9 @@ var ConnectionI = Class({
}
this.setState(StateHolding);
- },
- destroy: function(reason)
+ }
+
+ destroy(reason)
{
switch(reason)
{
@@ -232,53 +217,57 @@ var ConnectionI = Class({
break;
}
}
- },
- close: function(force)
+ }
+
+ close(mode)
{
- var __r = new AsyncResultBase(this._communicator, "close", this, null, null);
+ const r = new AsyncResultBase(this._communicator, "close", this, null, null);
- if(force)
+ if(mode == ConnectionClose.Forcefully)
{
- this.setState(StateClosed, new Ice.ForcedCloseConnectionException());
- __r.succeed(__r);
+ this.setState(StateClosed, new Ice.ConnectionManuallyClosedException(false));
+ r.resolve();
+ }
+ else if(mode == ConnectionClose.Gracefully)
+ {
+ this.setState(StateClosing, new Ice.ConnectionManuallyClosedException(true));
+ r.resolve();
}
else
{
+ Debug.assert(mode == ConnectionClose.GracefullyWithWait);
+
//
- // If we do a graceful shutdown, then we wait until all
- // outstanding requests have been completed. Otherwise,
- // the CloseConnectionException will cause all outstanding
- // requests to be retried, regardless of whether the
- // server has processed them or not.
+ // Wait until all outstanding requests have been completed.
//
- this._closePromises.push(__r);
+ this._closePromises.push(r);
this.checkClose();
}
- return __r;
- },
- checkClose: function()
+ return r;
+ }
+
+ checkClose()
{
//
- // If close(false) has been called, then we need to check if all
+ // If close(GracefullyWithWait) has been called, then we need to check if all
// requests have completed and we can transition to StateClosing.
// We also complete outstanding promises.
//
if(this._asyncRequests.size === 0 && this._closePromises.length > 0)
{
- this.setState(StateClosing, new Ice.CloseConnectionException());
- for(var i = 0; i < this._closePromises.length; ++i)
- {
- this._closePromises[i].succeed(this._closePromises[i]);
- }
+ this.setState(StateClosing, new Ice.ConnectionManuallyClosedException(true));
+ this._closePromises.forEach(p => p.resolve());
this._closePromises = [];
}
- },
- isActiveOrHolding: function()
+ }
+
+ isActiveOrHolding()
{
return this._state > StateNotValidated && this._state < StateClosing;
- },
- isFinished: function()
+ }
+
+ isFinished()
{
if(this._state !== StateFinished || this._dispatchCount !== 0)
{
@@ -287,30 +276,34 @@ var ConnectionI = Class({
Debug.assert(this._state === StateFinished);
return true;
- },
- throwException: function()
+ }
+
+ throwException()
{
if(this._exception !== null)
{
Debug.assert(this._state >= StateClosing);
throw this._exception;
}
- },
- waitUntilHolding: function()
+ }
+
+ waitUntilHolding()
{
- var promise = new Promise();
+ const promise = new Ice.Promise();
this._holdPromises.push(promise);
this.checkState();
return promise;
- },
- waitUntilFinished: function()
+ }
+
+ waitUntilFinished()
{
- var promise = new Promise();
+ const promise = new Ice.Promise();
this._finishedPromises.push(promise);
this.checkState();
return promise;
- },
- monitor: function(now, acm)
+ }
+
+ monitor(now, acm)
{
if(this._state !== StateActive)
{
@@ -321,22 +314,22 @@ var ConnectionI = Class({
// We send a heartbeat if there was no activity in the last
// (timeout / 4) period. Sending a heartbeat sooner than
// really needed is safer to ensure that the receiver will
- // receive in time the heartbeat. Sending the heartbeat if
+ // receive the heartbeat in time. Sending the heartbeat if
// there was no activity in the last (timeout / 2) period
// isn't enough since monitor() is called only every (timeout
// / 2) period.
//
// Note that this doesn't imply that we are sending 4 heartbeats
- // per timeout period because the monitor() method is sill only
+ // per timeout period because the monitor() method is still only
// called every (timeout / 2) period.
//
if(acm.heartbeat == Ice.ACMHeartbeat.HeartbeatAlways ||
- (acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOff && this._writeStream.isEmpty() &&
- now >= (this._acmLastActivity + acm.timeout / 4)))
+ (acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOff && this._writeStream.isEmpty() &&
+ now >= (this._acmLastActivity + acm.timeout / 4)))
{
if(acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOnInvocation || this._dispatchCount > 0)
{
- this.heartbeat(); // Send heartbeat if idle in the last timeout / 2 period.
+ this.sendHeartbeatNow(); // Send heartbeat if idle in the last timeout / 2 period.
}
}
@@ -354,7 +347,7 @@ var ConnectionI = Class({
if(acm.close != Ice.ACMClose.CloseOff && now >= (this._acmLastActivity + acm.timeout))
{
if(acm.close == Ice.ACMClose.CloseOnIdleForceful ||
- (acm.close != Ice.ACMClose.CloseOnIdle && this._asyncRequests.size > 0))
+ (acm.close != Ice.ACMClose.CloseOnIdle && this._asyncRequests.size > 0))
{
//
// Close the connection if we didn't receive a heartbeat in
@@ -371,11 +364,12 @@ var ConnectionI = Class({
this.setState(StateClosing, new Ice.ConnectionTimeoutException());
}
}
- },
- sendAsyncRequest: function(out, compress, response, batchRequestNum)
+ }
+
+ sendAsyncRequest(out, response, batchRequestNum)
{
- var requestId = 0;
- var os = out.__os();
+ let requestId = 0;
+ const ostr = out.getOs();
if(this._exception !== null)
{
@@ -394,13 +388,13 @@ var ConnectionI = Class({
// Ensure the message isn't bigger than what we can send with the
// transport.
//
- this._transceiver.checkSendSize(os);
+ this._transceiver.checkSendSize(ostr);
//
// Notify the request that it's cancelable with this connection.
// This will throw if the request is canceled.
//
- out.__cancelable(this); // Notify the request that it's cancelable
+ out.cancelable(this); // Notify the request that it's cancelable
if(response)
{
@@ -417,19 +411,19 @@ var ConnectionI = Class({
//
// Fill in the request ID.
//
- os.pos = Protocol.headerSize;
- os.writeInt(requestId);
+ ostr.pos = Protocol.headerSize;
+ ostr.writeInt(requestId);
}
else if(batchRequestNum > 0)
{
- os.pos = Protocol.headerSize;
- os.writeInt(batchRequestNum);
+ ostr.pos = Protocol.headerSize;
+ ostr.writeInt(batchRequestNum);
}
- var status;
+ let status;
try
{
- status = this.sendMessage(OutgoingMessage.create(out, out.__os(), compress, requestId));
+ status = this.sendMessage(OutgoingMessage.create(out, out.getOs(), requestId));
}
catch(ex)
{
@@ -454,42 +448,58 @@ var ConnectionI = Class({
}
return status;
- },
- getBatchRequestQueue: function()
+ }
+
+ getBatchRequestQueue()
{
return this._batchRequestQueue;
- },
- flushBatchRequests: function()
+ }
+
+ flushBatchRequests()
{
- var result = new ConnectionFlushBatch(this, this._communicator, "flushBatchRequests");
- result.__invoke();
+ const result = new ConnectionFlushBatch(this, this._communicator, "flushBatchRequests");
+ result.invoke();
return result;
- },
- setCallback: function(callback)
+ }
+
+ setCloseCallback(callback)
{
if(this._state >= StateClosed)
{
if(callback !== null)
{
- var self = this;
- Timer.setImmediate(function() {
+ Timer.setImmediate(() =>
+ {
try
{
- callback.closed(this);
+ callback(this);
}
catch(ex)
{
- self._logger.error("connection callback exception:\n" + ex + '\n' + self._desc);
+ this._logger.error("connection callback exception:\n" + ex + '\n' + this._desc);
}
});
}
}
else
{
- this._callback = callback;
+ this._closeCallback = callback;
}
- },
- setACM: function(timeout, close, heartbeat)
+ }
+
+ setHeartbeatCallback(callback)
+ {
+ this._heartbeatCallback = callback;
+ }
+
+ heartbeat()
+ {
+ const result = new HeartbeatAsync(this, this._communicator);
+ result.invoke();
+ return result;
+ }
+
+ setACM(timeout, close, heartbeat)
{
if(this._monitor === null || this._state >= StateClosed)
{
@@ -513,17 +523,19 @@ var ConnectionI = Class({
{
this._acmLastActivity = Date.now();
}
- },
- getACM: function()
+ }
+
+ getACM()
{
return this._monitor !== null ? this._monitor.getACM() :
new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
- },
- asyncRequestCanceled: function(outAsync, ex)
+ }
+
+ asyncRequestCanceled(outAsync, ex)
{
- for(var i = 0; i < this._sendStreams.length; i++)
+ for(let i = 0; i < this._sendStreams.length; i++)
{
- var o = this._sendStreams[i];
+ let o = this._sendStreams[i];
if(o.outAsync === outAsync)
{
if(o.requestId > 0)
@@ -540,25 +552,26 @@ var ConnectionI = Class({
{
this._sendStreams.splice(i, 1);
}
- outAsync.__completedEx(ex);
+ outAsync.completedEx(ex);
return; // We're done.
}
}
if(outAsync instanceof Ice.OutgoingAsync)
{
- for(var e = this._asyncRequests.entries; e !== null; e = e.next)
+ for(let [key, value] of this._asyncRequests)
{
- if(e.value === outAsync)
+ if(value === outAsync)
{
- this._asyncRequests.delete(e.key);
- outAsync.__completedEx(ex);
+ this._asyncRequests.delete(key);
+ outAsync.completedEx(ex);
return; // We're done.
}
}
}
- },
- sendResponse: function(os, compressFlag)
+ }
+
+ sendResponse(os)
{
Debug.assert(this._state > StateNotValidated);
@@ -579,7 +592,7 @@ var ConnectionI = Class({
throw this._exception;
}
- this.sendMessage(OutgoingMessage.createForStream(os, compressFlag !== 0, true));
+ this.sendMessage(OutgoingMessage.createForStream(os, true));
if(this._state === StateClosing && this._dispatchCount === 0)
{
@@ -597,8 +610,9 @@ var ConnectionI = Class({
throw ex;
}
}
- },
- sendNoResponse: function()
+ }
+
+ sendNoResponse()
{
Debug.assert(this._state > StateNotValidated);
try
@@ -634,12 +648,14 @@ var ConnectionI = Class({
throw ex;
}
}
- },
- endpoint: function()
+ }
+
+ endpoint()
{
return this._endpoint;
- },
- setAdapter: function(adapter)
+ }
+
+ setAdapter(adapter)
{
if(this._state <= StateNotValidated || this._state >= StateClosing)
{
@@ -661,16 +677,19 @@ var ConnectionI = Class({
{
this._servantManager = null;
}
- },
- getAdapter: function()
+ }
+
+ getAdapter()
{
return this._adapter;
- },
- getEndpoint: function()
+ }
+
+ getEndpoint()
{
return this._endpoint;
- },
- createProxy: function(ident)
+ }
+
+ createProxy(ident)
{
//
// Create a reference and return a reverse proxy for this
@@ -678,8 +697,9 @@ var ConnectionI = Class({
//
return this._instance.proxyFactory().referenceToProxy(
this._instance.referenceFactory().createFixed(ident, this));
- },
- message: function(operation)
+ }
+
+ message(operation)
{
if(this._state >= StateClosed)
{
@@ -693,7 +713,7 @@ var ConnectionI = Class({
//
this._hasMoreData.value = (operation & SocketOperation.Read) !== 0;
- var info = null;
+ let info = null;
try
{
if((operation & SocketOperation.Write) !== 0 && this._writeStream.buffer.remaining > 0)
@@ -721,7 +741,7 @@ var ConnectionI = Class({
Debug.assert(this._readStream.buffer.remaining === 0);
this._readHeader = false;
- var pos = this._readStream.pos;
+ const pos = this._readStream.pos;
if(pos < Protocol.headerSize)
{
//
@@ -731,27 +751,25 @@ var ConnectionI = Class({
}
this._readStream.pos = 0;
- var magic0 = this._readStream.readByte();
- var magic1 = this._readStream.readByte();
- var magic2 = this._readStream.readByte();
- var magic3 = this._readStream.readByte();
+ const magic0 = this._readStream.readByte();
+ const magic1 = this._readStream.readByte();
+ const magic2 = this._readStream.readByte();
+ const magic3 = this._readStream.readByte();
if(magic0 !== Protocol.magic[0] || magic1 !== Protocol.magic[1] ||
- magic2 !== Protocol.magic[2] || magic3 !== Protocol.magic[3])
+ magic2 !== Protocol.magic[2] || magic3 !== Protocol.magic[3])
{
- var bme = new Ice.BadMagicException();
- bme.badMagic = Ice.Buffer.createNative([magic0, magic1, magic2, magic3]);
- throw bme;
+ throw new Ice.BadMagicException("", Ice.Buffer.createNative([magic0, magic1, magic2, magic3]));
}
- this._readProtocol.__read(this._readStream);
+ this._readProtocol._read(this._readStream);
Protocol.checkSupportedProtocol(this._readProtocol);
- this._readProtocolEncoding.__read(this._readStream);
+ this._readProtocolEncoding._read(this._readStream);
Protocol.checkSupportedProtocolEncoding(this._readProtocolEncoding);
this._readStream.readByte(); // messageType
this._readStream.readByte(); // compress
- var size = this._readStream.readInt();
+ const size = this._readStream.readInt();
if(size < Protocol.headerSize)
{
throw new Ice.IllegalMessageSizeException();
@@ -879,20 +897,21 @@ var ConnectionI = Class({
if(this._hasMoreData.value)
{
- var self = this;
- Timer.setImmediate(function() { self.message(SocketOperation.Read); }); // Don't tie up the thread.
+ Timer.setImmediate(() => { this.message(SocketOperation.Read); }); // Don't tie up the thread.
}
- },
- dispatch: function(info)
+ }
+
+ dispatch(info)
{
- var count = 0;
+ let count = 0;
//
// Notify the factory that the connection establishment and
// validation has completed.
//
if(this._startPromise !== null)
{
- this._startPromise.succeed();
+ this._startPromise.resolve();
+
this._startPromise = null;
++count;
}
@@ -901,14 +920,13 @@ var ConnectionI = Class({
{
if(info.outAsync !== null)
{
- info.outAsync.__completed(info.stream);
+ info.outAsync.completed(info.stream);
++count;
}
if(info.invokeNum > 0)
{
- this.invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
- info.adapter);
+ this.invokeAll(info.stream, info.invokeNum, info.requestId, info.servantManager, info.adapter);
//
// Don't increase count, the dispatch count is
@@ -920,7 +938,7 @@ var ConnectionI = Class({
{
try
{
- info.heartbeatCallback.heartbeat(this);
+ info.heartbeatCallback(this);
}
catch(ex)
{
@@ -964,19 +982,19 @@ var ConnectionI = Class({
this.checkState();
}
}
- },
- finish: function()
+ }
+
+ finish()
{
Debug.assert(this._state === StateClosed);
this.unscheduleTimeout(SocketOperation.Read | SocketOperation.Write | SocketOperation.Connect);
- var s;
- var traceLevels = this._instance.traceLevels();
+ const traceLevels = this._instance.traceLevels();
if(!this._initialized)
{
if(traceLevels.network >= 2)
{
- s = [];
+ let s = [];
s.push("failed to establish ");
s.push(this._endpoint.protocol());
s.push(" connection\n");
@@ -990,7 +1008,7 @@ var ConnectionI = Class({
{
if(traceLevels.network >= 1)
{
- s = [];
+ let s = [];
s.push("closed ");
s.push(this._endpoint.protocol());
s.push(" connection\n");
@@ -1000,7 +1018,7 @@ var ConnectionI = Class({
// Trace the cause of unexpected connection closures
//
if(!(this._exception instanceof Ice.CloseConnectionException ||
- this._exception instanceof Ice.ForcedCloseConnectionException ||
+ this._exception instanceof Ice.ConnectionManuallyClosedException ||
this._exception instanceof Ice.ConnectionTimeoutException ||
this._exception instanceof Ice.CommunicatorDestroyedException ||
this._exception instanceof Ice.ObjectAdapterDeactivatedException))
@@ -1015,7 +1033,7 @@ var ConnectionI = Class({
if(this._startPromise !== null)
{
- this._startPromise.fail(this._exception);
+ this._startPromise.reject(this._exception);
this._startPromise = null;
}
@@ -1027,8 +1045,7 @@ var ConnectionI = Class({
// Return the stream to the outgoing call. This is important for
// retriable AMI calls which are not marshalled again.
//
- var message = this._sendStreams[0];
- this._writeStream.swap(message.stream);
+ this._writeStream.swap(this._sendStreams[0].stream);
}
//
@@ -1037,9 +1054,9 @@ var ConnectionI = Class({
// because it's either in the _requests/_asyncRequests set. This is fine, only the
// first call should be taken into account by the implementation of finished.
//
- for(var i = 0; i < this._sendStreams.length; ++i)
+ for(let i = 0; i < this._sendStreams.length; ++i)
{
- var p = this._sendStreams[i];
+ let p = this._sendStreams[i];
if(p.requestId > 0)
{
this._asyncRequests.delete(p.requestId);
@@ -1049,9 +1066,9 @@ var ConnectionI = Class({
this._sendStreams = [];
}
- for(var e = this._asyncRequests.entries; e !== null; e = e.next)
+ for(let value of this._asyncRequests.values())
{
- e.value.__completedEx(this._exception);
+ value.completedEx(this._exception);
}
this._asyncRequests.clear();
@@ -1063,19 +1080,21 @@ var ConnectionI = Class({
this._writeStream.clear();
this._writeStream.buffer.clear();
- if(this._callback !== null)
+ if(this._closeCallback !== null)
{
try
{
- this._callback.closed(this);
+ this._closeCallback(this);
}
catch(ex)
{
this._logger.error("connection callback exception:\n" + ex + '\n' + this._desc);
}
- this._callback = null;
+ this._closeCallback = null;
}
+ this._heartbeatCallback = null;
+
//
// This must be done last as this will cause waitUntilFinished() to return (and communicator
// objects such as the timer might be destroyed too).
@@ -1085,12 +1104,14 @@ var ConnectionI = Class({
this.reap();
}
this.setState(StateFinished);
- },
- toString: function()
+ }
+
+ toString()
{
return this._desc;
- },
- timedOut: function(event)
+ }
+
+ timedOut(event)
{
if(this._state <= StateNotValidated)
{
@@ -1104,39 +1125,48 @@ var ConnectionI = Class({
{
this.setState(StateClosed, new Ice.CloseTimeoutException());
}
- },
- type: function()
+ }
+
+ type()
{
return this._type;
- },
- timeout: function()
+ }
+
+ timeout()
{
return this._endpoint.timeout();
- },
- getInfo: function()
+ }
+
+ getInfo()
{
if(this._state >= StateClosed)
{
throw this._exception;
}
- var info = this._transceiver.getInfo();
- info.adapterName = this._adapter !== null ? this._adapter.getName() : "";
- info.incoming = this._incoming;
+ let info = this._transceiver.getInfo();
+ for(let p = info; p !== null; p = p.underlying)
+ {
+ p.adapterName = this._adapter !== null ? this._adapter.getName() : "";
+ p.incoming = this._incoming;
+ }
return info;
- },
- setBufferSize: function(rcvSize, sndSize)
+ }
+
+ setBufferSize(rcvSize, sndSize)
{
if(this._state >= StateClosed)
{
throw this._exception;
}
this._transceiver.setBufferSize(rcvSize, sndSize);
- },
- exception: function(ex)
+ }
+
+ exception(ex)
{
this.setState(StateClosed, ex);
- },
- invokeException: function(ex, invokeNum)
+ }
+
+ invokeException(ex, invokeNum)
{
//
// Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
@@ -1159,8 +1189,9 @@ var ConnectionI = Class({
this.checkState();
}
}
- },
- setState: function(state, ex)
+ }
+
+ setState(state, ex)
{
if(ex !== undefined)
{
@@ -1190,7 +1221,7 @@ var ConnectionI = Class({
// Don't warn about certain expected exceptions.
//
if(!(this._exception instanceof Ice.CloseConnectionException ||
- this._exception instanceof Ice.ForcedCloseConnectionException ||
+ this._exception instanceof Ice.ConnectionManuallyClosedException ||
this._exception instanceof Ice.ConnectionTimeoutException ||
this._exception instanceof Ice.CommunicatorDestroyedException ||
this._exception instanceof Ice.ObjectAdapterDeactivatedException ||
@@ -1334,8 +1365,8 @@ var ConnectionI = Class({
{
if(ex instanceof Ice.LocalException)
{
- var msg = "unexpected connection exception:\n " + this._desc + "\n" + ex.toString();
- this._instance.initializationData().logger.error(msg);
+ this._instance.initializationData().logger.error(
+ `unexpected connection exception:\n${this._desc}\n${ex.toString()}`);
}
else
{
@@ -1391,29 +1422,27 @@ var ConnectionI = Class({
}
this.checkState();
- },
- initiateShutdown: function()
+ }
+
+ initiateShutdown()
{
- Debug.assert(this._state === StateClosing);
- Debug.assert(this._dispatchCount === 0);
+ Debug.assert(this._state === StateClosing && this._dispatchCount === 0);
Debug.assert(!this._shutdownInitiated);
if(!this._endpoint.datagram())
{
//
- // Before we shut down, we send a close connection
- // message.
+ // Before we shut down, we send a close connection message.
//
- var os = new BasicStream(this._instance, Protocol.currentProtocolEncoding);
+ const os = new OutputStream(this._instance, Protocol.currentProtocolEncoding);
os.writeBlob(Protocol.magic);
- Protocol.currentProtocol.__write(os);
- Protocol.currentProtocolEncoding.__write(os);
+ Protocol.currentProtocol._write(os);
+ Protocol.currentProtocolEncoding._write(os);
os.writeByte(Protocol.closeConnectionMsg);
os.writeByte(0); // compression status: always report 0 for CloseConnection.
os.writeInt(Protocol.headerSize); // Message size.
- var status = this.sendMessage(OutgoingMessage.createForStream(os, false, false));
- if((status & AsyncStatus.Sent) > 0)
+ if((this.sendMessage(OutgoingMessage.createForStream(os, false)) & AsyncStatus.Sent) > 0)
{
//
// Schedule the close timeout to wait for the peer to close the connection.
@@ -1431,23 +1460,24 @@ var ConnectionI = Class({
//
//this._transceiver.shutdownWrite();
}
- },
- heartbeat: function()
+ }
+
+ sendHeartbeatNow()
{
Debug.assert(this._state === StateActive);
if(!this._endpoint.datagram())
{
- var os = new BasicStream(this._instance, Protocol.currentProtocolEncoding);
+ const os = new OutputStream(this._instance, Protocol.currentProtocolEncoding);
os.writeBlob(Protocol.magic);
- Protocol.currentProtocol.__write(os);
- Protocol.currentProtocolEncoding.__write(os);
+ Protocol.currentProtocol._write(os);
+ Protocol.currentProtocolEncoding._write(os);
os.writeByte(Protocol.validateConnectionMsg);
os.writeByte(0);
os.writeInt(Protocol.headerSize); // Message size.
try
{
- this.sendMessage(OutgoingMessage.createForStream(os, false, false));
+ this.sendMessage(OutgoingMessage.createForStream(os, false));
}
catch(ex)
{
@@ -1455,10 +1485,11 @@ var ConnectionI = Class({
Debug.assert(this._exception !== null);
}
}
- },
- initialize: function()
+ }
+
+ initialize()
{
- var s = this._transceiver.initialize(this._readStream.buffer, this._writeStream.buffer);
+ const s = this._transceiver.initialize(this._readStream.buffer, this._writeStream.buffer);
if(s != SocketOperation.None)
{
this.scheduleTimeout(s, this.connectTimeout());
@@ -1472,8 +1503,9 @@ var ConnectionI = Class({
this._initialized = true;
this.setState(StateNotValidated);
return true;
- },
- validate: function()
+ }
+
+ validate()
{
if(!this._endpoint.datagram()) // Datagram connections are always implicitly validated.
{
@@ -1482,8 +1514,8 @@ var ConnectionI = Class({
if(this._writeStream.size === 0)
{
this._writeStream.writeBlob(Protocol.magic);
- Protocol.currentProtocol.__write(this._writeStream);
- Protocol.currentProtocolEncoding.__write(this._writeStream);
+ Protocol.currentProtocol._write(this._writeStream);
+ Protocol.currentProtocolEncoding._write(this._writeStream);
this._writeStream.writeByte(Protocol.validateConnectionMsg);
this._writeStream.writeByte(0); // Compression status (always zero for validate connection).
this._writeStream.writeInt(Protocol.headerSize); // Message size.
@@ -1514,29 +1546,26 @@ var ConnectionI = Class({
Debug.assert(this._readStream.pos === Protocol.headerSize);
this._readStream.pos = 0;
- var m = this._readStream.readBlob(4);
+ const m = this._readStream.readBlob(4);
if(m[0] !== Protocol.magic[0] || m[1] !== Protocol.magic[1] ||
m[2] !== Protocol.magic[2] || m[3] !== Protocol.magic[3])
{
- var bme = new Ice.BadMagicException();
- bme.badMagic = m;
- throw bme;
+ throw new Ice.BadMagicException("", m);
}
- this._readProtocol.__read(this._readStream);
+ this._readProtocol._read(this._readStream);
Protocol.checkSupportedProtocol(this._readProtocol);
- this._readProtocolEncoding.__read(this._readStream);
+ this._readProtocolEncoding._read(this._readStream);
Protocol.checkSupportedProtocolEncoding(this._readProtocolEncoding);
- var messageType = this._readStream.readByte();
+ const messageType = this._readStream.readByte();
if(messageType !== Protocol.validateConnectionMsg)
{
throw new Ice.ConnectionNotValidatedException();
}
this._readStream.readByte(); // Ignore compression status for validate connection.
- var size = this._readStream.readInt();
- if(size !== Protocol.headerSize)
+ if( this._readStream.readInt() !== Protocol.headerSize)
{
throw new Ice.IllegalMessageSizeException();
}
@@ -1552,10 +1581,10 @@ var ConnectionI = Class({
this._readHeader = true;
this._readStream.pos = 0;
- var traceLevels = this._instance.traceLevels();
+ const traceLevels = this._instance.traceLevels();
if(traceLevels.network >= 1)
{
- var s = [];
+ let s = [];
if(this._endpoint.datagram())
{
s.push("starting to send ");
@@ -1565,7 +1594,6 @@ var ConnectionI = Class({
}
else
{
- s = [];
s.push("established ");
s.push(this._endpoint.protocol());
s.push(" connection\n");
@@ -1575,8 +1603,9 @@ var ConnectionI = Class({
}
return true;
- },
- sendNextMessage: function()
+ }
+
+ sendNextMessage()
{
if(this._sendStreams.length === 0)
{
@@ -1591,7 +1620,7 @@ var ConnectionI = Class({
//
// Notify the message that it was sent.
//
- var message = this._sendStreams.shift();
+ let message = this._sendStreams.shift();
this._writeStream.swap(message.stream);
message.sent();
@@ -1620,21 +1649,15 @@ var ConnectionI = Class({
//
message = this._sendStreams[0];
Debug.assert(!message.prepared);
- var stream = message.stream;
+ let stream = message.stream;
stream.pos = 10;
stream.writeInt(stream.size);
stream.prepareWrite();
message.prepared = true;
- if(message.outAsync !== null)
- {
- TraceUtil.trace("sending asynchronous request", stream, this._logger, this._traceLevels);
- }
- else
- {
- TraceUtil.traceSend(stream, this._logger, this._traceLevels);
- }
+ TraceUtil.traceSend(stream, this._logger, this._traceLevels);
+
this._writeStream.swap(message.stream);
//
@@ -1671,8 +1694,9 @@ var ConnectionI = Class({
{
this.scheduleTimeout(SocketOperation.Write, this.closeTimeout());
}
- },
- sendMessage: function(message)
+ }
+
+ sendMessage(message)
{
if(this._sendStreams.length > 0)
{
@@ -1684,22 +1708,15 @@ var ConnectionI = Class({
Debug.assert(!message.prepared);
- var stream = message.stream;
+ let stream = message.stream;
stream.pos = 10;
stream.writeInt(stream.size);
stream.prepareWrite();
message.prepared = true;
- if(message.outAsync)
- {
- TraceUtil.trace("sending asynchronous request", message.stream, this._logger, this._traceLevels);
- }
- else
- {
- TraceUtil.traceSend(message.stream, this._logger, this._traceLevels);
- }
+ TraceUtil.traceSend(stream, this._logger, this._traceLevels);
- if(this.write(message.stream.buffer))
+ if(this.write(stream.buffer))
{
//
// Entire buffer was written immediately.
@@ -1714,17 +1731,18 @@ var ConnectionI = Class({
}
message.doAdopt();
- this._writeStream.swap(message.stream);
+ this._writeStream.swap(stream);
this._sendStreams.push(message);
this.scheduleTimeout(SocketOperation.Write, this._endpoint.timeout());
return AsyncStatus.Queued;
- },
- parseMessage: function()
+ }
+
+ parseMessage()
{
Debug.assert(this._state > StateNotValidated && this._state < StateClosed);
- var info = new MessageInfo(this._instance);
+ let info = new MessageInfo(this._instance);
this._readStream.swap(info.stream);
this._readStream.resize(Protocol.headerSize);
@@ -1748,13 +1766,11 @@ var ConnectionI = Class({
// been done by the caller.
//
info.stream.pos = 8;
- var messageType = info.stream.readByte();
- info.compress = info.stream.readByte();
- if(info.compress === 2)
+ const messageType = info.stream.readByte();
+ const compress = info.stream.readByte();
+ if(compress === 2)
{
- var ex = new Ice.FeatureNotSupportedException();
- ex.unsupportedFeature = "Cannot uncompress compressed message";
- throw ex;
+ throw new Ice.FeatureNotSupportedException("Cannot uncompress compressed message");
}
info.stream.pos = Protocol.headerSize;
@@ -1782,9 +1798,9 @@ var ConnectionI = Class({
{
if(this._state === StateClosing)
{
- TraceUtil.trace("received request during closing\n" +
- "(ignored by server, client will retry)",
- info.stream, this._logger, this._traceLevels);
+ TraceUtil.traceIn("received request during closing\n" +
+ "(ignored by server, client will retry)",
+ info.stream, this._logger, this._traceLevels);
}
else
{
@@ -1802,9 +1818,9 @@ var ConnectionI = Class({
{
if(this._state === StateClosing)
{
- TraceUtil.trace("received batch request during closing\n" +
- "(ignored by server, client will retry)",
- info.stream, this._logger, this._traceLevels);
+ TraceUtil.traceIn("received batch request during closing\n" +
+ "(ignored by server, client will retry)",
+ info.stream, this._logger, this._traceLevels);
}
else
{
@@ -1843,9 +1859,9 @@ var ConnectionI = Class({
case Protocol.validateConnectionMsg:
{
TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels);
- if(this._callback !== null)
+ if(this._heartbeatCallback !== null)
{
- info.heartbeatCallback = this._callback;
+ info.heartbeatCallback = this._heartbeatCallback;
++this._dispatchCount;
}
break;
@@ -1853,8 +1869,8 @@ var ConnectionI = Class({
default:
{
- TraceUtil.trace("received unknown message\n(invalid, closing connection)",
- info.stream, this._logger, this._traceLevels);
+ TraceUtil.traceIn("received unknown message\n(invalid, closing connection)",
+ info.stream, this._logger, this._traceLevels);
throw new Ice.UnknownMessageException();
}
}
@@ -1882,10 +1898,10 @@ var ConnectionI = Class({
}
return info;
- },
- invokeAll: function(stream, invokeNum, requestId, compress, servantManager, adapter)
+ }
+
+ invokeAll(stream, invokeNum, requestId, servantManager, adapter)
{
- var inc = null;
try
{
while(invokeNum > 0)
@@ -1893,8 +1909,10 @@ var ConnectionI = Class({
//
// Prepare the invocation.
//
- var response = !this._endpoint.datagram() && requestId !== 0;
- inc = new IncomingAsync(this._instance, this, adapter, response, compress, requestId);
+ let inc = new IncomingAsync(this._instance, this,
+ adapter,
+ !this._endpoint.datagram() && requestId !== 0, // response
+ requestId);
//
// Dispatch the invocation.
@@ -1902,7 +1920,6 @@ var ConnectionI = Class({
inc.invoke(servantManager, stream);
--invokeNum;
- inc = null;
}
stream.clear();
@@ -1918,27 +1935,28 @@ var ConnectionI = Class({
throw ex;
}
}
- },
- scheduleTimeout: function(op, timeout)
+ }
+
+ scheduleTimeout(op, timeout)
{
if(timeout < 0)
{
return;
}
- var self = this;
if((op & SocketOperation.Read) !== 0)
{
- this._readTimeoutId = this._timer.schedule(function() { self.timedOut(); }, timeout);
+ this._readTimeoutId = this._timer.schedule(() => this.timedOut(), timeout);
this._readTimeoutScheduled = true;
}
if((op & (SocketOperation.Write | SocketOperation.Connect)) !== 0)
{
- this._writeTimeoutId = this._timer.schedule(function() { self.timedOut(); }, timeout);
+ this._writeTimeoutId = this._timer.schedule(() => this.timedOut(), timeout);
this._writeTimeoutScheduled = true;
}
- },
- unscheduleTimeout: function(op)
+ }
+
+ unscheduleTimeout(op)
{
if((op & SocketOperation.Read) !== 0 && this._readTimeoutScheduled)
{
@@ -1950,10 +1968,11 @@ var ConnectionI = Class({
this._timer.cancel(this._writeTimeoutId);
this._writeTimeoutScheduled = false;
}
- },
- connectTimeout: function()
+ }
+
+ connectTimeout()
{
- var defaultsAndOverrides = this._instance.defaultsAndOverrides();
+ const defaultsAndOverrides = this._instance.defaultsAndOverrides();
if(defaultsAndOverrides.overrideConnectTimeout)
{
return defaultsAndOverrides.overrideConnectTimeoutValue;
@@ -1962,10 +1981,11 @@ var ConnectionI = Class({
{
return this._endpoint.timeout();
}
- },
- closeTimeout: function()
+ }
+
+ closeTimeout()
{
- var defaultsAndOverrides = this._instance.defaultsAndOverrides();
+ const defaultsAndOverrides = this._instance.defaultsAndOverrides();
if(defaultsAndOverrides.overrideCloseTimeout)
{
return defaultsAndOverrides.overrideCloseTimeoutValue;
@@ -1974,27 +1994,22 @@ var ConnectionI = Class({
{
return this._endpoint.timeout();
}
- },
- warning: function(msg, ex)
+ }
+
+ warning(msg, ex)
{
this._logger.warning(msg + ":\n" + this._desc + "\n" + ex.toString());
- },
- checkState: function()
+ }
+
+ checkState()
{
if(this._state < StateHolding || this._dispatchCount > 0)
{
return;
}
- var i;
- if(this._holdPromises.length > 0)
- {
- for(i = 0; i < this._holdPromises.length; ++i)
- {
- this._holdPromises[i].succeed();
- }
- this._holdPromises = [];
- }
+ this._holdPromises.forEach(p => p.resolve());
+ this._holdPromises = [];
//
// We aren't finished until the state is finished and all
@@ -2008,28 +2023,26 @@ var ConnectionI = Class({
// Clear the OA. See bug 1673 for the details of why this is necessary.
//
this._adapter = null;
-
- for(i = 0; i < this._finishedPromises.length; ++i)
- {
- this._finishedPromises[i].succeed();
- }
+ this._finishedPromises.forEach(p => p.resolve());
this._finishedPromises = [];
}
- },
- reap: function()
+ }
+
+ reap()
{
if(this._monitor !== null)
{
this._monitor.reap(this);
}
- },
- read: function(buf)
+ }
+
+ read(buf)
{
- var start = buf.position;
- var ret = this._transceiver.read(buf, this._hasMoreData);
+ const start = buf.position;
+ const ret = this._transceiver.read(buf, this._hasMoreData);
if(this._instance.traceLevels().network >= 3 && buf.position != start)
{
- var s = [];
+ let s = [];
s.push("received ");
if(this._endpoint.datagram())
{
@@ -2048,14 +2061,15 @@ var ConnectionI = Class({
this._instance.initializationData().logger.trace(this._instance.traceLevels().networkCat, s.join(""));
}
return ret;
- },
- write: function(buf)
+ }
+
+ write(buf)
{
- var start = buf.position;
- var ret = this._transceiver.write(buf);
+ const start = buf.position;
+ const ret = this._transceiver.write(buf);
if(this._instance.traceLevels().network >= 3 && buf.position != start)
{
- var s = [];
+ let s = [];
s.push("sent ");
s.push(buf.position - start);
if(!this._endpoint.datagram())
@@ -2071,7 +2085,7 @@ var ConnectionI = Class({
}
return ret;
}
-});
+}
// DestructionReason.
ConnectionI.ObjectAdapterDeactivated = 0;
@@ -2079,68 +2093,70 @@ ConnectionI.CommunicatorDestroyed = 1;
Ice.ConnectionI = ConnectionI;
-var OutgoingMessage = Class({
- __init__: function()
+class OutgoingMessage
+{
+ constructor()
{
this.stream = null;
this.outAsync = null;
- this.compress = false;
this.requestId = 0;
this.prepared = false;
- },
- canceled: function()
+ }
+
+ canceled()
{
Debug.assert(this.outAsync !== null);
this.outAsync = null;
- },
- doAdopt: function()
+ }
+
+ doAdopt()
{
if(this.adopt)
{
- var stream = new BasicStream(this.stream.instance, Protocol.currentProtocolEncoding);
+ const stream = new OutputStream(this.stream.instance, Protocol.currentProtocolEncoding);
stream.swap(this.stream);
this.stream = stream;
this.adopt = false;
}
- },
- sent: function()
+ }
+
+ sent()
{
if(this.outAsync !== null)
{
- this.outAsync.__sent();
+ this.outAsync.sent();
}
- },
- completed: function(ex)
+ }
+
+ completed(ex)
{
if(this.outAsync !== null)
{
- this.outAsync.__completedEx(ex);
+ this.outAsync.completedEx(ex);
}
}
-});
-OutgoingMessage.createForStream = function(stream, compress, adopt)
-{
- var m = new OutgoingMessage();
- m.stream = stream;
- m.compress = compress;
- m.adopt = adopt;
- m.isSent = false;
- m.requestId = 0;
- m.outAsync = null;
- return m;
-};
-
-OutgoingMessage.create = function(out, stream, compress, requestId)
-{
- var m = new OutgoingMessage();
- m.stream = stream;
- m.compress = compress;
- m.outAsync = out;
- m.requestId = requestId;
- m.isSent = false;
- m.adopt = false;
- return m;
-};
+ static createForStream(stream, adopt)
+ {
+ const m = new OutgoingMessage();
+ m.stream = stream;
+ m.adopt = adopt;
+ m.isSent = false;
+ m.requestId = 0;
+ m.outAsync = null;
+ return m;
+ }
+
+ static create(out, stream, requestId)
+ {
+ const m = new OutgoingMessage();
+ m.stream = stream;
+ m.outAsync = out;
+ m.requestId = requestId;
+ m.isSent = false;
+ m.adopt = false;
+ return m;
+ }
+}
module.exports.Ice = Ice;