diff options
-rw-r--r-- | cpp/include/Freeze/Initialize.h | 4 | ||||
-rw-r--r-- | cpp/src/Freeze/EvictorI.cpp | 124 | ||||
-rw-r--r-- | cpp/src/Freeze/EvictorI.h | 31 |
3 files changed, 155 insertions, 4 deletions
diff --git a/cpp/include/Freeze/Initialize.h b/cpp/include/Freeze/Initialize.h index 3ea9ba8ff43..507c1f5d22d 100644 --- a/cpp/include/Freeze/Initialize.h +++ b/cpp/include/Freeze/Initialize.h @@ -51,6 +51,10 @@ FREEZE_API ConnectionPtr createConnection(const Ice::CommunicatorPtr& communicat const std::string& envName, DbEnv& dbEnv); + +typedef void (*FatalErrorCallback)(const EvictorPtr&, const Ice::CommunicatorPtr&); +FREEZE_API FatalErrorCallback registerFatalErrorCallback(FatalErrorCallback); + } #endif diff --git a/cpp/src/Freeze/EvictorI.cpp b/cpp/src/Freeze/EvictorI.cpp index 654cc24240a..abba41d3845 100644 --- a/cpp/src/Freeze/EvictorI.cpp +++ b/cpp/src/Freeze/EvictorI.cpp @@ -61,6 +61,35 @@ Freeze::createEvictor(const ObjectAdapterPtr& adapter, return new EvictorI(adapter, envName, dbEnv, filename, initializer, indices, createDb); } +// +// Fatal error callback +// + +static Freeze::FatalErrorCallback fatalErrorCallback = 0; +static IceUtil::StaticMutex fatalErrorCallbackMutex = ICE_STATIC_MUTEX_INITIALIZER; + +Freeze::FatalErrorCallback +Freeze::registerFatalErrorCallback(Freeze::FatalErrorCallback cb) +{ + IceUtil::StaticMutex::Lock lock(fatalErrorCallbackMutex); + FatalErrorCallback result = fatalErrorCallback; + return result; +} + +static void +handleFatalError(const Freeze::EvictorPtr& evictor, const Ice::CommunicatorPtr& communicator) +{ + IceUtil::StaticMutex::Lock lock(fatalErrorCallbackMutex); + if(fatalErrorCallback != 0) + { + fatalErrorCallback(evictor, communicator); + } + else + { + ::abort(); + } +} + // // Helper functions @@ -180,6 +209,65 @@ Freeze::DeactivateController::deactivationComplete() notifyAll(); } +// +// WatchDogThread +// + +Freeze::WatchDogThread::WatchDogThread(long timeout, EvictorI& evictor) : + _timeout(IceUtil::Time::milliSeconds(timeout)), + _evictor(evictor), + _done(false), + _active(false) +{ +} + + +void +Freeze::WatchDogThread::run() +{ + Lock sync(*this); + + while(!_done) + { + if(_active) + { + if(timedWait(_timeout) == false && _active && !_done) + { + Error out(_evictor.communicator()->getLogger()); + out << "Fatal error: streaming watch dog thread timed out."; + out.flush(); + handleFatalError(&_evictor, _evictor.communicator()); + } + } + else + { + wait(); + } + } +} + +void Freeze::WatchDogThread::activate() +{ + Lock sync(*this); + _active = true; + notify(); +} + +void Freeze::WatchDogThread::deactivate() +{ + Lock sync(*this); + _active = false; + notify(); +} + +void +Freeze::WatchDogThread::terminate() +{ + Lock sync(*this); + _done = true; + notify(); +} + // // EvictorI @@ -327,6 +415,18 @@ Freeze::EvictorI::init(const string& envName, const vector<IndexPtr>& indices) } // + // By default, no stream timeout + // + long streamTimeout = _communicator->getProperties()-> + getPropertyAsIntWithDefault(propertyPrefix+ ".StreamTimeout", 0) * 1000; + + if(streamTimeout > 0) + { + _watchDogThread = new WatchDogThread(streamTimeout, *this); + _watchDogThread->start(); + } + + // // Start saving thread // start(); @@ -1181,6 +1281,12 @@ Freeze::EvictorI::deactivate(const string&) sync.release(); getThreadControl().join(); + if(_watchDogThread != 0) + { + _watchDogThread->terminate(); + _watchDogThread->getThreadControl().join(); + } + for(StoreMap::iterator p = _storeMap.begin(); p != _storeMap.end(); ++p) { (*p).second->close(); @@ -1346,11 +1452,21 @@ Freeze::EvictorI::run() if(!lockServant.acquired()) { lockElement.release(); + + if(_watchDogThread != 0) + { + _watchDogThread->activate(); + } lockServant.acquire(); + if(_watchDogThread != 0) + { + _watchDogThread->deactivate(); + } + lockElement.acquire(); status = element->status; } - + switch(status) { case EvictorElement::created: @@ -1550,21 +1666,21 @@ Freeze::EvictorI::run() Error out(_communicator->getLogger()); out << "Saving thread killed by exception: " << ex; out.flush(); - ::abort(); + handleFatalError(this, _communicator); } catch(const std::exception& ex) { Error out(_communicator->getLogger()); out << "Saving thread killed by std::exception: " << ex.what(); out.flush(); - ::abort(); + handleFatalError(this, _communicator); } catch(...) { Error out(_communicator->getLogger()); out << "Saving thread killed by unknown exception"; out.flush(); - ::abort(); + handleFatalError(this, _communicator); } } diff --git a/cpp/src/Freeze/EvictorI.h b/cpp/src/Freeze/EvictorI.h index d1a0f9ed3b9..562e2073ad3 100644 --- a/cpp/src/Freeze/EvictorI.h +++ b/cpp/src/Freeze/EvictorI.h @@ -80,6 +80,36 @@ private: }; +class EvictorI; + +// +// The WatchDogThread is used by the saving thread to ensure the +// streaming of some object does not take more than timeout ms. +// We only measure the time necessary to acquire the lock on the +// object (servant), not the streaming itself. +// + +class WatchDogThread : public IceUtil::Thread, private IceUtil::Monitor<IceUtil::Mutex> +{ +public: + + WatchDogThread(long, EvictorI&); + + void run(); + + void activate(); + void deactivate(); + void terminate(); + +private: + const IceUtil::Time _timeout; + EvictorI& _evictor; + bool _done; + bool _active; +}; + +typedef IceUtil::Handle<WatchDogThread> WatchDogThreadPtr; + class EvictorI : public Evictor, public IceUtil::Monitor<IceUtil::Mutex>, public IceUtil::Thread { @@ -191,6 +221,7 @@ private: DeactivateController _deactivateController; bool _savingThreadDone; + WatchDogThreadPtr _watchDogThread; Ice::ObjectAdapterPtr _adapter; Ice::CommunicatorPtr _communicator; |