diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 964 |
1 files changed, 378 insertions, 586 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 50b3dffe9b3..ce042d87ba5 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -20,7 +20,6 @@ #include <Ice/ACM.h> #include <Ice/ObjectAdapterI.h> // For getThreadPool() and getServantManager(). #include <Ice/EndpointI.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/Incoming.h> #include <Ice/LocalException.h> @@ -38,14 +37,15 @@ using namespace Ice; using namespace Ice::Instrumentation; using namespace IceInternal; +#ifndef ICE_CPP11_MAPPING Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; } +#endif namespace { const ::std::string __flushBatchRequests_name = "flushBatchRequests"; - class TimeoutCallback : public IceUtil::TimerTask { public: @@ -72,8 +72,8 @@ public: DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, - const OutgoingAsyncBasePtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, - BasicStream& stream) : + const OutgoingAsyncBasePtr& outAsync, const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, + InputStream& stream) : DispatchWorkItem(connection), _connection(connection), _startCB(startCB), @@ -108,8 +108,8 @@ private: const ServantManagerPtr _servantManager; const ObjectAdapterPtr _adapter; const OutgoingAsyncBasePtr _outAsync; - const ConnectionCallbackPtr _heartbeatCallback; - BasicStream _stream; + const ICE_HEARTBEAT_CALLBACK _heartbeatCallback; + InputStream _stream; }; class FinishCall : public DispatchWorkItem @@ -211,7 +211,7 @@ Ice::ConnectionI::Observer::attach(const Ice::Instrumentation::ConnectionObserve void -Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) +Ice::ConnectionI::OutgoingMessage::adopt(OutputStream* str) { if(adopted) { @@ -228,7 +228,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) } else if(!str) { - if(out || outAsync) + if(outAsync) { return; // Adopting request stream is not necessary. } @@ -240,7 +240,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) } assert(str); - stream = new BasicStream(str->instance(), currentProtocolEncoding); + stream = new OutputStream(str->instance(), currentProtocolEncoding); stream->swap(*str); adopted = true; } @@ -248,8 +248,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) void Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream) { - assert((out || outAsync)); // Only requests can timeout. - out = 0; + assert(outAsync); // Only requests can timeout. outAsync = 0; if(adoptStream) { @@ -270,11 +269,7 @@ Ice::ConnectionI::OutgoingMessage::sent() } stream = 0; - if(out) - { - out->sent(); - } - else if(outAsync) + if(outAsync) { #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) invokeSent = outAsync->sent(); @@ -289,15 +284,11 @@ Ice::ConnectionI::OutgoingMessage::sent() void Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex) { - if(out) - { - out->completed(ex); - } - else if(outAsync) + if(outAsync) { - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompleted(); + outAsync->invokeException(); } } @@ -316,7 +307,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. { - assert(_exception.get()); + assert(_exception); _exception->ice_throw(); } @@ -338,7 +329,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) if(_state >= StateClosing) { - assert(_exception.get()); + assert(_exception); _exception->ice_throw(); } } @@ -353,7 +344,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) exception(ex); if(callback) { - callback->connectionStartFailed(this, *_exception.get()); + callback->connectionStartFailed(ICE_SHARED_FROM_THIS, ex); return; } else @@ -365,7 +356,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) if(callback) { - callback->connectionStartCompleted(this); + callback->connectionStartCompleted(ICE_SHARED_FROM_THIS); } } @@ -435,7 +426,7 @@ Ice::ConnectionI::close(bool force) // requests to be retried, regardless of whether the server // has processed them or not. // - while(!_requests.empty() || !_asyncRequests.empty()) + while(!_asyncRequests.empty()) { wait(); } @@ -486,7 +477,7 @@ Ice::ConnectionI::throwException() const { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_exception.get()) + if(_exception) { assert(_state >= StateClosing); _exception->ice_throw(); @@ -538,10 +529,12 @@ Ice::ConnectionI::updateObserver() } assert(_instance->initializationData().observer); - _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), - _endpoint, - toConnectionState(_state), - _observer.get())); + + ConnectionObserverPtr o = _instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer.get()); + _observer.attach(o); } void @@ -588,8 +581,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout)) { - if(acm.close == CloseOnIdleForceful || - (acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty()))) + if(acm.close == CloseOnIdleForceful || (acm.close != CloseOnIdle && !_asyncRequests.empty())) { // // Close the connection if we didn't receive a heartbeat in @@ -597,8 +589,8 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) // setState(StateClosed, ConnectionTimeoutException(__FILE__, __LINE__)); } - else if(acm.close != CloseOnInvocation && - _dispatchCount == 0 && _batchRequestQueue->isEmpty() && _requests.empty() && _asyncRequests.empty()) + else if(acm.close != CloseOnInvocation && _dispatchCount == 0 && _batchRequestQueue->isEmpty() && + _asyncRequests.empty()) { // // The connection is idle, close it. @@ -608,110 +600,21 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) } } -bool -Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, int batchRequestNum) -{ - BasicStream* os = out->os(); - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_exception.get()) - { - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw RetryException(*_exception.get()); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Ensure the message isn't bigger than what we can send with the - // transport. - // - _transceiver->checkSendSize(*os); - - Int requestId = 0; - if(response) - { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - const Byte* p = reinterpret_cast<const Byte*>(&requestId); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#endif - } - else if(batchRequestNum > 0) - { - const Byte* p = reinterpret_cast<const Byte*>(&batchRequestNum); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#endif - } - - out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); - - // - // Send the message. If it can't be sent without blocking the message is added - // to _sendStreams and it will be sent by the selector thread. - // - bool sent = false; - try - { - OutgoingMessage message(out, os, compress, requestId); - sent = sendMessage(message) & AsyncStatusSent; - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - assert(_exception.get()); - _exception->ice_throw(); - } - - if(response) - { - // - // Add to the requests map. - // - _requestsHint = _requests.insert(_requests.end(), pair<const Int, OutgoingBase*>(requestId, out)); - } - - return sent; -} - AsyncStatus Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compress, bool response, int batchRequestNum) { - BasicStream* os = out->getOs(); + OutputStream* os = out->getOs(); IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_exception.get()) + // + // If the exception is closed before we even have a chance + // to send our request, we always try to send the request + // again. + // + if(_exception) { - // - // If the exception is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw RetryException(*_exception.get()); + throw RetryException(*_exception); } - assert(_state > StateNotValidated); assert(_state < StateClosing); @@ -725,8 +628,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres // Notify the request that it's cancelable with this connection. // This will throw if the request is canceled. // - out->cancelable(this); - + out->cancelable(ICE_SHARED_FROM_THIS); Int requestId = 0; if(response) { @@ -771,7 +673,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres catch(const LocalException& ex) { setState(StateClosed, ex); - assert(_exception.get()); + assert(_exception); _exception->ice_throw(); } @@ -792,11 +694,38 @@ Ice::ConnectionI::getBatchRequestQueue() const return _batchRequestQueue; } +#ifdef ICE_CPP11_MAPPING +void +Ice::ConnectionI::flushBatchRequests() +{ + Connection::flushBatchRequestsAsync().get(); +} + +std::function<void()> +Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)> ex, + ::std::function<void(bool)> sent) +{ + class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke + { + public: + + ConnectionFlushBatchLambda(std::shared_ptr<Ice::ConnectionI>&& connection, + const InstancePtr& instance, + std::function<void(std::exception_ptr)> ex, + std::function<void(bool)> sent) : + ConnectionFlushBatchAsync(connection, instance), LambdaInvoke(std::move(ex), std::move(sent)) + { + } + }; + auto outAsync = make_shared<ConnectionFlushBatchLambda>(ICE_SHARED_FROM_THIS, _instance, ex, sent); + outAsync->invoke(__flushBatchRequests_name); + return [outAsync]() { outAsync->cancel(); }; +} +#else void Ice::ConnectionI::flushBatchRequests() { - ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name); - out.invoke(); + end_flushBatchRequests(begin_flushBatchRequests()); } AsyncResultPtr @@ -819,55 +748,49 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR } AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception, - const IceInternal::Function<void (bool)>& sent) +Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) { -#ifdef ICE_CPP11 - class Cpp11CB : public IceInternal::Cpp11FnCallbackNC + class ConnectionFlushBatchAsyncWithCallback : public ConnectionFlushBatchAsync, public CallbackCompletion { public: - Cpp11CB(const IceInternal::Function<void (const Exception&)>& excb, - const IceInternal::Function<void (bool)>& sentcb) : - IceInternal::Cpp11FnCallbackNC(excb, sentcb) + ConnectionFlushBatchAsyncWithCallback(const Ice::ConnectionIPtr& connection, + const Ice::CommunicatorPtr& communicator, + const InstancePtr& instance, + const CallbackBasePtr& callback, + const Ice::LocalObjectPtr& cookie) : + ConnectionFlushBatchAsync(connection, instance), + CallbackCompletion(callback, cookie), + _communicator(communicator), + _connection(connection) { - CallbackBase::checkCallback(true, excb != nullptr); + _cookie = cookie; } - virtual void - completed(const AsyncResultPtr& __result) const + virtual Ice::CommunicatorPtr getCommunicator() const { - ConnectionPtr __con = __result->getConnection(); - assert(__con); - try - { - __con->end_flushBatchRequests(__result); - assert(false); - } - catch(const Exception& ex) - { - IceInternal::Cpp11FnCallbackNC::exception(__result, ex); - } + return _communicator; } - }; - return __begin_flushBatchRequests(new Cpp11CB(exception, sent), 0); -#else - assert(false); // Ice not built with C++11 support. - return 0; -#endif -} + virtual Ice::ConnectionPtr getConnection() const + { + return _connection; + } -AsyncResultPtr -Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) -{ - ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync(this, - _communicator, - _instance, - __flushBatchRequests_name, - cb, - cookie); - result->invoke(); + virtual const std::string& + getOperation() const + { + return __flushBatchRequests_name; + } + + private: + + Ice::CommunicatorPtr _communicator; + Ice::ConnectionPtr _connection; + }; + + ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, _communicator, _instance, cb, cookie); + result->invoke(__flushBatchRequests_name); return result; } @@ -877,52 +800,70 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) AsyncResult::__check(r, this, __flushBatchRequests_name); r->__wait(); } +#endif void -Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) +Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback) { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _heartbeatCallback = callback; +} + +void +Ice::ConnectionI::setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK) callback) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state >= StateClosed) + if(callback) { - if(callback) + class CallbackWorkItem : public DispatchWorkItem { - class CallbackWorkItem : public DispatchWorkItem - { - public: + public: - CallbackWorkItem(const ConnectionIPtr& connection, const ConnectionCallbackPtr& callback) : - _connection(connection), - _callback(callback) - { - } + CallbackWorkItem(const ConnectionIPtr& connection, ICE_IN(ICE_CLOSE_CALLBACK) callback) : + _connection(connection), +#ifdef ICE_CPP11_MAPPING + _callback(move(callback)) +#else + _callback(callback) +#endif + { + } - virtual void run() - { - _connection->closeCallback(_callback); - } + virtual void run() + { + _connection->closeCallback(_callback); + } - private: + private: - const ConnectionIPtr _connection; - const ConnectionCallbackPtr _callback; - }; - _threadPool->dispatch(new CallbackWorkItem(this, callback)); - } - } - else - { - _callback = callback; + const ConnectionIPtr _connection; + const ICE_CLOSE_CALLBACK _callback; + }; +#ifdef ICE_CPP11_MAPPING + _threadPool->dispatch(new CallbackWorkItem(ICE_SHARED_FROM_THIS, move(callback))); +#else + _threadPool->dispatch(new CallbackWorkItem(ICE_SHARED_FROM_THIS, callback)); +#endif } } + else + { + _closeCallback = callback; + } } void -Ice::ConnectionI::closeCallback(const ConnectionCallbackPtr& callback) +Ice::ConnectionI::closeCallback(const ICE_CLOSE_CALLBACK& callback) { try { - callback->closed(this); +#ifdef ICE_CPP11_MAPPING + callback(ICE_SHARED_FROM_THIS); +#else + callback->closed(ICE_SHARED_FROM_THIS); +#endif } catch(const std::exception& ex) { @@ -949,7 +890,7 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, if(_state == StateActive) { - _monitor->remove(this); + _monitor->remove(ICE_SHARED_FROM_THIS); } _monitor = _monitor->acm(timeout, close, heartbeat); @@ -964,7 +905,7 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, if(_state == StateActive) { - _monitor->add(this); + _monitor->add(ICE_SHARED_FROM_THIS); } } @@ -980,96 +921,6 @@ Ice::ConnectionI::getACM() } void -Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state >= StateClosed) - { - return; // The request has already been or will be shortly notified of the failure. - } - - for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) - { - if(o->out == out) - { - if(o->requestId) - { - if(_requestsHint != _requests.end() && _requestsHint->second == out) - { - _requests.erase(_requestsHint); - _requestsHint = _requests.end(); - } - else - { - _requests.erase(o->requestId); - } - } - - if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) - { - setState(StateClosed, ex); - } - else - { - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - if(o == _sendStreams.begin()) - { - o->canceled(true); // true = adopt the stream. - } - else - { - o->canceled(false); - _sendStreams.erase(o); - } - out->completed(ex); - } - return; - } - } - - if(dynamic_cast<Outgoing*>(out)) - { - if(_requestsHint != _requests.end() && _requestsHint->second == out) - { - if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) - { - setState(StateClosed, ex); - } - else - { - out->completed(ex); - _requests.erase(_requestsHint); - _requestsHint = _requests.end(); - } - return; - } - else - { - for(map<Int, OutgoingBase*>::iterator p = _requests.begin(); p != _requests.end(); ++p) - { - if(p->second == out) - { - if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) - { - setState(StateClosed, ex); - } - else - { - p->second->completed(ex); - assert(p != _requestsHint); - _requests.erase(p); - } - return; // We're done. - } - } - } - } -} - -void Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { // @@ -1089,7 +940,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con if(o->requestId) { if(_asyncRequestsHint != _asyncRequests.end() && - _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync)) + _asyncRequestsHint->second == ICE_DYNAMIC_CAST(OutgoingAsync, outAsync)) { _asyncRequests.erase(_asyncRequestsHint); _asyncRequestsHint = _asyncRequests.end(); @@ -1119,16 +970,16 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con o->canceled(false); _sendStreams.erase(o); } - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } } return; } } - if(OutgoingAsyncPtr::dynamicCast(outAsync)) + if(ICE_DYNAMIC_CAST(OutgoingAsync, outAsync)) { if(_asyncRequestsHint != _asyncRequests.end()) { @@ -1142,9 +993,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con { _asyncRequests.erase(_asyncRequestsHint); _asyncRequestsHint = _asyncRequests.end(); - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } } return; @@ -1163,9 +1014,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con { assert(p != _asyncRequestsHint); _asyncRequests.erase(p); - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } } return; @@ -1175,7 +1026,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con } void -Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag, bool /*amd*/) +Ice::ConnectionI::sendResponse(Int, OutputStream* os, Byte compressFlag, bool /*amd*/) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); @@ -1193,7 +1044,7 @@ Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag, bool /*a if(_state >= StateClosed) { - assert(_exception.get()); + assert(_exception); _exception->ice_throw(); } @@ -1232,7 +1083,7 @@ Ice::ConnectionI::sendNoResponse() if(_state >= StateClosed) { - assert(_exception.get()); + assert(_exception); _exception->ice_throw(); } @@ -1335,15 +1186,15 @@ Ice::ConnectionI::getEndpoint() const return _endpoint; // No mutex protection necessary, _endpoint is immutable. } -ObjectPrx +ObjectPrxPtr Ice::ConnectionI::createProxy(const Identity& ident) const { // // Create a reference and return a reverse proxy for this // reference. // - ConnectionIPtr self = const_cast<ConnectionI*>(this); - return _instance->proxyFactory()->referenceToProxy(_instance->referenceFactory()->create(ident, self)); + return _instance->proxyFactory()->referenceToProxy( + _instance->referenceFactory()->create(ident, ICE_SHARED_FROM_CONST_THIS(ConnectionI))); } #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) @@ -1372,19 +1223,12 @@ Ice::ConnectionI::startAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { - if(!_hasMoreData) - { - if(_observer && !_readHeader) - { - _observer.startRead(_readStream); - } - - _transceiver->startRead(_readStream); - } - else + if(_observer && !_readHeader) { - _transceiver->getNativeInfo()->completed(IceInternal::SocketOperationRead); + _observer.startRead(_readStream); } + + _transceiver->startRead(_readStream); } } catch(const Ice::LocalException& ex) @@ -1422,29 +1266,26 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { - if(!_hasMoreData) + Buffer::Container::iterator start = _readStream.i; + _transceiver->finishRead(_readStream); + if(_instance->traceLevels()->network >= 3 && _readStream.i != start) { - Buffer::Container::iterator start = _readStream.i; - _transceiver->finishRead(_readStream, _hasMoreData); - if(_instance->traceLevels()->network >= 3 && _readStream.i != start) + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "received "; + if(_endpoint->datagram()) { - Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); - out << "received "; - if(_endpoint->datagram()) - { - out << _readStream.b.size(); - } - else - { - out << (_readStream.i - start) << " of " << (_readStream.b.end() - start); - } - out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + out << _readStream.b.size(); } - - if(_observer && !_readHeader) + else { - _observer.finishRead(_readStream); + out << (_readStream.i - start) << " of " << (_readStream.b.end() - start); } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + + if(_observer && !_readHeader) + { + _observer.finishRead(_readStream); } } } @@ -1467,11 +1308,10 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) ServantManagerPtr servantManager; ObjectAdapterPtr adapter; OutgoingAsyncBasePtr outAsync; - ConnectionCallbackPtr heartbeatCallback; + ICE_HEARTBEAT_CALLBACK heartbeatCallback; int dispatchCount = 0; ThreadPoolMessage<ConnectionI> msg(current, *this); - { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1603,7 +1443,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // satisfied before continuing. // scheduleTimeout(newOp); - _threadPool->update(this, current.operation, newOp); + _threadPool->update(ICE_SHARED_FROM_THIS, current.operation, newOp); return; } @@ -1617,7 +1457,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) return; } - _threadPool->unregister(this, current.operation); + _threadPool->unregister(ICE_SHARED_FROM_THIS, current.operation); // // We start out in holding state. @@ -1665,7 +1505,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) if(_state < StateClosed) { scheduleTimeout(newOp); - _threadPool->update(this, current.operation, newOp); + _threadPool->update(ICE_SHARED_FROM_THIS, current.operation, newOp); } } @@ -1727,9 +1567,10 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } else { - _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, invokeNum, - servantManager, adapter, outAsync, heartbeatCallback, - current.stream)); + _threadPool->dispatchFromThisThread(new DispatchCall(ICE_SHARED_FROM_THIS, startCB, sentCBs, compress, requestId, + invokeNum, servantManager, adapter, outAsync, + heartbeatCallback, current.stream)); + } } @@ -1737,7 +1578,7 @@ void ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncBasePtr& outAsync, - const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) + const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, InputStream& stream) { int dispatchedCount = 0; @@ -1747,7 +1588,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // if(startCB) { - startCB->connectionStartCompleted(this); + startCB->connectionStartCompleted(ICE_SHARED_FROM_THIS); ++dispatchedCount; } @@ -1765,10 +1606,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess } if(p->receivedReply) { - OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync); - if(outAsync->completed()) + OutgoingAsyncPtr o = ICE_DYNAMIC_CAST(OutgoingAsync, p->outAsync); + if(o->response()) { - outAsync->invokeCompleted(); + o->invokeResponse(); } } #else @@ -1784,7 +1625,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // if(outAsync) { - outAsync->invokeCompleted(); + outAsync->invokeResponse(); ++dispatchedCount; } @@ -1792,7 +1633,11 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess { try { - heartbeatCallback->heartbeat(this); +#ifdef ICE_CPP11_MAPPING + heartbeatCallback(ICE_SHARED_FROM_THIS); +#else + heartbeatCallback->heartbeat(ICE_SHARED_FROM_THIS); +#endif } catch(const std::exception& ex) { @@ -1871,7 +1716,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close) // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // - if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback) + if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_closeCallback && !_heartbeatCallback) { finish(close); return; @@ -1884,7 +1729,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close) } else { - _threadPool->dispatchFromThisThread(new FinishCall(this, close)); + _threadPool->dispatchFromThisThread(new FinishCall(ICE_SHARED_FROM_THIS, close)); } } @@ -1897,8 +1742,9 @@ Ice::ConnectionI::finish(bool close) { string verb = _connector ? "establish" : "accept"; Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() - << "\n" << *_exception.get(); + << "\n" << *_exception; } } else @@ -1908,28 +1754,35 @@ Ice::ConnectionI::finish(bool close) Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); out << "closed " << _endpoint->protocol() << " connection\n" << toString(); - // - // Trace the cause of unexpected connection closures - // if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()))) { - out << "\n" << *_exception.get(); + out << "\n" << *_exception; } } } if(close) { - _transceiver->close(); + try + { + _transceiver->close(); + } + catch(const Ice::LocalException& ex) + { + Error out(_logger); + out << "unexpected connection exception:\n" << ex << '\n' << _desc; + } } if(_startCallback) { - _startCallback->connectionStartFailed(this, *_exception.get()); + assert(_exception); + + _startCallback->connectionStartFailed(ICE_SHARED_FROM_THIS, *_exception); _startCallback = 0; } @@ -1958,10 +1811,10 @@ Ice::ConnectionI::finish(bool close) } if(message->receivedReply) { - OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync); - if(outAsync->completed()) + OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, message->outAsync); + if(outAsync->response()) { - outAsync->invokeCompleted(); + outAsync->invokeResponse(); } } _sendStreams.pop_front(); @@ -1969,37 +1822,27 @@ Ice::ConnectionI::finish(bool close) #endif } + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { - o->completed(*_exception.get()); + o->completed(*_exception); if(o->requestId) // Make sure finished isn't called twice. { - if(o->out) - { - _requests.erase(o->requestId); - } - else - { - _asyncRequests.erase(o->requestId); - } + _asyncRequests.erase(o->requestId); } } - _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage - } - for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - p->second->completed(*_exception.get()); + _sendStreams.clear(); } - _requests.clear(); for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { - if(q->second->completed(*_exception.get())) + if(q->second->exception(*_exception)) { - q->second->invokeCompleted(); + q->second->invokeException(); } } + _asyncRequests.clear(); // @@ -2010,12 +1853,14 @@ Ice::ConnectionI::finish(bool close) _readStream.clear(); _readStream.b.clear(); - if(_callback) + if(_closeCallback) { - closeCallback(_callback); - _callback = 0; + closeCallback(_closeCallback); + _closeCallback = ICE_NULLPTR; } + _heartbeatCallback = ICE_NULLPTR; + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -2131,7 +1976,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0), _compressionLevel(1), _nextRequestId(1), - _requestsHint(_requests.end()), _asyncRequestsHint(_asyncRequests.end()), _messageSizeMax(adapter ? adapter->messageSizeMax() : _instance->messageSizeMax()), _batchRequestQueue(new BatchRequestQueue(instance, endpoint->datagram())), @@ -2166,36 +2010,39 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, { _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); } +} - __setNoDelete(true); - try +Ice::ConnectionIPtr +Ice::ConnectionI::create(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const ACMMonitorPtr& monitor, + const TransceiverPtr& transceiver, + const ConnectorPtr& connector, + const EndpointIPtr& endpoint, + const ObjectAdapterIPtr& adapter) +{ + Ice::ConnectionIPtr conn(new ConnectionI(communicator, instance, monitor, transceiver, connector, + endpoint, adapter)); + if(adapter) { - if(adapter) - { - const_cast<ThreadPoolPtr&>(_threadPool) = adapter->getThreadPool(); - } - else - { - const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); - } - _threadPool->initialize(this); + const_cast<ThreadPoolPtr&>(conn->_threadPool) = adapter->getThreadPool(); } - catch(const IceUtil::Exception&) + else { - __setNoDelete(false); - throw; + const_cast<ThreadPoolPtr&>(conn->_threadPool) = conn->_instance->clientThreadPool(); } - __setNoDelete(false); + conn->_threadPool->initialize(conn); + return conn; } Ice::ConnectionI::~ConnectionI() { assert(!_startCallback); - assert(!_callback); + assert(!_closeCallback); + assert(!_heartbeatCallback); assert(_state == StateFinished); assert(_dispatchCount == 0); assert(_sendStreams.empty()); - assert(_requests.empty()); assert(_asyncRequests.empty()); } @@ -2213,15 +2060,13 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) return; } - if(!_exception.get()) + if(!_exception) { // // If we are in closed state, an exception must be set. // assert(_state != StateClosed); - - _exception.reset(ex.ice_clone()); - + ICE_SET_EXCEPTION_FROM_CLONE(_exception, ex.ice_clone()); // // We don't warn if we are not validated. // @@ -2230,15 +2075,15 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) // // Don't warn about certain expected exceptions. // - if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || - dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || - dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || - dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || - (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing))) + if(!(dynamic_cast<const CloseConnectionException*>(&ex) || + dynamic_cast<const ForcedCloseConnectionException*>(&ex) || + dynamic_cast<const ConnectionTimeoutException*>(&ex) || + dynamic_cast<const CommunicatorDestroyedException*>(&ex) || + dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) || + (dynamic_cast<const ConnectionLostException*>(&ex) && _state >= StateClosing))) { Warning out(_logger); - out << "connection exception:\n" << *_exception.get() << '\n' << _desc; + out << "connection exception:\n" << ex << '\n' << _desc; } } } @@ -2280,92 +2125,92 @@ Ice::ConnectionI::setState(State state) { switch(state) { - case StateNotInitialized: - { - assert(false); - break; - } - - case StateNotValidated: - { - if(_state != StateNotInitialized) + case StateNotInitialized: { - assert(_state == StateClosed); - return; + assert(false); + break; } - break; - } - case StateActive: - { - // - // Can only switch from holding or not validated to - // active. - // - if(_state != StateHolding && _state != StateNotValidated) + case StateNotValidated: { - return; + if(_state != StateNotInitialized) + { + assert(_state == StateClosed); + return; + } + break; } - _threadPool->_register(this, SocketOperationRead); - break; - } - case StateHolding: - { - // - // Can only switch from active or not validated to - // holding. - // - if(_state != StateActive && _state != StateNotValidated) + case StateActive: { - return; + // + // Can only switch from holding or not validated to + // active. + // + if(_state != StateHolding && _state != StateNotValidated) + { + return; + } + _threadPool->_register(ICE_SHARED_FROM_THIS, SocketOperationRead); + break; } - if(_state == StateActive) + + case StateHolding: { - _threadPool->unregister(this, SocketOperationRead); + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) + { + return; + } + if(_state == StateActive) + { + _threadPool->unregister(ICE_SHARED_FROM_THIS, SocketOperationRead); + } + break; } - break; - } - case StateClosing: - case StateClosingPending: - { - // - // Can't change back from closing pending. - // - if(_state >= StateClosingPending) + case StateClosing: + case StateClosingPending: { - return; + // + // Can't change back from closing pending. + // + if(_state >= StateClosingPending) + { + return; + } + break; } - break; - } - case StateClosed: - { - if(_state == StateFinished) + case StateClosed: { - return; - } + if(_state == StateFinished) + { + return; + } - _batchRequestQueue->destroy(*_exception.get()); + _batchRequestQueue->destroy(*_exception); - // - // Don't need to close now for connections so only close the transceiver - // if the selector request it. - // - if(_threadPool->finish(this, false)) - { - _transceiver->close(); + // + // Don't need to close now for connections so only close the transceiver + // if the selector request it. + // + if(_threadPool->finish(ICE_SHARED_FROM_THIS, false)) + { + _transceiver->close(); + } + break; } - break; - } - case StateFinished: - { - assert(_state == StateClosed); - _communicator = 0; - break; - } + case StateFinished: + { + assert(_state == StateClosed); + _communicator = 0; + break; + } } } catch(const Ice::LocalException& ex) @@ -2388,11 +2233,11 @@ Ice::ConnectionI::setState(State state) { _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); } - _monitor->add(this); + _monitor->add(ICE_SHARED_FROM_THIS); } else if(_state == StateActive) { - _monitor->remove(this); + _monitor->remove(ICE_SHARED_FROM_THIS); } } @@ -2407,7 +2252,7 @@ Ice::ConnectionI::setState(State state) newState, _observer.get())); } - if(_observer && state == StateClosed && _exception.get()) + if(_observer && state == StateClosed && _exception) { if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || @@ -2416,7 +2261,7 @@ Ice::ConnectionI::setState(State state) dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing))) { - _observer->failed(_exception->ice_name()); + _observer->failed(_exception->ice_id()); } } } @@ -2454,7 +2299,7 @@ Ice::ConnectionI::initiateShutdown() // // Before we shut down, we send a close connection message. // - BasicStream os(_instance.get(), Ice::currentProtocolEncoding); + OutputStream os(_instance.get(), Ice::currentProtocolEncoding); os.write(magic[0]); os.write(magic[1]); os.write(magic[2]); @@ -2473,11 +2318,11 @@ Ice::ConnectionI::initiateShutdown() // // Notify the the transceiver of the graceful connection closure. // - SocketOperation op = _transceiver->closing(true, *_exception.get()); + SocketOperation op = _transceiver->closing(true, *_exception); if(op) { scheduleTimeout(op); - _threadPool->_register(this, op); + _threadPool->_register(ICE_SHARED_FROM_THIS, op); } } } @@ -2490,7 +2335,7 @@ Ice::ConnectionI::heartbeat() if(!_endpoint->datagram()) { - BasicStream os(_instance.get(), Ice::currentProtocolEncoding); + OutputStream os(_instance.get(), Ice::currentProtocolEncoding); os.write(magic[0]); os.write(magic[1]); os.write(magic[2]); @@ -2509,7 +2354,7 @@ Ice::ConnectionI::heartbeat() catch(const LocalException& ex) { setState(StateClosed, ex); - assert(_exception.get()); + assert(_exception); } } } @@ -2517,11 +2362,11 @@ Ice::ConnectionI::heartbeat() bool Ice::ConnectionI::initialize(SocketOperation operation) { - SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData); + SocketOperation s = _transceiver->initialize(_readStream, _writeStream); if(s != SocketOperationNone) { scheduleTimeout(s); - _threadPool->update(this, operation, s); + _threadPool->update(ICE_SHARED_FROM_THIS, operation, s); return false; } @@ -2567,7 +2412,7 @@ Ice::ConnectionI::validate(SocketOperation operation) if(op) { scheduleTimeout(op); - _threadPool->update(this, operation, op); + _threadPool->update(ICE_SHARED_FROM_THIS, operation, op); return false; } } @@ -2596,7 +2441,7 @@ Ice::ConnectionI::validate(SocketOperation operation) if(op) { scheduleTimeout(op); - _threadPool->update(this, operation, op); + _threadPool->update(ICE_SHARED_FROM_THIS, operation, op); return false; } } @@ -2740,17 +2585,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) // // Do compression. // - BasicStream stream(_instance.get(), Ice::currentProtocolEncoding); + OutputStream stream(_instance.get(), Ice::currentProtocolEncoding); doCompress(*message->stream, stream); - if(message->outAsync) - { - trace("sending asynchronous request", *message->stream, _logger, _traceLevels); - } - else - { - traceSend(*message->stream, _logger, _traceLevels); - } + traceSend(*message->stream, _logger, _traceLevels); message->adopt(&stream); // Adopt the compressed stream. message->stream->i = message->stream->b.begin(); @@ -2777,14 +2615,8 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) copy(p, p + sizeof(Int), message->stream->b.begin() + 10); #endif message->stream->i = message->stream->b.begin(); - if(message->outAsync) - { - trace("sending asynchronous request", *message->stream, _logger, _traceLevels); - } - else - { - traceSend(*message->stream, _logger, _traceLevels); - } + traceSend(*message->stream, _logger, _traceLevels); + #ifdef ICE_HAS_BZIP2 } #endif @@ -2819,7 +2651,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) if(_state == StateClosing && _shutdownInitiated) { setState(StateClosingPending); - SocketOperation op = _transceiver->closing(true, *_exception.get()); + SocketOperation op = _transceiver->closing(true, *_exception); if(op) { return op; @@ -2865,18 +2697,11 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // // Do compression. // - BasicStream stream(_instance.get(), Ice::currentProtocolEncoding); + OutputStream stream(_instance.get(), Ice::currentProtocolEncoding); doCompress(*message.stream, stream); stream.i = stream.b.begin(); - if(message.outAsync) - { - trace("sending asynchronous request", *message.stream, _logger, _traceLevels); - } - else - { - traceSend(*message.stream, _logger, _traceLevels); - } + traceSend(*message.stream, _logger, _traceLevels); // // Send the message without blocking. @@ -2931,14 +2756,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) #endif message.stream->i = message.stream->b.begin(); - if(message.outAsync) - { - trace("sending asynchronous request", *message.stream, _logger, _traceLevels); - } - else - { - traceSend(*message.stream, _logger, _traceLevels); - } + traceSend(*message.stream, _logger, _traceLevels); // // Send the message without blocking. @@ -2974,7 +2792,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) _writeStream.swap(*_sendStreams.back().stream); scheduleTimeout(op); - _threadPool->_register(this, op); + _threadPool->_register(ICE_SHARED_FROM_THIS, op); return AsyncStatusQueued; } @@ -3041,7 +2859,7 @@ getBZ2Error(int bzError) } void -Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed) +Ice::ConnectionI::doCompress(OutputStream& uncompressed, OutputStream& compressed) { const Byte* p; @@ -3096,7 +2914,7 @@ Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed) } void -Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompressed) +Ice::ConnectionI::doUncompress(InputStream& compressed, InputStream& uncompressed) { Int uncompressedSize; compressed.i = compressed.b.begin() + headerSize; @@ -3131,9 +2949,9 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse #endif SocketOperation -Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, +Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncBasePtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + OutgoingAsyncBasePtr& outAsync, ICE_HEARTBEAT_CALLBACK& heartbeatCallback, int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3169,7 +2987,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request if(compress == 2) { #ifdef ICE_HAS_BZIP2 - BasicStream ustream(_instance.get(), Ice::currentProtocolEncoding); + InputStream ustream(_instance.get(), Ice::currentProtocolEncoding); doUncompress(stream, ustream); stream.b.swap(ustream.b); #else @@ -3200,7 +3018,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request // // Notify the the transceiver of the graceful connection closure. // - SocketOperation op = _transceiver->closing(false, *_exception.get()); + SocketOperation op = _transceiver->closing(false, *_exception); if(op) { return op; @@ -3258,54 +3076,22 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request stream.read(requestId); - map<Int, OutgoingBase*>::iterator p = _requests.end(); map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.end(); - if(_requestsHint != _requests.end()) + if(_asyncRequestsHint != _asyncRequests.end()) { - if(_requestsHint->first == requestId) + if(_asyncRequestsHint->first == requestId) { - p = _requestsHint; + q = _asyncRequestsHint; } } - if(p == _requests.end()) - { - if(_asyncRequestsHint != _asyncRequests.end()) - { - if(_asyncRequestsHint->first == requestId) - { - q = _asyncRequestsHint; - } - } - } - - if(p == _requests.end() && q == _asyncRequests.end()) - { - p = _requests.find(requestId); - } - - if(p == _requests.end() && q == _asyncRequests.end()) + if(q == _asyncRequests.end()) { q = _asyncRequests.find(requestId); } - if(p != _requests.end()) - { - p->second->completed(stream); - - if(p == _requestsHint) - { - _requests.erase(p++); - _requestsHint = p; - } - else - { - _requests.erase(p); - } - notifyAll(); // Notify threads blocked in close(false) - } - else if(q != _asyncRequests.end()) + if(q != _asyncRequests.end()) { outAsync = q->second; @@ -3333,7 +3119,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request message->receivedReply = true; outAsync = 0; } - else if(outAsync->completed()) + else if(outAsync->response()) { ++dispatchCount; } @@ -3342,7 +3128,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request outAsync = 0; } #else - if(outAsync->completed()) + if(outAsync->response()) { ++dispatchCount; } @@ -3360,9 +3146,9 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request case validateConnectionMsg: { traceRecv(stream, _logger, _traceLevels); - if(_callback) + if(_heartbeatCallback) { - heartbeatCallback = _callback; + heartbeatCallback = _heartbeatCallback; ++dispatchCount; } break; @@ -3395,7 +3181,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } void -Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, Byte compress, +Ice::ConnectionI::invokeAll(InputStream& stream, Int invokeNum, Int requestId, Byte compress, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter) { // @@ -3530,11 +3316,17 @@ Ice::ConnectionI::initConnectionInfo() const } catch(const Ice::LocalException&) { - _info = new ConnectionInfo(); + _info = ICE_MAKE_SHARED(ConnectionInfo); + } + + Ice::ConnectionInfoPtr info = _info; + while(info) + { + info->connectionId = _endpoint->connectionId(); + info->incoming = _connector == 0; + info->adapterName = _adapter ? _adapter->getName() : string(); + info = info->underlying; } - _info->connectionId = _endpoint->connectionId(); - _info->incoming = _connector == 0; - _info->adapterName = _adapter ? _adapter->getName() : string(); return _info; } @@ -3548,7 +3340,7 @@ SocketOperation ConnectionI::read(Buffer& buf) { Buffer::Container::iterator start = buf.i; - SocketOperation op = _transceiver->read(buf, _hasMoreData); + SocketOperation op = _transceiver->read(buf); if(_instance->traceLevels()->network >= 3 && buf.i != start) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); @@ -3589,7 +3381,7 @@ ConnectionI::reap() { if(_monitor) { - _monitor->reap(this); + _monitor->reap(ICE_SHARED_FROM_THIS); } if(_observer) { |