diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-07-23 15:06:02 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-07-23 15:06:02 -0230 |
commit | 866f9ff17391176b836f9bb49f6da40c2c938441 (patch) | |
tree | 7366963294ef3356c7b887cd89af753988c21beb /cpp | |
parent | adding ACM tests for Python/Ruby/PHP (diff) | |
download | ice-866f9ff17391176b836f9bb49f6da40c2c938441.tar.bz2 ice-866f9ff17391176b836f9bb49f6da40c2c938441.tar.xz ice-866f9ff17391176b836f9bb49f6da40c2c938441.zip |
ICE-4234 - Update Ice to use current Java threading constructs
- Use ScheduledThreadPoolDispatcher not IceUtilInternal.Timer.
- Use Ice timer in glacier2, Freeze impl.
- Align C++, C# with java changes.
- Database demo now supports mariadb.
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/Glacier2/Application.h | 5 | ||||
-rw-r--r-- | cpp/src/Freeze/BackgroundSaveEvictorI.cpp | 157 | ||||
-rw-r--r-- | cpp/src/Freeze/BackgroundSaveEvictorI.h | 31 | ||||
-rw-r--r-- | cpp/src/Glacier2Lib/Application.cpp | 87 | ||||
-rw-r--r-- | cpp/src/Glacier2Lib/SessionHelper.cpp | 106 |
5 files changed, 114 insertions, 272 deletions
diff --git a/cpp/include/Glacier2/Application.h b/cpp/include/Glacier2/Application.h index b8ab2e48f5a..aa0d7ae89a9 100644 --- a/cpp/include/Glacier2/Application.h +++ b/cpp/include/Glacier2/Application.h @@ -136,8 +136,9 @@ public: /** * Called when the session refresh thread detects that the session has been * destroyed. A subclass can override this method to take action after the - * loss of connectivity with the Glacier2 router. This method is always - * called from the session refresh thread. + * loss of connectivity with the Glacier2 router. This method is called + * according to the Ice invocation dipsatch rules (in other words, it + * uses the same rules as an servant upcall or AMI callback). **/ virtual void sessionDestroyed() { diff --git a/cpp/src/Freeze/BackgroundSaveEvictorI.cpp b/cpp/src/Freeze/BackgroundSaveEvictorI.cpp index 1da527561e3..f37d44bad7e 100644 --- a/cpp/src/Freeze/BackgroundSaveEvictorI.cpp +++ b/cpp/src/Freeze/BackgroundSaveEvictorI.cpp @@ -24,34 +24,6 @@ using namespace std; using namespace Freeze; using namespace Ice; - -// -// createEvictor functions -// - -Freeze::BackgroundSaveEvictorPtr -Freeze::createBackgroundSaveEvictor(const ObjectAdapterPtr& adapter, - const string& envName, - const string& filename, - const ServantInitializerPtr& initializer, - const vector<IndexPtr>& indices, - bool createDb) -{ - return new BackgroundSaveEvictorI(adapter, envName, 0, filename, initializer, indices, createDb); -} - -BackgroundSaveEvictorPtr -Freeze::createBackgroundSaveEvictor(const ObjectAdapterPtr& adapter, - const string& envName, - DbEnv& dbEnv, - const string& filename, - const ServantInitializerPtr& initializer, - const vector<IndexPtr>& indices, - bool createDb) -{ - return new BackgroundSaveEvictorI(adapter, envName, &dbEnv, filename, initializer, indices, createDb); -} - namespace { @@ -79,18 +51,7 @@ public: }; Init init; -} - -FatalErrorCallback -Freeze::registerFatalErrorCallback(FatalErrorCallback cb) -{ - IceUtilInternal::MutexPtrLock<IceUtil::Mutex> lock(fatalErrorCallbackMutex); - FatalErrorCallback result = fatalErrorCallback; - fatalErrorCallback = cb; - return result; -} - -static void +void handleFatalError(const Freeze::BackgroundSaveEvictorPtr& evictor, const Ice::CommunicatorPtr& communicator) { IceUtilInternal::MutexPtrLock<IceUtil::Mutex> lock(fatalErrorCallbackMutex); @@ -104,68 +65,70 @@ handleFatalError(const Freeze::BackgroundSaveEvictorPtr& evictor, const Ice::Com } } - // -// WatchDogThread +// The timer is used to ensure the streaming of some object does not take more than +// timeout ms. We only measure the time necessary to acquire the lock on the object +// (servant), not the streaming itself. // - -Freeze::WatchDogThread::WatchDogThread(long timeout, BackgroundSaveEvictorI& evictor) : - IceUtil::Thread("Freeze background save evictor watchdog thread"), - _timeout(IceUtil::Time::milliSeconds(timeout)), - _evictor(evictor), - _done(false), - _active(false) +class WatchDogTask : public IceUtil::TimerTask { -} +public: + WatchDogTask(BackgroundSaveEvictorI& evictor) : _evictor(evictor) + { + } -void -Freeze::WatchDogThread::run() -{ - Lock sync(*this); - - while(!_done) + virtual void runTimerTask() { - if(_active) - { - if(timedWait(_timeout) == false && _active && !_done) - { - Error out(_evictor.communicator()->getLogger()); - out << "Fatal error: streaming watch dog thread timed out."; - out.flush(); - handleFatalError(&_evictor, _evictor.communicator()); - } - } - else - { - wait(); - } + Error out(_evictor.communicator()->getLogger()); + out << "Fatal error: streaming watch dog timed out."; + out.flush(); + handleFatalError(&_evictor, _evictor.communicator()); } + +private: + + BackgroundSaveEvictorI& _evictor; +}; + } -void Freeze::WatchDogThread::activate() +// +// createEvictor functions +// + +Freeze::BackgroundSaveEvictorPtr +Freeze::createBackgroundSaveEvictor(const ObjectAdapterPtr& adapter, + const string& envName, + const string& filename, + const ServantInitializerPtr& initializer, + const vector<IndexPtr>& indices, + bool createDb) { - Lock sync(*this); - _active = true; - notify(); + return new BackgroundSaveEvictorI(adapter, envName, 0, filename, initializer, indices, createDb); } -void Freeze::WatchDogThread::deactivate() +BackgroundSaveEvictorPtr +Freeze::createBackgroundSaveEvictor(const ObjectAdapterPtr& adapter, + const string& envName, + DbEnv& dbEnv, + const string& filename, + const ServantInitializerPtr& initializer, + const vector<IndexPtr>& indices, + bool createDb) { - Lock sync(*this); - _active = false; - notify(); + return new BackgroundSaveEvictorI(adapter, envName, &dbEnv, filename, initializer, indices, createDb); } - -void -Freeze::WatchDogThread::terminate() + +FatalErrorCallback +Freeze::registerFatalErrorCallback(FatalErrorCallback cb) { - Lock sync(*this); - _done = true; - notify(); + IceUtilInternal::MutexPtrLock<IceUtil::Mutex> lock(fatalErrorCallbackMutex); + FatalErrorCallback result = fatalErrorCallback; + fatalErrorCallback = cb; + return result; } - // // BackgroundSaveEvictorI // @@ -211,13 +174,12 @@ Freeze::BackgroundSaveEvictorI::BackgroundSaveEvictorI(const ObjectAdapterPtr& a // // By default, no stream timeout // - long streamTimeout = _communicator->getProperties()-> + _streamTimeout = _communicator->getProperties()-> getPropertyAsIntWithDefault(propertyPrefix+ ".StreamTimeout", 0) * 1000; - if(streamTimeout > 0) + if(_streamTimeout > 0) { - _watchDogThread = new WatchDogThread(streamTimeout, *this); - _watchDogThread->start(); + _timer = IceInternal::getInstanceTimer(_communicator); } // @@ -847,12 +809,6 @@ Freeze::BackgroundSaveEvictorI::deactivate(const string&) sync.release(); getThreadControl().join(); - if(_watchDogThread != 0) - { - _watchDogThread->terminate(); - _watchDogThread->getThreadControl().join(); - } - closeDbEnv(); } catch(...) @@ -1011,14 +967,17 @@ Freeze::BackgroundSaveEvictorI::run() { lockElement.release(); - if(_watchDogThread != 0) + IceUtil::TimerTaskPtr watchDogTask; + if(_timer) { - _watchDogThread->activate(); + watchDogTask = new WatchDogTask(*this); + _timer->schedule(watchDogTask, IceUtil::Time::milliSeconds(_streamTimeout)); } lockServant.acquire(); - if(_watchDogThread != 0) + if(watchDogTask) { - _watchDogThread->deactivate(); + _timer->cancel(watchDogTask); + watchDogTask = 0; } lockElement.acquire(); diff --git a/cpp/src/Freeze/BackgroundSaveEvictorI.h b/cpp/src/Freeze/BackgroundSaveEvictorI.h index a5afff853ff..12037c53275 100644 --- a/cpp/src/Freeze/BackgroundSaveEvictorI.h +++ b/cpp/src/Freeze/BackgroundSaveEvictorI.h @@ -60,34 +60,6 @@ namespace Freeze class BackgroundSaveEvictorI; -// -// The WatchDogThread is used by the saving thread to ensure the -// streaming of some object does not take more than timeout ms. -// We only measure the time necessary to acquire the lock on the -// object (servant), not the streaming itself. -// - -class WatchDogThread : public IceUtil::Thread, private IceUtil::Monitor<IceUtil::Mutex> -{ -public: - - WatchDogThread(long, BackgroundSaveEvictorI&); - - void run(); - - void activate(); - void deactivate(); - void terminate(); - -private: - const IceUtil::Time _timeout; - BackgroundSaveEvictorI& _evictor; - bool _done; - bool _active; -}; - -typedef IceUtil::Handle<WatchDogThread> WatchDogThreadPtr; - struct BackgroundSaveEvictorElement; typedef IceUtil::Handle<BackgroundSaveEvictorElement> BackgroundSaveEvictorElementPtr; @@ -199,7 +171,8 @@ private: std::deque<BackgroundSaveEvictorElementPtr> _modifiedQueue; bool _savingThreadDone; - WatchDogThreadPtr _watchDogThread; + long _streamTimeout; + IceUtil::TimerPtr _timer; // // Threads that have requested a "saveNow" and are waiting for diff --git a/cpp/src/Glacier2Lib/Application.cpp b/cpp/src/Glacier2Lib/Application.cpp index d0f5e8eecbe..5f0b9bb2730 100644 --- a/cpp/src/Glacier2Lib/Application.cpp +++ b/cpp/src/Glacier2Lib/Application.cpp @@ -23,73 +23,40 @@ string Glacier2::Application::_category; namespace { -class SessionPingThreadI : virtual public IceUtil::Thread +class SessionRefreshTask : public IceUtil::TimerTask { public: - SessionPingThreadI(Glacier2::Application* app, const Glacier2::RouterPrx& router, IceUtil::Int64 period) : + SessionRefreshTask(Glacier2::Application* app, const Glacier2::RouterPrx& router) : _app(app), _router(router), - _period(period), - _done(false) + _callback(Glacier2::newCallback_Router_refreshSession(this, &SessionRefreshTask::exception)) { - assert(_period); } void exception(const Ice::Exception&) { // - // Here the session has been destroyed. The thread terminates, - // and we notify the application that the session has been - // destroyed. + // Here the session has been destroyed. Notify the application that the + // session has been destroyed. // - done(); _app->sessionDestroyed(); } - void - run() + virtual void + runTimerTask() { - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); - - Glacier2::Callback_Router_refreshSessionPtr callback = - Glacier2::newCallback_Router_refreshSession(this, &SessionPingThreadI::exception); - while(true) + try { - try - { - _router->begin_refreshSession(callback); - } - catch(const Ice::CommunicatorDestroyedException&) - { - // - // AMI requests can raise CommunicatorDestroyedException directly. - // - break; - } - - if(!_done) - { - _monitor.timedWait(IceUtil::Time::milliSeconds(_period)); - } - - if(_done) - { - break; - } + _router->begin_refreshSession(_callback); } - } - - void - done() - { - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); - if(!_done) + catch(const Ice::CommunicatorDestroyedException&) { - _done = true; - _monitor.notify(); + // + // AMI requests can raise CommunicatorDestroyedException directly. + // } } @@ -97,11 +64,8 @@ private: Glacier2::Application* _app; Glacier2::RouterPrx _router; - IceUtil::Int64 _period; - bool _done; - IceUtil::Monitor<IceUtil::Mutex> _monitor; + Glacier2::Callback_Router_refreshSessionPtr _callback; }; -typedef IceUtil::Handle<SessionPingThreadI> SessionPingThreadIPtr; class ConnectionCallbackI : public Ice::ConnectionCallback { @@ -119,6 +83,7 @@ public: virtual void closed(const Ice::ConnectionPtr&) { + cout << "session destroyed" << endl; _app->sessionDestroyed(); } @@ -230,7 +195,6 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat bool restart = false; status = 0; - SessionPingThreadIPtr ping; try { IceInternal::Application::_communicator = Ice::initialize(args, initData); @@ -287,8 +251,14 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat IceUtil::Int64 sessionTimeout = _router->getSessionTimeout(); if(sessionTimeout > 0) { - ping = new SessionPingThreadI(this, _router, (sessionTimeout * 1000) / 2); - ping->start(); + // + // Create a ping timer task. The task itself doesn't + // need to be canceled as the communicator is destroyed + // at the end. + // + IceUtil::TimerPtr timer = IceInternal::getInstanceTimer(communicator()); + timer->scheduleRepeated(new SessionRefreshTask(this, _router), + IceUtil::Time::seconds(sessionTimeout/2)); } } @@ -399,17 +369,6 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat IceInternal::Application::_application = 0; } - if(ping) - { - ping->done(); - while(true) - { - ping->getThreadControl().join(); - break; - } - ping = 0; - } - if(_createdSession && _router) { try diff --git a/cpp/src/Glacier2Lib/SessionHelper.cpp b/cpp/src/Glacier2Lib/SessionHelper.cpp index 885dad9401d..ec7d29c932c 100644 --- a/cpp/src/Glacier2Lib/SessionHelper.cpp +++ b/cpp/src/Glacier2Lib/SessionHelper.cpp @@ -54,27 +54,6 @@ private: const Glacier2::SessionCallbackPtr _callback; }; -class SessionRefreshThread : public IceUtil::Thread -{ - -public: - - SessionRefreshThread(const Glacier2::SessionHelperPtr&, const Glacier2::RouterPrx&, Ice::Long); - virtual void run(); - void done(); - void success(); - void failure(const Ice::Exception&); - -private: - - const Glacier2::SessionHelperPtr _session; - const Glacier2::RouterPrx _router; - Ice::Long _period; - bool _done; - IceUtil::Monitor<IceUtil::Mutex> _monitor; -}; -typedef IceUtil::Handle<SessionRefreshThread> SessionRefreshThreadPtr; - class SessionHelperI : public Glacier2::SessionHelper { @@ -115,7 +94,6 @@ private: Ice::ObjectAdapterPtr _adapter; Glacier2::RouterPrx _router; Glacier2::SessionPrx _session; - SessionRefreshThreadPtr _refreshThread; std::string _category; bool _connected; bool _destroy; @@ -124,64 +102,44 @@ private: }; typedef IceUtil::Handle<SessionHelperI> SessionHelperIPtr; -SessionRefreshThread::SessionRefreshThread(const Glacier2::SessionHelperPtr& session, - const Glacier2::RouterPrx& router, Ice::Long period) : - _session(session), - _router(router), - _period(period), - _done(false) +class SessionRefreshTask : public IceUtil::TimerTask { -} +public: -void -SessionRefreshThread::run() -{ - Glacier2::Callback_Router_refreshSessionPtr cb = - Glacier2::newCallback_Router_refreshSession(this, &SessionRefreshThread::failure); - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); - while(true) + SessionRefreshTask(const Glacier2::SessionHelperPtr& session, const Glacier2::RouterPrx& router) : + _session(session), + _router(router), + _callback(Glacier2::newCallback_Router_refreshSession(this, &SessionRefreshTask::exception)) + { + } + + virtual void + runTimerTask() { try { - _router->begin_refreshSession(cb); + _router->begin_refreshSession(_callback); } catch(const Ice::CommunicatorDestroyedException&) { // // AMI requests can raise CommunicatorDestroyedException directly. // - break; - } - - if(!_done) - { - _monitor.timedWait(IceUtil::Time::seconds(_period)); - } - - if(_done) - { - break; } } -} -void -SessionRefreshThread::done() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor); - if(!_done) + void + exception(const Ice::Exception&) { - _done = true; - _monitor.notify(); + _session->destroy(); } -} -void -SessionRefreshThread::failure(const Ice::Exception&) -{ - done(); - _session->destroy(); -} +private: + + const Glacier2::SessionHelperPtr _session; + const Glacier2::RouterPrx _router; + const Glacier2::Callback_Router_refreshSessionPtr _callback; +}; class DestroyInternal : public IceUtil::Thread { @@ -407,16 +365,12 @@ SessionHelperI::destroyInternal(const Ice::DispatcherCallPtr& disconnected) assert(_destroy); Ice::CommunicatorPtr communicator; Glacier2::RouterPrx router; - SessionRefreshThreadPtr refreshThread; { IceUtil::Mutex::Lock sync(_mutex); router = _router; _router = 0; _connected = false; - refreshThread = _refreshThread; - _refreshThread = 0; - communicator = _communicator; } @@ -451,13 +405,6 @@ SessionHelperI::destroyInternal(const Ice::DispatcherCallPtr& disconnected) } } - if(refreshThread) - { - refreshThread->done(); - refreshThread->getThreadControl().join(); - refreshThread = 0; - } - if(communicator) { try @@ -745,8 +692,6 @@ SessionHelperI::connected(const Glacier2::RouterPrx& router, const Glacier2::Ses _session = session; _connected = true; - assert(!_refreshThread); - if(acmTimeout > 0) { Ice::ConnectionPtr connection = _router->ice_getCachedConnection(); @@ -756,8 +701,13 @@ SessionHelperI::connected(const Glacier2::RouterPrx& router, const Glacier2::Ses } else if(sessionTimeout > 0) { - _refreshThread = new SessionRefreshThread(this, _router, (sessionTimeout)/2); - _refreshThread->start(); + // + // Create a ping timer task. The task itself doesn't need to be + // canceled as the communicator is destroyed at the end. + // + IceUtil::TimerPtr timer = IceInternal::getInstanceTimer(communicator()); + timer->scheduleRepeated(new SessionRefreshTask(this, _router), + IceUtil::Time::seconds(sessionTimeout/2)); } } dispatchCallback(new Connected(_callback, this), conn); |