summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp176
1 files changed, 101 insertions, 75 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 1bdbeb12448..92dc4a1693d 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -242,7 +242,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
}
void
-Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream)
+Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream)
{
assert((out || outAsync)); // Only requests can timeout.
out = 0;
@@ -253,7 +253,7 @@ Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream)
}
else
{
- assert(!adopted && !stream);
+ assert(!adopted);
}
}
@@ -273,25 +273,28 @@ Ice::ConnectionI::OutgoingMessage::sent()
else if(outAsync)
{
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- invokeSent = outAsync->__sent();
+ invokeSent = outAsync->sent();
return invokeSent || receivedReply;
#else
- return outAsync->__sent();
+ return outAsync->sent();
#endif
}
return false;
}
void
-Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex)
+Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex)
{
if(out)
{
- out->finished(ex);
+ out->completed(ex);
}
else if(outAsync)
{
- outAsync->__finished(ex);
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompleted();
+ }
}
if(adopted)
@@ -651,8 +654,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
#endif
}
- out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
- static_cast<Int>(os->b.size() - headerSize - 4));
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
//
// Send the message. If it can't be sent without blocking the message is added
@@ -685,7 +687,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
AsyncStatus
Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response)
{
- BasicStream* os = out->__getOs();
+ BasicStream* os = out->getOs();
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_exception.get())
@@ -731,8 +733,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
#endif
}
- out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
- static_cast<Int>(os->b.size() - headerSize - 4));
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
AsyncStatus status = AsyncStatusQueued;
try
@@ -747,6 +748,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
_exception->ice_throw();
}
+ if(response || status & AsyncStatusQueued)
+ {
+ out->cancelable(this); // Notify the request that it's cancelable
+ }
+
if(response)
{
//
@@ -961,7 +967,7 @@ Ice::ConnectionI::abortBatchRequest()
void
Ice::ConnectionI::flushBatchRequests()
{
- BatchOutgoing out(this, _instance.get(), __flushBatchRequests_name);
+ FlushBatch out(this, _instance.get(), __flushBatchRequests_name);
out.invoke();
}
@@ -986,9 +992,8 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR
#ifdef ICE_CPP11
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests(
- const IceInternal::Function<void (const Exception&)>& exception,
- const IceInternal::Function<void (bool)>& sent)
+Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception,
+ const IceInternal::Function<void (bool)>& sent)
{
class Cpp11CB : public IceInternal::Cpp11FnCallbackNC
@@ -1026,16 +1031,13 @@ Ice::ConnectionI::begin_flushBatchRequests(
AsyncResultPtr
Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
{
- ConnectionBatchOutgoingAsyncPtr result =
- new ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb, cookie);
- try
- {
- result->__invoke();
- }
- catch(const LocalException& __ex)
- {
- result->__invokeExceptionAsync(__ex);
- }
+ ConnectionFlushBatchPtr result = new ConnectionFlushBatch(this,
+ _communicator,
+ _instance,
+ __flushBatchRequests_name,
+ cb,
+ cookie);
+ result->invoke();
return result;
}
@@ -1047,7 +1049,7 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
}
bool
-Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
+Ice::ConnectionI::flushBatchRequests(OutgoingBase* out)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
while(_batchStreamInUse && !_exception.get())
@@ -1075,12 +1077,10 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
#else
copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
-
- out->attachRemoteObserver(initConnectionInfo(), _endpoint,
- static_cast<Int>(_batchStream.b.size() - headerSize - 4));
-
_batchStream.swap(*out->os());
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
+
//
// Send the batch stream.
//
@@ -1109,7 +1109,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
}
AsyncStatus
-Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
+Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
while(_batchStreamInUse && !_exception.get())
@@ -1125,7 +1125,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
if(_batchRequestNum == 0)
{
AsyncStatus status = AsyncStatusSent;
- if(outAsync->__sent())
+ if(outAsync->sent())
{
status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
}
@@ -1141,11 +1141,9 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
#else
copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
+ _batchStream.swap(*outAsync->getOs());
- outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint, 0,
- static_cast<Int>(_batchStream.b.size() - headerSize - 4));
-
- _batchStream.swap(*outAsync->__getOs());
+ outAsync->attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
//
// Send the batch stream.
@@ -1153,7 +1151,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
AsyncStatus status = AsyncStatusQueued;
try
{
- OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0);
+ OutgoingMessage message(outAsync, outAsync->getOs(), _batchRequestCompress, 0);
status = sendMessage(message);
}
catch(const Ice::LocalException& ex)
@@ -1163,6 +1161,11 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
_exception->ice_throw();
}
+ if(status & AsyncStatusQueued)
+ {
+ outAsync->cancelable(this); // Notify the request that it's cancelable.
+ }
+
//
// Reset the batch stream.
//
@@ -1276,9 +1279,14 @@ Ice::ConnectionI::getACM()
}
void
-Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
+Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state >= StateClosed)
+ {
+ return; // The request has already been or will be shortly notified of the failure.
+ }
+
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
if(o->out == out)
@@ -1302,16 +1310,15 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
//
if(o == _sendStreams.begin())
{
- o->timedOut(true); // true = adopt the stream.
+ o->canceled(true); // true = adopt the stream.
}
else
{
- o->timedOut(false);
+ o->canceled(false);
_sendStreams.erase(o);
}
- InvocationTimeoutException ex(__FILE__, __LINE__);
- out->finished(ex);
+ out->completed(ex);
return;
}
}
@@ -1321,8 +1328,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
{
if(_requestsHint != _requests.end() && _requestsHint->second == o)
{
- InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex);
+ o->completed(ex);
_requests.erase(_requestsHint);
_requestsHint = _requests.end();
}
@@ -1332,8 +1338,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
{
if(p->second == o)
{
- InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex);
+ o->completed(ex);
assert(p != _requestsHint);
_requests.erase(p);
return; // We're done.
@@ -1344,10 +1349,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
}
void
-Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
+Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ //
+ // NOTE: This isn't called from a thread pool thread.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state >= StateClosed)
+ {
+ return; // The request has already been or will be shortly notified of the failure.
+ }
+
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
if(o->outAsync.get() == outAsync.get())
@@ -1365,25 +1378,29 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou
_asyncRequests.erase(o->requestId);
}
}
-
+
//
// If the request is being sent, don't remove it from the send streams,
// it will be removed once the sending is finished.
//
if(o == _sendStreams.begin())
{
- o->timedOut(true); // true = adopt the stream
+ o->canceled(true); // true = adopt the stream
}
else
{
- o->timedOut(false);
+ o->canceled(false);
_sendStreams.erase(o);
}
- outAsync->__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done
+ if(outAsync->completed(ex))
+ {
+ sync.release();
+ outAsync->invokeCompleted();
+ }
+ return;
}
}
-
+
OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
if(o)
{
@@ -1393,19 +1410,25 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou
{
_asyncRequests.erase(_asyncRequestsHint);
_asyncRequestsHint = _asyncRequests.end();
- outAsync->__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
+ return;
}
}
-
+
for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
{
if(p->second.get() == o.get())
{
assert(p != _asyncRequestsHint);
_asyncRequests.erase(p);
- outAsync->__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
+ return;
}
}
}
@@ -1972,18 +1995,18 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
if(p->invokeSent)
{
- p->outAsync->__invokeSent();
+ p->outAsync->invokeSent();
}
if(p->receivedReply)
{
OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync);
- if(outAsync->__finished())
+ if(outAsync->completed())
{
- outAsync->__invokeCompleted();
+ outAsync->invokeCompleted();
}
}
#else
- p->outAsync->__invokeSent();
+ p->outAsync->invokeSent();
#endif
}
++dispatchedCount;
@@ -1995,7 +2018,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
if(outAsync)
{
- outAsync->__invokeCompleted();
+ outAsync->invokeCompleted();
++dispatchedCount;
}
@@ -2147,14 +2170,14 @@ Ice::ConnectionI::finish()
{
if(message->sent() && message->invokeSent)
{
- message->outAsync->__invokeSent();
+ message->outAsync->invokeSent();
}
if(message->receivedReply)
{
OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync);
- if(outAsync->__finished())
+ if(outAsync->completed())
{
- outAsync->__invokeCompleted();
+ outAsync->invokeCompleted();
}
}
_sendStreams.pop_front();
@@ -2164,7 +2187,7 @@ Ice::ConnectionI::finish()
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
- o->finished(*_exception.get());
+ o->completed(*_exception.get());
if(o->requestId) // Make sure finished isn't called twice.
{
if(o->out)
@@ -2182,13 +2205,16 @@ Ice::ConnectionI::finish()
for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- p->second->finished(*_exception.get());
+ p->second->completed(*_exception.get());
}
_requests.clear();
for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
- q->second->__finished(*_exception.get());
+ if(q->second->completed(*_exception.get()))
+ {
+ q->second->invokeCompleted();
+ }
}
_asyncRequests.clear();
@@ -3481,7 +3507,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
if(p != _requests.end())
{
- p->second->finished(stream);
+ p->second->completed(stream);
if(p == _requestsHint)
{
@@ -3508,7 +3534,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
_asyncRequests.erase(q);
}
- stream.swap(*outAsync->__getIs());
+ stream.swap(*outAsync->getIs());
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
//
@@ -3522,7 +3548,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
message->receivedReply = true;
outAsync = 0;
}
- else if(outAsync->__finished())
+ else if(outAsync->completed())
{
++dispatchCount;
}
@@ -3531,7 +3557,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
outAsync = 0;
}
#else
- if(outAsync->__finished())
+ if(outAsync->completed())
{
++dispatchCount;
}