summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectRequestHandler.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2015-03-10 12:12:10 +0100
committerBenoit Foucher <benoit@zeroc.com>2015-03-10 12:12:10 +0100
commitc6ca68d97aa5bbc2a172e3e35171b5452657fa22 (patch)
tree46edcca4c8e313285a205bf6fad7c56c452c0cc0 /cpp/src/Ice/ConnectRequestHandler.cpp
parentMinor JS style fixes (diff)
downloadice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.tar.bz2
ice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.tar.xz
ice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.zip
ICE-6170 - fixed behavior of batch requests
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp190
1 files changed, 33 insertions, 157 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index fce13d5e88a..993cb065c23 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -23,14 +23,13 @@
using namespace std;
using namespace IceInternal;
+IceUtil::Shared* IceInternal::upCast(ConnectRequestHandler* p) { return p; }
+
ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrx& proxy) :
RequestHandler(ref),
- _connect(true),
_proxy(proxy),
_initialized(false),
- _flushing(false),
- _batchRequestInProgress(false),
- _batchStream(ref->getInstance().get(), Ice::currentProtocolEncoding)
+ _flushing(false)
{
}
@@ -41,39 +40,27 @@ ConnectRequestHandler::~ConnectRequestHandler()
RequestHandlerPtr
ConnectRequestHandler::connect(const Ice::ObjectPrx& proxy)
{
- //
- // Initiate the connection if connect() is called by the proxy that
- // created the handler.
- //
- if(proxy.get() == _proxy.get() && _connect)
- {
- _connect = false; // Call getConnection only once
- _reference->getConnection(this);
- }
-
+ Lock sync(*this);
try
{
- Lock sync(*this);
if(!initialized())
{
_proxies.insert(proxy);
- return this;
}
}
catch(const Ice::LocalException&)
{
- throw;
- }
-
- if(_connectionRequestHandler)
- {
- proxy->__setRequestHandler(this, _connectionRequestHandler);
- return _connectionRequestHandler;
- }
- else
- {
- return this;
+ //
+ // Only throw if the connection didn't get established. If
+ // it died after being established, we allow the caller to
+ // retry the connection establishment by not throwing here.
+ //
+ if(!_connection)
+ {
+ throw;
+ }
}
+ return _requestHandler ? _requestHandler : this;
}
RequestHandlerPtr
@@ -82,70 +69,8 @@ ConnectRequestHandler::update(const RequestHandlerPtr& previousHandler, const Re
return previousHandler.get() == this ? newHandler : this;
}
-void
-ConnectRequestHandler::prepareBatchRequest(BasicStream* os)
-{
- {
- Lock sync(*this);
- while(_batchRequestInProgress)
- {
- wait();
- }
-
- if(!initialized())
- {
- _batchRequestInProgress = true;
- _batchStream.swap(*os);
- return;
- }
- }
- _connection->prepareBatchRequest(os);
-}
-
-void
-ConnectRequestHandler::finishBatchRequest(BasicStream* os)
-{
- {
- Lock sync(*this);
- if(!initialized()) // This can't throw until _batchRequestInProgress = false
- {
- assert(_batchRequestInProgress);
- _batchRequestInProgress = false;
- notifyAll();
-
- _batchStream.swap(*os);
-
- Request req;
- req.os = new BasicStream(_reference->getInstance().get(), Ice::currentProtocolEncoding);
- req.os->swap(_batchStream);
- _requests.push_back(req);
- return;
- }
- }
- _connection->finishBatchRequest(os, _compress);
-}
-
-void
-ConnectRequestHandler::abortBatchRequest()
-{
- {
- Lock sync(*this);
- if(!initialized()) // This can't throw until _batchRequestInProgress = false
- {
- assert(_batchRequestInProgress);
- _batchRequestInProgress = false;
- notifyAll();
-
- BasicStream dummy(_reference->getInstance().get(), Ice::currentProtocolEncoding);
- _batchStream.swap(dummy);
- return;
- }
- }
- _connection->abortBatchRequest();
-}
-
bool
-ConnectRequestHandler::sendRequest(OutgoingBase* out)
+ConnectRequestHandler::sendRequest(ProxyOutgoingBase* out)
{
{
Lock sync(*this);
@@ -164,11 +89,11 @@ ConnectRequestHandler::sendRequest(OutgoingBase* out)
throw RetryException(ex);
}
}
- return out->send(_connection, _compress, _response) && !_response; // Finished if sent and no response.
+ return out->invokeRemote(_connection, _compress, _response) && !_response; // Finished if sent and no response.
}
AsyncStatus
-ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out)
+ConnectRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& out)
{
{
Lock sync(*this);
@@ -192,7 +117,7 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out)
throw RetryException(ex);
}
}
- return out->send(_connection, _compress, _response);
+ return out->invokeRemote(_connection, _compress, _response);
}
void
@@ -350,18 +275,13 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex)
{
p->out->completed(*_exception.get());
}
- else if(p->outAsync)
+ else
{
if(p->outAsync->completed(*_exception.get()))
{
p->outAsync->invokeCompletedAsync();
}
}
- else
- {
- assert(p->os);
- delete p->os;
- }
}
_requests.clear();
notifyAll();
@@ -413,11 +333,6 @@ ConnectRequestHandler::flushRequests()
Lock sync(*this);
assert(_connection && !_initialized);
- while(_batchRequestInProgress)
- {
- wait();
- }
-
//
// We set the _flushing flag to true to prevent any additional queuing. Callers
// might block for a little while as the queued requests are being sent but this
@@ -434,29 +349,9 @@ ConnectRequestHandler::flushRequests()
{
if(req.out)
{
- req.out->send(_connection, _compress, _response);
- }
- else if(req.os)
- {
- BasicStream os(req.os->instance(), Ice::currentProtocolEncoding);
- _connection->prepareBatchRequest(&os);
- try
- {
- const Ice::Byte* bytes;
- req.os->i = req.os->b.begin();
- req.os->readBlob(bytes, req.os->b.size());
- os.writeBlob(bytes, req.os->b.size());
- }
- catch(const Ice::LocalException&)
- {
- _connection->abortBatchRequest();
- throw;
- }
-
- _connection->finishBatchRequest(&os, _compress);
- delete req.os;
+ req.out->invokeRemote(_connection, _compress, _response);
}
- else if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
+ else if(req.outAsync->invokeRemote(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
{
req.outAsync->invokeSentAsync();
}
@@ -464,24 +359,14 @@ ConnectRequestHandler::flushRequests()
catch(const RetryException& ex)
{
exception.reset(ex.get()->ice_clone());
- try
- {
- // Remove the request handler before retrying.
- _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- // Ignore
- }
+
+ // Remove the request handler before retrying.
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
if(req.out)
{
req.out->retryException(*ex.get());
}
- else if(req.os)
- {
- delete req.os;
- }
else
{
req.outAsync->retryException(*ex.get());
@@ -494,10 +379,6 @@ ConnectRequestHandler::flushRequests()
{
req.out->completed(ex);
}
- else if(req.os)
- {
- delete req.os;
- }
else if(req.outAsync->completed(ex))
{
req.outAsync->invokeCompletedAsync();
@@ -514,10 +395,10 @@ ConnectRequestHandler::flushRequests()
//
if(_reference->getCacheConnection() && !exception.get())
{
- _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress);
+ _requestHandler = new ConnectionRequestHandler(_reference, _connection, _compress);
for(set<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
{
- (*p)->__setRequestHandler(this, _connectionRequestHandler);
+ (*p)->__updateRequestHandler(this, _requestHandler);
}
}
@@ -527,18 +408,13 @@ ConnectRequestHandler::flushRequests()
_exception.swap(exception);
_initialized = !_exception.get();
_flushing = false;
- try
- {
- //
- // Only remove once all the requests are flushed to
- // guarantee serialization.
- //
- _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- // Ignore
- }
+
+ //
+ // Only remove once all the requests are flushed to
+ // guarantee serialization.
+ //
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
+
_proxies.clear();
_proxy = 0; // Break cyclic reference count.
notifyAll();