diff options
author | Mark Spruiell <mes@zeroc.com> | 2003-03-27 22:01:54 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2003-03-27 22:01:54 +0000 |
commit | d14d3b45d87d981c2fada77dc439a054a8d21787 (patch) | |
tree | ddeb19290df69b801a089608d070fc945fff9a97 /cpp/src | |
parent | datagram fixes (diff) | |
download | ice-d14d3b45d87d981c2fada77dc439a054a8d21787.tar.bz2 ice-d14d3b45d87d981c2fada77dc439a054a8d21787.tar.xz ice-d14d3b45d87d981c2fada77dc439a054a8d21787.zip |
cleaning up topic subscription
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceStorm/.depend | 6 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 255 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.h | 10 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 226 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.h | 20 |
5 files changed, 325 insertions, 192 deletions
diff --git a/cpp/src/IceStorm/.depend b/cpp/src/IceStorm/.depend index 659464228ef..ae0c2ac4a3b 100644 --- a/cpp/src/IceStorm/.depend +++ b/cpp/src/IceStorm/.depend @@ -8,12 +8,12 @@ OnewayBatchSubscriber.o: OnewayBatchSubscriber.cpp ../../include/Ice/Ice.h ../.. LinkSubscriber.o: LinkSubscriber.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../IceStorm/LinkSubscriber.h ../IceStorm/Flushable.h ../IceStorm/Subscriber.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../IceStorm/TraceLevels.h SubscriberFactory.o: SubscriberFactory.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../IceStorm/SubscriberFactory.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../IceStorm/LinkSubscriber.h ../IceStorm/Flushable.h ../IceStorm/Subscriber.h ../IceStorm/OnewaySubscriber.h ../IceStorm/OnewayBatchSubscriber.h ../IceStorm/Flusher.h ../IceStorm/TraceLevels.h TopicI.o: TopicI.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../../include/Ice/Functional.h ../../include/IceUtil/Functional.h ../IceStorm/TopicI.h ../../include/IceUtil/RecMutex.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../IceStorm/IdentityLinkDict.h ../../include/Freeze/Map.h ../../include/Freeze/DB.h ../../include/Freeze/DBException.h ../../include/Freeze/DBF.h ../../include/Freeze/EvictorF.h ../IceStorm/LinkDB.h ../IceStorm/SubscriberFactory.h ../IceStorm/Subscriber.h ../IceStorm/TraceLevels.h -TopicManagerI.o: TopicManagerI.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../IceStorm/TopicManagerI.h ../../include/IceStorm/IceStorm.h ../IceStorm/StringBoolDict.h ../../include/Freeze/Map.h ../../include/Freeze/DB.h ../../include/Freeze/DBException.h ../../include/Freeze/DBF.h ../../include/Freeze/EvictorF.h ../IceStorm/TopicI.h ../../include/IceUtil/RecMutex.h ../IceStorm/IceStormInternal.h ../IceStorm/IdentityLinkDict.h ../IceStorm/LinkDB.h ../IceStorm/SubscriberFactory.h ../IceStorm/Flusher.h ../IceStorm/TraceLevels.h -StringBoolDict.o: StringBoolDict.cpp ../../include/IceXML/StreamI.h ../../include/Ice/Stream.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/CommunicatorF.h ../../include/IceUtil/OutputUtil.h ../IceStorm/StringBoolDict.h ../../include/Freeze/Map.h ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/PropertiesF.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../../include/Freeze/DB.h ../../include/Freeze/DBException.h ../../include/Freeze/DBF.h ../../include/Freeze/EvictorF.h +TopicManagerI.o: TopicManagerI.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../IceStorm/TopicManagerI.h ../../include/IceStorm/IceStorm.h ../IceStorm/StringStringDict.h ../../include/Freeze/Map.h ../../include/Freeze/DB.h ../../include/Freeze/DBException.h ../../include/Freeze/DBF.h ../../include/Freeze/EvictorF.h ../IceStorm/TopicI.h ../../include/IceUtil/RecMutex.h ../IceStorm/IceStormInternal.h ../IceStorm/IdentityLinkDict.h ../IceStorm/LinkDB.h ../IceStorm/SubscriberFactory.h ../IceStorm/Flusher.h ../IceStorm/TraceLevels.h +StringStringDict.o: StringStringDict.cpp ../../include/IceXML/StreamI.h ../../include/Ice/Stream.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/CommunicatorF.h ../../include/IceUtil/OutputUtil.h ../IceStorm/StringStringDict.h ../../include/Freeze/Map.h ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/PropertiesF.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../../include/Freeze/DB.h ../../include/Freeze/DBException.h ../../include/Freeze/DBF.h ../../include/Freeze/EvictorF.h IdentityLinkDict.o: IdentityLinkDict.cpp ../../include/IceXML/StreamI.h ../../include/Ice/Stream.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/CommunicatorF.h ../../include/IceUtil/OutputUtil.h ../IceStorm/IdentityLinkDict.h ../../include/Freeze/Map.h ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/PropertiesF.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../../include/Freeze/DB.h ../../include/Freeze/DBException.h ../../include/Freeze/DBF.h ../../include/Freeze/EvictorF.h ../IceStorm/LinkDB.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h LinkDB.o: LinkDB.cpp ../IceStorm/LinkDB.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../IceStorm/IceStormInternal.h ../../include/IceStorm/IceStorm.h ../../include/Ice/LocalException.h ../../include/Ice/ObjectFactory.h ../../include/Ice/Stream.h ../../include/Ice/BuiltinSequences.h IceStormInternal.o: IceStormInternal.cpp ../IceStorm/IceStormInternal.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/IceStorm/IceStorm.h ../../include/Ice/LocalException.h ../../include/Ice/ObjectFactory.h ../../include/Ice/Stream.h ../../include/Ice/BuiltinSequences.h -Service.o: Service.cpp ../IceStorm/TopicManagerI.h ../../include/IceStorm/IceStorm.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../IceStorm/StringBoolDict.h ../../include/Freeze/Map.h ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/PropertiesF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../../include/Freeze/DB.h ../../include/Freeze/DBException.h ../../include/Freeze/DBF.h ../../include/Freeze/EvictorF.h ../IceStorm/TraceLevels.h ../../include/IceBox/IceBox.h +Service.o: Service.cpp ../IceStorm/TopicManagerI.h ../../include/IceStorm/IceStorm.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../IceStorm/StringStringDict.h ../../include/Freeze/Map.h ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/PropertiesF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../../include/Freeze/DB.h ../../include/Freeze/DBException.h ../../include/Freeze/DBF.h ../../include/Freeze/EvictorF.h ../IceStorm/TraceLevels.h ../../include/IceBox/IceBox.h Admin.o: Admin.cpp ../../include/Ice/Application.h ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../IceStorm/Parser.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Xerces.h Grammar.o: Grammar.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../IceStorm/Parser.h ../../include/IceStorm/IceStorm.h Scanner.o: Scanner.cpp ../../include/IceUtil/Config.h ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Handle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/Ice/Config.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/Ice/ObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/StreamF.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/BuiltinSequences.h ../../include/Ice/Proxy.h ../../include/IceUtil/Mutex.h ../../include/IceUtil/Lock.h ../../include/IceUtil/ThreadException.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/ConnectionF.h ../../include/Ice/EndpointF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/OutgoingAsyncF.h ../../include/Ice/Current.h ../../include/Ice/Identity.h ../../include/Ice/Facet.h ../../include/Ice/Object.h ../../include/Ice/IncomingAsyncF.h ../../include/Ice/Outgoing.h ../../include/IceUtil/Monitor.h ../../include/IceUtil/Cond.h ../../include/IceUtil/Time.h ../../include/Ice/BasicStream.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantManagerF.h ../../include/Ice/Direct.h ../../include/Ice/LocalException.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/LoggerUtil.h ../../include/Ice/LoggerF.h ../../include/Ice/Stats.h ../../include/Ice/Communicator.h ../../include/Ice/StatsF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/RouterF.h ../../include/Ice/LocatorF.h ../../include/Ice/PluginF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocator.h ../../include/Ice/IdentityUtil.h ../../include/Ice/OutgoingAsync.h ../../include/Ice/IncomingAsync.h ../../include/Ice/Application.h ../IceStorm/Parser.h ../../include/IceStorm/IceStorm.h ../IceStorm/Grammar.h diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index edc1c1c3fd2..3edda8722e2 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -41,8 +41,8 @@ class PublisherProxyI : public Ice::Blobject { public: - PublisherProxyI(const IceStorm::TopicSubscribersPtr& s) : - _subscribers(s) + PublisherProxyI(const IceStorm::TopicSubscribersPtr& s, const string& type) : + _subscribers(s), _type(type) { } @@ -58,11 +58,16 @@ private: // Set of associated subscribers // IceStorm::TopicSubscribersPtr _subscribers; + + // + // The topic type + // + string _type; }; // // The servant has a 1-1 association with a topic. It is used to -// receive events from linked Topics.. +// receive events from linked Topics. // class TopicLinkI : public TopicLink { @@ -115,8 +120,8 @@ public: // If a subscriber with this identity is already subscribed // then mark the subscriber as replaced. // - // Note that this doesn't actually remove the Subscribe from - // the list of subscribers - it marks the Subscriber as + // Note that this doesn't actually remove the subscriber from + // the list of subscribers - it marks the subscriber as // replaced, and it's removed on the next event publish. // for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i) @@ -132,17 +137,16 @@ public: } } - // - // Add to the set of subscribers + // Add to the set of subscribers. // _subscribers.push_back(subscriber); } // - // Unsubscribe the Subscriber with the given identity. Note that - // this doesn't remove the Subscriber from the list of subscribers - // - it marks the Subscriber as unsubscribed, and it's removed on + // Unsubscribe the subscriber with the given identity. Note that + // this doesn't remove the subscriber from the list of subscribers + // - it marks the subscriber as unsubscribed, and it's removed on // the next event publish. // void @@ -189,7 +193,7 @@ public: publish(const Event& event) { // - // Copy of the subscriber list so that event publishing can + // Copy the subscriber list so that event publishing can // occur in parallel. // // TODO: Find out whether this is a false optimization - how @@ -286,27 +290,100 @@ PublisherProxyI::ice_invoke(const vector< Ice::Byte>& inParams, vector< Ice::Byt { const Ice::Context& context = current.ctx; - Event event; - event.forwarded = false; - Ice::Context::const_iterator p = context.find("cost"); - if(p != context.end()) + // + // Intercept the operations ice_isA, ice_id, and ice_ids so that + // this publisher object can appear to have the topic type. + // + if(current.operation == "ice_isA") { - event.cost = atoi(p->second.c_str()); + IceInternal::InstancePtr instance = IceInternal::getInstance(current.adapter->getCommunicator()); + IceInternal::BasicStream is(instance.get()); + is.b = inParams; + is.i = is.b.begin(); + string type; + is.read(type); + bool b; + if(type == _type) + { + b = true; + } + else + { + b = Ice::Blobject::ice_isA(type); + } + IceInternal::BasicStream os(instance.get()); + os.write(b); + outParam = os.b; + } + else if(current.operation == "ice_id") + { + IceInternal::InstancePtr instance = IceInternal::getInstance(current.adapter->getCommunicator()); + IceInternal::BasicStream os(instance.get()); + os.write(_type); + outParam = os.b; + } + else if(current.operation == "ice_ids") + { + IceInternal::InstancePtr instance = IceInternal::getInstance(current.adapter->getCommunicator()); + IceInternal::BasicStream os(instance.get()); + vector<string> ids = Ice::Blobject::ice_ids(); + ids.insert(ids.begin(), _type); + os.write(ids); + outParam = os.b; } else { - event.cost = 0; // TODO: Default comes from property? + Event event; + event.forwarded = false; + Ice::Context::const_iterator p = context.find("cost"); + if(p != context.end()) + { + event.cost = atoi(p->second.c_str()); + } + else + { + event.cost = 0; // TODO: Default comes from property? + } + event.op = current.operation; + event.mode = current.mode; + event.data = inParams; + event.context = context; + + _subscribers->publish(event); } - event.op = current.operation; - event.mode = current.mode; - event.data = inParams; - event.context = context; - - _subscribers->publish(event); return true; } +#if 0 +bool +PublisherProxyI::ice_isA(const string& id, const Ice::Current& current) +{ + if(id == _type) + { + return true; + } + else + { + return Ice::Blobject::ice_isA(id, current); + } +} + +vector<string> +PublisherProxyI::ice_ids(const Ice::Current& current) +{ + vector<string> ids = Ice::Blobject::ice_ids(current); + ids.insert(ids.begin(), _type); + return ids; +} + +const string& +PublisherProxyI::ice_id(const Ice::Current&) +{ + return _type; +} +#endif + // // Incoming events from linked topics. // @@ -326,10 +403,11 @@ TopicLinkI::forward(const string& op, Ice::OperationMode mode, const ByteSeq& da } TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& traceLevels, const string& name, - const SubscriberFactoryPtr& factory, const Freeze::DBPtr& db) : + const string& type, const SubscriberFactoryPtr& factory, const Freeze::DBPtr& db) : _adapter(adapter), _traceLevels(traceLevels), _name(name), + _type(type), _factory(factory), _destroyed(false), _links(db), @@ -338,11 +416,11 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace _subscribers = new TopicSubscribers(_traceLevels); // - // Create a servant per Topic to receive event data. The servants - // Identity is category=<topicname>, name=publish. Activate the + // Create a servant per topic to receive event data. The servant's + // identity is category=<topicname>, name=publish. Activate the // object and save a reference to give to publishers. // - _publisher = new PublisherProxyI(_subscribers); + _publisher = new PublisherProxyI(_subscribers, type); Ice::Identity id; id.category = _name; @@ -350,8 +428,8 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace _publisherPrx = _adapter->add(_publisher, id); // - // Create a servant per Topic to receive linked event data. The - // servants Identity is category=<topicname>, name=link. Activate + // Create a servant per topic to receive linked event data. The + // servant's identity is category=<topicname>, name=link. Activate // the object and save a reference to give to linked topics. // _link = new TopicLinkI(_subscribers); @@ -371,8 +449,8 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace } // - // Create the Subscriber object and add to the set of - // Subscribers. + // Create the subscriber object and add it to the set of + // subscribers. // SubscriberPtr subscriber = _factory->createLinkSubscriber(p->second.obj, p->second.info.cost); _subscribers->add(subscriber); @@ -390,6 +468,13 @@ TopicI::getName(const Ice::Current&) const return _name; } +string +TopicI::getType(const Ice::Current&) const +{ + // Immutable + return _type; +} + Ice::ObjectPrx TopicI::getPublisher(const Ice::Current&) const { @@ -409,8 +494,7 @@ TopicI::destroy(const Ice::Current&) _destroyed = true; // - // See the comment in the constructor for the derevation of the - // Identity. + // See the comment in the constructor for the format of the identity. // Ice::Identity id; id.category = _name; @@ -429,6 +513,71 @@ TopicI::destroy(const Ice::Current&) } void +TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& subscriber, const Ice::Current&) +{ + IceUtil::RecMutex::Lock sync(*this); + + if(_destroyed) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + + Ice::Identity ident = subscriber->ice_getIdentity(); + if(_traceLevels->topic > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); + out << "Subscribe: " << Ice::identityToString(ident); + if(_traceLevels->topic > 1) + { + out << " QoS: "; + for(QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi) + { + if(qi != qos.begin()) + { + out << ','; + } + out << '[' << qi->first << "," << qi->second << ']'; + } + } + } + + reap(); + + // + // Add this subscriber to the set of subscribers. + // + SubscriberPtr sub = _factory->createSubscriber(qos, subscriber); + _subscribers->add(sub); +} + +void +TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) +{ + IceUtil::RecMutex::Lock sync(*this); + + if(_destroyed) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + + Ice::Identity ident = subscriber->ice_getIdentity(); + + if(_traceLevels->topic > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); + + out << "Unsubscribe: " << Ice::identityToString(ident); + } + + reap(); + + // + // Unsubscribe the subscriber with this identity. + // + _subscribers->remove(subscriber); +} + +void TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) { IceUtil::RecMutex::Lock sync(*this); @@ -464,8 +613,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) dbInfo.info.cost = cost; // - // Create the Subscriber object and add to the setup of - // subscribers. + // Create the subscriber object and add it to the set of subscribers. // SubscriberPtr subscriber = _factory->createLinkSubscriber(dbInfo.obj, dbInfo.info.cost); _subscribers->add(subscriber); @@ -544,43 +692,6 @@ TopicI::destroyed() const } void -TopicI::subscribe(const Ice::ObjectPrx& obj, const QoS& qos) -{ - IceUtil::RecMutex::Lock sync(*this); - - if(_destroyed) - { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); - } - - reap(); - - // - // Add this subscriber to the set of subscribers. - // - SubscriberPtr subscriber = _factory->createSubscriber(qos, obj); - _subscribers->add(subscriber); -} - -void -TopicI::unsubscribe(const Ice::ObjectPrx& obj) -{ - IceUtil::RecMutex::Lock sync(*this); - - if(_destroyed) - { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); - } - - reap(); - - // - // Unsubscribe the subscriber with this identity. - // - _subscribers->remove(obj); -} - -void TopicI::reap() { IceUtil::RecMutex::Lock sync(*this); diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h index 0e142e06f44..8f65e3f4cce 100644 --- a/cpp/src/IceStorm/TopicI.h +++ b/cpp/src/IceStorm/TopicI.h @@ -42,12 +42,15 @@ class TopicI : public TopicInternal, public IceUtil::RecMutex { public: - TopicI(const Ice::ObjectAdapterPtr&, const TraceLevelsPtr&, const std::string&, const SubscriberFactoryPtr&, - const Freeze::DBPtr&); + TopicI(const Ice::ObjectAdapterPtr&, const TraceLevelsPtr&, const std::string&, const std::string&, + const SubscriberFactoryPtr&, const Freeze::DBPtr&); ~TopicI(); virtual std::string getName(const Ice::Current&) const; + virtual std::string getType(const Ice::Current&) const; virtual Ice::ObjectPrx getPublisher(const Ice::Current&) const; + virtual void subscribe(const QoS&, const Ice::ObjectPrx&, const Ice::Current&); + virtual void unsubscribe(const Ice::ObjectPrx&, const Ice::Current&); virtual void destroy(const Ice::Current&); virtual void link(const TopicPrx&, Ice::Int, const Ice::Current&); virtual void unlink(const TopicPrx&, const Ice::Current&); @@ -57,8 +60,6 @@ public: // Internal methods bool destroyed() const; - void subscribe(const Ice::ObjectPrx&, const QoS&); - void unsubscribe(const Ice::ObjectPrx&); void reap(); @@ -70,6 +71,7 @@ private: Ice::ObjectAdapterPtr _adapter; TraceLevelsPtr _traceLevels; std::string _name; // The topic name + std::string _type; // The topic type SubscriberFactoryPtr _factory; Ice::ObjectPtr _publisher; // Publisher & associated proxy diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 53de6965b46..90658ba7470 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -19,15 +19,17 @@ #include <IceStorm/TraceLevels.h> #include <functional> +#include <ctype.h> using namespace IceStorm; using namespace std; -TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& adapter, - const TraceLevelsPtr& traceLevels, const Freeze::DBEnvironmentPtr& dbEnv, - const Freeze::DBPtr& db) : +TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& topicAdapter, + const Ice::ObjectAdapterPtr& publishAdapter, const TraceLevelsPtr& traceLevels, + const Freeze::DBEnvironmentPtr& dbEnv, const Freeze::DBPtr& db) : _communicator(communicator), - _adapter(adapter), + _topicAdapter(topicAdapter), + _publishAdapter(publishAdapter), _traceLevels(traceLevels), _dbEnv(dbEnv), _topics(db) @@ -39,15 +41,15 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice // Recreate each of the topics in the dictionary. If the topic // database doesn't exist then the topic was previously destroyed, // but not removed from the _topics dictionary. Normally this - // should only occur upon a crash. + // should only occur in the event of a crash. // - StringBoolDict::iterator p = _topics.begin(); + StringStringDict::iterator p = _topics.begin(); while(p != _topics.end()) { assert(_topicIMap.find(p->first) == _topicIMap.end()); try { - installTopic("recreate", p->first, false); + installTopic("recreate", p->first, p->second, false); ++p; } catch(const Freeze::DBNotFoundException& ex) @@ -57,7 +59,7 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); out << ex; } - StringBoolDict::iterator tmp = p; + StringStringDict::iterator tmp = p; ++p; _topics.erase(tmp); } @@ -69,8 +71,10 @@ TopicManagerI::~TopicManagerI() } TopicPrx -TopicManagerI::create(const string& name, const Ice::Current&) +TopicManagerI::create(const string& name, const string& type, const Ice::Current&) { + validateType(type); + // TODO: reader/writer mutex IceUtil::Mutex::Lock sync(*this); @@ -83,15 +87,15 @@ TopicManagerI::create(const string& name, const Ice::Current&) throw ex; } - installTopic("create", name, true); - _topics.insert(make_pair(name, true)); + installTopic("create", name, type, true); + _topics.insert(make_pair(name, type)); // // The identity is the name of the Topic. // Ice::Identity id; id.name = name; - return TopicPrx::uncheckedCast(_adapter->createProxy(id)); + return TopicPrx::uncheckedCast(_topicAdapter->createProxy(id)); } TopicPrx @@ -106,7 +110,7 @@ TopicManagerI::retrieve(const string& name, const Ice::Current&) const { Ice::Identity id; id.name = name; - return TopicPrx::uncheckedCast(_adapter->createProxy(id)); + return TopicPrx::uncheckedCast(_topicAdapter->createProxy(id)); } NoSuchTopic ex; @@ -119,21 +123,20 @@ TopicManagerI::retrieve(const string& name, const Ice::Current&) const // struct TransformToTopicDict : public std::unary_function<TopicIMap::value_type, TopicDict::value_type> { + TransformToTopicDict(const Ice::ObjectAdapterPtr& adapter) : + _adapter(adapter) + { + } - TransformToTopicDict(const Ice::ObjectAdapterPtr& adapter) : - _adapter(adapter) - { - } - - TopicDict::value_type - operator()(TopicIMap::value_type p) - { - Ice::Identity id; - id.name = p.first; - return TopicDict::value_type(p.first, TopicPrx::uncheckedCast(_adapter->createProxy(id))); - } + TopicDict::value_type + operator()(TopicIMap::value_type p) + { + Ice::Identity id; + id.name = p.first; + return TopicDict::value_type(p.first, TopicPrx::uncheckedCast(_adapter->createProxy(id))); + } - Ice::ObjectAdapterPtr _adapter; + Ice::ObjectAdapterPtr _adapter; }; TopicDict @@ -146,83 +149,12 @@ TopicManagerI::retrieveAll(const Ice::Current&) const TopicDict all; transform(_topicIMap.begin(), _topicIMap.end(), inserter(all, all.begin()), - TransformToTopicDict(_adapter)); + TransformToTopicDict(_topicAdapter)); return all; } void -TopicManagerI::subscribe(const QoS& qos, const Ice::ObjectPrx& subscriber, const Ice::Current&) -{ - IceUtil::Mutex::Lock sync(*this); - - Ice::Identity ident = subscriber->ice_getIdentity(); - if(_traceLevels->topicMgr > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); - out << "Subscribe: " << Ice::identityToString(ident); - if(_traceLevels->topicMgr > 1) - { - out << " QoS: "; - for(QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi) - { - if(qi != qos.begin()) - { - out << ','; - } - out << '[' << qi->first << "," << qi->second << ']'; - } - } - } - - // - // Ensure that the identities category refers to an existing - // channel. - // - TopicIMap::iterator elem = _topicIMap.find(ident.category); - if(elem == _topicIMap.end()) - { - NoSuchTopic ex; - ex.name = ident.category; - throw ex; - } - - // - // Subscribe to the topic. - // - elem->second->subscribe(subscriber, qos); -} - -void -TopicManagerI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) -{ - IceUtil::Mutex::Lock sync(*this); - - Ice::Identity ident = subscriber->ice_getIdentity(); - - if(_traceLevels->topicMgr > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); - - out << "Unsubscribe: " << Ice::identityToString(ident); - } - - - TopicIMap::iterator elem = _topicIMap.find(ident.category); - if(elem != _topicIMap.end()) - { - elem->second->unsubscribe(subscriber); - } -} - -void -TopicManagerI::shutdown(const Ice::Current&) -{ - _flusher->stopFlushing(); - _communicator->shutdown(); -} - -void TopicManagerI::reap() { // @@ -251,7 +183,7 @@ TopicManagerI::reap() } void -TopicManagerI::installTopic(const std::string& message, const std::string& name, bool create) +TopicManagerI::installTopic(const string& message, const string& name, const string& type, bool create) { if(_traceLevels->topicMgr > 0) { @@ -259,21 +191,109 @@ TopicManagerI::installTopic(const std::string& message, const std::string& name, out << message << ' ' << name; } + // + // Prepend "topic-" to the topic name in order to form a + // unique name for the Freeze database. Since the name we + // supply to openDB is also used as a filename, we call + // getDatabaseName to obtain a name with any questionable + // filename characters converted to hex. + // // TODO: instance - // TODO: reserved names? // TODO: failure? cleanup database? - Freeze::DBPtr db = _dbEnv->openDB(name, create); + // + string dbName = "topic-" + getDatabaseName(name); + Freeze::DBPtr db = _dbEnv->openDB(dbName, create); // // Create topic implementation // - TopicIPtr topicI = new TopicI(_adapter, _traceLevels, name, _factory, db); + TopicIPtr topicI = new TopicI(_publishAdapter, _traceLevels, name, type, _factory, db); // // The identity is the name of the Topic. // Ice::Identity id; id.name = name; - _adapter->add(topicI, id); + _topicAdapter->add(topicI, id); _topicIMap.insert(TopicIMap::value_type(name, topicI)); } + +void +TopicManagerI::validateType(const string& type) +{ + bool fail = false; + const string::size_type len = type.size(); + string::size_type pos = type.find("::"); + if(pos != 0) + { + fail = true; + } + + bool checkAlpha = false; + while(!fail && pos < len) + { + if(checkAlpha) + { + if(!isalpha(type[pos])) + { + fail = true; + } + else + { + checkAlpha = false; + } + } + else if(type[pos] == ':') + { + pos++; + if(pos == len || type[pos] != ':') + { + fail = true; + } + else + { + checkAlpha = true; + } + } + else if(!isalnum(type[pos])) + { + fail = true; + } + pos++; + } + + if(checkAlpha) // type ended with "::" + { + fail = true; + } + + if(fail) + { + InvalidType ex; + ex.type = type; + throw ex; + } +} + +string +TopicManagerI::getDatabaseName(const string& name) +{ + string result; + result.reserve(name.size()); + + for(string::size_type i = 0; i < name.size(); i++) + { + if(isalnum(name[i]) || name[i] == '.' || name[i] == '-' || name[i] == '_') + { + result.push_back(name[i]); + } + else + { + ostringstream ostr; + ostr << '%' << hex << static_cast<int>(name[i]); + result.append(ostr.str()); + } + } + + return result; +} diff --git a/cpp/src/IceStorm/TopicManagerI.h b/cpp/src/IceStorm/TopicManagerI.h index 697e5d41507..61300c96fbf 100644 --- a/cpp/src/IceStorm/TopicManagerI.h +++ b/cpp/src/IceStorm/TopicManagerI.h @@ -16,7 +16,7 @@ #define TOPIC_MANAGER_I_H #include <IceStorm/IceStorm.h> -#include <IceStorm/StringBoolDict.h> +#include <IceStorm/StringStringDict.h> namespace IceStorm { @@ -48,31 +48,31 @@ class TopicManagerI : public TopicManager, public IceUtil::Mutex { public: - TopicManagerI(const Ice::CommunicatorPtr&, const Ice::ObjectAdapterPtr&, const TraceLevelsPtr&, - const Freeze::DBEnvironmentPtr&, const Freeze::DBPtr&); + TopicManagerI(const Ice::CommunicatorPtr&, const Ice::ObjectAdapterPtr&, const Ice::ObjectAdapterPtr&, + const TraceLevelsPtr&, const Freeze::DBEnvironmentPtr&, const Freeze::DBPtr&); ~TopicManagerI(); - virtual TopicPrx create(const std::string&, const Ice::Current&); + virtual TopicPrx create(const std::string&, const std::string&, const Ice::Current&); virtual TopicPrx retrieve(const std::string&, const Ice::Current&) const; virtual TopicDict retrieveAll(const Ice::Current&) const; - virtual void subscribe(const QoS&, const Ice::ObjectPrx&, const Ice::Current&); - virtual void unsubscribe(const Ice::ObjectPrx&, const Ice::Current&); - virtual void shutdown(const Ice::Current&); void reap(); private: - void installTopic(const std::string&, const std::string&, bool); + void installTopic(const std::string&, const std::string&, const std::string&, bool); + static void validateType(const std::string&); + static std::string getDatabaseName(const std::string&); Ice::CommunicatorPtr _communicator; - Ice::ObjectAdapterPtr _adapter; + Ice::ObjectAdapterPtr _topicAdapter; + Ice::ObjectAdapterPtr _publishAdapter; TraceLevelsPtr _traceLevels; TopicIMap _topicIMap; FlusherPtr _flusher; SubscriberFactoryPtr _factory; Freeze::DBEnvironmentPtr _dbEnv; - StringBoolDict _topics; + StringStringDict _topics; }; typedef IceUtil::Handle<TopicManagerI> TopicManagerIPtr; |