summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2016-09-22 16:47:36 +0200
committerBenoit Foucher <benoit@zeroc.com>2016-09-22 16:47:36 +0200
commit7b5454f97092c7a3952d4981bb057f1bd7d61993 (patch)
treed26dadf7fa19ef8fb9ed92851c1cf5cc179fc908 /cpp/src
parentSlice/unicodePaths fails in Sles12 (diff)
downloadice-7b5454f97092c7a3952d4981bb057f1bd7d61993.tar.bz2
ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.tar.xz
ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.zip
Fixed ICE-7032 - support for limiting subscriber queue size
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/Internal.ice90
-rw-r--r--cpp/src/IceStorm/IceStormInternal.ice4
-rw-r--r--cpp/src/IceStorm/Instance.cpp32
-rw-r--r--cpp/src/IceStorm/Instance.h10
-rw-r--r--cpp/src/IceStorm/Service.cpp2
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp54
6 files changed, 127 insertions, 65 deletions
diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice
index fa19b46e6f9..3dcb4a538c7 100644
--- a/cpp/src/IceGrid/Internal.ice
+++ b/cpp/src/IceGrid/Internal.ice
@@ -33,7 +33,7 @@ class InternalDbEnvDescriptor
{
/** The name of the database environment. */
string name;
-
+
/** The database properties. */
PropertyDescriptorSeq properties;
};
@@ -70,7 +70,7 @@ class InternalServerDescriptor
/** The application uuid. */
string uuid;
-
+
/** The application revision. */
int revision;
@@ -79,19 +79,19 @@ class InternalServerDescriptor
/** The server executable. */
string exe;
-
+
/** The server working directory. */
string pwd;
/** The user ID to use to run the server. */
string user;
-
+
/** The server activation mode. */
string activation;
/** The server activation timeout. */
string activationTimeout;
-
+
/** The server deactivation timeout. */
string deactivationTimeout;
@@ -103,10 +103,10 @@ class InternalServerDescriptor
/** Specifies if a process object is registered. */
bool processRegistered;
-
+
/** The server command line options. */
Ice::StringSeq options;
-
+
/** The server environment variables. */
Ice::StringSeq envs;
@@ -142,14 +142,14 @@ interface Adapter
{
/**
*
- * Activate this adapter. If this adapter can be activated, this
- * will activate the adapter and return the direct proxy of the
+ * Activate this adapter. If this adapter can be activated, this
+ * will activate the adapter and return the direct proxy of the
* adapter once it's active. If this adapter can be activated on
* demand, this will return 0 if the adapter is inactive or the
* adapter direct proxy it's active.
*
**/
- ["amd"] Object* activate();
+ ["amd"] Object* activate();
/**
*
@@ -208,7 +208,7 @@ interface FileReader
/**
*
* Read lines (or size bytes) at the specified position from the given file.
- *
+ *
**/
["cpp:const"] idempotent bool read(string filename, long pos, int size, out long newPos, out Ice::StringSeq lines)
throws FileNotAvailableException;
@@ -236,7 +236,7 @@ interface Server extends FileReader
**/
["amd"] void stop()
throws ServerStopException;
-
+
/**
*
* Check if the given server can be loaded on this node.
@@ -253,7 +253,7 @@ interface Server extends FileReader
*
* Enable or disable the server.
*
- **/
+ **/
void setEnabled(bool enable);
/**
@@ -268,9 +268,9 @@ interface Server extends FileReader
* Send signal to the server
*
**/
- void sendSignal(string signal)
+ void sendSignal(string signal)
throws BadSignalException;
-
+
/**
*
* Write message on servers' stdout or stderr.
@@ -316,12 +316,12 @@ interface ReplicaObserver
*
* Initialization of the replica observer.
*
- **/
+ **/
void replicaInit(InternalRegistryPrxSeq replicas);
/**
*
- * Notification that a replica has been added. The node should
+ * Notification that a replica has been added. The node should
* establish a session with this new replica.
*
**/
@@ -364,8 +364,8 @@ interface Node extends FileReader, ReplicaObserver
**/
["amd"] idempotent Server* loadServer(InternalServerDescriptor svr,
string replicaName,
- out AdapterPrxDict adapters,
- out int actTimeout,
+ out AdapterPrxDict adapters,
+ out int actTimeout,
out int deactTimeout)
throws DeploymentException;
@@ -380,8 +380,8 @@ interface Node extends FileReader, ReplicaObserver
**/
["amd"] idempotent Server* loadServerWithoutRestart(InternalServerDescriptor svr,
string replicaName,
- out AdapterPrxDict adapters,
- out int actTimeout,
+ out AdapterPrxDict adapters,
+ out int actTimeout,
out int deactTimeout)
throws DeploymentException;
@@ -407,12 +407,12 @@ interface Node extends FileReader, ReplicaObserver
* using a distribution directory to patch are active, this method
* will raise a PatchException unless shutdown is set to true. In
* which case the servers will be shutdown.
- *
+ *
**/
- ["amd"] idempotent void patch(PatcherFeedback* feedback,
- string application,
- string server,
- InternalDistributionDescriptor appDistrib,
+ ["amd"] idempotent void patch(PatcherFeedback* feedback,
+ string application,
+ string server,
+ InternalDistributionDescriptor appDistrib,
bool shutdown);
/**
@@ -420,7 +420,7 @@ interface Node extends FileReader, ReplicaObserver
* Establish a session to the given replica, this method only
* returns once the registration was attempted (unlike
* replicaAdded below).
- *
+ *
**/
void registerWithReplica(InternalRegistry* replica);
@@ -429,14 +429,14 @@ interface Node extends FileReader, ReplicaObserver
* Get the node name.
*
**/
- ["nonmutating", "cpp:const"] idempotent string getName();
+ ["nonmutating", "cpp:const"] idempotent string getName();
/**
*
* Get the node hostname.
*
**/
- ["nonmutating", "cpp:const"] idempotent string getHostname();
+ ["nonmutating", "cpp:const"] idempotent string getHostname();
/**
*
@@ -495,7 +495,7 @@ interface NodeSession
*
* Return the node session timeout.
*
- **/
+ **/
["nonmutating", "cpp:const"] idempotent int getTimeout();
/**
@@ -508,7 +508,7 @@ interface NodeSession
/**
*
* Ask the registry to load the servers on the node.
- *
+ *
**/
["amd", "nonmutating", "cpp:const"] idempotent void loadServers();
@@ -576,7 +576,7 @@ interface ReplicaSession
*
* Return the replica session timeout.
*
- **/
+ **/
["cpp:const"] idempotent int getTimeout();
/**
@@ -593,7 +593,7 @@ interface ReplicaSession
* This method sets the endpoints of the replica. This allows the
* master to create proxies with multiple endpoints for replicated
* objects (e.g.: IceGrid::Query object).
- *
+ *
**/
idempotent void setEndpoints(StringObjectProxyDict endpoints);
@@ -621,7 +621,7 @@ interface ReplicaSession
* before to continue.
*
**/
- void receivedUpdate(TopicName name, int serial, string failure);
+ void receivedUpdate(TopicName name, int serial, string failure);
/**
*
@@ -663,7 +663,7 @@ class InternalNodeInfo
/**
*
* The operation system release level (as defined in uname()).
- *
+ *
**/
string release;
@@ -679,16 +679,16 @@ class InternalNodeInfo
* The machine hardware type (as defined in uname()).
*
**/
- string machine;
+ string machine;
/**
*
- * The number of processor threads (e.g. 8 on
+ * The number of processor threads (e.g. 8 on
* system with 1 quad-core CPU, with 2 threads per core)
*
**/
int nProcessors;
-
+
/**
*
* The path to the node data directory.
@@ -731,11 +731,11 @@ interface InternalRegistry extends FileReader
* @param info Some information on the node.
*
* @param prx The proxy of the node.
- *
+ *
* @param loadInf The load information of the node.
- *
+ *
* @return The node session proxy.
- *
+ *
* @throws NodeActiveException Raised if the node is already
* registered and currently active.
*
@@ -752,9 +752,9 @@ interface InternalRegistry extends FileReader
* @param info Some information on the replica.
*
* @param prx The proxy of the replica.
- *
+ *
* @return The replica session proxy.
- *
+ *
* @throws ReplicaActiveException Raised if the replica is already
* registered and currently active.
*
@@ -766,7 +766,7 @@ interface InternalRegistry extends FileReader
*
* Create a session with the given registry replica. This method
* returns only once the session creation has been attempted.
- *
+ *
**/
void registerWithReplica(InternalRegistry* prx);
@@ -788,7 +788,7 @@ interface InternalRegistry extends FileReader
/**
*
* Return applications, adapters, objects from this replica.
- *
+ *
**/
["cpp:const"] idempotent ApplicationInfoSeq getApplications(out long serial);
["cpp:const"] idempotent AdapterInfoSeq getAdapters(out long serial);
diff --git a/cpp/src/IceStorm/IceStormInternal.ice b/cpp/src/IceStorm/IceStormInternal.ice
index 32b010fe87d..bafb51717fa 100644
--- a/cpp/src/IceStorm/IceStormInternal.ice
+++ b/cpp/src/IceStorm/IceStormInternal.ice
@@ -39,6 +39,10 @@ module IceStorm
Ice::Context context;
};
+local exception SendQueueSizeMaxReached
+{
+};
+
/** A sequence of EventData. */
["cpp:type:std::deque< ::IceStorm::EventDataPtr>"] sequence<EventData> EventDataSeq;
diff --git a/cpp/src/IceStorm/Instance.cpp b/cpp/src/IceStorm/Instance.cpp
index 0e95d765763..ccc17a952c8 100644
--- a/cpp/src/IceStorm/Instance.cpp
+++ b/cpp/src/IceStorm/Instance.cpp
@@ -17,6 +17,7 @@
#include <Ice/InstrumentationI.h>
#include <Ice/Communicator.h>
#include <Ice/Properties.h>
+#include <Ice/TraceUtil.h>
using namespace std;
using namespace IceStorm;
@@ -60,6 +61,8 @@ Instance::Instance(
name + ".Flush.Timeout", 1000))), // default one second.
// default one minute.
_sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)),
+ _sendQueueSizeMax(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax", -1)),
+ _sendQueueSizeMaxPolicy(RemoveSubscriber),
_topicReaper(new TopicReaper())
{
try
@@ -84,6 +87,21 @@ Instance::Instance(
_batchFlusher = new IceUtil::Timer();
_timer = new IceUtil::Timer();
+ string policy = properties->getProperty(name + ".Send.QueueSizeMaxPolicy");
+ if(policy == "RemoveSubscriber")
+ {
+ const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = RemoveSubscriber;
+ }
+ else if(policy == "DropEvents")
+ {
+ const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = DropEvents;
+ }
+ else if(!policy.empty())
+ {
+ Ice::Warning warn(_traceLevels->logger);
+ warn << "invalid value `" << policy << "' for `" << name << ".Send.QueueSizeMaxPolicy'";
+ }
+
//
// If an Ice metrics observer is setup on the communicator, also
// enable metrics for IceStorm.
@@ -237,6 +255,18 @@ Instance::sendTimeout() const
return _sendTimeout;
}
+int
+Instance::sendQueueSizeMax() const
+{
+ return _sendQueueSizeMax;
+}
+
+Instance::SendQueueSizeMaxPolicy
+Instance::sendQueueSizeMaxPolicy() const
+{
+ return _sendQueueSizeMaxPolicy;
+}
+
void
Instance::shutdown()
{
@@ -270,7 +300,7 @@ Instance::destroy()
_node = 0;
//
// The observer instance must be cleared as it holds the
- // TopicManagerImpl which hodlds the instance causing a
+ // TopicManagerImpl which hodlds the instance causing a
// cyclic reference.
//
_observer = 0;
diff --git a/cpp/src/IceStorm/Instance.h b/cpp/src/IceStorm/Instance.h
index 9b4a31ba88e..5cc90314930 100644
--- a/cpp/src/IceStorm/Instance.h
+++ b/cpp/src/IceStorm/Instance.h
@@ -60,6 +60,12 @@ class Instance : public IceUtil::Shared
{
public:
+ enum SendQueueSizeMaxPolicy
+ {
+ RemoveSubscriber,
+ DropEvents
+ };
+
Instance(const std::string&, const std::string&, const Ice::CommunicatorPtr&, const Ice::ObjectAdapterPtr&,
const Ice::ObjectAdapterPtr&, const Ice::ObjectAdapterPtr& = 0, const IceStormElection::NodePrx& = 0);
~Instance();
@@ -87,6 +93,8 @@ public:
IceUtil::Time discardInterval() const;
IceUtil::Time flushInterval() const;
int sendTimeout() const;
+ int sendQueueSizeMax() const;
+ SendQueueSizeMaxPolicy sendQueueSizeMaxPolicy() const;
void shutdown();
void destroy();
@@ -104,6 +112,8 @@ private:
const IceUtil::Time _discardInterval;
const IceUtil::Time _flushInterval;
const int _sendTimeout;
+ const int _sendQueueSizeMax;
+ const SendQueueSizeMaxPolicy _sendQueueSizeMaxPolicy;
const Ice::ObjectPrx _topicReplicaProxy;
const Ice::ObjectPrx _publisherReplicaProxy;
const TopicReaperPtr _topicReaper;
diff --git a/cpp/src/IceStorm/Service.cpp b/cpp/src/IceStorm/Service.cpp
index e0df2635d96..40886aeb1d6 100644
--- a/cpp/src/IceStorm/Service.cpp
+++ b/cpp/src/IceStorm/Service.cpp
@@ -529,6 +529,8 @@ ServiceI::validateProperties(const string& name, const PropertiesPtr& properties
"Trace.Topic",
"Trace.TopicManager",
"Send.Timeout",
+ "Send.QueueSizeMax",
+ "Send.QueueSizeMaxPolicy",
"Discard.Interval",
"SQL.DatabaseType",
"SQL.EncodingVersion",
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 4b53be621e1..258aeac4fb9 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -227,7 +227,7 @@ void
SubscriberBatch::doFlush()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
//
// If the subscriber isn't online we're done.
//
@@ -239,7 +239,7 @@ SubscriberBatch::doFlush()
EventDataSeq v;
v.swap(_events);
assert(!v.empty());
-
+
if(_observer)
{
_outstandingCount = static_cast<Ice::Int>(v.size());
@@ -255,7 +255,7 @@ SubscriberBatch::doFlush()
}
Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests(
- Ice::newCallback_Object_ice_flushBatchRequests(this,
+ Ice::newCallback_Object_ice_flushBatchRequests(this,
&SubscriberBatch::exception,
&SubscriberBatch::sent));
if(result->sentSynchronously())
@@ -278,7 +278,7 @@ SubscriberBatch::doFlush()
{
_lock.notify();
}
-
+
// This is significantly faster than the async version, but it can
// block the calling thread. Bad news!
@@ -294,7 +294,7 @@ SubscriberBatch::sent(bool sentSynchronously)
}
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
// Decrement the _outstanding count.
--_outstanding;
assert(_outstanding == 0);
@@ -334,7 +334,7 @@ void
SubscriberOneway::flush()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
//
// If the subscriber isn't online we're done.
//
@@ -360,7 +360,7 @@ SubscriberOneway::flush()
try
{
Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
- e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this,
+ e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this,
&SubscriberOneway::exception,
&SubscriberOneway::sent));
if(!result->sentSynchronously())
@@ -394,7 +394,7 @@ SubscriberOneway::sent(bool sentSynchronously)
}
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
// Decrement the _outstanding count.
--_outstanding;
assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
@@ -481,7 +481,7 @@ void
SubscriberLink::flush()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
if(_state != SubscriberStateOnline || _outstanding > 0)
{
return;
@@ -673,23 +673,39 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events)
{
break;
}
-
+
//
// State transition to online.
//
setState(SubscriberStateOnline);
// fall through
}
-
+
case SubscriberStateOnline:
- copy(events.begin(), events.end(), back_inserter(_events));
+ {
+ for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
+ {
+ if(_events.size() == _instance->sendQueueSizeMax())
+ {
+ if(_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber)
+ {
+ error(false, IceStorm::SendQueueSizeMaxReached(__FILE__, __LINE__));
+ }
+ else // DropEvents
+ {
+ _events.pop_front();
+ }
+ }
+ _events.push_back(*p);
+ }
+
if(_observer)
{
_observer->queued(static_cast<Ice::Int>(events.size()));
}
flush();
break;
-
+ }
case SubscriberStateError:
return false;
@@ -834,7 +850,7 @@ Subscriber::error(bool dec, const Ice::Exception& e)
{
_events.clear();
setState(SubscriberStateError);
-
+
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->subscriber > 0)
{
@@ -871,13 +887,13 @@ Subscriber::completed(const Ice::AsyncResultPtr& result)
{
_observer->delivered(_outstandingCount);
}
-
+
//
// A successful response means we're no longer retrying, we're
// back active.
//
_currentRetry = 0;
-
+
if(_events.empty() && _outstanding == 0 && _shutdown)
{
_lock.notify();
@@ -893,7 +909,7 @@ Subscriber::completed(const Ice::AsyncResultPtr& result)
}
}
-
+
void
Subscriber::shutdown()
{
@@ -954,7 +970,7 @@ Subscriber::Subscriber(
rec.topicName,
rec.obj,
rec.theQoS,
- rec.theTopic,
+ rec.theTopic,
toSubscriberState(_state),
0));
}
@@ -992,7 +1008,7 @@ Subscriber::setState(Subscriber::SubscriberState state)
if(traceLevels->subscriber > 1)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj)
+ out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj)
<< " transition from: " << stateToString(_state) << " to: " << stateToString(state);
}
_state = state;