diff options
author | Michi Henning <michi@zeroc.com> | 2009-12-07 14:48:27 +1000 |
---|---|---|
committer | Michi Henning <michi@zeroc.com> | 2009-12-07 14:48:27 +1000 |
commit | 8d0c264fc218d2806eb610f325602353e58e6034 (patch) | |
tree | 939c96e2c7ab4ae0d8a3cd16fcba53574149dcc1 /cpp/src/Ice/ConnectionI.cpp | |
parent | Added Excel demo. (diff) | |
parent | 4424 - .NET FxCop Globalization Rules (diff) | |
download | ice-8d0c264fc218d2806eb610f325602353e58e6034.tar.bz2 ice-8d0c264fc218d2806eb610f325602353e58e6034.tar.xz ice-8d0c264fc218d2806eb610f325602353e58e6034.zip |
Merge branch 'master' of ssh://cvs.zeroc.com/home/git/ice
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 287 |
1 files changed, 223 insertions, 64 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 50d922be521..c0ad3585e5f 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -55,6 +55,69 @@ private: Ice::ConnectionI* _connection; }; +class DispatchDispatcherCall : public DispatcherCall +{ +public: + + DispatchDispatcherCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, + const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, Byte compress, Int requestId, + Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, + const OutgoingAsyncPtr& outAsync, BasicStream& stream) : + _connection(connection), + _startCB(startCB), + _sentCBs(sentCBs), + _compress(compress), + _requestId(requestId), + _invokeNum(invokeNum), + _servantManager(servantManager), + _adapter(adapter), + _outAsync(outAsync), + _stream(stream.instance()) + { + _stream.swap(stream); + } + + virtual void + run() + { + _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter, + _outAsync, _stream); + } + +private: + + ConnectionIPtr _connection; + ConnectionI::StartCallbackPtr _startCB; + vector<OutgoingAsyncMessageCallbackPtr> _sentCBs; + Byte _compress; + Int _requestId; + Int _invokeNum; + ServantManagerPtr _servantManager; + ObjectAdapterPtr _adapter; + OutgoingAsyncPtr _outAsync; + BasicStream _stream; +}; + +class FinishDispatcherCall : public DispatcherCall +{ +public: + + FinishDispatcherCall(const Ice::ConnectionIPtr& connection) : + _connection(connection) + { + } + + virtual void + run() + { + _connection->finish(); + } + +private: + + ConnectionIPtr _connection; +}; + } void @@ -106,43 +169,42 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) adopted = true; } -void +bool Ice::ConnectionI::OutgoingMessage::sent(ConnectionI* connection, bool notify) { + isSent = true; // The message is sent. + + if(adopted) + { + delete stream; + stream = 0; + } + if(out) { out->sent(notify); // true = notify the waiting thread that the request was sent. + return false; } else if(outAsync) { - outAsync->__sent(connection); + return outAsync->__sent(connection); } - - if(adopted) + else { - delete stream; - stream = 0; + return false; } } void Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex) { - if(!response) + if(out) { - // - // Only notify oneway requests. The connection keeps track of twoway - // requests in the _requests/_asyncRequests maps and will notify them - // of the connection exceptions. - // - if(out) - { - out->finished(ex); - } - else if(outAsync) - { - outAsync->__finished(ex); - } + out->finished(ex, isSent); + } + else if(outAsync) + { + outAsync->__finished(ex, isSent); } if(adopted) @@ -429,7 +491,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) // _transceiver->checkSendSize(*os, _instance->messageSizeMax()); - Int requestId; + Int requestId = 0; if(response) { // @@ -460,8 +522,8 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) bool sent = false; try { - OutgoingMessage message(out, os, compress, response); - sent = sendMessage(message); + OutgoingMessage message(out, os, compress, requestId); + sent = sendMessage(message) & AsyncStatusSent; } catch(const LocalException& ex) { @@ -481,7 +543,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) return sent; } -bool +AsyncStatus Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response) { BasicStream* os = out->__getOs(); @@ -506,7 +568,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b // _transceiver->checkSendSize(*os, _instance->messageSizeMax()); - Int requestId; + Int requestId = 0; if(response) { // @@ -530,11 +592,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b #endif } - bool sent = false; + AsyncStatus status; try { - OutgoingMessage message(out, os, compress, response); - sent = sendMessage(message); + OutgoingMessage message(out, os, compress, requestId); + status = sendMessage(message); } catch(const LocalException& ex) { @@ -551,7 +613,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), pair<const Int, OutgoingAsyncPtr>(requestId, out)); } - return sent; + return status; } void @@ -798,8 +860,8 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) bool sent = false; try { - OutgoingMessage message(out, out->os(), _batchRequestCompress, false); - sent = sendMessage(message); + OutgoingMessage message(out, out->os(), _batchRequestCompress, 0); + sent = sendMessage(message) & AsyncStatusSent; } catch(const Ice::LocalException& ex) { @@ -819,7 +881,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) return sent; } -bool +AsyncStatus Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -835,8 +897,12 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) if(_batchRequestNum == 0) { - outAsync->__sent(this); - return true; + AsyncStatus status = AsyncStatusSent; + if(outAsync->__sent(this)) + { + status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); + } + return status; } // @@ -853,11 +919,11 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) // // Send the batch stream. // - bool sent = false; + AsyncStatus status; try { - OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, false); - sent = sendMessage(message); + OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0); + status = sendMessage(message); } catch(const Ice::LocalException& ex) { @@ -874,7 +940,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; - return sent; + return status; } void @@ -1030,7 +1096,11 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { - _transceiver->startWrite(_writeStream); + if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) + { + // The whole message is written, assume it's sent now for at-most-once semantics. + _sendStreams.front().isSent = true; + } } else if(operation & SocketOperationRead) { @@ -1058,7 +1128,6 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { _transceiver->finishRead(_readStream); } - } catch(const Ice::LocalException& ex) { @@ -1286,6 +1355,41 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) io.completed(); } + if(_dispatcher) + { + try + { + _dispatcher->dispatch(new DispatchDispatcherCall(this, startCB, sentCBs, compress, requestId, invokeNum, + servantManager, adapter, outAsync, current.stream), this); + } + catch(const std::exception& ex) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\n" << ex << '\n' << _desc; + } + } + catch(...) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc; + } + } + } + else + { + dispatch(startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, current.stream); + } +} + +void +ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingAsyncMessageCallbackPtr>& sentCBs, + Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, + const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, BasicStream& stream) +{ // // Notify the factory that the connection establishment and // validation has completed. @@ -1300,7 +1404,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = sentCBs.begin(); p != sentCBs.end(); ++p) { - (*p)->__sentCallback(_instance); + (*p)->__sent(); } // @@ -1309,7 +1413,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // if(outAsync) { - outAsync->__finished(current.stream); + outAsync->__finished(stream); } // @@ -1319,7 +1423,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // if(invokeNum) { - invokeAll(current.stream, invokeNum, requestId, compress, servantManager, adapter); + invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); } } @@ -1337,11 +1441,45 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // - if(_startCallback || !_sendStreams.empty() || !_asyncRequests.empty()) + if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty()) + { + finish(); + return; + } + + if(!_dispatcher) { current.ioCompleted(); + finish(); + } + else + { + try + { + _dispatcher->dispatch(new FinishDispatcherCall(this), this); + } + catch(const std::exception& ex) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\n" << ex << '\n' << _desc; + } + } + catch(...) + { + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + Warning out(_instance->initializationData().logger); + out << "dispatch exception:\nunknown c++ exception" << '\n' << _desc; + } + } } +} +void +Ice::ConnectionI::finish() +{ if(_startCallback) { _startCallback->connectionStartFailed(this, *_exception.get()); @@ -1351,21 +1489,22 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) if(!_sendStreams.empty()) { assert(!_writeStream.b.empty()); - #ifdef ICE_USE_IOCP // - // The current message might be sent but not yet removed from _sendStreams if the - // connection was closed shortly after. We check if that's the case here and mark - // the message as sent if necessary. + // The current message might be sent but not yet removed from _sendStreams. If + // the response has been received in the meantime, we remove the message from + // _sendStreams to not call finished on a message which is already done. // OutgoingMessage* message = &_sendStreams.front(); _writeStream.swap(*message->stream); - if(message->stream->i == message->stream->b.end()) + if(message->requestId > 0 && + (message->out && _requests.find(message->requestId) == _requests.end() || + message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end())) { - message->sent(this, true); - if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get())) + if(message->sent(this, true)) { - message->outAsync->__sentCallback(_instance); + assert(message->outAsync); + message->outAsync->__sent(); } _sendStreams.pop_front(); } @@ -1374,19 +1513,30 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { o->finished(*_exception.get()); + if(o->requestId) // Make sure finished isn't called twice. + { + if(o->out) + { + _requests.erase(o->requestId); + } + else + { + _asyncRequests.erase(o->requestId); + } + } } _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage } for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - p->second->finished(*_exception.get()); + p->second->finished(*_exception.get(), true); } _requests.clear(); for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { - q->second->__finished(*_exception.get()); + q->second->__finished(*_exception.get(), true); } _asyncRequests.clear(); @@ -1509,6 +1659,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, _connector(connector), _endpoint(endpoint), _adapter(adapter), + _dispatcher(_instance->initializationData().dispatcher), // Cached for better performance. _logger(_instance->initializationData().logger), // Cached for better performance. _traceLevels(_instance->traceLevels()), // Cached for better performance. _timer(_instance->timer()), // Cached for better performance. @@ -1839,7 +1990,7 @@ Ice::ConnectionI::initiateShutdown() os.write(headerSize); // Message size. OutgoingMessage message(&os, false); - if(sendMessage(message)) + if(sendMessage(message) & AsyncStatusSent) { // // Schedule the close timeout to wait for the peer to close the connection. If @@ -2007,9 +2158,9 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb // OutgoingMessage* message = &_sendStreams.front(); _writeStream.swap(*message->stream); - message->sent(this, true); - if(dynamic_cast<Ice::AMISentCallback*>(message->outAsync.get())) + if(message->sent(this, true)) { + assert(message->outAsync); callbacks.push_back(message->outAsync); } _sendStreams.pop_front(); @@ -2115,7 +2266,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb } } -bool +AsyncStatus Ice::ConnectionI::sendMessage(OutgoingMessage& message) { assert(_state < StateClosed); @@ -2126,7 +2277,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) { _sendStreams.push_back(message); _sendStreams.back().adopt(0); - return false; + return AsyncStatusQueued; } // @@ -2164,13 +2315,17 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // if(_transceiver->write(stream)) { - message.sent(this, false); + AsyncStatus status = AsyncStatusSent; + if(message.sent(this, false)) + { + status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); + } if(_acmTimeout > 0) { _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); } - return true; + return status; } _sendStreams.push_back(message); @@ -2212,13 +2367,17 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // if(_transceiver->write(*message.stream)) { - message.sent(this, false); + AsyncStatus status = AsyncStatusSent; + if(message.sent(this, false)) + { + status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); + } if(_acmTimeout > 0) { _acmAbsoluteTimeout = IceUtil::Time::now(IceUtil::Time::Monotonic) + IceUtil::Time::seconds(_acmTimeout); } - return true; + return status; } _sendStreams.push_back(message); @@ -2228,7 +2387,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) _writeStream.swap(*_sendStreams.back().stream); scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); _threadPool->_register(this, SocketOperationWrite); - return false; + return AsyncStatusQueued; } static string |