summaryrefslogtreecommitdiff
path: root/cpp/src/IcePack/ActivatorI.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2002-08-22 22:52:31 +0000
committerBenoit Foucher <benoit@zeroc.com>2002-08-22 22:52:31 +0000
commit8ee8c08018e63d5b5b6c505490e27dcb3ef766db (patch)
tree4f888dacbe96952a910eb969a5428aba67030e7b /cpp/src/IcePack/ActivatorI.cpp
parentbug fix for identity reuse in add() (diff)
downloadice-8ee8c08018e63d5b5b6c505490e27dcb3ef766db.tar.bz2
ice-8ee8c08018e63d5b5b6c505490e27dcb3ef766db.tar.xz
ice-8ee8c08018e63d5b5b6c505490e27dcb3ef766db.zip
Added support for IcePack node and re-factored many things in IcePack.
Diffstat (limited to 'cpp/src/IcePack/ActivatorI.cpp')
-rw-r--r--cpp/src/IcePack/ActivatorI.cpp357
1 files changed, 213 insertions, 144 deletions
diff --git a/cpp/src/IcePack/ActivatorI.cpp b/cpp/src/IcePack/ActivatorI.cpp
index 92654ff8eda..bb4c6516d84 100644
--- a/cpp/src/IcePack/ActivatorI.cpp
+++ b/cpp/src/IcePack/ActivatorI.cpp
@@ -15,7 +15,7 @@
#include <Ice/Ice.h>
#include <IcePack/ActivatorI.h>
#include <IcePack/Admin.h>
-#include <IcePack/ServerManager.h>
+#include <IcePack/Internal.h>
#include <IcePack/TraceLevels.h>
#include <sys/types.h>
@@ -26,10 +26,33 @@ using namespace std;
using namespace Ice;
using namespace IcePack;
-IcePack::ActivatorI::ActivatorI(const CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) :
- _communicator(communicator),
+namespace IcePack
+{
+
+class TerminationListenerThread : public IceUtil::Thread
+{
+public:
+
+ TerminationListenerThread(ActivatorI& activator) :
+ _activator(activator)
+ {
+ }
+
+ virtual
+ void run()
+ {
+ _activator.runTerminationListener();
+ }
+
+private:
+
+ ActivatorI& _activator;
+};
+
+}
+
+IcePack::ActivatorI::ActivatorI(const TraceLevelsPtr& traceLevels) :
_traceLevels(traceLevels),
- _destroy(false),
_deactivating(false)
{
int fds[2];
@@ -48,110 +71,23 @@ IcePack::ActivatorI::ActivatorI(const CommunicatorPtr& communicator, const Trace
IcePack::ActivatorI::~ActivatorI()
{
- assert(_destroy);
- assert(_processes.empty());
+ assert(!_thread);
close(_fdIntrRead);
close(_fdIntrWrite);
}
-void
-IcePack::ActivatorI::run()
+bool
+IcePack::ActivatorI::activate(const ServerPtr& server)
{
- try
- {
- terminationListener();
- }
- catch(const Exception& ex)
- {
- Error out(_communicator->getLogger());
- out << "exception in process termination listener:\n" << ex;
- }
- catch(...)
- {
- Error out(_communicator->getLogger());
- out << "unknown exception in process termination listener";
- }
-}
+ IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this);
-void
-IcePack::ActivatorI::destroy()
-{
- {
- IceUtil::Mutex::Lock sync(*this);
-
- if(_destroy || _deactivating) // Don't destroy or deactivate twice.
- {
- return;
- }
-
- //
- // This ensure that no new processes will be activated.
- //
- _deactivating = true;
- }
-
-
- //
- // Stop all activated processes.
- //
- while(true)
- {
- ServerPrx server;
- {
- IceUtil::Mutex::Lock sync(*this);
- if(!_processes.empty())
- {
- server = _processes.begin()->server;
- }
- else
- {
- //
- // No more process to deactivate.
- //
- break;
- }
- }
-
- //
- // Stop the server. The activator thread should detect the
- // process deactivation and remove it from the activator
- // active processes. This garantees that this loop will end at
- // one point.
- //
- server->stop();
- }
-
- //
- // Set the state as destroyed, this will cause the
- // activator thread to exit.
- //
- {
- IceUtil::Mutex::Lock sync(*this);
-
- _destroy = true;
- setInterrupt();
- }
-
- //
- // Join the activator thread.
- //
- getThreadControl().join();
-}
-
-Ice::Int
-IcePack::ActivatorI::activate(const ServerPrx& server)
-{
- IceUtil::Mutex::Lock sync(*this);
-
- if(_destroy || _deactivating)
+ if(_deactivating)
{
return false;
}
- ServerDescription desc = server->getServerDescription();
-
- string path = desc.path;
+ string path = server->description.path;
if(path.empty())
{
return false;
@@ -173,7 +109,7 @@ IcePack::ActivatorI::activate(const ServerPrx& server)
//
// Normalize the path to the working directory.
//
- string pwd = desc.pwd;
+ string pwd = server->description.pwd;
if(!pwd.empty())
{
string::size_type pos;
@@ -190,12 +126,12 @@ IcePack::ActivatorI::activate(const ServerPrx& server)
//
// Compute arguments.
//
- int argc = desc.args.size() + 2;
+ int argc = server->description.args.size() + 2;
char** argv = static_cast<char**>(malloc(argc * sizeof(char*)));
argv[0] = strdup(path.c_str());
unsigned int i = 0;
vector<string>::const_iterator q;
- for(q = desc.args.begin(); q != desc.args.end(); ++q, ++i)
+ for(q = server->description.args.begin(); q != server->description.args.end(); ++q, ++i)
{
argv[i + 1] = strdup(q->c_str());
}
@@ -204,7 +140,7 @@ IcePack::ActivatorI::activate(const ServerPrx& server)
if(_traceLevels->activator > 1)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat);
- out << "activating server `" << desc.name << "'";
+ out << "activating server `" << server->description.name << "'";
if(_traceLevels->activator > 2)
{
out << "\n";
@@ -336,20 +272,20 @@ IcePack::ActivatorI::activate(const ServerPrx& server)
flags |= O_NONBLOCK;
fcntl(process.fd, F_SETFL, flags);
- setInterrupt();
+ setInterrupt(0);
if(_traceLevels->activator > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat);
- out << "activated server `" << desc.name << "'(pid = " << pid << ")";
+ out << "activated server `" << server->description.name << "'(pid = " << pid << ")";
}
}
- return pid;
+ return true;
}
void
-IcePack::ActivatorI::deactivate(const ServerPrx& server)
+IcePack::ActivatorI::deactivate(const ServerPtr& server)
{
pid_t pid = static_cast<pid_t>(server->getPid());
@@ -365,15 +301,13 @@ IcePack::ActivatorI::deactivate(const ServerPrx& server)
if(_traceLevels->activator > 1)
{
- ServerDescription desc = server->getServerDescription();
-
Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat);
- out << "sent SIGTERM to server `" << desc.name << "' (pid = " << pid << ")";
+ out << "sent SIGTERM to server `" << server->description.name << "' (pid = " << pid << ")";
}
}
void
-IcePack::ActivatorI::kill(const ServerPrx& server)
+IcePack::ActivatorI::kill(const ServerPtr& server)
{
pid_t pid = static_cast<pid_t>(server->getPid());
@@ -389,10 +323,124 @@ IcePack::ActivatorI::kill(const ServerPrx& server)
if(_traceLevels->activator > 1)
{
- ServerDescription desc = server->getServerDescription();
-
Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat);
- out << "sent SIGKILL to server `" << desc.name << "' (pid = " << pid << ")";
+ out << "sent SIGKILL to server `" << server->description.name << "' (pid = " << pid << ")";
+ }
+}
+
+Ice::Int
+IcePack::ActivatorI::getServerPid(const ServerPtr& server)
+{
+ IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this);
+
+ for(vector<Process>::iterator p = _processes.begin(); p != _processes.end(); ++p)
+ {
+ if(p->server == server)
+ {
+ return static_cast<Ice::Int>(p->pid);
+ }
+ }
+
+ return 0;
+}
+
+void
+IcePack::ActivatorI::start()
+{
+ //
+ // Create and start the termination listener thread.
+ //
+ _thread = new TerminationListenerThread(*this);
+ _thread->start();
+}
+
+void
+IcePack::ActivatorI::waitForShutdown()
+{
+ IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this);
+ while(!_deactivating)
+ {
+ wait();
+ }
+}
+
+void
+IcePack::ActivatorI::shutdown()
+{
+ setInterrupt(1);
+}
+
+void
+IcePack::ActivatorI::destroy()
+{
+ {
+ IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this);
+ assert(_deactivating);
+ }
+
+ //
+ // Deactivate all the processes.
+ //
+ deactivateAll();
+
+ //
+ // Join the termination listener thread. This thread terminates
+ // when there's no more processes and when _deactivating is set to
+ // true.
+ //
+ _thread->getThreadControl().join();
+ _thread = 0;
+}
+
+void
+IcePack::ActivatorI::runTerminationListener()
+{
+ try
+ {
+ terminationListener();
+ }
+ catch(const Exception& ex)
+ {
+ Error out(_traceLevels->logger);
+ out << "exception in process termination listener:\n" << ex;
+ }
+ catch(...)
+ {
+ Error out(_traceLevels->logger);
+ out << "unknown exception in process termination listener";
+ }
+}
+
+void
+IcePack::ActivatorI::deactivateAll()
+{
+ //
+ // Stop all activate processes.
+ //
+ std::vector<Process> processes;
+ {
+ IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this);
+ processes = _processes;
+ }
+
+ for(std::vector<Process>::iterator p = processes.begin(); p != processes.end(); ++p)
+ {
+ //
+ // Stop the server. The activator thread should detect the
+ // process deactivation and remove it from the activator
+ // active processes.
+ //
+ try
+ {
+ p->server->stop();
+ }
+ catch(const Ice::ObjectNotExistException&)
+ {
+ //
+ // Expected if the server was in the process of being
+ // destroyed.
+ //
+ }
}
}
@@ -407,12 +455,7 @@ IcePack::ActivatorI::terminationListener()
FD_SET(_fdIntrRead, &fdSet);
{
- IceUtil::Mutex::Lock sync(*this);
-
- if(_destroy)
- {
- return;
- }
+ IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this);
for(vector<Process>::iterator p = _processes.begin(); p != _processes.end(); ++p)
{
@@ -442,18 +485,29 @@ IcePack::ActivatorI::terminationListener()
}
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this);
if(FD_ISSET(_fdIntrRead, &fdSet))
{
- clearInterrupt();
+ bool deactivating = clearInterrupt();
+
+ if(deactivating && !_deactivating)
+ {
+ //
+ // Deactivation has been initiated. Set _deactivating to true to
+ // prevent new processes to be activated. This will also cause the
+ // termination of this loop when there's no more active processes.
+ //
+ _deactivating = true;
+ notifyAll(); // For waitForShutdown
+
+ if(_processes.empty())
+ {
+ return;
+ }
+ }
}
- if(_destroy)
- {
- return;
- }
-
vector<Process>::iterator p = _processes.begin();
while(p != _processes.end())
{
@@ -484,28 +538,39 @@ IcePack::ActivatorI::terminationListener()
++p;
}
else if(ret == 0)
- {
- ServerPrx server = p->server;
-
+ {
//
- // If the pipe was closed, the process has
- // terminated.
+ // If the pipe was closed, the process has terminated.
//
- p = _processes.erase(p);
- close(fd);
if(_traceLevels->activator > 0)
{
- ServerDescription desc = server->getServerDescription();
-
Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat);
- out << "detected server `" << desc.name << "' termination";
+ out << "detected server `" << p->server->description.name << "' termination";
+ }
+
+ try
+ {
+ p->server->terminated();
}
-
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_traceLevels->logger);
+ out << "unexpected exception raised by server `" << p->server->description.name
+ << "' termination:\n" << ex;
+ }
+
+ p = _processes.erase(p);
+ close(fd);
+
//
- // Notify the server it has terminated.
+ // We are deactivating and there's no more active processes. We can now
+ // end this loop
//
- server->terminationCallback();
+ if(_deactivating && _processes.empty())
+ {
+ return;
+ }
}
//
@@ -513,7 +578,7 @@ IcePack::ActivatorI::terminationListener()
//
if(!message.empty())
{
- Error out(_communicator->getLogger());
+ Error out(_traceLevels->logger);
out << message;
}
}
@@ -526,18 +591,22 @@ IcePack::ActivatorI::terminationListener()
}
}
-void
+bool
IcePack::ActivatorI::clearInterrupt()
{
- char s[32]; // Clear up to 32 interrupts at once.
- while(read(_fdIntrRead, s, 32) == 32)
+ bool shutdown = false;
+ char c;
+
+ while(read(_fdIntrRead, &c, 1) == 1)
{
+ shutdown = shutdown ? true : c == 1;
}
+
+ return shutdown;
}
void
-IcePack::ActivatorI::setInterrupt()
+IcePack::ActivatorI::setInterrupt(char c)
{
- char c = 0;
write(_fdIntrWrite, &c, 1);
}