diff options
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 541 |
1 files changed, 541 insertions, 0 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp new file mode 100644 index 00000000000..d2c0aa6bc9b --- /dev/null +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -0,0 +1,541 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/ConnectRequestHandler.h> +#include <Ice/ConnectionRequestHandler.h> +#include <Ice/Instance.h> +#include <Ice/Proxy.h> +#include <Ice/ConnectionI.h> +#include <Ice/RouterInfo.h> +#include <Ice/Outgoing.h> +#include <Ice/OutgoingAsync.h> +#include <Ice/Protocol.h> +#include <Ice/Properties.h> +#include <Ice/ThreadPool.h> + +using namespace std; +using namespace IceInternal; + +namespace +{ + +class FlushRequestsWithException : public DispatchWorkItem +{ +public: + + FlushRequestsWithException(const InstancePtr& instance, + const ConnectRequestHandlerPtr& handler, + const Ice::LocalException& ex) : + DispatchWorkItem(instance), + _handler(handler), + _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone())) + { + } + + virtual void + run() + { + _handler->flushRequestsWithException(*_exception.get()); + } + +private: + + const ConnectRequestHandlerPtr _handler; + const auto_ptr<Ice::LocalException> _exception; +}; + +class FlushRequestsWithExceptionWrapper : public DispatchWorkItem +{ +public: + + FlushRequestsWithExceptionWrapper(const InstancePtr& instance, + const ConnectRequestHandlerPtr& handler, + const LocalExceptionWrapper& ex) : + DispatchWorkItem(instance), + _handler(handler), + _exception(ex) + { + } + + virtual void + run() + { + _handler->flushRequestsWithException(_exception); + } + +private: + + const ConnectRequestHandlerPtr _handler; + const LocalExceptionWrapper _exception; +}; + +class FlushSentRequests : public DispatchWorkItem +{ +public: + + FlushSentRequests(const InstancePtr& instance, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : + DispatchWorkItem(instance), _callbacks(callbacks) + { + } + + virtual void + run() + { + for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p) + { + (*p)->__sent(); + } + } + +private: + + vector<OutgoingAsyncMessageCallbackPtr> _callbacks; +}; + +}; + +ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, + const Ice::ObjectPrx& proxy, + const Handle< ::IceDelegate::Ice::Object>& delegate) : + RequestHandler(ref), + _proxy(proxy), + _delegate(delegate), + _batchAutoFlush( + ref->getInstance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0), + _initialized(false), + _flushing(false), + _batchRequestInProgress(false), + _batchRequestsSize(sizeof(requestBatchHdr)), + _batchStream(ref->getInstance().get(), _batchAutoFlush), + _updateRequestHandler(false) +{ +} + +ConnectRequestHandler::~ConnectRequestHandler() +{ +} + +RequestHandlerPtr +ConnectRequestHandler::connect() +{ + _reference->getConnection(this); + + Lock sync(*this); + if(initialized()) + { + assert(_connection); + return new ConnectionRequestHandler(_reference, _connection, _compress); + } + else + { + _updateRequestHandler = true; // The proxy request handler will be updated when the connection is set. + return this; + } +} + +void +ConnectRequestHandler::prepareBatchRequest(BasicStream* os) +{ + { + Lock sync(*this); + while(_batchRequestInProgress) + { + wait(); + } + + if(!initialized()) + { + _batchRequestInProgress = true; + _batchStream.swap(*os); + return; + } + } + _connection->prepareBatchRequest(os); +} + +void +ConnectRequestHandler::finishBatchRequest(BasicStream* os) +{ + { + Lock sync(*this); + if(!initialized()) + { + assert(_batchRequestInProgress); + _batchRequestInProgress = false; + notifyAll(); + + _batchStream.swap(*os); + + if(!_batchAutoFlush && + _batchStream.b.size() + _batchRequestsSize > _reference->getInstance()->messageSizeMax()) + { + Ex::throwMemoryLimitException(__FILE__, __LINE__, _batchStream.b.size() + _batchRequestsSize, + _reference->getInstance()->messageSizeMax()); + } + + _batchRequestsSize += _batchStream.b.size(); + + Request req; + req.os = new BasicStream(_reference->getInstance().get(), _batchAutoFlush); + req.os->swap(_batchStream); + _requests.push_back(req); + return; + } + } + _connection->finishBatchRequest(os, _compress); +} + +void +ConnectRequestHandler::abortBatchRequest() +{ + { + Lock sync(*this); + if(!initialized()) + { + assert(_batchRequestInProgress); + _batchRequestInProgress = false; + notifyAll(); + + BasicStream dummy(_reference->getInstance().get(), _batchAutoFlush); + _batchStream.swap(dummy); + _batchRequestsSize = sizeof(requestBatchHdr); + + return; + } + } + _connection->abortBatchRequest(); +} + +Ice::ConnectionI* +ConnectRequestHandler::sendRequest(Outgoing* out) +{ + // Must be called first, _compress might not be initialized before this returns. + Ice::ConnectionIPtr connection = getConnection(true); + assert(connection); + if(!connection->sendRequest(out, _compress, _response) || _response) + { + return _connection.get(); // The request has been sent or we're expecting a response. + } + else + { + return 0; // The request hasn't been sent yet. + } +} + +AsyncStatus +ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out) +{ + { + Lock sync(*this); + if(!initialized()) + { + Request req; + req.out = out; + _requests.push_back(req); + return AsyncStatusQueued; + } + } + return _connection->sendAsyncRequest(out, _compress, _response); +} + +bool +ConnectRequestHandler::flushBatchRequests(BatchOutgoing* out) +{ + return getConnection(true)->flushBatchRequests(out); +} + +AsyncStatus +ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out) +{ + { + Lock sync(*this); + if(!initialized()) + { + Request req; + req.batchOut = out; + _requests.push_back(req); + return AsyncStatusQueued; + } + } + return _connection->flushAsyncBatchRequests(out); +} + +Ice::ConnectionIPtr +ConnectRequestHandler::getConnection(bool waitInit) +{ + if(waitInit) + { + // + // Wait for the connection establishment to complete or fail. + // + Lock sync(*this); + while(!_initialized && !_exception.get()) + { + wait(); + } + } + + if(_exception.get()) + { + _exception->ice_throw(); + return 0; // Keep the compiler happy. + } + else + { + assert(!waitInit || _initialized); + return _connection; + } +} + +void +ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool compress) +{ + { + Lock sync(*this); + assert(!_exception.get() && !_connection); + assert(_updateRequestHandler || _requests.empty()); + + _connection = connection; + _compress = compress; + } + + // + // If this proxy is for a non-local object, and we are using a router, then + // add this proxy to the router info object. + // + RouterInfoPtr ri = _reference->getRouterInfo(); + if(ri && !ri->addProxy(_proxy, this)) + { + return; // The request handler will be initialized once addProxy returns. + } + + // + // We can now send the queued requests. + // + flushRequests(); +} + +void +ConnectRequestHandler::setException(const Ice::LocalException& ex) +{ + Lock sync(*this); + assert(!_initialized && !_exception.get()); + assert(_updateRequestHandler || _requests.empty()); + + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + _proxy = 0; // Break cyclic reference count. + _delegate = 0; // Break cyclic reference count. + + // + // If some requests were queued, we notify them of the failure. This is done from a thread + // from the client thread pool since this will result in ice_exception callbacks to be + // called. + // + if(!_requests.empty()) + { + const InstancePtr instance = _reference->getInstance(); + instance->clientThreadPool()->execute(new FlushRequestsWithException(instance, this, ex)); + } + + notifyAll(); +} + +void +ConnectRequestHandler::addedProxy() +{ + // + // The proxy was added to the router info, we're now ready to send the + // queued requests. + // + flushRequests(); +} + +bool +ConnectRequestHandler::initialized() +{ + // Must be called with the mutex locked. + + if(_initialized) + { + assert(_connection); + return true; + } + else + { + while(_flushing && !_exception.get()) + { + wait(); + } + + if(_exception.get()) + { + _exception->ice_throw(); + return false; // Keep the compiler happy. + } + else + { + return _initialized; + } + } +} + +void +ConnectRequestHandler::flushRequests() +{ + { + Lock sync(*this); + assert(_connection && !_initialized); + + while(_batchRequestInProgress) + { + wait(); + } + + // + // We set the _flushing flag to true to prevent any additional queuing. Callers + // might block for a little while as the queued requests are being sent but this + // shouldn't be an issue as the request sends are non-blocking. + // + _flushing = true; + } + + vector<OutgoingAsyncMessageCallbackPtr> sentCallbacks; + try + { + while(!_requests.empty()) // _requests is immutable when _flushing = true + { + Request& req = _requests.front(); + if(req.out) + { + if(_connection->sendAsyncRequest(req.out, _compress, _response) & AsyncStatusInvokeSentCallback) + { + sentCallbacks.push_back(req.out); + } + } + else if(req.batchOut) + { + if(_connection->flushAsyncBatchRequests(req.batchOut) & AsyncStatusInvokeSentCallback) + { + sentCallbacks.push_back(req.batchOut); + } + } + else + { + BasicStream os(req.os->instance()); + _connection->prepareBatchRequest(&os); + try + { + const Ice::Byte* bytes; + req.os->i = req.os->b.begin(); + req.os->readBlob(bytes, req.os->b.size()); + os.writeBlob(bytes, req.os->b.size()); + } + catch(const Ice::LocalException&) + { + _connection->abortBatchRequest(); + throw; + } + _connection->finishBatchRequest(&os, _compress); + delete req.os; + } + _requests.pop_front(); + } + } + catch(const LocalExceptionWrapper& ex) + { + Lock sync(*this); + assert(!_exception.get() && !_requests.empty()); + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.get()->ice_clone())); + const InstancePtr instance = _reference->getInstance(); + instance->clientThreadPool()->execute(new FlushRequestsWithExceptionWrapper(instance, this, ex)); + } + catch(const Ice::LocalException& ex) + { + Lock sync(*this); + assert(!_exception.get() && !_requests.empty()); + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + const InstancePtr instance = _reference->getInstance(); + instance->clientThreadPool()->execute(new FlushRequestsWithException(instance, this, ex)); + } + + if(!sentCallbacks.empty()) + { + const InstancePtr instance = _reference->getInstance(); + instance->clientThreadPool()->execute(new FlushSentRequests(instance, sentCallbacks)); + } + + // + // We've finished sending the queued requests and the request handler now send + // the requests over the connection directly. It's time to substitute the + // request handler of the proxy with the more efficient connection request + // handler which does not have any synchronization. This also breaks the cyclic + // reference count with the proxy. + // + if(_updateRequestHandler && !_exception.get()) + { + _proxy->__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress)); + } + + { + Lock sync(*this); + assert(!_initialized); + if(!_exception.get()) + { + _initialized = true; + _flushing = false; + } + _proxy = 0; // Break cyclic reference count. + _delegate = 0; // Break cyclic reference count. + notifyAll(); + } +} + +void +ConnectRequestHandler::flushRequestsWithException(const Ice::LocalException& ex) +{ + for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + if(p->out) + { + p->out->__finished(ex, false); + } + else if(p->batchOut) + { + p->batchOut->__finished(ex, false); + } + else + { + assert(p->os); + delete p->os; + } + } + _requests.clear(); +} + +void +ConnectRequestHandler::flushRequestsWithException(const LocalExceptionWrapper& ex) +{ + for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + if(p->out) + { + p->out->__finished(ex); + } + else if(p->batchOut) + { + p->batchOut->__finished(*ex.get(), false); + } + else + { + assert(p->os); + delete p->os; + } + } + _requests.clear(); +} |