diff options
author | Benoit Foucher <benoit@zeroc.com> | 2016-09-22 16:47:36 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2016-09-22 16:47:36 +0200 |
commit | 7b5454f97092c7a3952d4981bb057f1bd7d61993 (patch) | |
tree | d26dadf7fa19ef8fb9ed92851c1cf5cc179fc908 /cpp/src | |
parent | Slice/unicodePaths fails in Sles12 (diff) | |
download | ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.tar.bz2 ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.tar.xz ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.zip |
Fixed ICE-7032 - support for limiting subscriber queue size
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/Internal.ice | 90 | ||||
-rw-r--r-- | cpp/src/IceStorm/IceStormInternal.ice | 4 | ||||
-rw-r--r-- | cpp/src/IceStorm/Instance.cpp | 32 | ||||
-rw-r--r-- | cpp/src/IceStorm/Instance.h | 10 | ||||
-rw-r--r-- | cpp/src/IceStorm/Service.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 54 |
6 files changed, 127 insertions, 65 deletions
diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice index fa19b46e6f9..3dcb4a538c7 100644 --- a/cpp/src/IceGrid/Internal.ice +++ b/cpp/src/IceGrid/Internal.ice @@ -33,7 +33,7 @@ class InternalDbEnvDescriptor { /** The name of the database environment. */ string name; - + /** The database properties. */ PropertyDescriptorSeq properties; }; @@ -70,7 +70,7 @@ class InternalServerDescriptor /** The application uuid. */ string uuid; - + /** The application revision. */ int revision; @@ -79,19 +79,19 @@ class InternalServerDescriptor /** The server executable. */ string exe; - + /** The server working directory. */ string pwd; /** The user ID to use to run the server. */ string user; - + /** The server activation mode. */ string activation; /** The server activation timeout. */ string activationTimeout; - + /** The server deactivation timeout. */ string deactivationTimeout; @@ -103,10 +103,10 @@ class InternalServerDescriptor /** Specifies if a process object is registered. */ bool processRegistered; - + /** The server command line options. */ Ice::StringSeq options; - + /** The server environment variables. */ Ice::StringSeq envs; @@ -142,14 +142,14 @@ interface Adapter { /** * - * Activate this adapter. If this adapter can be activated, this - * will activate the adapter and return the direct proxy of the + * Activate this adapter. If this adapter can be activated, this + * will activate the adapter and return the direct proxy of the * adapter once it's active. If this adapter can be activated on * demand, this will return 0 if the adapter is inactive or the * adapter direct proxy it's active. * **/ - ["amd"] Object* activate(); + ["amd"] Object* activate(); /** * @@ -208,7 +208,7 @@ interface FileReader /** * * Read lines (or size bytes) at the specified position from the given file. - * + * **/ ["cpp:const"] idempotent bool read(string filename, long pos, int size, out long newPos, out Ice::StringSeq lines) throws FileNotAvailableException; @@ -236,7 +236,7 @@ interface Server extends FileReader **/ ["amd"] void stop() throws ServerStopException; - + /** * * Check if the given server can be loaded on this node. @@ -253,7 +253,7 @@ interface Server extends FileReader * * Enable or disable the server. * - **/ + **/ void setEnabled(bool enable); /** @@ -268,9 +268,9 @@ interface Server extends FileReader * Send signal to the server * **/ - void sendSignal(string signal) + void sendSignal(string signal) throws BadSignalException; - + /** * * Write message on servers' stdout or stderr. @@ -316,12 +316,12 @@ interface ReplicaObserver * * Initialization of the replica observer. * - **/ + **/ void replicaInit(InternalRegistryPrxSeq replicas); /** * - * Notification that a replica has been added. The node should + * Notification that a replica has been added. The node should * establish a session with this new replica. * **/ @@ -364,8 +364,8 @@ interface Node extends FileReader, ReplicaObserver **/ ["amd"] idempotent Server* loadServer(InternalServerDescriptor svr, string replicaName, - out AdapterPrxDict adapters, - out int actTimeout, + out AdapterPrxDict adapters, + out int actTimeout, out int deactTimeout) throws DeploymentException; @@ -380,8 +380,8 @@ interface Node extends FileReader, ReplicaObserver **/ ["amd"] idempotent Server* loadServerWithoutRestart(InternalServerDescriptor svr, string replicaName, - out AdapterPrxDict adapters, - out int actTimeout, + out AdapterPrxDict adapters, + out int actTimeout, out int deactTimeout) throws DeploymentException; @@ -407,12 +407,12 @@ interface Node extends FileReader, ReplicaObserver * using a distribution directory to patch are active, this method * will raise a PatchException unless shutdown is set to true. In * which case the servers will be shutdown. - * + * **/ - ["amd"] idempotent void patch(PatcherFeedback* feedback, - string application, - string server, - InternalDistributionDescriptor appDistrib, + ["amd"] idempotent void patch(PatcherFeedback* feedback, + string application, + string server, + InternalDistributionDescriptor appDistrib, bool shutdown); /** @@ -420,7 +420,7 @@ interface Node extends FileReader, ReplicaObserver * Establish a session to the given replica, this method only * returns once the registration was attempted (unlike * replicaAdded below). - * + * **/ void registerWithReplica(InternalRegistry* replica); @@ -429,14 +429,14 @@ interface Node extends FileReader, ReplicaObserver * Get the node name. * **/ - ["nonmutating", "cpp:const"] idempotent string getName(); + ["nonmutating", "cpp:const"] idempotent string getName(); /** * * Get the node hostname. * **/ - ["nonmutating", "cpp:const"] idempotent string getHostname(); + ["nonmutating", "cpp:const"] idempotent string getHostname(); /** * @@ -495,7 +495,7 @@ interface NodeSession * * Return the node session timeout. * - **/ + **/ ["nonmutating", "cpp:const"] idempotent int getTimeout(); /** @@ -508,7 +508,7 @@ interface NodeSession /** * * Ask the registry to load the servers on the node. - * + * **/ ["amd", "nonmutating", "cpp:const"] idempotent void loadServers(); @@ -576,7 +576,7 @@ interface ReplicaSession * * Return the replica session timeout. * - **/ + **/ ["cpp:const"] idempotent int getTimeout(); /** @@ -593,7 +593,7 @@ interface ReplicaSession * This method sets the endpoints of the replica. This allows the * master to create proxies with multiple endpoints for replicated * objects (e.g.: IceGrid::Query object). - * + * **/ idempotent void setEndpoints(StringObjectProxyDict endpoints); @@ -621,7 +621,7 @@ interface ReplicaSession * before to continue. * **/ - void receivedUpdate(TopicName name, int serial, string failure); + void receivedUpdate(TopicName name, int serial, string failure); /** * @@ -663,7 +663,7 @@ class InternalNodeInfo /** * * The operation system release level (as defined in uname()). - * + * **/ string release; @@ -679,16 +679,16 @@ class InternalNodeInfo * The machine hardware type (as defined in uname()). * **/ - string machine; + string machine; /** * - * The number of processor threads (e.g. 8 on + * The number of processor threads (e.g. 8 on * system with 1 quad-core CPU, with 2 threads per core) * **/ int nProcessors; - + /** * * The path to the node data directory. @@ -731,11 +731,11 @@ interface InternalRegistry extends FileReader * @param info Some information on the node. * * @param prx The proxy of the node. - * + * * @param loadInf The load information of the node. - * + * * @return The node session proxy. - * + * * @throws NodeActiveException Raised if the node is already * registered and currently active. * @@ -752,9 +752,9 @@ interface InternalRegistry extends FileReader * @param info Some information on the replica. * * @param prx The proxy of the replica. - * + * * @return The replica session proxy. - * + * * @throws ReplicaActiveException Raised if the replica is already * registered and currently active. * @@ -766,7 +766,7 @@ interface InternalRegistry extends FileReader * * Create a session with the given registry replica. This method * returns only once the session creation has been attempted. - * + * **/ void registerWithReplica(InternalRegistry* prx); @@ -788,7 +788,7 @@ interface InternalRegistry extends FileReader /** * * Return applications, adapters, objects from this replica. - * + * **/ ["cpp:const"] idempotent ApplicationInfoSeq getApplications(out long serial); ["cpp:const"] idempotent AdapterInfoSeq getAdapters(out long serial); diff --git a/cpp/src/IceStorm/IceStormInternal.ice b/cpp/src/IceStorm/IceStormInternal.ice index 32b010fe87d..bafb51717fa 100644 --- a/cpp/src/IceStorm/IceStormInternal.ice +++ b/cpp/src/IceStorm/IceStormInternal.ice @@ -39,6 +39,10 @@ module IceStorm Ice::Context context; }; +local exception SendQueueSizeMaxReached +{ +}; + /** A sequence of EventData. */ ["cpp:type:std::deque< ::IceStorm::EventDataPtr>"] sequence<EventData> EventDataSeq; diff --git a/cpp/src/IceStorm/Instance.cpp b/cpp/src/IceStorm/Instance.cpp index 0e95d765763..ccc17a952c8 100644 --- a/cpp/src/IceStorm/Instance.cpp +++ b/cpp/src/IceStorm/Instance.cpp @@ -17,6 +17,7 @@ #include <Ice/InstrumentationI.h> #include <Ice/Communicator.h> #include <Ice/Properties.h> +#include <Ice/TraceUtil.h> using namespace std; using namespace IceStorm; @@ -60,6 +61,8 @@ Instance::Instance( name + ".Flush.Timeout", 1000))), // default one second. // default one minute. _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)), + _sendQueueSizeMax(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax", -1)), + _sendQueueSizeMaxPolicy(RemoveSubscriber), _topicReaper(new TopicReaper()) { try @@ -84,6 +87,21 @@ Instance::Instance( _batchFlusher = new IceUtil::Timer(); _timer = new IceUtil::Timer(); + string policy = properties->getProperty(name + ".Send.QueueSizeMaxPolicy"); + if(policy == "RemoveSubscriber") + { + const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = RemoveSubscriber; + } + else if(policy == "DropEvents") + { + const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = DropEvents; + } + else if(!policy.empty()) + { + Ice::Warning warn(_traceLevels->logger); + warn << "invalid value `" << policy << "' for `" << name << ".Send.QueueSizeMaxPolicy'"; + } + // // If an Ice metrics observer is setup on the communicator, also // enable metrics for IceStorm. @@ -237,6 +255,18 @@ Instance::sendTimeout() const return _sendTimeout; } +int +Instance::sendQueueSizeMax() const +{ + return _sendQueueSizeMax; +} + +Instance::SendQueueSizeMaxPolicy +Instance::sendQueueSizeMaxPolicy() const +{ + return _sendQueueSizeMaxPolicy; +} + void Instance::shutdown() { @@ -270,7 +300,7 @@ Instance::destroy() _node = 0; // // The observer instance must be cleared as it holds the - // TopicManagerImpl which hodlds the instance causing a + // TopicManagerImpl which hodlds the instance causing a // cyclic reference. // _observer = 0; diff --git a/cpp/src/IceStorm/Instance.h b/cpp/src/IceStorm/Instance.h index 9b4a31ba88e..5cc90314930 100644 --- a/cpp/src/IceStorm/Instance.h +++ b/cpp/src/IceStorm/Instance.h @@ -60,6 +60,12 @@ class Instance : public IceUtil::Shared { public: + enum SendQueueSizeMaxPolicy + { + RemoveSubscriber, + DropEvents + }; + Instance(const std::string&, const std::string&, const Ice::CommunicatorPtr&, const Ice::ObjectAdapterPtr&, const Ice::ObjectAdapterPtr&, const Ice::ObjectAdapterPtr& = 0, const IceStormElection::NodePrx& = 0); ~Instance(); @@ -87,6 +93,8 @@ public: IceUtil::Time discardInterval() const; IceUtil::Time flushInterval() const; int sendTimeout() const; + int sendQueueSizeMax() const; + SendQueueSizeMaxPolicy sendQueueSizeMaxPolicy() const; void shutdown(); void destroy(); @@ -104,6 +112,8 @@ private: const IceUtil::Time _discardInterval; const IceUtil::Time _flushInterval; const int _sendTimeout; + const int _sendQueueSizeMax; + const SendQueueSizeMaxPolicy _sendQueueSizeMaxPolicy; const Ice::ObjectPrx _topicReplicaProxy; const Ice::ObjectPrx _publisherReplicaProxy; const TopicReaperPtr _topicReaper; diff --git a/cpp/src/IceStorm/Service.cpp b/cpp/src/IceStorm/Service.cpp index e0df2635d96..40886aeb1d6 100644 --- a/cpp/src/IceStorm/Service.cpp +++ b/cpp/src/IceStorm/Service.cpp @@ -529,6 +529,8 @@ ServiceI::validateProperties(const string& name, const PropertiesPtr& properties "Trace.Topic", "Trace.TopicManager", "Send.Timeout", + "Send.QueueSizeMax", + "Send.QueueSizeMaxPolicy", "Discard.Interval", "SQL.DatabaseType", "SQL.EncodingVersion", diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 4b53be621e1..258aeac4fb9 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -227,7 +227,7 @@ void SubscriberBatch::doFlush() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // // If the subscriber isn't online we're done. // @@ -239,7 +239,7 @@ SubscriberBatch::doFlush() EventDataSeq v; v.swap(_events); assert(!v.empty()); - + if(_observer) { _outstandingCount = static_cast<Ice::Int>(v.size()); @@ -255,7 +255,7 @@ SubscriberBatch::doFlush() } Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests( - Ice::newCallback_Object_ice_flushBatchRequests(this, + Ice::newCallback_Object_ice_flushBatchRequests(this, &SubscriberBatch::exception, &SubscriberBatch::sent)); if(result->sentSynchronously()) @@ -278,7 +278,7 @@ SubscriberBatch::doFlush() { _lock.notify(); } - + // This is significantly faster than the async version, but it can // block the calling thread. Bad news! @@ -294,7 +294,7 @@ SubscriberBatch::sent(bool sentSynchronously) } IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // Decrement the _outstanding count. --_outstanding; assert(_outstanding == 0); @@ -334,7 +334,7 @@ void SubscriberOneway::flush() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // // If the subscriber isn't online we're done. // @@ -360,7 +360,7 @@ SubscriberOneway::flush() try { Ice::AsyncResultPtr result = _obj->begin_ice_invoke( - e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this, + e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this, &SubscriberOneway::exception, &SubscriberOneway::sent)); if(!result->sentSynchronously()) @@ -394,7 +394,7 @@ SubscriberOneway::sent(bool sentSynchronously) } IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // Decrement the _outstanding count. --_outstanding; assert(_outstanding >= 0 && _outstanding < _maxOutstanding); @@ -481,7 +481,7 @@ void SubscriberLink::flush() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + if(_state != SubscriberStateOnline || _outstanding > 0) { return; @@ -673,23 +673,39 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events) { break; } - + // // State transition to online. // setState(SubscriberStateOnline); // fall through } - + case SubscriberStateOnline: - copy(events.begin(), events.end(), back_inserter(_events)); + { + for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p) + { + if(_events.size() == _instance->sendQueueSizeMax()) + { + if(_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber) + { + error(false, IceStorm::SendQueueSizeMaxReached(__FILE__, __LINE__)); + } + else // DropEvents + { + _events.pop_front(); + } + } + _events.push_back(*p); + } + if(_observer) { _observer->queued(static_cast<Ice::Int>(events.size())); } flush(); break; - + } case SubscriberStateError: return false; @@ -834,7 +850,7 @@ Subscriber::error(bool dec, const Ice::Exception& e) { _events.clear(); setState(SubscriberStateError); - + TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->subscriber > 0) { @@ -871,13 +887,13 @@ Subscriber::completed(const Ice::AsyncResultPtr& result) { _observer->delivered(_outstandingCount); } - + // // A successful response means we're no longer retrying, we're // back active. // _currentRetry = 0; - + if(_events.empty() && _outstanding == 0 && _shutdown) { _lock.notify(); @@ -893,7 +909,7 @@ Subscriber::completed(const Ice::AsyncResultPtr& result) } } - + void Subscriber::shutdown() { @@ -954,7 +970,7 @@ Subscriber::Subscriber( rec.topicName, rec.obj, rec.theQoS, - rec.theTopic, + rec.theTopic, toSubscriberState(_state), 0)); } @@ -992,7 +1008,7 @@ Subscriber::setState(Subscriber::SubscriberState state) if(traceLevels->subscriber > 1) { Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj) + out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj) << " transition from: " << stateToString(_state) << " to: " << stateToString(state); } _state = state; |