summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorMichi Henning <michi@zeroc.com>2009-12-07 14:48:27 +1000
committerMichi Henning <michi@zeroc.com>2009-12-07 14:48:27 +1000
commit8d0c264fc218d2806eb610f325602353e58e6034 (patch)
tree939c96e2c7ab4ae0d8a3cd16fcba53574149dcc1 /cpp/src/Ice/ConnectionI.cpp
parentAdded Excel demo. (diff)
parent4424 - .NET FxCop Globalization Rules (diff)
downloadice-8d0c264fc218d2806eb610f325602353e58e6034.tar.bz2
ice-8d0c264fc218d2806eb610f325602353e58e6034.tar.xz
ice-8d0c264fc218d2806eb610f325602353e58e6034.zip
Merge branch 'master' of ssh://cvs.zeroc.com/home/git/ice
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp287
1 files changed, 223 insertions, 64 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 50d922be521..c0ad3585e5f 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -55,6 +55,69 @@ private:
Ice::ConnectionI* _connection;
};
+class DispatchDispatcherCall : public DispatcherCall
+{
+public:
+
+ DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
+ const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, Byte compress, Int requestId,
+ Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
+ const OutgoingAsyncPtr& outAsync, BasicStream& stream) :
+ _connection(connection),
+ _startCB(startCB),
+ _sentCBs(sentCBs),
+ _compress(compress),
+ _requestId(requestId),
+ _invokeNum(invokeNum),
+ _servantManager(servantManager),
+ _adapter(adapter),
+ _outAsync(outAsync),
+ _stream(stream.instance())
+ {
+ _stream.swap(stream);
+ }
+
+ virtual void
+ run()
+ {
+ _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter,
+ _outAsync, _stream);
+ }
+
+private:
+
+ ConnectionIPtr _connection;
+ ConnectionI::StartCallbackPtr _startCB;
+ vector<OutgoingAsyncMessageCallbackPtr> _sentCBs;
+ Byte _compress;
+ Int _requestId;
+ Int _invokeNum;
+ ServantManagerPtr _servantManager;
+ ObjectAdapterPtr _adapter;
+ OutgoingAsyncPtr _outAsync;
+ BasicStream _stream;
+};
+
+class FinishDispatcherCall : public DispatcherCall
+{
+public:
+
+ FinishDispatcherCall(const Ice::ConnectionIPtr& connection) :
+ _connection(connection)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _connection->finish();
+ }
+
+private:
+
+ ConnectionIPtr _connection;
+};
+
}
void
@@ -106,43 +169,42 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
adopted = true;
}
-void
+bool
Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify)
{
+ isSent = true; // The message is sent.
+
+ if(adopted)
+ {
+ delete stream;
+ stream = 0;
+ }
+
if(out)
{
out->sent(notify); // true = notify the waiting thread that the request was sent.
+ return false;
}
else if(outAsync)
{
- outAsync->__sent(connection);
+ return outAsync->__sent(connection);
}
-
- if(adopted)
+ else
{
- delete stream;
- stream = 0;
+ return false;
}
}
void
Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex)
{
- if(!response)
+ if(out)
{
- //
- // Only notify oneway requests. The connection keeps track of twoway
- // requests in the _requests/_asyncRequests maps and will notify them
- // of the connection exceptions.
- //
- if(out)
- {
- out->finished(ex);
- }
- else if(outAsync)
- {
- outAsync->__finished(ex);
- }
+ out->finished(ex, isSent);
+ }
+ else if(outAsync)
+ {
+ outAsync->__finished(ex, isSent);
}
if(adopted)
@@ -429,7 +491,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
//
_transceiver->checkSendSize(*os, _instance->messageSizeMax());
- Int requestId;
+ Int requestId = 0;
if(response)
{
//
@@ -460,8 +522,8 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
bool sent = false;
try
{
- OutgoingMessage message(out, os, compress, response);
- sent = sendMessage(message);
+ OutgoingMessage message(out, os, compress, requestId);
+ sent = sendMessage(message) & AsyncStatusSent;
}
catch(const LocalException& ex)
{
@@ -481,7 +543,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
return sent;
}
-bool
+AsyncStatus
Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response)
{
BasicStream* os = out->__getOs();
@@ -506,7 +568,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
//
_transceiver->checkSendSize(*os, _instance->messageSizeMax());
- Int requestId;
+ Int requestId = 0;
if(response)
{
//
@@ -530,11 +592,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
#endif
}
- bool sent = false;
+ AsyncStatus status;
try
{
- OutgoingMessage message(out, os, compress, response);
- sent = sendMessage(message);
+ OutgoingMessage message(out, os, compress, requestId);
+ status = sendMessage(message);
}
catch(const LocalException& ex)
{
@@ -551,7 +613,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
_asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
pair<const Int, OutgoingAsyncPtr>(requestId, out));
}
- return sent;
+ return status;
}
void
@@ -798,8 +860,8 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
bool sent = false;
try
{
- OutgoingMessage message(out, out->os(), _batchRequestCompress, false);
- sent = sendMessage(message);
+ OutgoingMessage message(out, out->os(), _batchRequestCompress, 0);
+ sent = sendMessage(message) & AsyncStatusSent;
}
catch(const Ice::LocalException& ex)
{
@@ -819,7 +881,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
return sent;
}
-bool
+AsyncStatus
Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -835,8 +897,12 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
if(_batchRequestNum == 0)
{
- outAsync->__sent(this);
- return true;
+ AsyncStatus status = AsyncStatusSent;
+ if(outAsync->__sent(this))
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
+ return status;
}
//
@@ -853,11 +919,11 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
//
// Send the batch stream.
//
- bool sent = false;
+ AsyncStatus status;
try
{
- OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, false);
- sent = sendMessage(message);
+ OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0);
+ status = sendMessage(message);
}
catch(const Ice::LocalException& ex)
{
@@ -874,7 +940,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
_batchRequestNum = 0;
_batchRequestCompress = false;
_batchMarker = 0;
- return sent;
+ return status;
}
void
@@ -1030,7 +1096,11 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
- _transceiver->startWrite(_writeStream);
+ if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty())
+ {
+ // The whole message is written, assume it's sent now for at-most-once semantics.
+ _sendStreams.front().isSent = true;
+ }
}
else if(operation & SocketOperationRead)
{
@@ -1058,7 +1128,6 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
{
_transceiver->finishRead(_readStream);
}
-
}
catch(const Ice::LocalException& ex)
{
@@ -1286,6 +1355,41 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
io.completed();
}
+ if(_dispatcher)
+ {
+ try
+ {
+ _dispatcher->dispatch(new DispatchDispatcherCall(this, startCB, sentCBs, compress, requestId, invokeNum,
+ servantManager, adapter, outAsync, current.stream), this);
+ }
+ catch(const std::exception& ex)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\n" << ex << '\n' << _desc;
+ }
+ }
+ catch(...)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc;
+ }
+ }
+ }
+ else
+ {
+ dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, current.stream);
+ }
+}
+
+void
+ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs,
+ Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
+ const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream)
+{
//
// Notify the factory that the connection establishment and
// validation has completed.
@@ -1300,7 +1404,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
//
for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p)
{
- (*p)->__sentCallback(_instance);
+ (*p)->__sent();
}
//
@@ -1309,7 +1413,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
//
if(outAsync)
{
- outAsync->__finished(current.stream);
+ outAsync->__finished(stream);
}
//
@@ -1319,7 +1423,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
//
if(invokeNum)
{
- invokeAll(current.stream, invokeNum, requestId, compress, servantManager, adapter);
+ invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
}
}
@@ -1337,11 +1441,45 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
// to call code that will potentially block (this avoids promoting a new leader and
// unecessary thread creation, especially if this is called on shutdown).
//
- if(_startCallback || !_sendStreams.empty() || !_asyncRequests.empty())
+ if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty())
+ {
+ finish();
+ return;
+ }
+
+ if(!_dispatcher)
{
current.ioCompleted();
+ finish();
+ }
+ else
+ {
+ try
+ {
+ _dispatcher->dispatch(new FinishDispatcherCall(this), this);
+ }
+ catch(const std::exception& ex)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\n" << ex << '\n' << _desc;
+ }
+ }
+ catch(...)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc;
+ }
+ }
}
+}
+void
+Ice::ConnectionI::finish()
+{
if(_startCallback)
{
_startCallback->connectionStartFailed(this, *_exception.get());
@@ -1351,21 +1489,22 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
if(!_sendStreams.empty())
{
assert(!_writeStream.b.empty());
-
#ifdef ICE_USE_IOCP
//
- // The current message might be sent but not yet removed from _sendStreams if the
- // connection was closed shortly after. We check if that's the case here and mark
- // the message as sent if necessary.
+ // The current message might be sent but not yet removed from _sendStreams. If
+ // the response has been received in the meantime, we remove the message from
+ // _sendStreams to not call finished on a message which is already done.
//
OutgoingMessage* message = &_sendStreams.front();
_writeStream.swap(*message->stream);
- if(message->stream->i == message->stream->b.end())
+ if(message->requestId > 0 &&
+ (message->out && _requests.find(message->requestId) == _requests.end() ||
+ message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end()))
{
- message->sent(this, true);
- if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get()))
+ if(message->sent(this, true))
{
- message->outAsync->__sentCallback(_instance);
+ assert(message->outAsync);
+ message->outAsync->__sent();
}
_sendStreams.pop_front();
}
@@ -1374,19 +1513,30 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current)
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
o->finished(*_exception.get());
+ if(o->requestId) // Make sure finished isn't called twice.
+ {
+ if(o->out)
+ {
+ _requests.erase(o->requestId);
+ }
+ else
+ {
+ _asyncRequests.erase(o->requestId);
+ }
+ }
}
_sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
}
for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- p->second->finished(*_exception.get());
+ p->second->finished(*_exception.get(), true);
}
_requests.clear();
for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
- q->second->__finished(*_exception.get());
+ q->second->__finished(*_exception.get(), true);
}
_asyncRequests.clear();
@@ -1509,6 +1659,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
_connector(connector),
_endpoint(endpoint),
_adapter(adapter),
+ _dispatcher(_instance->initializationData().dispatcher), // Cached for better performance.
_logger(_instance->initializationData().logger), // Cached for better performance.
_traceLevels(_instance->traceLevels()), // Cached for better performance.
_timer(_instance->timer()), // Cached for better performance.
@@ -1839,7 +1990,7 @@ Ice::ConnectionI::initiateShutdown()
os.write(headerSize); // Message size.
OutgoingMessage message(&os, false);
- if(sendMessage(message))
+ if(sendMessage(message) & AsyncStatusSent)
{
//
// Schedule the close timeout to wait for the peer to close the connection. If
@@ -2007,9 +2158,9 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
//
OutgoingMessage* message = &_sendStreams.front();
_writeStream.swap(*message->stream);
- message->sent(this, true);
- if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get()))
+ if(message->sent(this, true))
{
+ assert(message->outAsync);
callbacks.push_back(message->outAsync);
}
_sendStreams.pop_front();
@@ -2115,7 +2266,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
}
}
-bool
+AsyncStatus
Ice::ConnectionI::sendMessage(OutgoingMessage& message)
{
assert(_state < StateClosed);
@@ -2126,7 +2277,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
{
_sendStreams.push_back(message);
_sendStreams.back().adopt(0);
- return false;
+ return AsyncStatusQueued;
}
//
@@ -2164,13 +2315,17 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
if(_transceiver->write(stream))
{
- message.sent(this, false);
+ AsyncStatus status = AsyncStatusSent;
+ if(message.sent(this, false))
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
if(_acmTimeout > 0)
{
_acmAbsoluteTimeout =
IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
}
- return true;
+ return status;
}
_sendStreams.push_back(message);
@@ -2212,13 +2367,17 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
if(_transceiver->write(*message.stream))
{
- message.sent(this, false);
+ AsyncStatus status = AsyncStatusSent;
+ if(message.sent(this, false))
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
if(_acmTimeout > 0)
{
_acmAbsoluteTimeout =
IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout);
}
- return true;
+ return status;
}
_sendStreams.push_back(message);
@@ -2228,7 +2387,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
_writeStream.swap(*_sendStreams.back().stream);
scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
_threadPool->_register(this, SocketOperationWrite);
- return false;
+ return AsyncStatusQueued;
}
static string