diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceStorm/.depend | 8 | ||||
-rw-r--r-- | cpp/src/IceStorm/Admin.cpp | 120 | ||||
-rw-r--r-- | cpp/src/IceStorm/Flusher.cpp | 211 | ||||
-rw-r--r-- | cpp/src/IceStorm/Flusher.h | 44 | ||||
-rw-r--r-- | cpp/src/IceStorm/FlusherF.h | 27 | ||||
-rw-r--r-- | cpp/src/IceStorm/IceStormI.h | 11 | ||||
-rw-r--r-- | cpp/src/IceStorm/Makefile | 67 | ||||
-rw-r--r-- | cpp/src/IceStorm/Server.cpp | 93 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 167 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.h | 72 | ||||
-rw-r--r-- | cpp/src/IceStorm/SubscriberF.h | 28 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 233 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.h | 66 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicIF.h | 24 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 190 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.h | 51 | ||||
-rw-r--r-- | cpp/src/IceStorm/TraceLevels.cpp | 60 | ||||
-rw-r--r-- | cpp/src/IceStorm/TraceLevels.h | 43 | ||||
-rw-r--r-- | cpp/src/IceStorm/TraceLevelsF.h | 26 | ||||
-rw-r--r-- | cpp/src/IceStorm/config | 16 |
20 files changed, 1557 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/.depend b/cpp/src/IceStorm/.depend new file mode 100644 index 00000000000..cc42f506c35 --- /dev/null +++ b/cpp/src/IceStorm/.depend @@ -0,0 +1,8 @@ +IceStorm.o: IceStorm.cpp ../../include/IceStorm/IceStorm.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../../include/Ice/ServantLocatorF.h +TraceLevels.o: TraceLevels.cpp ../../include/Ice/Properties.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../IceStorm/TraceLevels.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevelsF.h +Flusher.o: Flusher.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/Functional.h ../../include/IceUtil/Functional.h ../IceStorm/Flusher.h ../IceStorm/FlusherF.h ../IceStorm/SubscriberF.h ../IceStorm/TraceLevelsF.h ../IceStorm/Subscriber.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/TraceLevels.h +Subscriber.o: Subscriber.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../IceStorm/Subscriber.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/SubscriberF.h ../IceStorm/TraceLevelsF.h ../IceStorm/FlusherF.h ../IceStorm/TraceLevels.h ../IceStorm/Flusher.h +TopicI.o: TopicI.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/Functional.h ../../include/IceUtil/Functional.h ../IceStorm/TopicI.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/TopicIF.h ../IceStorm/FlusherF.h ../IceStorm/TraceLevelsF.h ../IceStorm/Flusher.h ../IceStorm/SubscriberF.h ../IceStorm/Subscriber.h ../IceStorm/TraceLevels.h +TopicManagerI.o: TopicManagerI.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../IceStorm/TopicManagerI.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/TopicIF.h ../IceStorm/FlusherF.h ../IceStorm/TraceLevelsF.h ../IceStorm/TopicI.h ../IceStorm/Flusher.h ../IceStorm/SubscriberF.h ../IceStorm/TraceLevels.h +Server.o: Server.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../IceStorm/TopicManagerI.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/TopicIF.h ../IceStorm/FlusherF.h ../IceStorm/TraceLevelsF.h ../IceStorm/TraceLevels.h +Admin.o: Admin.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h diff --git a/cpp/src/IceStorm/Admin.cpp b/cpp/src/IceStorm/Admin.cpp new file mode 100644 index 00000000000..4f4803db9e0 --- /dev/null +++ b/cpp/src/IceStorm/Admin.cpp @@ -0,0 +1,120 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#include <Ice/Ice.h> +#include <IceStorm/IceStorm.h> + +using namespace std; +using namespace Ice; + +void +usage(const char* n) +{ + cerr << "Usage: " << n << " [options] [file...]\n"; + cerr << + "Options:\n" + "-h, --help Show this message.\n" + "-v, --version Display the Ice version.\n" + "create topic Create the given topic.\n" + ; +} + +int +run(int argc, char* argv[], const CommunicatorPtr& communicator) +{ + PropertiesPtr properties = communicator->getProperties(); + const char* managerProperty = "IceStorm.TopicManager"; + string managerRef = properties->getProperty(managerProperty); + if (managerRef.empty()) + { + cerr << argv[0] << ": " << managerProperty << " is not set" << endl; + return EXIT_FAILURE; + } + + ObjectPrx base = communicator->stringToProxy(managerRef); + IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(base); + if (!manager) + { + cerr << argv[0] << ": `" << managerProperty << "' is not running" << endl; + return EXIT_FAILURE; + } + + int idx = 1; + while (idx < argc) + { + if (strcmp(argv[idx], "-h") == 0 || strcmp(argv[idx], "--help") == 0) + { + usage(argv[0]); + return EXIT_SUCCESS; + } + else if (strcmp(argv[idx], "-v") == 0 || strcmp(argv[idx], "--version") == 0) + { + cout << ICE_STRING_VERSION << endl; + return EXIT_SUCCESS; + } + else if (argv[idx][0] == '-') + { + cerr << argv[0] << ": unknown option `" << argv[idx] << "'" << endl; + usage(argv[0]); + return EXIT_FAILURE; + } + else if (strcmp(argv[idx], "create") == 0) + { + ++idx; + if (idx > argc) + { + usage(argv[0]); + return EXIT_FAILURE; + } + + IceStorm::TopicPrx topic = manager->create(argv[idx]); + cout << "created: " << argv[idx] << endl; + } + else + { + ++idx; + } + } + return EXIT_SUCCESS; +} + +int +main(int argc, char* argv[]) +{ + int status; + CommunicatorPtr communicator; + + try + { + communicator = initialize(argc, argv); + status = run(argc, argv, communicator); + } + catch(const Exception& ex) + { + cerr << ex << endl; + status = EXIT_FAILURE; + } + + if (communicator) + { + try + { + communicator->destroy(); + } + catch(const Exception& ex) + { + cerr << ex << endl; + status = EXIT_FAILURE; + } + } + + return status; +} + diff --git a/cpp/src/IceStorm/Flusher.cpp b/cpp/src/IceStorm/Flusher.cpp new file mode 100644 index 00000000000..3fb95ec0715 --- /dev/null +++ b/cpp/src/IceStorm/Flusher.cpp @@ -0,0 +1,211 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#include <Ice/Ice.h> +#include <Ice/Functional.h> + +#include <IceStorm/Flusher.h> +#include <IceStorm/Subscriber.h> +#include <IceStorm/TraceLevels.h> + +#include <algorithm> + +using namespace IceStorm; +using namespace std; + +void IceStorm::incRef(Flusher* p) { p->__incRef(); } +void IceStorm::decRef(Flusher* p) { p->__decRef(); } + +namespace IceStorm +{ + +class FlusherThread : public JTCThread, public JTCMonitor +{ +public: + + FlusherThread(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) : + _traceLevels(traceLevels), + _logger(communicator->getLogger()) + { + Ice::PropertiesPtr properties = communicator->getProperties(); + string value; + value = properties->getProperty("IceStorm.Flush.Timeout"); + if (!value.empty()) + { + _flushTime = atoi(value.c_str()); + if (_flushTime < 100) + { + _flushTime = 100; // Minimum of 100 ms + } + } + else + { + _flushTime = 1000; // Default of 1 second + } + } + + ~FlusherThread() + { + } + + virtual void + run() + { + JTCSyncT<JTCMonitor> sync(*this); + while (!_destroy) + { + long tout = calcTimeout(); + try + { + if (tout == 0) + { + wait(); + } + else + { + wait(tout); + } + } + catch(const JTCInterruptedException&) + { + } + if (_destroy) + { + continue; + } + flushAll(); + } + } + + void + destroy() + { + JTCSyncT<JTCMonitor> sync(*this); + _destroy = true; + notify(); + } + + // + // It would be possible to write add/remove in such a way as to + // avoid blocking while flushing by having a queue of actions + // which are only performed before flushing. For now, however, + // this issue is ignored. + // + void + add(const SubscriberPtr& subscriber) + { + JTCSyncT<JTCMonitor> sync(*this); + bool isEmpty = _subscribers.empty(); + _subscribers.push_back(subscriber); + + // + // If the set of subscribers was previously empty then wake up + // the flushing thread since it will be waiting indefinitely + // + if (isEmpty) + { + notify(); + } + } + + void + remove(const SubscriberPtr& subscriber) + { + JTCSyncT<JTCMonitor> sync(*this); + _subscribers.remove(subscriber); + } + + +private: + + void + flushAll() + { + // This is always called with the monitor locked + //JTCSyncT<JTCMonitor> sync(*this); + + // + // Using standard algorithms I don't think there is a way to + // do this in one pass. For instance, I thought about using + // remove_if - but the predicate needs to be a pure function + // (see Meyers for details). If this is fixed then fix TopicI + // also. + // + _subscribers.remove_if(::Ice::constMemFun(&Subscriber::invalid)); + for_each(_subscribers.begin(), _subscribers.end(), Ice::voidMemFun(&Subscriber::flush)); + + // + // Trace after the flush so that the correct number of objects + // are displayed + // + if (_traceLevels->flush > 0) + { + ostringstream s; + s << _subscribers.size() << " object(s)"; + + _logger->trace(_traceLevels->flushCat, s.str()); + } + } + + long + calcTimeout() + { + return (_subscribers.empty()) ? 0 : _flushTime; + } + + TraceLevelsPtr _traceLevels; + Ice::LoggerPtr _logger; + + SubscriberList _subscribers; + bool _destroy; + long _flushTime; +}; + +} // End namespace IceStorm + +Flusher::Flusher(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) +{ + _thread = new FlusherThread(communicator, traceLevels); + _thread->start(); +} + +Flusher::~Flusher() +{ + _thread->destroy(); + while(_thread->isAlive()) + { + try + { + _thread->join(); + } + catch(const JTCInterruptedException&) + { + } + } +} + +void +Flusher::add(const SubscriberPtr& subscriber) +{ + _thread->add(subscriber); +} + +void +Flusher::remove(const SubscriberPtr& subscriber) +{ + _thread->remove(subscriber); +} + +void +Flusher::stopFlushing() +{ + _thread->destroy(); +} + diff --git a/cpp/src/IceStorm/Flusher.h b/cpp/src/IceStorm/Flusher.h new file mode 100644 index 00000000000..377353f6041 --- /dev/null +++ b/cpp/src/IceStorm/Flusher.h @@ -0,0 +1,44 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef FLUSHER_H +#define FLUSHER_H + +#include <Ice/LoggerF.h> + +#include <IceStorm/FlusherF.h> +#include <IceStorm/SubscriberF.h> +#include <IceStorm/TraceLevelsF.h> + +namespace IceStorm +{ + +class FlusherThread; +typedef JTCHandleT<FlusherThread> FlusherThreadHandle; + +class Flusher : public IceUtil::Shared +{ +public: + + Flusher(const Ice::CommunicatorPtr&, const TraceLevelsPtr&); + ~Flusher(); + + void add(const SubscriberPtr&); + void remove(const SubscriberPtr&); + void stopFlushing(); + +private: + + FlusherThreadHandle _thread; +}; + +} // End namespace IceStorm + +#endif diff --git a/cpp/src/IceStorm/FlusherF.h b/cpp/src/IceStorm/FlusherF.h new file mode 100644 index 00000000000..05968483fab --- /dev/null +++ b/cpp/src/IceStorm/FlusherF.h @@ -0,0 +1,27 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef FLUSHER_F_H +#define FLUSHER_F_H + +#include <IceUtil/Handle.h> + +namespace IceStorm +{ + +class Flusher; +typedef IceUtil::Handle<Flusher> FlusherPtr; + +void incRef(Flusher*); +void decRef(Flusher*); + +} // End namespace IceStorm + +#endif diff --git a/cpp/src/IceStorm/IceStormI.h b/cpp/src/IceStorm/IceStormI.h new file mode 100644 index 00000000000..b63a61a043b --- /dev/null +++ b/cpp/src/IceStorm/IceStormI.h @@ -0,0 +1,11 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef TOPIC_MANAGER_I_H diff --git a/cpp/src/IceStorm/Makefile b/cpp/src/IceStorm/Makefile new file mode 100644 index 00000000000..38f571805b0 --- /dev/null +++ b/cpp/src/IceStorm/Makefile @@ -0,0 +1,67 @@ +#********************************************************************** +# +# Copyright (c) 2001 +# MutableRealms, Inc. +# Huntsville, AL, USA +# +# All Rights Reserved +# +# ********************************************************************** + +top_srcdir = ../.. + +BASE = libIceStorm.so +VERSIONED_BASE = $(BASE).$(VERSION) + +NAME = $(top_srcdir)/lib/$(BASE) +VERSIONED_NAME = $(top_srcdir)/lib/$(VERSIONED_BASE) + +SERVER = $(top_srcdir)/bin/icestorm +ADMIN = $(top_srcdir)/bin/icestormadmin +TARGETS = $(NAME) $(VERSIONED_NAME) $(SERVER) $(ADMIN) + +OBJS = IceStorm.o + +SOBJS = TraceLevels.o Flusher.o Subscriber.o TopicI.o TopicManagerI.o Server.o + +AOBJS = Admin.o + +SRCS = $(OBJS:.o=.cpp) \ + $(SOBJS:.o=.cpp) \ + $(AOBJS:.o=.cpp) + +HDIR = $(includedir)/IceStorm +SDIR = $(slicedir)/IceStorm +SLICECMD = $(SLICE2CPP) --include-dir IceStorm --dll-export ICE_STORM_API -I$(slicedir) + +include $(top_srcdir)/config/Make.rules + +CPPFLAGS := -I.. $(CPPFLAGS) + +$(VERSIONED_NAME): $(OBJS) + rm -f $@ + $(CXX) $(CXXFLAGS) $(LDFLAGS) -shared \ + -o $@ $(OBJS) + +$(NAME): $(VERSIONED_NAME) + rm -f $@ + ln -s $(VERSIONED_BASE) $@ + +$(SERVER): $(SOBJS) + rm -f $@ + $(CXX) $(CXXFLAGS) $(LDFLAGS) -o $@ $(SOBJS) -lIceStorm $(LIBS) + +$(ADMIN): $(AOBJS) + rm -f $@ + $(CXX) $(CXXFLAGS) $(LDFLAGS) -o $@ $(AOBJS) -lIceStorm $(LIBS) + +$(HDIR)/IceStorm.h IceStorm.cpp: $(SDIR)/IceStorm.ice $(SLICE2CPP) + rm -f $(HDIR)/IceStorm.h IceStorm.cpp + $(SLICECMD) $(SDIR)/IceStorm.ice + mv IceStorm.h $(HDIR) + + +clean:: + rm -f $(HDIR)/Clock.h Clock.cpp + +include .depend diff --git a/cpp/src/IceStorm/Server.cpp b/cpp/src/IceStorm/Server.cpp new file mode 100644 index 00000000000..e43b48ced91 --- /dev/null +++ b/cpp/src/IceStorm/Server.cpp @@ -0,0 +1,93 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#include <Ice/Ice.h> +#include <IceStorm/TopicManagerI.h> +#include <IceStorm/TraceLevels.h> + +using namespace std; + +void +usage(const char* n) +{ + cerr << "Usage: " << n << " [options]\n"; + cerr << + "Options:\n" + "-h, --help Show this message.\n" + "-v, --version Display the Ice version.\n" + ; +} + +int +run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator) +{ + for (int i = 1; i < argc; ++i) + { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) + { + usage(argv[0]); + return EXIT_SUCCESS; + } + else if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--version") == 0) + { + cout << ICE_STRING_VERSION << endl; + return EXIT_SUCCESS; + } + else + { + cerr << argv[0] << ": unknown option `" << argv[i] << "'" << endl; + usage(argv[0]); + return EXIT_FAILURE; + } + } + + //PropertiesPtr properties = communicator->getProperties(); + + IceStorm::TraceLevelsPtr traceLevels = new IceStorm::TraceLevels(communicator->getProperties()); + Ice::ObjectAdapterPtr adapter = communicator->createObjectAdapter("TopicManager"); + Ice::ObjectPtr object = new IceStorm::TopicManagerI(communicator, adapter, traceLevels); + adapter->add(object, "TopicManager"); + adapter->activate(); + communicator->waitForShutdown(); + return EXIT_SUCCESS; +} + +int +main(int argc, char* argv[]) +{ + int status; + Ice::CommunicatorPtr communicator; + + try + { + communicator = Ice::initialize(argc, argv); + status = run(argc, argv, communicator); + } + catch(const Ice::Exception& ex) + { + cerr << ex << endl; + status = EXIT_FAILURE; + } + + if (communicator) + { + try + { + communicator->destroy(); + } + catch(const Ice::Exception& ex) + { + cerr << ex << endl; + status = EXIT_FAILURE; + } + } + + return status; +} diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp new file mode 100644 index 00000000000..590e3cf78e1 --- /dev/null +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -0,0 +1,167 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#include <Ice/Ice.h> + +#include <IceStorm/Subscriber.h> +#include <IceStorm/TraceLevels.h> +#include <IceStorm/Flusher.h> + +using namespace IceStorm; +using namespace std; + +void IceStorm::incRef(Subscriber* p) { p->__incRef(); } +void IceStorm::decRef(Subscriber* p) { p->__decRef(); } + +Subscriber::Subscriber(const Ice::LoggerPtr& logger, const TraceLevelsPtr& traceLevels, const FlusherPtr& flusher, + const QoS& qos, const Ice::ObjectPrx& obj) + : _logger(logger), + _traceLevels(traceLevels), + _invalid(false) +{ + + // + // Determine the requested reliability characteristics + // + QoS::const_iterator i = qos.find("reliability"); + string reliability; + if (i == qos.end()) + { + reliability = "oneway"; + } + else + { + reliability = i->second; + } + + if (reliability == "batch") + { + _obj = obj->ice_batchOneway(); + _flusher = flusher; + _flusher->add(this); + } + else // reliability == "oneway" + { + if (reliability != "oneway") + { + if (_traceLevels->subscriber > 0) + { + ostringstream s; + s << reliability <<" mode not understood."; + _logger->trace(_traceLevels->subscriberCat, s.str()); + } + } + _obj = obj->ice_oneway(); + } +} + +Subscriber::~Subscriber() +{ +} + +bool +Subscriber::invalid() const +{ + JTCSyncT<JTCMutex> sync(_invalidMutex); + return _invalid; +} + +void +Subscriber::unsubscribe() +{ + JTCSyncT<JTCMutex> sync(_invalidMutex); + _invalid = true; + + if (_traceLevels->subscriber > 0) + { + ostringstream s; + s << "Unsubscribe " << _obj->ice_getIdentity(); + _logger->trace(_traceLevels->subscriberCat, s.str()); + } + + // + // If this subscriber has been registered with the flusher then + // remove it. + // + if (_flusher) + { + _flusher->remove(this); + } +} + +void +Subscriber::flush() +{ + try + { + _obj->ice_flush(); + } + catch(const Ice::LocalException& e) + { + JTCSyncT<JTCMutex> sync(_invalidMutex); + // + // It's possible that the subscriber was unsubscribed, or + // marked invalid by another thread. Don't display a + // diagnostic in this case. + // + if (!_invalid) + { + if (_traceLevels->subscriber > 0) + { + ostringstream s; + s << _obj->ice_getIdentity() << ": flush failed: " << e; + _logger->trace(_traceLevels->subscriberCat, s.str()); + } + _invalid = true; + } + } +} + +void +Subscriber::publish(const string& op, const std::vector< ::Ice::Byte>& blob) +{ + try + { + bool nonmutating = true; + _obj->ice_invokeIn(op, nonmutating, blob); + } + catch(const Ice::LocalException& e) + { + JTCSyncT<JTCMutex> sync(_invalidMutex); + // + // It's possible that the subscriber was unsubscribed, or + // marked invalid by another thread. Don't display a + // diagnostic in this case. + // + if (!_invalid) + { + if (_traceLevels->subscriber > 0) + { + ostringstream s; + s << _obj->ice_getIdentity() << ": publish failed: " << e; + _logger->trace(_traceLevels->subscriberCat, s.str()); + } + _invalid = true; + } + } + +} + +std::string +Subscriber::id() const +{ + return _obj->ice_getIdentity(); +} + +bool +Subscriber::operator==(const Subscriber& rhs) const +{ + return _obj->ice_getIdentity() == rhs._obj->ice_getIdentity(); +} diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h new file mode 100644 index 00000000000..65a5acefcff --- /dev/null +++ b/cpp/src/IceStorm/Subscriber.h @@ -0,0 +1,72 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef SUBSCRIBER_H +#define SUBSCRIBER_H + +#include <Ice/LoggerF.h> + +#include <IceStorm/IceStorm.h> // For QoS (nasty!) +#include <IceStorm/SubscriberF.h> +#include <IceStorm/TraceLevelsF.h> +#include <IceStorm/FlusherF.h> + +#include <list> + +namespace IceStorm +{ + +class Subscriber : public IceUtil::Shared +{ +public: + + Subscriber(const Ice::LoggerPtr&, const TraceLevelsPtr&, const FlusherPtr&, const QoS&, const Ice::ObjectPrx&); + ~Subscriber(); + + bool invalid() const; + + void unsubscribe(); + + void flush(); + void publish(const std::string&, const std::vector< ::Ice::Byte>&); + + std::string id() const; + + // TODO: should there be a global operator==? + bool operator==(const Subscriber&) const; + +private: + + // Immutable + Ice::LoggerPtr _logger; + TraceLevelsPtr _traceLevels; + + JTCMutex _invalidMutex; + bool _invalid; + + // + // This id is the full id of the subscriber for a particular topic + // (that is <prefix>#<topicname> + // + // Immutable + std::string _id; + + // Immutable + Ice::ObjectPrx _obj; + + FlusherPtr _flusher; +}; + + +typedef std::list<SubscriberPtr> SubscriberList; + +} // End namespace IceStorm + +#endif diff --git a/cpp/src/IceStorm/SubscriberF.h b/cpp/src/IceStorm/SubscriberF.h new file mode 100644 index 00000000000..e74b565cb44 --- /dev/null +++ b/cpp/src/IceStorm/SubscriberF.h @@ -0,0 +1,28 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef SUBSCRIBER_F_H +#define SUBSCRIBER_F_H + +#include <IceUtil/Handle.h> + +namespace IceStorm +{ + +class Subscriber; + +typedef IceUtil::Handle<Subscriber> SubscriberPtr; + +void incRef(Subscriber*); +void decRef(Subscriber*); + +} // End namespace IceStorm + +#endif diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp new file mode 100644 index 00000000000..0510e081311 --- /dev/null +++ b/cpp/src/IceStorm/TopicI.cpp @@ -0,0 +1,233 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#include <Ice/Ice.h> +#include <Ice/Functional.h> + +#include <IceStorm/TopicI.h> +#include <IceStorm/Flusher.h> +#include <IceStorm/Subscriber.h> +#include <IceStorm/TraceLevels.h> + +#include <algorithm> + +using namespace IceStorm; +using namespace std; + +namespace IceStorm +{ + +class BlobjectI : public Ice::Blobject +{ +public: + + BlobjectI(const IceStorm::TopicSubscribersPtr& s) : + _subscribers(s) + { + } + + ~BlobjectI() + { + } + + virtual void ice_invokeIn(const std::string&, const std::string&, const std::string&, + const std::vector< ::Ice::Byte>&); + +private: + + IceStorm::TopicSubscribersPtr _subscribers; +}; + + +class TopicSubscribers : public IceUtil::Shared, public JTCMutex +{ +public: + + TopicSubscribers(const TraceLevelsPtr& traceLevels, const Ice::LoggerPtr& logger, const string& topic, + const FlusherPtr& flusher) : + _traceLevels(traceLevels), + _logger(logger), + _topic(topic), + _flusher(flusher) + { + } + + ~TopicSubscribers() + { + } + + void + add(const Ice::ObjectPrx& s, const string& idPrefix, const QoS& qos) + { + // + // Create the full topic subscriber id (<prefix>#<topic>) + // + string id = idPrefix; + id += '#'; + id += _topic; + + // + // Change the identity of the proxy + // + Ice::ObjectPrx obj = s->ice_newIdentity(id); + + SubscriberPtr subscriber = new Subscriber(_logger, _traceLevels, _flusher, qos, obj); + + JTCSyncT<JTCMutex> sync(*this); + + // + // Add to the set of subscribers + // + _subscribers.push_back(subscriber); + } + + void + remove(const string& idPrefix) + { + JTCSyncT<JTCMutex> sync(*this); + + // + // Create the full topic subscriber id (<prefix>#<topic>) + // + string id = idPrefix; + id += '#'; + id += _topic; + + SubscriberList::iterator i; + for (i = _subscribers.begin() ; i != _subscribers.end(); ++i) + { + if ((*i)->id() == id) + { + // + // This marks the subscriber as invalid. It will be + // removed on the next event publish. + // + (*i)->unsubscribe(); + break; + } + } + + // + // If the subscriber was not found then display a diagnostic + // + if (i == _subscribers.end()) + { + if (_traceLevels->topic > 0) + { + ostringstream s; + s << _topic << ": " << id << "not subscribed."; + _logger->trace(_traceLevels->topicCat, s.str()); + } + } + } + + + void + publish(const string& op, const std::vector< ::Ice::Byte>& blob) + { + JTCSyncT<JTCMutex> sync(*this); + + // + // Using standard algorithms I don't think there is a way to + // do this in one pass. For instance, I thought about using + // remove_if - but the predicate needs to be a pure function + // (see Meyers for details). If this is fixed then fix Flusher + // also. + // + _subscribers.remove_if(::Ice::constMemFun(&Subscriber::invalid)); + + for (SubscriberList::iterator i = _subscribers.begin(); i != _subscribers.end(); ++i) + (*i)->publish(op, blob); + //for_each(_subscribers.begin(), _subscribers.end(), Ice::memFun(&Subscriber::publish)); + } + +private: + + TraceLevelsPtr _traceLevels; + Ice::LoggerPtr _logger; + + string _topic; + FlusherPtr _flusher; + SubscriberList _subscribers; +}; + +} // End namespace IceStorm + +void +BlobjectI::ice_invokeIn(const string&, const string&, const string& op, + const std::vector< ::Ice::Byte>& blob) +{ + cerr << "ice_invokeIn" << endl; + _subscribers->publish(op, blob); +} + +TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& traceLevels, const Ice::LoggerPtr& logger, + const string& name, const FlusherPtr& flusher) : + _adapter(adapter), + _traceLevels(traceLevels), + _logger(logger), + _name(name), + _flusher(flusher), + _destroyed(false) +{ + _subscribers = new TopicSubscribers(_traceLevels, _logger, _name, _flusher); + + _publisher = new BlobjectI(_subscribers); + + string id = name; + id += '#'; + id += "publish"; + + _adapter->add(_publisher, id); + _obj = adapter->createProxy(id); +} + +TopicI::~TopicI() +{ +} + +string +TopicI::getName() +{ + return _name; +} + +Ice::ObjectPrx +TopicI::getPublisher() +{ + return _obj; +} + +void +TopicI::destroy() +{ + JTCSyncT<JTCMutex> sync(_destroyedMutex); + _adapter->remove(_name); + _destroyed = true; +} + +bool +TopicI::destroyed() const +{ + JTCSyncT<JTCMutex> sync(_destroyedMutex); + return _destroyed; +} + +void +TopicI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos) +{ + _subscribers->add(tmpl, id, qos); +} + +void +TopicI::unsubscribe(const string& id) +{ + _subscribers->remove(id); +} diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h new file mode 100644 index 00000000000..4d72b52a782 --- /dev/null +++ b/cpp/src/IceStorm/TopicI.h @@ -0,0 +1,66 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef TOPIC_I_H +#define TOPIC_I_H + +#include <IceStorm/IceStorm.h> +#include <IceStorm/TopicIF.h> +#include <IceStorm/FlusherF.h> +#include <IceStorm/TraceLevelsF.h> + +namespace IceStorm +{ + +class TopicSubscribers; +typedef IceUtil::Handle<TopicSubscribers> TopicSubscribersPtr; + +class TopicI : public Topic +{ +public: + + TopicI(const Ice::ObjectAdapterPtr&, const TraceLevelsPtr&, const Ice::LoggerPtr&, const std::string&, + const FlusherPtr&); + ~TopicI(); + + virtual std::string getName(); + virtual Ice::ObjectPrx getPublisher(); + virtual void destroy(); + + // Internal methods + bool destroyed() const; + void subscribe(const Ice::ObjectPrx&, const std::string&, const QoS&); + void unsubscribe(const std::string&); + +private: + + Ice::ObjectAdapterPtr _adapter; + TraceLevelsPtr _traceLevels; + Ice::LoggerPtr _logger; + + // Immutable + std::string _name; + + FlusherPtr _flusher; + + JTCMutex _destroyedMutex; + bool _destroyed; + + TopicSubscribersPtr _subscribers; + + // Immutable + Ice::ObjectPtr _publisher; + Ice::ObjectPrx _obj; + //Ice::ServantLocatorPtr _locator; +}; + +} // End namespace IceStorm + +#endif diff --git a/cpp/src/IceStorm/TopicIF.h b/cpp/src/IceStorm/TopicIF.h new file mode 100644 index 00000000000..bed8183735a --- /dev/null +++ b/cpp/src/IceStorm/TopicIF.h @@ -0,0 +1,24 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef TOPIC_I_F_H +#define TOPIC_I_F_H + +#include <IceUtil/Shared.h> + +namespace IceStorm +{ + +class TopicI; +typedef IceUtil::Handle<TopicI> TopicIPtr; + +} // End namespace IceStorm + +#endif diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp new file mode 100644 index 00000000000..f04198df675 --- /dev/null +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -0,0 +1,190 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#include <Ice/Ice.h> + +#include <IceStorm/TopicManagerI.h> +#include <IceStorm/TopicI.h> +#include <IceStorm/Flusher.h> +#include <IceStorm/TraceLevels.h> + +#include <functional> + +using namespace IceStorm; +using namespace std; + +TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& adapter, + const TraceLevelsPtr& traceLevels) : + _communicator(communicator), + _adapter(adapter), + _traceLevels(traceLevels) + +{ + _flusher = new Flusher(_communicator, _traceLevels); +} + +TopicManagerI::~TopicManagerI() +{ +} + +TopicPrx +TopicManagerI::create(const string& name) +{ + // TODO: reader/writer mutex + JTCSyncT<JTCMutex> sync(*this); + + reap(); + + if (_topicIMap.find(name) != _topicIMap.end()) + { + throw TopicExists(); + } + + if (_traceLevels->topicMgr > 0) + { + ostringstream s; + s << "Create " << name; + _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str()); + } + + // + // Create topic implementation + // + TopicIPtr topicI = new TopicI(_adapter, _traceLevels, _communicator->getLogger(), name, _flusher); + _adapter->add(topicI, name); + _topicIMap.insert(TopicIMap::value_type(name, topicI)); + + return TopicPrx::uncheckedCast(_adapter->createProxy(name)); +} + +TopicPrx +TopicManagerI::retrieve(const string& name) +{ + JTCSyncT<JTCMutex> sync(*this); + + reap(); + + if (_topicIMap.find(name) != _topicIMap.end()) + return TopicPrx::uncheckedCast(_adapter->createProxy(name)); + throw NoSuchTopic(); +} + +// +// The arguments cannot be const & (for some reason) +// +static TopicDict::value_type +transformToTopicDict(TopicIMap::value_type p, Ice::ObjectAdapterPtr adapter) +{ + return TopicDict::value_type(p.first, TopicPrx::uncheckedCast(adapter->createProxy(p.first))); +} + +TopicDict +TopicManagerI::retrieveAll() +{ + JTCSyncT<JTCMutex> sync(*this); + + reap(); + + TopicDict all; + transform(_topicIMap.begin(), _topicIMap.end(), inserter(all, all.begin()), + bind2nd(ptr_fun(transformToTopicDict), _adapter)); + + return all; +} + +void +TopicManagerI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos, const StringSeq& topics) +{ + JTCSyncT<JTCMutex> sync(*this); + + if (_traceLevels->topicMgr > 0) + { + ostringstream s; + s << "Subscribe: " << id; + if (_traceLevels->topicMgr > 1) + { + s << " QoS: "; + for (QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi) + { + if (qi != qos.begin()) + { + s << ','; + } + s << '[' << qi->first << "," << qi->second << ']'; + } + s << " Topics: "; + for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti) + { + if (ti != topics.begin()) + { + s << ","; + } + s << *ti; + } + } + _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str()); + } + + for (StringSeq::const_iterator i = topics.begin() ; i != topics.end() ; ++i) + { + TopicIMap::iterator elem = _topicIMap.find(*i); + if (elem != _topicIMap.end()) + { + elem->second->subscribe(tmpl, id, qos); + } + } +} + +void +TopicManagerI::unsubscribe(const string& id, const StringSeq& topics) +{ + JTCSyncT<JTCMutex> sync(*this); + + if (_traceLevels->topicMgr > 0) + { + ostringstream s; + s << "Unsubscribe: " << id; + if (_traceLevels->topicMgr > 1) + { + s << " Topics: "; + for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti) + { + if (ti != topics.begin()) + { + s << ","; + } + s << *ti; + } + } + _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str()); + } + + for (StringSeq::const_iterator i = topics.begin() ; i != topics.end() ; ++i) + { + TopicIMap::iterator elem = _topicIMap.find(*i); + if (elem != _topicIMap.end()) + { + elem->second->unsubscribe(id); + } + } +} + +void +TopicManagerI::shutdown() +{ + _flusher->stopFlushing(); + _communicator->shutdown(); +} + +void +TopicManagerI::reap() +{ +} + diff --git a/cpp/src/IceStorm/TopicManagerI.h b/cpp/src/IceStorm/TopicManagerI.h new file mode 100644 index 00000000000..b74e7b0bf3c --- /dev/null +++ b/cpp/src/IceStorm/TopicManagerI.h @@ -0,0 +1,51 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef TOPIC_MANAGER_I_H +#define TOPIC_MANAGER_I_H + +#include <IceStorm/IceStorm.h> +#include <IceStorm/TopicIF.h> +#include <IceStorm/FlusherF.h> +#include <IceStorm/TraceLevelsF.h> + +namespace IceStorm +{ + +typedef std::map<std::string, TopicIPtr> TopicIMap; + +class TopicManagerI : public TopicManager, public JTCMutex +{ +public: + + TopicManagerI(const Ice::CommunicatorPtr&, const Ice::ObjectAdapterPtr&, const TraceLevelsPtr&); + ~TopicManagerI(); + + virtual TopicPrx create(const std::string&); + virtual TopicPrx retrieve(const std::string&); + virtual TopicDict retrieveAll(); + virtual void subscribe(const Ice::ObjectPrx&, const std::string&, const QoS&, const StringSeq&); + virtual void unsubscribe(const std::string&, const StringSeq&); + virtual void shutdown(); + +private: + + void reap(); + + Ice::CommunicatorPtr _communicator; + Ice::ObjectAdapterPtr _adapter; + TraceLevelsPtr _traceLevels; + TopicIMap _topicIMap; + FlusherPtr _flusher; +}; + +} // End namespace IceStorm + +#endif diff --git a/cpp/src/IceStorm/TraceLevels.cpp b/cpp/src/IceStorm/TraceLevels.cpp new file mode 100644 index 00000000000..1dde386bc39 --- /dev/null +++ b/cpp/src/IceStorm/TraceLevels.cpp @@ -0,0 +1,60 @@ +// ********************************************************************** +// +// Copyright (c) 2001 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#include <Ice/Properties.h> +#include <IceStorm/TraceLevels.h> + +using namespace std; +using namespace IceStorm; + +void IceStorm::incRef(TraceLevels* p) { p->__incRef(); } +void IceStorm::decRef(TraceLevels* p) { p->__decRef(); } + +TraceLevels::TraceLevels(const Ice::PropertiesPtr& properties) : + topicMgr(0), + topicMgrCat("TopicManager"), + topic(0), + topicCat("Topic"), + flush(0), + flushCat("Flush"), + subscriber(0), + subscriberCat("Subscriber") +{ + string value; + const string keyBase = "IceStorm.Trace."; + + value = properties->getProperty(keyBase + topicMgrCat); + if (!value.empty()) + { + const_cast<int&>(topicMgr) = atoi(value.c_str()); + } + + value = properties->getProperty(keyBase + topicCat); + if (!value.empty()) + { + const_cast<int&>(topic) = atoi(value.c_str()); + } + + value = properties->getProperty(keyBase + flushCat); + if (!value.empty()) + { + const_cast<int&>(flush) = atoi(value.c_str()); + } + + value = properties->getProperty(keyBase + subscriberCat); + if (!value.empty()) + { + const_cast<int&>(subscriber) = atoi(value.c_str()); + } +} + +TraceLevels::~TraceLevels() +{ +} diff --git a/cpp/src/IceStorm/TraceLevels.h b/cpp/src/IceStorm/TraceLevels.h new file mode 100644 index 00000000000..5aa2176e164 --- /dev/null +++ b/cpp/src/IceStorm/TraceLevels.h @@ -0,0 +1,43 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef ICE_STORM_TRACE_LEVELS_H +#define ICE_STORM_TRACE_LEVELS_H + +#include <IceUtil/Shared.h> +#include <Ice/PropertiesF.h> +#include <IceStorm/TraceLevelsF.h> + +namespace IceStorm +{ + +class TraceLevels : public ::IceUtil::Shared +{ +public: + + TraceLevels(const ::Ice::PropertiesPtr&); + virtual ~TraceLevels(); + + const int topicMgr; + const char* topicMgrCat; + + const int topic; + const char* topicCat; + + const int flush; + const char* flushCat; + + const int subscriber; + const char* subscriberCat; +}; + +} + +#endif diff --git a/cpp/src/IceStorm/TraceLevelsF.h b/cpp/src/IceStorm/TraceLevelsF.h new file mode 100644 index 00000000000..cdb493dbf4f --- /dev/null +++ b/cpp/src/IceStorm/TraceLevelsF.h @@ -0,0 +1,26 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +#ifndef ICE_STORM_TRACE_LEVELS_F_H +#define ICE_STORM_TRACE_LEVELS_F_H + +#include <Ice/Handle.h> + +namespace IceStorm +{ + +class TraceLevels; +void incRef(TraceLevels*); +void decRef(TraceLevels*); +typedef IceInternal::Handle<TraceLevels> TraceLevelsPtr; + +} // End namespace IceStorm + +#endif diff --git a/cpp/src/IceStorm/config b/cpp/src/IceStorm/config new file mode 100644 index 00000000000..0c45dde479c --- /dev/null +++ b/cpp/src/IceStorm/config @@ -0,0 +1,16 @@ +Ice.Adapter.TopicManager.Endpoints=tcp -p 10000 + +Ice.Trace.Network=3 +Ice.Trace.Protocol=1 +#Ice.Trace.Security=2 + +IceStorm.Trace.TopicManager=2 + +IceStorm.Trace.Topic=1 + +IceStorm.Trace.Flush=1 + +IceStorm.Trace.Subscriber=1 + +IceStorm.Flush.Timeout = 2000 + |