summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2006-11-09 09:01:11 +0000
committerMatthew Newhook <matthew@zeroc.com>2006-11-09 09:01:11 +0000
commit382aef369900b75eeaf8e3ec4e0a27cb36c20355 (patch)
tree90debb0cfcf3473360e6b4e57aaa03bd3607d74a
parentcosmetic change (diff)
downloadice-382aef369900b75eeaf8e3ec4e0a27cb36c20355.tar.bz2
ice-382aef369900b75eeaf8e3ec4e0a27cb36c20355.tar.xz
ice-382aef369900b75eeaf8e3ec4e0a27cb36c20355.zip
Fixes as suggested by Bernard (namespace { }, PublisherProxyI ->
PublisherI, etc. New SubscriberPool reduction algorithm. gcc compiler bug fix.
-rw-r--r--cpp/src/IceStorm/.depend4
-rw-r--r--cpp/src/IceStorm/SubscriberPool.cpp242
-rw-r--r--cpp/src/IceStorm/SubscriberPool.h36
-rw-r--r--cpp/src/IceStorm/Subscribers.cpp153
-rw-r--r--cpp/src/IceStorm/Subscribers.h14
-rw-r--r--cpp/src/IceStorm/TopicI.cpp84
-rw-r--r--cpp/src/IceStorm/TopicI.h6
7 files changed, 349 insertions, 190 deletions
diff --git a/cpp/src/IceStorm/.depend b/cpp/src/IceStorm/.depend
index f5a313eb806..988ddcecfc9 100644
--- a/cpp/src/IceStorm/.depend
+++ b/cpp/src/IceStorm/.depend
@@ -1,9 +1,9 @@
IceStorm$(OBJEXT): IceStorm.cpp ../../include/IceStorm/IceStorm.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/SliceChecksumDict.h ../../include/Ice/LocalException.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/ObjectFactory.h ../../include/Ice/SliceChecksums.h ../../include/IceUtil/Iterator.h ../../include/IceUtil/ScopedArray.h ../../include/IceUtil/DisableWarnings.h
IceStorm$(OBJEXT): IceStorm.cpp ../../include/IceStorm/IceStorm.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/SliceChecksumDict.h ../../include/Ice/LocalException.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/ObjectFactory.h ../../include/Ice/SliceChecksums.h ../../include/IceUtil/Iterator.h ../../include/IceUtil/ScopedArray.h ../../include/IceUtil/DisableWarnings.h
-Instance$(OBJEXT): Instance.cpp ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/BatchFlusher.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/SubscriberPool.h ../IceStorm/KeepAliveThread.h ../IceStorm/LinkRecord.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h
+Instance$(OBJEXT): Instance.cpp ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/BatchFlusher.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/SubscriberPool.h ../../include/Ice/Identity.h ../IceStorm/KeepAliveThread.h ../IceStorm/LinkRecord.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/StreamF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h
TraceLevels$(OBJEXT): TraceLevels.cpp ../IceStorm/TraceLevels.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Config.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Handle.h ../../include/Ice/PropertiesF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/LoggerF.h ../../include/Ice/Properties.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/BuiltinSequences.h
BatchFlusher$(OBJEXT): BatchFlusher.cpp ../IceStorm/BatchFlusher.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Config.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/ProxyF.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/Handle.h ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../../include/Ice/Communicator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/StatsF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Properties.h ../../include/Ice/LoggerUtil.h ../../include/Ice/Connection.h
-SubscriberPool$(OBJEXT): SubscriberPool.cpp ../IceStorm/SubscriberPool.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Config.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Handle.h ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/Subscribers.h ../IceStorm/IceStormInternal.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/LoggerUtil.h
+SubscriberPool$(OBJEXT): SubscriberPool.cpp ../IceStorm/SubscriberPool.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Config.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Handle.h ../../include/Ice/Identity.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/Ice/UndefSysMacros.h ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/Subscribers.h ../IceStorm/IceStormInternal.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/StreamF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/LoggerUtil.h
KeepAliveThread$(OBJEXT): KeepAliveThread.cpp ../IceStorm/KeepAliveThread.h ../IceStorm/LinkRecord.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/IceUtil/Thread.h ../IceStorm/Instance.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../../include/Ice/LocalException.h ../../include/Ice/LoggerUtil.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/Properties.h
Subscribers$(OBJEXT): Subscribers.cpp ../IceStorm/Subscribers.h ../IceStorm/IceStormInternal.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../IceStorm/Instance.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/BatchFlusher.h ../../include/IceUtil/Thread.h ../IceStorm/SubscriberPool.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/IncomingAsync.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/FacetMap.h ../../include/Ice/Locator.h ../../include/Ice/ProcessF.h ../../include/Ice/LoggerUtil.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h ../../include/Ice/Connection.h
TopicI$(OBJEXT): TopicI.cpp ../IceStorm/TopicI.h ../../include/IceUtil/RecMutex.h ../../include/IceUtil/Config.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../IceStorm/IceStormInternal.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../IceStorm/PersistentTopicMap.h ../../include/Freeze/Map.h ../../include/Ice/Ice.h ../../include/Ice/GC.h ../../include/IceUtil/Thread.h ../../include/Ice/Initialize.h ../../include/Ice/PropertiesF.h ../../include/Ice/LoggerF.h ../../include/Ice/StatsF.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/ObjectFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/IncomingAsync.h ../../include/Ice/FacetMap.h ../../include/Ice/Locator.h ../../include/Ice/ProcessF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Process.h ../../include/Ice/Application.h ../../include/Ice/Connection.h ../../include/Ice/Functional.h ../../include/IceUtil/Functional.h ../../include/Ice/Stream.h ../../include/Freeze/DB.h ../../include/Freeze/Exception.h ../../include/Freeze/Connection.h ../../include/Freeze/Transaction.h ../IceStorm/LinkRecord.h ../IceStorm/PersistentUpstreamMap.h ../IceStorm/Instance.h ../IceStorm/Subscribers.h ../IceStorm/TraceLevels.h ../IceStorm/KeepAliveThread.h ../IceStorm/SubscriberPool.h ../../include/Freeze/Initialize.h ../../include/Freeze/EvictorF.h ../../include/Freeze/ConnectionF.h ../../include/Freeze/Index.h
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp
index 6f42665af37..7943d3e1129 100644
--- a/cpp/src/IceStorm/SubscriberPool.cpp
+++ b/cpp/src/IceStorm/SubscriberPool.cpp
@@ -19,7 +19,7 @@
using namespace IceStorm;
using namespace std;
-namespace IceStorm
+namespace
{
class SubscriberPoolWorker : public IceUtil::Thread
@@ -39,22 +39,33 @@ public:
virtual void
run()
{
+ IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time.
SubscriberPtr sub;
+ bool requeue = false;
+ bool computeInterval = false;
while(true)
{
- sub = _manager->dequeue(sub);
+ sub = _manager->dequeue(sub, requeue, interval, computeInterval);
if(!sub)
{
return;
}
+
//
- // If SubscriberPool returns true then the subscriber needs to be
- // SubscriberPooled again, so therefore we will re-enqueue the
- // subscriber in the call to dequeue.
+ // If SubscriberPool returns true then the subscriber
+ // needs to be SubscriberPooled again, so therefore we
+ // will re-enqueue the subscriber in the call to dequeue.
//
- if(!sub->flush())
+ if(computeInterval)
+ {
+ IceUtil::Time start = IceUtil::Time::now();
+ requeue = sub->flush();
+ interval = IceUtil::Time::now() - start;
+ }
+ else
{
- sub = 0;
+ requeue = sub->flush();
+ interval = IceUtil::Time::seconds(24 * 60); // A long time.
}
}
}
@@ -144,25 +155,23 @@ SubscriberPoolMonitor::destroy()
SubscriberPool::SubscriberPool(const InstancePtr& instance) :
_instance(instance),
_sizeMax(instance->properties()->getPropertyAsIntWithDefault(
- "IceStorm.SubscriberPool.SizeMax", -1)),
+ "IceStorm.SubscriberPool.SizeMax", 0)),
_sizeWarn(instance->properties()->getPropertyAsIntWithDefault(
"IceStorm.SubscriberPool.SizeWarn", 0)),
_size(instance->properties()->getPropertyAsIntWithDefault(
"IceStorm.SubscriberPool.Size", 1)),
_timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault(
"IceStorm.SubscriberPool.Timeout", 250), 50))), // minimum 50ms.
+ _stallCheck(_timeout * 10), // 10 * the stall timeout.
_destroy(false),
- _inUse(0),
- _running(0),
- _load(1.0)
+ _reap(0)
{
try
{
__setNoDelete(true);
_subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout);
- for(int i = 0; i < _size; ++i)
+ for(unsigned int i = 0; i < _size; ++i)
{
- ++_running;
++_inUse;
_workers.push_back(new SubscriberPoolWorker(this));
}
@@ -186,9 +195,9 @@ SubscriberPool::~SubscriberPool()
}
void
-SubscriberPool::add(list<SubscriberPtr>& subscribers)
+SubscriberPool::flush(list<SubscriberPtr>& subscribers)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
//
// Splice on the new set of subscribers to SubscriberPool.
//
@@ -196,76 +205,150 @@ SubscriberPool::add(list<SubscriberPtr>& subscribers)
notifyAll();
}
+void
+SubscriberPool::add(const SubscriberPtr& subscriber)
+{
+ Lock sync(*this);
+ _subscribers.push_back(subscriber);
+}
+
+void
+SubscriberPool::remove(const SubscriberPtr& subscriber)
+{
+ Lock sync(*this);
+ //
+ // Note that this cannot remove based on the subscriber id because
+ // the pool is TopicManager scoped and not topic scoped therefore
+ // its quite possible to have two subscribers with the same id in
+ // the list.
+ //
+ list<SubscriberPtr>::iterator p = _subscribers.begin();
+ while(p != _subscribers.end())
+ {
+ if((*p).get() == subscriber.get())
+ {
+ _subscribers.erase(p);
+ return;
+ }
+ ++p;
+ }
+
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ Ice::Error err(traceLevels->logger);
+ err << "SubscriberPool: subscriber not found: " << _instance->communicator()->identityToString(subscriber->id());
+}
+
//
// The passed subscriber need to be enqueued again.
//
SubscriberPtr
-SubscriberPool::dequeue(const SubscriberPtr& sub)
+SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::Time& interval, bool& computeInterval)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
if(sub)
{
- _pending.push_back(sub);
+ if(requeue)
+ {
+ _pending.push_back(sub);
+ }
+ sub->flushTime(interval);
}
//
- // Now we check if this thread can be destroyed, based on a
- // load factor.
+ // The worker is no longer in use.
//
- // The load factor jumps immediately to the number of threads
- // that are currently in use, but decays exponentially if the
- // number of threads in use is smaller than the load
- // factor. This reflects that we create threads immediately
- // when they are needed, but want the number of threads to
- // slowly decline to the configured minimum.
- //
- double inUse = static_cast<double>(_inUse);
- if(_load < inUse)
- {
- _load = inUse;
- }
- else
- {
- const double loadFactor = 0.05; // TODO: Configurable?
- const double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + inUse * loadFactor;
- }
+ --_inUse;
- if(_running > _size)
+ //
+ // If _sizeMax is 1 we never spawn up new threads if a stall is
+ // detected.
+ //
+ if(_sizeMax != 1)
{
TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->subscriberPool > 1)
- {
- IceUtil::Time interval = IceUtil::Time::now() - _lastNext;
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
- out << "load check: " << _load;
- }
- int load = static_cast<int>(_load + 0.5);
-
- if(load < _running)
+ //
+ // Reap dead workers, if necessary.
+ //
+ if(_reap > 0)
{
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
if(traceLevels->subscriberPool > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
- out << "reducing SubscriberPool threads: load: " << _load << " threads: " << _running;
+ out << "reaping: " << _reap << " workers";
+ }
+ list<IceUtil::ThreadPtr>::iterator p = _workers.begin();
+ while(p != _workers.end() && _reap > 0)
+ {
+ if(!(*p)->isAlive())
+ {
+ (*p)->getThreadControl().join();
+ p = _workers.erase(p);
+ --_reap;
+ }
+ else
+ {
+ ++p;
+ }
}
+ }
- return 0;
+ //
+ // If we have extra workers every _stallCheck period we run
+ // through the complete set of subscribers and determine how
+ // many have stalled since the last check. If this number is
+ // less than the number of extra threads then we terminate the
+ // calling worker.
+ //
+ // - The flush time is protected by the subscriber pool mutex.
+ // - The flush time is only computed if we have extra threads,
+ // otherwise it is set to some large value.
+ // - The max flush time is reset to the next sending interval
+ // after after _stallCheck period.
+ // - Every subscriber is considered to be stalled iff it has
+ // never sent an event or we have just created the first
+ // additional worker. The first handles the case where a
+ // subscriber stalls for a long time on the first message
+ // send. The second means that we can disable computation of
+ // the flush latency if there are no additional threads.
+ //
+ if(_workers.size() > _size)
+ {
+ IceUtil::Time now = IceUtil::Time::now();
+ if(now - _lastStallCheck > _stallCheck)
+ {
+ _lastStallCheck = now;
+ unsigned int stalls = 0;
+ for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ if((*p)->pollMaxFlushTime(now) > _timeout)
+ {
+ ++stalls;
+ }
+ }
+
+ if(traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+ out << "checking stalls. extra workers: " << _workers.size() - _size
+ << " subscribers: " << _subscribers.size() << " stalls: " << stalls;
+ }
+
+ if((_workers.size() - _size) > stalls)
+ {
+ if(traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+ out << "destroying workers";
+ }
+ ++_reap;
+ return 0;
+ }
+ }
}
-
}
-
- assert(_inUse > 0);
- --_inUse;
-
+
while(_pending.empty() && !_destroy)
{
//
@@ -277,11 +360,10 @@ SubscriberPool::dequeue(const SubscriberPtr& sub)
if(_destroy)
{
- --_running;
return 0;
}
- _lastNext = IceUtil::Time::now();
+ _lastDequeue = IceUtil::Time::now();
SubscriberPtr subscriber = _pending.front();
_pending.pop_front();
@@ -292,7 +374,7 @@ SubscriberPool::dequeue(const SubscriberPtr& sub)
// If all threads are now in use then we need to start the
// monitoring, otherwise we don't need to monitor.
//
- if(_inUse == _running && (_running < _sizeMax || _sizeMax == -1))
+ if(_inUse == _workers.size() && (_workers.size() < _sizeMax || _sizeMax != 1))
{
_subscriberPoolMonitor->startMonitor();
}
@@ -300,6 +382,11 @@ SubscriberPool::dequeue(const SubscriberPtr& sub)
{
_subscriberPoolMonitor->stopMonitor();
}
+ //
+ // We only need to compute the push interval if we've created
+ // stall threads.
+ //
+ computeInterval = (_workers.size() - _size) > 0;
return subscriber;
}
@@ -311,7 +398,7 @@ SubscriberPool::destroy()
// threads to unblock and terminate.
//
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
_destroy = true;
notifyAll();
if(_subscriberPoolMonitor)
@@ -347,24 +434,37 @@ SubscriberPool::check()
Lock sync(*this);
TraceLevelsPtr traceLevels = _instance->traceLevels();
- IceUtil::Time interval = IceUtil::Time::now() - _lastNext;
+ IceUtil::Time now = IceUtil::Time::now();
+ IceUtil::Time interval = now - _lastDequeue;
+/*
if(traceLevels->subscriberPool > 1)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
out << "check called: interval: " << interval << " timeout: " << _timeout
- << " pending: " << _pending.size() << " running: " << _running
+ << " pending: " << _pending.size() << " running: " << _workers.size()
<< " sizeMax: " << _sizeMax;
}
-
- if(interval > _timeout && _pending.size() > 0 && (_running < _sizeMax || _sizeMax == -1))
+*/
+
+ if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0))
{
if(traceLevels->subscriberPool > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
- out << "detected stall: creating thread: load: " << _load << " threads: " << _running;
+ out << "detected stall: creating thread: threads: " << _workers.size();
}
- ++_running;
+ //
+ // We'll now start stall checking at regular intervals if this
+ // is the first newly created worker. Here we need to
+ // initially set the stall check and the number of requests at
+ // this point.
+ //
+ if(_workers.size() == _size)
+ {
+ _lastStallCheck = now;
+ }
+
++_inUse;
_workers.push_back(new SubscriberPoolWorker(this));
}
diff --git a/cpp/src/IceStorm/SubscriberPool.h b/cpp/src/IceStorm/SubscriberPool.h
index ce6416452db..8f8140bded9 100644
--- a/cpp/src/IceStorm/SubscriberPool.h
+++ b/cpp/src/IceStorm/SubscriberPool.h
@@ -16,7 +16,9 @@
#include <IceUtil/Monitor.h>
#include <IceUtil/Time.h>
#include <IceUtil/Thread.h>
+#include <Ice/Identity.h>
#include <list>
+#include <set>
namespace IceStorm
{
@@ -60,32 +62,40 @@ public:
SubscriberPool(const InstancePtr&);
~SubscriberPool();
- void add(std::list<SubscriberPtr>&);
+ void flush(std::list<SubscriberPtr>&);
+ void add(const SubscriberPtr&);
+ void remove(const SubscriberPtr&);
void destroy();
-private:
-
- friend class SubscriberPoolWorker;
- SubscriberPtr dequeue(const SubscriberPtr&);
- friend class SubscriberPoolMonitor;
+ //
+ // For use by the subscriber worker.
+ //
+ SubscriberPtr dequeue(const SubscriberPtr&, bool, const IceUtil::Time&, bool&);
+ //
+ // For use by the monitor.
+ //
void check();
+private:
+
const InstancePtr _instance;
- const int _sizeMax;
- const int _sizeWarn;
- const int _size;
+ const unsigned int _sizeMax;
+ const unsigned int _sizeWarn;
+ const unsigned int _size;
const IceUtil::Time _timeout;
+ const IceUtil::Time _stallCheck;
SubscriberPoolMonitorPtr _subscriberPoolMonitor;
std::list<SubscriberPtr> _pending;
+ std::list<SubscriberPtr> _subscribers;
bool _destroy;
std::list<IceUtil::ThreadPtr> _workers;
- int _inUse;
- int _running;
- double _load;
+ int _reap;
+ unsigned int _inUse;
- IceUtil::Time _lastNext;
+ IceUtil::Time _lastStallCheck;
+ IceUtil::Time _lastDequeue;
};
} // End namespace IceStorm
diff --git a/cpp/src/IceStorm/Subscribers.cpp b/cpp/src/IceStorm/Subscribers.cpp
index 2aca83c3ca9..17b42bdc605 100644
--- a/cpp/src/IceStorm/Subscribers.cpp
+++ b/cpp/src/IceStorm/Subscribers.cpp
@@ -29,19 +29,19 @@ using namespace IceStorm;
//
// Per Subscriber object.
//
-namespace IceStorm
+namespace
{
-class PerSubscriberPublisherProxyI : public Ice::BlobjectArray
+class PerSubscriberPublisherI : public Ice::BlobjectArray
{
public:
- PerSubscriberPublisherProxyI(const InstancePtr& instance) :
+ PerSubscriberPublisherI(const InstancePtr& instance) :
_instance(instance)
{
}
- ~PerSubscriberPublisherProxyI()
+ ~PerSubscriberPublisherI()
{
}
@@ -56,11 +56,18 @@ public:
vector<Ice::Byte>&,
const Ice::Current& current)
{
- EventPtr event = new Event;
- event->op = current.operation;
- event->mode = current.mode;
- vector<Ice::Byte>(inParams.first, inParams.second).swap(event->data);
- event->context = current.ctx;
+ EventPtr event = new Event(
+ current.operation,
+ current.mode,
+ Ice::ByteSeq(),
+ current.ctx);
+
+ //
+ // COMPILERBUG: gcc 4.0.1 doesn't like this.
+ //
+ //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ Ice::ByteSeq data(inParams.first, inParams.second);
+ event->data.swap(data);
EventSeq e;
e.push_back(event);
@@ -70,7 +77,7 @@ public:
{
list<SubscriberPtr> l;
l.push_back(_subscriber);
- _instance->subscriberPool()->add(l);
+ _instance->subscriberPool()->flush(l);
}
return true;
}
@@ -80,11 +87,11 @@ private:
const InstancePtr _instance;
/*const*/ SubscriberPtr _subscriber;
};
-typedef IceUtil::Handle<PerSubscriberPublisherProxyI> PerSubscriberPublisherProxyIPtr;
+typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr;
}
// Each of the various Subscriber types.
-namespace IceStorm
+namespace
{
class SubscriberOneway : public Subscriber
@@ -187,24 +194,18 @@ SubscriberOneway::flush()
IceUtil::Mutex::Lock sync(_mutex);
//
- // If another thread is busy delivering events or we're no longer
- // active then we have nothing left to do.
+ // If the subscriber errored out then we're done.
//
- if(_state != StateActive || _events.empty())
+ if(_state != StateActive)
{
_busy = false;
return false;
}
+ assert(!_events.empty());
assert(_busy);
try
{
- if(_state != StateActive)
- {
- assert(!_busy);
- return false;
- }
-
//
// Get the current set of events, but release the lock before
// attempting to deliver the events. This allows other threads
@@ -216,15 +217,25 @@ SubscriberOneway::flush()
sync.release();
// XXX:
-/*
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(_obj->ice_getIdentity().name.substr(0, 4) == "slow")
{
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << "deliberately stalling";
- sleep(1);
+ //Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ //out << "deliberately stalling";
+ sleep(2);
+ }
+ if(_obj->ice_getIdentity().name.substr(0, 5) == "block")
+ {
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << "-> stall for 100s";
+ }
+ sleep(100);
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << "<- stall for 100s";
+ }
}
-*/
//
// Deliver the events without holding the lock.
@@ -256,15 +267,12 @@ SubscriberOneway::flush()
// Reacquire the lock before we check the queue again.
//
sync.acquire();
-
+
//
// If there have been more events queued in the meantime then
// we are still busy.
//
- if(_events.empty())
- {
- _busy = false;
- }
+ _busy = !_events.empty();
}
catch(const Ice::LocalException& ex)
{
@@ -287,7 +295,7 @@ SubscriberOneway::destroy()
Subscriber::destroy();
}
-namespace IceStorm
+namespace
{
class TwowayInvokeI : public Ice::AMI_Object_ice_invoke
@@ -332,22 +340,16 @@ SubscriberTwoway::flush()
IceUtil::Mutex::Lock sync(_mutex);
//
- // If another thread is busy delivering events or we're no longer
- // active then we have nothing left to do.
+ // If the subscriber errored out then we're done.
//
- if(_state != StateActive || _events.empty())
+ if(_state != StateActive)
{
_busy = false;
return false;
}
+ assert(!_events.empty());
assert(_busy);
- if(_state != StateActive)
- {
- assert(!_busy);
- return false;
- }
-
//
// Get the current set of events, but release the lock before
// attempting to deliver the events. This allows other threads
@@ -372,17 +374,15 @@ SubscriberTwoway::flush()
sync.acquire();
//
- // If there have been more events queued in the meantime then we
- // are still busy.
+ // If there have been more events queued in the meantime then
+ // we are still busy.
//
- if(_events.empty())
- {
- _busy = false;
- }
+ _busy = !_events.empty();
+
return _busy;
}
-namespace IceStorm
+namespace
{
class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke
@@ -428,23 +428,24 @@ SubscriberTwowayOrdered::flush()
EventPtr e;
{
IceUtil::Mutex::Lock sync(_mutex);
-
+
//
- // If another thread is busy delivering events or we're no longer
- // active then we have nothing left to do.
+ // If the subscriber errored out then we're done.
//
- if(_state != StateActive || _events.empty())
+ if(_state != StateActive)
{
_busy = false;
return false;
}
+ assert(!_events.empty());
assert(_busy);
e = _events.front();
_events.erase(_events.begin());
}
-
+
_obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context);
+
return false;
}
@@ -454,15 +455,15 @@ SubscriberTwowayOrdered::response()
EventPtr e;
{
IceUtil::Mutex::Lock sync(_mutex);
-
+
assert(_state == StateActive && _busy);
-
+
if(_events.empty())
{
_busy = false;
return;
}
-
+
e = _events.front();
_events.erase(_events.begin());
}
@@ -470,7 +471,7 @@ SubscriberTwowayOrdered::response()
_obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context);
}
-namespace IceStorm
+namespace
{
class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward
@@ -590,20 +591,21 @@ SubscriberLink::flush()
IceUtil::Mutex::Lock sync(_mutex);
//
- // If another thread is busy delivering events or we're no longer
- // active then we have nothing left to do.
+ // If the subscriber errored out then we're done.
//
- if(_state != StateActive || _events.empty())
+ if(_state != StateActive)
{
_busy = false;
return false;
}
+ assert(!_events.empty());
assert(_busy);
-
+
v.swap(_events);
}
_obj->forward_async(new Topiclink_forwardI(this), v);
+
return false;
}
@@ -634,7 +636,7 @@ Subscriber::create(
const Ice::ObjectPrx& obj,
const IceStorm::QoS& qos)
{
- PerSubscriberPublisherProxyIPtr per = new PerSubscriberPublisherProxyI(instance);
+ PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance);
Ice::ObjectPrx proxy = instance->objectAdapter()->addWithUUID(per);
TraceLevelsPtr traceLevels = instance->traceLevels();
SubscriberPtr subscriber;
@@ -791,6 +793,29 @@ Subscriber::destroy()
}
void
+Subscriber::flushTime(const IceUtil::Time& interval)
+{
+ if(_resetMax || interval > _maxSend)
+ {
+ assert(interval != IceUtil::Time());
+ _resetMax = false;
+ _maxSend = interval;
+ }
+}
+
+IceUtil::Time
+Subscriber::pollMaxFlushTime(const IceUtil::Time& now)
+{
+ //IceUtil::Time max = _maxSend;
+ //_maxSend = _maxSend * 0.95;
+ //return max;
+
+ // The next call to flushTime can reset the max time.
+ _resetMax = true;
+ return _maxSend;
+}
+
+void
Subscriber::setError(const Ice::Exception& e)
{
IceUtil::Mutex::Lock sync(_mutex);
@@ -836,7 +861,9 @@ Subscriber::Subscriber(
_persistent(persistent),
_proxy(proxy),
_state(StateActive),
- _busy(false)
+ _busy(false),
+ _resetMax(true),
+ _maxSend(IceUtil::Time::seconds(60*24)) // A long time
{
}
diff --git a/cpp/src/IceStorm/Subscribers.h b/cpp/src/IceStorm/Subscribers.h
index d1dabbd145c..ac1fb7bbb19 100644
--- a/cpp/src/IceStorm/Subscribers.h
+++ b/cpp/src/IceStorm/Subscribers.h
@@ -49,6 +49,13 @@ public:
virtual bool flush() = 0;
virtual void destroy();
+ //
+ // These methods must only be called by the SubscriberPool they
+ // are not internally mutex protected.
+ //
+ void flushTime(const IceUtil::Time&);
+ IceUtil::Time pollMaxFlushTime(const IceUtil::Time&);
+
void setError(const Ice::Exception&);
void setUnreachable(const Ice::Exception&);
@@ -84,6 +91,13 @@ protected:
bool _busy;
EventSeq _events;
+
+ //
+ // Not protected by _mutex. These members are protected by the
+ // SubscriberPool mutex.
+ //
+ bool _resetMax;
+ IceUtil::Time _maxSend;
};
bool operator==(const IceStorm::SubscriberPtr&, const Ice::Identity&);
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 87dfde4f6c7..8ed30f7330a 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -24,18 +24,18 @@
using namespace IceStorm;
using namespace std;
-namespace IceStorm
+namespace
{
//
// The servant has a 1-1 association with a topic. It is used to
// receive events from Publishers.
//
-class PublisherProxyI : public Ice::BlobjectArray
+class PublisherI : public Ice::BlobjectArray
{
public:
- PublisherProxyI(const TopicIPtr& topic) :
+ PublisherI(const TopicIPtr& topic) :
_topic(topic)
{
}
@@ -51,7 +51,12 @@ public:
Ice::ByteSeq(),
current.ctx);
- event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ //
+ // COMPILERBUG: gcc 4.0.1 doesn't like this.
+ //
+ //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ Ice::ByteSeq data(inParams.first, inParams.second);
+ event->data.swap(data);
EventSeq v;
v.push_back(event);
@@ -109,7 +114,7 @@ private:
const SubscriberPtr _subscriber;
};
-} // End namespace IceStorm
+}
TopicI::TopicI(
const InstancePtr& instance,
@@ -133,7 +138,7 @@ TopicI::TopicI(
Ice::Identity id;
id.category = _name;
id.name = "publish";
- _publisherPrx = _instance->objectAdapter()->add(new PublisherProxyI(this), id);
+ _publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), id);
//
// Create a servant per topic to receive linked event data. The
@@ -164,6 +169,7 @@ TopicI::TopicI(
_instance->objectAdapter()->add(
new TopicUpstreamLinkI(subscriber), p->second.upstream->ice_getIdentity()));
_subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
}
PersistentUpstreamMap::const_iterator upI = _upstream.find(_name);
@@ -250,10 +256,13 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&
if(p != _subscribers.end())
{
(*p)->destroy();
+ _instance->subscriberPool()->remove(*p);
_subscribers.erase(p);
}
- _subscribers.push_back(Subscriber::create(_instance, obj, qos));
+ SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos);
+ _subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
}
Ice::ObjectPrx
@@ -288,6 +297,7 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons
SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos);
_subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
return subscriber->proxy();
}
@@ -407,6 +417,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
IceUtil::Mutex::Lock subscriberSync(_subscribersMutex);
_subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
}
void
@@ -622,11 +633,8 @@ TopicI::reap()
//
list<SubscriberPtr> error;
{
- //
- // Uses splice for efficiency
- //
IceUtil::Mutex::Lock errorSync(_errorMutex);
- error.splice(error.begin(), _error);
+ _error.swap(error);
}
TraceLevelsPtr traceLevels = _instance->traceLevels();
@@ -675,31 +683,6 @@ TopicI::reap()
}
void
-TopicI::removeSubscriber(const Ice::ObjectPrx& obj)
-{
- Ice::Identity ident = obj->ice_getIdentity();
-
- IceUtil::Mutex::Lock sync(_subscribersMutex);
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident);
- if(p != _subscribers.end())
- {
- (*p)->destroy();
- _subscribers.erase(p);
- return;
- }
-
- //
- // If the subscriber was not found then display a diagnostic.
- //
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _instance->communicator()->identityToString(ident) << ": not subscribed.";
- }
-}
-
-void
TopicI::publish(bool forwarded, const EventSeq& events)
{
//
@@ -740,7 +723,7 @@ TopicI::publish(bool forwarded, const EventSeq& events)
//
if(!flush.empty())
{
- _instance->subscriberPool()->add(flush);
+ _instance->subscriberPool()->flush(flush);
}
//
@@ -778,6 +761,7 @@ TopicI::publish(bool forwarded, const EventSeq& events)
{
reap.push_back(*q);
}
+ _instance->subscriberPool()->remove(*q);
_subscribers.erase(q);
}
}
@@ -793,3 +777,29 @@ TopicI::publish(bool forwarded, const EventSeq& events)
_error.splice(_error.begin(), reap);
}
}
+
+void
+TopicI::removeSubscriber(const Ice::ObjectPrx& obj)
+{
+ Ice::Identity ident = obj->ice_getIdentity();
+
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident);
+ if(p != _subscribers.end())
+ {
+ (*p)->destroy();
+ _instance->subscriberPool()->remove(*p);
+ _subscribers.erase(p);
+ return;
+ }
+
+ //
+ // If the subscriber was not found then display a diagnostic.
+ //
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _instance->communicator()->identityToString(ident) << ": not subscribed.";
+ }
+}
diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h
index 5b5af9516f3..7b9c39b1d90 100644
--- a/cpp/src/IceStorm/TopicI.h
+++ b/cpp/src/IceStorm/TopicI.h
@@ -52,13 +52,11 @@ public:
void reap();
+ void publish(bool, const EventSeq&);
+
private:
void removeSubscriber(const Ice::ObjectPrx&);
- friend class PublisherProxyI;
- friend class TopicLinkI;
- void publish(bool, const EventSeq&);
-
//
// Immutable members.
//