From 9560b7d54ec4411f0605a3b53997835599f70ea2 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Mon, 10 Sep 2012 08:47:58 +0200 Subject: Fixed communicator flushBatchRequests to allow tracing --- cpp/src/Ice/BasicStream.cpp | 4 +- cpp/src/Ice/ConnectionFactory.cpp | 5 +- cpp/src/Ice/ConnectionI.cpp | 106 +++++++++--------- cpp/src/Ice/ConnectionI.h | 3 + cpp/src/Ice/EndpointI.cpp | 4 +- cpp/src/Ice/Instance.cpp | 11 +- cpp/src/Ice/MetricsAdminI.cpp | 71 +++++++----- cpp/src/Ice/MetricsAdminI.h | 35 ++++-- cpp/src/Ice/MetricsObserverI.h | 14 +-- cpp/src/Ice/ObserverHelper.cpp | 83 ++++++++------ cpp/src/Ice/ObserverI.cpp | 227 ++++++++++++++++++++++++++++++-------- cpp/src/Ice/ObserverI.h | 17 ++- cpp/src/Ice/Outgoing.cpp | 12 +- cpp/src/Ice/OutgoingAsync.cpp | 127 +++++++++++---------- cpp/src/Ice/Proxy.cpp | 2 +- cpp/src/Ice/TcpTransceiver.cpp | 1 - cpp/src/Ice/UdpTransceiver.cpp | 6 +- cpp/src/IceSSL/TransceiverI.cpp | 2 - 18 files changed, 468 insertions(+), 262 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/Ice/BasicStream.cpp b/cpp/src/Ice/BasicStream.cpp index 199ef69d639..d8cf31ff20e 100755 --- a/cpp/src/Ice/BasicStream.cpp +++ b/cpp/src/Ice/BasicStream.cpp @@ -2856,8 +2856,8 @@ IceInternal::BasicStream::EncapsDecoder::skipSlice() } else { - throw MarshalException(__FILE__, - __LINE__, + throw MarshalException(__FILE__, + __LINE__, "compact format prevents slicing (the sender should use the sliced format instead)"); } diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 2869d610748..98917fb8cf7 100755 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -240,7 +240,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector& endpt ObserverPtr observer; if(obsv) { - observer = obsv->getConnectionEstablishmentObserver(q->endpoint->getInfo(), q->connector->toString()); + observer = obsv->getConnectionEstablishmentObserver(q->endpoint, q->connector->toString()); if(observer) { observer->attach(); @@ -1144,8 +1144,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() const CommunicatorObserverPtr& obsv = _factory->_instance->initializationData().observer; if(obsv) { - _observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint->getInfo(), - _iter->connector->toString()); + _observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint, _iter->connector->toString()); if(_observer) { _observer->attach(); diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index fd722eca237..1b29f6a7664 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -38,6 +38,9 @@ Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; } namespace { +const ::std::string __flushBatchRequests_name = "flushBatchRequests"; + + class TimeoutCallback : public IceUtil::TimerTask { public: @@ -501,23 +504,11 @@ Ice::ConnectionI::updateObserver() return; } - if(!_info && _state < StateClosed) - { - _info = _transceiver->getInfo(); - _info->connectionId = _endpoint->connectionId(); - _info->incoming = _connector == 0; - _info->adapterName = _adapter ? _adapter->getName() : string(); - } - - if(_info) - { - const CommunicatorObserverPtr& comObsv = _instance->initializationData().observer; - assert(comObsv); - _observer.attach(comObsv->getConnectionObserver(_info, - _endpoint->getInfo(), - connectionStateMap[static_cast(_state)], - _observer.get())); - } + assert(_instance->initializationData().observer); + _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer.get())); } void @@ -554,7 +545,6 @@ bool Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) { BasicStream* os = out->os(); - out->attachRemoteObserver(this); IceUtil::Monitor::Lock sync(*this); if(_exception.get()) @@ -570,6 +560,8 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) assert(_state > StateNotValidated); assert(_state < StateClosing); + out->attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -647,6 +639,8 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b assert(_state > StateNotValidated); assert(_state < StateClosing); + out->__attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -904,7 +898,8 @@ Ice::ConnectionI::abortBatchRequest() void Ice::ConnectionI::flushBatchRequests() { - BatchOutgoing out(this, _instance.get()); + IceInternal::InvocationObserver observer(_instance.get(), __flushBatchRequests_name); + BatchOutgoing out(this, _instance.get(), observer); out.invoke(); } @@ -914,13 +909,6 @@ Ice::ConnectionI::begin_flushBatchRequests() return __begin_flushBatchRequests(__dummyCallback, 0); } -namespace -{ - -const ::std::string __flushBatchRequests_name = "flushBatchRequests"; - -} - AsyncResultPtr Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie) { @@ -971,6 +959,8 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) _exception->ice_throw(); } + out->attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { out->sent(false); @@ -1029,6 +1019,8 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) _exception->ice_throw(); } + outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { AsyncStatus status = AsyncStatusSent; @@ -1824,15 +1816,7 @@ Ice::ConnectionI::getInfo() const { _exception->ice_throw(); } - - if(!_info) - { - _info = _transceiver->getInfo(); - _info->connectionId = _endpoint->connectionId(); - _info->incoming = _connector == 0; - _info->adapterName = _adapter ? _adapter->getName() : string(); - } - return _info; + return initConnectionInfo(); } void @@ -2177,8 +2161,8 @@ Ice::ConnectionI::setState(State state) if(_observer) { - ConnectionState oldState = connectionStateMap[static_cast(_state)]; - ConnectionState newState = connectionStateMap[static_cast(state)]; + ConnectionState oldState = toConnectionState(_state); + ConnectionState newState = toConnectionState(state); if(oldState != newState) { _observer->stateChanged(oldState, newState); @@ -2271,26 +2255,21 @@ Ice::ConnectionI::initialize(SocketOperation operation) _threadPool->update(this, operation, s); return false; } - - const CommunicatorObserverPtr& comObsv = _instance->initializationData().observer; - if(comObsv) - { - _info = _transceiver->getInfo(); - _info->connectionId = _endpoint->connectionId(); - _info->incoming = _connector == 0; - _info->adapterName = _adapter ? _adapter->getName() : string(); - - _observer.attach(comObsv->getConnectionObserver(_info, - _endpoint->getInfo(), - ConnectionStateValidating, - 0)); - } // // Update the connection description once the transceiver is initialized. // const_cast(_desc) = _transceiver->toString(); setState(StateNotValidated); + + if(_instance->initializationData().observer) + { + _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), + _endpoint, + ConnectionStateValidating, + 0)); + } + return true; } @@ -3114,3 +3093,28 @@ Ice::ConnectionI::closeTimeout() return _endpoint->timeout(); } } + +Ice::ConnectionInfoPtr +Ice::ConnectionI::initConnectionInfo() const +{ + if(_info) + { + return _info; + } + + ConnectionInfoPtr info = _transceiver->getInfo(); + info->connectionId = _endpoint->connectionId(); + info->incoming = _connector == 0; + info->adapterName = _adapter ? _adapter->getName() : string(); + if(_state > StateNotInitialized) + { + _info = info; // Cache the connection information only if initialized. + } + return info; +} + +ConnectionState +ConnectionI::toConnectionState(State state) const +{ + return connectionStateMap[static_cast(state)]; +} diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 1ede520d520..8a8ceb67116 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -299,6 +299,9 @@ private: int connectTimeout(); int closeTimeout(); + Ice::ConnectionInfoPtr initConnectionInfo() const; + Ice::Instrumentation::ConnectionState toConnectionState(State) const; + AsyncResultPtr __begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); Ice::CommunicatorPtr _communicator; diff --git a/cpp/src/Ice/EndpointI.cpp b/cpp/src/Ice/EndpointI.cpp index a0b6f0c4415..0e1b8173db5 100644 --- a/cpp/src/Ice/EndpointI.cpp +++ b/cpp/src/Ice/EndpointI.cpp @@ -205,7 +205,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, const E const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; if(obsv) { - observer.attach(obsv->getEndpointLookupObserver(endpoint->getInfo(), endpoint->toString())); + observer.attach(obsv->getEndpointLookupObserver(endpoint)); } vector connectors; @@ -255,7 +255,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, const E const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; if(obsv) { - entry.observer = obsv->getEndpointLookupObserver(endpoint->getInfo(), endpoint->toString()); + entry.observer = obsv->getEndpointLookupObserver(endpoint); if(entry.observer) { entry.observer->attach(); diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index 71dde521406..e4f016cd0af 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -1097,15 +1097,15 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi _adminFacets.insert(FacetMap::value_type("Process", new ProcessI(communicator))); - IceMX::MetricsAdminIPtr admin = new IceMX::MetricsAdminI(_initData.properties); + IceMX::MetricsAdminIPtr admin = new IceMX::MetricsAdminI(_initData.properties, _initData.logger); _adminFacets.insert(FacetMap::value_type("MetricsAdmin", admin)); PropertiesAdminIPtr props = new PropertiesAdminI("Properties", _initData.properties, _initData.logger); _adminFacets.insert(FacetMap::value_type("Properties",props)); // - // Setup the communicator observer only the metrics admin plugin only if the user didn't already set an - // Ice observer resovler. + // Setup the communicator observer only if the user didn't already set an + // Ice observer resolver and if the admininistrative endpoints are set. // if(!_initData.observer && (_adminFacetFilter.empty() || _adminFacetFilter.find("MetricsAdmin") != _adminFacetFilter.end()) && @@ -1114,7 +1114,10 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi IceMX::CommunicatorObserverIPtr observer = new IceMX::CommunicatorObserverI(admin); _initData.observer = observer; - // Make sure the MetricsAdmin plugin received property update notifications. + // + // Make sure the observer receives property update notifications to update + // the metrics admin configuration. + // props->addUpdateCallback(observer); } diff --git a/cpp/src/Ice/MetricsAdminI.cpp b/cpp/src/Ice/MetricsAdminI.cpp index 8b6ae36586b..1596126712f 100644 --- a/cpp/src/Ice/MetricsAdminI.cpp +++ b/cpp/src/Ice/MetricsAdminI.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include @@ -24,7 +25,7 @@ namespace { vector -parseRule(const ::Ice::PropertiesPtr& properties, const string& name) +parseRule(const PropertiesPtr& properties, const string& name) { vector regexps; PropertyDict rules = properties->getPropertiesForPrefix(name + '.'); @@ -42,7 +43,7 @@ MetricsMapI::RegExp::RegExp(const string& attribute, const string& regexp) : _at #ifndef ICE_CPP11_REGEXP if(regcomp(&_preg, regexp.c_str(), REG_EXTENDED | REG_NOSUB) != 0) { - throw Ice::SyscallException(__FILE__, __LINE__); + throw SyscallException(__FILE__, __LINE__); } #else _regex = regex(regexp, std::regex_constants::extended | std::regex_constants::nosubs); @@ -71,7 +72,7 @@ MetricsMapI::RegExp::match(const MetricsHelper& helper) #endif } -MetricsMapI::MetricsMapI(const std::string& mapPrefix, const Ice::PropertiesPtr& properties) : +MetricsMapI::MetricsMapI(const std::string& mapPrefix, const PropertiesPtr& properties) : _properties(properties->getPropertiesForPrefix(mapPrefix)), _retain(properties->getPropertyAsIntWithDefault(mapPrefix + "RetainDetached", 10)), _accept(parseRule(properties, mapPrefix + "Accept")), @@ -133,7 +134,7 @@ MetricsViewI::MetricsViewI(const string& name) : _name(name) } void -MetricsViewI::update(const Ice::PropertiesPtr& properties, +MetricsViewI::update(const PropertiesPtr& properties, const map& factories, set& updatedMaps) { @@ -230,7 +231,8 @@ MetricsViewI::getMap(const string& mapName) const return 0; } -MetricsAdminI::MetricsAdminI(const Ice::PropertiesPtr& properties) : _properties(properties) +MetricsAdminI::MetricsAdminI(const PropertiesPtr& properties, const LoggerPtr& logger) : + _properties(properties), _logger(logger) { } @@ -269,7 +271,10 @@ MetricsAdminI::updateViews() { continue; // The view is disabled } - + + // + // Create the view or update it. + // map::const_iterator q = _views.find(viewName); if(q == _views.end()) { @@ -284,7 +289,7 @@ MetricsAdminI::updateViews() _views.swap(views); // - // Go through removed views to collect updated maps. + // Go through removed views to collect maps to update. // for(map::const_iterator p = views.begin(); p != views.end(); ++p) { @@ -296,7 +301,7 @@ MetricsAdminI::updateViews() } // - // Call the observer update for each of the updated maps. + // Gather the updates for each of the map to update. // for(set::const_iterator p = updatedMaps.begin(); p != updatedMaps.end(); ++p) { @@ -306,25 +311,29 @@ MetricsAdminI::updateViews() updaters.push_back(q->second); } } - } - - for(vector::const_iterator p = updaters.begin(); p != updaters.end(); ++p) - { - try - { - (*p)->update(); - } - catch(...) - { - // TODO: Warn? - } - } + } + + // + // Call the updaters to update the maps. + // + for(vector::const_iterator p = updaters.begin(); p != updaters.end(); ++p) + { + try + { + (*p)->update(); + } + catch(const std::exception& ex) + { + Warning warn(_logger); + warn << "unexpected exception while calling observer updater:\n" << ex; + } + } } - -Ice::StringSeq -MetricsAdminI::getMetricsViewNames(const ::Ice::Current&) + +StringSeq +MetricsAdminI::getMetricsViewNames(const Current&) { - Ice::StringSeq names; + StringSeq names; Lock sync(*this); for(map::const_iterator p = _views.begin(); p != _views.end(); ++p) @@ -335,7 +344,7 @@ MetricsAdminI::getMetricsViewNames(const ::Ice::Current&) } MetricsView -MetricsAdminI::getMetricsView(const string& view, const ::Ice::Current&) +MetricsAdminI::getMetricsView(const string& view, const Current&) { Lock sync(*this); std::map::const_iterator p = _views.find(view); @@ -347,7 +356,7 @@ MetricsAdminI::getMetricsView(const string& view, const ::Ice::Current&) } MetricsFailuresSeq -MetricsAdminI::getMapMetricsFailures(const string& view, const string& map, const ::Ice::Current&) +MetricsAdminI::getMapMetricsFailures(const string& view, const string& map, const Current&) { Lock sync(*this); std::map::const_iterator p = _views.find(view); @@ -359,7 +368,7 @@ MetricsAdminI::getMapMetricsFailures(const string& view, const string& map, cons } MetricsFailures -MetricsAdminI::getMetricsFailures(const string& view, const string& map, const string& id, const ::Ice::Current&) +MetricsAdminI::getMetricsFailures(const string& view, const string& map, const string& id, const Current&) { Lock sync(*this); std::map::const_iterator p = _views.find(view); @@ -385,3 +394,9 @@ MetricsAdminI::getMaps(const string& mapName) const } return maps; } + +const LoggerPtr& +MetricsAdminI::getLogger() const +{ + return _logger; +} diff --git a/cpp/src/Ice/MetricsAdminI.h b/cpp/src/Ice/MetricsAdminI.h index c788a8e28b0..1af52c19317 100644 --- a/cpp/src/Ice/MetricsAdminI.h +++ b/cpp/src/Ice/MetricsAdminI.h @@ -155,17 +155,17 @@ public: return map->getMatching(helper); } - TPtr + void attach(const MetricsHelperT& helper) { Lock sync(*this); ++_object->total; ++_object->current; helper.initMetrics(_object); - return _object; } - void detach(Ice::Long lifetime) + void + detach(Ice::Long lifetime) { MetricsMapT* map; { @@ -293,7 +293,7 @@ public: getFailures(const std::string& id) { Lock sync(*this); - typename std::map::const_iterator p = _objects.begin(); + typename std::map::const_iterator p = _objects.find(id); if(p != _objects.end()) { return p->second->getFailures(); @@ -317,6 +317,9 @@ public: EntryTPtr getMatching(const MetricsHelperT& helper) { + // + // Check the accept and reject filters. + // for(std::vector::const_iterator p = _accept.begin(); p != _accept.end(); ++p) { if(!(*p)->match(helper)) @@ -332,7 +335,10 @@ public: return 0; } } - + + // + // Compute the key from the GroupBy property. + // std::string key; if(_groupByAttributes.size() == 1) { @@ -353,7 +359,10 @@ public: } key = os.str(); } - + + // + // Lookup the metrics object. + // Lock sync(*this); typename std::map::const_iterator p = _objects.find(key); if(p == _objects.end()) @@ -369,7 +378,6 @@ private: { TPtr t = new T(); t->id = id; - t->failures = 0; return new EntryT(this, t, _detachedQueue.end()); } @@ -388,6 +396,7 @@ private: Lock sync(*this); assert(static_cast(_detachedQueue.size()) <= _retain); + // If the entry is already detached and in the queue, just move it to the back. if(entry->_detachedPos != _detachedQueue.end()) { _detachedQueue.splice(_detachedQueue.end(), _detachedQueue, entry->_detachedPos); @@ -395,6 +404,7 @@ private: return; } + // Otherwise, compress the queue by removing entries which are no longer detached. if(static_cast(_detachedQueue.size()) == _retain) { // Remove entries which are no longer detached @@ -403,6 +413,7 @@ private: { if(!(*p)->isDetached()) { + (*p)->_detachedPos = _detachedQueue.end(); p = _detachedQueue.erase(p); } else @@ -412,13 +423,14 @@ private: } } + // If there's still no room, remove the oldest entry (at the front). if(static_cast(_detachedQueue.size()) == _retain) { - // Remove oldest entry if there's still no room _objects.erase(_detachedQueue.front()->_object->id); _detachedQueue.pop_front(); } + // Add the entry at the back of the queue. _detachedQueue.push_back(entry); entry->_detachedPos = --_detachedQueue.end(); } @@ -478,7 +490,7 @@ class MetricsAdminI : public MetricsAdmin, private IceUtil::Mutex { public: - MetricsAdminI(const ::Ice::PropertiesPtr&); + MetricsAdminI(const ::Ice::PropertiesPtr&, const Ice::LoggerPtr&); void addUpdater(const std::string&, const UpdaterPtr&); void updateViews(); @@ -507,13 +519,16 @@ public: std::vector getMaps(const std::string&) const; + const Ice::LoggerPtr& getLogger() const; + private: std::map _views; std::map _updaters; std::map _factories; - Ice::PropertiesPtr _properties; + const Ice::PropertiesPtr _properties; + const Ice::LoggerPtr _logger; }; typedef IceUtil::Handle MetricsAdminIPtr; diff --git a/cpp/src/Ice/MetricsObserverI.h b/cpp/src/Ice/MetricsObserverI.h index fee2ba2305b..7af71ae3e8d 100644 --- a/cpp/src/Ice/MetricsObserverI.h +++ b/cpp/src/Ice/MetricsObserverI.h @@ -283,7 +283,6 @@ public: detach() { Ice::Long lifetime = _watch.stop(); - IceUtil::Mutex::Lock sync(*_mutex); for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) { (*p)->detach(lifetime); @@ -293,7 +292,6 @@ public: virtual void failed(const std::string& exceptionName) { - IceUtil::Mutex::Lock sync(*_mutex); for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) { (*p)->failed(exceptionName); @@ -303,7 +301,6 @@ public: template void forEach(const Function& func) { - IceUtil::Mutex::Lock sync(*_mutex); for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) { (*p)->execute(func); @@ -311,9 +308,8 @@ public: } void - init(const MetricsHelperT& helper, EntrySeqType& objects, IceUtil::Mutex* mutex) + init(const MetricsHelperT& helper, EntrySeqType& objects) { - _mutex = mutex; _objects.swap(objects); std::sort(_objects.begin(), _objects.end()); for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) @@ -352,7 +348,6 @@ public: template IceInternal::Handle getObserver(const std::string& mapName, const MetricsHelperT& helper) { - IceUtil::Mutex::Lock sync(*_mutex); std::vector::EntryTPtr> metricsObjects; for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) { @@ -369,7 +364,7 @@ public: } IceInternal::Handle obsv = new ObserverImpl(); - obsv->init(helper, metricsObjects, _mutex); + obsv->init(helper, metricsObjects); return obsv; } @@ -377,7 +372,6 @@ private: EntrySeqType _objects; IceUtilInternal::StopWatch _watch; - IceUtil::Mutex* _mutex; }; class ObserverI : virtual public Ice::Instrumentation::Observer, public ObserverT @@ -438,7 +432,7 @@ public: std::sort(metricsObjects.begin(), metricsObjects.end()); ObserverImplPtrType obsv = new ObserverImplType(); - obsv->init(helper, metricsObjects, this); + obsv->init(helper, metricsObjects); return obsv; } @@ -468,7 +462,7 @@ public: if(!obsv) { obsv = new ObserverImplType(); - obsv->init(helper, metricsObjects, this); + obsv->init(helper, metricsObjects); } else { diff --git a/cpp/src/Ice/ObserverHelper.cpp b/cpp/src/Ice/ObserverHelper.cpp index c4406dd088b..32c162395b1 100644 --- a/cpp/src/Ice/ObserverHelper.cpp +++ b/cpp/src/Ice/ObserverHelper.cpp @@ -15,43 +15,64 @@ using namespace std; using namespace Ice; -using namespace IceInternal; +using namespace Ice::Instrumentation; -InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy, const string& operation, const Context* context) +IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx) { - const Ice::Instrumentation::CommunicatorObserverPtr& obsv = - proxy->__reference()->getInstance()->initializationData().observer; - if(obsv) - { - if(context) - { - ObserverHelperT::attach( - obsv->getInvocationObserverWithContext(proxy, operation, *context)); - } - else - { - ObserverHelperT::attach( - obsv->getInvocationObserver(proxy, operation)); - } + const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer; + if(!obsv) + { + return; + } + + if(ctx) + { + attach(obsv->getInvocationObserverWithContext(proxy, op, *ctx)); + } + else + { + attach(obsv->getInvocationObserver(proxy, op)); } } +IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* instance, const string& op) +{ + const CommunicatorObserverPtr& obsv = instance->initializationData().observer; + if(!obsv) + { + return; + } + + attach(obsv->getInvocationObserver(0, op)); +} + void -InvocationObserver::attach(IceProxy::Ice::Object* proxy, const string& operation, const Context* context) +IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx) { - const Ice::Instrumentation::CommunicatorObserverPtr& obsv = - proxy->__reference()->getInstance()->initializationData().observer; - if(obsv) - { - if(context) - { - ObserverHelperT::attach( - obsv->getInvocationObserverWithContext(proxy, operation, *context)); - } - else - { - ObserverHelperT::attach( - obsv->getInvocationObserver(proxy, operation)); - } + const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer; + if(!obsv) + { + return; } + + if(ctx) + { + attach(obsv->getInvocationObserverWithContext(proxy, op, *ctx)); + } + else + { + attach(obsv->getInvocationObserver(proxy, op)); + } +} + +void +IceInternal::InvocationObserver::attach(IceInternal::Instance* instance, const string& op) +{ + const CommunicatorObserverPtr& obsv = instance->initializationData().observer; + if(!obsv) + { + return; + } + + attach(obsv->getInvocationObserver(0, op)); } diff --git a/cpp/src/Ice/ObserverI.cpp b/cpp/src/Ice/ObserverI.cpp index 715b6a180f0..1361a5de123 100644 --- a/cpp/src/Ice/ObserverI.cpp +++ b/cpp/src/Ice/ObserverI.cpp @@ -12,6 +12,9 @@ #include #include #include +#include +#include +#include using namespace std; using namespace Ice; @@ -21,7 +24,7 @@ using namespace IceMX; namespace { -Ice::Context emptyCtx; +Context emptyCtx; int ConnectionMetrics::* getConnectionStateMetric(ConnectionState s) @@ -154,8 +157,8 @@ public: }; static Attributes attributes; - ConnectionHelper(const ConnectionInfoPtr& con, const EndpointInfoPtr& endpt, ConnectionState state) : - _connection(con), _endpoint(endpt), _state(state) + ConnectionHelper(const ConnectionInfoPtr& con, const EndpointPtr& endpt, ConnectionState state) : + _connectionInfo(con), _endpoint(endpt), _state(state) { } @@ -175,7 +178,7 @@ public: if(_id.empty()) { ostringstream os; - IPConnectionInfoPtr info = IPConnectionInfoPtr::dynamicCast(_connection); + IPConnectionInfoPtr info = IPConnectionInfoPtr::dynamicCast(_connectionInfo); if(info) { os << info->localAddress << ':' << info->localPort; @@ -184,7 +187,7 @@ public: } else { - os << "connection-" << _connection.get(); + os << "connection-" << _connectionInfo.get(); } _id = os.str(); } @@ -194,9 +197,9 @@ public: string getParent() const { - if(!_connection->adapterName.empty()) + if(!_connectionInfo->adapterName.empty()) { - return _connection->adapterName; + return _connectionInfo->adapterName; } else { @@ -204,24 +207,29 @@ public: } } - ConnectionInfoPtr + const ConnectionInfoPtr& getConnectionInfo() const { - return _connection; + return _connectionInfo; } - EndpointInfoPtr + const EndpointInfoPtr& getEndpointInfo() const { - return _endpoint; + if(!_endpointInfo) + { + _endpointInfo = _endpoint->getInfo(); + } + return _endpointInfo; } private: - const ConnectionInfoPtr& _connection; - const EndpointInfoPtr& _endpoint; + const ConnectionInfoPtr& _connectionInfo; + const EndpointPtr& _endpoint; const ConnectionState _state; mutable string _id; + mutable EndpointInfoPtr _endpointInfo; }; ConnectionHelper::Attributes ConnectionHelper::attributes; @@ -266,7 +274,7 @@ public: { if(attribute.compare(0, 8, "context.") == 0) { - Ice::Context::const_iterator p = _current.ctx.find(attribute.substr(8)); + Context::const_iterator p = _current.ctx.find(attribute.substr(8)); if(p != _current.ctx.end()) { return p->second; @@ -309,10 +317,14 @@ public: return _current.con->getInfo(); } - EndpointInfoPtr + const EndpointInfoPtr& getEndpointInfo() const { - return _current.con->getEndpoint()->getInfo(); + if(!_endpointInfo) + { + _endpointInfo = _current.con->getEndpoint()->getInfo(); + } + return _endpointInfo; } const Current& @@ -331,6 +343,7 @@ private: const Current& _current; mutable string _id; + mutable EndpointInfoPtr _endpointInfo; }; DispatchHelper::Attributes DispatchHelper::attributes; @@ -361,7 +374,7 @@ public: }; static Attributes attributes; - InvocationHelper(const Ice::ObjectPrx& proxy, const string& op, const Ice::Context& ctx = emptyCtx) : + InvocationHelper(const ObjectPrx& proxy, const string& op, const Context& ctx = emptyCtx) : _proxy(proxy), _operation(op), _context(ctx) { } @@ -370,7 +383,7 @@ public: { if(attribute.compare(0, 8, "context.") == 0) { - Ice::Context::const_iterator p = _context.find(attribute.substr(8)); + Context::const_iterator p = _context.find(attribute.substr(8)); if(p != _context.end()) { return p->second; @@ -387,6 +400,11 @@ public: string getMode() const { + if(!_proxy) + { + return "unknown"; + } + if(_proxy->ice_isTwoway()) { return "twoway"; @@ -419,7 +437,22 @@ public: if(_id.empty()) { ostringstream os; - os << _proxy << " [" << _operation << ']'; + if(_proxy) + { + try + { + os << _proxy << " [" << _operation << ']'; + } + catch(const FixedProxyException& ex) + { + os << _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity()); + os << " [" << _operation << ']'; + } + } + else + { + os << _operation; + } _id = os.str(); } return _id; @@ -441,7 +474,14 @@ public: Identity getIdentity() const { - return _proxy->ice_getIdentity(); + if(_proxy) + { + return _proxy->ice_getIdentity(); + } + else + { + return Identity(); + } } const string& @@ -454,7 +494,7 @@ private: const ObjectPrx& _proxy; const string& _operation; - const Ice::Context& _context; + const Context& _context; mutable string _id; }; @@ -477,7 +517,8 @@ public: }; static Attributes attributes; - RemoteInvocationHelper(const ConnectionPtr& con) : _connection(con) + RemoteInvocationHelper(const ConnectionInfoPtr& con, const EndpointPtr& endpt) : + _connectionInfo(con), _endpoint(endpt) { } @@ -492,14 +533,14 @@ public: if(_id.empty()) { ostringstream os; - IPConnectionInfoPtr info = IPConnectionInfoPtr::dynamicCast(_connection->getInfo()); + IPConnectionInfoPtr info = IPConnectionInfoPtr::dynamicCast(_connectionInfo); if(info) { os << info->remoteAddress << ':' << info->remotePort; } else { - os << "connection-" << _connection.get(); + os << "connection-" << _connectionInfo.get(); } _id = os.str(); } @@ -509,9 +550,9 @@ public: string getParent() const { - if(_connection->getAdapter()) + if(!_connectionInfo->adapterName.empty()) { - return _connection->getAdapter()->getName(); + return _connectionInfo->adapterName; } else { @@ -519,22 +560,28 @@ public: } } - ConnectionInfoPtr + const ConnectionInfoPtr& getConnectionInfo() const { - return _connection->getInfo(); + return _connectionInfo; } - EndpointInfoPtr + const EndpointInfoPtr& getEndpointInfo() const { - return _connection->getEndpoint()->getInfo(); + if(!_endpointInfo) + { + _endpointInfo = _endpoint->getInfo(); + } + return _endpointInfo; } private: - const ConnectionPtr& _connection; + const ConnectionInfoPtr& _connectionInfo; + const EndpointPtr& _endpoint; mutable string _id; + mutable EndpointInfoPtr _endpointInfo; }; RemoteInvocationHelper::Attributes RemoteInvocationHelper::attributes; @@ -592,13 +639,17 @@ public: Attributes() { add("parent", &EndpointHelper::getParent); - add("id", &EndpointHelper::_id); + add("id", &EndpointHelper::getId); addEndpointAttributes(*this); } }; static Attributes attributes; - EndpointHelper(const EndpointInfoPtr& endpt, const string& id) : _id(id), _endpoint(endpt) + EndpointHelper(const EndpointPtr& endpt, const string& id) : _endpoint(endpt), _id(id) + { + } + + EndpointHelper(const EndpointPtr& endpt) : _endpoint(endpt) { } @@ -607,10 +658,14 @@ public: return attributes(this, attribute); } - EndpointInfoPtr + const EndpointInfoPtr& getEndpointInfo() const { - return _endpoint; + if(!_endpointInfo) + { + _endpointInfo = _endpoint->getInfo(); + } + return _endpointInfo; } string @@ -619,10 +674,21 @@ public: return "Communicator"; } + const string& + getId() const + { + if(_id.empty()) + { + _id = _endpoint->toString(); + } + return _id; + } + private: - const string _id; - const EndpointInfoPtr _endpoint; + const EndpointPtr _endpoint; + mutable string _id; + mutable EndpointInfoPtr _endpointInfo; }; EndpointHelper::Attributes EndpointHelper::attributes; @@ -667,9 +733,16 @@ InvocationObserverI::retried() } ObserverPtr -InvocationObserverI::getRemoteObserver(const ConnectionPtr& connection) +InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, const EndpointPtr& endpoint) { - return getObserver("Remote", RemoteInvocationHelper(connection)); + try + { + return getObserver("Remote", RemoteInvocationHelper(connection, endpoint)); + } + catch(const exception&) + { + } + return 0; } CommunicatorObserverI::CommunicatorObserverI(const MetricsAdminIPtr& metrics) : @@ -694,34 +767,58 @@ CommunicatorObserverI::setObserverUpdater(const ObserverUpdaterPtr& updater) } ObserverPtr -CommunicatorObserverI::getConnectionEstablishmentObserver(const EndpointInfoPtr& endpt, const string& connector) +CommunicatorObserverI::getConnectionEstablishmentObserver(const EndpointPtr& endpt, const string& connector) { if(_connects.isEnabled()) { - return _connects.getObserver(EndpointHelper(endpt, connector)); + try + { + return _connects.getObserver(EndpointHelper(endpt, connector)); + } + catch(const exception& ex) + { + Error error(_metrics->getLogger()); + error << "unexpected exception trying to obtain observer:\n" << ex; + } } return 0; } ObserverPtr -CommunicatorObserverI::getEndpointLookupObserver(const EndpointInfoPtr& endpt, const string& endpoint) +CommunicatorObserverI::getEndpointLookupObserver(const EndpointPtr& endpt) { if(_endpointLookups.isEnabled()) { - return _endpointLookups.getObserver(EndpointHelper(endpt, endpoint)); + try + { + return _endpointLookups.getObserver(EndpointHelper(endpt)); + } + catch(const exception& ex) + { + Error error(_metrics->getLogger()); + error << "unexpected exception trying to obtain observer:\n" << ex; + } } return 0; } ConnectionObserverPtr CommunicatorObserverI::getConnectionObserver(const ConnectionInfoPtr& con, - const EndpointInfoPtr& endpt, + const EndpointPtr& endpt, ConnectionState state, const ConnectionObserverPtr& observer) { if(_connections.isEnabled()) { - return _connections.getObserver(ConnectionHelper(con, endpt, state), observer); + try + { + return _connections.getObserver(ConnectionHelper(con, endpt, state), observer); + } + catch(const exception& ex) + { + Error error(_metrics->getLogger()); + error << "unexpected exception trying to obtain observer:\n" << ex; + } } return 0; } @@ -734,7 +831,15 @@ CommunicatorObserverI::getThreadObserver(const string& parent, { if(_threads.isEnabled()) { - return _threads.getObserver(ThreadHelper(parent, id, state), observer); + try + { + return _threads.getObserver(ThreadHelper(parent, id, state), observer); + } + catch(const exception& ex) + { + Error error(_metrics->getLogger()); + error << "unexpected exception trying to obtain observer:\n" << ex; + } } return 0; } @@ -744,7 +849,15 @@ CommunicatorObserverI::getInvocationObserver(const ObjectPrx& proxy, const strin { if(_invocations.isEnabled()) { - return _invocations.getObserver(InvocationHelper(proxy, op)); + try + { + return _invocations.getObserver(InvocationHelper(proxy, op)); + } + catch(const exception& ex) + { + Error error(_metrics->getLogger()); + error << "unexpected exception trying to obtain observer:\n" << ex; + } } return 0; } @@ -754,7 +867,15 @@ CommunicatorObserverI::getInvocationObserverWithContext(const ObjectPrx& proxy, { if(_invocations.isEnabled()) { - return _invocations.getObserver(InvocationHelper(proxy, op, ctx)); + try + { + return _invocations.getObserver(InvocationHelper(proxy, op, ctx)); + } + catch(const exception& ex) + { + Error error(_metrics->getLogger()); + error << "unexpected exception trying to obtain observer:\n" << ex; + } } return 0; } @@ -764,7 +885,15 @@ CommunicatorObserverI::getDispatchObserver(const Current& current) { if(_dispatch.isEnabled()) { - return _dispatch.getObserver(DispatchHelper(current)); + try + { + return _dispatch.getObserver(DispatchHelper(current)); + } + catch(const exception& ex) + { + Error error(_metrics->getLogger()); + error << "unexpected exception trying to obtain observer:\n" << ex; + } } return 0; } diff --git a/cpp/src/Ice/ObserverI.h b/cpp/src/Ice/ObserverI.h index c3954258ae8..3be96eea71e 100644 --- a/cpp/src/Ice/ObserverI.h +++ b/cpp/src/Ice/ObserverI.h @@ -39,7 +39,15 @@ public: virtual void retried(); - virtual Ice::Instrumentation::ObserverPtr getRemoteObserver(const Ice::ConnectionPtr&); + virtual Ice::Instrumentation::ObserverPtr getRemoteObserver(const Ice::ConnectionInfoPtr&, const Ice::EndpointPtr&); + + +private: + + friend class CommunicatorObserverI; + void initLogger(const Ice::LoggerPtr&); + + const Ice::LoggerPtr _logger; }; class CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver, @@ -51,15 +59,14 @@ public: virtual void setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr&); - virtual Ice::Instrumentation::ObserverPtr getConnectionEstablishmentObserver(const Ice::EndpointInfoPtr&, + virtual Ice::Instrumentation::ObserverPtr getConnectionEstablishmentObserver(const Ice::EndpointPtr&, const std::string&); - virtual Ice::Instrumentation::ObserverPtr getEndpointLookupObserver(const Ice::EndpointInfoPtr&, - const std::string&); + virtual Ice::Instrumentation::ObserverPtr getEndpointLookupObserver(const Ice::EndpointPtr&); virtual Ice::Instrumentation::ConnectionObserverPtr getConnectionObserver(const Ice::ConnectionInfoPtr&, - const Ice::EndpointInfoPtr&, + const Ice::EndpointPtr&, Ice::Instrumentation::ConnectionState, const Ice::Instrumentation::ConnectionObserverPtr&); diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 23fc92af5df..565a954f691 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -547,19 +547,21 @@ IceInternal::Outgoing::throwUserException() } } -IceInternal::BatchOutgoing::BatchOutgoing(RequestHandler* handler) : +IceInternal::BatchOutgoing::BatchOutgoing(RequestHandler* handler, InvocationObserver& observer) : _handler(handler), _connection(0), _sent(false), - _os(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding) + _os(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding), + _observer(observer) { } -IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance) : +IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, InvocationObserver& observer) : _handler(0), _connection(connection), _sent(false), - _os(instance, Ice::currentProtocolEncoding) + _os(instance, Ice::currentProtocolEncoding), + _observer(observer) { } @@ -574,7 +576,7 @@ IceInternal::BatchOutgoing::invoke() { _monitor.wait(); } - + _remoteObserver.detach(); if(_exception.get()) { _exception->ice_throw(); diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 6f96ed33a8e..2224b7dee97 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -504,6 +504,7 @@ IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection) { if(!_proxy->ice_isTwoway()) { + _remoteObserver.detach(); if(!_callback || !_callback->__hasSentCallback()) { _observer.detach(); @@ -538,6 +539,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent { IceUtil::Monitor::Lock sync(_monitor); assert(!(_state & Done)); + _remoteObserver.detach(); if(_timerTaskConnection) { _instance->timer()->cancel(this); @@ -576,6 +578,7 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc) // calling on the callback. The LocalExceptionWrapper exception is only called // before the invocation is sent. // + _remoteObserver.detach(); try { @@ -605,6 +608,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) { IceUtil::Monitor::Lock sync(_monitor); assert(!_exception.get() && !(_state & Done)); + _remoteObserver.detach(); if(_timerTaskConnection) { @@ -883,6 +887,7 @@ IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection) IceUtil::Monitor::Lock sync(_monitor); assert(!_exception.get()); _state |= Done | OK | Sent; + _remoteObserver.detach(); _monitor.notifyAll(); if(_callback && _callback->__hasSentCallback()) { @@ -904,6 +909,7 @@ IceInternal::BatchOutgoingAsync::__sent() void IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool) { + _remoteObserver.detach(); __exception(exc); } @@ -954,6 +960,7 @@ IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const Co BatchOutgoingAsync(communicator, instance, operation, delegate, cookie), _connection(con) { + _observer.attach(instance.get(), operation); } void @@ -981,7 +988,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons const string& operation, const CallbackBasePtr& delegate, const Ice::LocalObjectPtr& cookie) : - BatchOutgoingAsync(communicator, instance, operation, delegate, cookie) + AsyncResult(communicator, instance, operation, delegate, cookie) { // // _useCount is initialized to 1 to prevent premature callbacks. @@ -994,93 +1001,98 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons // Assume all connections are flushed synchronously. // _sentSynchronously = true; + + // + // Attach observer + // + _observer.attach(instance.get(), operation); } void -IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionPtr& con) +IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con) { + class BatchOutgoingAsyncI : public BatchOutgoingAsync { - IceUtil::Monitor::Lock sync(_monitor); - ++_useCount; - } - CallbackPtr cb = newCallback(this, &CommunicatorBatchOutgoingAsync::completed, - &CommunicatorBatchOutgoingAsync::sent); - con->begin_flushBatchRequests(cb); -} + public: -void -IceInternal::CommunicatorBatchOutgoingAsync::ready() -{ - check(0, 0, true); -} + BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync) : + BatchOutgoingAsync(outAsync->_communicator, outAsync->_instance, outAsync->_operation, __dummyCallback, 0), + _outAsync(outAsync) + { + } -void -IceInternal::CommunicatorBatchOutgoingAsync::completed(const AsyncResultPtr& r) -{ - ConnectionPtr con = r->getConnection(); - assert(con); + virtual bool __sent(Ice::ConnectionI*) + { + _remoteObserver.detach(); + _outAsync->check(false); + return false; + } + + virtual void __finished(const Ice::LocalException&, bool) + { + _remoteObserver.detach(); + _outAsync->check(false); + } + + virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt) + { + _remoteObserver.attach(_outAsync->_observer.getRemoteObserver(connection, endpt)); + } + + private: + + const CommunicatorBatchOutgoingAsyncPtr _outAsync; + }; - try { - con->end_flushBatchRequests(r); - assert(false); // completed() should only be called when an exception occurs. + IceUtil::Monitor::Lock sync(_monitor); + ++_useCount; } - catch(const Ice::LocalException& ex) + + AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this)); + if(!(status & AsyncStatusSent)) { - check(r, &ex, false); + _sentSynchronously = false; } } void -IceInternal::CommunicatorBatchOutgoingAsync::sent(const AsyncResultPtr& r) +IceInternal::CommunicatorBatchOutgoingAsync::ready() { - check(r, 0, r->sentSynchronously()); + check(true); } void -IceInternal::CommunicatorBatchOutgoingAsync::check(const AsyncResultPtr& r, const LocalException* ex, bool userThread) +IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) { - bool done = false; - { IceUtil::Monitor::Lock sync(_monitor); assert(_useCount > 0); - --_useCount; - - // - // We report that the communicator flush request was sent synchronously - // if all of the connection flush requests are sent synchronously. - // - if((r && !r->sentSynchronously()) || ex) - { - _sentSynchronously = false; - } - - if(_useCount == 0) + if(--_useCount > 0) { - done = true; - _state |= Done | OK | Sent; - _monitor.notifyAll(); + return; } + + _observer.detach(); + _state |= Done | OK | Sent; + _monitor.notifyAll(); } - if(done) + // + // _sentSynchronously is immutable here. + // + if(!_sentSynchronously && userThread) { - // - // _sentSynchronously is immutable here. - // - if(!_sentSynchronously && userThread) - { - __sentAsync(); - } - else - { - assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible. - BatchOutgoingAsync::__sent(); - } + __sentAsync(); + } + else + { + assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible. + AsyncResult::__sent(); } } + namespace { @@ -1152,3 +1164,4 @@ Ice::AMICallbackBase::__sent(bool sentSynchronously) dynamic_cast(this)->ice_sent(); } } + diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 69da155dd9d..4760984eb48 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -1657,7 +1657,7 @@ IceDelegateM::Ice::Object::ice_invoke(const string& operation, void IceDelegateM::Ice::Object::ice_flushBatchRequests(InvocationObserver& observer) { - BatchOutgoing __og(__handler.get()); + BatchOutgoing __og(__handler.get(), observer); __og.invoke(); } diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index 61fda069922..a5bc169f8c8 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -443,7 +443,6 @@ IceInternal::TcpTransceiver::toString() const Ice::ConnectionInfoPtr IceInternal::TcpTransceiver::getInfo() const { - assert(_fd != INVALID_SOCKET); Ice::TCPConnectionInfoPtr info = new Ice::TCPConnectionInfo(); fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort); return info; diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index b1aa39ddd96..9ecfaa1fceb 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -784,7 +784,11 @@ IceInternal::UdpTransceiver::getInfo() const return info; } #endif - assert(_fd != INVALID_SOCKET); + if(_fd == INVALID_SOCKET) + { + return info; + } + if(_state == StateNotConnected) { Address localAddr; diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp index 0c05c971b65..d2d41ac2e4c 100644 --- a/cpp/src/IceSSL/TransceiverI.cpp +++ b/cpp/src/IceSSL/TransceiverI.cpp @@ -890,8 +890,6 @@ IceSSL::TransceiverI::~TransceiverI() NativeConnectionInfoPtr IceSSL::TransceiverI::getNativeConnectionInfo() const { - assert(_fd != INVALID_SOCKET); - NativeConnectionInfoPtr info = new NativeConnectionInfo(); IceInternal::fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort); -- cgit v1.2.3