summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp167
1 files changed, 167 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
new file mode 100644
index 00000000000..590e3cf78e1
--- /dev/null
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -0,0 +1,167 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+
+#include <IceStorm/Subscriber.h>
+#include <IceStorm/TraceLevels.h>
+#include <IceStorm/Flusher.h>
+
+using namespace IceStorm;
+using namespace std;
+
+void IceStorm::incRef(Subscriber* p) { p->__incRef(); }
+void IceStorm::decRef(Subscriber* p) { p->__decRef(); }
+
+Subscriber::Subscriber(const Ice::LoggerPtr& logger, const TraceLevelsPtr& traceLevels, const FlusherPtr& flusher,
+ const QoS& qos, const Ice::ObjectPrx& obj)
+ : _logger(logger),
+ _traceLevels(traceLevels),
+ _invalid(false)
+{
+
+ //
+ // Determine the requested reliability characteristics
+ //
+ QoS::const_iterator i = qos.find("reliability");
+ string reliability;
+ if (i == qos.end())
+ {
+ reliability = "oneway";
+ }
+ else
+ {
+ reliability = i->second;
+ }
+
+ if (reliability == "batch")
+ {
+ _obj = obj->ice_batchOneway();
+ _flusher = flusher;
+ _flusher->add(this);
+ }
+ else // reliability == "oneway"
+ {
+ if (reliability != "oneway")
+ {
+ if (_traceLevels->subscriber > 0)
+ {
+ ostringstream s;
+ s << reliability <<" mode not understood.";
+ _logger->trace(_traceLevels->subscriberCat, s.str());
+ }
+ }
+ _obj = obj->ice_oneway();
+ }
+}
+
+Subscriber::~Subscriber()
+{
+}
+
+bool
+Subscriber::invalid() const
+{
+ JTCSyncT<JTCMutex> sync(_invalidMutex);
+ return _invalid;
+}
+
+void
+Subscriber::unsubscribe()
+{
+ JTCSyncT<JTCMutex> sync(_invalidMutex);
+ _invalid = true;
+
+ if (_traceLevels->subscriber > 0)
+ {
+ ostringstream s;
+ s << "Unsubscribe " << _obj->ice_getIdentity();
+ _logger->trace(_traceLevels->subscriberCat, s.str());
+ }
+
+ //
+ // If this subscriber has been registered with the flusher then
+ // remove it.
+ //
+ if (_flusher)
+ {
+ _flusher->remove(this);
+ }
+}
+
+void
+Subscriber::flush()
+{
+ try
+ {
+ _obj->ice_flush();
+ }
+ catch(const Ice::LocalException& e)
+ {
+ JTCSyncT<JTCMutex> sync(_invalidMutex);
+ //
+ // It's possible that the subscriber was unsubscribed, or
+ // marked invalid by another thread. Don't display a
+ // diagnostic in this case.
+ //
+ if (!_invalid)
+ {
+ if (_traceLevels->subscriber > 0)
+ {
+ ostringstream s;
+ s << _obj->ice_getIdentity() << ": flush failed: " << e;
+ _logger->trace(_traceLevels->subscriberCat, s.str());
+ }
+ _invalid = true;
+ }
+ }
+}
+
+void
+Subscriber::publish(const string& op, const std::vector< ::Ice::Byte>& blob)
+{
+ try
+ {
+ bool nonmutating = true;
+ _obj->ice_invokeIn(op, nonmutating, blob);
+ }
+ catch(const Ice::LocalException& e)
+ {
+ JTCSyncT<JTCMutex> sync(_invalidMutex);
+ //
+ // It's possible that the subscriber was unsubscribed, or
+ // marked invalid by another thread. Don't display a
+ // diagnostic in this case.
+ //
+ if (!_invalid)
+ {
+ if (_traceLevels->subscriber > 0)
+ {
+ ostringstream s;
+ s << _obj->ice_getIdentity() << ": publish failed: " << e;
+ _logger->trace(_traceLevels->subscriberCat, s.str());
+ }
+ _invalid = true;
+ }
+ }
+
+}
+
+std::string
+Subscriber::id() const
+{
+ return _obj->ice_getIdentity();
+}
+
+bool
+Subscriber::operator==(const Subscriber& rhs) const
+{
+ return _obj->ice_getIdentity() == rhs._obj->ice_getIdentity();
+}