summaryrefslogtreecommitdiff
path: root/js/src/Ice/ConnectionI.js
diff options
context:
space:
mode:
Diffstat (limited to 'js/src/Ice/ConnectionI.js')
-rw-r--r--js/src/Ice/ConnectionI.js3485
1 files changed, 1743 insertions, 1742 deletions
diff --git a/js/src/Ice/ConnectionI.js b/js/src/Ice/ConnectionI.js
index 96c5621a935..b4db9725c25 100644
--- a/js/src/Ice/ConnectionI.js
+++ b/js/src/Ice/ConnectionI.js
@@ -7,2207 +7,2208 @@
//
// **********************************************************************
-(function(global){
- require("Ice/Class");
- require("Ice/AsyncStatus");
- require("Ice/AsyncResultBase");
- require("Ice/BasicStream");
- require("Ice/OutgoingAsync");
- require("Ice/Debug");
- require("Ice/ExUtil");
- require("Ice/HashMap");
- require("Ice/IncomingAsync");
- require("Ice/RetryException");
- require("Ice/Promise");
- require("Ice/Protocol");
- require("Ice/SocketOperation");
- require("Ice/Timer");
- require("Ice/TraceUtil");
- require("Ice/Version");
- require("Ice/Exception");
- require("Ice/LocalException");
-
- var Ice = global.Ice || {};
-
- var AsyncStatus = Ice.AsyncStatus;
- var AsyncResultBase = Ice.AsyncResultBase;
- var BasicStream = Ice.BasicStream;
- var ConnectionBatchOutgoingAsync = Ice.ConnectionBatchOutgoingAsync;
- var Debug = Ice.Debug;
- var ExUtil = Ice.ExUtil;
- var HashMap = Ice.HashMap;
- var IncomingAsync = Ice.IncomingAsync;
- var RetryException = Ice.RetryException;
- var Promise = Ice.Promise;
- var Protocol = Ice.Protocol;
- var SocketOperation = Ice.SocketOperation;
- var Timer = Ice.Timer;
- var TraceUtil = Ice.TraceUtil;
- var ProtocolVersion = Ice.ProtocolVersion;
- var EncodingVersion = Ice.EncodingVersion;
- var ACM = Ice.ACM;
- var ACMClose = Ice.ACMClose;
- var ACMHeartbeat = Ice.ACMHeartbeat;
-
- var StateNotInitialized = 0;
- var StateNotValidated = 1;
- var StateActive = 2;
- var StateHolding = 3;
- var StateClosing = 4;
- var StateClosed = 5;
- var StateFinished = 6;
-
- var MessageInfo = function(instance)
+var Ice = require("../Ice/ModuleRegistry").Ice;
+Ice.__M.require(module, "Ice",
+ [
+ "../Ice/Class",
+ "../Ice/AsyncStatus",
+ "../Ice/AsyncResultBase",
+ "../Ice/BasicStream",
+ "../Ice/OutgoingAsync",
+ "../Ice/Debug",
+ "../Ice/ExUtil",
+ "../Ice/HashMap",
+ "../Ice/IncomingAsync",
+ "../Ice/RetryException",
+ "../Ice/Promise",
+ "../Ice/Protocol",
+ "../Ice/SocketOperation",
+ "../Ice/Timer",
+ "../Ice/TraceUtil",
+ "../Ice/Version",
+ "../Ice/Exception",
+ "../Ice/LocalException"
+ ]);
+
+var AsyncStatus = Ice.AsyncStatus;
+var AsyncResultBase = Ice.AsyncResultBase;
+var BasicStream = Ice.BasicStream;
+var ConnectionBatchOutgoingAsync = Ice.ConnectionBatchOutgoingAsync;
+var Debug = Ice.Debug;
+var ExUtil = Ice.ExUtil;
+var HashMap = Ice.HashMap;
+var IncomingAsync = Ice.IncomingAsync;
+var RetryException = Ice.RetryException;
+var Promise = Ice.Promise;
+var Protocol = Ice.Protocol;
+var SocketOperation = Ice.SocketOperation;
+var Timer = Ice.Timer;
+var TraceUtil = Ice.TraceUtil;
+var ProtocolVersion = Ice.ProtocolVersion;
+var EncodingVersion = Ice.EncodingVersion;
+var ACM = Ice.ACM;
+var ACMClose = Ice.ACMClose;
+var ACMHeartbeat = Ice.ACMHeartbeat;
+
+var StateNotInitialized = 0;
+var StateNotValidated = 1;
+var StateActive = 2;
+var StateHolding = 3;
+var StateClosing = 4;
+var StateClosed = 5;
+var StateFinished = 6;
+
+var MessageInfo = function(instance)
+{
+ this.stream = new BasicStream(instance, Protocol.currentProtocolEncoding, false);
+
+ this.invokeNum = 0;
+ this.requestId = 0;
+ this.compress = false;
+ this.servantManager = null;
+ this.adapter = null;
+ this.outAsync = null;
+ this.heartbeatCallback = null;
+};
+
+var Class = Ice.Class;
+
+var ConnectionI = Class({
+ __init__: function(communicator, instance, monitor, transceiver, endpoint, incoming, adapter)
{
- this.stream = new BasicStream(instance, Protocol.currentProtocolEncoding, false);
-
- this.invokeNum = 0;
- this.requestId = 0;
- this.compress = false;
- this.servantManager = null;
- this.adapter = null;
- this.outAsync = null;
- this.heartbeatCallback = null;
- };
-
- var Class = Ice.Class;
-
- var ConnectionI = Class({
- __init__: function(communicator, instance, monitor, transceiver, endpoint, incoming, adapter)
- {
- this._communicator = communicator;
- this._instance = instance;
- this._monitor = monitor;
- this._transceiver = transceiver;
- this._desc = transceiver.toString();
- this._type = transceiver.type();
- this._endpoint = endpoint;
- this._incoming = incoming;
- this._adapter = adapter;
- var initData = instance.initializationData();
- this._logger = initData.logger; // Cached for better performance.
- this._traceLevels = instance.traceLevels(); // Cached for better performance.
- this._timer = instance.timer();
- this._writeTimeoutId = 0;
- this._writeTimeoutScheduled = false;
- this._readTimeoutId = 0;
- this._readTimeoutScheduled = false;
-
- this._hasMoreData = { value: false };
-
- this._warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0;
- this._warnUdp = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
- this._acmLastActivity = this._monitor != null && this._monitor.getACM().timeout > 0 ? Date.now() : -1;
- this._nextRequestId = 1;
- this._batchAutoFlush =
- initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false;
- this._batchStream = new BasicStream(instance, Protocol.currentProtocolEncoding, this._batchAutoFlush);
- this._batchStreamInUse = false;
- this._batchRequestNum = 0;
- this._batchRequestCompress = false;
- this._batchMarker = 0;
-
- this._sendStreams = [];
-
- this._readStream = new BasicStream(instance, Protocol.currentProtocolEncoding);
- this._readHeader = false;
- this._writeStream = new BasicStream(instance, Protocol.currentProtocolEncoding);
-
- this._readStreamPos = -1;
- this._writeStreamPos = -1;
-
- this._dispatchCount = 0;
-
- this._state = StateNotInitialized;
- this._shutdownInitiated = false;
- this._validated = false;
-
- this._readProtocol = new ProtocolVersion();
- this._readProtocolEncoding = new EncodingVersion();
-
- this._asyncRequests = new HashMap(); // Map<int, OutgoingAsync>
-
- this._exception = null;
-
- this._startPromise = null;
- this._closePromises = [];
- this._holdPromises = [];
- this._finishedPromises = [];
+ this._communicator = communicator;
+ this._instance = instance;
+ this._monitor = monitor;
+ this._transceiver = transceiver;
+ this._desc = transceiver.toString();
+ this._type = transceiver.type();
+ this._endpoint = endpoint;
+ this._incoming = incoming;
+ this._adapter = adapter;
+ var initData = instance.initializationData();
+ this._logger = initData.logger; // Cached for better performance.
+ this._traceLevels = instance.traceLevels(); // Cached for better performance.
+ this._timer = instance.timer();
+ this._writeTimeoutId = 0;
+ this._writeTimeoutScheduled = false;
+ this._readTimeoutId = 0;
+ this._readTimeoutScheduled = false;
+
+ this._hasMoreData = { value: false };
+
+ this._warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0;
+ this._warnUdp = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
+ this._acmLastActivity = this._monitor !== null && this._monitor.getACM().timeout > 0 ? Date.now() : -1;
+ this._nextRequestId = 1;
+ this._batchAutoFlush =
+ initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false;
+ this._batchStream = new BasicStream(instance, Protocol.currentProtocolEncoding, this._batchAutoFlush);
+ this._batchStreamInUse = false;
+ this._batchRequestNum = 0;
+ this._batchRequestCompress = false;
+ this._batchMarker = 0;
+
+ this._sendStreams = [];
+
+ this._readStream = new BasicStream(instance, Protocol.currentProtocolEncoding);
+ this._readHeader = false;
+ this._writeStream = new BasicStream(instance, Protocol.currentProtocolEncoding);
+
+ this._readStreamPos = -1;
+ this._writeStreamPos = -1;
+
+ this._dispatchCount = 0;
+
+ this._state = StateNotInitialized;
+ this._shutdownInitiated = false;
+ this._validated = false;
+
+ this._readProtocol = new ProtocolVersion();
+ this._readProtocolEncoding = new EncodingVersion();
+
+ this._asyncRequests = new HashMap(); // Map<int, OutgoingAsync>
+
+ this._exception = null;
+
+ this._startPromise = null;
+ this._closePromises = [];
+ this._holdPromises = [];
+ this._finishedPromises = [];
+
+ if(this._adapter !== null)
+ {
+ this._servantManager = this._adapter.getServantManager();
+ }
+ else
+ {
+ this._servantManager = null;
+ }
+ this._callback = null;
+ },
+ start: function()
+ {
+ Debug.assert(this._startPromise === null);
- if(this._adapter !== null)
+ try
+ {
+ // The connection might already be closed if the communicator was destroyed.
+ if(this._state >= StateClosed)
{
- this._servantManager = this._adapter.getServantManager();
+ Debug.assert(this._exception !== null);
+ return new Promise().fail(this._exception);
}
- else
+
+ this._startPromise = new Promise();
+ var self = this;
+ this._transceiver.setCallbacks(
+ function() { self.message(SocketOperation.Write); }, // connected callback
+ function() { self.message(SocketOperation.Read); }, // read callback
+ function() { self.message(SocketOperation.Write); } // write callback
+ );
+ this.initialize();
+ }
+ catch(ex)
+ {
+ if(ex instanceof Ice.LocalException)
{
- this._servantManager = null;
+ this.exception(ex);
}
- },
- start: function()
+ return new Promise().fail(ex);
+ }
+
+ return this._startPromise;
+ },
+ activate: function()
+ {
+ if(this._state <= StateNotValidated)
+ {
+ return;
+ }
+
+ if(this._acmLastActivity > 0)
{
- Debug.assert(this._startPromise === null);
+ this._acmLastActivity = Date.now();
+ }
+ this.setState(StateActive);
+ },
+ hold: function()
+ {
+ if(this._state <= StateNotValidated)
+ {
+ return;
+ }
- try
+ this.setState(StateHolding);
+ },
+ destroy: function(reason)
+ {
+ switch(reason)
+ {
+ case ConnectionI.ObjectAdapterDeactivated:
{
- // The connection might already be closed if the communicator was destroyed.
- if(this._state >= StateClosed)
- {
- Debug.assert(this._exception !== null);
- return new Promise().fail(this._exception);
- }
-
- this._startPromise = new Promise();
- var self = this;
- this._transceiver.setCallbacks(
- function() { self.message(SocketOperation.Write); }, // connected callback
- function() { self.message(SocketOperation.Read); }, // read callback
- function() { self.message(SocketOperation.Write); } // write callback
- );
- this.initialize();
+ this.setState(StateClosing, new Ice.ObjectAdapterDeactivatedException());
+ break;
}
- catch(ex)
+
+ case ConnectionI.CommunicatorDestroyed:
{
- if(ex instanceof Ice.LocalException)
- {
- this.exception(ex);
- }
- return new Promise().fail(ex);
+ this.setState(StateClosing, new Ice.CommunicatorDestroyedException());
+ break;
}
+ }
+ },
+ close: function(force)
+ {
+ var __r = new AsyncResultBase(this._communicator, "close", this, null, null);
- return this._startPromise;
- },
- activate: function()
+ if(force)
{
- if(this._state <= StateNotValidated)
- {
- return;
- }
-
- if(this._acmLastActivity > 0)
- {
- this._acmLastActivity = Date.now();
- }
- this.setState(StateActive);
- },
- hold: function()
+ this.setState(StateClosed, new Ice.ForcedCloseConnectionException());
+ __r.succeed(__r);
+ }
+ else
{
- if(this._state <= StateNotValidated)
+ //
+ // If we do a graceful shutdown, then we wait until all
+ // outstanding requests have been completed. Otherwise,
+ // the CloseConnectionException will cause all outstanding
+ // requests to be retried, regardless of whether the
+ // server has processed them or not.
+ //
+ this._closePromises.push(__r);
+ this.checkClose();
+ }
+
+ return __r;
+ },
+ checkClose: function()
+ {
+ //
+ // If close(false) has been called, then we need to check if all
+ // requests have completed and we can transition to StateClosing.
+ // We also complete outstanding promises.
+ //
+ if(this._asyncRequests.size === 0 && this._closePromises.length > 0)
+ {
+ this.setState(StateClosing, new Ice.CloseConnectionException());
+ for(var i = 0; i < this._closePromises.length; ++i)
{
- return;
+ this._closePromises[i].succeed(this._closePromises[i]);
}
+ this._closePromises = [];
+ }
+ },
+ isActiveOrHolding: function()
+ {
+ return this._state > StateNotValidated && this._state < StateClosing;
+ },
+ isFinished: function()
+ {
+ if(this._state !== StateFinished || this._dispatchCount !== 0)
+ {
+ return false;
+ }
- this.setState(StateHolding);
- },
- destroy: function(reason)
+ Debug.assert(this._state === StateFinished);
+ return true;
+ },
+ throwException: function()
+ {
+ if(this._exception !== null)
{
- switch(reason)
- {
- case ConnectionI.ObjectAdapterDeactivated:
- {
- this.setState(StateClosing, new Ice.ObjectAdapterDeactivatedException());
- break;
- }
+ Debug.assert(this._state >= StateClosing);
+ throw this._exception;
+ }
+ },
+ waitUntilHolding: function()
+ {
+ var promise = new Promise();
+ this._holdPromises.push(promise);
+ this.checkState();
+ return promise;
+ },
+ waitUntilFinished: function()
+ {
+ var promise = new Promise();
+ this._finishedPromises.push(promise);
+ this.checkState();
+ return promise;
+ },
+ monitor: function(now, acm)
+ {
+ if(this._state !== StateActive)
+ {
+ return;
+ }
- case ConnectionI.CommunicatorDestroyed:
- {
- this.setState(StateClosing, new Ice.CommunicatorDestroyedException());
- break;
- }
+ if(this._readStream.size > Protocol.headerSize || !this._writeStream.isEmpty())
+ {
+ //
+ // If writing or reading, nothing to do, the connection
+ // timeout will kick-in if writes or reads don't progress.
+ // This check is necessary because the actitivy timer is
+ // only set when a message is fully read/written.
+ //
+ return;
+ }
+
+ //
+ // We send a heartbeat if there was no activity in the last
+ // (timeout / 4) period. Sending a heartbeat sooner than
+ // really needed is safer to ensure that the receiver will
+ // receive in time the heartbeat. Sending the heartbeat if
+ // there was no activity in the last (timeout / 2) period
+ // isn't enough since monitor() is called only every (timeout
+ // / 2) period.
+ //
+ // Note that this doesn't imply that we are sending 4 heartbeats
+ // per timeout period because the monitor() method is sill only
+ // called every (timeout / 2) period.
+ //
+
+ if(acm.heartbeat == Ice.ACMHeartbeat.HeartbeatAlways ||
+ (acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOff && now >= (this._acmLastActivity + acm.timeout / 4)))
+ {
+ if(acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOnInvocation || this._dispatchCount > 0)
+ {
+ this.heartbeat(); // Send heartbeat if idle in the last timeout / 2 period.
}
- },
- close: function(force)
+ }
+
+ if(acm.close != Ice.ACMClose.CloseOff && now >= (this._acmLastActivity + acm.timeout))
{
- var __r = new AsyncResultBase(this._communicator, "close", this, null, null);
-
- if(force)
+ if(acm.close == Ice.ACMClose.CloseOnIdleForceful ||
+ (acm.close != Ice.ACMClose.CloseOnIdle && this._asyncRequests.size > 0))
{
- this.setState(StateClosed, new Ice.ForcedCloseConnectionException());
- __r.succeed(__r);
+ //
+ // Close the connection if we didn't receive a heartbeat in
+ // the last period.
+ //
+ this.setState(StateClosed, new Ice.ConnectionTimeoutException());
}
- else
+ else if(acm.close != Ice.ACMClose.CloseOnInvocation &&
+ this._dispatchCount === 0 && this._batchStream.isEmpty() && this._asyncRequests.size === 0)
{
//
- // If we do a graceful shutdown, then we wait until all
- // outstanding requests have been completed. Otherwise,
- // the CloseConnectionException will cause all outstanding
- // requests to be retried, regardless of whether the
- // server has processed them or not.
+ // The connection is idle, close it.
//
- this._closePromises.push(__r);
- this.checkClose();
+ this.setState(StateClosing, new Ice.ConnectionTimeoutException());
}
+ }
+ },
+ sendAsyncRequest: function(out, compress, response)
+ {
+ var requestId = 0;
+ var os = out.__os();
- return __r;
- },
- checkClose: function()
+ if(this._exception !== null)
{
//
- // If close(false) has been called, then we need to check if all
- // requests have completed and we can transition to StateClosing.
- // We also complete outstanding promises.
+ // If the connection is closed before we even have a chance
+ // to send our request, we always try to send the request
+ // again.
//
- if(this._asyncRequests.size === 0 && this._closePromises.length > 0)
- {
- this.setState(StateClosing, new Ice.CloseConnectionException());
- for(var i = 0; i < this._closePromises.length; ++i)
- {
- this._closePromises[i].succeed(this._closePromises[i]);
- }
- this._closePromises = [];
- }
- },
- isActiveOrHolding: function()
- {
- return this._state > StateNotValidated && this._state < StateClosing;
- },
- isFinished: function()
+ throw new RetryException(this._exception);
+ }
+
+ Debug.assert(this._state > StateNotValidated);
+ Debug.assert(this._state < StateClosing);
+
+ //
+ // Ensure the message isn't bigger than what we can send with the
+ // transport.
+ //
+ this._transceiver.checkSendSize(os, this._instance.messageSizeMax());
+
+ if(response)
{
- if(this._state !== StateFinished || this._dispatchCount !== 0)
+ //
+ // Create a new unique request ID.
+ //
+ requestId = this._nextRequestId++;
+ if(requestId <= 0)
{
- return false;
+ this._nextRequestId = 1;
+ requestId = this._nextRequestId++;
}
- Debug.assert(this._state === StateFinished);
- return true;
- },
- throwException: function()
+ //
+ // Fill in the request ID.
+ //
+ os.pos = Protocol.headerSize;
+ os.writeInt(requestId);
+ }
+
+ var status;
+ try
{
- if(this._exception !== null)
+ status = this.sendMessage(OutgoingMessage.create(out, out.__os(), compress, requestId));
+ }
+ catch(ex)
+ {
+ if(ex instanceof Ice.LocalException)
{
- Debug.assert(this._state >= StateClosing);
+ this.setState(StateClosed, ex);
+ Debug.assert(this._exception !== null);
throw this._exception;
}
- },
- waitUntilHolding: function()
- {
- var promise = new Promise();
- this._holdPromises.push(promise);
- this.checkState();
- return promise;
- },
- waitUntilFinished: function()
- {
- var promise = new Promise();
- this._finishedPromises.push(promise);
- this.checkState();
- return promise;
- },
- monitor: function(now, acm)
- {
- if(this._state !== StateActive)
+ else
{
- return;
+ throw ex;
}
+ }
- if(this._readStream.size > Protocol.headerSize || !this._writeStream.isEmpty())
- {
- //
- // If writing or reading, nothing to do, the connection
- // timeout will kick-in if writes or reads don't progress.
- // This check is necessary because the actitivy timer is
- // only set when a message is fully read/written.
- //
- return;
- }
-
- //
- // We send a heartbeat if there was no activity in the last
- // (timeout / 4) period. Sending a heartbeat sooner than
- // really needed is safer to ensure that the receiver will
- // receive in time the heartbeat. Sending the heartbeat if
- // there was no activity in the last (timeout / 2) period
- // isn't enough since monitor() is called only every (timeout
- // / 2) period.
+ if(response)
+ {
//
- // Note that this doesn't imply that we are sending 4 heartbeats
- // per timeout period because the monitor() method is sill only
- // called every (timeout / 2) period.
+ // Add to the async requests map.
//
+ this._asyncRequests.set(requestId, out);
+ }
- if(acm.heartbeat == Ice.ACMHeartbeat.HeartbeatAlways ||
- (acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOff && now >= (this._acmLastActivity + acm.timeout / 4)))
- {
- if(acm.heartbeat != Ice.ACMHeartbeat.HeartbeatOnInvocation || this._dispatchCount > 0)
- {
- this.heartbeat(); // Send heartbeat if idle in the last timeout / 2 period.
- }
- }
-
- if(acm.close != Ice.ACMClose.CloseOff && now >= (this._acmLastActivity + acm.timeout))
- {
- if(acm.close == Ice.ACMClose.CloseOnIdleForceful ||
- (acm.close != Ice.ACMClose.CloseOnIdle && this._asyncRequests.size > 0))
- {
- //
- // Close the connection if we didn't receive a heartbeat in
- // the last period.
- //
- this.setState(StateClosed, new Ice.ConnectionTimeoutException());
- }
- else if(acm.close != Ice.ACMClose.CloseOnInvocation &&
- this._dispatchCount == 0 && this._batchStream.isEmpty() && this._asyncRequests.size == 0)
- {
- //
- // The connection is idle, close it.
- //
- this.setState(StateClosing, new Ice.ConnectionTimeoutException());
- }
- }
- },
- sendAsyncRequest: function(out, compress, response)
+ return status;
+ },
+ prepareBatchRequest: function(os)
+ {
+ if(this._exception !== null)
{
- var requestId = 0;
- var os = out.__os();
-
- if(this._exception !== null)
+ //
+ // If there were no batch requests queued when the connection failed, we can safely
+ // retry with a new connection. Otherwise, we must throw to notify the caller that
+ // some previous batch requests were not sent.
+ //
+ if(this._batchStream.isEmpty())
{
- //
- // If the connection is closed before we even have a chance
- // to send our request, we always try to send the request
- // again.
- //
throw new RetryException(this._exception);
}
-
- Debug.assert(this._state > StateNotValidated);
- Debug.assert(this._state < StateClosing);
-
- //
- // Ensure the message isn't bigger than what we can send with the
- // transport.
- //
- this._transceiver.checkSendSize(os, this._instance.messageSizeMax());
-
- if(response)
+ else
{
- //
- // Create a new unique request ID.
- //
- requestId = this._nextRequestId++;
- if(requestId <= 0)
- {
- this._nextRequestId = 1;
- requestId = this._nextRequestId++;
- }
-
- //
- // Fill in the request ID.
- //
- os.pos = Protocol.headerSize;
- os.writeInt(requestId);
+ throw this._exception;
}
+ }
- var status;
+ Debug.assert(this._state > StateNotValidated);
+ Debug.assert(this._state < StateClosing);
+
+ if(this._batchStream.isEmpty())
+ {
try
{
- status = this.sendMessage(OutgoingMessage.create(out, out.__os(), compress, requestId));
+ this._batchStream.writeBlob(Protocol.requestBatchHdr);
}
catch(ex)
{
if(ex instanceof Ice.LocalException)
{
this.setState(StateClosed, ex);
- Debug.assert(this._exception !== null);
- throw this._exception;
- }
- else
- {
- throw ex;
}
+ throw ex;
}
+ }
- if(response)
- {
- //
- // Add to the async requests map.
- //
- this._asyncRequests.set(requestId, out);
- }
+ this._batchStreamInUse = true;
+ this._batchMarker = this._batchStream.size;
+ this._batchStream.swap(os);
- return status;
- },
- prepareBatchRequest: function(os)
+ //
+ // The batch stream now belongs to the caller, until
+ // finishBatchRequest() or abortBatchRequest() is called.
+ //
+ },
+ finishBatchRequest: function(os, compress)
+ {
+ try
{
+ //
+ // Get the batch stream back.
+ //
+ this._batchStream.swap(os);
+
if(this._exception !== null)
{
- //
- // If there were no batch requests queued when the connection failed, we can safely
- // retry with a new connection. Otherwise, we must throw to notify the caller that
- // some previous batch requests were not sent.
- //
- if(this._batchStream.isEmpty())
- {
- throw new RetryException(this._exception);
- }
- else
- {
- throw this._exception;
- }
+ return;
}
- Debug.assert(this._state > StateNotValidated);
- Debug.assert(this._state < StateClosing);
-
- if(this._batchStream.isEmpty())
+ var flush = false;
+ if(this._batchAutoFlush)
{
+ //
+ // Throw memory limit exception if the first message added causes us to go over
+ // limit. Otherwise put aside the marshalled message that caused limit to be
+ // exceeded and rollback stream to the marker.
try
{
- this._batchStream.writeBlob(Protocol.requestBatchHdr);
+ this._transceiver.checkSendSize(this._batchStream.buffer, this._instance.messageSizeMax());
}
catch(ex)
{
if(ex instanceof Ice.LocalException)
{
- this.setState(StateClosed, ex);
- }
- throw ex;
- }
- }
-
- this._batchStreamInUse = true;
- this._batchMarker = this._batchStream.size;
- this._batchStream.swap(os);
-
- //
- // The batch stream now belongs to the caller, until
- // finishBatchRequest() or abortBatchRequest() is called.
- //
- },
- finishBatchRequest: function(os, compress)
- {
- try
- {
- //
- // Get the batch stream back.
- //
- this._batchStream.swap(os);
-
- if(this._exception !== null)
- {
- return;
- }
-
- var flush = false;
- if(this._batchAutoFlush)
- {
- //
- // Throw memory limit exception if the first message added causes us to go over
- // limit. Otherwise put aside the marshalled message that caused limit to be
- // exceeded and rollback stream to the marker.
- try
- {
- this._transceiver.checkSendSize(this._batchStream.buffer, this._instance.messageSizeMax());
- }
- catch(ex)
- {
- if(ex instanceof Ice.LocalException)
+ if(this._batchRequestNum > 0)
{
- if(this._batchRequestNum > 0)
- {
- flush = true;
- }
- else
- {
- throw ex;
- }
+ flush = true;
}
else
{
throw ex;
}
}
+ else
+ {
+ throw ex;
+ }
}
+ }
+
+ if(flush)
+ {
+ //
+ // Temporarily save the last request.
+ //
+ var sz = this._batchStream.size - this._batchMarker;
+ this._batchStream.pos = this._batchMarker;
+ var lastRequest = this._batchStream.readBlob(sz);
+ this._batchStream.resize(this._batchMarker, false);
- if(flush)
+ try
{
//
- // Temporarily save the last request.
+ // Fill in the number of requests in the batch.
//
- var sz = this._batchStream.size - this._batchMarker;
- this._batchStream.pos = this._batchMarker;
- var lastRequest = this._batchStream.readBlob(sz);
- this._batchStream.resize(this._batchMarker, false);
-
- try
- {
- //
- // Fill in the number of requests in the batch.
- //
- this._batchStream.pos = Protocol.headerSize;
- this._batchStream.writeInt(this._batchRequestNum);
+ this._batchStream.pos = Protocol.headerSize;
+ this._batchStream.writeInt(this._batchRequestNum);
- this.sendMessage(OutgoingMessage.createForStream(this._batchStream, this._batchRequestCompress,
- true));
- }
- catch(ex)
+ this.sendMessage(OutgoingMessage.createForStream(this._batchStream, this._batchRequestCompress,
+ true));
+ }
+ catch(ex)
+ {
+ if(ex instanceof Ice.LocalException)
{
- if(ex instanceof Ice.LocalException)
- {
- this.setState(StateClosed, ex);
- Debug.assert(this._exception !== null);
- throw this._exception;
- }
- else
- {
- throw ex;
- }
+ this.setState(StateClosed, ex);
+ Debug.assert(this._exception !== null);
+ throw this._exception;
}
-
- //
- // Reset the batch stream.
- //
- this._batchStream =
- new BasicStream(this._instance, Protocol.currentProtocolEncoding, this._batchAutoFlush);
- this._batchRequestNum = 0;
- this._batchRequestCompress = false;
- this._batchMarker = 0;
-
- //
- // Check again if the last request doesn't exceed the maximum message size.
- //
- if(Protocol.requestBatchHdr.length + lastRequest.length > this._instance.messageSizeMax())
+ else
{
- ExUtil.throwMemoryLimitException(
- Protocol.requestBatchHdr.length + lastRequest.length,
- this._instance.messageSizeMax());
+ throw ex;
}
-
- //
- // Start a new batch with the last message that caused us to go over the limit.
- //
- this._batchStream.writeBlob(Protocol.requestBatchHdr);
- this._batchStream.writeBlob(lastRequest);
}
//
- // Increment the number of requests in the batch.
+ // Reset the batch stream.
//
- ++this._batchRequestNum;
+ this._batchStream =
+ new BasicStream(this._instance, Protocol.currentProtocolEncoding, this._batchAutoFlush);
+ this._batchRequestNum = 0;
+ this._batchRequestCompress = false;
+ this._batchMarker = 0;
//
- // We compress the whole batch if there is at least one compressed
- // message.
+ // Check again if the last request doesn't exceed the maximum message size.
//
- if(compress)
+ if(Protocol.requestBatchHdr.length + lastRequest.length > this._instance.messageSizeMax())
{
- this._batchRequestCompress = true;
+ ExUtil.throwMemoryLimitException(
+ Protocol.requestBatchHdr.length + lastRequest.length,
+ this._instance.messageSizeMax());
}
//
- // The batch stream is not in use anymore.
+ // Start a new batch with the last message that caused us to go over the limit.
//
- Debug.assert(this._batchStreamInUse);
- this._batchStreamInUse = false;
+ this._batchStream.writeBlob(Protocol.requestBatchHdr);
+ this._batchStream.writeBlob(lastRequest);
}
- catch(ex)
+
+ //
+ // Increment the number of requests in the batch.
+ //
+ ++this._batchRequestNum;
+
+ //
+ // We compress the whole batch if there is at least one compressed
+ // message.
+ //
+ if(compress)
{
- if(ex instanceof Ice.LocalException)
- {
- this.abortBatchRequest();
- }
- throw ex;
+ this._batchRequestCompress = true;
}
- },
- abortBatchRequest: function()
- {
- this._batchStream = new BasicStream(this._instance, Protocol.currentProtocolEncoding, this._batchAutoFlush);
- this._batchRequestNum = 0;
- this._batchRequestCompress = false;
- this._batchMarker = 0;
+ //
+ // The batch stream is not in use anymore.
+ //
Debug.assert(this._batchStreamInUse);
this._batchStreamInUse = false;
- },
- flushBatchRequests: function()
+ }
+ catch(ex)
{
- var result = new ConnectionBatchOutgoingAsync(this, this._communicator, "flushBatchRequests");
- try
+ if(ex instanceof Ice.LocalException)
{
- result.__invoke();
+ this.abortBatchRequest();
}
- catch(ex)
- {
- result.__invokeException(ex);
- }
- return result;
- },
- flushAsyncBatchRequests: function(outAsync)
+ throw ex;
+ }
+ },
+ abortBatchRequest: function()
+ {
+ this._batchStream = new BasicStream(this._instance, Protocol.currentProtocolEncoding, this._batchAutoFlush);
+ this._batchRequestNum = 0;
+ this._batchRequestCompress = false;
+ this._batchMarker = 0;
+
+ Debug.assert(this._batchStreamInUse);
+ this._batchStreamInUse = false;
+ },
+ flushBatchRequests: function()
+ {
+ var result = new ConnectionBatchOutgoingAsync(this, this._communicator, "flushBatchRequests");
+ try
{
- if(this._exception !== null)
- {
- throw this._exception;
- }
+ result.__invoke();
+ }
+ catch(ex)
+ {
+ result.__invokeException(ex);
+ }
+ return result;
+ },
+ flushAsyncBatchRequests: function(outAsync)
+ {
+ if(this._exception !== null)
+ {
+ throw this._exception;
+ }
- var status;
- if(this._batchRequestNum === 0)
- {
- outAsync.__sent();
- return AsyncStatus.Sent;
- }
+ var status;
+ if(this._batchRequestNum === 0)
+ {
+ outAsync.__sent();
+ return AsyncStatus.Sent;
+ }
- //
- // Fill in the number of requests in the batch.
- //
- this._batchStream.pos = Protocol.headerSize;
- this._batchStream.writeInt(this._batchRequestNum);
+ //
+ // Fill in the number of requests in the batch.
+ //
+ this._batchStream.pos = Protocol.headerSize;
+ this._batchStream.writeInt(this._batchRequestNum);
- this._batchStream.swap(outAsync.__os());
+ this._batchStream.swap(outAsync.__os());
- try
+ try
+ {
+ status = this.sendMessage(OutgoingMessage.create(outAsync, outAsync.__os(), this._batchRequestCompress,
+ 0));
+ }
+ catch(ex)
+ {
+ if(ex instanceof Ice.LocalException)
{
- status = this.sendMessage(OutgoingMessage.create(outAsync, outAsync.__os(), this._batchRequestCompress,
- 0));
+ this.setState(StateClosed, ex);
+ Debug.assert(this._exception !== null);
+ throw this._exception;
}
- catch(ex)
+ else
{
- if(ex instanceof Ice.LocalException)
- {
- this.setState(StateClosed, ex);
- Debug.assert(this._exception !== null);
- throw this._exception;
- }
- else
- {
- throw ex;
- }
+ throw ex;
}
+ }
- //
- // Reset the batch stream.
- //
- this._batchStream = new BasicStream(this._instance, Protocol.currentProtocolEncoding, this._batchAutoFlush);
- this._batchRequestNum = 0;
- this._batchRequestCompress = false;
- this._batchMarker = 0;
- return status;
- },
- setCallback: function(callback)
+ //
+ // Reset the batch stream.
+ //
+ this._batchStream = new BasicStream(this._instance, Protocol.currentProtocolEncoding, this._batchAutoFlush);
+ this._batchRequestNum = 0;
+ this._batchRequestCompress = false;
+ this._batchMarker = 0;
+ return status;
+ },
+ setCallback: function(callback)
+ {
+ if(this._state > StateClosing)
+ {
+ return;
+ }
+ this._callback = callback;
+ },
+ setACM: function(timeout, close, heartbeat)
+ {
+ if(this._monitor !== null)
{
- if(this._state > StateClosing)
+ if(this._state == StateActive)
{
- return;
+ this._monitor.remove(this);
+ }
+ this._monitor = this._monitor.acm(timeout, close, heartbeat);
+ if(this._state == StateActive)
+ {
+ this._monitor.add(this);
+ }
+ if(this._monitor.getACM().timeout <= 0)
+ {
+ this._acmLastActivity = -1; // Disable the recording of last activity.
}
- this._callback = callback;
- },
- setACM: function(timeout, close, heartbeat)
+ else if(this._state == StateActive && this._acmLastActivity == -1)
+ {
+ this._acmLastActivity = Date.now();
+ }
+ }
+ },
+ getACM: function()
+ {
+ return this._monitor !== null ? this._monitor.getACM() :
+ new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
+ },
+ asyncRequestTimedOut: function(outAsync)
+ {
+ for(var i = 0; i < this._sendStreams.length; i++)
{
- if(this._monitor != null)
+ var o = this._sendStreams[i];
+ if(o.outAsync === outAsync)
{
- if(this._state == StateActive)
+ if(o.requestId > 0)
{
- this._monitor.remove(this);
+ this._asyncRequests.delete(o.requestId);
}
- this._monitor = this._monitor.acm(timeout, close, heartbeat);
- if(this._state == StateActive)
- {
- this._monitor.add(this);
- }
- if(this._monitor.getACM().timeout <= 0)
- {
- this._acmLastActivity = -1; // Disable the recording of last activity.
- }
- else if(this._state == StateActive && this._acmLastActivity == -1)
+
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ var isSent = i.timedOut();
+ if(i !== 0)
{
- this._acmLastActivity = Date.now();
+ this._sendStreams.splice(i, 1);
}
+ outAsync.__finishedEx(new Ice.InvocationTimeoutException(), isSent);
+ return; // We're done.
}
- },
- getACM: function()
- {
- return this._monitor !== null ? this._monitor.getACM() :
- new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
- },
- asyncRequestTimedOut: function(outAsync)
+ }
+
+ if(outAsync instanceof Ice.OutgoingAsync)
{
- for(var i = 0; i < this._sendStreams.length; i++)
+ for(var e = this._asyncRequests.entries; e !== null; e = e.next)
{
- var o = this._sendStreams[i];
- if(o.outAsync === outAsync)
+ if(e.value === outAsync)
{
- if(o.requestId > 0)
- {
- this._asyncRequests.delete(o.requestId);
- }
-
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- var isSent = i.timedOut();
- if(i !== 0)
- {
- this._sendStreams.splice(i, 1);
- }
- outAsync.__finishedEx(new Ice.InvocationTimeoutException(), isSent);
+ this._asyncRequests.delete(e.key);
+ outAsync.__finishedEx(new Ice.InvocationTimeoutException(), true);
return; // We're done.
}
}
+ }
+ },
+ sendResponse: function(os, compressFlag)
+ {
+ Debug.assert(this._state > StateNotValidated);
- if(outAsync instanceof Ice.OutgoingAsync)
+ try
+ {
+ if(--this._dispatchCount === 0)
{
- var o = outAsync;
- for(var e = this._asyncRequests.entries; e !== null; e = e.next)
+ if(this._state === StateFinished)
{
- if(e.value === o)
- {
- this._asyncRequests.delete(e.key);
- outAsync.__finishedEx(new Ice.InvocationTimeoutException(), true);
- return; // We're done.
- }
+ this.reap();
}
+ this.checkState();
}
- },
- sendResponse: function(os, compressFlag)
- {
- Debug.assert(this._state > StateNotValidated);
- try
+ if(this._state >= StateClosed)
{
- if(--this._dispatchCount === 0)
- {
- if(this._state === StateFinished)
- {
- this.reap();
- }
- this.checkState();
- }
-
- if(this._state >= StateClosed)
- {
- Debug.assert(this._exception !== null);
- throw this._exception;
- }
+ Debug.assert(this._exception !== null);
+ throw this._exception;
+ }
- this.sendMessage(OutgoingMessage.createForStream(os, compressFlag !== 0, true));
+ this.sendMessage(OutgoingMessage.createForStream(os, compressFlag !== 0, true));
- if(this._state === StateClosing && this._dispatchCount === 0)
- {
- this.initiateShutdown();
- }
- }
- catch(ex)
+ if(this._state === StateClosing && this._dispatchCount === 0)
{
- if(ex instanceof Ice.LocalException)
- {
- this.setState(StateClosed, ex);
- }
- else
- {
- throw ex;
- }
+ this.initiateShutdown();
}
- },
- sendNoResponse: function()
+ }
+ catch(ex)
{
- Debug.assert(this._state > StateNotValidated);
- try
+ if(ex instanceof Ice.LocalException)
{
- if(--this._dispatchCount === 0)
- {
- if(this._state === StateFinished)
- {
- this.reap();
- }
- this.checkState();
- }
-
- if(this._state >= StateClosed)
- {
- Debug.assert(this._exception !== null);
- throw this._exception;
- }
-
- if(this._state === StateClosing && this._dispatchCount === 0)
- {
- this.initiateShutdown();
- }
+ this.setState(StateClosed, ex);
}
- catch(ex)
+ else
{
- if(ex instanceof Ice.LocalException)
- {
- this.setState(StateClosed, ex);
- }
- else
- {
- throw ex;
- }
+ throw ex;
}
- },
- endpoint: function()
- {
- return this._endpoint;
- },
- setAdapter: function(adapter)
+ }
+ },
+ sendNoResponse: function()
+ {
+ Debug.assert(this._state > StateNotValidated);
+ try
{
- if(this._state <= StateNotValidated || this._state >= StateClosing)
+ if(--this._dispatchCount === 0)
{
- return;
+ if(this._state === StateFinished)
+ {
+ this.reap();
+ }
+ this.checkState();
}
- Debug.assert(this._state < StateClosing);
- this._adapter = adapter;
+ if(this._state >= StateClosed)
+ {
+ Debug.assert(this._exception !== null);
+ throw this._exception;
+ }
- if(this._adapter !== null)
+ if(this._state === StateClosing && this._dispatchCount === 0)
{
- this._servantManager = this._adapter.getServantManager();
- if(this._servantManager === null)
- {
- this._adapter = null;
- }
+ this.initiateShutdown();
+ }
+ }
+ catch(ex)
+ {
+ if(ex instanceof Ice.LocalException)
+ {
+ this.setState(StateClosed, ex);
}
else
{
- this._servantManager = null;
+ throw ex;
}
- },
- getAdapter: function()
- {
- return this._adapter;
- },
- getEndpoint: function()
- {
- return this._endpoint;
- },
- createProxy: function(ident)
+ }
+ },
+ endpoint: function()
+ {
+ return this._endpoint;
+ },
+ setAdapter: function(adapter)
+ {
+ if(this._state <= StateNotValidated || this._state >= StateClosing)
{
- //
- // Create a reference and return a reverse proxy for this
- // reference.
- //
- return this._instance.proxyFactory().referenceToProxy(
- this._instance.referenceFactory().createFixed(ident, this));
- },
- message: function(operation)
+ return;
+ }
+ Debug.assert(this._state < StateClosing);
+
+ this._adapter = adapter;
+
+ if(this._adapter !== null)
{
- if(this._state >= StateClosed)
+ this._servantManager = this._adapter.getServantManager();
+ if(this._servantManager === null)
{
- return;
+ this._adapter = null;
}
+ }
+ else
+ {
+ this._servantManager = null;
+ }
+ },
+ getAdapter: function()
+ {
+ return this._adapter;
+ },
+ getEndpoint: function()
+ {
+ return this._endpoint;
+ },
+ createProxy: function(ident)
+ {
+ //
+ // Create a reference and return a reverse proxy for this
+ // reference.
+ //
+ return this._instance.proxyFactory().referenceToProxy(
+ this._instance.referenceFactory().createFixed(ident, this));
+ },
+ message: function(operation)
+ {
+ if(this._state >= StateClosed)
+ {
+ return;
+ }
- this.unscheduleTimeout(operation);
+ this.unscheduleTimeout(operation);
- //
- // Keep reading until no more data is available.
- //
- this._hasMoreData.value = (operation & SocketOperation.Read) !== 0;
+ //
+ // Keep reading until no more data is available.
+ //
+ this._hasMoreData.value = (operation & SocketOperation.Read) !== 0;
- var info = null;
- try
+ var info = null;
+ try
+ {
+ if((operation & SocketOperation.Write) !== 0 && this._writeStream.buffer.remaining > 0)
{
- if((operation & SocketOperation.Write) !== 0 && this._writeStream.buffer.remaining > 0)
+ if(!this._transceiver.write(this._writeStream.buffer))
{
- if(!this._transceiver.write(this._writeStream.buffer))
+ Debug.assert(!this._writeStream.isEmpty());
+ this.scheduleTimeout(SocketOperation.Write, this._endpoint.timeout());
+ return;
+ }
+ Debug.assert(this._writeStream.buffer.remaining === 0);
+ }
+ if((operation & SocketOperation.Read) !== 0 && !this._readStream.isEmpty())
+ {
+ if(this._readHeader) // Read header if necessary.
+ {
+ if(!this._transceiver.read(this._readStream.buffer, this._hasMoreData))
{
- Debug.assert(!this._writeStream.isEmpty());
- this.scheduleTimeout(SocketOperation.Write, this._endpoint.timeout());
+ //
+ // We didn't get enough data to complete the header.
+ //
return;
}
- Debug.assert(this._writeStream.buffer.remaining === 0);
- }
- if((operation & SocketOperation.Read) !== 0 && !this._readStream.isEmpty())
- {
- if(this._readHeader) // Read header if necessary.
+
+ Debug.assert(this._readStream.buffer.remaining === 0);
+ this._readHeader = false;
+
+ var pos = this._readStream.pos;
+ if(pos < Protocol.headerSize)
{
- if(!this._transceiver.read(this._readStream.buffer, this._hasMoreData))
- {
- //
- // We didn't get enough data to complete the header.
- //
- return;
- }
-
- Debug.assert(this._readStream.buffer.remaining === 0);
- this._readHeader = false;
-
- var pos = this._readStream.pos;
- if(pos < Protocol.headerSize)
- {
- //
- // This situation is possible for small UDP packets.
- //
- throw new Ice.IllegalMessageSizeException();
- }
-
- this._readStream.pos = 0;
- var magic0 = this._readStream.readByte();
- var magic1 = this._readStream.readByte();
- var magic2 = this._readStream.readByte();
- var magic3 = this._readStream.readByte();
- if(magic0 !== Protocol.magic[0] || magic1 !== Protocol.magic[1] ||
- magic2 !== Protocol.magic[2] || magic3 !== Protocol.magic[3])
- {
- var bme = new Ice.BadMagicException();
- bme.badMagic = Ice.Buffer.createNative([magic0, magic1, magic2, magic3]);
- throw bme;
- }
-
- this._readProtocol.__read(this._readStream);
- Protocol.checkSupportedProtocol(this._readProtocol);
-
- this._readProtocolEncoding.__read(this._readStream);
- Protocol.checkSupportedProtocolEncoding(this._readProtocolEncoding);
-
- this._readStream.readByte(); // messageType
- this._readStream.readByte(); // compress
- var size = this._readStream.readInt();
- if(size < Protocol.headerSize)
- {
- throw new Ice.IllegalMessageSizeException();
- }
- if(size > this._instance.messageSizeMax())
- {
- ExUtil.throwMemoryLimitException(size, this._instance.messageSizeMax());
- }
- if(size > this._readStream.size)
- {
- this._readStream.resize(size);
- }
- this._readStream.pos = pos;
+ //
+ // This situation is possible for small UDP packets.
+ //
+ throw new Ice.IllegalMessageSizeException();
}
- if(this._readStream.pos != this._readStream.size)
+ this._readStream.pos = 0;
+ var magic0 = this._readStream.readByte();
+ var magic1 = this._readStream.readByte();
+ var magic2 = this._readStream.readByte();
+ var magic3 = this._readStream.readByte();
+ if(magic0 !== Protocol.magic[0] || magic1 !== Protocol.magic[1] ||
+ magic2 !== Protocol.magic[2] || magic3 !== Protocol.magic[3])
{
- if(this._endpoint.datagram())
- {
- throw new Ice.DatagramLimitException(); // The message was truncated.
- }
- else
- {
- if(!this._transceiver.read(this._readStream.buffer, this._hasMoreData))
- {
- Debug.assert(!this._readStream.isEmpty());
- this.scheduleTimeout(SocketOperation.Read, this._endpoint.timeout());
- return;
- }
- Debug.assert(this._readStream.buffer.remaining === 0);
- }
+ var bme = new Ice.BadMagicException();
+ bme.badMagic = Ice.Buffer.createNative([magic0, magic1, magic2, magic3]);
+ throw bme;
}
- }
-
- if(this._state <= StateNotValidated)
- {
- if(this._state === StateNotInitialized && !this.initialize())
+
+ this._readProtocol.__read(this._readStream);
+ Protocol.checkSupportedProtocol(this._readProtocol);
+
+ this._readProtocolEncoding.__read(this._readStream);
+ Protocol.checkSupportedProtocolEncoding(this._readProtocolEncoding);
+
+ this._readStream.readByte(); // messageType
+ this._readStream.readByte(); // compress
+ var size = this._readStream.readInt();
+ if(size < Protocol.headerSize)
{
- return;
+ throw new Ice.IllegalMessageSizeException();
}
-
- if(this._state <= StateNotValidated && !this.validate())
+ if(size > this._instance.messageSizeMax())
{
- return;
+ ExUtil.throwMemoryLimitException(size, this._instance.messageSizeMax());
}
-
- this._transceiver.unregister();
-
- //
- // We start out in holding state.
- //
- this.setState(StateHolding);
- if(this._startPromise !== null)
+ if(size > this._readStream.size)
{
- ++this._dispatchCount;
+ this._readStream.resize(size);
}
+ this._readStream.pos = pos;
}
- else
+
+ if(this._readStream.pos != this._readStream.size)
{
- Debug.assert(this._state <= StateClosing);
-
- //
- // We parse messages first, if we receive a close
- // connection message we won't send more messages.
- //
- if((operation & SocketOperation.Read) !== 0)
+ if(this._endpoint.datagram())
{
- info = this.parseMessage();
+ throw new Ice.DatagramLimitException(); // The message was truncated.
}
-
- if((operation & SocketOperation.Write) !== 0)
+ else
{
- this.sendNextMessage();
+ if(!this._transceiver.read(this._readStream.buffer, this._hasMoreData))
+ {
+ Debug.assert(!this._readStream.isEmpty());
+ this.scheduleTimeout(SocketOperation.Read, this._endpoint.timeout());
+ return;
+ }
+ Debug.assert(this._readStream.buffer.remaining === 0);
}
}
}
- catch(ex)
+
+ if(this._state <= StateNotValidated)
{
- if(ex instanceof Ice.DatagramLimitException) // Expected.
+ if(this._state === StateNotInitialized && !this.initialize())
{
- if(this._warnUdp)
- {
- this._logger.warning("maximum datagram size of " + this._readStream.pos + " exceeded");
- }
- this._readStream.resize(Protocol.headerSize);
- this._readStream.pos = 0;
- this._readHeader = true;
return;
}
- else if(ex instanceof Ice.SocketException)
+
+ if(this._state <= StateNotValidated && !this.validate())
{
- this.setState(StateClosed, ex);
return;
}
- else if(ex instanceof Ice.LocalException)
+
+ this._transceiver.unregister();
+
+ //
+ // We start out in holding state.
+ //
+ this.setState(StateHolding);
+ if(this._startPromise !== null)
{
- if(this._endpoint.datagram())
- {
- if(this._warn)
- {
- this._logger.warning("datagram connection exception:\n" + ex + '\n' + this._desc);
- }
- this._readStream.resize(Protocol.headerSize);
- this._readStream.pos = 0;
- this._readHeader = true;
- }
- else
+ ++this._dispatchCount;
+ }
+ }
+ else
+ {
+ Debug.assert(this._state <= StateClosing);
+
+ //
+ // We parse messages first, if we receive a close
+ // connection message we won't send more messages.
+ //
+ if((operation & SocketOperation.Read) !== 0)
+ {
+ info = this.parseMessage();
+ }
+
+ if((operation & SocketOperation.Write) !== 0)
+ {
+ this.sendNextMessage();
+ }
+ }
+ }
+ catch(ex)
+ {
+ if(ex instanceof Ice.DatagramLimitException) // Expected.
+ {
+ if(this._warnUdp)
+ {
+ this._logger.warning("maximum datagram size of " + this._readStream.pos + " exceeded");
+ }
+ this._readStream.resize(Protocol.headerSize);
+ this._readStream.pos = 0;
+ this._readHeader = true;
+ return;
+ }
+ else if(ex instanceof Ice.SocketException)
+ {
+ this.setState(StateClosed, ex);
+ return;
+ }
+ else if(ex instanceof Ice.LocalException)
+ {
+ if(this._endpoint.datagram())
+ {
+ if(this._warn)
{
- this.setState(StateClosed, ex);
+ this._logger.warning("datagram connection exception:\n" + ex + '\n' + this._desc);
}
- return;
+ this._readStream.resize(Protocol.headerSize);
+ this._readStream.pos = 0;
+ this._readHeader = true;
}
else
{
- throw ex;
+ this.setState(StateClosed, ex);
}
+ return;
}
-
- if(this._acmLastActivity > 0)
+ else
{
- this._acmLastActivity = Date.now();
+ throw ex;
}
+ }
- this.dispatch(info);
+ if(this._acmLastActivity > 0)
+ {
+ this._acmLastActivity = Date.now();
+ }
- if(this._hasMoreData.value)
- {
- var self = this;
- setTimeout(function() { self.message(SocketOperation.Read); }, 0); // Don't tie up the thread.
- }
- },
- dispatch: function(info)
+ this.dispatch(info);
+
+ if(this._hasMoreData.value)
{
- var count = 0;
- //
- // Notify the factory that the connection establishment and
- // validation has completed.
- //
- if(this._startPromise !== null)
+ var self = this;
+ setTimeout(function() { self.message(SocketOperation.Read); }, 0); // Don't tie up the thread.
+ }
+ },
+ dispatch: function(info)
+ {
+ var count = 0;
+ //
+ // Notify the factory that the connection establishment and
+ // validation has completed.
+ //
+ if(this._startPromise !== null)
+ {
+ this._startPromise.succeed();
+ this._startPromise = null;
+ ++count;
+ }
+
+ if(info !== null)
+ {
+ if(info.outAsync !== null)
{
- this._startPromise.succeed();
- this._startPromise = null;
+ info.outAsync.__finished(info.stream);
++count;
}
- if(info !== null)
+ if(info.invokeNum > 0)
{
- if(info.outAsync !== null)
+ this.invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
+ info.adapter);
+
+ //
+ // Don't increase count, the dispatch count is
+ // decreased when the incoming reply is sent.
+ //
+ }
+
+ if(info.heartbeatCallback)
+ {
+ try
{
- info.outAsync.__finished(info.stream);
- ++count;
+ info.heartbeatCallback.heartbeat(this);
}
-
- if(info.invokeNum > 0)
+ catch(ex)
{
- this.invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
- info.adapter);
-
- //
- // Don't increase count, the dispatch count is
- // decreased when the incoming reply is sent.
- //
+ this._logger.error("connection callback exception:\n" + ex + '\n' + this._desc);
}
+ info.heartbeatCallback = null;
+ ++count;
+ }
+ }
- if(info.heartbeatCallback)
+ //
+ // Decrease dispatch count.
+ //
+ if(count > 0)
+ {
+ this._dispatchCount -= count;
+ if(this._dispatchCount === 0)
+ {
+ if(this._state === StateClosing && !this._shutdownInitiated)
{
try
{
- info.heartbeatCallback.heartbeat(this);
+ this.initiateShutdown();
}
catch(ex)
{
- this._logger.error("connection callback exception:\n" + ex + '\n' + this._desc);
- }
- info.heartbeatCallback = null;
- ++count;
- }
- }
-
- //
- // Decrease dispatch count.
- //
- if(count > 0)
- {
- this._dispatchCount -= count;
- if(this._dispatchCount === 0)
- {
- if(this._state === StateClosing && !this._shutdownInitiated)
- {
- try
+ if(ex instanceof Ice.LocalException)
{
- this.initiateShutdown();
+ this.setState(StateClosed, ex);
}
- catch(ex)
+ else
{
- if(ex instanceof Ice.LocalException)
- {
- this.setState(StateClosed, ex);
- }
- else
- {
- throw ex;
- }
+ throw ex;
}
}
- else if(this._state === StateFinished)
- {
- this.reap();
- }
- this.checkState();
}
+ else if(this._state === StateFinished)
+ {
+ this.reap();
+ }
+ this.checkState();
}
- },
- finish: function()
- {
- Debug.assert(this._state === StateClosed);
- this.unscheduleTimeout(SocketOperation.Read | SocketOperation.Write | SocketOperation.Connect);
+ }
+ },
+ finish: function()
+ {
+ Debug.assert(this._state === StateClosed);
+ this.unscheduleTimeout(SocketOperation.Read | SocketOperation.Write | SocketOperation.Connect);
- if(this._startPromise !== null)
- {
- this._startPromise.fail(this._exception);
- this._startPromise = null;
- }
+ if(this._startPromise !== null)
+ {
+ this._startPromise.fail(this._exception);
+ this._startPromise = null;
+ }
- if(this._sendStreams.length > 0)
+ if(this._sendStreams.length > 0)
+ {
+ if(!this._writeStream.isEmpty())
{
- if(!this._writeStream.isEmpty())
- {
- //
- // Return the stream to the outgoing call. This is important for
- // retriable AMI calls which are not marshalled again.
- //
- var message = this._sendStreams[0];
- this._writeStream.swap(message.stream);
- }
-
//
- // NOTE: for twoway requests which are not sent, finished can be called twice: the
- // first time because the outgoing is in the _sendStreams set and the second time
- // because it's either in the _requests/_asyncRequests set. This is fine, only the
- // first call should be taken into account by the implementation of finished.
+ // Return the stream to the outgoing call. This is important for
+ // retriable AMI calls which are not marshalled again.
//
- for(var i = 0; i < this._sendStreams.length; ++i)
- {
- var p = this._sendStreams[i];
- if(p.requestId > 0)
- {
- this._asyncRequests.delete(p.requestId);
- }
- p.finished(this._exception);
- }
- this._sendStreams = [];
+ var message = this._sendStreams[0];
+ this._writeStream.swap(message.stream);
}
- for(var e = this._asyncRequests.entries; e !== null; e = e.next)
- {
- e.value.__finishedEx(this._exception, true);
- }
- this._asyncRequests.clear();
-
- if(this._callback != null)
+ //
+ // NOTE: for twoway requests which are not sent, finished can be called twice: the
+ // first time because the outgoing is in the _sendStreams set and the second time
+ // because it's either in the _requests/_asyncRequests set. This is fine, only the
+ // first call should be taken into account by the implementation of finished.
+ //
+ for(var i = 0; i < this._sendStreams.length; ++i)
{
- try
- {
- this._callback.closed(this);
- }
- catch(ex)
+ var p = this._sendStreams[i];
+ if(p.requestId > 0)
{
- this._logger.error("connection callback exception:\n" + ex + '\n' + this._desc);
+ this._asyncRequests.delete(p.requestId);
}
- this._callback = null;
+ p.finished(this._exception);
}
+ this._sendStreams = [];
+ }
- //
- // This must be done last as this will cause waitUntilFinished() to return (and communicator
- // objects such as the timer might be destroyed too).
- //
- if(this._dispatchCount === 0)
- {
- this.reap();
- }
- this.setState(StateFinished);
- },
- toString: function()
+ for(var e = this._asyncRequests.entries; e !== null; e = e.next)
{
- return this._desc;
- },
- timedOut: function(event)
+ e.value.__finishedEx(this._exception, true);
+ }
+ this._asyncRequests.clear();
+
+ if(this._callback !== null)
{
- if(this._state <= StateNotValidated)
- {
- this.setState(StateClosed, new Ice.ConnectTimeoutException());
- }
- else if(this._state < StateClosing)
+ try
{
- this.setState(StateClosed, new Ice.TimeoutException());
+ this._callback.closed(this);
}
- else if(this._state === StateClosing)
+ catch(ex)
{
- this.setState(StateClosed, new Ice.CloseTimeoutException());
+ this._logger.error("connection callback exception:\n" + ex + '\n' + this._desc);
}
- },
- type: function()
+ this._callback = null;
+ }
+
+ //
+ // This must be done last as this will cause waitUntilFinished() to return (and communicator
+ // objects such as the timer might be destroyed too).
+ //
+ if(this._dispatchCount === 0)
{
- return this._type;
- },
- timeout: function()
+ this.reap();
+ }
+ this.setState(StateFinished);
+ },
+ toString: function()
+ {
+ return this._desc;
+ },
+ timedOut: function(event)
+ {
+ if(this._state <= StateNotValidated)
{
- return this._endpoint.timeout();
- },
- getInfo: function()
+ this.setState(StateClosed, new Ice.ConnectTimeoutException());
+ }
+ else if(this._state < StateClosing)
{
- if(this._state >= StateClosed)
- {
- throw this._exception;
- }
- var info = this._transceiver.getInfo();
- info.adapterName = this._adapter !== null ? this._adapter.getName() : "";
- info.incoming = this._incoming;
- return info;
- },
- exception: function(ex)
+ this.setState(StateClosed, new Ice.TimeoutException());
+ }
+ else if(this._state === StateClosing)
{
- this.setState(StateClosed, ex);
- },
- invokeException: function(ex, invokeNum)
+ this.setState(StateClosed, new Ice.CloseTimeoutException());
+ }
+ },
+ type: function()
+ {
+ return this._type;
+ },
+ timeout: function()
+ {
+ return this._endpoint.timeout();
+ },
+ getInfo: function()
+ {
+ if(this._state >= StateClosed)
{
- //
- // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
- // called in case of a fatal exception we decrement this._dispatchCount here.
- //
+ throw this._exception;
+ }
+ var info = this._transceiver.getInfo();
+ info.adapterName = this._adapter !== null ? this._adapter.getName() : "";
+ info.incoming = this._incoming;
+ return info;
+ },
+ exception: function(ex)
+ {
+ this.setState(StateClosed, ex);
+ },
+ invokeException: function(ex, invokeNum)
+ {
+ //
+ // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
+ // called in case of a fatal exception we decrement this._dispatchCount here.
+ //
- this.setState(StateClosed, ex);
+ this.setState(StateClosed, ex);
- if(invokeNum > 0)
+ if(invokeNum > 0)
+ {
+ Debug.assert(this._dispatchCount > 0);
+ this._dispatchCount -= invokeNum;
+ Debug.assert(this._dispatchCount >= 0);
+ if(this._dispatchCount === 0)
{
- Debug.assert(this._dispatchCount > 0);
- this._dispatchCount -= invokeNum;
- Debug.assert(this._dispatchCount >= 0);
- if(this._dispatchCount === 0)
+ if(this._state === StateFinished)
{
- if(this._state === StateFinished)
- {
- this.reap();
- }
- this.checkState();
+ this.reap();
}
+ this.checkState();
}
- },
- setState: function(state, ex)
+ }
+ },
+ setState: function(state, ex)
+ {
+ if(ex !== undefined)
{
- if(ex !== undefined)
+ Debug.assert(ex instanceof Ice.LocalException);
+
+ //
+ // If setState() is called with an exception, then only closed
+ // and closing states are permissible.
+ //
+ Debug.assert(state >= StateClosing);
+
+ if(this._state === state) // Don't switch twice.
{
- Debug.assert(ex instanceof Ice.LocalException);
-
+ return;
+ }
+
+ if(this._exception === null)
+ {
+ this._exception = ex;
+
//
- // If setState() is called with an exception, then only closed
- // and closing states are permissible.
+ // We don't warn if we are not validated.
//
- Debug.assert(state >= StateClosing);
-
- if(this._state === state) // Don't switch twice.
+ if(this._warn && this._validated)
{
- return;
- }
-
- if(this._exception === null)
- {
- this._exception = ex;
-
//
- // We don't warn if we are not validated.
+ // Don't warn about certain expected exceptions.
//
- if(this._warn && this._validated)
+ if(!(this._exception instanceof Ice.CloseConnectionException ||
+ this._exception instanceof Ice.ForcedCloseConnectionException ||
+ this._exception instanceof Ice.ConnectionTimeoutException ||
+ this._exception instanceof Ice.CommunicatorDestroyedException ||
+ this._exception instanceof Ice.ObjectAdapterDeactivatedException ||
+ (this._exception instanceof Ice.ConnectionLostException && this._state === StateClosing)))
{
- //
- // Don't warn about certain expected exceptions.
- //
- if(!(this._exception instanceof Ice.CloseConnectionException ||
- this._exception instanceof Ice.ForcedCloseConnectionException ||
- this._exception instanceof Ice.ConnectionTimeoutException ||
- this._exception instanceof Ice.CommunicatorDestroyedException ||
- this._exception instanceof Ice.ObjectAdapterDeactivatedException ||
- (this._exception instanceof Ice.ConnectionLostException && this._state === StateClosing)))
- {
- this.warning("connection exception", this._exception);
- }
+ this.warning("connection exception", this._exception);
}
}
-
- //
- // We must set the new state before we notify requests of any
- // exceptions. Otherwise new requests may retry on a
- // connection that is not yet marked as closed or closing.
- //
}
//
- // We don't want to send close connection messages if the endpoint
- // only supports oneway transmission from client to server.
+ // We must set the new state before we notify requests of any
+ // exceptions. Otherwise new requests may retry on a
+ // connection that is not yet marked as closed or closing.
//
- if(this._endpoint.datagram() && state === StateClosing)
- {
- state = StateClosed;
- }
+ }
- //
- // Skip graceful shutdown if we are destroyed before validation.
- //
- if(this._state <= StateNotValidated && state === StateClosing)
- {
- state = StateClosed;
- }
+ //
+ // We don't want to send close connection messages if the endpoint
+ // only supports oneway transmission from client to server.
+ //
+ if(this._endpoint.datagram() && state === StateClosing)
+ {
+ state = StateClosed;
+ }
- if(this._state === state) // Don't switch twice.
+ //
+ // Skip graceful shutdown if we are destroyed before validation.
+ //
+ if(this._state <= StateNotValidated && state === StateClosing)
+ {
+ state = StateClosed;
+ }
+
+ if(this._state === state) // Don't switch twice.
+ {
+ return;
+ }
+
+ try
+ {
+ switch(state)
{
- return;
+ case StateNotInitialized:
+ {
+ Debug.assert(false);
+ break;
}
- try
+ case StateNotValidated:
{
- switch(state)
- {
- case StateNotInitialized:
+ if(this._state !== StateNotInitialized)
{
- Debug.assert(false);
- break;
- }
-
- case StateNotValidated:
- {
- if(this._state !== StateNotInitialized)
- {
- Debug.assert(this._state === StateClosed);
- return;
- }
- //
- // Register to receive validation message.
- //
- if(!this._endpoint.datagram() && !this._incoming)
- {
- //
- // Once validation is complete, a new connection starts out in the
- // Holding state. We only want to register the transceiver now if we
- // need to receive data in order to validate the connection.
- //
- this._transceiver.register();
- }
- break;
+ Debug.assert(this._state === StateClosed);
+ return;
}
-
- case StateActive:
+ //
+ // Register to receive validation message.
+ //
+ if(!this._endpoint.datagram() && !this._incoming)
{
//
- // Can only switch from holding or not validated to
- // active.
+ // Once validation is complete, a new connection starts out in the
+ // Holding state. We only want to register the transceiver now if we
+ // need to receive data in order to validate the connection.
//
- if(this._state !== StateHolding && this._state !== StateNotValidated)
- {
- return;
- }
this._transceiver.register();
- break;
}
+ break;
+ }
- case StateHolding:
+ case StateActive:
+ {
+ //
+ // Can only switch from holding or not validated to
+ // active.
+ //
+ if(this._state !== StateHolding && this._state !== StateNotValidated)
{
- //
- // Can only switch from active or not validated to
- // holding.
- //
- if(this._state !== StateActive && this._state !== StateNotValidated)
- {
- return;
- }
- if(this._state === StateActive)
- {
- this._transceiver.unregister();
- }
- break;
+ return;
}
+ this._transceiver.register();
+ break;
+ }
- case StateClosing:
+ case StateHolding:
+ {
+ //
+ // Can only switch from active or not validated to
+ // holding.
+ //
+ if(this._state !== StateActive && this._state !== StateNotValidated)
{
- //
- // Can't change back from closed.
- //
- if(this._state >= StateClosed)
- {
- return;
- }
- if(this._state === StateHolding)
- {
- // We need to continue to read in closing state.
- this._transceiver.register();
- }
- break;
+ return;
}
-
- case StateClosed:
+ if(this._state === StateActive)
{
- if(this._state === StateFinished)
- {
- return;
- }
this._transceiver.unregister();
- break;
- }
-
- case StateFinished:
- {
- Debug.assert(this._state === StateClosed);
- this._transceiver.close();
- this._communicator = null;
- break;
- }
}
+ break;
}
- catch(ex)
+
+ case StateClosing:
{
- if(ex instanceof Ice.LocalException)
+ //
+ // Can't change back from closed.
+ //
+ if(this._state >= StateClosed)
{
- var msg = "unexpected connection exception:\n " + this._desc + "\n" + ExUtil.toString(ex);
- this._instance.initializationData().logger.error(msg);
+ return;
}
- else
+ if(this._state === StateHolding)
{
- throw ex;
+ // We need to continue to read in closing state.
+ this._transceiver.register();
}
+ break;
}
- //
- // We only register with the connection monitor if our new state
- // is StateActive. Otherwise we unregister with the connection
- // monitor, but only if we were registered before, i.e., if our
- // old state was StateActive.
- //
- if(this._monitor !== null)
+ case StateClosed:
{
- if(state === StateActive)
- {
- this._monitor.add(this);
- if(this._acmLastActivity > 0)
- {
- this._acmLastActivity = Date.now();
- }
- }
- else if(this._state === StateActive)
+ if(this._state === StateFinished)
{
- this._monitor.remove(this);
+ return;
}
+ this._transceiver.unregister();
+ break;
}
- this._state = state;
-
- if(this._state === StateClosing && this._dispatchCount === 0)
+ case StateFinished:
{
- try
- {
- this.initiateShutdown();
- }
- catch(ex)
- {
- if(ex instanceof Ice.LocalException)
- {
- this.setState(StateClosed, ex);
- }
- else
- {
- throw ex;
- }
- }
+ Debug.assert(this._state === StateClosed);
+ this._transceiver.close();
+ this._communicator = null;
+ break;
+ }
}
- else if(this._state === StateClosed)
+ }
+ catch(ex)
+ {
+ if(ex instanceof Ice.LocalException)
{
- this.finish();
+ var msg = "unexpected connection exception:\n " + this._desc + "\n" + ExUtil.toString(ex);
+ this._instance.initializationData().logger.error(msg);
+ }
+ else
+ {
+ throw ex;
}
+ }
- this.checkState();
- },
- initiateShutdown: function()
+ //
+ // We only register with the connection monitor if our new state
+ // is StateActive. Otherwise we unregister with the connection
+ // monitor, but only if we were registered before, i.e., if our
+ // old state was StateActive.
+ //
+ if(this._monitor !== null)
{
- Debug.assert(this._state === StateClosing);
- Debug.assert(this._dispatchCount === 0);
- Debug.assert(!this._shutdownInitiated);
-
- if(!this._endpoint.datagram())
+ if(state === StateActive)
{
- //
- // Before we shut down, we send a close connection
- // message.
- //
- var os = new BasicStream(this._instance, Protocol.currentProtocolEncoding, false);
- os.writeBlob(Protocol.magic);
- Protocol.currentProtocol.__write(os);
- Protocol.currentProtocolEncoding.__write(os);
- os.writeByte(Protocol.closeConnectionMsg);
- os.writeByte(0); // compression status: always report 0 for CloseConnection.
- os.writeInt(Protocol.headerSize); // Message size.
-
- var status = this.sendMessage(OutgoingMessage.createForStream(os, false, false));
- if((status & AsyncStatus.Sent) > 0)
+ this._monitor.add(this);
+ if(this._acmLastActivity > 0)
{
- //
- // Schedule the close timeout to wait for the peer to close the connection.
- //
- this.scheduleTimeout(SocketOperation.Write, this.closeTimeout());
+ this._acmLastActivity = Date.now();
}
-
- //
- // The CloseConnection message should be sufficient. Closing the write
- // end of the socket is probably an artifact of how things were done
- // in IIOP. In fact, shutting down the write end of the socket causes
- // problems on Windows by preventing the peer from using the socket.
- // For example, the peer is no longer able to continue writing a large
- // message after the socket is shutdown.
- //
- //this._transceiver.shutdownWrite();
}
- },
- heartbeat: function()
+ else if(this._state === StateActive)
+ {
+ this._monitor.remove(this);
+ }
+ }
+
+ this._state = state;
+
+ if(this._state === StateClosing && this._dispatchCount === 0)
{
- Debug.assert(this._state === StateActive);
-
- if(!this._endpoint.datagram())
- {
- var os = new BasicStream(this._instance, Protocol.currentProtocolEncoding);
- os.writeBlob(Protocol.magic);
- Protocol.currentProtocol.__write(os);
- Protocol.currentProtocolEncoding.__write(os);
- os.writeByte(Protocol.validateConnectionMsg);
- os.writeByte(0);
- os.writeInt(Protocol.headerSize); // Message size.
- try
+ try
+ {
+ this.initiateShutdown();
+ }
+ catch(ex)
+ {
+ if(ex instanceof Ice.LocalException)
{
- this.sendMessage(OutgoingMessage.createForStream(os, false, false));
+ this.setState(StateClosed, ex);
}
- catch(ex)
+ else
{
- this.setState(StateClosed, ex);
- Debug.assert(this._exception != null);
+ throw ex;
}
}
- },
- initialize: function()
+ }
+ else if(this._state === StateClosed)
{
- var s = this._transceiver.initialize(this._readStream.buffer, this._writeStream.buffer);
- if(s != SocketOperation.None)
+ this.finish();
+ }
+
+ this.checkState();
+ },
+ initiateShutdown: function()
+ {
+ Debug.assert(this._state === StateClosing);
+ Debug.assert(this._dispatchCount === 0);
+ Debug.assert(!this._shutdownInitiated);
+
+ if(!this._endpoint.datagram())
+ {
+ //
+ // Before we shut down, we send a close connection
+ // message.
+ //
+ var os = new BasicStream(this._instance, Protocol.currentProtocolEncoding, false);
+ os.writeBlob(Protocol.magic);
+ Protocol.currentProtocol.__write(os);
+ Protocol.currentProtocolEncoding.__write(os);
+ os.writeByte(Protocol.closeConnectionMsg);
+ os.writeByte(0); // compression status: always report 0 for CloseConnection.
+ os.writeInt(Protocol.headerSize); // Message size.
+
+ var status = this.sendMessage(OutgoingMessage.createForStream(os, false, false));
+ if((status & AsyncStatus.Sent) > 0)
{
- this.scheduleTimeout(s, this.connectTimeout());
- return false;
+ //
+ // Schedule the close timeout to wait for the peer to close the connection.
+ //
+ this.scheduleTimeout(SocketOperation.Write, this.closeTimeout());
}
//
- // Update the connection description once the transceiver is initialized.
+ // The CloseConnection message should be sufficient. Closing the write
+ // end of the socket is probably an artifact of how things were done
+ // in IIOP. In fact, shutting down the write end of the socket causes
+ // problems on Windows by preventing the peer from using the socket.
+ // For example, the peer is no longer able to continue writing a large
+ // message after the socket is shutdown.
//
- this._desc = this._transceiver.toString();
- this.setState(StateNotValidated);
- return true;
- },
- validate: function()
+ //this._transceiver.shutdownWrite();
+ }
+ },
+ heartbeat: function()
+ {
+ Debug.assert(this._state === StateActive);
+
+ if(!this._endpoint.datagram())
+ {
+ var os = new BasicStream(this._instance, Protocol.currentProtocolEncoding);
+ os.writeBlob(Protocol.magic);
+ Protocol.currentProtocol.__write(os);
+ Protocol.currentProtocolEncoding.__write(os);
+ os.writeByte(Protocol.validateConnectionMsg);
+ os.writeByte(0);
+ os.writeInt(Protocol.headerSize); // Message size.
+ try
+ {
+ this.sendMessage(OutgoingMessage.createForStream(os, false, false));
+ }
+ catch(ex)
+ {
+ this.setState(StateClosed, ex);
+ Debug.assert(this._exception !== null);
+ }
+ }
+ },
+ initialize: function()
+ {
+ var s = this._transceiver.initialize(this._readStream.buffer, this._writeStream.buffer);
+ if(s != SocketOperation.None)
+ {
+ this.scheduleTimeout(s, this.connectTimeout());
+ return false;
+ }
+
+ //
+ // Update the connection description once the transceiver is initialized.
+ //
+ this._desc = this._transceiver.toString();
+ this.setState(StateNotValidated);
+ return true;
+ },
+ validate: function()
+ {
+ if(!this._endpoint.datagram()) // Datagram connections are always implicitly validated.
{
- if(!this._endpoint.datagram()) // Datagram connections are always implicitly validated.
+ if(this._adapter !== null) // The server side has the active role for connection validation.
{
- if(this._adapter !== null) // The server side has the active role for connection validation.
+ if(this._writeStream.size === 0)
{
- if(this._writeStream.size === 0)
- {
- this._writeStream.writeBlob(Protocol.magic);
- Protocol.currentProtocol.__write(this._writeStream);
- Protocol.currentProtocolEncoding.__write(this._writeStream);
- this._writeStream.writeByte(Protocol.validateConnectionMsg);
- this._writeStream.writeByte(0); // Compression status (always zero for validate connection).
- this._writeStream.writeInt(Protocol.headerSize); // Message size.
- TraceUtil.traceSend(this._writeStream, this._logger, this._traceLevels);
- this._writeStream.prepareWrite();
- }
+ this._writeStream.writeBlob(Protocol.magic);
+ Protocol.currentProtocol.__write(this._writeStream);
+ Protocol.currentProtocolEncoding.__write(this._writeStream);
+ this._writeStream.writeByte(Protocol.validateConnectionMsg);
+ this._writeStream.writeByte(0); // Compression status (always zero for validate connection).
+ this._writeStream.writeInt(Protocol.headerSize); // Message size.
+ TraceUtil.traceSend(this._writeStream, this._logger, this._traceLevels);
+ this._writeStream.prepareWrite();
+ }
- if(this._writeStream.pos != this._writeStream.size &&
- !this._transceiver.write(this._writeStream.buffer))
- {
- this.scheduleTimeout(SocketOperation.Write, this.connectTimeout());
- return false;
- }
+ if(this._writeStream.pos != this._writeStream.size &&
+ !this._transceiver.write(this._writeStream.buffer))
+ {
+ this.scheduleTimeout(SocketOperation.Write, this.connectTimeout());
+ return false;
}
- else // The client side has the passive role for connection validation.
+ }
+ else // The client side has the passive role for connection validation.
+ {
+ if(this._readStream.size === 0)
{
- if(this._readStream.size === 0)
- {
- this._readStream.resize(Protocol.headerSize);
- this._readStream.pos = 0;
- }
+ this._readStream.resize(Protocol.headerSize);
+ this._readStream.pos = 0;
+ }
- if(this._readStream.pos !== this._readStream.size &&
- !this._transceiver.read(this._readStream.buffer, this._hasMoreData))
- {
- this.scheduleTimeout(SocketOperation.Read, this.connectTimeout());
- return false;
- }
+ if(this._readStream.pos !== this._readStream.size &&
+ !this._transceiver.read(this._readStream.buffer, this._hasMoreData))
+ {
+ this.scheduleTimeout(SocketOperation.Read, this.connectTimeout());
+ return false;
+ }
- Debug.assert(this._readStream.pos === Protocol.headerSize);
- this._readStream.pos = 0;
- var m = this._readStream.readBlob(4);
- if(m[0] !== Protocol.magic[0] || m[1] !== Protocol.magic[1] ||
- m[2] !== Protocol.magic[2] || m[3] !== Protocol.magic[3])
- {
- var bme = new Ice.BadMagicException();
- bme.badMagic = m;
- throw bme;
- }
+ Debug.assert(this._readStream.pos === Protocol.headerSize);
+ this._readStream.pos = 0;
+ var m = this._readStream.readBlob(4);
+ if(m[0] !== Protocol.magic[0] || m[1] !== Protocol.magic[1] ||
+ m[2] !== Protocol.magic[2] || m[3] !== Protocol.magic[3])
+ {
+ var bme = new Ice.BadMagicException();
+ bme.badMagic = m;
+ throw bme;
+ }
- this._readProtocol.__read(this._readStream);
- Protocol.checkSupportedProtocol(this._readProtocol);
+ this._readProtocol.__read(this._readStream);
+ Protocol.checkSupportedProtocol(this._readProtocol);
- this._readProtocolEncoding.__read(this._readStream);
- Protocol.checkSupportedProtocolEncoding(this._readProtocolEncoding);
+ this._readProtocolEncoding.__read(this._readStream);
+ Protocol.checkSupportedProtocolEncoding(this._readProtocolEncoding);
- var messageType = this._readStream.readByte();
- if(messageType !== Protocol.validateConnectionMsg)
- {
- throw new Ice.ConnectionNotValidatedException();
- }
- this._readStream.readByte(); // Ignore compression status for validate connection.
- var size = this._readStream.readInt();
- if(size !== Protocol.headerSize)
- {
- throw new Ice.IllegalMessageSizeException();
- }
- TraceUtil.traceRecv(this._readStream, this._logger, this._traceLevels);
- this._validated = true;
+ var messageType = this._readStream.readByte();
+ if(messageType !== Protocol.validateConnectionMsg)
+ {
+ throw new Ice.ConnectionNotValidatedException();
+ }
+ this._readStream.readByte(); // Ignore compression status for validate connection.
+ var size = this._readStream.readInt();
+ if(size !== Protocol.headerSize)
+ {
+ throw new Ice.IllegalMessageSizeException();
}
+ TraceUtil.traceRecv(this._readStream, this._logger, this._traceLevels);
+ this._validated = true;
}
+ }
- this._writeStream.resize(0);
- this._writeStream.pos = 0;
+ this._writeStream.resize(0);
+ this._writeStream.pos = 0;
- this._readStream.resize(Protocol.headerSize);
- this._readHeader = true;
- this._readStream.pos = 0;
+ this._readStream.resize(Protocol.headerSize);
+ this._readHeader = true;
+ this._readStream.pos = 0;
- return true;
- },
- sendNextMessage: function()
+ return true;
+ },
+ sendNextMessage: function()
+ {
+ if(this._sendStreams.length === 0)
{
- if(this._sendStreams.length === 0)
- {
- return;
- }
+ return;
+ }
- Debug.assert(!this._writeStream.isEmpty() && this._writeStream.pos === this._writeStream.size);
- try
+ Debug.assert(!this._writeStream.isEmpty() && this._writeStream.pos === this._writeStream.size);
+ try
+ {
+ while(true)
{
- while(true)
+ //
+ // Notify the message that it was sent.
+ //
+ var message = this._sendStreams.shift();
+ this._writeStream.swap(message.stream);
+ message.sent(this);
+
+ //
+ // If there's nothing left to send, we're done.
+ //
+ if(this._sendStreams.length === 0)
{
- //
- // Notify the message that it was sent.
- //
- var message = this._sendStreams.shift();
- this._writeStream.swap(message.stream);
- message.sent(this);
-
- //
- // If there's nothing left to send, we're done.
- //
- if(this._sendStreams.length === 0)
- {
- break;
- }
-
- //
- // If we are in the closed state, don't continue sending.
- //
- // The connection can be in the closed state if parseMessage
- // (called before sendNextMessage by message()) closes the
- // connection.
- //
- if(this._state >= StateClosed)
- {
- return;
- }
+ break;
+ }
- //
- // Otherwise, prepare the next message stream for writing.
- //
- message = this._sendStreams[0];
- Debug.assert(!message.prepared);
- var stream = message.stream;
+ //
+ // If we are in the closed state, don't continue sending.
+ //
+ // The connection can be in the closed state if parseMessage
+ // (called before sendNextMessage by message()) closes the
+ // connection.
+ //
+ if(this._state >= StateClosed)
+ {
+ return;
+ }
- stream.pos = 10;
- stream.writeInt(stream.size);
- stream.prepareWrite();
- message.prepared = true;
+ //
+ // Otherwise, prepare the next message stream for writing.
+ //
+ message = this._sendStreams[0];
+ Debug.assert(!message.prepared);
+ var stream = message.stream;
- if(message.outAsync !== null)
- {
- TraceUtil.trace("sending asynchronous request", stream, this._logger, this._traceLevels);
- }
- else
- {
- TraceUtil.traceSend(stream, this._logger, this._traceLevels);
- }
- this._writeStream.swap(message.stream);
+ stream.pos = 10;
+ stream.writeInt(stream.size);
+ stream.prepareWrite();
+ message.prepared = true;
- //
- // Send the message.
- //
- if(this._writeStream.pos != this._writeStream.size &&
- !this._transceiver.write(this._writeStream.buffer))
- {
- Debug.assert(!this._writeStream.isEmpty());
- this.scheduleTimeout(SocketOperation.Write, this._endpoint.timeout());
- return;
- }
- }
- }
- catch(ex)
- {
- if(ex instanceof Ice.LocalException)
+ if(message.outAsync !== null)
{
- this.setState(StateClosed, ex);
- return;
+ TraceUtil.trace("sending asynchronous request", stream, this._logger, this._traceLevels);
}
else
{
- throw ex;
+ TraceUtil.traceSend(stream, this._logger, this._traceLevels);
}
- }
+ this._writeStream.swap(message.stream);
- Debug.assert(this._writeStream.isEmpty());
-
- //
- // If all the messages were sent and we are in the closing state, we schedule
- // the close timeout to wait for the peer to close the connection.
- //
- if(this._state === StateClosing)
- {
- this.scheduleTimeout(SocketOperation.Write, this.closeTimeout());
+ //
+ // Send the message.
+ //
+ if(this._writeStream.pos != this._writeStream.size &&
+ !this._transceiver.write(this._writeStream.buffer))
+ {
+ Debug.assert(!this._writeStream.isEmpty());
+ this.scheduleTimeout(SocketOperation.Write, this._endpoint.timeout());
+ return;
+ }
}
- },
- sendMessage: function(message)
+ }
+ catch(ex)
{
- if(this._sendStreams.length > 0)
+ if(ex instanceof Ice.LocalException)
{
- message.doAdopt();
- this._sendStreams.push(message);
- return AsyncStatus.Queued;
+ this.setState(StateClosed, ex);
+ return;
}
- Debug.assert(this._state < StateClosed);
-
- Debug.assert(!message.prepared);
-
- var stream = message.stream;
- stream.pos = 10;
- stream.writeInt(stream.size);
- stream.prepareWrite();
- message.prepared = true;
-
- TraceUtil.trace("sending asynchronous request", message.stream, this._logger, this._traceLevels);
-
- if(this._transceiver.write(message.stream.buffer))
+ else
{
- //
- // Entire buffer was written immediately.
- //
- message.sent(this);
-
- if(this._acmLastActivity > 0)
- {
- this._acmLastActivity = Date.now();
- }
- return AsyncStatus.Sent;
+ throw ex;
}
- message.doAdopt();
+ }
+
+ Debug.assert(this._writeStream.isEmpty());
- this._writeStream.swap(message.stream);
+ //
+ // If all the messages were sent and we are in the closing state, we schedule
+ // the close timeout to wait for the peer to close the connection.
+ //
+ if(this._state === StateClosing)
+ {
+ this.scheduleTimeout(SocketOperation.Write, this.closeTimeout());
+ }
+ },
+ sendMessage: function(message)
+ {
+ if(this._sendStreams.length > 0)
+ {
+ message.doAdopt();
this._sendStreams.push(message);
- this.scheduleTimeout(SocketOperation.Write, this._endpoint.timeout());
-
return AsyncStatus.Queued;
- },
- parseMessage: function()
- {
- Debug.assert(this._state > StateNotValidated && this._state < StateClosed);
+ }
+ Debug.assert(this._state < StateClosed);
- var info = new MessageInfo(this._instance);
+ Debug.assert(!message.prepared);
- this._readStream.swap(info.stream);
- this._readStream.resize(Protocol.headerSize);
- this._readStream.pos = 0;
- this._readHeader = true;
+ var stream = message.stream;
+ stream.pos = 10;
+ stream.writeInt(stream.size);
+ stream.prepareWrite();
+ message.prepared = true;
+
+ TraceUtil.trace("sending asynchronous request", message.stream, this._logger, this._traceLevels);
+ if(this._transceiver.write(message.stream.buffer))
+ {
//
- // Connection is validated on first message. This is only used by
- // setState() to check wether or not we can print a connection
- // warning (a client might close the connection forcefully if the
- // connection isn't validated).
+ // Entire buffer was written immediately.
//
- this._validated = true;
+ message.sent(this);
- Debug.assert(info.stream.pos === info.stream.size);
+ if(this._acmLastActivity > 0)
+ {
+ this._acmLastActivity = Date.now();
+ }
+ return AsyncStatus.Sent;
+ }
+ message.doAdopt();
+
+ this._writeStream.swap(message.stream);
+ this._sendStreams.push(message);
+ this.scheduleTimeout(SocketOperation.Write, this._endpoint.timeout());
+
+ return AsyncStatus.Queued;
+ },
+ parseMessage: function()
+ {
+ Debug.assert(this._state > StateNotValidated && this._state < StateClosed);
- try
+ var info = new MessageInfo(this._instance);
+
+ this._readStream.swap(info.stream);
+ this._readStream.resize(Protocol.headerSize);
+ this._readStream.pos = 0;
+ this._readHeader = true;
+
+ //
+ // Connection is validated on first message. This is only used by
+ // setState() to check wether or not we can print a connection
+ // warning (a client might close the connection forcefully if the
+ // connection isn't validated).
+ //
+ this._validated = true;
+
+ Debug.assert(info.stream.pos === info.stream.size);
+
+ try
+ {
+ //
+ // We don't need to check magic and version here. This has already
+ // been done by the caller.
+ //
+ info.stream.pos = 8;
+ var messageType = info.stream.readByte();
+ info.compress = info.stream.readByte();
+ if(info.compress === 2)
{
- //
- // We don't need to check magic and version here. This has already
- // been done by the caller.
- //
- info.stream.pos = 8;
- var messageType = info.stream.readByte();
- info.compress = info.stream.readByte();
- if(info.compress === 2)
- {
- var ex = new Ice.FeatureNotSupportedException();
- ex.unsupportedFeature = "Cannot uncompress compressed message";
- throw ex;
- }
- info.stream.pos = Protocol.headerSize;
+ var ex = new Ice.FeatureNotSupportedException();
+ ex.unsupportedFeature = "Cannot uncompress compressed message";
+ throw ex;
+ }
+ info.stream.pos = Protocol.headerSize;
- switch(messageType)
+ switch(messageType)
+ {
+ case Protocol.closeConnectionMsg:
{
- case Protocol.closeConnectionMsg:
+ TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels);
+ if(this._endpoint.datagram())
{
- TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels);
- if(this._endpoint.datagram())
- {
- if(this._warn)
- {
- this._logger.warning("ignoring close connection message for datagram connection:\n" +
- this._desc);
- }
- }
- else
+ if(this._warn)
{
- this.setState(StateClosed, new Ice.CloseConnectionException());
+ this._logger.warning("ignoring close connection message for datagram connection:\n" +
+ this._desc);
}
- break;
}
-
- case Protocol.requestMsg:
+ else
{
- if(this._state === StateClosing)
- {
- TraceUtil.trace("received request during closing\n" +
- "(ignored by server, client will retry)",
- info.stream, this._logger, this._traceLevels);
- }
- else
- {
- TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels);
- info.requestId = info.stream.readInt();
- info.invokeNum = 1;
- info.servantManager = this._servantManager;
- info.adapter = this._adapter;
- ++this._dispatchCount;
- }
- break;
+ this.setState(StateClosed, new Ice.CloseConnectionException());
}
+ break;
+ }
- case Protocol.requestBatchMsg:
+ case Protocol.requestMsg:
+ {
+ if(this._state === StateClosing)
{
- if(this._state === StateClosing)
- {
- TraceUtil.trace("received batch request during closing\n" +
- "(ignored by server, client will retry)",
- info.stream, this._logger, this._traceLevels);
- }
- else
- {
- TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels);
- info.invokeNum = info.stream.readInt();
- if(info.invokeNum < 0)
- {
- info.invokeNum = 0;
- throw new Ice.UnmarshalOutOfBoundsException();
- }
- info.servantManager = this._servantManager;
- info.adapter = this._adapter;
- this._dispatchCount += info.invokeNum;
- }
- break;
+ TraceUtil.trace("received request during closing\n" +
+ "(ignored by server, client will retry)",
+ info.stream, this._logger, this._traceLevels);
}
-
- case Protocol.replyMsg:
+ else
{
TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels);
info.requestId = info.stream.readInt();
- info.outAsync = this._asyncRequests.get(info.requestId);
- if(info.outAsync)
- {
- this._asyncRequests.delete(info.requestId);
- ++this._dispatchCount;
- }
- else
- {
- info = null;
- }
- this.checkClose();
- break;
+ info.invokeNum = 1;
+ info.servantManager = this._servantManager;
+ info.adapter = this._adapter;
+ ++this._dispatchCount;
}
+ break;
+ }
- case Protocol.validateConnectionMsg:
+ case Protocol.requestBatchMsg:
+ {
+ if(this._state === StateClosing)
+ {
+ TraceUtil.trace("received batch request during closing\n" +
+ "(ignored by server, client will retry)",
+ info.stream, this._logger, this._traceLevels);
+ }
+ else
{
TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels);
- if(this._callback !== null)
+ info.invokeNum = info.stream.readInt();
+ if(info.invokeNum < 0)
{
- info.heartbeatCallback = this._callback;
- ++this._dispatchCount;
+ info.invokeNum = 0;
+ throw new Ice.UnmarshalOutOfBoundsException();
}
- break;
- }
-
- default:
- {
- TraceUtil.trace("received unknown message\n(invalid, closing connection)",
- info.stream, this._logger, this._traceLevels);
- throw new Ice.UnknownMessageException();
+ info.servantManager = this._servantManager;
+ info.adapter = this._adapter;
+ this._dispatchCount += info.invokeNum;
}
+ break;
}
- }
- catch(ex)
- {
- if(ex instanceof Ice.LocalException)
+
+ case Protocol.replyMsg:
{
- if(this._endpoint.datagram())
+ TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels);
+ info.requestId = info.stream.readInt();
+ info.outAsync = this._asyncRequests.get(info.requestId);
+ if(info.outAsync)
{
- if(this._warn)
- {
- this._logger.warning("datagram connection exception:\n" + ex + '\n' + this._desc);
- }
+ this._asyncRequests.delete(info.requestId);
+ ++this._dispatchCount;
}
else
{
- this.setState(StateClosed, ex);
+ info = null;
}
+ this.checkClose();
+ break;
}
- else
+
+ case Protocol.validateConnectionMsg:
{
- throw ex;
+ TraceUtil.traceRecv(info.stream, this._logger, this._traceLevels);
+ if(this._callback !== null)
+ {
+ info.heartbeatCallback = this._callback;
+ ++this._dispatchCount;
+ }
+ break;
}
- }
- return info;
- },
- invokeAll: function(stream, invokeNum, requestId, compress, servantManager, adapter)
- {
- var inc = null;
- try
- {
- while(invokeNum > 0)
+ default:
{
- //
- // Prepare the invocation.
- //
- var response = !this._endpoint.datagram() && requestId !== 0;
- inc = new IncomingAsync(this._instance, this, adapter, response, compress, requestId);
-
- //
- // Dispatch the invocation.
- //
- inc.invoke(servantManager, stream);
-
- --invokeNum;
- inc = null;
+ TraceUtil.trace("received unknown message\n(invalid, closing connection)",
+ info.stream, this._logger, this._traceLevels);
+ throw new Ice.UnknownMessageException();
}
-
- stream.clear();
}
- catch(ex)
+ }
+ catch(ex)
+ {
+ if(ex instanceof Ice.LocalException)
{
- if(ex instanceof Ice.LocalException)
+ if(this._endpoint.datagram())
{
- this.invokeException(ex, invokeNum);
+ if(this._warn)
+ {
+ this._logger.warning("datagram connection exception:\n" + ex + '\n' + this._desc);
+ }
}
else
{
- throw ex;
+ this.setState(StateClosed, ex);
}
}
- },
- scheduleTimeout: function(op, timeout)
- {
- if(timeout < 0)
+ else
{
- return;
+ throw ex;
}
+ }
- var self = this;
- if((op & SocketOperation.Read) !== 0)
- {
- this._readTimeoutId = this._timer.schedule(function() { self.timedOut(); }, timeout);
- this._readTimeoutScheduled = true;
- }
- if((op & (SocketOperation.Write | SocketOperation.Connect)) !== 0)
- {
- this._writeTimeoutId = this._timer.schedule(function() { self.timedOut(); }, timeout);
- this._writeTimeoutScheduled = true;
- }
- },
- unscheduleTimeout: function(op)
+ return info;
+ },
+ invokeAll: function(stream, invokeNum, requestId, compress, servantManager, adapter)
+ {
+ var inc = null;
+ try
{
- if((op & SocketOperation.Read) !== 0 && this._readTimeoutScheduled)
- {
- this._timer.cancel(this._readTimeoutId);
- this._readTimeoutScheduled = false;
- }
- if((op & (SocketOperation.Write | SocketOperation.Connect)) !== 0 && this._writeTimeoutScheduled)
+ while(invokeNum > 0)
{
- this._timer.cancel(this._writeTimeoutId);
- this._writeTimeoutScheduled = false;
+ //
+ // Prepare the invocation.
+ //
+ var response = !this._endpoint.datagram() && requestId !== 0;
+ inc = new IncomingAsync(this._instance, this, adapter, response, compress, requestId);
+
+ //
+ // Dispatch the invocation.
+ //
+ inc.invoke(servantManager, stream);
+
+ --invokeNum;
+ inc = null;
}
- },
- connectTimeout: function()
+
+ stream.clear();
+ }
+ catch(ex)
{
- var defaultsAndOverrides = this._instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideConnectTimeout)
+ if(ex instanceof Ice.LocalException)
{
- return defaultsAndOverrides.overrideConnectTimeoutValue;
+ this.invokeException(ex, invokeNum);
}
else
{
- return this._endpoint.timeout();
+ throw ex;
}
- },
- closeTimeout: function()
+ }
+ },
+ scheduleTimeout: function(op, timeout)
+ {
+ if(timeout < 0)
{
- var defaultsAndOverrides = this._instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideCloseTimeout)
- {
- return defaultsAndOverrides.overrideCloseTimeoutValue;
- }
- else
- {
- return this._endpoint.timeout();
- }
- },
- warning: function(msg, ex)
+ return;
+ }
+
+ var self = this;
+ if((op & SocketOperation.Read) !== 0)
+ {
+ this._readTimeoutId = this._timer.schedule(function() { self.timedOut(); }, timeout);
+ this._readTimeoutScheduled = true;
+ }
+ if((op & (SocketOperation.Write | SocketOperation.Connect)) !== 0)
{
- this._logger.warning(msg + ":\n" + this._desc + "\n" + ExUtil.toString(ex));
- },
- checkState: function()
+ this._writeTimeoutId = this._timer.schedule(function() { self.timedOut(); }, timeout);
+ this._writeTimeoutScheduled = true;
+ }
+ },
+ unscheduleTimeout: function(op)
+ {
+ if((op & SocketOperation.Read) !== 0 && this._readTimeoutScheduled)
{
- if(this._state < StateHolding || this._dispatchCount > 0)
- {
- return;
- }
+ this._timer.cancel(this._readTimeoutId);
+ this._readTimeoutScheduled = false;
+ }
+ if((op & (SocketOperation.Write | SocketOperation.Connect)) !== 0 && this._writeTimeoutScheduled)
+ {
+ this._timer.cancel(this._writeTimeoutId);
+ this._writeTimeoutScheduled = false;
+ }
+ },
+ connectTimeout: function()
+ {
+ var defaultsAndOverrides = this._instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideConnectTimeout)
+ {
+ return defaultsAndOverrides.overrideConnectTimeoutValue;
+ }
+ else
+ {
+ return this._endpoint.timeout();
+ }
+ },
+ closeTimeout: function()
+ {
+ var defaultsAndOverrides = this._instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCloseTimeout)
+ {
+ return defaultsAndOverrides.overrideCloseTimeoutValue;
+ }
+ else
+ {
+ return this._endpoint.timeout();
+ }
+ },
+ warning: function(msg, ex)
+ {
+ this._logger.warning(msg + ":\n" + this._desc + "\n" + ExUtil.toString(ex));
+ },
+ checkState: function()
+ {
+ if(this._state < StateHolding || this._dispatchCount > 0)
+ {
+ return;
+ }
- var i;
- if(this._holdPromises.length > 0)
+ var i;
+ if(this._holdPromises.length > 0)
+ {
+ for(i = 0; i < this._holdPromises.length; ++i)
{
- for(i = 0; i < this._holdPromises.length; ++i)
- {
- this._holdPromises[i].succeed();
- }
- this._holdPromises = [];
+ this._holdPromises[i].succeed();
}
+ this._holdPromises = [];
+ }
+ //
+ // We aren't finished until the state is finished and all
+ // outstanding requests are completed. Otherwise we couldn't
+ // guarantee that there are no outstanding calls when deactivate()
+ // is called on the servant locators.
+ //
+ if(this._state === StateFinished && this._finishedPromises.length > 0)
+ {
//
- // We aren't finished until the state is finished and all
- // outstanding requests are completed. Otherwise we couldn't
- // guarantee that there are no outstanding calls when deactivate()
- // is called on the servant locators.
+ // Clear the OA. See bug 1673 for the details of why this is necessary.
//
- if(this._state === StateFinished && this._finishedPromises.length > 0)
- {
- //
- // Clear the OA. See bug 1673 for the details of why this is necessary.
- //
- this._adapter = null;
+ this._adapter = null;
- for(i = 0; i < this._finishedPromises.length; ++i)
- {
- this._finishedPromises[i].succeed();
- }
- this._finishedPromises = [];
- }
- },
- reap: function()
- {
- if(this._monitor !== null)
+ for(i = 0; i < this._finishedPromises.length; ++i)
{
- this._monitor.reap(this);
+ this._finishedPromises[i].succeed();
}
+ this._finishedPromises = [];
}
- });
-
- // DestructionReason.
- ConnectionI.ObjectAdapterDeactivated = 0;
- ConnectionI.CommunicatorDestroyed = 1;
-
- Ice.ConnectionI = ConnectionI;
- global.Ice = Ice;
-
- var OutgoingMessage = Class({
- __init__: function()
- {
- this.stream = null;
- this.outAsync = null;
- this.compress = false;
- this.requestId = 0;
- this.prepared = false;
- this.isSent = false;
- },
- timedOut: function()
- {
- Debug.assert(this.outAsync !== null);
- this.outAsync = null;
- return this.isSent;
- },
- doAdopt: function()
- {
- if(this.adopt)
- {
- var stream = new BasicStream(this.stream.instance, Protocol.currentProtocolEncoding);
- stream.swap(this.stream);
- this.stream = stream;
- this.adopt = false;
- }
- },
- sent: function(connection)
+ },
+ reap: function()
+ {
+ if(this._monitor !== null)
{
- this.isSent = true; // The message is sent.
+ this._monitor.reap(this);
+ }
+ }
+});
- if(this.outAsync !== null)
- {
- this.outAsync.__sent();
- }
- },
- finished: function(ex)
+// DestructionReason.
+ConnectionI.ObjectAdapterDeactivated = 0;
+ConnectionI.CommunicatorDestroyed = 1;
+
+Ice.ConnectionI = ConnectionI;
+
+var OutgoingMessage = Class({
+ __init__: function()
+ {
+ this.stream = null;
+ this.outAsync = null;
+ this.compress = false;
+ this.requestId = 0;
+ this.prepared = false;
+ this.isSent = false;
+ },
+ timedOut: function()
+ {
+ Debug.assert(this.outAsync !== null);
+ this.outAsync = null;
+ return this.isSent;
+ },
+ doAdopt: function()
+ {
+ if(this.adopt)
{
- if(this.outAsync !== null)
- {
- this.outAsync.__finishedEx(ex, this.isSent);
- }
+ var stream = new BasicStream(this.stream.instance, Protocol.currentProtocolEncoding);
+ stream.swap(this.stream);
+ this.stream = stream;
+ this.adopt = false;
}
- });
-
- OutgoingMessage.createForStream = function(stream, compress, adopt)
+ },
+ sent: function(connection)
{
- var m = new OutgoingMessage();
- m.stream = stream;
- m.compress = compress;
- m.adopt = adopt;
- m.isSent = false;
- m.requestId = 0;
- m.outAsync = null;
- return m;
- };
-
- OutgoingMessage.create = function(out, stream, compress, requestId)
+ this.isSent = true; // The message is sent.
+
+ if(this.outAsync !== null)
+ {
+ this.outAsync.__sent();
+ }
+ },
+ finished: function(ex)
{
- var m = new OutgoingMessage();
- m.stream = stream;
- m.compress = compress;
- m.outAsync = out;
- m.requestId = requestId;
- m.isSent = false;
- m.adopt = false;
- return m;
- };
-}(typeof (global) === "undefined" ? window : global));
+ if(this.outAsync !== null)
+ {
+ this.outAsync.__finishedEx(ex, this.isSent);
+ }
+ }
+});
+
+OutgoingMessage.createForStream = function(stream, compress, adopt)
+{
+ var m = new OutgoingMessage();
+ m.stream = stream;
+ m.compress = compress;
+ m.adopt = adopt;
+ m.isSent = false;
+ m.requestId = 0;
+ m.outAsync = null;
+ return m;
+};
+
+OutgoingMessage.create = function(out, stream, compress, requestId)
+{
+ var m = new OutgoingMessage();
+ m.stream = stream;
+ m.compress = compress;
+ m.outAsync = out;
+ m.requestId = requestId;
+ m.isSent = false;
+ m.adopt = false;
+ return m;
+};
+
+module.exports.Ice = Ice;