Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r--  cpp/src/Ice/ConnectRequestHandler.cpp  302
1 file changed, 193 insertions(+), 109 deletions(-)
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index 8e04d74512e..02e3227988e 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -17,10 +17,62 @@
#include <Ice/OutgoingAsync.h>
#include <Ice/Protocol.h>
#include <Ice/Properties.h>
+#include <Ice/ThreadPool.h>
using namespace std;
using namespace IceInternal;
+namespace
+{
+
+class FlushRequestsWithException : public ThreadPoolWorkItem
+{
+public:
+
+ FlushRequestsWithException(const ConnectRequestHandlerPtr& handler, const Ice::LocalException& ex) :
+ _handler(handler),
+ _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone()))
+ {
+ }
+
+ virtual void
+ execute(const ThreadPoolPtr& threadPool)
+ {
+ threadPool->promoteFollower();
+ _handler->flushRequestsWithException(*_exception.get());
+ }
+
+private:
+
+ const ConnectRequestHandlerPtr _handler;
+ const auto_ptr<Ice::LocalException> _exception;
+};
+
+class FlushRequestsWithExceptionWrapper : public ThreadPoolWorkItem
+{
+public:
+
+ FlushRequestsWithExceptionWrapper(const ConnectRequestHandlerPtr& handler, const LocalExceptionWrapper& ex) :
+ _handler(handler),
+ _exception(ex)
+ {
+ }
+
+ virtual void
+ execute(const ThreadPoolPtr& threadPool)
+ {
+ threadPool->promoteFollower();
+ _handler->flushRequestsWithException(_exception);
+ }
+
+private:
+
+ const ConnectRequestHandlerPtr _handler;
+ const LocalExceptionWrapper _exception;
+};
+
+};
+
ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref,
const Ice::ObjectPrx& proxy,
const Handle< ::IceDelegate::Ice::Object>& delegate) :
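
The two work items defined above exist so that failure notifications can be handed off to the client thread pool instead of running on whichever thread detected the failure. The sketch below is not part of the patch: it is a minimal, hypothetical stand-in for that hand-off which omits the leader/follower mechanics behind the real ThreadPool's execute()/promoteFollower(), and only shows why a work item captures up front everything it will need later.

#include <condition_variable>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical stand-in for ThreadPoolWorkItem; promoteFollower() and the
// leader/follower design of the real Ice thread pool are left out.
struct WorkItem
{
    virtual ~WorkItem() {}
    virtual void execute() = 0;
};

// Hypothetical counterpart of FlushRequestsWithException: it captures what it
// needs up front and performs the notification later, on a pool thread.
struct NotifyFailure : WorkItem
{
    explicit NotifyFailure(std::string why) : reason(std::move(why)) {}
    void execute() override { std::cout << "flush queued requests with: " << reason << "\n"; }
    std::string reason;
};

class SimplePool
{
public:
    SimplePool() : _done(false), _thread([this] { run(); }) {}

    ~SimplePool()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _done = true;
        }
        _cond.notify_all();
        _thread.join();
    }

    // Queue a work item; it runs on the pool thread, never on the caller's thread.
    void execute(std::shared_ptr<WorkItem> item)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _queue.push_back(std::move(item));
        }
        _cond.notify_one();
    }

private:
    void run()
    {
        for(;;)
        {
            std::shared_ptr<WorkItem> item;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _cond.wait(lock, [this] { return _done || !_queue.empty(); });
                if(_queue.empty())
                {
                    return; // done and nothing left to run
                }
                item = _queue.front();
                _queue.pop_front();
            }
            item->execute(); // callbacks run here, outside any handler lock
        }
    }

    std::mutex _mutex;
    std::condition_variable _cond;
    std::deque<std::shared_ptr<WorkItem>> _queue;
    bool _done;
    std::thread _thread;
};

int main()
{
    SimplePool pool;
    pool.execute(std::make_shared<NotifyFailure>("ConnectFailedException"));
    return 0; // the destructor drains the queue before joining
}

Queuing the notification this way keeps user-level ice_exception callbacks off the thread that holds the connect handler's lock, which is the point of setException() handing FlushRequestsWithException to clientThreadPool()->execute() further down.
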
@@ -49,8 +101,14 @@ ConnectRequestHandler::connect()
_reference->getConnection(this);
Lock sync(*this);
- if(_connection)
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ return 0; // Keep the compiler happy.
+ }
+ else if(_connection)
{
+ assert(_initialized);
return new ConnectionRequestHandler(_reference, _connection, _compress);
}
else
@@ -135,34 +193,30 @@ ConnectRequestHandler::abortBatchRequest()
Ice::ConnectionI*
ConnectRequestHandler::sendRequest(Outgoing* out)
{
- return (!getConnection(true)->sendRequest(out, _compress, _response) || _response) ? _connection.get() : 0;
+ if(!getConnection(true)->sendRequest(out, _compress, _response) || _response)
+ {
+ return _connection.get(); // The request has been sent or we're expecting a response.
+ }
+ else
+ {
+ return 0; // The request hasn't been sent yet.
+ }
}
void
ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out)
{
- try
{
+ Lock sync(*this);
+ if(!initialized())
{
- Lock sync(*this);
- if(!initialized())
- {
- Request req;
- req.out = out;
- _requests.push_back(req);
- return;
- }
+ Request req;
+ req.out = out;
+ _requests.push_back(req);
+ return;
}
- _connection->sendAsyncRequest(out, _compress, _response);
- }
- catch(const LocalExceptionWrapper& ex)
- {
- out->__finished(ex);
- }
- catch(const Ice::LocalException& ex)
- {
- out->__finished(ex);
}
+ _connection->sendAsyncRequest(out, _compress, _response);
}
bool
@@ -174,24 +228,17 @@ ConnectRequestHandler::flushBatchRequests(BatchOutgoing* out)
void
ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out)
{
- try
{
+ Lock sync(*this);
+ if(!initialized())
{
- Lock sync(*this);
- if(!initialized())
- {
- Request req;
- req.batchOut = out;
- _requests.push_back(req);
- return;
- }
+ Request req;
+ req.batchOut = out;
+ _requests.push_back(req);
+ return;
}
- _connection->flushAsyncBatchRequests(out);
- }
- catch(const Ice::LocalException& ex)
- {
- out->__finished(ex);
}
+ _connection->flushAsyncBatchRequests(out);
}
Ice::ConnectionIPtr
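
The rewritten sendAsyncRequest() and flushAsyncBatchRequests() above share one shape: take the handler's lock, and if the connection is not yet initialized, queue the request and return; otherwise fall through and send on the connection outside the lock, which is safe because _connection no longer changes once initialized. A reduced sketch of that shape, with hypothetical Request and Connection types standing in for OutgoingAsync and Ice::ConnectionI:

#include <deque>
#include <memory>
#include <mutex>

// Hypothetical stand-ins; the real types are OutgoingAsync, Ice::ConnectionI, etc.
struct Request { int id; };
struct Connection { void send(const Request&) { /* write to the wire */ } };

class PendingRequestHandler
{
public:
    PendingRequestHandler() : _initialized(false) {}

    // Called once the connection is established. For brevity this sketch drains
    // the queue here; the real code drains it from flushRequests() with _flushing set.
    void setConnection(std::shared_ptr<Connection> connection)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _connection = connection;
        for(const Request& r : _requests)
        {
            _connection->send(r);
        }
        _requests.clear();
        _initialized = true;
    }

    void sendAsync(const Request& req)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if(!_initialized)
            {
                _requests.push_back(req); // connection not ready: queue and return
                return;
            }
        }
        _connection->send(req); // initialized: send directly, outside the lock
    }

private:
    std::mutex _mutex;
    bool _initialized;
    std::shared_ptr<Connection> _connection;
    std::deque<Request> _requests;
};

The real initialized() is richer than the flag above: as the later hunks show, it also waits while a flush is in progress and rethrows any exception recorded by setException().
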
@@ -227,6 +274,8 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool
{
Lock sync(*this);
assert(!_exception.get() && !_connection);
+ assert(_updateRequestHandler || _requests.empty());
+
_connection = connection;
_compress = compress;
}
@@ -236,51 +285,48 @@ ConnectRequestHandler::setConnection(const Ice::ConnectionIPtr& connection, bool
// add this proxy to the router info object.
//
RouterInfoPtr ri = _reference->getRouterInfo();
- if(ri)
+ if(ri && !ri->addProxy(_proxy, this))
{
- if(!ri->addProxy(_proxy, this))
- {
- return; // The request handler will be initialized once addProxy returns.
- }
+ return; // The request handler will be initialized once addProxy returns.
}
-
+
+ //
+ // We can now send the queued requests.
+ //
flushRequests();
}
void
ConnectRequestHandler::setException(const Ice::LocalException& ex)
{
+ Lock sync(*this);
+ assert(!_initialized && !_exception.get());
+ assert(_updateRequestHandler || _requests.empty());
+
+ _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ _proxy = 0; // Break cyclic reference count.
+ _delegate = 0; // Break cyclic reference count.
+
+ //
+ // If requests were queued, we notify them of the failure. This is done from a
+ // client thread pool thread, since it results in ice_exception callbacks being
+ // invoked.
+ //
+ if(!_requests.empty())
{
- Lock sync(*this);
- assert(!_initialized && !_exception.get());
- _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
- _proxy = 0; // Break cyclic reference count.
- _delegate = 0; // Break cyclic reference count.
- notifyAll();
- }
-
- for(vector<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- if(p->out)
- {
- p->out->__finished(ex);
- }
- else if(p->batchOut)
- {
- p->batchOut->__finished(ex);
- }
- else
- {
- assert(p->os);
- delete p->os;
- }
+ _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this, ex));
}
- _requests.clear();
+
+ notifyAll();
}
void
ConnectRequestHandler::addedProxy()
{
+ //
+ // The proxy was added to the router info; we're now ready to send the
+ // queued requests.
+ //
flushRequests();
}
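
setException() nulls out _proxy and _delegate with the comment "Break cyclic reference count" because the proxy references this handler and the handler references the proxy back; with reference-counted smart pointers neither object can ever be destroyed unless one side lets go explicitly. A tiny illustration of that kind of cycle, using std::shared_ptr and hypothetical Proxy/Handler types rather than the Ice ones:

#include <iostream>
#include <memory>

struct Handler;

struct Proxy
{
    std::shared_ptr<Handler> handler; // proxy -> handler
    ~Proxy() { std::cout << "Proxy destroyed\n"; }
};

struct Handler
{
    std::shared_ptr<Proxy> proxy;     // handler -> proxy: completes the cycle
    ~Handler() { std::cout << "Handler destroyed\n"; }
};

int main()
{
    auto proxy = std::make_shared<Proxy>();
    proxy->handler = std::make_shared<Handler>();
    proxy->handler->proxy = proxy;

    // Without this reset, dropping 'proxy' below would leak both objects,
    // because each keeps the other's reference count above zero.
    proxy->handler->proxy.reset(); // the "_proxy = 0" step in setException()

    proxy.reset(); // both destructors now run
    return 0;
}
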
@@ -296,7 +342,7 @@ ConnectRequestHandler::initialized()
}
else
{
- while(_flushing)
+ while(_flushing && !_exception.get())
{
wait();
}
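
The extra !_exception.get() term lets a thread blocked in initialized() give up as soon as a connection failure has been recorded, instead of continuing to wait on a flush that will never complete normally. The same idiom with standard types, as a hypothetical miniature of the wait (names and the runtime_error stand-in are illustrative, not the Ice types):

#include <condition_variable>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>

struct WaitState
{
    std::mutex mutex;
    std::condition_variable cond;
    bool flushing = false;
    std::unique_ptr<std::runtime_error> exception;

    // Mirrors setException(): record the failure and wake any waiter.
    void fail(const std::string& why)
    {
        std::lock_guard<std::mutex> lock(mutex);
        exception.reset(new std::runtime_error(why));
        cond.notify_all();
    }

    // Mirrors the wait in initialized(): block while a flush is in progress,
    // but bail out as soon as a failure has been recorded.
    bool initialized()
    {
        std::unique_lock<std::mutex> lock(mutex);
        cond.wait(lock, [this] { return !flushing || exception != nullptr; });
        if(exception)
        {
            throw *exception; // the real handler rethrows the stored Ice exception
        }
        return true;
    }
};
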
@@ -332,69 +378,58 @@ ConnectRequestHandler::flushRequests()
//
_flushing = true;
}
-
- for(vector<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+
+ try
{
- // _requests is immutable when _flushing = true
- if(p->out)
+ while(!_requests.empty()) // _requests is immutable when _flushing = true
{
- try
- {
- _connection->sendAsyncRequest(p->out, _compress, _response);
- }
- catch(const LocalExceptionWrapper& ex)
+ Request& req = _requests.front();
+ if(req.out)
{
- p->out->__finished(ex);
+ _connection->sendAsyncRequest(req.out, _compress, _response);
}
- catch(const Ice::LocalException& ex)
+ else if(req.batchOut)
{
- p->out->__finished(ex);
- }
- }
- else if(p->batchOut)
- {
- try
- {
- _connection->flushAsyncBatchRequests(p->batchOut);
- }
- catch(const Ice::LocalException& ex)
- {
- p->batchOut->__finished(ex);
- }
- }
- else
- {
- assert(p->os);
- if(_exception.get())
- {
- delete p->os;
+ _connection->flushAsyncBatchRequests(req.batchOut);
}
else
{
- //
- // TODO: Add sendBatchRequest() method to ConnectionI?
- //
+ BasicStream os(req.os->instance());
+ _connection->prepareBatchRequest(&os);
try
{
- BasicStream os(p->os->instance());
- _connection->prepareBatchRequest(&os);
const Ice::Byte* bytes;
- p->os->i = p->os->b.begin();
- p->os->readBlob(bytes, p->os->b.size());
- os.writeBlob(bytes, p->os->b.size());
+ req.os->i = req.os->b.begin();
+ req.os->readBlob(bytes, req.os->b.size());
+ os.writeBlob(bytes, req.os->b.size());
_connection->finishBatchRequest(&os, _compress);
- delete p->os;
+ delete req.os;
}
- catch(const Ice::LocalException& ex)
+ catch(const Ice::LocalException&)
{
- delete p->os;
_connection->abortBatchRequest();
- _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ throw;
}
}
+ _requests.pop_front();
}
}
- _requests.clear();
+ catch(const LocalExceptionWrapper& ex)
+ {
+ Lock sync(*this);
+ assert(!_exception.get() && !_requests.empty());
+ _exception.reset(dynamic_cast<Ice::LocalException*>(ex.get()->ice_clone()));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithExceptionWrapper(this, ex));
+ return;
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Lock sync(*this);
+ assert(!_exception.get() && !_requests.empty());
+ _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this, ex));
+ return;
+ }
{
Lock sync(*this);
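
For a queued batch request (the req.os branch above) the loop replays the bytes that were already marshaled into the queued stream into a freshly prepared batch on the connection, and aborts the prepared request if anything goes wrong. prepareBatchRequest(), finishBatchRequest() and abortBatchRequest() are the real ConnectionI entry points the patch uses; everything else in the sketch below (plain byte vectors instead of BasicStream, the BatchConnection class) is hypothetical:

#include <cstdint>
#include <stdexcept>
#include <vector>

typedef std::vector<std::uint8_t> ByteSeq;

// Hypothetical connection exposing the same prepare/commit/abort protocol as
// ConnectionI::prepareBatchRequest()/finishBatchRequest()/abortBatchRequest().
class BatchConnection
{
public:
    void prepareBatchRequest(ByteSeq& os)
    {
        os.clear();                  // the caller marshals one request into 'os'
    }

    void finishBatchRequest(const ByteSeq& os)
    {
        _batch.insert(_batch.end(), os.begin(), os.end()); // commit into the batch
    }

    void abortBatchRequest()
    {
        // Nothing was committed; the batch stays as it was before prepare.
    }

private:
    ByteSeq _batch;
};

// Replays the bytes of one queued batch request into a new batch on 'connection',
// mirroring the req.os branch of flushRequests().
void forwardQueuedBatch(BatchConnection& connection, const ByteSeq& queuedBytes)
{
    ByteSeq os;
    connection.prepareBatchRequest(os);
    try
    {
        os.assign(queuedBytes.begin(), queuedBytes.end()); // the readBlob/writeBlob copy
        connection.finishBatchRequest(os);
    }
    catch(const std::exception&)
    {
        connection.abortBatchRequest(); // undo the prepared request, then rethrow
        throw;
    }
}

Note that the inner catch in the patch now rethrows instead of swallowing the error, so the single outer catch can record the exception and hand every still-queued request to flushRequestsWithException() on the client thread pool.
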
@@ -404,6 +439,13 @@ ConnectRequestHandler::flushRequests()
notifyAll();
}
+ //
+ // We've finished sending the queued requests and the request handler now sends
+ // requests over the connection directly. It's time to substitute the proxy's
+ // request handler with the more efficient connection request handler, which
+ // doesn't require any synchronization. This also breaks the cyclic reference
+ // count with the proxy.
+ //
if(_updateRequestHandler && !_exception.get())
{
_proxy->__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress));
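
That __setRequestHandler() call is the pay-off of the whole class: once the connection is established, the proxy stops routing invocations through this lock-protected connect-time handler and uses a ConnectionRequestHandler that needs no synchronization. A simplified picture of the hand-over, with hypothetical interfaces rather than the Ice delegate machinery:

#include <memory>
#include <mutex>

// Hypothetical request-handler interface; the real types are RequestHandler,
// ConnectRequestHandler and ConnectionRequestHandler.
struct RequestHandler
{
    virtual ~RequestHandler() {}
    virtual void sendRequest(int requestId) = 0;
};

// Used only while the connection is being established: every call synchronizes.
struct ConnectingHandler : RequestHandler
{
    void sendRequest(int) override
    {
        std::lock_guard<std::mutex> lock(_mutex);
        // ...queue the request or send it, depending on whether we're initialized...
    }
    std::mutex _mutex;
};

// Used afterwards: the connection never changes again, so no lock is needed.
struct ConnectedHandler : RequestHandler
{
    void sendRequest(int) override
    {
        // ...write straight to the established connection...
    }
};

struct Proxy
{
    // Equivalent in spirit to __setRequestHandler(): later invocations bypass the
    // connect-time handler entirely, and dropping the old handler reference is
    // part of breaking the proxy <-> handler cycle.
    void setRequestHandler(std::shared_ptr<RequestHandler> handler)
    {
        _handler = std::move(handler);
    }

    std::shared_ptr<RequestHandler> _handler;
};

After the swap the connect handler also clears _proxy and _delegate, which is what finally breaks the remaining cyclic references.
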
@@ -412,4 +454,46 @@ ConnectRequestHandler::flushRequests()
_delegate = 0; // Break cyclic reference count.
}
+void
+ConnectRequestHandler::flushRequestsWithException(const Ice::LocalException& ex)
+{
+ for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ if(p->out)
+ {
+ p->out->__finished(ex);
+ }
+ else if(p->batchOut)
+ {
+ p->batchOut->__finished(ex);
+ }
+ else
+ {
+ assert(p->os);
+ delete p->os;
+ }
+ }
+ _requests.clear();
+}
+void
+ConnectRequestHandler::flushRequestsWithException(const LocalExceptionWrapper& ex)
+{
+ for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ if(p->out)
+ {
+ p->out->__finished(ex);
+ }
+ else if(p->batchOut)
+ {
+ p->batchOut->__finished(*ex.get());
+ }
+ else
+ {
+ assert(p->os);
+ delete p->os;
+ }
+ }
+ _requests.clear();
+}