summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2014-07-23 15:06:02 -0230
committerMatthew Newhook <matthew@zeroc.com>2014-07-23 15:06:02 -0230
commit866f9ff17391176b836f9bb49f6da40c2c938441 (patch)
tree7366963294ef3356c7b887cd89af753988c21beb /cpp
parentadding ACM tests for Python/Ruby/PHP (diff)
downloadice-866f9ff17391176b836f9bb49f6da40c2c938441.tar.bz2
ice-866f9ff17391176b836f9bb49f6da40c2c938441.tar.xz
ice-866f9ff17391176b836f9bb49f6da40c2c938441.zip
ICE-4234 - Update Ice to use current Java threading constructs
- Use ScheduledThreadPoolDispatcher not IceUtilInternal.Timer. - Use Ice timer in glacier2, Freeze impl. - Align C++, C# with java changes. - Database demo now supports mariadb.
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/Glacier2/Application.h5
-rw-r--r--cpp/src/Freeze/BackgroundSaveEvictorI.cpp157
-rw-r--r--cpp/src/Freeze/BackgroundSaveEvictorI.h31
-rw-r--r--cpp/src/Glacier2Lib/Application.cpp87
-rw-r--r--cpp/src/Glacier2Lib/SessionHelper.cpp106
5 files changed, 114 insertions, 272 deletions
diff --git a/cpp/include/Glacier2/Application.h b/cpp/include/Glacier2/Application.h
index b8ab2e48f5a..aa0d7ae89a9 100644
--- a/cpp/include/Glacier2/Application.h
+++ b/cpp/include/Glacier2/Application.h
@@ -136,8 +136,9 @@ public:
/**
* Called when the session refresh thread detects that the session has been
* destroyed. A subclass can override this method to take action after the
- * loss of connectivity with the Glacier2 router. This method is always
- * called from the session refresh thread.
+ * loss of connectivity with the Glacier2 router. This method is called
+ * according to the Ice invocation dipsatch rules (in other words, it
+ * uses the same rules as an servant upcall or AMI callback).
**/
virtual void sessionDestroyed()
{
diff --git a/cpp/src/Freeze/BackgroundSaveEvictorI.cpp b/cpp/src/Freeze/BackgroundSaveEvictorI.cpp
index 1da527561e3..f37d44bad7e 100644
--- a/cpp/src/Freeze/BackgroundSaveEvictorI.cpp
+++ b/cpp/src/Freeze/BackgroundSaveEvictorI.cpp
@@ -24,34 +24,6 @@ using namespace std;
using namespace Freeze;
using namespace Ice;
-
-//
-// createEvictor functions
-//
-
-Freeze::BackgroundSaveEvictorPtr
-Freeze::createBackgroundSaveEvictor(const ObjectAdapterPtr& adapter,
- const string& envName,
- const string& filename,
- const ServantInitializerPtr& initializer,
- const vector<IndexPtr>& indices,
- bool createDb)
-{
- return new BackgroundSaveEvictorI(adapter, envName, 0, filename, initializer, indices, createDb);
-}
-
-BackgroundSaveEvictorPtr
-Freeze::createBackgroundSaveEvictor(const ObjectAdapterPtr& adapter,
- const string& envName,
- DbEnv& dbEnv,
- const string& filename,
- const ServantInitializerPtr& initializer,
- const vector<IndexPtr>& indices,
- bool createDb)
-{
- return new BackgroundSaveEvictorI(adapter, envName, &dbEnv, filename, initializer, indices, createDb);
-}
-
namespace
{
@@ -79,18 +51,7 @@ public:
};
Init init;
-}
-
-FatalErrorCallback
-Freeze::registerFatalErrorCallback(FatalErrorCallback cb)
-{
- IceUtilInternal::MutexPtrLock<IceUtil::Mutex> lock(fatalErrorCallbackMutex);
- FatalErrorCallback result = fatalErrorCallback;
- fatalErrorCallback = cb;
- return result;
-}
-
-static void
+void
handleFatalError(const Freeze::BackgroundSaveEvictorPtr& evictor, const Ice::CommunicatorPtr& communicator)
{
IceUtilInternal::MutexPtrLock<IceUtil::Mutex> lock(fatalErrorCallbackMutex);
@@ -104,68 +65,70 @@ handleFatalError(const Freeze::BackgroundSaveEvictorPtr& evictor, const Ice::Com
}
}
-
//
-// WatchDogThread
+// The timer is used to ensure the streaming of some object does not take more than
+// timeout ms. We only measure the time necessary to acquire the lock on the object
+// (servant), not the streaming itself.
//
-
-Freeze::WatchDogThread::WatchDogThread(long timeout, BackgroundSaveEvictorI& evictor) :
- IceUtil::Thread("Freeze background save evictor watchdog thread"),
- _timeout(IceUtil::Time::milliSeconds(timeout)),
- _evictor(evictor),
- _done(false),
- _active(false)
+class WatchDogTask : public IceUtil::TimerTask
{
-}
+public:
+ WatchDogTask(BackgroundSaveEvictorI& evictor) : _evictor(evictor)
+ {
+ }
-void
-Freeze::WatchDogThread::run()
-{
- Lock sync(*this);
-
- while(!_done)
+ virtual void runTimerTask()
{
- if(_active)
- {
- if(timedWait(_timeout) == false && _active && !_done)
- {
- Error out(_evictor.communicator()->getLogger());
- out << "Fatal error: streaming watch dog thread timed out.";
- out.flush();
- handleFatalError(&_evictor, _evictor.communicator());
- }
- }
- else
- {
- wait();
- }
+ Error out(_evictor.communicator()->getLogger());
+ out << "Fatal error: streaming watch dog timed out.";
+ out.flush();
+ handleFatalError(&_evictor, _evictor.communicator());
}
+
+private:
+
+ BackgroundSaveEvictorI& _evictor;
+};
+
}
-void Freeze::WatchDogThread::activate()
+//
+// createEvictor functions
+//
+
+Freeze::BackgroundSaveEvictorPtr
+Freeze::createBackgroundSaveEvictor(const ObjectAdapterPtr& adapter,
+ const string& envName,
+ const string& filename,
+ const ServantInitializerPtr& initializer,
+ const vector<IndexPtr>& indices,
+ bool createDb)
{
- Lock sync(*this);
- _active = true;
- notify();
+ return new BackgroundSaveEvictorI(adapter, envName, 0, filename, initializer, indices, createDb);
}
-void Freeze::WatchDogThread::deactivate()
+BackgroundSaveEvictorPtr
+Freeze::createBackgroundSaveEvictor(const ObjectAdapterPtr& adapter,
+ const string& envName,
+ DbEnv& dbEnv,
+ const string& filename,
+ const ServantInitializerPtr& initializer,
+ const vector<IndexPtr>& indices,
+ bool createDb)
{
- Lock sync(*this);
- _active = false;
- notify();
+ return new BackgroundSaveEvictorI(adapter, envName, &dbEnv, filename, initializer, indices, createDb);
}
-
-void
-Freeze::WatchDogThread::terminate()
+
+FatalErrorCallback
+Freeze::registerFatalErrorCallback(FatalErrorCallback cb)
{
- Lock sync(*this);
- _done = true;
- notify();
+ IceUtilInternal::MutexPtrLock<IceUtil::Mutex> lock(fatalErrorCallbackMutex);
+ FatalErrorCallback result = fatalErrorCallback;
+ fatalErrorCallback = cb;
+ return result;
}
-
//
// BackgroundSaveEvictorI
//
@@ -211,13 +174,12 @@ Freeze::BackgroundSaveEvictorI::BackgroundSaveEvictorI(const ObjectAdapterPtr& a
//
// By default, no stream timeout
//
- long streamTimeout = _communicator->getProperties()->
+ _streamTimeout = _communicator->getProperties()->
getPropertyAsIntWithDefault(propertyPrefix+ ".StreamTimeout", 0) * 1000;
- if(streamTimeout > 0)
+ if(_streamTimeout > 0)
{
- _watchDogThread = new WatchDogThread(streamTimeout, *this);
- _watchDogThread->start();
+ _timer = IceInternal::getInstanceTimer(_communicator);
}
//
@@ -847,12 +809,6 @@ Freeze::BackgroundSaveEvictorI::deactivate(const string&)
sync.release();
getThreadControl().join();
- if(_watchDogThread != 0)
- {
- _watchDogThread->terminate();
- _watchDogThread->getThreadControl().join();
- }
-
closeDbEnv();
}
catch(...)
@@ -1011,14 +967,17 @@ Freeze::BackgroundSaveEvictorI::run()
{
lockElement.release();
- if(_watchDogThread != 0)
+ IceUtil::TimerTaskPtr watchDogTask;
+ if(_timer)
{
- _watchDogThread->activate();
+ watchDogTask = new WatchDogTask(*this);
+ _timer->schedule(watchDogTask, IceUtil::Time::milliSeconds(_streamTimeout));
}
lockServant.acquire();
- if(_watchDogThread != 0)
+ if(watchDogTask)
{
- _watchDogThread->deactivate();
+ _timer->cancel(watchDogTask);
+ watchDogTask = 0;
}
lockElement.acquire();
diff --git a/cpp/src/Freeze/BackgroundSaveEvictorI.h b/cpp/src/Freeze/BackgroundSaveEvictorI.h
index a5afff853ff..12037c53275 100644
--- a/cpp/src/Freeze/BackgroundSaveEvictorI.h
+++ b/cpp/src/Freeze/BackgroundSaveEvictorI.h
@@ -60,34 +60,6 @@ namespace Freeze
class BackgroundSaveEvictorI;
-//
-// The WatchDogThread is used by the saving thread to ensure the
-// streaming of some object does not take more than timeout ms.
-// We only measure the time necessary to acquire the lock on the
-// object (servant), not the streaming itself.
-//
-
-class WatchDogThread : public IceUtil::Thread, private IceUtil::Monitor<IceUtil::Mutex>
-{
-public:
-
- WatchDogThread(long, BackgroundSaveEvictorI&);
-
- void run();
-
- void activate();
- void deactivate();
- void terminate();
-
-private:
- const IceUtil::Time _timeout;
- BackgroundSaveEvictorI& _evictor;
- bool _done;
- bool _active;
-};
-
-typedef IceUtil::Handle<WatchDogThread> WatchDogThreadPtr;
-
struct BackgroundSaveEvictorElement;
typedef IceUtil::Handle<BackgroundSaveEvictorElement> BackgroundSaveEvictorElementPtr;
@@ -199,7 +171,8 @@ private:
std::deque<BackgroundSaveEvictorElementPtr> _modifiedQueue;
bool _savingThreadDone;
- WatchDogThreadPtr _watchDogThread;
+ long _streamTimeout;
+ IceUtil::TimerPtr _timer;
//
// Threads that have requested a "saveNow" and are waiting for
diff --git a/cpp/src/Glacier2Lib/Application.cpp b/cpp/src/Glacier2Lib/Application.cpp
index d0f5e8eecbe..5f0b9bb2730 100644
--- a/cpp/src/Glacier2Lib/Application.cpp
+++ b/cpp/src/Glacier2Lib/Application.cpp
@@ -23,73 +23,40 @@ string Glacier2::Application::_category;
namespace
{
-class SessionPingThreadI : virtual public IceUtil::Thread
+class SessionRefreshTask : public IceUtil::TimerTask
{
public:
- SessionPingThreadI(Glacier2::Application* app, const Glacier2::RouterPrx& router, IceUtil::Int64 period) :
+ SessionRefreshTask(Glacier2::Application* app, const Glacier2::RouterPrx& router) :
_app(app),
_router(router),
- _period(period),
- _done(false)
+ _callback(Glacier2::newCallback_Router_refreshSession(this, &SessionRefreshTask::exception))
{
- assert(_period);
}
void
exception(const Ice::Exception&)
{
//
- // Here the session has been destroyed. The thread terminates,
- // and we notify the application that the session has been
- // destroyed.
+ // Here the session has been destroyed. Notify the application that the
+ // session has been destroyed.
//
- done();
_app->sessionDestroyed();
}
- void
- run()
+ virtual void
+ runTimerTask()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
-
- Glacier2::Callback_Router_refreshSessionPtr callback =
- Glacier2::newCallback_Router_refreshSession(this, &SessionPingThreadI::exception);
- while(true)
+ try
{
- try
- {
- _router->begin_refreshSession(callback);
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- //
- // AMI requests can raise CommunicatorDestroyedException directly.
- //
- break;
- }
-
- if(!_done)
- {
- _monitor.timedWait(IceUtil::Time::milliSeconds(_period));
- }
-
- if(_done)
- {
- break;
- }
+ _router->begin_refreshSession(_callback);
}
- }
-
- void
- done()
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
- if(!_done)
+ catch(const Ice::CommunicatorDestroyedException&)
{
- _done = true;
- _monitor.notify();
+ //
+ // AMI requests can raise CommunicatorDestroyedException directly.
+ //
}
}
@@ -97,11 +64,8 @@ private:
Glacier2::Application* _app;
Glacier2::RouterPrx _router;
- IceUtil::Int64 _period;
- bool _done;
- IceUtil::Monitor<IceUtil::Mutex> _monitor;
+ Glacier2::Callback_Router_refreshSessionPtr _callback;
};
-typedef IceUtil::Handle<SessionPingThreadI> SessionPingThreadIPtr;
class ConnectionCallbackI : public Ice::ConnectionCallback
{
@@ -119,6 +83,7 @@ public:
virtual void
closed(const Ice::ConnectionPtr&)
{
+ cout << "session destroyed" << endl;
_app->sessionDestroyed();
}
@@ -230,7 +195,6 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat
bool restart = false;
status = 0;
- SessionPingThreadIPtr ping;
try
{
IceInternal::Application::_communicator = Ice::initialize(args, initData);
@@ -287,8 +251,14 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat
IceUtil::Int64 sessionTimeout = _router->getSessionTimeout();
if(sessionTimeout > 0)
{
- ping = new SessionPingThreadI(this, _router, (sessionTimeout * 1000) / 2);
- ping->start();
+ //
+ // Create a ping timer task. The task itself doesn't
+ // need to be canceled as the communicator is destroyed
+ // at the end.
+ //
+ IceUtil::TimerPtr timer = IceInternal::getInstanceTimer(communicator());
+ timer->scheduleRepeated(new SessionRefreshTask(this, _router),
+ IceUtil::Time::seconds(sessionTimeout/2));
}
}
@@ -399,17 +369,6 @@ Glacier2::Application::doMain(Ice::StringSeq& args, const Ice::InitializationDat
IceInternal::Application::_application = 0;
}
- if(ping)
- {
- ping->done();
- while(true)
- {
- ping->getThreadControl().join();
- break;
- }
- ping = 0;
- }
-
if(_createdSession && _router)
{
try
diff --git a/cpp/src/Glacier2Lib/SessionHelper.cpp b/cpp/src/Glacier2Lib/SessionHelper.cpp
index 885dad9401d..ec7d29c932c 100644
--- a/cpp/src/Glacier2Lib/SessionHelper.cpp
+++ b/cpp/src/Glacier2Lib/SessionHelper.cpp
@@ -54,27 +54,6 @@ private:
const Glacier2::SessionCallbackPtr _callback;
};
-class SessionRefreshThread : public IceUtil::Thread
-{
-
-public:
-
- SessionRefreshThread(const Glacier2::SessionHelperPtr&, const Glacier2::RouterPrx&, Ice::Long);
- virtual void run();
- void done();
- void success();
- void failure(const Ice::Exception&);
-
-private:
-
- const Glacier2::SessionHelperPtr _session;
- const Glacier2::RouterPrx _router;
- Ice::Long _period;
- bool _done;
- IceUtil::Monitor<IceUtil::Mutex> _monitor;
-};
-typedef IceUtil::Handle<SessionRefreshThread> SessionRefreshThreadPtr;
-
class SessionHelperI : public Glacier2::SessionHelper
{
@@ -115,7 +94,6 @@ private:
Ice::ObjectAdapterPtr _adapter;
Glacier2::RouterPrx _router;
Glacier2::SessionPrx _session;
- SessionRefreshThreadPtr _refreshThread;
std::string _category;
bool _connected;
bool _destroy;
@@ -124,64 +102,44 @@ private:
};
typedef IceUtil::Handle<SessionHelperI> SessionHelperIPtr;
-SessionRefreshThread::SessionRefreshThread(const Glacier2::SessionHelperPtr& session,
- const Glacier2::RouterPrx& router, Ice::Long period) :
- _session(session),
- _router(router),
- _period(period),
- _done(false)
+class SessionRefreshTask : public IceUtil::TimerTask
{
-}
+public:
-void
-SessionRefreshThread::run()
-{
- Glacier2::Callback_Router_refreshSessionPtr cb =
- Glacier2::newCallback_Router_refreshSession(this, &SessionRefreshThread::failure);
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
- while(true)
+ SessionRefreshTask(const Glacier2::SessionHelperPtr& session, const Glacier2::RouterPrx& router) :
+ _session(session),
+ _router(router),
+ _callback(Glacier2::newCallback_Router_refreshSession(this, &SessionRefreshTask::exception))
+ {
+ }
+
+ virtual void
+ runTimerTask()
{
try
{
- _router->begin_refreshSession(cb);
+ _router->begin_refreshSession(_callback);
}
catch(const Ice::CommunicatorDestroyedException&)
{
//
// AMI requests can raise CommunicatorDestroyedException directly.
//
- break;
- }
-
- if(!_done)
- {
- _monitor.timedWait(IceUtil::Time::seconds(_period));
- }
-
- if(_done)
- {
- break;
}
}
-}
-void
-SessionRefreshThread::done()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
- if(!_done)
+ void
+ exception(const Ice::Exception&)
{
- _done = true;
- _monitor.notify();
+ _session->destroy();
}
-}
-void
-SessionRefreshThread::failure(const Ice::Exception&)
-{
- done();
- _session->destroy();
-}
+private:
+
+ const Glacier2::SessionHelperPtr _session;
+ const Glacier2::RouterPrx _router;
+ const Glacier2::Callback_Router_refreshSessionPtr _callback;
+};
class DestroyInternal : public IceUtil::Thread
{
@@ -407,16 +365,12 @@ SessionHelperI::destroyInternal(const Ice::DispatcherCallPtr& disconnected)
assert(_destroy);
Ice::CommunicatorPtr communicator;
Glacier2::RouterPrx router;
- SessionRefreshThreadPtr refreshThread;
{
IceUtil::Mutex::Lock sync(_mutex);
router = _router;
_router = 0;
_connected = false;
- refreshThread = _refreshThread;
- _refreshThread = 0;
-
communicator = _communicator;
}
@@ -451,13 +405,6 @@ SessionHelperI::destroyInternal(const Ice::DispatcherCallPtr& disconnected)
}
}
- if(refreshThread)
- {
- refreshThread->done();
- refreshThread->getThreadControl().join();
- refreshThread = 0;
- }
-
if(communicator)
{
try
@@ -745,8 +692,6 @@ SessionHelperI::connected(const Glacier2::RouterPrx& router, const Glacier2::Ses
_session = session;
_connected = true;
- assert(!_refreshThread);
-
if(acmTimeout > 0)
{
Ice::ConnectionPtr connection = _router->ice_getCachedConnection();
@@ -756,8 +701,13 @@ SessionHelperI::connected(const Glacier2::RouterPrx& router, const Glacier2::Ses
}
else if(sessionTimeout > 0)
{
- _refreshThread = new SessionRefreshThread(this, _router, (sessionTimeout)/2);
- _refreshThread->start();
+ //
+ // Create a ping timer task. The task itself doesn't need to be
+ // canceled as the communicator is destroyed at the end.
+ //
+ IceUtil::TimerPtr timer = IceInternal::getInstanceTimer(communicator());
+ timer->scheduleRepeated(new SessionRefreshTask(this, _router),
+ IceUtil::Time::seconds(sessionTimeout/2));
}
}
dispatchCallback(new Connected(_callback, this), conn);