summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-07-01 17:42:04 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-07-01 17:42:04 +0200
commit344a7fd6e0d716f81dc27495e97a7ad9c2ab07b8 (patch)
treebd06f4919e5a5827f60e2a536e43e47a4fbed6d2 /cpp/src
parentFixed ICE-5569: IceStorm IceMX debug iterator assert (diff)
downloadice-344a7fd6e0d716f81dc27495e97a7ad9c2ab07b8.tar.bz2
ice-344a7fd6e0d716f81dc27495e97a7ad9c2ab07b8.tar.xz
ice-344a7fd6e0d716f81dc27495e97a7ad9c2ab07b8.zip
IceMX and Python support for the new collocation optimization
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp8
-rw-r--r--cpp/src/Ice/InstrumentationI.cpp38
-rw-r--r--cpp/src/Ice/InstrumentationI.h17
-rw-r--r--cpp/src/Ice/Outgoing.cpp18
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp26
5 files changed, 60 insertions, 47 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 6db9bae4daf..896062a4d42 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -358,7 +358,7 @@ CollocatedRequestHandler::invokeRequest(Outgoing* out)
}
}
- out->attachCollocatedObserver(requestId);
+ out->attachCollocatedObserver(_adapter, requestId);
if(_reference->getInvocationTimeout() > 0)
{
@@ -389,7 +389,7 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
}
}
- outAsync->__attachCollocatedObserver(requestId);
+ outAsync->__attachCollocatedObserver(_adapter, requestId);
_adapter->getThreadPool()->execute(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false));
return AsyncStatusQueued;
@@ -428,7 +428,7 @@ CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out)
}
}
- out->attachCollocatedObserver(0);
+ out->attachCollocatedObserver(_adapter, 0);
if(invokeNum > 0)
{
@@ -480,7 +480,7 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync)
}
}
- outAsync->__attachCollocatedObserver(0);
+ outAsync->__attachCollocatedObserver(_adapter, 0);
if(invokeNum > 0)
{
diff --git a/cpp/src/Ice/InstrumentationI.cpp b/cpp/src/Ice/InstrumentationI.cpp
index 6ff70f8aafb..b0a06b1cf08 100644
--- a/cpp/src/Ice/InstrumentationI.cpp
+++ b/cpp/src/Ice/InstrumentationI.cpp
@@ -583,7 +583,7 @@ private:
RemoteInvocationHelper::Attributes RemoteInvocationHelper::attributes;
-class CollocatedInvocationHelper : public MetricsHelperT<RemoteMetrics>
+class CollocatedInvocationHelper : public MetricsHelperT<CollocatedMetrics>
{
public:
@@ -600,7 +600,8 @@ public:
};
static Attributes attributes;
- CollocatedInvocationHelper(int requestId, int size) : _requestId(requestId), _size(size)
+ CollocatedInvocationHelper(const Ice::ObjectAdapterPtr& adapter, int requestId, int size) :
+ _requestId(requestId), _size(size), _id(adapter->getName())
{
}
@@ -609,7 +610,7 @@ public:
return attributes(this, attribute);
}
- virtual void initMetrics(const RemoteMetricsPtr& v) const
+ virtual void initMetrics(const CollocatedMetricsPtr& v) const
{
v->size += _size;
}
@@ -617,12 +618,6 @@ public:
const string&
getId() const
{
- if(_id.empty())
- {
- ostringstream os;
- os << _requestId;
- _id = os.str();
- }
return _id;
}
@@ -824,6 +819,16 @@ RemoteObserverI::reply(Int size)
}
void
+CollocatedObserverI::reply(Int size)
+{
+ forEach(add(&CollocatedMetrics::replySize, size));
+ if(_delegate)
+ {
+ _delegate->reply(size);
+ }
+}
+
+void
InvocationObserverI::retried()
{
forEach(inc(&InvocationMetrics::retry));
@@ -866,19 +871,19 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection,
return 0;
}
-RemoteObserverPtr
-InvocationObserverI::getCollocatedObserver(int requestId, int size)
+CollocatedObserverPtr
+InvocationObserverI::getCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, int requestId, int size)
{
try
{
- RemoteObserverPtr delegate;
+ CollocatedObserverPtr delegate;
if(_delegate)
{
- delegate = _delegate->getCollocatedObserver(requestId, size);
+ delegate = _delegate->getCollocatedObserver(adapter, requestId, size);
}
- return getObserverWithDelegate<RemoteObserverI>("Collocated",
- CollocatedInvocationHelper(requestId, size),
- delegate);
+ return getObserverWithDelegate<CollocatedObserverI>("Collocated",
+ CollocatedInvocationHelper(adapter, requestId, size),
+ delegate);
}
catch(const exception&)
{
@@ -899,6 +904,7 @@ CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr
_endpointLookups(metrics, "EndpointLookup")
{
_invocations.registerSubMap<RemoteMetrics>("Remote", &InvocationMetrics::remotes);
+ _invocations.registerSubMap<CollocatedMetrics>("Collocated", &InvocationMetrics::collocated);
}
void
diff --git a/cpp/src/Ice/InstrumentationI.h b/cpp/src/Ice/InstrumentationI.h
index cdc31e8106c..db515999f23 100644
--- a/cpp/src/Ice/InstrumentationI.h
+++ b/cpp/src/Ice/InstrumentationI.h
@@ -184,6 +184,14 @@ public:
virtual void reply(Ice::Int);
};
+class CollocatedObserverI : public ObserverWithDelegateT<IceMX::CollocatedMetrics,
+ Ice::Instrumentation::CollocatedObserver>
+{
+public:
+
+ virtual void reply(Ice::Int);
+};
+
class InvocationObserverI : public ObserverWithDelegateT<IceMX::InvocationMetrics,
Ice::Instrumentation::InvocationObserver>
{
@@ -193,12 +201,11 @@ public:
virtual void userException();
- virtual Ice::Instrumentation::RemoteObserverPtr getRemoteObserver(const Ice::ConnectionInfoPtr&,
- const Ice::EndpointPtr&,
- Ice::Int,
- Ice::Int);
+ virtual Ice::Instrumentation::RemoteObserverPtr
+ getRemoteObserver(const Ice::ConnectionInfoPtr&, const Ice::EndpointPtr&, Ice::Int, Ice::Int);
- virtual Ice::Instrumentation::RemoteObserverPtr getCollocatedObserver(Ice::Int, Ice::Int);
+ virtual Ice::Instrumentation::CollocatedObserverPtr
+ getCollocatedObserver(const Ice::ObjectAdapterPtr&, Ice::Int, Ice::Int);
};
typedef ObserverWithDelegateT<IceMX::Metrics, Ice::Instrumentation::Observer> ObserverI;
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 73778c9dafd..0e89e524a0a 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -279,7 +279,7 @@ IceInternal::Outgoing::sent()
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
{
- _remoteObserver.detach();
+ _childObserver.detach();
_state = StateOK;
}
_sent = true;
@@ -297,8 +297,8 @@ IceInternal::Outgoing::finished(const Exception& ex, bool sent)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(_state <= StateInProgress);
- _remoteObserver.failed(ex.ice_name());
- _remoteObserver.detach();
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
_state = StateFailed;
_exception.reset(ex.ice_clone());
@@ -314,11 +314,11 @@ IceInternal::Outgoing::finished(BasicStream& is)
assert(_proxy->__reference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
assert(_state <= StateInProgress);
- if(_remoteObserver)
+ if(_childObserver)
{
- _remoteObserver->reply(static_cast<Int>(is.b.size() - headerSize - 4));
+ _childObserver->reply(static_cast<Int>(is.b.size() - headerSize - 4));
}
- _remoteObserver.detach();
+ _childObserver.detach();
_is.swap(is);
@@ -615,7 +615,7 @@ void
IceInternal::BatchOutgoing::sent()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _remoteObserver.detach();
+ _childObserver.detach();
_sent = true;
_monitor.notify();
@@ -631,8 +631,8 @@ void
IceInternal::BatchOutgoing::finished(const Ice::Exception& ex, bool)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _remoteObserver.failed(ex.ice_name());
- _remoteObserver.detach();
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
_exception.reset(ex.ice_clone());
_monitor.notify();
}
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 5a600e8fce2..3fb2c8ed08b 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -527,7 +527,7 @@ IceInternal::OutgoingAsync::__sent()
assert(!(_state & Done));
if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
{
- _remoteObserver.detach();
+ _childObserver.detach();
if(!_callback || !_callback->__hasSentCallback())
{
_observer.detach();
@@ -556,8 +556,8 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc, bool sent)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!(_state & Done));
- _remoteObserver.failed(exc.ice_name());
- _remoteObserver.detach();
+ _childObserver.failed(exc.ice_name());
+ _childObserver.detach();
if(_timeoutRequestHandler)
{
_instance->timer()->cancel(this);
@@ -596,11 +596,11 @@ IceInternal::OutgoingAsync::__finished()
assert(!_exception.get() && !(_state & Done));
assert(!_is.b.empty());
- if(_remoteObserver)
+ if(_childObserver)
{
- _remoteObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4));
+ _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4));
}
- _remoteObserver.detach();
+ _childObserver.detach();
if(_timeoutRequestHandler)
{
@@ -854,7 +854,7 @@ IceInternal::BatchOutgoingAsync::__sent()
assert(!_exception.get());
_state |= Done | OK | Sent;
//_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization.
- _remoteObserver.detach();
+ _childObserver.detach();
if(_timeoutRequestHandler)
{
_instance->timer()->cancel(this);
@@ -878,8 +878,8 @@ IceInternal::BatchOutgoingAsync::__invokeSent()
void
IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc, bool)
{
- _remoteObserver.failed(exc.ice_name());
- _remoteObserver.detach();
+ _childObserver.failed(exc.ice_name());
+ _childObserver.detach();
if(_timeoutRequestHandler)
{
_instance->timer()->cancel(this);
@@ -1021,7 +1021,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
virtual bool __sent()
{
- _remoteObserver.detach();
+ _childObserver.detach();
_outAsync->check(false);
return false;
}
@@ -1032,15 +1032,15 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
virtual void __finished(const Ice::Exception& ex, bool)
{
- _remoteObserver.failed(ex.ice_name());
- _remoteObserver.detach();
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
_outAsync->check(false);
}
virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt,
Ice::Int requestId, Ice::Int sz)
{
- _remoteObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz));
+ _childObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz));
}
private: