diff options
author | Matthew Newhook <matthew@zeroc.com> | 2006-11-09 09:01:11 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2006-11-09 09:01:11 +0000 |
commit | 382aef369900b75eeaf8e3ec4e0a27cb36c20355 (patch) | |
tree | 90debb0cfcf3473360e6b4e57aaa03bd3607d74a | |
parent | cosmetic change (diff) | |
download | ice-382aef369900b75eeaf8e3ec4e0a27cb36c20355.tar.bz2 ice-382aef369900b75eeaf8e3ec4e0a27cb36c20355.tar.xz ice-382aef369900b75eeaf8e3ec4e0a27cb36c20355.zip |
Fixes as suggested by Bernard (namespace { }, PublisherProxyI ->
PublisherI, etc.
New SubscriberPool reduction algorithm.
gcc compiler bug fix.
-rw-r--r-- | cpp/src/IceStorm/.depend | 4 | ||||
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.cpp | 242 | ||||
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.h | 36 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscribers.cpp | 153 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscribers.h | 14 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 84 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.h | 6 |
7 files changed, 349 insertions, 190 deletions
diff --git a/cpp/src/IceStorm/.depend b/cpp/src/IceStorm/.depend index f5a313eb806..988ddcecfc9 100644 --- a/cpp/src/IceStorm/.depend +++ b/cpp/src/IceStorm/.depend @@ -1,9 +1,9 @@ IceStorm$(OBJEXT): IceStorm.cpp ../../include/IceStorm/IceStorm.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/SliceChecksumDict.h ../../include/Ice/LocalException.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/ObjectFactory.h ../../include/Ice/SliceChecksums.h ../../include/IceUtil/Iterator.h ../../include/IceUtil/ScopedArray.h ../../include/IceUtil/DisableWarnings.h IceStorm$(OBJEXT): IceStorm.cpp ../../include/IceStorm/IceStorm.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/SliceChecksumDict.h ../../include/Ice/LocalException.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/ObjectFactory.h ../../include/Ice/SliceChecksums.h ../../include/IceUtil/Iterator.h ../../include/IceUtil/ScopedArray.h ../../include/IceUtil/DisableWarnings.h -Instance$(OBJEXT): Instance.cpp ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/BatchFlusher.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/SubscriberPool.h ../IceStorm/KeepAliveThread.h ../IceStorm/LinkRecord.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h +Instance$(OBJEXT): Instance.cpp ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/BatchFlusher.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/SubscriberPool.h ../../include/Ice/Identity.h ../IceStorm/KeepAliveThread.h ../IceStorm/LinkRecord.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/StreamF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h TraceLevels$(OBJEXT): TraceLevels.cpp ../IceStorm/TraceLevels.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Config.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Handle.h ../../include/Ice/PropertiesF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/LoggerF.h ../../include/Ice/Properties.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/BuiltinSequences.h BatchFlusher$(OBJEXT): BatchFlusher.cpp ../IceStorm/BatchFlusher.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Config.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/ProxyF.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/Handle.h ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../../include/Ice/Communicator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/StatsF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Properties.h ../../include/Ice/LoggerUtil.h ../../include/Ice/Connection.h -SubscriberPool$(OBJEXT): SubscriberPool.cpp ../IceStorm/SubscriberPool.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Config.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Handle.h ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/Subscribers.h ../IceStorm/IceStormInternal.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/LoggerUtil.h +SubscriberPool$(OBJEXT): SubscriberPool.cpp ../IceStorm/SubscriberPool.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Config.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/IceUtil/Thread.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Handle.h ../../include/Ice/Identity.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/Ice/UndefSysMacros.h ../IceStorm/Instance.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/Subscribers.h ../IceStorm/IceStormInternal.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/StreamF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/LoggerUtil.h KeepAliveThread$(OBJEXT): KeepAliveThread.cpp ../IceStorm/KeepAliveThread.h ../IceStorm/LinkRecord.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../../include/IceUtil/Thread.h ../IceStorm/Instance.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../../include/Ice/LocalException.h ../../include/Ice/LoggerUtil.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/Properties.h Subscribers$(OBJEXT): Subscribers.cpp ../IceStorm/Subscribers.h ../IceStorm/IceStormInternal.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/IceUtil/RecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../IceStorm/Instance.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevels.h ../../include/Ice/LoggerF.h ../IceStorm/BatchFlusher.h ../../include/IceUtil/Thread.h ../IceStorm/SubscriberPool.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/IncomingAsync.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/FacetMap.h ../../include/Ice/Locator.h ../../include/Ice/ProcessF.h ../../include/Ice/LoggerUtil.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/LocalException.h ../../include/Ice/Connection.h TopicI$(OBJEXT): TopicI.cpp ../IceStorm/TopicI.h ../../include/IceUtil/RecMutex.h ../../include/IceUtil/Config.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/IceUtil/Exception.h ../IceStorm/IceStormInternal.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/Ice/Config.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ProxyF.h ../../include/Ice/ObjectF.h ../../include/Ice/GCCountMap.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/IceUtil/Mutex.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionIF.h ../../include/Ice/EndpointIF.h ../../include/Ice/Endpoint.h ../../include/Ice/UndefSysMacros.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/ConnectionF.h ../../include/Ice/Identity.h ../../include/Ice/StreamF.h ../../include/Ice/CommunicatorF.h ../../include/Ice/Object.h ../../include/Ice/GCShared.h ../../include/Ice/GCRecMutex.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Protocol.h ../../include/Ice/StringConverter.h ../../include/IceUtil/Unicode.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/FactoryTable.h ../../include/Ice/FactoryTableDef.h ../../include/IceUtil/StaticMutex.h ../../include/Ice/UserExceptionFactoryF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/SliceChecksumDict.h ../IceStorm/Event.h ../../include/Ice/BuiltinSequences.h ../IceStorm/PersistentTopicMap.h ../../include/Freeze/Map.h ../../include/Ice/Ice.h ../../include/Ice/GC.h ../../include/IceUtil/Thread.h ../../include/Ice/Initialize.h ../../include/Ice/PropertiesF.h ../../include/Ice/LoggerF.h ../../include/Ice/StatsF.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ImplicitContext.h ../../include/Ice/ObjectFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/IncomingAsync.h ../../include/Ice/FacetMap.h ../../include/Ice/Locator.h ../../include/Ice/ProcessF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Process.h ../../include/Ice/Application.h ../../include/Ice/Connection.h ../../include/Ice/Functional.h ../../include/IceUtil/Functional.h ../../include/Ice/Stream.h ../../include/Freeze/DB.h ../../include/Freeze/Exception.h ../../include/Freeze/Connection.h ../../include/Freeze/Transaction.h ../IceStorm/LinkRecord.h ../IceStorm/PersistentUpstreamMap.h ../IceStorm/Instance.h ../IceStorm/Subscribers.h ../IceStorm/TraceLevels.h ../IceStorm/KeepAliveThread.h ../IceStorm/SubscriberPool.h ../../include/Freeze/Initialize.h ../../include/Freeze/EvictorF.h ../../include/Freeze/ConnectionF.h ../../include/Freeze/Index.h diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp index 6f42665af37..7943d3e1129 100644 --- a/cpp/src/IceStorm/SubscriberPool.cpp +++ b/cpp/src/IceStorm/SubscriberPool.cpp @@ -19,7 +19,7 @@ using namespace IceStorm; using namespace std; -namespace IceStorm +namespace { class SubscriberPoolWorker : public IceUtil::Thread @@ -39,22 +39,33 @@ public: virtual void run() { + IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time. SubscriberPtr sub; + bool requeue = false; + bool computeInterval = false; while(true) { - sub = _manager->dequeue(sub); + sub = _manager->dequeue(sub, requeue, interval, computeInterval); if(!sub) { return; } + // - // If SubscriberPool returns true then the subscriber needs to be - // SubscriberPooled again, so therefore we will re-enqueue the - // subscriber in the call to dequeue. + // If SubscriberPool returns true then the subscriber + // needs to be SubscriberPooled again, so therefore we + // will re-enqueue the subscriber in the call to dequeue. // - if(!sub->flush()) + if(computeInterval) + { + IceUtil::Time start = IceUtil::Time::now(); + requeue = sub->flush(); + interval = IceUtil::Time::now() - start; + } + else { - sub = 0; + requeue = sub->flush(); + interval = IceUtil::Time::seconds(24 * 60); // A long time. } } } @@ -144,25 +155,23 @@ SubscriberPoolMonitor::destroy() SubscriberPool::SubscriberPool(const InstancePtr& instance) : _instance(instance), _sizeMax(instance->properties()->getPropertyAsIntWithDefault( - "IceStorm.SubscriberPool.SizeMax", -1)), + "IceStorm.SubscriberPool.SizeMax", 0)), _sizeWarn(instance->properties()->getPropertyAsIntWithDefault( "IceStorm.SubscriberPool.SizeWarn", 0)), _size(instance->properties()->getPropertyAsIntWithDefault( "IceStorm.SubscriberPool.Size", 1)), _timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault( "IceStorm.SubscriberPool.Timeout", 250), 50))), // minimum 50ms. + _stallCheck(_timeout * 10), // 10 * the stall timeout. _destroy(false), - _inUse(0), - _running(0), - _load(1.0) + _reap(0) { try { __setNoDelete(true); _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout); - for(int i = 0; i < _size; ++i) + for(unsigned int i = 0; i < _size; ++i) { - ++_running; ++_inUse; _workers.push_back(new SubscriberPoolWorker(this)); } @@ -186,9 +195,9 @@ SubscriberPool::~SubscriberPool() } void -SubscriberPool::add(list<SubscriberPtr>& subscribers) +SubscriberPool::flush(list<SubscriberPtr>& subscribers) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + Lock sync(*this); // // Splice on the new set of subscribers to SubscriberPool. // @@ -196,76 +205,150 @@ SubscriberPool::add(list<SubscriberPtr>& subscribers) notifyAll(); } +void +SubscriberPool::add(const SubscriberPtr& subscriber) +{ + Lock sync(*this); + _subscribers.push_back(subscriber); +} + +void +SubscriberPool::remove(const SubscriberPtr& subscriber) +{ + Lock sync(*this); + // + // Note that this cannot remove based on the subscriber id because + // the pool is TopicManager scoped and not topic scoped therefore + // its quite possible to have two subscribers with the same id in + // the list. + // + list<SubscriberPtr>::iterator p = _subscribers.begin(); + while(p != _subscribers.end()) + { + if((*p).get() == subscriber.get()) + { + _subscribers.erase(p); + return; + } + ++p; + } + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + Ice::Error err(traceLevels->logger); + err << "SubscriberPool: subscriber not found: " << _instance->communicator()->identityToString(subscriber->id()); +} + // // The passed subscriber need to be enqueued again. // SubscriberPtr -SubscriberPool::dequeue(const SubscriberPtr& sub) +SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::Time& interval, bool& computeInterval) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + Lock sync(*this); if(sub) { - _pending.push_back(sub); + if(requeue) + { + _pending.push_back(sub); + } + sub->flushTime(interval); } // - // Now we check if this thread can be destroyed, based on a - // load factor. + // The worker is no longer in use. // - // The load factor jumps immediately to the number of threads - // that are currently in use, but decays exponentially if the - // number of threads in use is smaller than the load - // factor. This reflects that we create threads immediately - // when they are needed, but want the number of threads to - // slowly decline to the configured minimum. - // - double inUse = static_cast<double>(_inUse); - if(_load < inUse) - { - _load = inUse; - } - else - { - const double loadFactor = 0.05; // TODO: Configurable? - const double oneMinusLoadFactor = 1 - loadFactor; - _load = _load * oneMinusLoadFactor + inUse * loadFactor; - } + --_inUse; - if(_running > _size) + // + // If _sizeMax is 1 we never spawn up new threads if a stall is + // detected. + // + if(_sizeMax != 1) { TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->subscriberPool > 1) - { - IceUtil::Time interval = IceUtil::Time::now() - _lastNext; - Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); - out << "load check: " << _load; - } - int load = static_cast<int>(_load + 0.5); - - if(load < _running) + // + // Reap dead workers, if necessary. + // + if(_reap > 0) { - assert(_inUse > 0); - --_inUse; - - assert(_running > 0); - --_running; - if(traceLevels->subscriberPool > 0) { Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); - out << "reducing SubscriberPool threads: load: " << _load << " threads: " << _running; + out << "reaping: " << _reap << " workers"; + } + list<IceUtil::ThreadPtr>::iterator p = _workers.begin(); + while(p != _workers.end() && _reap > 0) + { + if(!(*p)->isAlive()) + { + (*p)->getThreadControl().join(); + p = _workers.erase(p); + --_reap; + } + else + { + ++p; + } } + } - return 0; + // + // If we have extra workers every _stallCheck period we run + // through the complete set of subscribers and determine how + // many have stalled since the last check. If this number is + // less than the number of extra threads then we terminate the + // calling worker. + // + // - The flush time is protected by the subscriber pool mutex. + // - The flush time is only computed if we have extra threads, + // otherwise it is set to some large value. + // - The max flush time is reset to the next sending interval + // after after _stallCheck period. + // - Every subscriber is considered to be stalled iff it has + // never sent an event or we have just created the first + // additional worker. The first handles the case where a + // subscriber stalls for a long time on the first message + // send. The second means that we can disable computation of + // the flush latency if there are no additional threads. + // + if(_workers.size() > _size) + { + IceUtil::Time now = IceUtil::Time::now(); + if(now - _lastStallCheck > _stallCheck) + { + _lastStallCheck = now; + unsigned int stalls = 0; + for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + if((*p)->pollMaxFlushTime(now) > _timeout) + { + ++stalls; + } + } + + if(traceLevels->subscriberPool > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); + out << "checking stalls. extra workers: " << _workers.size() - _size + << " subscribers: " << _subscribers.size() << " stalls: " << stalls; + } + + if((_workers.size() - _size) > stalls) + { + if(traceLevels->subscriberPool > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); + out << "destroying workers"; + } + ++_reap; + return 0; + } + } } - } - - assert(_inUse > 0); - --_inUse; - + while(_pending.empty() && !_destroy) { // @@ -277,11 +360,10 @@ SubscriberPool::dequeue(const SubscriberPtr& sub) if(_destroy) { - --_running; return 0; } - _lastNext = IceUtil::Time::now(); + _lastDequeue = IceUtil::Time::now(); SubscriberPtr subscriber = _pending.front(); _pending.pop_front(); @@ -292,7 +374,7 @@ SubscriberPool::dequeue(const SubscriberPtr& sub) // If all threads are now in use then we need to start the // monitoring, otherwise we don't need to monitor. // - if(_inUse == _running && (_running < _sizeMax || _sizeMax == -1)) + if(_inUse == _workers.size() && (_workers.size() < _sizeMax || _sizeMax != 1)) { _subscriberPoolMonitor->startMonitor(); } @@ -300,6 +382,11 @@ SubscriberPool::dequeue(const SubscriberPtr& sub) { _subscriberPoolMonitor->stopMonitor(); } + // + // We only need to compute the push interval if we've created + // stall threads. + // + computeInterval = (_workers.size() - _size) > 0; return subscriber; } @@ -311,7 +398,7 @@ SubscriberPool::destroy() // threads to unblock and terminate. // { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + Lock sync(*this); _destroy = true; notifyAll(); if(_subscriberPoolMonitor) @@ -347,24 +434,37 @@ SubscriberPool::check() Lock sync(*this); TraceLevelsPtr traceLevels = _instance->traceLevels(); - IceUtil::Time interval = IceUtil::Time::now() - _lastNext; + IceUtil::Time now = IceUtil::Time::now(); + IceUtil::Time interval = now - _lastDequeue; +/* if(traceLevels->subscriberPool > 1) { Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); out << "check called: interval: " << interval << " timeout: " << _timeout - << " pending: " << _pending.size() << " running: " << _running + << " pending: " << _pending.size() << " running: " << _workers.size() << " sizeMax: " << _sizeMax; } - - if(interval > _timeout && _pending.size() > 0 && (_running < _sizeMax || _sizeMax == -1)) +*/ + + if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0)) { if(traceLevels->subscriberPool > 0) { Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); - out << "detected stall: creating thread: load: " << _load << " threads: " << _running; + out << "detected stall: creating thread: threads: " << _workers.size(); } - ++_running; + // + // We'll now start stall checking at regular intervals if this + // is the first newly created worker. Here we need to + // initially set the stall check and the number of requests at + // this point. + // + if(_workers.size() == _size) + { + _lastStallCheck = now; + } + ++_inUse; _workers.push_back(new SubscriberPoolWorker(this)); } diff --git a/cpp/src/IceStorm/SubscriberPool.h b/cpp/src/IceStorm/SubscriberPool.h index ce6416452db..8f8140bded9 100644 --- a/cpp/src/IceStorm/SubscriberPool.h +++ b/cpp/src/IceStorm/SubscriberPool.h @@ -16,7 +16,9 @@ #include <IceUtil/Monitor.h> #include <IceUtil/Time.h> #include <IceUtil/Thread.h> +#include <Ice/Identity.h> #include <list> +#include <set> namespace IceStorm { @@ -60,32 +62,40 @@ public: SubscriberPool(const InstancePtr&); ~SubscriberPool(); - void add(std::list<SubscriberPtr>&); + void flush(std::list<SubscriberPtr>&); + void add(const SubscriberPtr&); + void remove(const SubscriberPtr&); void destroy(); -private: - - friend class SubscriberPoolWorker; - SubscriberPtr dequeue(const SubscriberPtr&); - friend class SubscriberPoolMonitor; + // + // For use by the subscriber worker. + // + SubscriberPtr dequeue(const SubscriberPtr&, bool, const IceUtil::Time&, bool&); + // + // For use by the monitor. + // void check(); +private: + const InstancePtr _instance; - const int _sizeMax; - const int _sizeWarn; - const int _size; + const unsigned int _sizeMax; + const unsigned int _sizeWarn; + const unsigned int _size; const IceUtil::Time _timeout; + const IceUtil::Time _stallCheck; SubscriberPoolMonitorPtr _subscriberPoolMonitor; std::list<SubscriberPtr> _pending; + std::list<SubscriberPtr> _subscribers; bool _destroy; std::list<IceUtil::ThreadPtr> _workers; - int _inUse; - int _running; - double _load; + int _reap; + unsigned int _inUse; - IceUtil::Time _lastNext; + IceUtil::Time _lastStallCheck; + IceUtil::Time _lastDequeue; }; } // End namespace IceStorm diff --git a/cpp/src/IceStorm/Subscribers.cpp b/cpp/src/IceStorm/Subscribers.cpp index 2aca83c3ca9..17b42bdc605 100644 --- a/cpp/src/IceStorm/Subscribers.cpp +++ b/cpp/src/IceStorm/Subscribers.cpp @@ -29,19 +29,19 @@ using namespace IceStorm; // // Per Subscriber object. // -namespace IceStorm +namespace { -class PerSubscriberPublisherProxyI : public Ice::BlobjectArray +class PerSubscriberPublisherI : public Ice::BlobjectArray { public: - PerSubscriberPublisherProxyI(const InstancePtr& instance) : + PerSubscriberPublisherI(const InstancePtr& instance) : _instance(instance) { } - ~PerSubscriberPublisherProxyI() + ~PerSubscriberPublisherI() { } @@ -56,11 +56,18 @@ public: vector<Ice::Byte>&, const Ice::Current& current) { - EventPtr event = new Event; - event->op = current.operation; - event->mode = current.mode; - vector<Ice::Byte>(inParams.first, inParams.second).swap(event->data); - event->context = current.ctx; + EventPtr event = new Event( + current.operation, + current.mode, + Ice::ByteSeq(), + current.ctx); + + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); EventSeq e; e.push_back(event); @@ -70,7 +77,7 @@ public: { list<SubscriberPtr> l; l.push_back(_subscriber); - _instance->subscriberPool()->add(l); + _instance->subscriberPool()->flush(l); } return true; } @@ -80,11 +87,11 @@ private: const InstancePtr _instance; /*const*/ SubscriberPtr _subscriber; }; -typedef IceUtil::Handle<PerSubscriberPublisherProxyI> PerSubscriberPublisherProxyIPtr; +typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr; } // Each of the various Subscriber types. -namespace IceStorm +namespace { class SubscriberOneway : public Subscriber @@ -187,24 +194,18 @@ SubscriberOneway::flush() IceUtil::Mutex::Lock sync(_mutex); // - // If another thread is busy delivering events or we're no longer - // active then we have nothing left to do. + // If the subscriber errored out then we're done. // - if(_state != StateActive || _events.empty()) + if(_state != StateActive) { _busy = false; return false; } + assert(!_events.empty()); assert(_busy); try { - if(_state != StateActive) - { - assert(!_busy); - return false; - } - // // Get the current set of events, but release the lock before // attempting to deliver the events. This allows other threads @@ -216,15 +217,25 @@ SubscriberOneway::flush() sync.release(); // XXX: -/* TraceLevelsPtr traceLevels = _instance->traceLevels(); if(_obj->ice_getIdentity().name.substr(0, 4) == "slow") { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << "deliberately stalling"; - sleep(1); + //Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + //out << "deliberately stalling"; + sleep(2); + } + if(_obj->ice_getIdentity().name.substr(0, 5) == "block") + { + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << "-> stall for 100s"; + } + sleep(100); + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << "<- stall for 100s"; + } } -*/ // // Deliver the events without holding the lock. @@ -256,15 +267,12 @@ SubscriberOneway::flush() // Reacquire the lock before we check the queue again. // sync.acquire(); - + // // If there have been more events queued in the meantime then // we are still busy. // - if(_events.empty()) - { - _busy = false; - } + _busy = !_events.empty(); } catch(const Ice::LocalException& ex) { @@ -287,7 +295,7 @@ SubscriberOneway::destroy() Subscriber::destroy(); } -namespace IceStorm +namespace { class TwowayInvokeI : public Ice::AMI_Object_ice_invoke @@ -332,22 +340,16 @@ SubscriberTwoway::flush() IceUtil::Mutex::Lock sync(_mutex); // - // If another thread is busy delivering events or we're no longer - // active then we have nothing left to do. + // If the subscriber errored out then we're done. // - if(_state != StateActive || _events.empty()) + if(_state != StateActive) { _busy = false; return false; } + assert(!_events.empty()); assert(_busy); - if(_state != StateActive) - { - assert(!_busy); - return false; - } - // // Get the current set of events, but release the lock before // attempting to deliver the events. This allows other threads @@ -372,17 +374,15 @@ SubscriberTwoway::flush() sync.acquire(); // - // If there have been more events queued in the meantime then we - // are still busy. + // If there have been more events queued in the meantime then + // we are still busy. // - if(_events.empty()) - { - _busy = false; - } + _busy = !_events.empty(); + return _busy; } -namespace IceStorm +namespace { class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke @@ -428,23 +428,24 @@ SubscriberTwowayOrdered::flush() EventPtr e; { IceUtil::Mutex::Lock sync(_mutex); - + // - // If another thread is busy delivering events or we're no longer - // active then we have nothing left to do. + // If the subscriber errored out then we're done. // - if(_state != StateActive || _events.empty()) + if(_state != StateActive) { _busy = false; return false; } + assert(!_events.empty()); assert(_busy); e = _events.front(); _events.erase(_events.begin()); } - + _obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context); + return false; } @@ -454,15 +455,15 @@ SubscriberTwowayOrdered::response() EventPtr e; { IceUtil::Mutex::Lock sync(_mutex); - + assert(_state == StateActive && _busy); - + if(_events.empty()) { _busy = false; return; } - + e = _events.front(); _events.erase(_events.begin()); } @@ -470,7 +471,7 @@ SubscriberTwowayOrdered::response() _obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context); } -namespace IceStorm +namespace { class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward @@ -590,20 +591,21 @@ SubscriberLink::flush() IceUtil::Mutex::Lock sync(_mutex); // - // If another thread is busy delivering events or we're no longer - // active then we have nothing left to do. + // If the subscriber errored out then we're done. // - if(_state != StateActive || _events.empty()) + if(_state != StateActive) { _busy = false; return false; } + assert(!_events.empty()); assert(_busy); - + v.swap(_events); } _obj->forward_async(new Topiclink_forwardI(this), v); + return false; } @@ -634,7 +636,7 @@ Subscriber::create( const Ice::ObjectPrx& obj, const IceStorm::QoS& qos) { - PerSubscriberPublisherProxyIPtr per = new PerSubscriberPublisherProxyI(instance); + PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance); Ice::ObjectPrx proxy = instance->objectAdapter()->addWithUUID(per); TraceLevelsPtr traceLevels = instance->traceLevels(); SubscriberPtr subscriber; @@ -791,6 +793,29 @@ Subscriber::destroy() } void +Subscriber::flushTime(const IceUtil::Time& interval) +{ + if(_resetMax || interval > _maxSend) + { + assert(interval != IceUtil::Time()); + _resetMax = false; + _maxSend = interval; + } +} + +IceUtil::Time +Subscriber::pollMaxFlushTime(const IceUtil::Time& now) +{ + //IceUtil::Time max = _maxSend; + //_maxSend = _maxSend * 0.95; + //return max; + + // The next call to flushTime can reset the max time. + _resetMax = true; + return _maxSend; +} + +void Subscriber::setError(const Ice::Exception& e) { IceUtil::Mutex::Lock sync(_mutex); @@ -836,7 +861,9 @@ Subscriber::Subscriber( _persistent(persistent), _proxy(proxy), _state(StateActive), - _busy(false) + _busy(false), + _resetMax(true), + _maxSend(IceUtil::Time::seconds(60*24)) // A long time { } diff --git a/cpp/src/IceStorm/Subscribers.h b/cpp/src/IceStorm/Subscribers.h index d1dabbd145c..ac1fb7bbb19 100644 --- a/cpp/src/IceStorm/Subscribers.h +++ b/cpp/src/IceStorm/Subscribers.h @@ -49,6 +49,13 @@ public: virtual bool flush() = 0; virtual void destroy(); + // + // These methods must only be called by the SubscriberPool they + // are not internally mutex protected. + // + void flushTime(const IceUtil::Time&); + IceUtil::Time pollMaxFlushTime(const IceUtil::Time&); + void setError(const Ice::Exception&); void setUnreachable(const Ice::Exception&); @@ -84,6 +91,13 @@ protected: bool _busy; EventSeq _events; + + // + // Not protected by _mutex. These members are protected by the + // SubscriberPool mutex. + // + bool _resetMax; + IceUtil::Time _maxSend; }; bool operator==(const IceStorm::SubscriberPtr&, const Ice::Identity&); diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 87dfde4f6c7..8ed30f7330a 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -24,18 +24,18 @@ using namespace IceStorm; using namespace std; -namespace IceStorm +namespace { // // The servant has a 1-1 association with a topic. It is used to // receive events from Publishers. // -class PublisherProxyI : public Ice::BlobjectArray +class PublisherI : public Ice::BlobjectArray { public: - PublisherProxyI(const TopicIPtr& topic) : + PublisherI(const TopicIPtr& topic) : _topic(topic) { } @@ -51,7 +51,12 @@ public: Ice::ByteSeq(), current.ctx); - event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); EventSeq v; v.push_back(event); @@ -109,7 +114,7 @@ private: const SubscriberPtr _subscriber; }; -} // End namespace IceStorm +} TopicI::TopicI( const InstancePtr& instance, @@ -133,7 +138,7 @@ TopicI::TopicI( Ice::Identity id; id.category = _name; id.name = "publish"; - _publisherPrx = _instance->objectAdapter()->add(new PublisherProxyI(this), id); + _publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), id); // // Create a servant per topic to receive linked event data. The @@ -164,6 +169,7 @@ TopicI::TopicI( _instance->objectAdapter()->add( new TopicUpstreamLinkI(subscriber), p->second.upstream->ice_getIdentity())); _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } PersistentUpstreamMap::const_iterator upI = _upstream.find(_name); @@ -250,10 +256,13 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& if(p != _subscribers.end()) { (*p)->destroy(); + _instance->subscriberPool()->remove(*p); _subscribers.erase(p); } - _subscribers.push_back(Subscriber::create(_instance, obj, qos)); + SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); + _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } Ice::ObjectPrx @@ -288,6 +297,7 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); return subscriber->proxy(); } @@ -407,6 +417,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) IceUtil::Mutex::Lock subscriberSync(_subscribersMutex); _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } void @@ -622,11 +633,8 @@ TopicI::reap() // list<SubscriberPtr> error; { - // - // Uses splice for efficiency - // IceUtil::Mutex::Lock errorSync(_errorMutex); - error.splice(error.begin(), _error); + _error.swap(error); } TraceLevelsPtr traceLevels = _instance->traceLevels(); @@ -675,31 +683,6 @@ TopicI::reap() } void -TopicI::removeSubscriber(const Ice::ObjectPrx& obj) -{ - Ice::Identity ident = obj->ice_getIdentity(); - - IceUtil::Mutex::Lock sync(_subscribersMutex); - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident); - if(p != _subscribers.end()) - { - (*p)->destroy(); - _subscribers.erase(p); - return; - } - - // - // If the subscriber was not found then display a diagnostic. - // - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _instance->communicator()->identityToString(ident) << ": not subscribed."; - } -} - -void TopicI::publish(bool forwarded, const EventSeq& events) { // @@ -740,7 +723,7 @@ TopicI::publish(bool forwarded, const EventSeq& events) // if(!flush.empty()) { - _instance->subscriberPool()->add(flush); + _instance->subscriberPool()->flush(flush); } // @@ -778,6 +761,7 @@ TopicI::publish(bool forwarded, const EventSeq& events) { reap.push_back(*q); } + _instance->subscriberPool()->remove(*q); _subscribers.erase(q); } } @@ -793,3 +777,29 @@ TopicI::publish(bool forwarded, const EventSeq& events) _error.splice(_error.begin(), reap); } } + +void +TopicI::removeSubscriber(const Ice::ObjectPrx& obj) +{ + Ice::Identity ident = obj->ice_getIdentity(); + + IceUtil::Mutex::Lock sync(_subscribersMutex); + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident); + if(p != _subscribers.end()) + { + (*p)->destroy(); + _instance->subscriberPool()->remove(*p); + _subscribers.erase(p); + return; + } + + // + // If the subscriber was not found then display a diagnostic. + // + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _instance->communicator()->identityToString(ident) << ": not subscribed."; + } +} diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h index 5b5af9516f3..7b9c39b1d90 100644 --- a/cpp/src/IceStorm/TopicI.h +++ b/cpp/src/IceStorm/TopicI.h @@ -52,13 +52,11 @@ public: void reap(); + void publish(bool, const EventSeq&); + private: void removeSubscriber(const Ice::ObjectPrx&); - friend class PublisherProxyI; - friend class TopicLinkI; - void publish(bool, const EventSeq&); - // // Immutable members. // |