summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/KeepAliveThread.cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2006-10-26 03:50:00 +0000
committerMatthew Newhook <matthew@zeroc.com>2006-10-26 03:50:00 +0000
commit12203029a37a8145599d368359710469b29f0318 (patch)
treed95dcc0d7c312f60e6c686a244e00a1436236ba8 /cpp/src/IceStorm/KeepAliveThread.cpp
parentfix for missing ICE_HOME (diff)
downloadice-12203029a37a8145599d368359710469b29f0318.tar.bz2
ice-12203029a37a8145599d368359710469b29f0318.tar.xz
ice-12203029a37a8145599d368359710469b29f0318.zip
Renamed IceStorm::subscribe to IceStorm::Topic::subscribeAndGetPublisher.
Integrated the upstream ping stuff. Updated CHANGES file.
Diffstat (limited to 'cpp/src/IceStorm/KeepAliveThread.cpp')
-rw-r--r--cpp/src/IceStorm/KeepAliveThread.cpp160
1 files changed, 160 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/KeepAliveThread.cpp b/cpp/src/IceStorm/KeepAliveThread.cpp
new file mode 100644
index 00000000000..600b07df0db
--- /dev/null
+++ b/cpp/src/IceStorm/KeepAliveThread.cpp
@@ -0,0 +1,160 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceStorm/KeepAliveThread.h>
+#include <IceStorm/TraceLevels.h>
+#include <Ice/LocalException.h>
+#include <Ice/LoggerUtil.h>
+#include <Ice/Communicator.h>
+
+using namespace std;
+using namespace IceStorm;
+
+KeepAliveThread::KeepAliveThread(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels,
+ const IceUtil::Time& timeout) :
+ _communicator(communicator),
+ _traceLevels(traceLevels),
+ _timeout(timeout),
+ _destroy(false)
+{
+}
+
+KeepAliveThread::~KeepAliveThread()
+{
+}
+
+void
+KeepAliveThread::add(const TopicUpstreamLinkPrx& upstream)
+{
+ Lock sync(*this);
+ if(_traceLevels->keepAlive > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->keepAliveCat);
+ out << "add " << _communicator->identityToString(upstream->ice_getIdentity());
+ }
+ _upstream.push_back(upstream);
+}
+
+void
+KeepAliveThread::remove(const TopicUpstreamLinkPrx& upstream)
+{
+ Lock sync(*this);
+ if(_traceLevels->keepAlive > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->keepAliveCat);
+ out << "remove " << _communicator->identityToString(upstream->ice_getIdentity());
+ }
+ list<TopicUpstreamLinkPrx>::iterator p = find(_upstream.begin(), _upstream.end(), upstream);
+ if(p != _upstream.end())
+ {
+ _upstream.erase(p);
+ }
+}
+
+void
+KeepAliveThread::destroy()
+{
+ Lock sync(*this);
+ _destroy = true;
+ notify();
+}
+
+bool
+KeepAliveThread::filter(IceStorm::TopicUpstreamLinkPrxSeq& upstream)
+{
+ Lock sync(*this);
+ bool changed = false;
+ IceStorm::TopicUpstreamLinkPrxSeq::iterator p = upstream.begin();
+ while(p != upstream.end())
+ {
+ list<TopicUpstreamLinkPrx>::iterator q = find(_failed.begin(), _failed.end(), *p);
+ if(q != _failed.end())
+ {
+ if(_traceLevels->keepAlive > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->keepAliveCat);
+ out << "filter " << _communicator->identityToString((*p)->ice_getIdentity());
+ }
+ _failed.erase(q);
+ p = upstream.erase(p);
+ changed = true;
+ }
+ else
+ {
+ ++p;
+ }
+ }
+ return changed;
+}
+
+void
+KeepAliveThread::run()
+{
+ list<TopicUpstreamLinkPrx> upstream;
+ {
+ Lock sync(*this);
+ upstream = _upstream;
+ }
+
+ //
+ // First all upstream links are notified. Then we wait. It is done
+ // in this order so that any upstream links are notified
+ // immediately upon startup of the service.
+ //
+ for(;;)
+ {
+ for(list<TopicUpstreamLinkPrx>::const_iterator p = upstream.begin(); p != upstream.end(); ++p)
+ {
+ try
+ {
+ (*p)->keepAlive();
+ }
+ catch(const Ice::ObjectNotExistException& e)
+ {
+ failed(*p);
+ }
+ catch(const Ice::Exception& e)
+ {
+ if(_traceLevels->keepAlive > 1)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->keepAliveCat);
+ out << "unreachable " << _communicator->identityToString((*p)->ice_getIdentity());
+ }
+ // Ignore
+ }
+ }
+
+ {
+ Lock sync(*this);
+ timedWait(_timeout);
+ if(_destroy)
+ {
+ return;
+ }
+ upstream = _upstream;
+ }
+ }
+}
+
+void
+KeepAliveThread::failed(const TopicUpstreamLinkPrx& upstream)
+{
+ Lock sync(*this);
+ if(_traceLevels->keepAlive > 1)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->keepAliveCat);
+ out << "failed " << _communicator->identityToString(upstream->ice_getIdentity());
+ }
+ list<TopicUpstreamLinkPrx>::iterator p = find(_upstream.begin(), _upstream.end(), upstream);
+ if(p != _upstream.end())
+ {
+ _upstream.erase(p);
+ }
+ _failed.push_back(upstream);
+}