summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp233
1 files changed, 233 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
new file mode 100644
index 00000000000..0510e081311
--- /dev/null
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -0,0 +1,233 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+#include <Ice/Functional.h>
+
+#include <IceStorm/TopicI.h>
+#include <IceStorm/Flusher.h>
+#include <IceStorm/Subscriber.h>
+#include <IceStorm/TraceLevels.h>
+
+#include <algorithm>
+
+using namespace IceStorm;
+using namespace std;
+
+namespace IceStorm
+{
+
+class BlobjectI : public Ice::Blobject
+{
+public:
+
+ BlobjectI(const IceStorm::TopicSubscribersPtr& s) :
+ _subscribers(s)
+ {
+ }
+
+ ~BlobjectI()
+ {
+ }
+
+ virtual void ice_invokeIn(const std::string&, const std::string&, const std::string&,
+ const std::vector< ::Ice::Byte>&);
+
+private:
+
+ IceStorm::TopicSubscribersPtr _subscribers;
+};
+
+
+class TopicSubscribers : public IceUtil::Shared, public JTCMutex
+{
+public:
+
+ TopicSubscribers(const TraceLevelsPtr& traceLevels, const Ice::LoggerPtr& logger, const string& topic,
+ const FlusherPtr& flusher) :
+ _traceLevels(traceLevels),
+ _logger(logger),
+ _topic(topic),
+ _flusher(flusher)
+ {
+ }
+
+ ~TopicSubscribers()
+ {
+ }
+
+ void
+ add(const Ice::ObjectPrx& s, const string& idPrefix, const QoS& qos)
+ {
+ //
+ // Create the full topic subscriber id (<prefix>#<topic>)
+ //
+ string id = idPrefix;
+ id += '#';
+ id += _topic;
+
+ //
+ // Change the identity of the proxy
+ //
+ Ice::ObjectPrx obj = s->ice_newIdentity(id);
+
+ SubscriberPtr subscriber = new Subscriber(_logger, _traceLevels, _flusher, qos, obj);
+
+ JTCSyncT<JTCMutex> sync(*this);
+
+ //
+ // Add to the set of subscribers
+ //
+ _subscribers.push_back(subscriber);
+ }
+
+ void
+ remove(const string& idPrefix)
+ {
+ JTCSyncT<JTCMutex> sync(*this);
+
+ //
+ // Create the full topic subscriber id (<prefix>#<topic>)
+ //
+ string id = idPrefix;
+ id += '#';
+ id += _topic;
+
+ SubscriberList::iterator i;
+ for (i = _subscribers.begin() ; i != _subscribers.end(); ++i)
+ {
+ if ((*i)->id() == id)
+ {
+ //
+ // This marks the subscriber as invalid. It will be
+ // removed on the next event publish.
+ //
+ (*i)->unsubscribe();
+ break;
+ }
+ }
+
+ //
+ // If the subscriber was not found then display a diagnostic
+ //
+ if (i == _subscribers.end())
+ {
+ if (_traceLevels->topic > 0)
+ {
+ ostringstream s;
+ s << _topic << ": " << id << "not subscribed.";
+ _logger->trace(_traceLevels->topicCat, s.str());
+ }
+ }
+ }
+
+
+ void
+ publish(const string& op, const std::vector< ::Ice::Byte>& blob)
+ {
+ JTCSyncT<JTCMutex> sync(*this);
+
+ //
+ // Using standard algorithms I don't think there is a way to
+ // do this in one pass. For instance, I thought about using
+ // remove_if - but the predicate needs to be a pure function
+ // (see Meyers for details). If this is fixed then fix Flusher
+ // also.
+ //
+ _subscribers.remove_if(::Ice::constMemFun(&Subscriber::invalid));
+
+ for (SubscriberList::iterator i = _subscribers.begin(); i != _subscribers.end(); ++i)
+ (*i)->publish(op, blob);
+ //for_each(_subscribers.begin(), _subscribers.end(), Ice::memFun(&Subscriber::publish));
+ }
+
+private:
+
+ TraceLevelsPtr _traceLevels;
+ Ice::LoggerPtr _logger;
+
+ string _topic;
+ FlusherPtr _flusher;
+ SubscriberList _subscribers;
+};
+
+} // End namespace IceStorm
+
+void
+BlobjectI::ice_invokeIn(const string&, const string&, const string& op,
+ const std::vector< ::Ice::Byte>& blob)
+{
+ cerr << "ice_invokeIn" << endl;
+ _subscribers->publish(op, blob);
+}
+
+TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& traceLevels, const Ice::LoggerPtr& logger,
+ const string& name, const FlusherPtr& flusher) :
+ _adapter(adapter),
+ _traceLevels(traceLevels),
+ _logger(logger),
+ _name(name),
+ _flusher(flusher),
+ _destroyed(false)
+{
+ _subscribers = new TopicSubscribers(_traceLevels, _logger, _name, _flusher);
+
+ _publisher = new BlobjectI(_subscribers);
+
+ string id = name;
+ id += '#';
+ id += "publish";
+
+ _adapter->add(_publisher, id);
+ _obj = adapter->createProxy(id);
+}
+
+TopicI::~TopicI()
+{
+}
+
+string
+TopicI::getName()
+{
+ return _name;
+}
+
+Ice::ObjectPrx
+TopicI::getPublisher()
+{
+ return _obj;
+}
+
+void
+TopicI::destroy()
+{
+ JTCSyncT<JTCMutex> sync(_destroyedMutex);
+ _adapter->remove(_name);
+ _destroyed = true;
+}
+
+bool
+TopicI::destroyed() const
+{
+ JTCSyncT<JTCMutex> sync(_destroyedMutex);
+ return _destroyed;
+}
+
+void
+TopicI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos)
+{
+ _subscribers->add(tmpl, id, qos);
+}
+
+void
+TopicI::unsubscribe(const string& id)
+{
+ _subscribers->remove(id);
+}