diff options
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 69 |
1 files changed, 34 insertions, 35 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index f400b03d418..d098d8ba8a4 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -25,20 +25,22 @@ using namespace IceInternal; namespace { -class FlushRequestsWithException : public ThreadPoolWorkItem +class FlushRequestsWithException : public DispatchWorkItem { public: - FlushRequestsWithException(const ConnectRequestHandlerPtr& handler, const Ice::LocalException& ex) : + FlushRequestsWithException(const InstancePtr& instance, + const ConnectRequestHandlerPtr& handler, + const Ice::LocalException& ex) : + DispatchWorkItem(instance), _handler(handler), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone())) { } virtual void - execute(ThreadPoolCurrent& current) + run() { - current.ioCompleted(); _handler->flushRequestsWithException(*_exception.get()); } @@ -48,20 +50,22 @@ private: const auto_ptr<Ice::LocalException> _exception; }; -class FlushRequestsWithExceptionWrapper : public ThreadPoolWorkItem +class FlushRequestsWithExceptionWrapper : public DispatchWorkItem { public: - FlushRequestsWithExceptionWrapper(const ConnectRequestHandlerPtr& handler, const LocalExceptionWrapper& ex) : + FlushRequestsWithExceptionWrapper(const InstancePtr& instance, + const ConnectRequestHandlerPtr& handler, + const LocalExceptionWrapper& ex) : + DispatchWorkItem(instance), _handler(handler), _exception(ex) { } virtual void - execute(ThreadPoolCurrent& current) + run() { - current.ioCompleted(); _handler->flushRequestsWithException(_exception); } @@ -71,28 +75,26 @@ private: const LocalExceptionWrapper _exception; }; -class FlushSentRequests : public ThreadPoolWorkItem +class FlushSentRequests : public DispatchWorkItem { public: FlushSentRequests(const InstancePtr& instance, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : - _instance(instance), _callbacks(callbacks) + DispatchWorkItem(instance), _callbacks(callbacks) { } virtual void - execute(ThreadPoolCurrent& current) + run() { - current.ioCompleted(); for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p) { - (*p)->__sentCallback(_instance); + (*p)->__sent(); } } private: - InstancePtr _instance; vector<OutgoingAsyncMessageCallbackPtr> _callbacks; }; @@ -226,7 +228,7 @@ ConnectRequestHandler::sendRequest(Outgoing* out) } } -bool +AsyncStatus ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out) { { @@ -236,7 +238,7 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out) Request req; req.out = out; _requests.push_back(req); - return false; + return AsyncStatusQueued; } } return _connection->sendAsyncRequest(out, _compress, _response); @@ -248,7 +250,7 @@ ConnectRequestHandler::flushBatchRequests(BatchOutgoing* out) return getConnection(true)->flushBatchRequests(out); } -bool +AsyncStatus ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out) { { @@ -258,7 +260,7 @@ ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out) Request req; req.batchOut = out; _requests.push_back(req); - return false; + return AsyncStatusQueued; } } return _connection->flushAsyncBatchRequests(out); @@ -337,7 +339,8 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) // if(!_requests.empty()) { - _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this, ex)); + const InstancePtr instance = _reference->getInstance(); + instance->clientThreadPool()->execute(new FlushRequestsWithException(instance, this, ex)); } notifyAll(); @@ -410,22 +413,16 @@ ConnectRequestHandler::flushRequests() Request& req = _requests.front(); if(req.out) { - if(_connection->sendAsyncRequest(req.out, _compress, _response)) + if(_connection->sendAsyncRequest(req.out, _compress, _response) & AsyncStatusInvokeSentCallback) { - if(dynamic_cast<Ice::AMISentCallback*>(req.out.get())) - { - sentCallbacks.push_back(req.out); - } + sentCallbacks.push_back(req.out); } } else if(req.batchOut) { - if(_connection->flushAsyncBatchRequests(req.batchOut)) + if(_connection->flushAsyncBatchRequests(req.batchOut) & AsyncStatusInvokeSentCallback) { - if(dynamic_cast<Ice::AMISentCallback*>(req.batchOut.get())) - { - sentCallbacks.push_back(req.batchOut); - } + sentCallbacks.push_back(req.batchOut); } } else @@ -455,19 +452,21 @@ ConnectRequestHandler::flushRequests() Lock sync(*this); assert(!_exception.get() && !_requests.empty()); _exception.reset(dynamic_cast<Ice::LocalException*>(ex.get()->ice_clone())); - _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithExceptionWrapper(this, ex)); + const InstancePtr instance = _reference->getInstance(); + instance->clientThreadPool()->execute(new FlushRequestsWithExceptionWrapper(instance, this, ex)); } catch(const Ice::LocalException& ex) { Lock sync(*this); assert(!_exception.get() && !_requests.empty()); _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); - _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this, ex)); + const InstancePtr instance = _reference->getInstance(); + instance->clientThreadPool()->execute(new FlushRequestsWithException(instance, this, ex)); } if(!sentCallbacks.empty()) { - InstancePtr instance = _reference->getInstance(); + const InstancePtr instance = _reference->getInstance(); instance->clientThreadPool()->execute(new FlushSentRequests(instance, sentCallbacks)); } @@ -504,11 +503,11 @@ ConnectRequestHandler::flushRequestsWithException(const Ice::LocalException& ex) { if(p->out) { - p->out->__finished(ex); + p->out->__finished(ex, false); } else if(p->batchOut) { - p->batchOut->__finished(ex); + p->batchOut->__finished(ex, false); } else { @@ -530,7 +529,7 @@ ConnectRequestHandler::flushRequestsWithException(const LocalExceptionWrapper& e } else if(p->batchOut) { - p->batchOut->__finished(*ex.get()); + p->batchOut->__finished(*ex.get(), false); } else { |