Diffstat (limited to 'js/src/Ice/ConnectRequestHandler.js')
-rw-r--r--  js/src/Ice/ConnectRequestHandler.js  |  429
1 file changed, 429 insertions(+), 0 deletions(-)
diff --git a/js/src/Ice/ConnectRequestHandler.js b/js/src/Ice/ConnectRequestHandler.js
new file mode 100644
index 00000000000..c6053f3945a
--- /dev/null
+++ b/js/src/Ice/ConnectRequestHandler.js
@@ -0,0 +1,429 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+(function(global){
+ require("Ice/Class");
+ require("Ice/AsyncResult");
+ require("Ice/AsyncStatus");
+ require("Ice/BasicStream");
+ require("Ice/BatchOutgoingAsync");
+ require("Ice/ConnectionRequestHandler");
+ require("Ice/Debug");
+ require("Ice/ExUtil");
+ require("Ice/LocalExceptionWrapper");
+ require("Ice/OutgoingAsync");
+ require("Ice/Protocol");
+ require("Ice/ReferenceMode");
+ require("Ice/Exception");
+ require("Ice/Promise");
+
+ var Ice = global.Ice || {};
+
+ var AsyncResult = Ice.AsyncResult;
+ var AsyncStatus = Ice.AsyncStatus;
+ var BasicStream = Ice.BasicStream;
+ var BatchOutgoingAsync = Ice.BatchOutgoingAsync;
+ var ConnectionRequestHandler = Ice.ConnectionRequestHandler;
+ var Debug = Ice.Debug;
+ var ExUtil = Ice.ExUtil;
+ var LocalExceptionWrapper = Ice.LocalExceptionWrapper;
+ var OutgoingAsync = Ice.OutgoingAsync;
+ var Protocol = Ice.Protocol;
+ var ReferenceMode = Ice.ReferenceMode;
+ var LocalException = Ice.LocalException;
+ var Promise = Ice.Promise;
+
+ var ConnectRequestHandler = Ice.Class({
+ __init__: function(ref, proxy)
+ {
+ this._reference = ref;
+ this._response = ref.getMode() === ReferenceMode.ModeTwoway;
+ this._proxy = proxy;
+ this._batchAutoFlush = ref.getInstance().initializationData().properties.getPropertyAsIntWithDefault(
+ "Ice.BatchAutoFlush", 1) > 0 ? true : false;
+ this._initialized = false;
+ this._flushing = false;
+ this._batchRequestInProgress = false;
+ this._batchRequestsSize = Protocol.requestBatchHdr.length;
+ this._batchStream =
+ new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding, this._batchAutoFlush);
+ this._updateRequestHandler = false;
+
+ this._connection = null;
+ this._compress = false;
+ this._exception = null;
+ this._requests = [];
+ this._pendingPromises = [];
+ },
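+ //
+ // Establish the connection for the reference. If the connection is already
+ // available, return a ConnectionRequestHandler; otherwise return this handler,
+ // which queues requests until the connection is set.
+ //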
+ connect: function()
+ {
+ var self = this;
+ this._reference.getConnection().then(
+ function(connection, compress)
+ {
+ self.setConnection(connection, compress);
+ }).exception(
+ function(ex)
+ {
+ self.setException(ex);
+ });
+
+ if(this.initialized())
+ {
+ Debug.assert(this._connection !== null);
+ return new ConnectionRequestHandler(this._reference, this._connection, this._compress);
+ }
+ else
+ {
+ // The proxy request handler will be updated when the connection is set.
+ this._updateRequestHandler = true;
+ return this;
+ }
+ },
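+ //
+ // Batch requests are marshaled into _batchStream until the connection is
+ // established; once initialized they go directly to the connection.
+ //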
+ prepareBatchRequest: function(os)
+ {
+ if(!this.initialized())
+ {
+ this._batchRequestInProgress = true;
+ this._batchStream.swap(os);
+ return;
+ }
+
+ this._connection.prepareBatchRequest(os);
+ },
+ finishBatchRequest: function(os)
+ {
+ if(!this.initialized())
+ {
+ Debug.assert(this._batchRequestInProgress);
+ this._batchRequestInProgress = false;
+
+ this._batchStream.swap(os);
+
+ if(!this._batchAutoFlush &&
+ this._batchStream.size + this._batchRequestsSize > this._reference.getInstance().messageSizeMax())
+ {
+ ExUtil.throwMemoryLimitException(this._batchStream.size + this._batchRequestsSize,
+ this._reference.getInstance().messageSizeMax());
+ }
+
+ this._requests.push(new Request(this._batchStream));
+ return;
+ }
+ this._connection.finishBatchRequest(os, this._compress);
+ },
+ abortBatchRequest: function()
+ {
+ if(!this.initialized())
+ {
+ Debug.assert(this._batchRequestInProgress);
+ this._batchRequestInProgress = false;
+
+ var dummy = new BasicStream(this._reference.getInstance(), Protocol.currentProtocolEncoding,
+ this._batchAutoFlush);
+ this._batchStream.swap(dummy);
+ this._batchRequestsSize = Protocol.requestBatchHdr.length;
+
+ return;
+ }
+ this._connection.abortBatchRequest();
+ },
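+ //
+ // Queue outgoing requests until the connection is established; afterwards
+ // they are sent directly over the connection.
+ //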
+ sendAsyncRequest: function(out)
+ {
+ if(!this.initialized())
+ {
+ this._requests.push(new Request(out));
+ return AsyncStatus.Queued;
+ }
+ return this._connection.sendAsyncRequest(out, this._compress, this._response);
+ },
+ flushAsyncBatchRequests: function(out)
+ {
+ if(!this.initialized())
+ {
+ this._requests.push(new Request(out));
+ return AsyncStatus.Queued;
+ }
+ return this._connection.flushAsyncBatchRequests(out);
+ },
+ getReference: function()
+ {
+ return this._reference;
+ },
+ getConnection: function()
+ {
+ if(this._exception !== null)
+ {
+ throw this._exception;
+ }
+ else
+ {
+ return this._connection;
+ }
+ },
+ onConnection: function(r)
+ {
+ //
+ // Called by ObjectPrx.ice_getConnection
+ //
+
+ if(this._exception !== null)
+ {
+ r.__exception(this._exception);
+ }
+ else if(this._connection !== null)
+ {
+ Debug.assert(this._initialized);
+ r.succeed(this._connection, r);
+ }
+ else
+ {
+ this._pendingPromises.push(r);
+ }
+ },
+ //
+ // Implementation of Reference_GetConnectionCallback
+ //
+
+ setConnection: function(connection, compress)
+ {
+ Debug.assert(this._exception === null && this._connection === null);
+ Debug.assert(this._updateRequestHandler || this._requests.length === 0);
+
+ this._connection = connection;
+ this._compress = compress;
+
+ //
+ // If this proxy is for a non-local object, and we are using a router, then
+ // add this proxy to the router info object.
+ //
+ var ri = this._reference.getRouterInfo();
+ if(ri !== null)
+ {
+ var self = this;
+ var promise = ri.addProxy(this._proxy).then(
+ function()
+ {
+ //
+ // The proxy was added to the router info; we're now ready to send the
+ // queued requests.
+ //
+ self.flushRequests();
+ }).exception(
+ function(ex)
+ {
+ self.setException(ex);
+ });
+
+ if(!promise.completed())
+ {
+ return; // The request handler will be initialized once addProxy completes.
+ }
+ }
+
+ //
+ // We can now send the queued requests.
+ //
+ this.flushRequests();
+ },
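+ //
+ // Called when connection establishment fails: queued requests and pending
+ // ice_getConnection callbacks are notified of the failure.
+ //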
+ setException: function(ex)
+ {
+ Debug.assert(!this._initialized && this._exception === null);
+ Debug.assert(this._updateRequestHandler || this._requests.length === 0);
+
+ this._exception = ex;
+ this._proxy = null; // Break cyclic reference count.
+
+ //
+ // If some requests were queued, we notify them of the failure.
+ //
+ if(this._requests.length > 0)
+ {
+ this.flushRequestsWithException(ex);
+ }
+
+ for(var i = 0; i < this._pendingPromises.length; ++i)
+ {
+ this._pendingPromises[i].fail(ex);
+ }
+ this._pendingPromises = [];
+ },
+ initialized: function()
+ {
+ if(this._initialized)
+ {
+ Debug.assert(this._connection !== null);
+ return true;
+ }
+ else
+ {
+ if(this._exception !== null)
+ {
+ throw this._exception;
+ }
+ else
+ {
+ return this._initialized;
+ }
+ }
+ },
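+ //
+ // Send the requests that were queued while the connection was being
+ // established and, if no error occurred, switch the proxy to a
+ // ConnectionRequestHandler.
+ //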
+ flushRequests: function()
+ {
+ Debug.assert(this._connection !== null && !this._initialized);
+
+ //
+ // We set the _flushing flag to true to prevent any additional queuing. Callers
+ // might block for a little while as the queued requests are being sent but this
+ // shouldn't be an issue as the request sends are non-blocking.
+ //
+ this._flushing = true;
+
+ try
+ {
+ while(this._requests.length > 0)
+ {
+ var request = this._requests[0];
+ if(request.out !== null)
+ {
+ this._connection.sendAsyncRequest(request.out, this._compress, this._response);
+ }
+ else if(request.batchOut !== null)
+ {
+ this._connection.flushAsyncBatchRequests(request.batchOut);
+ }
+ else
+ {
+ var os = new BasicStream(request.os.instance, Protocol.currentProtocolEncoding);
+ this._connection.prepareBatchRequest(os);
+ try
+ {
+ request.os.pos = 0;
+ os.writeBlob(request.os.readBlob(request.os.size));
+ }
+ catch(ex)
+ {
+ this._connection.abortBatchRequest();
+ throw ex;
+ }
+ this._connection.finishBatchRequest(os, this._compress);
+ }
+ this._requests.shift();
+ }
+ }
+ catch(ex)
+ {
+ if(ex instanceof LocalExceptionWrapper)
+ {
+ Debug.assert(this._exception === null && this._requests.length > 0);
+ this._exception = ex.inner;
+ this.flushRequestsWithExceptionWrapper(ex);
+ }
+ else if(ex instanceof LocalException)
+ {
+ Debug.assert(this._exception === null && this._requests.length > 0);
+ this._exception = ex;
+ this.flushRequestsWithException(ex);
+ }
+ else
+ {
+ throw ex;
+ }
+ }
+
+ //
+ // We've finished sending the queued requests and the request handler now sends
+ // the requests over the connection directly. It's time to substitute the
+ // request handler of the proxy with the more efficient connection request
+ // handler which does not have any synchronization. This also breaks the cyclic
+ // reference count with the proxy.
+ //
+ // NOTE: _updateRequestHandler is immutable once _flushing = true
+ //
+ if(this._updateRequestHandler && this._exception === null)
+ {
+ this._proxy.__setRequestHandler(
+ new ConnectionRequestHandler(this._reference, this._connection, this._compress));
+ }
+
+ Debug.assert(!this._initialized);
+ if(this._exception === null)
+ {
+ this._initialized = true;
+ this._flushing = false;
+ }
+ this._proxy = null; // Break cyclic reference count.
+
+ var p;
+ for(var i = 0; i < this._pendingPromises.length; ++i)
+ {
+ p = this._pendingPromises[i];
+ p.succeed(this._connection, p);
+ }
+ this._pendingPromises = [];
+ },
+ flushRequestsWithException: function(ex)
+ {
+ for(var i = 0; i < this._requests.length; ++i)
+ {
+ var request = this._requests[i];
+ if(request.out !== null)
+ {
+ request.out.__finishedEx(ex, false);
+ }
+ else if(request.batchOut !== null)
+ {
+ request.batchOut.__finishedEx(ex, false);
+ }
+ }
+ this._requests = [];
+ },
+ flushRequestsWithExceptionWrapper: function(ex)
+ {
+ for(var i = 0; i < this._requests.length; ++i)
+ {
+ var request = this._requests[i];
+ if(request.out !== null)
+ {
+ request.out.__finishedWrapper(ex);
+ }
+ else if(request.batchOut !== null)
+ {
+ request.batchOut.__finishedEx(ex.inner, false);
+ }
+ }
+ this._requests = [];
+ }
+ });
+
+ Ice.ConnectRequestHandler = ConnectRequestHandler;
+ global.Ice = Ice;
+
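+ //
+ // A queued request wraps either a marshaled batch stream (os), an
+ // OutgoingAsync (out) or a BatchOutgoingAsync (batchOut).
+ //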
+ var Request = function(arg)
+ {
+ this.os = null;
+ this.out = null;
+ this.batchOut = null;
+
+ if(arg instanceof BasicStream)
+ {
+ this.os = new BasicStream(arg.instance, Protocol.currentProtocolEncoding);
+ this.os.swap(arg);
+ }
+ else if(arg instanceof OutgoingAsync)
+ {
+ this.out = arg;
+ }
+ else
+ {
+ Debug.assert(arg instanceof BatchOutgoingAsync);
+ this.batchOut = arg;
+ }
+ };
+}(typeof (global) === "undefined" ? window : global));