diff options
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 302 |
1 files changed, 193 insertions, 109 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 8e04d74512e..02e3227988e 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -17,10 +17,62 @@ #include <Ice/OutgoingAsync.h> #include <Ice/Protocol.h> #include <Ice/Properties.h> +#include <Ice/ThreadPool.h> using namespace std; using namespace IceInternal; +namespace +{ + +class FlushRequestsWithException : public ThreadPoolWorkItem +{ +public: + + FlushRequestsWithException(const ConnectRequestHandlerPtr& handler, const Ice::LocalException& ex) : + _handler(handler), + _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone())) + { + } + + virtual void + execute(const ThreadPoolPtr& threadPool) + { + threadPool->promoteFollower(); + _handler->flushRequestsWithException(*_exception.get()); + } + +private: + + const ConnectRequestHandlerPtr _handler; + const auto_ptr<Ice::LocalException> _exception; +}; + +class FlushRequestsWithExceptionWrapper : public ThreadPoolWorkItem +{ +public: + + FlushRequestsWithExceptionWrapper(const ConnectRequestHandlerPtr& handler, const LocalExceptionWrapper& ex) : + _handler(handler), + _exception(ex) + { + } + + virtual void + execute(const ThreadPoolPtr& threadPool) + { + threadPool->promoteFollower(); + _handler->flushRequestsWithException(_exception); + } + +private: + + const ConnectRequestHandlerPtr _handler; + const LocalExceptionWrapper _exception; +}; + +}; + ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrx& proxy, const Handle< ::IceDelegate::Ice::Object>& delegate) : @@ -49,8 +101,14 @@ ConnectRequestHandler::connect() _reference->getConnection(this); Lock sync(*this); - if(_connection) + if(_exception.get()) + { + _exception->ice_throw(); + return 0; // Keep the compiler happy. + } + else if(_connection) { + assert(_initialized); return new ConnectionRequestHandler(_reference, _connection, _compress); } else @@ -135,34 +193,30 @@ ConnectRequestHandler::abortBatchRequest() Ice::ConnectionI* ConnectRequestHandler::sendRequest(Outgoing* out) { - return (!getConnection(true)->sendRequest(out, _compress, _response) || _response) ? _connection.get() : 0; + if(!getConnection(true)->sendRequest(out, _compress, _response) || _response) + { + return _connection.get(); // The request has been sent or we're expecting a response. + } + else + { + return 0; // The request hasn't been sent yet. + } } void ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out) { - try { + Lock sync(*this); + if(!initialized()) { - Lock sync(*this); - if(!initialized()) - { - Request req; - req.out = out; - _requests.push_back(req); - return; - } + Request req; + req.out = out; + _requests.push_back(req); + return; } - _connection->sendAsyncRequest(out, _compress, _response); - } - catch(const LocalExceptionWrapper& ex) - { - out->__finished(ex); - } - catch(const Ice::LocalException& ex) - { - out->__finished(ex); } + _connection->sendAsyncRequest(out, _compress, _response); } bool @@ -174,24 +228,17 @@ ConnectRequestHandler::flushBatchRequests(BatchOutgoing* out) void ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out) { - try { + Lock sync(*this); + if(!initialized()) { - Lock sync(*this); - if(!initialized()) - { - Request req; - req.batchOut = out; - _requests.push_back(req); - return; - } + Request req; + req.batchOut = out; + _requests.push_back(req); + return; } - _connection->flushAsyncBatchRequests(out); - } - catch(const Ice::LocalException& ex) - { - out->__finished(ex); } + _connection->flushAsyncBatchRequests(out); } Ice::ConnectionIPtr @@ -227,6 +274,8 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool { Lock sync(*this); assert(!_exception.get() && !_connection); + assert(_updateRequestHandler || _requests.empty()); + _connection = connection; _compress = compress; } @@ -236,51 +285,48 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool // add this proxy to the router info object. // RouterInfoPtr ri = _reference->getRouterInfo(); - if(ri) + if(ri && !ri->addProxy(_proxy, this)) { - if(!ri->addProxy(_proxy, this)) - { - return; // The request handler will be initialized once addProxy returns. - } + return; // The request handler will be initialized once addProxy returns. } - + + // + // We can now send the queued requests. + // flushRequests(); } void ConnectRequestHandler::setException(const Ice::LocalException& ex) { + Lock sync(*this); + assert(!_initialized && !_exception.get()); + assert(_updateRequestHandler || _requests.empty()); + + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + _proxy = 0; // Break cyclic reference count. + _delegate = 0; // Break cyclic reference count. + + // + // If some requests were queued, we notify them of the failure. This is done from a thread + // from the client thread pool since this will result in ice_exception callbacks to be + // called. + // + if(!_requests.empty()) { - Lock sync(*this); - assert(!_initialized && !_exception.get()); - _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); - _proxy = 0; // Break cyclic reference count. - _delegate = 0; // Break cyclic reference count. - notifyAll(); - } - - for(vector<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - if(p->out) - { - p->out->__finished(ex); - } - else if(p->batchOut) - { - p->batchOut->__finished(ex); - } - else - { - assert(p->os); - delete p->os; - } + _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this, ex)); } - _requests.clear(); + + notifyAll(); } void ConnectRequestHandler::addedProxy() { + // + // The proxy was added to the router info, we're now ready to send the + // queued requests. + // flushRequests(); } @@ -296,7 +342,7 @@ ConnectRequestHandler::initialized() } else { - while(_flushing) + while(_flushing && !_exception.get()) { wait(); } @@ -332,69 +378,58 @@ ConnectRequestHandler::flushRequests() // _flushing = true; } - - for(vector<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + + try { - // _requests is immutable when _flushing = true - if(p->out) + while(!_requests.empty()) // _requests is immutable when _flushing = true { - try - { - _connection->sendAsyncRequest(p->out, _compress, _response); - } - catch(const LocalExceptionWrapper& ex) + Request& req = _requests.front(); + if(req.out) { - p->out->__finished(ex); + _connection->sendAsyncRequest(req.out, _compress, _response); } - catch(const Ice::LocalException& ex) + else if(req.batchOut) { - p->out->__finished(ex); - } - } - else if(p->batchOut) - { - try - { - _connection->flushAsyncBatchRequests(p->batchOut); - } - catch(const Ice::LocalException& ex) - { - p->batchOut->__finished(ex); - } - } - else - { - assert(p->os); - if(_exception.get()) - { - delete p->os; + _connection->flushAsyncBatchRequests(req.batchOut); } else { - // - // TODO: Add sendBatchRequest() method to ConnectionI? - // + BasicStream os(req.os->instance()); + _connection->prepareBatchRequest(&os); try { - BasicStream os(p->os->instance()); - _connection->prepareBatchRequest(&os); const Ice::Byte* bytes; - p->os->i = p->os->b.begin(); - p->os->readBlob(bytes, p->os->b.size()); - os.writeBlob(bytes, p->os->b.size()); + req.os->i = req.os->b.begin(); + req.os->readBlob(bytes, req.os->b.size()); + os.writeBlob(bytes, req.os->b.size()); _connection->finishBatchRequest(&os, _compress); - delete p->os; + delete req.os; } - catch(const Ice::LocalException& ex) + catch(const Ice::LocalException&) { - delete p->os; _connection->abortBatchRequest(); - _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + throw; } } + _requests.pop_front(); } } - _requests.clear(); + catch(const LocalExceptionWrapper& ex) + { + Lock sync(*this); + assert(!_exception.get() && !_requests.empty()); + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.get()->ice_clone())); + _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithExceptionWrapper(this, ex)); + return; + } + catch(const Ice::LocalException& ex) + { + Lock sync(*this); + assert(!_exception.get() && !_requests.empty()); + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this, ex)); + return; + } { Lock sync(*this); @@ -404,6 +439,13 @@ ConnectRequestHandler::flushRequests() notifyAll(); } + // + // We've finished sending the queued requests and the request handler now send + // the requests over the connection directly. It's time to substitute the + // request handler of the proxy with the more efficient connection request + // handler which does not have any synchronization. This also breaks the cyclic + // reference count with the proxy. + // if(_updateRequestHandler && !_exception.get()) { _proxy->__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress)); @@ -412,4 +454,46 @@ ConnectRequestHandler::flushRequests() _delegate = 0; // Break cyclic reference count. } +void +ConnectRequestHandler::flushRequestsWithException(const Ice::LocalException& ex) +{ + for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + if(p->out) + { + p->out->__finished(ex); + } + else if(p->batchOut) + { + p->batchOut->__finished(ex); + } + else + { + assert(p->os); + delete p->os; + } + } + _requests.clear(); +} +void +ConnectRequestHandler::flushRequestsWithException(const LocalExceptionWrapper& ex) +{ + for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + if(p->out) + { + p->out->__finished(ex); + } + else if(p->batchOut) + { + p->batchOut->__finished(*ex.get()); + } + else + { + assert(p->os); + delete p->os; + } + } + _requests.clear(); +} |