diff options
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 173 |
1 files changed, 57 insertions, 116 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index c864daeb4c2..47c26f53da2 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -14,7 +14,6 @@ #include <Ice/Proxy.h> #include <Ice/ConnectionI.h> #include <Ice/RouterInfo.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/Protocol.h> #include <Ice/Properties.h> @@ -23,9 +22,11 @@ using namespace std; using namespace IceInternal; +#ifndef ICE_CPP11_MAPPING IceUtil::Shared* IceInternal::upCast(ConnectRequestHandler* p) { return p; } +#endif -ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrx& proxy) : +ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrxPtr& proxy) : RequestHandler(ref), _proxy(proxy), _initialized(false), @@ -33,41 +34,21 @@ ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice: { } -ConnectRequestHandler::~ConnectRequestHandler() -{ -} - RequestHandlerPtr -ConnectRequestHandler::connect(const Ice::ObjectPrx& proxy) +ConnectRequestHandler::connect(const Ice::ObjectPrxPtr& proxy) { Lock sync(*this); if(!initialized()) { _proxies.insert(proxy); } - return _requestHandler ? _requestHandler : this; + return _requestHandler ? _requestHandler : ICE_SHARED_FROM_THIS; } RequestHandlerPtr ConnectRequestHandler::update(const RequestHandlerPtr& previousHandler, const RequestHandlerPtr& newHandler) { - return previousHandler.get() == this ? newHandler : this; -} - -bool -ConnectRequestHandler::sendRequest(ProxyOutgoingBase* out) -{ - { - Lock sync(*this); - if(!initialized()) - { - Request req; - req.out = out; - _requests.push_back(req); - return false; // Not sent - } - } - return out->invokeRemote(_connection, _compress, _response) && !_response; // Finished if sent and no response. + return previousHandler.get() == this ? newHandler : ICE_SHARED_FROM_THIS; } AsyncStatus @@ -77,14 +58,12 @@ ConnectRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out) Lock sync(*this); if(!_initialized) { - out->cancelable(this); // This will throw if the request is canceled + out->cancelable(ICE_SHARED_FROM_THIS); // This will throw if the request is canceled } if(!initialized()) { - Request req; - req.outAsync = out; - _requests.push_back(req); + _requests.push_back(out); return AsyncStatusQueued; } } @@ -92,52 +71,25 @@ ConnectRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out) } void -ConnectRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) -{ - { - Lock sync(*this); - if(_exception.get()) - { - return; // The request has been notified of a failure already. - } - - if(!initialized()) - { - for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p) - { - if(p->out == out) - { - out->completed(ex); - _requests.erase(p); - return; - } - } - assert(false); // The request has to be queued if it timed out and we're not initialized yet. - } - } - _connection->requestCanceled(out, ex); -} - -void ConnectRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { { Lock sync(*this); - if(_exception.get()) + if(_exception) { return; // The request has been notified of a failure already. } if(!initialized()) { - for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p) + for(deque<ProxyOutgoingAsyncBasePtr>::iterator p = _requests.begin(); p != _requests.end(); ++p) { - if(p->outAsync.get() == outAsync.get()) + if(p->get() == outAsync.get()) { _requests.erase(p); - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } return; } @@ -151,7 +103,7 @@ Ice::ConnectionIPtr ConnectRequestHandler::getConnection() { Lock sync(*this); - if(_exception.get()) + if(_exception) { _exception->ice_throw(); return 0; // Keep the compiler happy. @@ -166,20 +118,19 @@ Ice::ConnectionIPtr ConnectRequestHandler::waitForConnection() { Lock sync(*this); - if(_exception.get()) + if(_exception) { - throw RetryException(*_exception.get()); + throw RetryException(*_exception); } - // // Wait for the connection establishment to complete or fail. // - while(!_initialized && !_exception.get()) + while(!_initialized && !_exception) { wait(); } - if(_exception.get()) + if(_exception) { _exception->ice_throw(); return 0; // Keep the compiler happy. @@ -195,7 +146,7 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool { { Lock sync(*this); - assert(!_exception.get() && !_connection); + assert(!_exception && !_connection); _connection = connection; _compress = compress; } @@ -205,7 +156,7 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool // add this proxy to the router info object. // RouterInfoPtr ri = _reference->getRouterInfo(); - if(ri && !ri->addProxy(_proxy, this)) + if(ri && !ri->addProxy(_proxy, ICE_SHARED_FROM_THIS)) { return; // The request handler will be initialized once addProxy returns. } @@ -220,8 +171,9 @@ void ConnectRequestHandler::setException(const Ice::LocalException& ex) { Lock sync(*this); - assert(!_initialized && !_exception.get()); - _exception.reset(ex.ice_clone()); + assert(!_initialized && !_exception); + ICE_SET_EXCEPTION_FROM_CLONE(_exception, ex.ice_clone()); + _proxies.clear(); _proxy = 0; // Break cyclic reference count. @@ -233,27 +185,22 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) // try { - _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, ICE_SHARED_FROM_THIS); } catch(const Ice::CommunicatorDestroyedException&) { // Ignore } - for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + + for(deque<ProxyOutgoingAsyncBasePtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - if(p->out) + if((*p)->exception(ex)) { - p->out->completed(*_exception.get()); - } - else - { - if(p->outAsync->completed(*_exception.get())) - { - p->outAsync->invokeCompletedAsync(); - } + (*p)->invokeExceptionAsync(); } } + _requests.clear(); notifyAll(); } @@ -280,12 +227,12 @@ ConnectRequestHandler::initialized() } else { - while(_flushing && !_exception.get()) + while(_flushing && !_exception) { wait(); } - if(_exception.get()) + if(_exception) { if(_connection) { @@ -322,47 +269,37 @@ ConnectRequestHandler::flushRequests() _flushing = true; } - IceUtil::UniquePtr<Ice::LocalException> exception; +#ifdef ICE_CPP11_MAPPING + std::unique_ptr<Ice::LocalException> exception; +#else + IceInternal::UniquePtr<Ice::LocalException> exception; +#endif while(!_requests.empty()) // _requests is immutable when _flushing = true { - Request& req = _requests.front(); + ProxyOutgoingAsyncBasePtr& req = _requests.front(); try { - if(req.out) + if(req->invokeRemote(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) { - req.out->invokeRemote(_connection, _compress, _response); - } - else if(req.outAsync->invokeRemote(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) - { - req.outAsync->invokeSentAsync(); + req->invokeSentAsync(); } } catch(const RetryException& ex) { - exception.reset(ex.get()->ice_clone()); + ICE_SET_EXCEPTION_FROM_CLONE(exception, ex.get()->ice_clone()); // Remove the request handler before retrying. - _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, ICE_SHARED_FROM_THIS); - if(req.out) - { - req.out->retryException(*ex.get()); - } - else - { - req.outAsync->retryException(*ex.get()); - } + req->retryException(*exception); } catch(const Ice::LocalException& ex) { - exception.reset(ex.ice_clone()); - if(req.out) - { - req.out->completed(ex); - } - else if(req.outAsync->completed(ex)) + ICE_SET_EXCEPTION_FROM_CLONE(exception, ex.ice_clone()); + + if(req->exception(ex)) { - req.outAsync->invokeCompletedAsync(); + req->invokeExceptionAsync(); } } _requests.pop_front(); @@ -374,30 +311,34 @@ ConnectRequestHandler::flushRequests() // request handler to use the more efficient connection request // handler. // - if(_reference->getCacheConnection() && !exception.get()) + if(_reference->getCacheConnection() && !exception) { - _requestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); - for(set<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) + _requestHandler = ICE_MAKE_SHARED(ConnectionRequestHandler, _reference, _connection, _compress); + for(set<Ice::ObjectPrxPtr>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) { - (*p)->__updateRequestHandler(this, _requestHandler); + (*p)->_updateRequestHandler(ICE_SHARED_FROM_THIS, _requestHandler); } } { Lock sync(*this); assert(!_initialized); +#ifdef ICE_CPP11_MAPPING + swap(_exception, exception); +#else _exception.swap(exception); - _initialized = !_exception.get(); +#endif + _initialized = !_exception; _flushing = false; // // Only remove once all the requests are flushed to // guarantee serialization. // - _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, ICE_SHARED_FROM_THIS); _proxies.clear(); - _proxy = 0; // Break cyclic reference count. + _proxy = ICE_NULLPTR; // Break cyclic reference count. notifyAll(); } } |