summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2012-09-10 08:47:58 +0200
committerBenoit Foucher <benoit@zeroc.com>2012-09-10 08:47:58 +0200
commit9560b7d54ec4411f0605a3b53997835599f70ea2 (patch)
treec40611c772a7a4f1af4ea0df5d487305dded456d /cpp/src
parentFix (diff)
downloadice-9560b7d54ec4411f0605a3b53997835599f70ea2.tar.bz2
ice-9560b7d54ec4411f0605a3b53997835599f70ea2.tar.xz
ice-9560b7d54ec4411f0605a3b53997835599f70ea2.zip
Fixed communicator flushBatchRequests to allow tracing
Diffstat (limited to 'cpp/src')
-rwxr-xr-xcpp/src/Ice/BasicStream.cpp4
-rwxr-xr-xcpp/src/Ice/ConnectionFactory.cpp5
-rw-r--r--cpp/src/Ice/ConnectionI.cpp106
-rw-r--r--cpp/src/Ice/ConnectionI.h3
-rw-r--r--cpp/src/Ice/EndpointI.cpp4
-rw-r--r--cpp/src/Ice/Instance.cpp11
-rw-r--r--cpp/src/Ice/MetricsAdminI.cpp71
-rw-r--r--cpp/src/Ice/MetricsAdminI.h35
-rw-r--r--cpp/src/Ice/MetricsObserverI.h14
-rw-r--r--cpp/src/Ice/ObserverHelper.cpp83
-rw-r--r--cpp/src/Ice/ObserverI.cpp227
-rw-r--r--cpp/src/Ice/ObserverI.h17
-rw-r--r--cpp/src/Ice/Outgoing.cpp12
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp127
-rw-r--r--cpp/src/Ice/Proxy.cpp2
-rw-r--r--cpp/src/Ice/TcpTransceiver.cpp1
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp6
-rw-r--r--cpp/src/IceSSL/TransceiverI.cpp2
18 files changed, 468 insertions, 262 deletions
diff --git a/cpp/src/Ice/BasicStream.cpp b/cpp/src/Ice/BasicStream.cpp
index 199ef69d639..d8cf31ff20e 100755
--- a/cpp/src/Ice/BasicStream.cpp
+++ b/cpp/src/Ice/BasicStream.cpp
@@ -2856,8 +2856,8 @@ IceInternal::BasicStream::EncapsDecoder::skipSlice()
}
else
{
- throw MarshalException(__FILE__,
- __LINE__,
+ throw MarshalException(__FILE__,
+ __LINE__,
"compact format prevents slicing (the sender should use the sliced format instead)");
}
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 2869d610748..98917fb8cf7 100755
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -240,7 +240,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
ObserverPtr observer;
if(obsv)
{
- observer = obsv->getConnectionEstablishmentObserver(q->endpoint->getInfo(), q->connector->toString());
+ observer = obsv->getConnectionEstablishmentObserver(q->endpoint, q->connector->toString());
if(observer)
{
observer->attach();
@@ -1144,8 +1144,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector()
const CommunicatorObserverPtr& obsv = _factory->_instance->initializationData().observer;
if(obsv)
{
- _observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint->getInfo(),
- _iter->connector->toString());
+ _observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint, _iter->connector->toString());
if(_observer)
{
_observer->attach();
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index fd722eca237..1b29f6a7664 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -38,6 +38,9 @@ Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; }
namespace
{
+const ::std::string __flushBatchRequests_name = "flushBatchRequests";
+
+
class TimeoutCallback : public IceUtil::TimerTask
{
public:
@@ -501,23 +504,11 @@ Ice::ConnectionI::updateObserver()
return;
}
- if(!_info && _state < StateClosed)
- {
- _info = _transceiver->getInfo();
- _info->connectionId = _endpoint->connectionId();
- _info->incoming = _connector == 0;
- _info->adapterName = _adapter ? _adapter->getName() : string();
- }
-
- if(_info)
- {
- const CommunicatorObserverPtr& comObsv = _instance->initializationData().observer;
- assert(comObsv);
- _observer.attach(comObsv->getConnectionObserver(_info,
- _endpoint->getInfo(),
- connectionStateMap[static_cast<int>(_state)],
- _observer.get()));
- }
+ assert(_instance->initializationData().observer);
+ _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ toConnectionState(_state),
+ _observer.get()));
}
void
@@ -554,7 +545,6 @@ bool
Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
{
BasicStream* os = out->os();
- out->attachRemoteObserver(this);
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_exception.get())
@@ -570,6 +560,8 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint);
+
//
// Ensure the message isn't bigger than what we can send with the
// transport.
@@ -647,6 +639,8 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ out->__attachRemoteObserver(initConnectionInfo(), _endpoint);
+
//
// Ensure the message isn't bigger than what we can send with the
// transport.
@@ -904,7 +898,8 @@ Ice::ConnectionI::abortBatchRequest()
void
Ice::ConnectionI::flushBatchRequests()
{
- BatchOutgoing out(this, _instance.get());
+ IceInternal::InvocationObserver observer(_instance.get(), __flushBatchRequests_name);
+ BatchOutgoing out(this, _instance.get(), observer);
out.invoke();
}
@@ -914,13 +909,6 @@ Ice::ConnectionI::begin_flushBatchRequests()
return __begin_flushBatchRequests(__dummyCallback, 0);
}
-namespace
-{
-
-const ::std::string __flushBatchRequests_name = "flushBatchRequests";
-
-}
-
AsyncResultPtr
Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie)
{
@@ -971,6 +959,8 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
_exception->ice_throw();
}
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint);
+
if(_batchRequestNum == 0)
{
out->sent(false);
@@ -1029,6 +1019,8 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
_exception->ice_throw();
}
+ outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint);
+
if(_batchRequestNum == 0)
{
AsyncStatus status = AsyncStatusSent;
@@ -1824,15 +1816,7 @@ Ice::ConnectionI::getInfo() const
{
_exception->ice_throw();
}
-
- if(!_info)
- {
- _info = _transceiver->getInfo();
- _info->connectionId = _endpoint->connectionId();
- _info->incoming = _connector == 0;
- _info->adapterName = _adapter ? _adapter->getName() : string();
- }
- return _info;
+ return initConnectionInfo();
}
void
@@ -2177,8 +2161,8 @@ Ice::ConnectionI::setState(State state)
if(_observer)
{
- ConnectionState oldState = connectionStateMap[static_cast<int>(_state)];
- ConnectionState newState = connectionStateMap[static_cast<int>(state)];
+ ConnectionState oldState = toConnectionState(_state);
+ ConnectionState newState = toConnectionState(state);
if(oldState != newState)
{
_observer->stateChanged(oldState, newState);
@@ -2271,26 +2255,21 @@ Ice::ConnectionI::initialize(SocketOperation operation)
_threadPool->update(this, operation, s);
return false;
}
-
- const CommunicatorObserverPtr& comObsv = _instance->initializationData().observer;
- if(comObsv)
- {
- _info = _transceiver->getInfo();
- _info->connectionId = _endpoint->connectionId();
- _info->incoming = _connector == 0;
- _info->adapterName = _adapter ? _adapter->getName() : string();
-
- _observer.attach(comObsv->getConnectionObserver(_info,
- _endpoint->getInfo(),
- ConnectionStateValidating,
- 0));
- }
//
// Update the connection description once the transceiver is initialized.
//
const_cast<string&>(_desc) = _transceiver->toString();
setState(StateNotValidated);
+
+ if(_instance->initializationData().observer)
+ {
+ _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ ConnectionStateValidating,
+ 0));
+ }
+
return true;
}
@@ -3114,3 +3093,28 @@ Ice::ConnectionI::closeTimeout()
return _endpoint->timeout();
}
}
+
+Ice::ConnectionInfoPtr
+Ice::ConnectionI::initConnectionInfo() const
+{
+ if(_info)
+ {
+ return _info;
+ }
+
+ ConnectionInfoPtr info = _transceiver->getInfo();
+ info->connectionId = _endpoint->connectionId();
+ info->incoming = _connector == 0;
+ info->adapterName = _adapter ? _adapter->getName() : string();
+ if(_state > StateNotInitialized)
+ {
+ _info = info; // Cache the connection information only if initialized.
+ }
+ return info;
+}
+
+ConnectionState
+ConnectionI::toConnectionState(State state) const
+{
+ return connectionStateMap[static_cast<int>(state)];
+}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 1ede520d520..8a8ceb67116 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -299,6 +299,9 @@ private:
int connectTimeout();
int closeTimeout();
+ Ice::ConnectionInfoPtr initConnectionInfo() const;
+ Ice::Instrumentation::ConnectionState toConnectionState(State) const;
+
AsyncResultPtr __begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
Ice::CommunicatorPtr _communicator;
diff --git a/cpp/src/Ice/EndpointI.cpp b/cpp/src/Ice/EndpointI.cpp
index a0b6f0c4415..0e1b8173db5 100644
--- a/cpp/src/Ice/EndpointI.cpp
+++ b/cpp/src/Ice/EndpointI.cpp
@@ -205,7 +205,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, const E
const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
if(obsv)
{
- observer.attach(obsv->getEndpointLookupObserver(endpoint->getInfo(), endpoint->toString()));
+ observer.attach(obsv->getEndpointLookupObserver(endpoint));
}
vector<ConnectorPtr> connectors;
@@ -255,7 +255,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, const E
const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
if(obsv)
{
- entry.observer = obsv->getEndpointLookupObserver(endpoint->getInfo(), endpoint->toString());
+ entry.observer = obsv->getEndpointLookupObserver(endpoint);
if(entry.observer)
{
entry.observer->attach();
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index 71dde521406..e4f016cd0af 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -1097,15 +1097,15 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
_adminFacets.insert(FacetMap::value_type("Process", new ProcessI(communicator)));
- IceMX::MetricsAdminIPtr admin = new IceMX::MetricsAdminI(_initData.properties);
+ IceMX::MetricsAdminIPtr admin = new IceMX::MetricsAdminI(_initData.properties, _initData.logger);
_adminFacets.insert(FacetMap::value_type("MetricsAdmin", admin));
PropertiesAdminIPtr props = new PropertiesAdminI("Properties", _initData.properties, _initData.logger);
_adminFacets.insert(FacetMap::value_type("Properties",props));
//
- // Setup the communicator observer only the metrics admin plugin only if the user didn't already set an
- // Ice observer resovler.
+ // Setup the communicator observer only if the user didn't already set an
+ // Ice observer resolver and if the admininistrative endpoints are set.
//
if(!_initData.observer &&
(_adminFacetFilter.empty() || _adminFacetFilter.find("MetricsAdmin") != _adminFacetFilter.end()) &&
@@ -1114,7 +1114,10 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
IceMX::CommunicatorObserverIPtr observer = new IceMX::CommunicatorObserverI(admin);
_initData.observer = observer;
- // Make sure the MetricsAdmin plugin received property update notifications.
+ //
+ // Make sure the observer receives property update notifications to update
+ // the metrics admin configuration.
+ //
props->addUpdateCallback(observer);
}
diff --git a/cpp/src/Ice/MetricsAdminI.cpp b/cpp/src/Ice/MetricsAdminI.cpp
index 8b6ae36586b..1596126712f 100644
--- a/cpp/src/Ice/MetricsAdminI.cpp
+++ b/cpp/src/Ice/MetricsAdminI.cpp
@@ -13,6 +13,7 @@
#include <Ice/Properties.h>
#include <Ice/Communicator.h>
#include <Ice/Instance.h>
+#include <Ice/LoggerUtil.h>
#include <IceUtil/StringUtil.h>
@@ -24,7 +25,7 @@ namespace
{
vector<MetricsMapI::RegExpPtr>
-parseRule(const ::Ice::PropertiesPtr& properties, const string& name)
+parseRule(const PropertiesPtr& properties, const string& name)
{
vector<MetricsMapI::RegExpPtr> regexps;
PropertyDict rules = properties->getPropertiesForPrefix(name + '.');
@@ -42,7 +43,7 @@ MetricsMapI::RegExp::RegExp(const string& attribute, const string& regexp) : _at
#ifndef ICE_CPP11_REGEXP
if(regcomp(&_preg, regexp.c_str(), REG_EXTENDED | REG_NOSUB) != 0)
{
- throw Ice::SyscallException(__FILE__, __LINE__);
+ throw SyscallException(__FILE__, __LINE__);
}
#else
_regex = regex(regexp, std::regex_constants::extended | std::regex_constants::nosubs);
@@ -71,7 +72,7 @@ MetricsMapI::RegExp::match(const MetricsHelper& helper)
#endif
}
-MetricsMapI::MetricsMapI(const std::string& mapPrefix, const Ice::PropertiesPtr& properties) :
+MetricsMapI::MetricsMapI(const std::string& mapPrefix, const PropertiesPtr& properties) :
_properties(properties->getPropertiesForPrefix(mapPrefix)),
_retain(properties->getPropertyAsIntWithDefault(mapPrefix + "RetainDetached", 10)),
_accept(parseRule(properties, mapPrefix + "Accept")),
@@ -133,7 +134,7 @@ MetricsViewI::MetricsViewI(const string& name) : _name(name)
}
void
-MetricsViewI::update(const Ice::PropertiesPtr& properties,
+MetricsViewI::update(const PropertiesPtr& properties,
const map<string, MetricsMapFactoryPtr>& factories,
set<string>& updatedMaps)
{
@@ -230,7 +231,8 @@ MetricsViewI::getMap(const string& mapName) const
return 0;
}
-MetricsAdminI::MetricsAdminI(const Ice::PropertiesPtr& properties) : _properties(properties)
+MetricsAdminI::MetricsAdminI(const PropertiesPtr& properties, const LoggerPtr& logger) :
+ _properties(properties), _logger(logger)
{
}
@@ -269,7 +271,10 @@ MetricsAdminI::updateViews()
{
continue; // The view is disabled
}
-
+
+ //
+ // Create the view or update it.
+ //
map<string, MetricsViewIPtr>::const_iterator q = _views.find(viewName);
if(q == _views.end())
{
@@ -284,7 +289,7 @@ MetricsAdminI::updateViews()
_views.swap(views);
//
- // Go through removed views to collect updated maps.
+ // Go through removed views to collect maps to update.
//
for(map<string, MetricsViewIPtr>::const_iterator p = views.begin(); p != views.end(); ++p)
{
@@ -296,7 +301,7 @@ MetricsAdminI::updateViews()
}
//
- // Call the observer update for each of the updated maps.
+ // Gather the updates for each of the map to update.
//
for(set<string>::const_iterator p = updatedMaps.begin(); p != updatedMaps.end(); ++p)
{
@@ -306,25 +311,29 @@ MetricsAdminI::updateViews()
updaters.push_back(q->second);
}
}
- }
-
- for(vector<UpdaterPtr>::const_iterator p = updaters.begin(); p != updaters.end(); ++p)
- {
- try
- {
- (*p)->update();
- }
- catch(...)
- {
- // TODO: Warn?
- }
- }
+ }
+
+ //
+ // Call the updaters to update the maps.
+ //
+ for(vector<UpdaterPtr>::const_iterator p = updaters.begin(); p != updaters.end(); ++p)
+ {
+ try
+ {
+ (*p)->update();
+ }
+ catch(const std::exception& ex)
+ {
+ Warning warn(_logger);
+ warn << "unexpected exception while calling observer updater:\n" << ex;
+ }
+ }
}
-
-Ice::StringSeq
-MetricsAdminI::getMetricsViewNames(const ::Ice::Current&)
+
+StringSeq
+MetricsAdminI::getMetricsViewNames(const Current&)
{
- Ice::StringSeq names;
+ StringSeq names;
Lock sync(*this);
for(map<string, MetricsViewIPtr>::const_iterator p = _views.begin(); p != _views.end(); ++p)
@@ -335,7 +344,7 @@ MetricsAdminI::getMetricsViewNames(const ::Ice::Current&)
}
MetricsView
-MetricsAdminI::getMetricsView(const string& view, const ::Ice::Current&)
+MetricsAdminI::getMetricsView(const string& view, const Current&)
{
Lock sync(*this);
std::map<string, MetricsViewIPtr>::const_iterator p = _views.find(view);
@@ -347,7 +356,7 @@ MetricsAdminI::getMetricsView(const string& view, const ::Ice::Current&)
}
MetricsFailuresSeq
-MetricsAdminI::getMapMetricsFailures(const string& view, const string& map, const ::Ice::Current&)
+MetricsAdminI::getMapMetricsFailures(const string& view, const string& map, const Current&)
{
Lock sync(*this);
std::map<string, MetricsViewIPtr>::const_iterator p = _views.find(view);
@@ -359,7 +368,7 @@ MetricsAdminI::getMapMetricsFailures(const string& view, const string& map, cons
}
MetricsFailures
-MetricsAdminI::getMetricsFailures(const string& view, const string& map, const string& id, const ::Ice::Current&)
+MetricsAdminI::getMetricsFailures(const string& view, const string& map, const string& id, const Current&)
{
Lock sync(*this);
std::map<string, MetricsViewIPtr>::const_iterator p = _views.find(view);
@@ -385,3 +394,9 @@ MetricsAdminI::getMaps(const string& mapName) const
}
return maps;
}
+
+const LoggerPtr&
+MetricsAdminI::getLogger() const
+{
+ return _logger;
+}
diff --git a/cpp/src/Ice/MetricsAdminI.h b/cpp/src/Ice/MetricsAdminI.h
index c788a8e28b0..1af52c19317 100644
--- a/cpp/src/Ice/MetricsAdminI.h
+++ b/cpp/src/Ice/MetricsAdminI.h
@@ -155,17 +155,17 @@ public:
return map->getMatching(helper);
}
- TPtr
+ void
attach(const MetricsHelperT<T>& helper)
{
Lock sync(*this);
++_object->total;
++_object->current;
helper.initMetrics(_object);
- return _object;
}
- void detach(Ice::Long lifetime)
+ void
+ detach(Ice::Long lifetime)
{
MetricsMapT* map;
{
@@ -293,7 +293,7 @@ public:
getFailures(const std::string& id)
{
Lock sync(*this);
- typename std::map<std::string, EntryTPtr>::const_iterator p = _objects.begin();
+ typename std::map<std::string, EntryTPtr>::const_iterator p = _objects.find(id);
if(p != _objects.end())
{
return p->second->getFailures();
@@ -317,6 +317,9 @@ public:
EntryTPtr
getMatching(const MetricsHelperT<T>& helper)
{
+ //
+ // Check the accept and reject filters.
+ //
for(std::vector<RegExpPtr>::const_iterator p = _accept.begin(); p != _accept.end(); ++p)
{
if(!(*p)->match(helper))
@@ -332,7 +335,10 @@ public:
return 0;
}
}
-
+
+ //
+ // Compute the key from the GroupBy property.
+ //
std::string key;
if(_groupByAttributes.size() == 1)
{
@@ -353,7 +359,10 @@ public:
}
key = os.str();
}
-
+
+ //
+ // Lookup the metrics object.
+ //
Lock sync(*this);
typename std::map<std::string, EntryTPtr>::const_iterator p = _objects.find(key);
if(p == _objects.end())
@@ -369,7 +378,6 @@ private:
{
TPtr t = new T();
t->id = id;
- t->failures = 0;
return new EntryT(this, t, _detachedQueue.end());
}
@@ -388,6 +396,7 @@ private:
Lock sync(*this);
assert(static_cast<int>(_detachedQueue.size()) <= _retain);
+ // If the entry is already detached and in the queue, just move it to the back.
if(entry->_detachedPos != _detachedQueue.end())
{
_detachedQueue.splice(_detachedQueue.end(), _detachedQueue, entry->_detachedPos);
@@ -395,6 +404,7 @@ private:
return;
}
+ // Otherwise, compress the queue by removing entries which are no longer detached.
if(static_cast<int>(_detachedQueue.size()) == _retain)
{
// Remove entries which are no longer detached
@@ -403,6 +413,7 @@ private:
{
if(!(*p)->isDetached())
{
+ (*p)->_detachedPos = _detachedQueue.end();
p = _detachedQueue.erase(p);
}
else
@@ -412,13 +423,14 @@ private:
}
}
+ // If there's still no room, remove the oldest entry (at the front).
if(static_cast<int>(_detachedQueue.size()) == _retain)
{
- // Remove oldest entry if there's still no room
_objects.erase(_detachedQueue.front()->_object->id);
_detachedQueue.pop_front();
}
+ // Add the entry at the back of the queue.
_detachedQueue.push_back(entry);
entry->_detachedPos = --_detachedQueue.end();
}
@@ -478,7 +490,7 @@ class MetricsAdminI : public MetricsAdmin, private IceUtil::Mutex
{
public:
- MetricsAdminI(const ::Ice::PropertiesPtr&);
+ MetricsAdminI(const ::Ice::PropertiesPtr&, const Ice::LoggerPtr&);
void addUpdater(const std::string&, const UpdaterPtr&);
void updateViews();
@@ -507,13 +519,16 @@ public:
std::vector<MetricsMapIPtr> getMaps(const std::string&) const;
+ const Ice::LoggerPtr& getLogger() const;
+
private:
std::map<std::string, MetricsViewIPtr> _views;
std::map<std::string, UpdaterPtr> _updaters;
std::map<std::string, MetricsMapFactoryPtr> _factories;
- Ice::PropertiesPtr _properties;
+ const Ice::PropertiesPtr _properties;
+ const Ice::LoggerPtr _logger;
};
typedef IceUtil::Handle<MetricsAdminI> MetricsAdminIPtr;
diff --git a/cpp/src/Ice/MetricsObserverI.h b/cpp/src/Ice/MetricsObserverI.h
index fee2ba2305b..7af71ae3e8d 100644
--- a/cpp/src/Ice/MetricsObserverI.h
+++ b/cpp/src/Ice/MetricsObserverI.h
@@ -283,7 +283,6 @@ public:
detach()
{
Ice::Long lifetime = _watch.stop();
- IceUtil::Mutex::Lock sync(*_mutex);
for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
{
(*p)->detach(lifetime);
@@ -293,7 +292,6 @@ public:
virtual void
failed(const std::string& exceptionName)
{
- IceUtil::Mutex::Lock sync(*_mutex);
for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
{
(*p)->failed(exceptionName);
@@ -303,7 +301,6 @@ public:
template<typename Function> void
forEach(const Function& func)
{
- IceUtil::Mutex::Lock sync(*_mutex);
for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
{
(*p)->execute(func);
@@ -311,9 +308,8 @@ public:
}
void
- init(const MetricsHelperT<MetricsType>& helper, EntrySeqType& objects, IceUtil::Mutex* mutex)
+ init(const MetricsHelperT<MetricsType>& helper, EntrySeqType& objects)
{
- _mutex = mutex;
_objects.swap(objects);
std::sort(_objects.begin(), _objects.end());
for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
@@ -352,7 +348,6 @@ public:
template<typename ObserverImpl, typename ObserverMetricsType> IceInternal::Handle<ObserverImpl>
getObserver(const std::string& mapName, const MetricsHelperT<ObserverMetricsType>& helper)
{
- IceUtil::Mutex::Lock sync(*_mutex);
std::vector<typename MetricsMapT<ObserverMetricsType>::EntryTPtr> metricsObjects;
for(typename EntrySeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
{
@@ -369,7 +364,7 @@ public:
}
IceInternal::Handle<ObserverImpl> obsv = new ObserverImpl();
- obsv->init(helper, metricsObjects, _mutex);
+ obsv->init(helper, metricsObjects);
return obsv;
}
@@ -377,7 +372,6 @@ private:
EntrySeqType _objects;
IceUtilInternal::StopWatch _watch;
- IceUtil::Mutex* _mutex;
};
class ObserverI : virtual public Ice::Instrumentation::Observer, public ObserverT<Metrics>
@@ -438,7 +432,7 @@ public:
std::sort(metricsObjects.begin(), metricsObjects.end());
ObserverImplPtrType obsv = new ObserverImplType();
- obsv->init(helper, metricsObjects, this);
+ obsv->init(helper, metricsObjects);
return obsv;
}
@@ -468,7 +462,7 @@ public:
if(!obsv)
{
obsv = new ObserverImplType();
- obsv->init(helper, metricsObjects, this);
+ obsv->init(helper, metricsObjects);
}
else
{
diff --git a/cpp/src/Ice/ObserverHelper.cpp b/cpp/src/Ice/ObserverHelper.cpp
index c4406dd088b..32c162395b1 100644
--- a/cpp/src/Ice/ObserverHelper.cpp
+++ b/cpp/src/Ice/ObserverHelper.cpp
@@ -15,43 +15,64 @@
using namespace std;
using namespace Ice;
-using namespace IceInternal;
+using namespace Ice::Instrumentation;
-InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy, const string& operation, const Context* context)
+IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx)
{
- const Ice::Instrumentation::CommunicatorObserverPtr& obsv =
- proxy->__reference()->getInstance()->initializationData().observer;
- if(obsv)
- {
- if(context)
- {
- ObserverHelperT<Ice::Instrumentation::InvocationObserver>::attach(
- obsv->getInvocationObserverWithContext(proxy, operation, *context));
- }
- else
- {
- ObserverHelperT<Ice::Instrumentation::InvocationObserver>::attach(
- obsv->getInvocationObserver(proxy, operation));
- }
+ const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer;
+ if(!obsv)
+ {
+ return;
+ }
+
+ if(ctx)
+ {
+ attach(obsv->getInvocationObserverWithContext(proxy, op, *ctx));
+ }
+ else
+ {
+ attach(obsv->getInvocationObserver(proxy, op));
}
}
+IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* instance, const string& op)
+{
+ const CommunicatorObserverPtr& obsv = instance->initializationData().observer;
+ if(!obsv)
+ {
+ return;
+ }
+
+ attach(obsv->getInvocationObserver(0, op));
+}
+
void
-InvocationObserver::attach(IceProxy::Ice::Object* proxy, const string& operation, const Context* context)
+IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx)
{
- const Ice::Instrumentation::CommunicatorObserverPtr& obsv =
- proxy->__reference()->getInstance()->initializationData().observer;
- if(obsv)
- {
- if(context)
- {
- ObserverHelperT<Ice::Instrumentation::InvocationObserver>::attach(
- obsv->getInvocationObserverWithContext(proxy, operation, *context));
- }
- else
- {
- ObserverHelperT<Ice::Instrumentation::InvocationObserver>::attach(
- obsv->getInvocationObserver(proxy, operation));
- }
+ const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer;
+ if(!obsv)
+ {
+ return;
}
+
+ if(ctx)
+ {
+ attach(obsv->getInvocationObserverWithContext(proxy, op, *ctx));
+ }
+ else
+ {
+ attach(obsv->getInvocationObserver(proxy, op));
+ }
+}
+
+void
+IceInternal::InvocationObserver::attach(IceInternal::Instance* instance, const string& op)
+{
+ const CommunicatorObserverPtr& obsv = instance->initializationData().observer;
+ if(!obsv)
+ {
+ return;
+ }
+
+ attach(obsv->getInvocationObserver(0, op));
}
diff --git a/cpp/src/Ice/ObserverI.cpp b/cpp/src/Ice/ObserverI.cpp
index 715b6a180f0..1361a5de123 100644
--- a/cpp/src/Ice/ObserverI.cpp
+++ b/cpp/src/Ice/ObserverI.cpp
@@ -12,6 +12,9 @@
#include <Ice/Connection.h>
#include <Ice/Endpoint.h>
#include <Ice/ObjectAdapter.h>
+#include <Ice/LocalException.h>
+#include <Ice/Communicator.h>
+#include <Ice/LoggerUtil.h>
using namespace std;
using namespace Ice;
@@ -21,7 +24,7 @@ using namespace IceMX;
namespace
{
-Ice::Context emptyCtx;
+Context emptyCtx;
int ConnectionMetrics::*
getConnectionStateMetric(ConnectionState s)
@@ -154,8 +157,8 @@ public:
};
static Attributes attributes;
- ConnectionHelper(const ConnectionInfoPtr& con, const EndpointInfoPtr& endpt, ConnectionState state) :
- _connection(con), _endpoint(endpt), _state(state)
+ ConnectionHelper(const ConnectionInfoPtr& con, const EndpointPtr& endpt, ConnectionState state) :
+ _connectionInfo(con), _endpoint(endpt), _state(state)
{
}
@@ -175,7 +178,7 @@ public:
if(_id.empty())
{
ostringstream os;
- IPConnectionInfoPtr info = IPConnectionInfoPtr::dynamicCast(_connection);
+ IPConnectionInfoPtr info = IPConnectionInfoPtr::dynamicCast(_connectionInfo);
if(info)
{
os << info->localAddress << ':' << info->localPort;
@@ -184,7 +187,7 @@ public:
}
else
{
- os << "connection-" << _connection.get();
+ os << "connection-" << _connectionInfo.get();
}
_id = os.str();
}
@@ -194,9 +197,9 @@ public:
string
getParent() const
{
- if(!_connection->adapterName.empty())
+ if(!_connectionInfo->adapterName.empty())
{
- return _connection->adapterName;
+ return _connectionInfo->adapterName;
}
else
{
@@ -204,24 +207,29 @@ public:
}
}
- ConnectionInfoPtr
+ const ConnectionInfoPtr&
getConnectionInfo() const
{
- return _connection;
+ return _connectionInfo;
}
- EndpointInfoPtr
+ const EndpointInfoPtr&
getEndpointInfo() const
{
- return _endpoint;
+ if(!_endpointInfo)
+ {
+ _endpointInfo = _endpoint->getInfo();
+ }
+ return _endpointInfo;
}
private:
- const ConnectionInfoPtr& _connection;
- const EndpointInfoPtr& _endpoint;
+ const ConnectionInfoPtr& _connectionInfo;
+ const EndpointPtr& _endpoint;
const ConnectionState _state;
mutable string _id;
+ mutable EndpointInfoPtr _endpointInfo;
};
ConnectionHelper::Attributes ConnectionHelper::attributes;
@@ -266,7 +274,7 @@ public:
{
if(attribute.compare(0, 8, "context.") == 0)
{
- Ice::Context::const_iterator p = _current.ctx.find(attribute.substr(8));
+ Context::const_iterator p = _current.ctx.find(attribute.substr(8));
if(p != _current.ctx.end())
{
return p->second;
@@ -309,10 +317,14 @@ public:
return _current.con->getInfo();
}
- EndpointInfoPtr
+ const EndpointInfoPtr&
getEndpointInfo() const
{
- return _current.con->getEndpoint()->getInfo();
+ if(!_endpointInfo)
+ {
+ _endpointInfo = _current.con->getEndpoint()->getInfo();
+ }
+ return _endpointInfo;
}
const Current&
@@ -331,6 +343,7 @@ private:
const Current& _current;
mutable string _id;
+ mutable EndpointInfoPtr _endpointInfo;
};
DispatchHelper::Attributes DispatchHelper::attributes;
@@ -361,7 +374,7 @@ public:
};
static Attributes attributes;
- InvocationHelper(const Ice::ObjectPrx& proxy, const string& op, const Ice::Context& ctx = emptyCtx) :
+ InvocationHelper(const ObjectPrx& proxy, const string& op, const Context& ctx = emptyCtx) :
_proxy(proxy), _operation(op), _context(ctx)
{
}
@@ -370,7 +383,7 @@ public:
{
if(attribute.compare(0, 8, "context.") == 0)
{
- Ice::Context::const_iterator p = _context.find(attribute.substr(8));
+ Context::const_iterator p = _context.find(attribute.substr(8));
if(p != _context.end())
{
return p->second;
@@ -387,6 +400,11 @@ public:
string
getMode() const
{
+ if(!_proxy)
+ {
+ return "unknown";
+ }
+
if(_proxy->ice_isTwoway())
{
return "twoway";
@@ -419,7 +437,22 @@ public:
if(_id.empty())
{
ostringstream os;
- os << _proxy << " [" << _operation << ']';
+ if(_proxy)
+ {
+ try
+ {
+ os << _proxy << " [" << _operation << ']';
+ }
+ catch(const FixedProxyException& ex)
+ {
+ os << _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
+ os << " [" << _operation << ']';
+ }
+ }
+ else
+ {
+ os << _operation;
+ }
_id = os.str();
}
return _id;
@@ -441,7 +474,14 @@ public:
Identity
getIdentity() const
{
- return _proxy->ice_getIdentity();
+ if(_proxy)
+ {
+ return _proxy->ice_getIdentity();
+ }
+ else
+ {
+ return Identity();
+ }
}
const string&
@@ -454,7 +494,7 @@ private:
const ObjectPrx& _proxy;
const string& _operation;
- const Ice::Context& _context;
+ const Context& _context;
mutable string _id;
};
@@ -477,7 +517,8 @@ public:
};
static Attributes attributes;
- RemoteInvocationHelper(const ConnectionPtr& con) : _connection(con)
+ RemoteInvocationHelper(const ConnectionInfoPtr& con, const EndpointPtr& endpt) :
+ _connectionInfo(con), _endpoint(endpt)
{
}
@@ -492,14 +533,14 @@ public:
if(_id.empty())
{
ostringstream os;
- IPConnectionInfoPtr info = IPConnectionInfoPtr::dynamicCast(_connection->getInfo());
+ IPConnectionInfoPtr info = IPConnectionInfoPtr::dynamicCast(_connectionInfo);
if(info)
{
os << info->remoteAddress << ':' << info->remotePort;
}
else
{
- os << "connection-" << _connection.get();
+ os << "connection-" << _connectionInfo.get();
}
_id = os.str();
}
@@ -509,9 +550,9 @@ public:
string
getParent() const
{
- if(_connection->getAdapter())
+ if(!_connectionInfo->adapterName.empty())
{
- return _connection->getAdapter()->getName();
+ return _connectionInfo->adapterName;
}
else
{
@@ -519,22 +560,28 @@ public:
}
}
- ConnectionInfoPtr
+ const ConnectionInfoPtr&
getConnectionInfo() const
{
- return _connection->getInfo();
+ return _connectionInfo;
}
- EndpointInfoPtr
+ const EndpointInfoPtr&
getEndpointInfo() const
{
- return _connection->getEndpoint()->getInfo();
+ if(!_endpointInfo)
+ {
+ _endpointInfo = _endpoint->getInfo();
+ }
+ return _endpointInfo;
}
private:
- const ConnectionPtr& _connection;
+ const ConnectionInfoPtr& _connectionInfo;
+ const EndpointPtr& _endpoint;
mutable string _id;
+ mutable EndpointInfoPtr _endpointInfo;
};
RemoteInvocationHelper::Attributes RemoteInvocationHelper::attributes;
@@ -592,13 +639,17 @@ public:
Attributes()
{
add("parent", &EndpointHelper::getParent);
- add("id", &EndpointHelper::_id);
+ add("id", &EndpointHelper::getId);
addEndpointAttributes<EndpointHelper>(*this);
}
};
static Attributes attributes;
- EndpointHelper(const EndpointInfoPtr& endpt, const string& id) : _id(id), _endpoint(endpt)
+ EndpointHelper(const EndpointPtr& endpt, const string& id) : _endpoint(endpt), _id(id)
+ {
+ }
+
+ EndpointHelper(const EndpointPtr& endpt) : _endpoint(endpt)
{
}
@@ -607,10 +658,14 @@ public:
return attributes(this, attribute);
}
- EndpointInfoPtr
+ const EndpointInfoPtr&
getEndpointInfo() const
{
- return _endpoint;
+ if(!_endpointInfo)
+ {
+ _endpointInfo = _endpoint->getInfo();
+ }
+ return _endpointInfo;
}
string
@@ -619,10 +674,21 @@ public:
return "Communicator";
}
+ const string&
+ getId() const
+ {
+ if(_id.empty())
+ {
+ _id = _endpoint->toString();
+ }
+ return _id;
+ }
+
private:
- const string _id;
- const EndpointInfoPtr _endpoint;
+ const EndpointPtr _endpoint;
+ mutable string _id;
+ mutable EndpointInfoPtr _endpointInfo;
};
EndpointHelper::Attributes EndpointHelper::attributes;
@@ -667,9 +733,16 @@ InvocationObserverI::retried()
}
ObserverPtr
-InvocationObserverI::getRemoteObserver(const ConnectionPtr& connection)
+InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, const EndpointPtr& endpoint)
{
- return getObserver<ObserverI>("Remote", RemoteInvocationHelper(connection));
+ try
+ {
+ return getObserver<ObserverI>("Remote", RemoteInvocationHelper(connection, endpoint));
+ }
+ catch(const exception&)
+ {
+ }
+ return 0;
}
CommunicatorObserverI::CommunicatorObserverI(const MetricsAdminIPtr& metrics) :
@@ -694,34 +767,58 @@ CommunicatorObserverI::setObserverUpdater(const ObserverUpdaterPtr& updater)
}
ObserverPtr
-CommunicatorObserverI::getConnectionEstablishmentObserver(const EndpointInfoPtr& endpt, const string& connector)
+CommunicatorObserverI::getConnectionEstablishmentObserver(const EndpointPtr& endpt, const string& connector)
{
if(_connects.isEnabled())
{
- return _connects.getObserver(EndpointHelper(endpt, connector));
+ try
+ {
+ return _connects.getObserver(EndpointHelper(endpt, connector));
+ }
+ catch(const exception& ex)
+ {
+ Error error(_metrics->getLogger());
+ error << "unexpected exception trying to obtain observer:\n" << ex;
+ }
}
return 0;
}
ObserverPtr
-CommunicatorObserverI::getEndpointLookupObserver(const EndpointInfoPtr& endpt, const string& endpoint)
+CommunicatorObserverI::getEndpointLookupObserver(const EndpointPtr& endpt)
{
if(_endpointLookups.isEnabled())
{
- return _endpointLookups.getObserver(EndpointHelper(endpt, endpoint));
+ try
+ {
+ return _endpointLookups.getObserver(EndpointHelper(endpt));
+ }
+ catch(const exception& ex)
+ {
+ Error error(_metrics->getLogger());
+ error << "unexpected exception trying to obtain observer:\n" << ex;
+ }
}
return 0;
}
ConnectionObserverPtr
CommunicatorObserverI::getConnectionObserver(const ConnectionInfoPtr& con,
- const EndpointInfoPtr& endpt,
+ const EndpointPtr& endpt,
ConnectionState state,
const ConnectionObserverPtr& observer)
{
if(_connections.isEnabled())
{
- return _connections.getObserver(ConnectionHelper(con, endpt, state), observer);
+ try
+ {
+ return _connections.getObserver(ConnectionHelper(con, endpt, state), observer);
+ }
+ catch(const exception& ex)
+ {
+ Error error(_metrics->getLogger());
+ error << "unexpected exception trying to obtain observer:\n" << ex;
+ }
}
return 0;
}
@@ -734,7 +831,15 @@ CommunicatorObserverI::getThreadObserver(const string& parent,
{
if(_threads.isEnabled())
{
- return _threads.getObserver(ThreadHelper(parent, id, state), observer);
+ try
+ {
+ return _threads.getObserver(ThreadHelper(parent, id, state), observer);
+ }
+ catch(const exception& ex)
+ {
+ Error error(_metrics->getLogger());
+ error << "unexpected exception trying to obtain observer:\n" << ex;
+ }
}
return 0;
}
@@ -744,7 +849,15 @@ CommunicatorObserverI::getInvocationObserver(const ObjectPrx& proxy, const strin
{
if(_invocations.isEnabled())
{
- return _invocations.getObserver(InvocationHelper(proxy, op));
+ try
+ {
+ return _invocations.getObserver(InvocationHelper(proxy, op));
+ }
+ catch(const exception& ex)
+ {
+ Error error(_metrics->getLogger());
+ error << "unexpected exception trying to obtain observer:\n" << ex;
+ }
}
return 0;
}
@@ -754,7 +867,15 @@ CommunicatorObserverI::getInvocationObserverWithContext(const ObjectPrx& proxy,
{
if(_invocations.isEnabled())
{
- return _invocations.getObserver(InvocationHelper(proxy, op, ctx));
+ try
+ {
+ return _invocations.getObserver(InvocationHelper(proxy, op, ctx));
+ }
+ catch(const exception& ex)
+ {
+ Error error(_metrics->getLogger());
+ error << "unexpected exception trying to obtain observer:\n" << ex;
+ }
}
return 0;
}
@@ -764,7 +885,15 @@ CommunicatorObserverI::getDispatchObserver(const Current& current)
{
if(_dispatch.isEnabled())
{
- return _dispatch.getObserver(DispatchHelper(current));
+ try
+ {
+ return _dispatch.getObserver(DispatchHelper(current));
+ }
+ catch(const exception& ex)
+ {
+ Error error(_metrics->getLogger());
+ error << "unexpected exception trying to obtain observer:\n" << ex;
+ }
}
return 0;
}
diff --git a/cpp/src/Ice/ObserverI.h b/cpp/src/Ice/ObserverI.h
index c3954258ae8..3be96eea71e 100644
--- a/cpp/src/Ice/ObserverI.h
+++ b/cpp/src/Ice/ObserverI.h
@@ -39,7 +39,15 @@ public:
virtual void retried();
- virtual Ice::Instrumentation::ObserverPtr getRemoteObserver(const Ice::ConnectionPtr&);
+ virtual Ice::Instrumentation::ObserverPtr getRemoteObserver(const Ice::ConnectionInfoPtr&, const Ice::EndpointPtr&);
+
+
+private:
+
+ friend class CommunicatorObserverI;
+ void initLogger(const Ice::LoggerPtr&);
+
+ const Ice::LoggerPtr _logger;
};
class CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver,
@@ -51,15 +59,14 @@ public:
virtual void setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr&);
- virtual Ice::Instrumentation::ObserverPtr getConnectionEstablishmentObserver(const Ice::EndpointInfoPtr&,
+ virtual Ice::Instrumentation::ObserverPtr getConnectionEstablishmentObserver(const Ice::EndpointPtr&,
const std::string&);
- virtual Ice::Instrumentation::ObserverPtr getEndpointLookupObserver(const Ice::EndpointInfoPtr&,
- const std::string&);
+ virtual Ice::Instrumentation::ObserverPtr getEndpointLookupObserver(const Ice::EndpointPtr&);
virtual Ice::Instrumentation::ConnectionObserverPtr
getConnectionObserver(const Ice::ConnectionInfoPtr&,
- const Ice::EndpointInfoPtr&,
+ const Ice::EndpointPtr&,
Ice::Instrumentation::ConnectionState,
const Ice::Instrumentation::ConnectionObserverPtr&);
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 23fc92af5df..565a954f691 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -547,19 +547,21 @@ IceInternal::Outgoing::throwUserException()
}
}
-IceInternal::BatchOutgoing::BatchOutgoing(RequestHandler* handler) :
+IceInternal::BatchOutgoing::BatchOutgoing(RequestHandler* handler, InvocationObserver& observer) :
_handler(handler),
_connection(0),
_sent(false),
- _os(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding)
+ _os(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding),
+ _observer(observer)
{
}
-IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance) :
+IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, InvocationObserver& observer) :
_handler(0),
_connection(connection),
_sent(false),
- _os(instance, Ice::currentProtocolEncoding)
+ _os(instance, Ice::currentProtocolEncoding),
+ _observer(observer)
{
}
@@ -574,7 +576,7 @@ IceInternal::BatchOutgoing::invoke()
{
_monitor.wait();
}
-
+ _remoteObserver.detach();
if(_exception.get())
{
_exception->ice_throw();
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 6f96ed33a8e..2224b7dee97 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -504,6 +504,7 @@ IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
{
if(!_proxy->ice_isTwoway())
{
+ _remoteObserver.detach();
if(!_callback || !_callback->__hasSentCallback())
{
_observer.detach();
@@ -538,6 +539,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!(_state & Done));
+ _remoteObserver.detach();
if(_timerTaskConnection)
{
_instance->timer()->cancel(this);
@@ -576,6 +578,7 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
// calling on the callback. The LocalExceptionWrapper exception is only called
// before the invocation is sent.
//
+ _remoteObserver.detach();
try
{
@@ -605,6 +608,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get() && !(_state & Done));
+ _remoteObserver.detach();
if(_timerTaskConnection)
{
@@ -883,6 +887,7 @@ IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get());
_state |= Done | OK | Sent;
+ _remoteObserver.detach();
_monitor.notifyAll();
if(_callback && _callback->__hasSentCallback())
{
@@ -904,6 +909,7 @@ IceInternal::BatchOutgoingAsync::__sent()
void
IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool)
{
+ _remoteObserver.detach();
__exception(exc);
}
@@ -954,6 +960,7 @@ IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const Co
BatchOutgoingAsync(communicator, instance, operation, delegate, cookie),
_connection(con)
{
+ _observer.attach(instance.get(), operation);
}
void
@@ -981,7 +988,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons
const string& operation,
const CallbackBasePtr& delegate,
const Ice::LocalObjectPtr& cookie) :
- BatchOutgoingAsync(communicator, instance, operation, delegate, cookie)
+ AsyncResult(communicator, instance, operation, delegate, cookie)
{
//
// _useCount is initialized to 1 to prevent premature callbacks.
@@ -994,93 +1001,98 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons
// Assume all connections are flushed synchronously.
//
_sentSynchronously = true;
+
+ //
+ // Attach observer
+ //
+ _observer.attach(instance.get(), operation);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionPtr& con)
+IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con)
{
+ class BatchOutgoingAsyncI : public BatchOutgoingAsync
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- ++_useCount;
- }
- CallbackPtr cb = newCallback(this, &CommunicatorBatchOutgoingAsync::completed,
- &CommunicatorBatchOutgoingAsync::sent);
- con->begin_flushBatchRequests(cb);
-}
+ public:
-void
-IceInternal::CommunicatorBatchOutgoingAsync::ready()
-{
- check(0, 0, true);
-}
+ BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync) :
+ BatchOutgoingAsync(outAsync->_communicator, outAsync->_instance, outAsync->_operation, __dummyCallback, 0),
+ _outAsync(outAsync)
+ {
+ }
-void
-IceInternal::CommunicatorBatchOutgoingAsync::completed(const AsyncResultPtr& r)
-{
- ConnectionPtr con = r->getConnection();
- assert(con);
+ virtual bool __sent(Ice::ConnectionI*)
+ {
+ _remoteObserver.detach();
+ _outAsync->check(false);
+ return false;
+ }
+
+ virtual void __finished(const Ice::LocalException&, bool)
+ {
+ _remoteObserver.detach();
+ _outAsync->check(false);
+ }
+
+ virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt)
+ {
+ _remoteObserver.attach(_outAsync->_observer.getRemoteObserver(connection, endpt));
+ }
+
+ private:
+
+ const CommunicatorBatchOutgoingAsyncPtr _outAsync;
+ };
- try
{
- con->end_flushBatchRequests(r);
- assert(false); // completed() should only be called when an exception occurs.
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ ++_useCount;
}
- catch(const Ice::LocalException& ex)
+
+ AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this));
+ if(!(status & AsyncStatusSent))
{
- check(r, &ex, false);
+ _sentSynchronously = false;
}
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::sent(const AsyncResultPtr& r)
+IceInternal::CommunicatorBatchOutgoingAsync::ready()
{
- check(r, 0, r->sentSynchronously());
+ check(true);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::check(const AsyncResultPtr& r, const LocalException* ex, bool userThread)
+IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
{
- bool done = false;
-
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(_useCount > 0);
- --_useCount;
-
- //
- // We report that the communicator flush request was sent synchronously
- // if all of the connection flush requests are sent synchronously.
- //
- if((r && !r->sentSynchronously()) || ex)
- {
- _sentSynchronously = false;
- }
-
- if(_useCount == 0)
+ if(--_useCount > 0)
{
- done = true;
- _state |= Done | OK | Sent;
- _monitor.notifyAll();
+ return;
}
+
+ _observer.detach();
+ _state |= Done | OK | Sent;
+ _monitor.notifyAll();
}
- if(done)
+ //
+ // _sentSynchronously is immutable here.
+ //
+ if(!_sentSynchronously && userThread)
{
- //
- // _sentSynchronously is immutable here.
- //
- if(!_sentSynchronously && userThread)
- {
- __sentAsync();
- }
- else
- {
- assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
- BatchOutgoingAsync::__sent();
- }
+ __sentAsync();
+ }
+ else
+ {
+ assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
+ AsyncResult::__sent();
}
}
+
namespace
{
@@ -1152,3 +1164,4 @@ Ice::AMICallbackBase::__sent(bool sentSynchronously)
dynamic_cast<AMISentCallback*>(this)->ice_sent();
}
}
+
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp
index 69da155dd9d..4760984eb48 100644
--- a/cpp/src/Ice/Proxy.cpp
+++ b/cpp/src/Ice/Proxy.cpp
@@ -1657,7 +1657,7 @@ IceDelegateM::Ice::Object::ice_invoke(const string& operation,
void
IceDelegateM::Ice::Object::ice_flushBatchRequests(InvocationObserver& observer)
{
- BatchOutgoing __og(__handler.get());
+ BatchOutgoing __og(__handler.get(), observer);
__og.invoke();
}
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp
index 61fda069922..a5bc169f8c8 100644
--- a/cpp/src/Ice/TcpTransceiver.cpp
+++ b/cpp/src/Ice/TcpTransceiver.cpp
@@ -443,7 +443,6 @@ IceInternal::TcpTransceiver::toString() const
Ice::ConnectionInfoPtr
IceInternal::TcpTransceiver::getInfo() const
{
- assert(_fd != INVALID_SOCKET);
Ice::TCPConnectionInfoPtr info = new Ice::TCPConnectionInfo();
fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort);
return info;
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index b1aa39ddd96..9ecfaa1fceb 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -784,7 +784,11 @@ IceInternal::UdpTransceiver::getInfo() const
return info;
}
#endif
- assert(_fd != INVALID_SOCKET);
+ if(_fd == INVALID_SOCKET)
+ {
+ return info;
+ }
+
if(_state == StateNotConnected)
{
Address localAddr;
diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp
index 0c05c971b65..d2d41ac2e4c 100644
--- a/cpp/src/IceSSL/TransceiverI.cpp
+++ b/cpp/src/IceSSL/TransceiverI.cpp
@@ -890,8 +890,6 @@ IceSSL::TransceiverI::~TransceiverI()
NativeConnectionInfoPtr
IceSSL::TransceiverI::getNativeConnectionInfo() const
{
- assert(_fd != INVALID_SOCKET);
-
NativeConnectionInfoPtr info = new NativeConnectionInfo();
IceInternal::fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort);