diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
commit | c6dbd090d9691cc0116a2967b2827b858b184dfe (patch) | |
tree | 6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /cpp/src/Ice/ConnectRequestHandler.cpp | |
parent | Merge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff) | |
download | ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2 ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip |
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 71 |
1 files changed, 57 insertions, 14 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 94d22e2f11b..0ba340fa2b3 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -71,6 +71,31 @@ private: const LocalExceptionWrapper _exception; }; +class FlushSentRequests : public ThreadPoolWorkItem +{ +public: + + FlushSentRequests(const InstancePtr& instance, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : + _instance(instance), _callbacks(callbacks) + { + } + + virtual void + execute(const ThreadPoolPtr& threadPool) + { + threadPool->promoteFollower(); + for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p) + { + (*p)->__sent(_instance); + } + } + +private: + + InstancePtr _instance; + vector<OutgoingAsyncMessageCallbackPtr> _callbacks; +}; + }; ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, @@ -197,7 +222,7 @@ ConnectRequestHandler::sendRequest(Outgoing* out) } } -void +bool ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out) { { @@ -207,10 +232,10 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out) Request req; req.out = out; _requests.push_back(req); - return; + return false; } } - _connection->sendAsyncRequest(out, _compress, _response); + return _connection->sendAsyncRequest(out, _compress, _response); } bool @@ -219,7 +244,7 @@ ConnectRequestHandler::flushBatchRequests(BatchOutgoing* out) return getConnection(true)->flushBatchRequests(out); } -void +bool ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out) { { @@ -229,10 +254,10 @@ ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out) Request req; req.batchOut = out; _requests.push_back(req); - return; + return false; } } - _connection->flushAsyncBatchRequests(out); + return _connection->flushAsyncBatchRequests(out); } Ice::ConnectionIPtr @@ -373,6 +398,7 @@ ConnectRequestHandler::flushRequests() _flushing = true; } + vector<OutgoingAsyncMessageCallbackPtr> sentCallbacks; try { while(!_requests.empty()) // _requests is immutable when _flushing = true @@ -380,11 +406,23 @@ ConnectRequestHandler::flushRequests() Request& req = _requests.front(); if(req.out) { - _connection->sendAsyncRequest(req.out, _compress, _response); + if(_connection->sendAsyncRequest(req.out, _compress, _response)) + { + if(dynamic_cast<Ice::AMISentCallback*>(req.out.get())) + { + sentCallbacks.push_back(req.out); + } + } } else if(req.batchOut) { - _connection->flushAsyncBatchRequests(req.batchOut); + if(_connection->flushAsyncBatchRequests(req.batchOut)) + { + if(dynamic_cast<Ice::AMISentCallback*>(req.batchOut.get())) + { + sentCallbacks.push_back(req.batchOut); + } + } } else { @@ -414,8 +452,6 @@ ConnectRequestHandler::flushRequests() assert(!_exception.get() && !_requests.empty()); _exception.reset(dynamic_cast<Ice::LocalException*>(ex.get()->ice_clone())); _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithExceptionWrapper(this, ex)); - notifyAll(); - return; } catch(const Ice::LocalException& ex) { @@ -423,8 +459,12 @@ ConnectRequestHandler::flushRequests() assert(!_exception.get() && !_requests.empty()); _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this, ex)); - notifyAll(); - return; + } + + if(!sentCallbacks.empty()) + { + InstancePtr instance = _reference->getInstance(); + instance->clientThreadPool()->execute(new FlushSentRequests(instance, sentCallbacks)); } // @@ -442,8 +482,11 @@ ConnectRequestHandler::flushRequests() { Lock sync(*this); assert(!_initialized); - _initialized = true; - _flushing = false; + if(!_exception.get()) + { + _initialized = true; + _flushing = false; + } _proxy = 0; // Break cyclic reference count. _delegate = 0; // Break cyclic reference count. notifyAll(); |