summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaSessionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp265
1 files changed, 265 insertions, 0 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
new file mode 100644
index 00000000000..ec55bd2c6b6
--- /dev/null
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -0,0 +1,265 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+#include <IceGrid/ReplicaSessionManager.h>
+#include <IceGrid/TraceLevels.h>
+#include <IceGrid/Database.h>
+
+using namespace std;
+using namespace IceGrid;
+
+class MasterRegistryObserverI : public RegistryObserver
+{
+public:
+
+ MasterRegistryObserverI(const DatabasePtr& database) : _database(database)
+ {
+ }
+
+ virtual void
+ init(int serial,
+ const ApplicationDescriptorSeq& applications,
+ const AdapterInfoSeq& adapters,
+ const ObjectInfoSeq& objects,
+ const Ice::Current&)
+ {
+ _database->initReplica(serial, applications, adapters, objects);
+ }
+
+ virtual void
+ applicationAdded(int serial, const ApplicationDescriptor& application, const Ice::Current&)
+ {
+ _database->addApplicationDescriptor(0, application, serial);
+ }
+
+ virtual void
+ applicationRemoved(int serial, const std::string& name, const Ice::Current&)
+ {
+ _database->removeApplicationDescriptor(0, name, serial);
+ }
+
+ virtual void
+ applicationUpdated(int serial, const ApplicationUpdateDescriptor& update, const Ice::Current&)
+ {
+ _database->updateApplicationDescriptor(0, update, serial);
+ }
+
+ virtual void
+ adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&)
+ {
+ _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, serial);
+ }
+
+ virtual void
+ adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&)
+ {
+ _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, serial);
+ }
+
+ virtual void
+ adapterRemoved(int serial, const std::string& id, const Ice::Current&)
+ {
+ _database->setAdapterDirectProxy(id, "", 0, serial);
+ }
+
+ virtual void
+ objectAdded(int serial, const ObjectInfo& info, const Ice::Current&)
+ {
+ _database->addObject(info, false, serial);
+ }
+
+ virtual void
+ objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&)
+ {
+ _database->updateObject(info.proxy, serial);
+ }
+
+ virtual void
+ objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&)
+ {
+ _database->removeObject(id, serial);
+ }
+
+private:
+
+ const DatabasePtr _database;
+};
+
+ReplicaSessionKeepAliveThread::ReplicaSessionKeepAliveThread(const std::string& name,
+ const InternalRegistryPrx& master,
+ const InternalRegistryPrx& replica,
+ const ReplicaInfo& info,
+ const DatabasePtr& database) :
+ _name(name),
+ _master(master),
+ _replica(replica),
+ _info(info),
+ _database(database),
+ _timeout(IceUtil::Time::seconds(5)),
+ _shutdown(false)
+{
+}
+
+void
+ReplicaSessionKeepAliveThread::run()
+{
+ //
+ // Keep alive the session.
+ //
+ ReplicaSessionPrx session;
+ while(true)
+ {
+ keepAlive(session);
+
+ {
+ Lock sync(*this);
+
+ session = _session;
+
+ if(!_shutdown)
+ {
+ timedWait(_timeout);
+ }
+
+ if(_shutdown)
+ {
+ break;
+ }
+ }
+ }
+
+ //
+ // Destroy the session.
+ //
+ if(session)
+ {
+ try
+ {
+ session->destroy();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ //
+ // TODO: XXX: TRACE?
+ //
+// ostringstream os;
+// os << "couldn't contact the IceGrid registry to destroy the node session:\n" << ex;
+// _database->getTraceLevels()->logger->warning(os.str());
+ }
+ }
+}
+
+void
+ReplicaSessionKeepAliveThread::waitForCreate()
+{
+ Lock sync(*this);
+ while(!_session)
+ {
+ wait();
+ }
+}
+
+void
+ReplicaSessionKeepAliveThread::terminate()
+{
+ Lock sync(*this);
+ _shutdown = true;
+ notifyAll();
+}
+
+void
+ReplicaSessionKeepAliveThread::keepAlive(const ReplicaSessionPrx& session)
+{
+ if(session)
+ {
+ try
+ {
+ session->keepAlive();
+ return; // We're done!
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
+
+ try
+ {
+ ReplicaSessionPrx newSession = _master->registerReplica(_name, _replica, _info);
+ int timeout = newSession->getTimeout();
+ {
+ Lock sync(*this);
+ if(timeout > 0)
+ {
+ _timeout = IceUtil::Time::seconds(timeout);
+ }
+ _session = newSession;
+ notifyAll();
+ }
+ }
+ catch(const ReplicaActiveException&)
+ {
+ _database->getTraceLevels()->logger->error("a replica with the same name is already registered and active");
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ //
+ // TODO: FIX THIS SHOULD BE A TRACE
+ //
+ ostringstream os;
+ os << "couldn't contact the IceGrid registry:\n" << ex;
+ _database->getTraceLevels()->logger->warning(os.str());
+ }
+}
+
+ReplicaSessionManager::ReplicaSessionManager()
+{
+}
+
+void
+ReplicaSessionManager::create(const string& name,
+ const DatabasePtr& database,
+ const InternalRegistryPrx& replica,
+ const Ice::ObjectAdapterPtr& clientAdapter,
+ const Ice::ObjectAdapterPtr& serverAdapter)
+{
+ Ice::CommunicatorPtr communicator = database->getCommunicator();
+ Ice::PropertiesPtr properties = communicator->getProperties();
+
+ Ice::LocatorPrx locator = communicator->getDefaultLocator();
+ assert(locator);
+ string instanceName = locator->ice_getIdentity().category;
+
+ InternalRegistryPrx master =
+ InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/InternalRegistry"));
+
+ Ice::ObjectPrx observer = database->getInternalAdapter()->addWithUUID(new MasterRegistryObserverI(database));
+
+ ReplicaInfo info;
+ info.clientProxy = clientAdapter->createDirectProxy(communicator->stringToIdentity("dummy"));
+ info.serverProxy = serverAdapter->createDirectProxy(communicator->stringToIdentity("dummy"));
+ info.observer = RegistryObserverPrx::uncheckedCast(observer);
+
+ _session = new ReplicaSessionKeepAliveThread(name, master, replica, info, database);
+ _session->start();
+}
+
+void
+ReplicaSessionManager::destroy()
+{
+ if(_session)
+ {
+ _session->terminate();
+ _session->getThreadControl().join();
+ }
+}
+
+
+
+