summaryrefslogtreecommitdiff
path: root/cpp/src/Freeze/SharedDbEnv.cpp
diff options
context:
space:
mode:
authorBernard Normier <bernard@zeroc.com>2003-07-18 19:47:14 +0000
committerBernard Normier <bernard@zeroc.com>2003-07-18 19:47:14 +0000
commit3c02842b6143c6ce9297feecd7c9497312c76da8 (patch)
tree478210bed985772e2957ee0a65b9cd3f5f390234 /cpp/src/Freeze/SharedDbEnv.cpp
parentuse lazy initialization of communicator for each request (diff)
downloadice-3c02842b6143c6ce9297feecd7c9497312c76da8.tar.bz2
ice-3c02842b6143c6ce9297feecd7c9497312c76da8.tar.xz
ice-3c02842b6143c6ce9297feecd7c9497312c76da8.zip
Major Freeze update
Diffstat (limited to 'cpp/src/Freeze/SharedDbEnv.cpp')
-rw-r--r--cpp/src/Freeze/SharedDbEnv.cpp429
1 files changed, 429 insertions, 0 deletions
diff --git a/cpp/src/Freeze/SharedDbEnv.cpp b/cpp/src/Freeze/SharedDbEnv.cpp
new file mode 100644
index 00000000000..392f8cafe70
--- /dev/null
+++ b/cpp/src/Freeze/SharedDbEnv.cpp
@@ -0,0 +1,429 @@
+// **********************************************************************
+//
+// Copyright (c) 2003
+// ZeroC, Inc.
+// Billerica, MA, USA
+//
+// All Rights Reserved.
+//
+// Ice is free software; you can redistribute it and/or modify it under
+// the terms of the GNU General Public License version 2 as published by
+// the Free Software Foundation.
+//
+// **********************************************************************
+
+#include <Freeze/SharedDbEnv.h>
+#include <IceUtil/StaticMutex.h>
+#include <IceUtil/Thread.h>
+#include <Freeze/DBException.h>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <sys/stat.h>
+
+using namespace std;
+using namespace IceUtil;
+using namespace Ice;
+
+#ifdef _WIN32
+# define FREEZE_DB_MODE 0
+#else
+# define FREEZE_DB_MODE (S_IRUSR | S_IWUSR)
+#endif
+
+
+namespace Freeze
+{
+
+class CheckpointThread : public Thread, public Monitor<Mutex>
+{
+public:
+
+ CheckpointThread(SharedDbEnv&, const Time&, Int, bool);
+
+ virtual void run();
+
+ void terminate();
+
+private:
+ SharedDbEnv& _dbEnv;
+ bool _done;
+ Time _checkpointPeriod;
+ Int _kbyte;
+ bool _autoDelete;
+};
+
+}
+
+namespace
+{
+
+struct MapKey
+{
+ string envName;
+ Ice::CommunicatorPtr communicator;
+};
+
+inline bool
+operator<(const MapKey& lhs, const MapKey& rhs)
+{
+ return (lhs.communicator < rhs.communicator) ||
+ ((lhs.communicator == rhs.communicator) && (lhs.envName < rhs.envName));
+}
+
+void
+dbErrCallback(const char* prefix, char* msg)
+{
+ const Freeze::SharedDbEnv* env = reinterpret_cast<const Freeze::SharedDbEnv*>(prefix);
+ assert(env != 0);
+
+ Ice::Error out(env->getCommunicator()->getLogger());
+ out << "Freeze database error in DbEnv(\"" << env->getEnvName() << "\"): " << msg;
+}
+
+
+StaticMutex _mapMutex = ICE_STATIC_MUTEX_INITIALIZER;
+StaticMutex _refCountMutex = ICE_STATIC_MUTEX_INITIALIZER;
+
+typedef map<MapKey, Freeze::SharedDbEnv*> Map;
+Map* sharedDbEnvMap;
+
+}
+
+
+Freeze::SharedDbEnvPtr
+Freeze::SharedDbEnv::get(const Ice::CommunicatorPtr& communicator,
+ const std::string& envName)
+{
+ StaticMutex::Lock lock(_mapMutex);
+
+ if(sharedDbEnvMap == 0)
+ {
+ sharedDbEnvMap = new Map;
+ }
+
+ MapKey key;
+ key.envName = envName;
+ key.communicator = communicator;
+
+ {
+ Map::iterator p = sharedDbEnvMap->find(key);
+ if(p != sharedDbEnvMap->end())
+ {
+ return p->second;
+ }
+ }
+
+ //
+ // MapKey not found, let's create and open a new DbEnv
+ //
+ auto_ptr<SharedDbEnv> result(new SharedDbEnv(envName, communicator));
+
+ //
+ // Insert it into the map
+ //
+ pair<Map::iterator, bool> insertResult = sharedDbEnvMap->insert(Map::value_type(key, result.get()));
+ assert(insertResult.second);
+
+ return result.release();
+}
+
+Freeze::SharedDbEnv::~SharedDbEnv()
+{
+ if(_trace >= 1)
+ {
+ Trace out(_communicator->getLogger(), "DB");
+ out << "closing database environment \"" << _envName << "\"";
+ }
+
+ //
+ // First terminate checkpointing thread
+ //
+ _thread->terminate();
+ _thread = 0;
+
+ try
+ {
+ close(0);
+ }
+ catch(const ::DbException& dx)
+ {
+ DBException ex(__FILE__, __LINE__);
+ ex.message = dx.what();
+ throw ex;
+ }
+}
+
+void Freeze::SharedDbEnv::__incRef()
+{
+ IceUtil::StaticMutex::Lock lock(_refCountMutex);
+ _refCount++;
+}
+
+void Freeze::SharedDbEnv::__decRef()
+{
+ IceUtil::StaticMutex::Lock lock(_refCountMutex);
+ if(--_refCount == 0)
+ {
+ IceUtil::StaticMutex::TryLock mapLock(_mapMutex);
+ if(!mapLock.acquired())
+ {
+ //
+ // Reacquire mutex in proper order and check again
+ //
+ lock.release();
+ mapLock.acquire();
+ lock.acquire();
+ if(_refCount > 0)
+ {
+ return;
+ }
+ }
+
+ //
+ // Remove from map
+ //
+
+ MapKey key;
+ key.envName = _envName;
+ key.communicator = _communicator;
+ size_t one = sharedDbEnvMap->erase(key);
+ assert(one == 1);
+
+ if(sharedDbEnvMap->size() == 0)
+ {
+ delete sharedDbEnvMap;
+ sharedDbEnvMap = 0;
+ }
+
+ //
+ // Keep lock to prevent somebody else to re-open this DbEnv
+ // before it's closed.
+ //
+ delete this;
+ }
+}
+
+void
+Freeze::SharedDbEnv::deleteOldLogs()
+{
+ IceUtil::Mutex::Lock lock(_oldLogsMutex);
+
+ char** list = 0;
+
+ try
+ {
+ log_archive(&list, DB_ARCH_ABS);
+
+ if(list != 0)
+ {
+ for(int i = 0; list[i] != 0; i++)
+ {
+ //
+ // Remove each file
+ //
+#ifdef _WIN32
+
+#if defined(_MSC_VER) && (_MSC_VER <= 1200)
+ BOOL ok = DeleteFile(list[i]);
+#else
+ BOOL ok = DeleteFileA(list[i]);
+#endif
+ if(!ok)
+ {
+ DWORD err = GetLastError();
+ Warning out(_communicator->getLogger());
+ out << "could not delete file: \"" << list[i] << "\" error number: " << err;
+ }
+#else
+ int err = unlink(list[i]);
+ if(err != 0)
+ {
+ Warning out(_communicator->getLogger());
+ out << "could not unlink file: \"" << list[i] << "\": " << strerror(err);
+ }
+#endif
+
+ }
+ }
+ }
+ catch(const ::DbException& dx)
+ {
+ free(list);
+ DBException ex(__FILE__, __LINE__);
+ ex.message = dx.what();
+ throw ex;
+ }
+ catch(...)
+ {
+ free(list);
+ throw;
+ }
+ free(list);
+}
+
+void
+Freeze::SharedDbEnv::moveOldLogs()
+{
+ //
+ // Not yet implemented
+ //
+ assert(0);
+}
+
+
+Freeze::SharedDbEnv::SharedDbEnv(const std::string& envName,
+ const Ice::CommunicatorPtr& communicator) :
+ DbEnv(0),
+ _envName(envName),
+ _communicator(communicator),
+ _refCount(0)
+{
+ Ice::PropertiesPtr properties = _communicator->getProperties();
+
+ _trace = properties->getPropertyAsInt("Freeze.Trace.DB");
+
+ if(_trace >= 1)
+ {
+ Trace out(_communicator->getLogger(), "DB");
+ out << "opening database environment \"" << envName << "\"";
+ }
+
+ string propertyPrefix = string("Freeze.") + envName;
+
+ try
+ {
+ set_errpfx(reinterpret_cast<char*>(this));
+ set_errcall(dbErrCallback);
+
+#ifdef _WIN32
+ //
+ // Berkeley DB may use a different C++ runtime
+ //
+ set_alloc(::malloc, ::realloc, ::free);
+#endif
+
+ //
+ // Deadlock detection
+ //
+ set_lk_detect(DB_LOCK_MINLOCKS);
+
+ //
+ // Async tx
+ //
+ set_flags(DB_TXN_NOSYNC, true);
+
+ u_int32_t flags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN;
+
+ if(properties->getPropertyAsInt(propertyPrefix + ".DbRecoverFatal") != 0)
+ {
+ flags |= DB_RECOVER_FATAL | DB_CREATE;
+ }
+ else
+ {
+ flags |= DB_RECOVER | DB_CREATE;
+ }
+
+ if(properties->getPropertyAsIntWithDefault(propertyPrefix + ".DbPrivate", 1) != 0)
+ {
+ flags |= DB_PRIVATE;
+ }
+
+ //
+ // Threading
+ //
+ flags |= DB_THREAD;
+
+ string dbHome = properties->getPropertyWithDefault(
+ propertyPrefix + ".DbHome", envName);
+
+ open(dbHome.c_str(), flags, FREEZE_DB_MODE);
+ }
+ catch(const ::DbException& dx)
+ {
+ DBException ex(__FILE__, __LINE__);
+ ex.message = dx.what();
+ throw ex;
+ }
+
+ //
+ // Default checkpoint period is every 3 minutes
+ //
+ Int checkpointPeriod = properties->getPropertyAsIntWithDefault(
+ propertyPrefix + ".CheckpointPeriod", 3);
+ Int kbyte = properties->getPropertyAsInt(propertyPrefix + ".PeriodicCheckpointMinSize");
+
+ bool autoDelete = (properties->getPropertyAsIntWithDefault(
+ propertyPrefix + ".OldLogsAutoDelete", 1) != 0);
+
+ _thread = new CheckpointThread(*this, Time::seconds(checkpointPeriod * 60), kbyte, autoDelete);
+}
+
+
+
+Freeze::CheckpointThread::CheckpointThread(SharedDbEnv& dbEnv, const Time& checkpointPeriod, Int kbyte, bool autoDelete) :
+ _dbEnv(dbEnv),
+ _done(false),
+ _checkpointPeriod(checkpointPeriod),
+ _kbyte(kbyte),
+ _autoDelete(autoDelete)
+{
+ start();
+}
+
+void
+Freeze::CheckpointThread::terminate()
+{
+ {
+ Lock sync(*this);
+ _done = true;
+ notify();
+ }
+
+ getThreadControl().join();
+}
+
+
+void
+Freeze::CheckpointThread::run()
+{
+ for(;;)
+ {
+ {
+ Lock sync(*this);
+ while(!_done && timedWait(_checkpointPeriod))
+ {
+ //
+ // Loop
+ //
+ }
+ if(_done)
+ {
+ return;
+ }
+ }
+
+ try
+ {
+ _dbEnv.txn_checkpoint(_kbyte, 0, 0);
+ }
+ catch(const DbException& dx)
+ {
+ Warning out(_dbEnv.getCommunicator()->getLogger());
+ out << "checkpoint on DbEnv \"" << _dbEnv.getEnvName() << "\" raised DbException: " << dx.what();
+ }
+
+ if(_autoDelete)
+ {
+ try
+ {
+ _dbEnv.deleteOldLogs();
+ }
+ catch(const IceUtil::Exception& ex)
+ {
+ Warning out(_dbEnv.getCommunicator()->getLogger());
+ out << "deleteOldLogs on DbEnv \"" << _dbEnv.getEnvName() << "\" raised: " << ex;
+ }
+ }
+ }
+}