summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectRequestHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp173
1 files changed, 57 insertions, 116 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index c864daeb4c2..47c26f53da2 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -14,7 +14,6 @@
#include <Ice/Proxy.h>
#include <Ice/ConnectionI.h>
#include <Ice/RouterInfo.h>
-#include <Ice/Outgoing.h>
#include <Ice/OutgoingAsync.h>
#include <Ice/Protocol.h>
#include <Ice/Properties.h>
@@ -23,9 +22,11 @@
using namespace std;
using namespace IceInternal;
+#ifndef ICE_CPP11_MAPPING
IceUtil::Shared* IceInternal::upCast(ConnectRequestHandler* p) { return p; }
+#endif
-ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrx& proxy) :
+ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrxPtr& proxy) :
RequestHandler(ref),
_proxy(proxy),
_initialized(false),
@@ -33,41 +34,21 @@ ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice:
{
}
-ConnectRequestHandler::~ConnectRequestHandler()
-{
-}
-
RequestHandlerPtr
-ConnectRequestHandler::connect(const Ice::ObjectPrx& proxy)
+ConnectRequestHandler::connect(const Ice::ObjectPrxPtr& proxy)
{
Lock sync(*this);
if(!initialized())
{
_proxies.insert(proxy);
}
- return _requestHandler ? _requestHandler : this;
+ return _requestHandler ? _requestHandler : ICE_SHARED_FROM_THIS;
}
RequestHandlerPtr
ConnectRequestHandler::update(const RequestHandlerPtr& previousHandler, const RequestHandlerPtr& newHandler)
{
- return previousHandler.get() == this ? newHandler : this;
-}
-
-bool
-ConnectRequestHandler::sendRequest(ProxyOutgoingBase* out)
-{
- {
- Lock sync(*this);
- if(!initialized())
- {
- Request req;
- req.out = out;
- _requests.push_back(req);
- return false; // Not sent
- }
- }
- return out->invokeRemote(_connection, _compress, _response) && !_response; // Finished if sent and no response.
+ return previousHandler.get() == this ? newHandler : ICE_SHARED_FROM_THIS;
}
AsyncStatus
@@ -77,14 +58,12 @@ ConnectRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out)
Lock sync(*this);
if(!_initialized)
{
- out->cancelable(this); // This will throw if the request is canceled
+ out->cancelable(ICE_SHARED_FROM_THIS); // This will throw if the request is canceled
}
if(!initialized())
{
- Request req;
- req.outAsync = out;
- _requests.push_back(req);
+ _requests.push_back(out);
return AsyncStatusQueued;
}
}
@@ -92,52 +71,25 @@ ConnectRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out)
}
void
-ConnectRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex)
-{
- {
- Lock sync(*this);
- if(_exception.get())
- {
- return; // The request has been notified of a failure already.
- }
-
- if(!initialized())
- {
- for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- if(p->out == out)
- {
- out->completed(ex);
- _requests.erase(p);
- return;
- }
- }
- assert(false); // The request has to be queued if it timed out and we're not initialized yet.
- }
- }
- _connection->requestCanceled(out, ex);
-}
-
-void
ConnectRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex)
{
{
Lock sync(*this);
- if(_exception.get())
+ if(_exception)
{
return; // The request has been notified of a failure already.
}
if(!initialized())
{
- for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ for(deque<ProxyOutgoingAsyncBasePtr>::iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- if(p->outAsync.get() == outAsync.get())
+ if(p->get() == outAsync.get())
{
_requests.erase(p);
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
return;
}
@@ -151,7 +103,7 @@ Ice::ConnectionIPtr
ConnectRequestHandler::getConnection()
{
Lock sync(*this);
- if(_exception.get())
+ if(_exception)
{
_exception->ice_throw();
return 0; // Keep the compiler happy.
@@ -166,20 +118,19 @@ Ice::ConnectionIPtr
ConnectRequestHandler::waitForConnection()
{
Lock sync(*this);
- if(_exception.get())
+ if(_exception)
{
- throw RetryException(*_exception.get());
+ throw RetryException(*_exception);
}
-
//
// Wait for the connection establishment to complete or fail.
//
- while(!_initialized && !_exception.get())
+ while(!_initialized && !_exception)
{
wait();
}
- if(_exception.get())
+ if(_exception)
{
_exception->ice_throw();
return 0; // Keep the compiler happy.
@@ -195,7 +146,7 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool
{
{
Lock sync(*this);
- assert(!_exception.get() && !_connection);
+ assert(!_exception && !_connection);
_connection = connection;
_compress = compress;
}
@@ -205,7 +156,7 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool
// add this proxy to the router info object.
//
RouterInfoPtr ri = _reference->getRouterInfo();
- if(ri && !ri->addProxy(_proxy, this))
+ if(ri && !ri->addProxy(_proxy, ICE_SHARED_FROM_THIS))
{
return; // The request handler will be initialized once addProxy returns.
}
@@ -220,8 +171,9 @@ void
ConnectRequestHandler::setException(const Ice::LocalException& ex)
{
Lock sync(*this);
- assert(!_initialized && !_exception.get());
- _exception.reset(ex.ice_clone());
+ assert(!_initialized && !_exception);
+ ICE_SET_EXCEPTION_FROM_CLONE(_exception, ex.ice_clone());
+
_proxies.clear();
_proxy = 0; // Break cyclic reference count.
@@ -233,27 +185,22 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex)
//
try
{
- _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, ICE_SHARED_FROM_THIS);
}
catch(const Ice::CommunicatorDestroyedException&)
{
// Ignore
}
- for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+
+ for(deque<ProxyOutgoingAsyncBasePtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- if(p->out)
+ if((*p)->exception(ex))
{
- p->out->completed(*_exception.get());
- }
- else
- {
- if(p->outAsync->completed(*_exception.get()))
- {
- p->outAsync->invokeCompletedAsync();
- }
+ (*p)->invokeExceptionAsync();
}
}
+
_requests.clear();
notifyAll();
}
@@ -280,12 +227,12 @@ ConnectRequestHandler::initialized()
}
else
{
- while(_flushing && !_exception.get())
+ while(_flushing && !_exception)
{
wait();
}
- if(_exception.get())
+ if(_exception)
{
if(_connection)
{
@@ -322,47 +269,37 @@ ConnectRequestHandler::flushRequests()
_flushing = true;
}
- IceUtil::UniquePtr<Ice::LocalException> exception;
+#ifdef ICE_CPP11_MAPPING
+ std::unique_ptr<Ice::LocalException> exception;
+#else
+ IceInternal::UniquePtr<Ice::LocalException> exception;
+#endif
while(!_requests.empty()) // _requests is immutable when _flushing = true
{
- Request& req = _requests.front();
+ ProxyOutgoingAsyncBasePtr& req = _requests.front();
try
{
- if(req.out)
+ if(req->invokeRemote(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
{
- req.out->invokeRemote(_connection, _compress, _response);
- }
- else if(req.outAsync->invokeRemote(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
- {
- req.outAsync->invokeSentAsync();
+ req->invokeSentAsync();
}
}
catch(const RetryException& ex)
{
- exception.reset(ex.get()->ice_clone());
+ ICE_SET_EXCEPTION_FROM_CLONE(exception, ex.get()->ice_clone());
// Remove the request handler before retrying.
- _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, ICE_SHARED_FROM_THIS);
- if(req.out)
- {
- req.out->retryException(*ex.get());
- }
- else
- {
- req.outAsync->retryException(*ex.get());
- }
+ req->retryException(*exception);
}
catch(const Ice::LocalException& ex)
{
- exception.reset(ex.ice_clone());
- if(req.out)
- {
- req.out->completed(ex);
- }
- else if(req.outAsync->completed(ex))
+ ICE_SET_EXCEPTION_FROM_CLONE(exception, ex.ice_clone());
+
+ if(req->exception(ex))
{
- req.outAsync->invokeCompletedAsync();
+ req->invokeExceptionAsync();
}
}
_requests.pop_front();
@@ -374,30 +311,34 @@ ConnectRequestHandler::flushRequests()
// request handler to use the more efficient connection request
// handler.
//
- if(_reference->getCacheConnection() && !exception.get())
+ if(_reference->getCacheConnection() && !exception)
{
- _requestHandler = new ConnectionRequestHandler(_reference, _connection, _compress);
- for(set<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
+ _requestHandler = ICE_MAKE_SHARED(ConnectionRequestHandler, _reference, _connection, _compress);
+ for(set<Ice::ObjectPrxPtr>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
{
- (*p)->__updateRequestHandler(this, _requestHandler);
+ (*p)->_updateRequestHandler(ICE_SHARED_FROM_THIS, _requestHandler);
}
}
{
Lock sync(*this);
assert(!_initialized);
+#ifdef ICE_CPP11_MAPPING
+ swap(_exception, exception);
+#else
_exception.swap(exception);
- _initialized = !_exception.get();
+#endif
+ _initialized = !_exception;
_flushing = false;
//
// Only remove once all the requests are flushed to
// guarantee serialization.
//
- _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, ICE_SHARED_FROM_THIS);
_proxies.clear();
- _proxy = 0; // Break cyclic reference count.
+ _proxy = ICE_NULLPTR; // Break cyclic reference count.
notifyAll();
}
}