summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2001-11-26 15:53:00 +0000
committerMatthew Newhook <matthew@zeroc.com>2001-11-26 15:53:00 +0000
commit021c6615fd0b299d7742c82f7d6b76f4a813f397 (patch)
treeaf38dfba0fc3204ce8a0572777b92883e6e657f8 /cpp/src
parentBlobject (diff)
downloadice-021c6615fd0b299d7742c82f7d6b76f4a813f397.tar.bz2
ice-021c6615fd0b299d7742c82f7d6b76f4a813f397.tar.xz
ice-021c6615fd0b299d7742c82f7d6b76f4a813f397.zip
Initial version of IceStorm.
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceStorm/.depend8
-rw-r--r--cpp/src/IceStorm/Admin.cpp120
-rw-r--r--cpp/src/IceStorm/Flusher.cpp211
-rw-r--r--cpp/src/IceStorm/Flusher.h44
-rw-r--r--cpp/src/IceStorm/FlusherF.h27
-rw-r--r--cpp/src/IceStorm/IceStormI.h11
-rw-r--r--cpp/src/IceStorm/Makefile67
-rw-r--r--cpp/src/IceStorm/Server.cpp93
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp167
-rw-r--r--cpp/src/IceStorm/Subscriber.h72
-rw-r--r--cpp/src/IceStorm/SubscriberF.h28
-rw-r--r--cpp/src/IceStorm/TopicI.cpp233
-rw-r--r--cpp/src/IceStorm/TopicI.h66
-rw-r--r--cpp/src/IceStorm/TopicIF.h24
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp190
-rw-r--r--cpp/src/IceStorm/TopicManagerI.h51
-rw-r--r--cpp/src/IceStorm/TraceLevels.cpp60
-rw-r--r--cpp/src/IceStorm/TraceLevels.h43
-rw-r--r--cpp/src/IceStorm/TraceLevelsF.h26
-rw-r--r--cpp/src/IceStorm/config16
20 files changed, 1557 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/.depend b/cpp/src/IceStorm/.depend
new file mode 100644
index 00000000000..cc42f506c35
--- /dev/null
+++ b/cpp/src/IceStorm/.depend
@@ -0,0 +1,8 @@
+IceStorm.o: IceStorm.cpp ../../include/IceStorm/IceStorm.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/InstanceF.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../../include/Ice/ServantLocatorF.h
+TraceLevels.o: TraceLevels.cpp ../../include/Ice/Properties.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../IceStorm/TraceLevels.h ../../include/Ice/PropertiesF.h ../IceStorm/TraceLevelsF.h
+Flusher.o: Flusher.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/Functional.h ../../include/IceUtil/Functional.h ../IceStorm/Flusher.h ../IceStorm/FlusherF.h ../IceStorm/SubscriberF.h ../IceStorm/TraceLevelsF.h ../IceStorm/Subscriber.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/TraceLevels.h
+Subscriber.o: Subscriber.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../IceStorm/Subscriber.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/SubscriberF.h ../IceStorm/TraceLevelsF.h ../IceStorm/FlusherF.h ../IceStorm/TraceLevels.h ../IceStorm/Flusher.h
+TopicI.o: TopicI.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../../include/Ice/Functional.h ../../include/IceUtil/Functional.h ../IceStorm/TopicI.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/TopicIF.h ../IceStorm/FlusherF.h ../IceStorm/TraceLevelsF.h ../IceStorm/Flusher.h ../IceStorm/SubscriberF.h ../IceStorm/Subscriber.h ../IceStorm/TraceLevels.h
+TopicManagerI.o: TopicManagerI.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../IceStorm/TopicManagerI.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/TopicIF.h ../IceStorm/FlusherF.h ../IceStorm/TraceLevelsF.h ../IceStorm/TopicI.h ../IceStorm/Flusher.h ../IceStorm/SubscriberF.h ../IceStorm/TraceLevels.h
+Server.o: Server.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../IceStorm/TopicManagerI.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h ../IceStorm/TopicIF.h ../IceStorm/FlusherF.h ../IceStorm/TraceLevelsF.h ../IceStorm/TraceLevels.h
+Admin.o: Admin.cpp ../../include/Ice/Ice.h ../../include/Ice/Initialize.h ../../include/Ice/CommunicatorF.h ../../include/Ice/ProxyF.h ../../include/Ice/ProxyHandle.h ../../include/IceUtil/Handle.h ../../include/IceUtil/Exception.h ../../include/IceUtil/Config.h ../../include/Ice/Config.h ../../include/Ice/ObjectF.h ../../include/Ice/Handle.h ../../include/Ice/LocalObjectF.h ../../include/Ice/Exception.h ../../include/Ice/LocalException.h ../../include/Ice/LocalObject.h ../../include/IceUtil/Shared.h ../../include/Ice/PropertiesF.h ../../include/Ice/InstanceF.h ../../include/Ice/Properties.h ../../include/Ice/Logger.h ../../include/Ice/Stream.h ../../include/Ice/Communicator.h ../../include/Ice/LoggerF.h ../../include/Ice/ObjectAdapterF.h ../../include/Ice/ObjectFactoryF.h ../../include/Ice/UserExceptionFactoryF.h ../../include/Ice/StreamF.h ../../include/Ice/ObjectFactory.h ../../include/Ice/UserExceptionFactory.h ../../include/Ice/ObjectAdapter.h ../../include/Ice/ServantLocatorF.h ../../include/Ice/ServantLocator.h ../../include/Ice/Proxy.h ../../include/Ice/ProxyFactoryF.h ../../include/Ice/EmitterF.h ../../include/Ice/ReferenceF.h ../../include/IceStorm/IceStorm.h ../../include/Ice/Object.h ../../include/Ice/Outgoing.h ../../include/Ice/BasicStream.h ../../include/Ice/Buffer.h ../../include/Ice/Incoming.h ../../include/Ice/Direct.h
diff --git a/cpp/src/IceStorm/Admin.cpp b/cpp/src/IceStorm/Admin.cpp
new file mode 100644
index 00000000000..4f4803db9e0
--- /dev/null
+++ b/cpp/src/IceStorm/Admin.cpp
@@ -0,0 +1,120 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+#include <IceStorm/IceStorm.h>
+
+using namespace std;
+using namespace Ice;
+
+void
+usage(const char* n)
+{
+ cerr << "Usage: " << n << " [options] [file...]\n";
+ cerr <<
+ "Options:\n"
+ "-h, --help Show this message.\n"
+ "-v, --version Display the Ice version.\n"
+ "create topic Create the given topic.\n"
+ ;
+}
+
+int
+run(int argc, char* argv[], const CommunicatorPtr& communicator)
+{
+ PropertiesPtr properties = communicator->getProperties();
+ const char* managerProperty = "IceStorm.TopicManager";
+ string managerRef = properties->getProperty(managerProperty);
+ if (managerRef.empty())
+ {
+ cerr << argv[0] << ": " << managerProperty << " is not set" << endl;
+ return EXIT_FAILURE;
+ }
+
+ ObjectPrx base = communicator->stringToProxy(managerRef);
+ IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(base);
+ if (!manager)
+ {
+ cerr << argv[0] << ": `" << managerProperty << "' is not running" << endl;
+ return EXIT_FAILURE;
+ }
+
+ int idx = 1;
+ while (idx < argc)
+ {
+ if (strcmp(argv[idx], "-h") == 0 || strcmp(argv[idx], "--help") == 0)
+ {
+ usage(argv[0]);
+ return EXIT_SUCCESS;
+ }
+ else if (strcmp(argv[idx], "-v") == 0 || strcmp(argv[idx], "--version") == 0)
+ {
+ cout << ICE_STRING_VERSION << endl;
+ return EXIT_SUCCESS;
+ }
+ else if (argv[idx][0] == '-')
+ {
+ cerr << argv[0] << ": unknown option `" << argv[idx] << "'" << endl;
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ else if (strcmp(argv[idx], "create") == 0)
+ {
+ ++idx;
+ if (idx > argc)
+ {
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+
+ IceStorm::TopicPrx topic = manager->create(argv[idx]);
+ cout << "created: " << argv[idx] << endl;
+ }
+ else
+ {
+ ++idx;
+ }
+ }
+ return EXIT_SUCCESS;
+}
+
+int
+main(int argc, char* argv[])
+{
+ int status;
+ CommunicatorPtr communicator;
+
+ try
+ {
+ communicator = initialize(argc, argv);
+ status = run(argc, argv, communicator);
+ }
+ catch(const Exception& ex)
+ {
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
+ }
+
+ if (communicator)
+ {
+ try
+ {
+ communicator->destroy();
+ }
+ catch(const Exception& ex)
+ {
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
+ }
+ }
+
+ return status;
+}
+
diff --git a/cpp/src/IceStorm/Flusher.cpp b/cpp/src/IceStorm/Flusher.cpp
new file mode 100644
index 00000000000..3fb95ec0715
--- /dev/null
+++ b/cpp/src/IceStorm/Flusher.cpp
@@ -0,0 +1,211 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+#include <Ice/Functional.h>
+
+#include <IceStorm/Flusher.h>
+#include <IceStorm/Subscriber.h>
+#include <IceStorm/TraceLevels.h>
+
+#include <algorithm>
+
+using namespace IceStorm;
+using namespace std;
+
+void IceStorm::incRef(Flusher* p) { p->__incRef(); }
+void IceStorm::decRef(Flusher* p) { p->__decRef(); }
+
+namespace IceStorm
+{
+
+class FlusherThread : public JTCThread, public JTCMonitor
+{
+public:
+
+ FlusherThread(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) :
+ _traceLevels(traceLevels),
+ _logger(communicator->getLogger())
+ {
+ Ice::PropertiesPtr properties = communicator->getProperties();
+ string value;
+ value = properties->getProperty("IceStorm.Flush.Timeout");
+ if (!value.empty())
+ {
+ _flushTime = atoi(value.c_str());
+ if (_flushTime < 100)
+ {
+ _flushTime = 100; // Minimum of 100 ms
+ }
+ }
+ else
+ {
+ _flushTime = 1000; // Default of 1 second
+ }
+ }
+
+ ~FlusherThread()
+ {
+ }
+
+ virtual void
+ run()
+ {
+ JTCSyncT<JTCMonitor> sync(*this);
+ while (!_destroy)
+ {
+ long tout = calcTimeout();
+ try
+ {
+ if (tout == 0)
+ {
+ wait();
+ }
+ else
+ {
+ wait(tout);
+ }
+ }
+ catch(const JTCInterruptedException&)
+ {
+ }
+ if (_destroy)
+ {
+ continue;
+ }
+ flushAll();
+ }
+ }
+
+ void
+ destroy()
+ {
+ JTCSyncT<JTCMonitor> sync(*this);
+ _destroy = true;
+ notify();
+ }
+
+ //
+ // It would be possible to write add/remove in such a way as to
+ // avoid blocking while flushing by having a queue of actions
+ // which are only performed before flushing. For now, however,
+ // this issue is ignored.
+ //
+ void
+ add(const SubscriberPtr& subscriber)
+ {
+ JTCSyncT<JTCMonitor> sync(*this);
+ bool isEmpty = _subscribers.empty();
+ _subscribers.push_back(subscriber);
+
+ //
+ // If the set of subscribers was previously empty then wake up
+ // the flushing thread since it will be waiting indefinitely
+ //
+ if (isEmpty)
+ {
+ notify();
+ }
+ }
+
+ void
+ remove(const SubscriberPtr& subscriber)
+ {
+ JTCSyncT<JTCMonitor> sync(*this);
+ _subscribers.remove(subscriber);
+ }
+
+
+private:
+
+ void
+ flushAll()
+ {
+ // This is always called with the monitor locked
+ //JTCSyncT<JTCMonitor> sync(*this);
+
+ //
+ // Using standard algorithms I don't think there is a way to
+ // do this in one pass. For instance, I thought about using
+ // remove_if - but the predicate needs to be a pure function
+ // (see Meyers for details). If this is fixed then fix TopicI
+ // also.
+ //
+ _subscribers.remove_if(::Ice::constMemFun(&Subscriber::invalid));
+ for_each(_subscribers.begin(), _subscribers.end(), Ice::voidMemFun(&Subscriber::flush));
+
+ //
+ // Trace after the flush so that the correct number of objects
+ // are displayed
+ //
+ if (_traceLevels->flush > 0)
+ {
+ ostringstream s;
+ s << _subscribers.size() << " object(s)";
+
+ _logger->trace(_traceLevels->flushCat, s.str());
+ }
+ }
+
+ long
+ calcTimeout()
+ {
+ return (_subscribers.empty()) ? 0 : _flushTime;
+ }
+
+ TraceLevelsPtr _traceLevels;
+ Ice::LoggerPtr _logger;
+
+ SubscriberList _subscribers;
+ bool _destroy;
+ long _flushTime;
+};
+
+} // End namespace IceStorm
+
+Flusher::Flusher(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels)
+{
+ _thread = new FlusherThread(communicator, traceLevels);
+ _thread->start();
+}
+
+Flusher::~Flusher()
+{
+ _thread->destroy();
+ while(_thread->isAlive())
+ {
+ try
+ {
+ _thread->join();
+ }
+ catch(const JTCInterruptedException&)
+ {
+ }
+ }
+}
+
+void
+Flusher::add(const SubscriberPtr& subscriber)
+{
+ _thread->add(subscriber);
+}
+
+void
+Flusher::remove(const SubscriberPtr& subscriber)
+{
+ _thread->remove(subscriber);
+}
+
+void
+Flusher::stopFlushing()
+{
+ _thread->destroy();
+}
+
diff --git a/cpp/src/IceStorm/Flusher.h b/cpp/src/IceStorm/Flusher.h
new file mode 100644
index 00000000000..377353f6041
--- /dev/null
+++ b/cpp/src/IceStorm/Flusher.h
@@ -0,0 +1,44 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef FLUSHER_H
+#define FLUSHER_H
+
+#include <Ice/LoggerF.h>
+
+#include <IceStorm/FlusherF.h>
+#include <IceStorm/SubscriberF.h>
+#include <IceStorm/TraceLevelsF.h>
+
+namespace IceStorm
+{
+
+class FlusherThread;
+typedef JTCHandleT<FlusherThread> FlusherThreadHandle;
+
+class Flusher : public IceUtil::Shared
+{
+public:
+
+ Flusher(const Ice::CommunicatorPtr&, const TraceLevelsPtr&);
+ ~Flusher();
+
+ void add(const SubscriberPtr&);
+ void remove(const SubscriberPtr&);
+ void stopFlushing();
+
+private:
+
+ FlusherThreadHandle _thread;
+};
+
+} // End namespace IceStorm
+
+#endif
diff --git a/cpp/src/IceStorm/FlusherF.h b/cpp/src/IceStorm/FlusherF.h
new file mode 100644
index 00000000000..05968483fab
--- /dev/null
+++ b/cpp/src/IceStorm/FlusherF.h
@@ -0,0 +1,27 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef FLUSHER_F_H
+#define FLUSHER_F_H
+
+#include <IceUtil/Handle.h>
+
+namespace IceStorm
+{
+
+class Flusher;
+typedef IceUtil::Handle<Flusher> FlusherPtr;
+
+void incRef(Flusher*);
+void decRef(Flusher*);
+
+} // End namespace IceStorm
+
+#endif
diff --git a/cpp/src/IceStorm/IceStormI.h b/cpp/src/IceStorm/IceStormI.h
new file mode 100644
index 00000000000..b63a61a043b
--- /dev/null
+++ b/cpp/src/IceStorm/IceStormI.h
@@ -0,0 +1,11 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef TOPIC_MANAGER_I_H
diff --git a/cpp/src/IceStorm/Makefile b/cpp/src/IceStorm/Makefile
new file mode 100644
index 00000000000..38f571805b0
--- /dev/null
+++ b/cpp/src/IceStorm/Makefile
@@ -0,0 +1,67 @@
+#**********************************************************************
+#
+# Copyright (c) 2001
+# MutableRealms, Inc.
+# Huntsville, AL, USA
+#
+# All Rights Reserved
+#
+# **********************************************************************
+
+top_srcdir = ../..
+
+BASE = libIceStorm.so
+VERSIONED_BASE = $(BASE).$(VERSION)
+
+NAME = $(top_srcdir)/lib/$(BASE)
+VERSIONED_NAME = $(top_srcdir)/lib/$(VERSIONED_BASE)
+
+SERVER = $(top_srcdir)/bin/icestorm
+ADMIN = $(top_srcdir)/bin/icestormadmin
+TARGETS = $(NAME) $(VERSIONED_NAME) $(SERVER) $(ADMIN)
+
+OBJS = IceStorm.o
+
+SOBJS = TraceLevels.o Flusher.o Subscriber.o TopicI.o TopicManagerI.o Server.o
+
+AOBJS = Admin.o
+
+SRCS = $(OBJS:.o=.cpp) \
+ $(SOBJS:.o=.cpp) \
+ $(AOBJS:.o=.cpp)
+
+HDIR = $(includedir)/IceStorm
+SDIR = $(slicedir)/IceStorm
+SLICECMD = $(SLICE2CPP) --include-dir IceStorm --dll-export ICE_STORM_API -I$(slicedir)
+
+include $(top_srcdir)/config/Make.rules
+
+CPPFLAGS := -I.. $(CPPFLAGS)
+
+$(VERSIONED_NAME): $(OBJS)
+ rm -f $@
+ $(CXX) $(CXXFLAGS) $(LDFLAGS) -shared \
+ -o $@ $(OBJS)
+
+$(NAME): $(VERSIONED_NAME)
+ rm -f $@
+ ln -s $(VERSIONED_BASE) $@
+
+$(SERVER): $(SOBJS)
+ rm -f $@
+ $(CXX) $(CXXFLAGS) $(LDFLAGS) -o $@ $(SOBJS) -lIceStorm $(LIBS)
+
+$(ADMIN): $(AOBJS)
+ rm -f $@
+ $(CXX) $(CXXFLAGS) $(LDFLAGS) -o $@ $(AOBJS) -lIceStorm $(LIBS)
+
+$(HDIR)/IceStorm.h IceStorm.cpp: $(SDIR)/IceStorm.ice $(SLICE2CPP)
+ rm -f $(HDIR)/IceStorm.h IceStorm.cpp
+ $(SLICECMD) $(SDIR)/IceStorm.ice
+ mv IceStorm.h $(HDIR)
+
+
+clean::
+ rm -f $(HDIR)/Clock.h Clock.cpp
+
+include .depend
diff --git a/cpp/src/IceStorm/Server.cpp b/cpp/src/IceStorm/Server.cpp
new file mode 100644
index 00000000000..e43b48ced91
--- /dev/null
+++ b/cpp/src/IceStorm/Server.cpp
@@ -0,0 +1,93 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+#include <IceStorm/TopicManagerI.h>
+#include <IceStorm/TraceLevels.h>
+
+using namespace std;
+
+void
+usage(const char* n)
+{
+ cerr << "Usage: " << n << " [options]\n";
+ cerr <<
+ "Options:\n"
+ "-h, --help Show this message.\n"
+ "-v, --version Display the Ice version.\n"
+ ;
+}
+
+int
+run(int argc, char* argv[], const Ice::CommunicatorPtr& communicator)
+{
+ for (int i = 1; i < argc; ++i)
+ {
+ if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0)
+ {
+ usage(argv[0]);
+ return EXIT_SUCCESS;
+ }
+ else if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--version") == 0)
+ {
+ cout << ICE_STRING_VERSION << endl;
+ return EXIT_SUCCESS;
+ }
+ else
+ {
+ cerr << argv[0] << ": unknown option `" << argv[i] << "'" << endl;
+ usage(argv[0]);
+ return EXIT_FAILURE;
+ }
+ }
+
+ //PropertiesPtr properties = communicator->getProperties();
+
+ IceStorm::TraceLevelsPtr traceLevels = new IceStorm::TraceLevels(communicator->getProperties());
+ Ice::ObjectAdapterPtr adapter = communicator->createObjectAdapter("TopicManager");
+ Ice::ObjectPtr object = new IceStorm::TopicManagerI(communicator, adapter, traceLevels);
+ adapter->add(object, "TopicManager");
+ adapter->activate();
+ communicator->waitForShutdown();
+ return EXIT_SUCCESS;
+}
+
+int
+main(int argc, char* argv[])
+{
+ int status;
+ Ice::CommunicatorPtr communicator;
+
+ try
+ {
+ communicator = Ice::initialize(argc, argv);
+ status = run(argc, argv, communicator);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
+ }
+
+ if (communicator)
+ {
+ try
+ {
+ communicator->destroy();
+ }
+ catch(const Ice::Exception& ex)
+ {
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
+ }
+ }
+
+ return status;
+}
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
new file mode 100644
index 00000000000..590e3cf78e1
--- /dev/null
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -0,0 +1,167 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+
+#include <IceStorm/Subscriber.h>
+#include <IceStorm/TraceLevels.h>
+#include <IceStorm/Flusher.h>
+
+using namespace IceStorm;
+using namespace std;
+
+void IceStorm::incRef(Subscriber* p) { p->__incRef(); }
+void IceStorm::decRef(Subscriber* p) { p->__decRef(); }
+
+Subscriber::Subscriber(const Ice::LoggerPtr& logger, const TraceLevelsPtr& traceLevels, const FlusherPtr& flusher,
+ const QoS& qos, const Ice::ObjectPrx& obj)
+ : _logger(logger),
+ _traceLevels(traceLevels),
+ _invalid(false)
+{
+
+ //
+ // Determine the requested reliability characteristics
+ //
+ QoS::const_iterator i = qos.find("reliability");
+ string reliability;
+ if (i == qos.end())
+ {
+ reliability = "oneway";
+ }
+ else
+ {
+ reliability = i->second;
+ }
+
+ if (reliability == "batch")
+ {
+ _obj = obj->ice_batchOneway();
+ _flusher = flusher;
+ _flusher->add(this);
+ }
+ else // reliability == "oneway"
+ {
+ if (reliability != "oneway")
+ {
+ if (_traceLevels->subscriber > 0)
+ {
+ ostringstream s;
+ s << reliability <<" mode not understood.";
+ _logger->trace(_traceLevels->subscriberCat, s.str());
+ }
+ }
+ _obj = obj->ice_oneway();
+ }
+}
+
+Subscriber::~Subscriber()
+{
+}
+
+bool
+Subscriber::invalid() const
+{
+ JTCSyncT<JTCMutex> sync(_invalidMutex);
+ return _invalid;
+}
+
+void
+Subscriber::unsubscribe()
+{
+ JTCSyncT<JTCMutex> sync(_invalidMutex);
+ _invalid = true;
+
+ if (_traceLevels->subscriber > 0)
+ {
+ ostringstream s;
+ s << "Unsubscribe " << _obj->ice_getIdentity();
+ _logger->trace(_traceLevels->subscriberCat, s.str());
+ }
+
+ //
+ // If this subscriber has been registered with the flusher then
+ // remove it.
+ //
+ if (_flusher)
+ {
+ _flusher->remove(this);
+ }
+}
+
+void
+Subscriber::flush()
+{
+ try
+ {
+ _obj->ice_flush();
+ }
+ catch(const Ice::LocalException& e)
+ {
+ JTCSyncT<JTCMutex> sync(_invalidMutex);
+ //
+ // It's possible that the subscriber was unsubscribed, or
+ // marked invalid by another thread. Don't display a
+ // diagnostic in this case.
+ //
+ if (!_invalid)
+ {
+ if (_traceLevels->subscriber > 0)
+ {
+ ostringstream s;
+ s << _obj->ice_getIdentity() << ": flush failed: " << e;
+ _logger->trace(_traceLevels->subscriberCat, s.str());
+ }
+ _invalid = true;
+ }
+ }
+}
+
+void
+Subscriber::publish(const string& op, const std::vector< ::Ice::Byte>& blob)
+{
+ try
+ {
+ bool nonmutating = true;
+ _obj->ice_invokeIn(op, nonmutating, blob);
+ }
+ catch(const Ice::LocalException& e)
+ {
+ JTCSyncT<JTCMutex> sync(_invalidMutex);
+ //
+ // It's possible that the subscriber was unsubscribed, or
+ // marked invalid by another thread. Don't display a
+ // diagnostic in this case.
+ //
+ if (!_invalid)
+ {
+ if (_traceLevels->subscriber > 0)
+ {
+ ostringstream s;
+ s << _obj->ice_getIdentity() << ": publish failed: " << e;
+ _logger->trace(_traceLevels->subscriberCat, s.str());
+ }
+ _invalid = true;
+ }
+ }
+
+}
+
+std::string
+Subscriber::id() const
+{
+ return _obj->ice_getIdentity();
+}
+
+bool
+Subscriber::operator==(const Subscriber& rhs) const
+{
+ return _obj->ice_getIdentity() == rhs._obj->ice_getIdentity();
+}
diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h
new file mode 100644
index 00000000000..65a5acefcff
--- /dev/null
+++ b/cpp/src/IceStorm/Subscriber.h
@@ -0,0 +1,72 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef SUBSCRIBER_H
+#define SUBSCRIBER_H
+
+#include <Ice/LoggerF.h>
+
+#include <IceStorm/IceStorm.h> // For QoS (nasty!)
+#include <IceStorm/SubscriberF.h>
+#include <IceStorm/TraceLevelsF.h>
+#include <IceStorm/FlusherF.h>
+
+#include <list>
+
+namespace IceStorm
+{
+
+class Subscriber : public IceUtil::Shared
+{
+public:
+
+ Subscriber(const Ice::LoggerPtr&, const TraceLevelsPtr&, const FlusherPtr&, const QoS&, const Ice::ObjectPrx&);
+ ~Subscriber();
+
+ bool invalid() const;
+
+ void unsubscribe();
+
+ void flush();
+ void publish(const std::string&, const std::vector< ::Ice::Byte>&);
+
+ std::string id() const;
+
+ // TODO: should there be a global operator==?
+ bool operator==(const Subscriber&) const;
+
+private:
+
+ // Immutable
+ Ice::LoggerPtr _logger;
+ TraceLevelsPtr _traceLevels;
+
+ JTCMutex _invalidMutex;
+ bool _invalid;
+
+ //
+ // This id is the full id of the subscriber for a particular topic
+ // (that is <prefix>#<topicname>
+ //
+ // Immutable
+ std::string _id;
+
+ // Immutable
+ Ice::ObjectPrx _obj;
+
+ FlusherPtr _flusher;
+};
+
+
+typedef std::list<SubscriberPtr> SubscriberList;
+
+} // End namespace IceStorm
+
+#endif
diff --git a/cpp/src/IceStorm/SubscriberF.h b/cpp/src/IceStorm/SubscriberF.h
new file mode 100644
index 00000000000..e74b565cb44
--- /dev/null
+++ b/cpp/src/IceStorm/SubscriberF.h
@@ -0,0 +1,28 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef SUBSCRIBER_F_H
+#define SUBSCRIBER_F_H
+
+#include <IceUtil/Handle.h>
+
+namespace IceStorm
+{
+
+class Subscriber;
+
+typedef IceUtil::Handle<Subscriber> SubscriberPtr;
+
+void incRef(Subscriber*);
+void decRef(Subscriber*);
+
+} // End namespace IceStorm
+
+#endif
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
new file mode 100644
index 00000000000..0510e081311
--- /dev/null
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -0,0 +1,233 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+#include <Ice/Functional.h>
+
+#include <IceStorm/TopicI.h>
+#include <IceStorm/Flusher.h>
+#include <IceStorm/Subscriber.h>
+#include <IceStorm/TraceLevels.h>
+
+#include <algorithm>
+
+using namespace IceStorm;
+using namespace std;
+
+namespace IceStorm
+{
+
+class BlobjectI : public Ice::Blobject
+{
+public:
+
+ BlobjectI(const IceStorm::TopicSubscribersPtr& s) :
+ _subscribers(s)
+ {
+ }
+
+ ~BlobjectI()
+ {
+ }
+
+ virtual void ice_invokeIn(const std::string&, const std::string&, const std::string&,
+ const std::vector< ::Ice::Byte>&);
+
+private:
+
+ IceStorm::TopicSubscribersPtr _subscribers;
+};
+
+
+class TopicSubscribers : public IceUtil::Shared, public JTCMutex
+{
+public:
+
+ TopicSubscribers(const TraceLevelsPtr& traceLevels, const Ice::LoggerPtr& logger, const string& topic,
+ const FlusherPtr& flusher) :
+ _traceLevels(traceLevels),
+ _logger(logger),
+ _topic(topic),
+ _flusher(flusher)
+ {
+ }
+
+ ~TopicSubscribers()
+ {
+ }
+
+ void
+ add(const Ice::ObjectPrx& s, const string& idPrefix, const QoS& qos)
+ {
+ //
+ // Create the full topic subscriber id (<prefix>#<topic>)
+ //
+ string id = idPrefix;
+ id += '#';
+ id += _topic;
+
+ //
+ // Change the identity of the proxy
+ //
+ Ice::ObjectPrx obj = s->ice_newIdentity(id);
+
+ SubscriberPtr subscriber = new Subscriber(_logger, _traceLevels, _flusher, qos, obj);
+
+ JTCSyncT<JTCMutex> sync(*this);
+
+ //
+ // Add to the set of subscribers
+ //
+ _subscribers.push_back(subscriber);
+ }
+
+ void
+ remove(const string& idPrefix)
+ {
+ JTCSyncT<JTCMutex> sync(*this);
+
+ //
+ // Create the full topic subscriber id (<prefix>#<topic>)
+ //
+ string id = idPrefix;
+ id += '#';
+ id += _topic;
+
+ SubscriberList::iterator i;
+ for (i = _subscribers.begin() ; i != _subscribers.end(); ++i)
+ {
+ if ((*i)->id() == id)
+ {
+ //
+ // This marks the subscriber as invalid. It will be
+ // removed on the next event publish.
+ //
+ (*i)->unsubscribe();
+ break;
+ }
+ }
+
+ //
+ // If the subscriber was not found then display a diagnostic
+ //
+ if (i == _subscribers.end())
+ {
+ if (_traceLevels->topic > 0)
+ {
+ ostringstream s;
+ s << _topic << ": " << id << "not subscribed.";
+ _logger->trace(_traceLevels->topicCat, s.str());
+ }
+ }
+ }
+
+
+ void
+ publish(const string& op, const std::vector< ::Ice::Byte>& blob)
+ {
+ JTCSyncT<JTCMutex> sync(*this);
+
+ //
+ // Using standard algorithms I don't think there is a way to
+ // do this in one pass. For instance, I thought about using
+ // remove_if - but the predicate needs to be a pure function
+ // (see Meyers for details). If this is fixed then fix Flusher
+ // also.
+ //
+ _subscribers.remove_if(::Ice::constMemFun(&Subscriber::invalid));
+
+ for (SubscriberList::iterator i = _subscribers.begin(); i != _subscribers.end(); ++i)
+ (*i)->publish(op, blob);
+ //for_each(_subscribers.begin(), _subscribers.end(), Ice::memFun(&Subscriber::publish));
+ }
+
+private:
+
+ TraceLevelsPtr _traceLevels;
+ Ice::LoggerPtr _logger;
+
+ string _topic;
+ FlusherPtr _flusher;
+ SubscriberList _subscribers;
+};
+
+} // End namespace IceStorm
+
+void
+BlobjectI::ice_invokeIn(const string&, const string&, const string& op,
+ const std::vector< ::Ice::Byte>& blob)
+{
+ cerr << "ice_invokeIn" << endl;
+ _subscribers->publish(op, blob);
+}
+
+TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& traceLevels, const Ice::LoggerPtr& logger,
+ const string& name, const FlusherPtr& flusher) :
+ _adapter(adapter),
+ _traceLevels(traceLevels),
+ _logger(logger),
+ _name(name),
+ _flusher(flusher),
+ _destroyed(false)
+{
+ _subscribers = new TopicSubscribers(_traceLevels, _logger, _name, _flusher);
+
+ _publisher = new BlobjectI(_subscribers);
+
+ string id = name;
+ id += '#';
+ id += "publish";
+
+ _adapter->add(_publisher, id);
+ _obj = adapter->createProxy(id);
+}
+
+TopicI::~TopicI()
+{
+}
+
+string
+TopicI::getName()
+{
+ return _name;
+}
+
+Ice::ObjectPrx
+TopicI::getPublisher()
+{
+ return _obj;
+}
+
+void
+TopicI::destroy()
+{
+ JTCSyncT<JTCMutex> sync(_destroyedMutex);
+ _adapter->remove(_name);
+ _destroyed = true;
+}
+
+bool
+TopicI::destroyed() const
+{
+ JTCSyncT<JTCMutex> sync(_destroyedMutex);
+ return _destroyed;
+}
+
+void
+TopicI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos)
+{
+ _subscribers->add(tmpl, id, qos);
+}
+
+void
+TopicI::unsubscribe(const string& id)
+{
+ _subscribers->remove(id);
+}
diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h
new file mode 100644
index 00000000000..4d72b52a782
--- /dev/null
+++ b/cpp/src/IceStorm/TopicI.h
@@ -0,0 +1,66 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef TOPIC_I_H
+#define TOPIC_I_H
+
+#include <IceStorm/IceStorm.h>
+#include <IceStorm/TopicIF.h>
+#include <IceStorm/FlusherF.h>
+#include <IceStorm/TraceLevelsF.h>
+
+namespace IceStorm
+{
+
+class TopicSubscribers;
+typedef IceUtil::Handle<TopicSubscribers> TopicSubscribersPtr;
+
+class TopicI : public Topic
+{
+public:
+
+ TopicI(const Ice::ObjectAdapterPtr&, const TraceLevelsPtr&, const Ice::LoggerPtr&, const std::string&,
+ const FlusherPtr&);
+ ~TopicI();
+
+ virtual std::string getName();
+ virtual Ice::ObjectPrx getPublisher();
+ virtual void destroy();
+
+ // Internal methods
+ bool destroyed() const;
+ void subscribe(const Ice::ObjectPrx&, const std::string&, const QoS&);
+ void unsubscribe(const std::string&);
+
+private:
+
+ Ice::ObjectAdapterPtr _adapter;
+ TraceLevelsPtr _traceLevels;
+ Ice::LoggerPtr _logger;
+
+ // Immutable
+ std::string _name;
+
+ FlusherPtr _flusher;
+
+ JTCMutex _destroyedMutex;
+ bool _destroyed;
+
+ TopicSubscribersPtr _subscribers;
+
+ // Immutable
+ Ice::ObjectPtr _publisher;
+ Ice::ObjectPrx _obj;
+ //Ice::ServantLocatorPtr _locator;
+};
+
+} // End namespace IceStorm
+
+#endif
diff --git a/cpp/src/IceStorm/TopicIF.h b/cpp/src/IceStorm/TopicIF.h
new file mode 100644
index 00000000000..bed8183735a
--- /dev/null
+++ b/cpp/src/IceStorm/TopicIF.h
@@ -0,0 +1,24 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef TOPIC_I_F_H
+#define TOPIC_I_F_H
+
+#include <IceUtil/Shared.h>
+
+namespace IceStorm
+{
+
+class TopicI;
+typedef IceUtil::Handle<TopicI> TopicIPtr;
+
+} // End namespace IceStorm
+
+#endif
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
new file mode 100644
index 00000000000..f04198df675
--- /dev/null
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -0,0 +1,190 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+
+#include <IceStorm/TopicManagerI.h>
+#include <IceStorm/TopicI.h>
+#include <IceStorm/Flusher.h>
+#include <IceStorm/TraceLevels.h>
+
+#include <functional>
+
+using namespace IceStorm;
+using namespace std;
+
+TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& adapter,
+ const TraceLevelsPtr& traceLevels) :
+ _communicator(communicator),
+ _adapter(adapter),
+ _traceLevels(traceLevels)
+
+{
+ _flusher = new Flusher(_communicator, _traceLevels);
+}
+
+TopicManagerI::~TopicManagerI()
+{
+}
+
+TopicPrx
+TopicManagerI::create(const string& name)
+{
+ // TODO: reader/writer mutex
+ JTCSyncT<JTCMutex> sync(*this);
+
+ reap();
+
+ if (_topicIMap.find(name) != _topicIMap.end())
+ {
+ throw TopicExists();
+ }
+
+ if (_traceLevels->topicMgr > 0)
+ {
+ ostringstream s;
+ s << "Create " << name;
+ _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ }
+
+ //
+ // Create topic implementation
+ //
+ TopicIPtr topicI = new TopicI(_adapter, _traceLevels, _communicator->getLogger(), name, _flusher);
+ _adapter->add(topicI, name);
+ _topicIMap.insert(TopicIMap::value_type(name, topicI));
+
+ return TopicPrx::uncheckedCast(_adapter->createProxy(name));
+}
+
+TopicPrx
+TopicManagerI::retrieve(const string& name)
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ reap();
+
+ if (_topicIMap.find(name) != _topicIMap.end())
+ return TopicPrx::uncheckedCast(_adapter->createProxy(name));
+ throw NoSuchTopic();
+}
+
+//
+// The arguments cannot be const & (for some reason)
+//
+static TopicDict::value_type
+transformToTopicDict(TopicIMap::value_type p, Ice::ObjectAdapterPtr adapter)
+{
+ return TopicDict::value_type(p.first, TopicPrx::uncheckedCast(adapter->createProxy(p.first)));
+}
+
+TopicDict
+TopicManagerI::retrieveAll()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ reap();
+
+ TopicDict all;
+ transform(_topicIMap.begin(), _topicIMap.end(), inserter(all, all.begin()),
+ bind2nd(ptr_fun(transformToTopicDict), _adapter));
+
+ return all;
+}
+
+void
+TopicManagerI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos, const StringSeq& topics)
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (_traceLevels->topicMgr > 0)
+ {
+ ostringstream s;
+ s << "Subscribe: " << id;
+ if (_traceLevels->topicMgr > 1)
+ {
+ s << " QoS: ";
+ for (QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi)
+ {
+ if (qi != qos.begin())
+ {
+ s << ',';
+ }
+ s << '[' << qi->first << "," << qi->second << ']';
+ }
+ s << " Topics: ";
+ for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti)
+ {
+ if (ti != topics.begin())
+ {
+ s << ",";
+ }
+ s << *ti;
+ }
+ }
+ _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ }
+
+ for (StringSeq::const_iterator i = topics.begin() ; i != topics.end() ; ++i)
+ {
+ TopicIMap::iterator elem = _topicIMap.find(*i);
+ if (elem != _topicIMap.end())
+ {
+ elem->second->subscribe(tmpl, id, qos);
+ }
+ }
+}
+
+void
+TopicManagerI::unsubscribe(const string& id, const StringSeq& topics)
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (_traceLevels->topicMgr > 0)
+ {
+ ostringstream s;
+ s << "Unsubscribe: " << id;
+ if (_traceLevels->topicMgr > 1)
+ {
+ s << " Topics: ";
+ for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti)
+ {
+ if (ti != topics.begin())
+ {
+ s << ",";
+ }
+ s << *ti;
+ }
+ }
+ _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ }
+
+ for (StringSeq::const_iterator i = topics.begin() ; i != topics.end() ; ++i)
+ {
+ TopicIMap::iterator elem = _topicIMap.find(*i);
+ if (elem != _topicIMap.end())
+ {
+ elem->second->unsubscribe(id);
+ }
+ }
+}
+
+void
+TopicManagerI::shutdown()
+{
+ _flusher->stopFlushing();
+ _communicator->shutdown();
+}
+
+void
+TopicManagerI::reap()
+{
+}
+
diff --git a/cpp/src/IceStorm/TopicManagerI.h b/cpp/src/IceStorm/TopicManagerI.h
new file mode 100644
index 00000000000..b74e7b0bf3c
--- /dev/null
+++ b/cpp/src/IceStorm/TopicManagerI.h
@@ -0,0 +1,51 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef TOPIC_MANAGER_I_H
+#define TOPIC_MANAGER_I_H
+
+#include <IceStorm/IceStorm.h>
+#include <IceStorm/TopicIF.h>
+#include <IceStorm/FlusherF.h>
+#include <IceStorm/TraceLevelsF.h>
+
+namespace IceStorm
+{
+
+typedef std::map<std::string, TopicIPtr> TopicIMap;
+
+class TopicManagerI : public TopicManager, public JTCMutex
+{
+public:
+
+ TopicManagerI(const Ice::CommunicatorPtr&, const Ice::ObjectAdapterPtr&, const TraceLevelsPtr&);
+ ~TopicManagerI();
+
+ virtual TopicPrx create(const std::string&);
+ virtual TopicPrx retrieve(const std::string&);
+ virtual TopicDict retrieveAll();
+ virtual void subscribe(const Ice::ObjectPrx&, const std::string&, const QoS&, const StringSeq&);
+ virtual void unsubscribe(const std::string&, const StringSeq&);
+ virtual void shutdown();
+
+private:
+
+ void reap();
+
+ Ice::CommunicatorPtr _communicator;
+ Ice::ObjectAdapterPtr _adapter;
+ TraceLevelsPtr _traceLevels;
+ TopicIMap _topicIMap;
+ FlusherPtr _flusher;
+};
+
+} // End namespace IceStorm
+
+#endif
diff --git a/cpp/src/IceStorm/TraceLevels.cpp b/cpp/src/IceStorm/TraceLevels.cpp
new file mode 100644
index 00000000000..1dde386bc39
--- /dev/null
+++ b/cpp/src/IceStorm/TraceLevels.cpp
@@ -0,0 +1,60 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Properties.h>
+#include <IceStorm/TraceLevels.h>
+
+using namespace std;
+using namespace IceStorm;
+
+void IceStorm::incRef(TraceLevels* p) { p->__incRef(); }
+void IceStorm::decRef(TraceLevels* p) { p->__decRef(); }
+
+TraceLevels::TraceLevels(const Ice::PropertiesPtr& properties) :
+ topicMgr(0),
+ topicMgrCat("TopicManager"),
+ topic(0),
+ topicCat("Topic"),
+ flush(0),
+ flushCat("Flush"),
+ subscriber(0),
+ subscriberCat("Subscriber")
+{
+ string value;
+ const string keyBase = "IceStorm.Trace.";
+
+ value = properties->getProperty(keyBase + topicMgrCat);
+ if (!value.empty())
+ {
+ const_cast<int&>(topicMgr) = atoi(value.c_str());
+ }
+
+ value = properties->getProperty(keyBase + topicCat);
+ if (!value.empty())
+ {
+ const_cast<int&>(topic) = atoi(value.c_str());
+ }
+
+ value = properties->getProperty(keyBase + flushCat);
+ if (!value.empty())
+ {
+ const_cast<int&>(flush) = atoi(value.c_str());
+ }
+
+ value = properties->getProperty(keyBase + subscriberCat);
+ if (!value.empty())
+ {
+ const_cast<int&>(subscriber) = atoi(value.c_str());
+ }
+}
+
+TraceLevels::~TraceLevels()
+{
+}
diff --git a/cpp/src/IceStorm/TraceLevels.h b/cpp/src/IceStorm/TraceLevels.h
new file mode 100644
index 00000000000..5aa2176e164
--- /dev/null
+++ b/cpp/src/IceStorm/TraceLevels.h
@@ -0,0 +1,43 @@
+// **********************************************************************
+//
+// Copyright (c) 2002
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef ICE_STORM_TRACE_LEVELS_H
+#define ICE_STORM_TRACE_LEVELS_H
+
+#include <IceUtil/Shared.h>
+#include <Ice/PropertiesF.h>
+#include <IceStorm/TraceLevelsF.h>
+
+namespace IceStorm
+{
+
+class TraceLevels : public ::IceUtil::Shared
+{
+public:
+
+ TraceLevels(const ::Ice::PropertiesPtr&);
+ virtual ~TraceLevels();
+
+ const int topicMgr;
+ const char* topicMgrCat;
+
+ const int topic;
+ const char* topicCat;
+
+ const int flush;
+ const char* flushCat;
+
+ const int subscriber;
+ const char* subscriberCat;
+};
+
+}
+
+#endif
diff --git a/cpp/src/IceStorm/TraceLevelsF.h b/cpp/src/IceStorm/TraceLevelsF.h
new file mode 100644
index 00000000000..cdb493dbf4f
--- /dev/null
+++ b/cpp/src/IceStorm/TraceLevelsF.h
@@ -0,0 +1,26 @@
+// **********************************************************************
+//
+// Copyright (c) 2002
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#ifndef ICE_STORM_TRACE_LEVELS_F_H
+#define ICE_STORM_TRACE_LEVELS_F_H
+
+#include <Ice/Handle.h>
+
+namespace IceStorm
+{
+
+class TraceLevels;
+void incRef(TraceLevels*);
+void decRef(TraceLevels*);
+typedef IceInternal::Handle<TraceLevels> TraceLevelsPtr;
+
+} // End namespace IceStorm
+
+#endif
diff --git a/cpp/src/IceStorm/config b/cpp/src/IceStorm/config
new file mode 100644
index 00000000000..0c45dde479c
--- /dev/null
+++ b/cpp/src/IceStorm/config
@@ -0,0 +1,16 @@
+Ice.Adapter.TopicManager.Endpoints=tcp -p 10000
+
+Ice.Trace.Network=3
+Ice.Trace.Protocol=1
+#Ice.Trace.Security=2
+
+IceStorm.Trace.TopicManager=2
+
+IceStorm.Trace.Topic=1
+
+IceStorm.Trace.Flush=1
+
+IceStorm.Trace.Subscriber=1
+
+IceStorm.Flush.Timeout = 2000
+