summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectRequestHandler.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
committerBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
commitc6dbd090d9691cc0116a2967b2827b858b184dfe (patch)
tree6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /cpp/src/Ice/ConnectRequestHandler.cpp
parentMerge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff)
downloadice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp71
1 files changed, 57 insertions, 14 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index 94d22e2f11b..0ba340fa2b3 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -71,6 +71,31 @@ private:
const LocalExceptionWrapper _exception;
};
+class FlushSentRequests : public ThreadPoolWorkItem
+{
+public:
+
+ FlushSentRequests(const InstancePtr& instance, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) :
+ _instance(instance), _callbacks(callbacks)
+ {
+ }
+
+ virtual void
+ execute(const ThreadPoolPtr& threadPool)
+ {
+ threadPool->promoteFollower();
+ for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p)
+ {
+ (*p)->__sent(_instance);
+ }
+ }
+
+private:
+
+ InstancePtr _instance;
+ vector<OutgoingAsyncMessageCallbackPtr> _callbacks;
+};
+
};
ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref,
@@ -197,7 +222,7 @@ ConnectRequestHandler::sendRequest(Outgoing* out)
}
}
-void
+bool
ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out)
{
{
@@ -207,10 +232,10 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out)
Request req;
req.out = out;
_requests.push_back(req);
- return;
+ return false;
}
}
- _connection->sendAsyncRequest(out, _compress, _response);
+ return _connection->sendAsyncRequest(out, _compress, _response);
}
bool
@@ -219,7 +244,7 @@ ConnectRequestHandler::flushBatchRequests(BatchOutgoing* out)
return getConnection(true)->flushBatchRequests(out);
}
-void
+bool
ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out)
{
{
@@ -229,10 +254,10 @@ ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out)
Request req;
req.batchOut = out;
_requests.push_back(req);
- return;
+ return false;
}
}
- _connection->flushAsyncBatchRequests(out);
+ return _connection->flushAsyncBatchRequests(out);
}
Ice::ConnectionIPtr
@@ -373,6 +398,7 @@ ConnectRequestHandler::flushRequests()
_flushing = true;
}
+ vector<OutgoingAsyncMessageCallbackPtr> sentCallbacks;
try
{
while(!_requests.empty()) // _requests is immutable when _flushing = true
@@ -380,11 +406,23 @@ ConnectRequestHandler::flushRequests()
Request& req = _requests.front();
if(req.out)
{
- _connection->sendAsyncRequest(req.out, _compress, _response);
+ if(_connection->sendAsyncRequest(req.out, _compress, _response))
+ {
+ if(dynamic_cast<Ice::AMISentCallback*>(req.out.get()))
+ {
+ sentCallbacks.push_back(req.out);
+ }
+ }
}
else if(req.batchOut)
{
- _connection->flushAsyncBatchRequests(req.batchOut);
+ if(_connection->flushAsyncBatchRequests(req.batchOut))
+ {
+ if(dynamic_cast<Ice::AMISentCallback*>(req.batchOut.get()))
+ {
+ sentCallbacks.push_back(req.batchOut);
+ }
+ }
}
else
{
@@ -414,8 +452,6 @@ ConnectRequestHandler::flushRequests()
assert(!_exception.get() && !_requests.empty());
_exception.reset(dynamic_cast<Ice::LocalException*>(ex.get()->ice_clone()));
_reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithExceptionWrapper(this, ex));
- notifyAll();
- return;
}
catch(const Ice::LocalException& ex)
{
@@ -423,8 +459,12 @@ ConnectRequestHandler::flushRequests()
assert(!_exception.get() && !_requests.empty());
_exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone()));
_reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this, ex));
- notifyAll();
- return;
+ }
+
+ if(!sentCallbacks.empty())
+ {
+ InstancePtr instance = _reference->getInstance();
+ instance->clientThreadPool()->execute(new FlushSentRequests(instance, sentCallbacks));
}
//
@@ -442,8 +482,11 @@ ConnectRequestHandler::flushRequests()
{
Lock sync(*this);
assert(!_initialized);
- _initialized = true;
- _flushing = false;
+ if(!_exception.get())
+ {
+ _initialized = true;
+ _flushing = false;
+ }
_proxy = 0; // Break cyclic reference count.
_delegate = 0; // Break cyclic reference count.
notifyAll();