diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-10-20 11:40:05 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-10-20 11:40:05 -0230 |
commit | b51469b41167fb86ae2059a15cf0475c53fdda7b (patch) | |
tree | fc85d6ca2efd89c67e1e4e7438f437c3e08313f4 /java/src/IceInternal/ConnectRequestHandler.java | |
parent | Fixed (ICE-5695) - IceSSL: misleading exception (diff) | |
download | ice-b51469b41167fb86ae2059a15cf0475c53fdda7b.tar.bz2 ice-b51469b41167fb86ae2059a15cf0475c53fdda7b.tar.xz ice-b51469b41167fb86ae2059a15cf0475c53fdda7b.zip |
Down with ant. From the gradle to the grave.
Diffstat (limited to 'java/src/IceInternal/ConnectRequestHandler.java')
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 583 |
1 files changed, 0 insertions, 583 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java deleted file mode 100644 index a5f92ac090c..00000000000 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ /dev/null @@ -1,583 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -import Ice.ConnectionI; - -public class ConnectRequestHandler - implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback -{ - static private class Request - { - Request(BasicStream os) - { - this.os = new BasicStream(os.instance(), Protocol.currentProtocolEncoding); - this.os.swap(os); - } - - Request(OutgoingAsyncBase out) - { - this.outAsync = out; - } - - OutgoingAsyncBase outAsync = null; - BasicStream os = null; - } - - @Override - public RequestHandler - connect() - { - Ice.ObjectPrxHelperBase proxy = _proxy; - try - { - _reference.getConnection(this); - - synchronized(this) - { - if(!initialized()) - { - // The proxy request handler will be updated when the connection is set. - _updateRequestHandler = true; - return this; - } - } - } - catch(Ice.LocalException ex) - { - proxy.__setRequestHandler(this, null); - throw ex; - } - - assert(_connection != null); - - RequestHandler handler = new ConnectionRequestHandler(_reference, _connection, _compress); - proxy.__setRequestHandler(this, handler); - return handler; - } - - @Override - public RequestHandler - update(RequestHandler previousHandler, RequestHandler newHandler) - { - return previousHandler == this ? newHandler : this; - } - - @Override - public void - prepareBatchRequest(BasicStream os) - throws RetryException - { - synchronized(this) - { - waitBatchRequestInProgress(); - try - { - if(!initialized()) - { - _batchRequestInProgress = true; - _batchStream.swap(os); - return; - } - } - catch(Ice.LocalException ex) - { - throw new RetryException(ex); - } - } - - _connection.prepareBatchRequest(os); - } - - @Override - public void - finishBatchRequest(BasicStream os) - { - synchronized(this) - { - if(!initialized()) // This can't throw until _batchRequestInProgress = false - { - assert(_batchRequestInProgress); - _batchRequestInProgress = false; - notifyAll(); - - _batchStream.swap(os); - - if(!_batchAutoFlush && - _batchStream.size() + _batchRequestsSize > _reference.getInstance().messageSizeMax()) - { - Ex.throwMemoryLimitException(_batchStream.size() + _batchRequestsSize, - _reference.getInstance().messageSizeMax()); - } - - _requests.add(new Request(_batchStream)); - return; - } - } - _connection.finishBatchRequest(os, _compress); - } - - @Override - public void - abortBatchRequest() - { - synchronized(this) - { - if(!initialized()) // This can't throw until _batchRequestInProgress = false - { - assert(_batchRequestInProgress); - _batchRequestInProgress = false; - notifyAll(); - - BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding, - _batchAutoFlush); - _batchStream.swap(dummy); - _batchRequestsSize = Protocol.requestBatchHdr.length; - - return; - } - } - _connection.abortBatchRequest(); - } - - @Override - public int - sendAsyncRequest(OutgoingAsyncBase out) - throws RetryException - { - synchronized(this) - { - if(!_initialized) - { - out.cancelable(this); // This will throw if the request is canceled - } - - try - { - if(!initialized()) - { - _requests.add(new Request(out)); - return AsyncStatus.Queued; - } - } - catch(Ice.LocalException ex) - { - throw new RetryException(ex); - } - } - return out.send(_connection, _compress, _response); - } - - @Override - public void - asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex) - { - synchronized(this) - { - if(_exception != null) - { - return; // The request has been notified of a failure already. - } - - if(!initialized()) - { - java.util.Iterator<Request> it = _requests.iterator(); - while(it.hasNext()) - { - Request request = it.next(); - if(request.outAsync == outAsync) - { - it.remove(); - if(outAsync.completed(ex)) - { - outAsync.invokeCompletedAsync(); - } - return; - } - } - assert(false); // The request has to be queued if it timed out and we're not initialized yet. - } - } - _connection.asyncRequestCanceled(outAsync, ex); - } - - @Override - public Reference - getReference() - { - return _reference; - } - - @Override - synchronized public ConnectionI - getConnection() - { - if(_exception != null) - { - throw (Ice.LocalException)_exception.fillInStackTrace(); - } - else - { - return _connection; - } - } - - @Override - synchronized public - ConnectionI waitForConnection() - throws InterruptedException, RetryException - { - if(_exception != null) - { - throw new RetryException(_exception); - } - - // - // Wait for the connection establishment to complete or fail. - // - while(!_initialized && _exception == null) - { - wait(); - } - return getConnection(); - } - - // - // Implementation of Reference.GetConnectionCallback - // - - @Override - public void - setConnection(Ice.ConnectionI connection, boolean compress) - { - synchronized(this) - { - assert(_exception == null && _connection == null); - _connection = connection; - _compress = compress; - } - - // - // If this proxy is for a non-local object, and we are using a router, then - // add this proxy to the router info object. - // - RouterInfo ri = _reference.getRouterInfo(); - if(ri != null && !ri.addProxy(_proxy, this)) - { - return; // The request handler will be initialized once addProxy returns. - } - - // - // We can now send the queued requests. - // - flushRequests(); - } - - @Override - public synchronized void - setException(final Ice.LocalException ex) - { - assert(!_initialized && _exception == null); - _exception = ex; - _proxy = null; // Break cyclic reference count. - - // - // If some requests were queued, we notify them of the failure. This is done from a thread - // from the client thread pool since this will result in ice_exception callbacks to be - // called. - // - if(!_requests.isEmpty()) - { - _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) - { - @Override - public void - run() - { - flushRequestsWithException(); - }; - }); - } - - notifyAll(); - } - - // - // Implementation of RouterInfo.AddProxyCallback - // - @Override - public void - addedProxy() - { - // - // The proxy was added to the router info, we're now ready to send the - // queued requests. - // - flushRequests(); - } - - public - ConnectRequestHandler(Reference ref, Ice.ObjectPrx proxy) - { - _reference = ref; - _response = _reference.getMode() == Reference.ModeTwoway; - _proxy = (Ice.ObjectPrxHelperBase)proxy; - _batchAutoFlush = ref.getInstance().initializationData().properties.getPropertyAsIntWithDefault( - "Ice.BatchAutoFlush", 1) > 0 ? true : false; - _initialized = false; - _flushing = false; - _batchRequestInProgress = false; - _batchRequestsSize = Protocol.requestBatchHdr.length; - _batchStream = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding, _batchAutoFlush); - _updateRequestHandler = false; - } - - private boolean - initialized() - { - // Must be called with the mutex locked. - - if(_initialized) - { - assert(_connection != null); - return true; - } - else - { - // - // This is similar to a mutex lock in that the flag is - // only true for a short period of time. - // - boolean interrupted = false; - while(_flushing && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - interrupted = true; - } - } - // - // Restore the interrupted status. - // - if(interrupted) - { - Thread.currentThread().interrupt(); - } - - if(_exception != null) - { - throw (Ice.LocalException)_exception.fillInStackTrace(); - } - else - { - return _initialized; - } - } - } - - private void - flushRequests() - { - synchronized(this) - { - assert(_connection != null && !_initialized); - waitBatchRequestInProgress(); - - // - // We set the _flushing flag to true to prevent any additional queuing. Callers - // might block for a little while as the queued requests are being sent but this - // shouldn't be an issue as the request sends are non-blocking. - // - _flushing = true; - } - - final java.util.List<OutgoingAsyncBase> sentCallbacks = new java.util.ArrayList<OutgoingAsyncBase>(); - try - { - java.util.Iterator<Request> p = _requests.iterator(); // _requests is immutable when _flushing = true - while(p.hasNext()) - { - Request request = p.next(); - if(request.outAsync != null) - { - if((request.outAsync.send(_connection, _compress, _response) & - AsyncStatus.InvokeSentCallback) > 0) - { - sentCallbacks.add(request.outAsync); - } - } - else - { - BasicStream os = new BasicStream(request.os.instance(), Protocol.currentProtocolEncoding); - _connection.prepareBatchRequest(os); - try - { - request.os.pos(0); - os.writeBlob(request.os.readBlob(request.os.size())); - } - catch(Ice.LocalException ex) - { - _connection.abortBatchRequest(); - throw ex; - } - _connection.finishBatchRequest(os, _compress); - } - p.remove(); - } - } - catch(final RetryException ex) - { - // - // If the connection dies shortly after connection - // establishment, we don't systematically retry on - // RetryException. We handle the exception like it - // was an exception that occured while sending the - // request. - // - synchronized(this) - { - assert(_exception == null && !_requests.isEmpty()); - _exception = ex.get(); - _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) - { - @Override - public void run() - { - flushRequestsWithException(); - }; - }); - } - } - catch(final Ice.LocalException ex) - { - synchronized(this) - { - assert(_exception == null && !_requests.isEmpty()); - _exception = ex; - _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) - { - @Override - public void run() - { - flushRequestsWithException(); - }; - }); - } - } - - if(!sentCallbacks.isEmpty()) - { - _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) - { - @Override - public void run() - { - for(OutgoingAsyncBase callback : sentCallbacks) - { - callback.invokeSent(); - } - }; - }); - } - - // - // We've finished sending the queued requests and the request handler now send - // the requests over the connection directly. It's time to substitute the - // request handler of the proxy with the more efficient connection request - // handler which does not have any synchronization. This also breaks the cyclic - // reference count with the proxy. - // - // NOTE: _updateRequestHandler is immutable once _flushing = true - // - if(_updateRequestHandler && _exception == null) - { - _proxy.__setRequestHandler(this, new ConnectionRequestHandler(_reference, _connection, _compress)); - } - - synchronized(this) - { - assert(!_initialized); - if(_exception == null) - { - _initialized = true; - _flushing = false; - } - _proxy = null; // Break cyclic reference count. - notifyAll(); - } - } - - private void - waitBatchRequestInProgress() - { - // - // This is similar to a mutex lock in that the stream is - // only "locked" while the request is in progress. - // - boolean interrupted = false; - while(_batchRequestInProgress) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - interrupted = true; - } - } - // - // Restore the interrupted flag if we were interrupted. - // - if(interrupted) - { - Thread.currentThread().interrupt(); - } - } - - private void - flushRequestsWithException() - { - for(Request request : _requests) - { - if(request.outAsync != null) - { - if(request.outAsync.completed(_exception)) - { - request.outAsync.invokeCompleted(); - } - } - } - _requests.clear(); - } - - private final Reference _reference; - private boolean _response; - - private Ice.ObjectPrxHelperBase _proxy; - - private final boolean _batchAutoFlush; - - private Ice.ConnectionI _connection; - private boolean _compress; - private Ice.LocalException _exception; - private boolean _initialized; - private boolean _flushing; - - private java.util.List<Request> _requests = new java.util.LinkedList<Request>(); - private boolean _batchRequestInProgress; - private int _batchRequestsSize; - private BasicStream _batchStream; - private boolean _updateRequestHandler; -} |