diff options
author | Benoit Foucher <benoit@zeroc.com> | 2002-08-22 22:52:31 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2002-08-22 22:52:31 +0000 |
commit | 8ee8c08018e63d5b5b6c505490e27dcb3ef766db (patch) | |
tree | 4f888dacbe96952a910eb969a5428aba67030e7b /cpp/src/IcePack/ActivatorI.cpp | |
parent | bug fix for identity reuse in add() (diff) | |
download | ice-8ee8c08018e63d5b5b6c505490e27dcb3ef766db.tar.bz2 ice-8ee8c08018e63d5b5b6c505490e27dcb3ef766db.tar.xz ice-8ee8c08018e63d5b5b6c505490e27dcb3ef766db.zip |
Added support for IcePack node and re-factored many things in IcePack.
Diffstat (limited to 'cpp/src/IcePack/ActivatorI.cpp')
-rw-r--r-- | cpp/src/IcePack/ActivatorI.cpp | 357 |
1 files changed, 213 insertions, 144 deletions
diff --git a/cpp/src/IcePack/ActivatorI.cpp b/cpp/src/IcePack/ActivatorI.cpp index 92654ff8eda..bb4c6516d84 100644 --- a/cpp/src/IcePack/ActivatorI.cpp +++ b/cpp/src/IcePack/ActivatorI.cpp @@ -15,7 +15,7 @@ #include <Ice/Ice.h> #include <IcePack/ActivatorI.h> #include <IcePack/Admin.h> -#include <IcePack/ServerManager.h> +#include <IcePack/Internal.h> #include <IcePack/TraceLevels.h> #include <sys/types.h> @@ -26,10 +26,33 @@ using namespace std; using namespace Ice; using namespace IcePack; -IcePack::ActivatorI::ActivatorI(const CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) : - _communicator(communicator), +namespace IcePack +{ + +class TerminationListenerThread : public IceUtil::Thread +{ +public: + + TerminationListenerThread(ActivatorI& activator) : + _activator(activator) + { + } + + virtual + void run() + { + _activator.runTerminationListener(); + } + +private: + + ActivatorI& _activator; +}; + +} + +IcePack::ActivatorI::ActivatorI(const TraceLevelsPtr& traceLevels) : _traceLevels(traceLevels), - _destroy(false), _deactivating(false) { int fds[2]; @@ -48,110 +71,23 @@ IcePack::ActivatorI::ActivatorI(const CommunicatorPtr& communicator, const Trace IcePack::ActivatorI::~ActivatorI() { - assert(_destroy); - assert(_processes.empty()); + assert(!_thread); close(_fdIntrRead); close(_fdIntrWrite); } -void -IcePack::ActivatorI::run() +bool +IcePack::ActivatorI::activate(const ServerPtr& server) { - try - { - terminationListener(); - } - catch(const Exception& ex) - { - Error out(_communicator->getLogger()); - out << "exception in process termination listener:\n" << ex; - } - catch(...) - { - Error out(_communicator->getLogger()); - out << "unknown exception in process termination listener"; - } -} + IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this); -void -IcePack::ActivatorI::destroy() -{ - { - IceUtil::Mutex::Lock sync(*this); - - if(_destroy || _deactivating) // Don't destroy or deactivate twice. - { - return; - } - - // - // This ensure that no new processes will be activated. - // - _deactivating = true; - } - - - // - // Stop all activated processes. - // - while(true) - { - ServerPrx server; - { - IceUtil::Mutex::Lock sync(*this); - if(!_processes.empty()) - { - server = _processes.begin()->server; - } - else - { - // - // No more process to deactivate. - // - break; - } - } - - // - // Stop the server. The activator thread should detect the - // process deactivation and remove it from the activator - // active processes. This garantees that this loop will end at - // one point. - // - server->stop(); - } - - // - // Set the state as destroyed, this will cause the - // activator thread to exit. - // - { - IceUtil::Mutex::Lock sync(*this); - - _destroy = true; - setInterrupt(); - } - - // - // Join the activator thread. - // - getThreadControl().join(); -} - -Ice::Int -IcePack::ActivatorI::activate(const ServerPrx& server) -{ - IceUtil::Mutex::Lock sync(*this); - - if(_destroy || _deactivating) + if(_deactivating) { return false; } - ServerDescription desc = server->getServerDescription(); - - string path = desc.path; + string path = server->description.path; if(path.empty()) { return false; @@ -173,7 +109,7 @@ IcePack::ActivatorI::activate(const ServerPrx& server) // // Normalize the path to the working directory. // - string pwd = desc.pwd; + string pwd = server->description.pwd; if(!pwd.empty()) { string::size_type pos; @@ -190,12 +126,12 @@ IcePack::ActivatorI::activate(const ServerPrx& server) // // Compute arguments. // - int argc = desc.args.size() + 2; + int argc = server->description.args.size() + 2; char** argv = static_cast<char**>(malloc(argc * sizeof(char*))); argv[0] = strdup(path.c_str()); unsigned int i = 0; vector<string>::const_iterator q; - for(q = desc.args.begin(); q != desc.args.end(); ++q, ++i) + for(q = server->description.args.begin(); q != server->description.args.end(); ++q, ++i) { argv[i + 1] = strdup(q->c_str()); } @@ -204,7 +140,7 @@ IcePack::ActivatorI::activate(const ServerPrx& server) if(_traceLevels->activator > 1) { Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat); - out << "activating server `" << desc.name << "'"; + out << "activating server `" << server->description.name << "'"; if(_traceLevels->activator > 2) { out << "\n"; @@ -336,20 +272,20 @@ IcePack::ActivatorI::activate(const ServerPrx& server) flags |= O_NONBLOCK; fcntl(process.fd, F_SETFL, flags); - setInterrupt(); + setInterrupt(0); if(_traceLevels->activator > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat); - out << "activated server `" << desc.name << "'(pid = " << pid << ")"; + out << "activated server `" << server->description.name << "'(pid = " << pid << ")"; } } - return pid; + return true; } void -IcePack::ActivatorI::deactivate(const ServerPrx& server) +IcePack::ActivatorI::deactivate(const ServerPtr& server) { pid_t pid = static_cast<pid_t>(server->getPid()); @@ -365,15 +301,13 @@ IcePack::ActivatorI::deactivate(const ServerPrx& server) if(_traceLevels->activator > 1) { - ServerDescription desc = server->getServerDescription(); - Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat); - out << "sent SIGTERM to server `" << desc.name << "' (pid = " << pid << ")"; + out << "sent SIGTERM to server `" << server->description.name << "' (pid = " << pid << ")"; } } void -IcePack::ActivatorI::kill(const ServerPrx& server) +IcePack::ActivatorI::kill(const ServerPtr& server) { pid_t pid = static_cast<pid_t>(server->getPid()); @@ -389,10 +323,124 @@ IcePack::ActivatorI::kill(const ServerPrx& server) if(_traceLevels->activator > 1) { - ServerDescription desc = server->getServerDescription(); - Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat); - out << "sent SIGKILL to server `" << desc.name << "' (pid = " << pid << ")"; + out << "sent SIGKILL to server `" << server->description.name << "' (pid = " << pid << ")"; + } +} + +Ice::Int +IcePack::ActivatorI::getServerPid(const ServerPtr& server) +{ + IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this); + + for(vector<Process>::iterator p = _processes.begin(); p != _processes.end(); ++p) + { + if(p->server == server) + { + return static_cast<Ice::Int>(p->pid); + } + } + + return 0; +} + +void +IcePack::ActivatorI::start() +{ + // + // Create and start the termination listener thread. + // + _thread = new TerminationListenerThread(*this); + _thread->start(); +} + +void +IcePack::ActivatorI::waitForShutdown() +{ + IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this); + while(!_deactivating) + { + wait(); + } +} + +void +IcePack::ActivatorI::shutdown() +{ + setInterrupt(1); +} + +void +IcePack::ActivatorI::destroy() +{ + { + IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this); + assert(_deactivating); + } + + // + // Deactivate all the processes. + // + deactivateAll(); + + // + // Join the termination listener thread. This thread terminates + // when there's no more processes and when _deactivating is set to + // true. + // + _thread->getThreadControl().join(); + _thread = 0; +} + +void +IcePack::ActivatorI::runTerminationListener() +{ + try + { + terminationListener(); + } + catch(const Exception& ex) + { + Error out(_traceLevels->logger); + out << "exception in process termination listener:\n" << ex; + } + catch(...) + { + Error out(_traceLevels->logger); + out << "unknown exception in process termination listener"; + } +} + +void +IcePack::ActivatorI::deactivateAll() +{ + // + // Stop all activate processes. + // + std::vector<Process> processes; + { + IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this); + processes = _processes; + } + + for(std::vector<Process>::iterator p = processes.begin(); p != processes.end(); ++p) + { + // + // Stop the server. The activator thread should detect the + // process deactivation and remove it from the activator + // active processes. + // + try + { + p->server->stop(); + } + catch(const Ice::ObjectNotExistException&) + { + // + // Expected if the server was in the process of being + // destroyed. + // + } } } @@ -407,12 +455,7 @@ IcePack::ActivatorI::terminationListener() FD_SET(_fdIntrRead, &fdSet); { - IceUtil::Mutex::Lock sync(*this); - - if(_destroy) - { - return; - } + IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this); for(vector<Process>::iterator p = _processes.begin(); p != _processes.end(); ++p) { @@ -442,18 +485,29 @@ IcePack::ActivatorI::terminationListener() } { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor< IceUtil::Mutex>::Lock sync(*this); if(FD_ISSET(_fdIntrRead, &fdSet)) { - clearInterrupt(); + bool deactivating = clearInterrupt(); + + if(deactivating && !_deactivating) + { + // + // Deactivation has been initiated. Set _deactivating to true to + // prevent new processes to be activated. This will also cause the + // termination of this loop when there's no more active processes. + // + _deactivating = true; + notifyAll(); // For waitForShutdown + + if(_processes.empty()) + { + return; + } + } } - if(_destroy) - { - return; - } - vector<Process>::iterator p = _processes.begin(); while(p != _processes.end()) { @@ -484,28 +538,39 @@ IcePack::ActivatorI::terminationListener() ++p; } else if(ret == 0) - { - ServerPrx server = p->server; - + { // - // If the pipe was closed, the process has - // terminated. + // If the pipe was closed, the process has terminated. // - p = _processes.erase(p); - close(fd); if(_traceLevels->activator > 0) { - ServerDescription desc = server->getServerDescription(); - Ice::Trace out(_traceLevels->logger, _traceLevels->activatorCat); - out << "detected server `" << desc.name << "' termination"; + out << "detected server `" << p->server->description.name << "' termination"; + } + + try + { + p->server->terminated(); } - + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_traceLevels->logger); + out << "unexpected exception raised by server `" << p->server->description.name + << "' termination:\n" << ex; + } + + p = _processes.erase(p); + close(fd); + // - // Notify the server it has terminated. + // We are deactivating and there's no more active processes. We can now + // end this loop // - server->terminationCallback(); + if(_deactivating && _processes.empty()) + { + return; + } } // @@ -513,7 +578,7 @@ IcePack::ActivatorI::terminationListener() // if(!message.empty()) { - Error out(_communicator->getLogger()); + Error out(_traceLevels->logger); out << message; } } @@ -526,18 +591,22 @@ IcePack::ActivatorI::terminationListener() } } -void +bool IcePack::ActivatorI::clearInterrupt() { - char s[32]; // Clear up to 32 interrupts at once. - while(read(_fdIntrRead, s, 32) == 32) + bool shutdown = false; + char c; + + while(read(_fdIntrRead, &c, 1) == 1) { + shutdown = shutdown ? true : c == 1; } + + return shutdown; } void -IcePack::ActivatorI::setInterrupt() +IcePack::ActivatorI::setInterrupt(char c) { - char c = 0; write(_fdIntrWrite, &c, 1); } |