summaryrefslogtreecommitdiff
path: root/cpp/src/Freeze/ObjectStore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Freeze/ObjectStore.cpp')
-rw-r--r--cpp/src/Freeze/ObjectStore.cpp179
1 files changed, 83 insertions, 96 deletions
diff --git a/cpp/src/Freeze/ObjectStore.cpp b/cpp/src/Freeze/ObjectStore.cpp
index 1eaea6e2c92..b079086b2bb 100644
--- a/cpp/src/Freeze/ObjectStore.cpp
+++ b/cpp/src/Freeze/ObjectStore.cpp
@@ -22,7 +22,7 @@ using namespace Ice;
using namespace Freeze;
Freeze::ObjectStoreBase::ObjectStoreBase(const string& facet, const string& facetType,
- bool createDb, EvictorIBase* evictor,
+ bool createDb, EvictorIBase* evictor,
const vector<IndexPtr>& indices,
bool populateEmptyIndices) :
_facet(facet),
@@ -51,13 +51,13 @@ Freeze::ObjectStoreBase::ObjectStoreBase(const string& facet, const string& face
{
throw DatabaseException(__FILE__, __LINE__, "No object factory registered for type-id '" + facetType + "'");
}
-
+
_sampleServant = factory->create(facetType);
}
ConnectionPtr catalogConnection = createConnection(_communicator, evictor->dbEnv()->getEnvName());
Catalog catalog(catalogConnection, catalogName());
-
+
Catalog::iterator p = catalog.find(evictor->filename());
if(p != catalog.end())
{
@@ -96,7 +96,7 @@ Freeze::ObjectStoreBase::ObjectStoreBase(const string& facet, const string& face
_db->set_bt_minkey(btreeMinKey);
}
-
+
bool checksum = properties->getPropertyAsInt(propPrefix + "Checksum") > 0;
if(checksum)
{
@@ -108,7 +108,7 @@ Freeze::ObjectStoreBase::ObjectStoreBase(const string& facet, const string& face
_db->set_flags(DB_CHKSUM);
}
-
+
int pageSize = properties->getPropertyAsInt(propPrefix + "PageSize");
if(pageSize > 0)
{
@@ -120,7 +120,7 @@ Freeze::ObjectStoreBase::ObjectStoreBase(const string& facet, const string& face
_db->set_pagesize(pageSize);
}
-
+
TransactionPtr tx = catalogConnection->beginTransaction();
DbTxn* txn = getTxn(tx);
@@ -146,7 +146,7 @@ Freeze::ObjectStoreBase::ObjectStoreBase(const string& facet, const string& face
{
_indices[i]->_impl->associate(this, txn, createDb, populateEmptyIndices);
}
-
+
if(p == catalog.end())
{
CatalogData catalogData;
@@ -207,7 +207,7 @@ Freeze::ObjectStoreBase::~ObjectStoreBase()
try
{
_db->close(0);
-
+
for(size_t i = 0; i < _indices.size(); ++i)
{
try
@@ -228,7 +228,7 @@ Freeze::ObjectStoreBase::~ObjectStoreBase()
error << "Freeze: closing ObjectStore " << _dbName << " raised DbException: " << dx.what();
}
}
-
+
bool
Freeze::ObjectStoreBase::dbHasObject(const Identity& ident, const TransactionIPtr& transaction) const
{
@@ -242,11 +242,10 @@ Freeze::ObjectStoreBase::dbHasObject(const Identity& ident, const TransactionIPt
}
}
- Key key;
- marshal(ident, key, _communicator, _encoding);
Dbt dbKey;
- initializeInDbt(key, dbKey);
-
+ KeyMarshaler km(ident, _communicator, _encoding);
+ km.getDbt(dbKey);
+
//
// Keep 0 length since we're not interested in the data
//
@@ -258,7 +257,7 @@ Freeze::ObjectStoreBase::dbHasObject(const Identity& ident, const TransactionIPt
try
{
int err = _db->get(tx, &dbKey, &dbValue, 0);
-
+
if(err == 0)
{
return true;
@@ -278,7 +277,7 @@ Freeze::ObjectStoreBase::dbHasObject(const Identity& ident, const TransactionIPt
if(_evictor->deadlockWarning())
{
Warning out(_communicator->getLogger());
- out << "Deadlock in Freeze::ObjectStoreBase::dbHasObject while searching \""
+ out << "Deadlock in Freeze::ObjectStoreBase::dbHasObject while searching \""
<< _evictor->filename() + "/" + _dbName << "\"; retrying ...";
}
@@ -299,19 +298,15 @@ Freeze::ObjectStoreBase::dbHasObject(const Identity& ident, const TransactionIPt
}
void
-Freeze::ObjectStoreBase::save(Key& key, Value& value, Byte status, DbTxn* tx)
+Freeze::ObjectStoreBase::save(Dbt& key, Dbt& value, Byte status, DbTxn* tx)
{
switch(status)
{
case created:
case modified:
{
- Dbt dbKey;
- Dbt dbValue;
- initializeInDbt(key, dbKey);
- initializeInDbt(value, dbValue);
u_int32_t flags = (status == created) ? DB_NOOVERWRITE : 0;
- int err = _db->put(tx, &dbKey, &dbValue, flags);
+ int err = _db->put(tx, &key, &value, flags);
if(err != 0)
{
throw DatabaseException(__FILE__, __LINE__);
@@ -321,15 +316,13 @@ Freeze::ObjectStoreBase::save(Key& key, Value& value, Byte status, DbTxn* tx)
case destroyed:
{
- Dbt dbKey;
- initializeInDbt(key, dbKey);
- int err = _db->del(tx, &dbKey, 0);
+ int err = _db->del(tx, &key, 0);
if(err != 0)
{
throw DatabaseException(__FILE__, __LINE__);
}
break;
- }
+ }
default:
{
assert(0);
@@ -337,51 +330,55 @@ Freeze::ObjectStoreBase::save(Key& key, Value& value, Byte status, DbTxn* tx)
}
}
-void
-Freeze::ObjectStoreBase::marshal(const Identity& ident,
- Key& bytes,
- const CommunicatorPtr& communicator,
- const EncodingVersion& encoding)
+Freeze::ObjectStoreBase::Marshaler::Marshaler(const CommunicatorPtr& communicator,
+ const EncodingVersion& encoding) :
+ _os(IceInternal::getInstance(communicator).get(), encoding, true)
{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get(), encoding, true);
- stream.write(ident);
- vector<Byte>(stream.b.begin(), stream.b.end()).swap(bytes);
}
-
-void
-Freeze::ObjectStoreBase::unmarshal(Identity& ident,
- const Key& bytes,
- const CommunicatorPtr& communicator,
- const EncodingVersion& encoding)
+
+void
+Freeze::ObjectStoreBase::Marshaler::getDbt(Dbt& dbt) const
{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get(), encoding, &bytes[0], &bytes[0] + bytes.size());
- stream.read(ident);
+ initializeInDbt(const_cast<IceInternal::BasicStream&>(_os), dbt);
}
-void
-Freeze::ObjectStoreBase::marshal(const ObjectRecord& v,
- Value& bytes,
- const CommunicatorPtr& communicator,
- const EncodingVersion& encoding,
- bool keepStats)
+Freeze::ObjectStoreBase::KeyMarshaler::KeyMarshaler(const Identity& ident,
+ const CommunicatorPtr& communicator,
+ const EncodingVersion& encoding) :
+ Marshaler(communicator, encoding)
{
- IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
- IceInternal::BasicStream stream(instance.get(), encoding, true);
- stream.startWriteEncaps();
+ _os.write(ident);
+}
+
+Freeze::ObjectStoreBase::ValueMarshaler::ValueMarshaler(const ObjectRecord& rec,
+ const CommunicatorPtr& communicator,
+ const EncodingVersion& encoding,
+ bool keepStats) :
+ Marshaler(communicator, encoding)
+{
+ _os.startWriteEncaps();
if(keepStats)
{
- stream.write(v);
+ _os.write(rec);
}
else
{
- stream.write(v.servant);
+ _os.write(rec.servant);
}
- stream.writePendingObjects();
- stream.endWriteEncaps();
- vector<Byte>(stream.b.begin(), stream.b.end()).swap(bytes);
+ _os.writePendingObjects();
+ _os.endWriteEncaps();
+}
+
+void
+Freeze::ObjectStoreBase::unmarshal(Identity& ident,
+ const Key& bytes,
+ const CommunicatorPtr& communicator,
+ const EncodingVersion& encoding)
+{
+ IceInternal::InstancePtr instance = IceInternal::getInstance(communicator);
+ IceInternal::BasicStream stream(instance.get(), encoding, &bytes[0], &bytes[0] + bytes.size());
+ stream.read(ident);
}
void
@@ -395,7 +392,7 @@ Freeze::ObjectStoreBase::unmarshal(ObjectRecord& v,
IceInternal::BasicStream stream(instance.get(), encoding, &bytes[0], &bytes[0] + bytes.size());
stream.sliceObjects(false);
stream.startReadEncaps();
-
+
if(keepStats)
{
stream.read(v);
@@ -404,7 +401,7 @@ Freeze::ObjectStoreBase::unmarshal(ObjectRecord& v,
{
stream.read(v.servant);
}
-
+
stream.readPendingObjects();
stream.endReadEncaps();
}
@@ -418,17 +415,15 @@ Freeze::ObjectStoreBase::load(const Identity& ident, const TransactionIPtr& tran
}
DbTxn* txn = transaction->dbTxn();
-
+
if(txn == 0)
{
throw DatabaseException(__FILE__, __LINE__, "inactive transaction");
}
- Key key;
- marshal(ident, key, _communicator, _encoding);
-
Dbt dbKey;
- initializeInDbt(key, dbKey);
+ KeyMarshaler km(ident, _communicator, _encoding);
+ km.getDbt(dbKey);
const size_t defaultValueSize = 4096;
Value value(defaultValueSize);
@@ -457,7 +452,7 @@ Freeze::ObjectStoreBase::load(const Identity& ident, const TransactionIPtr& tran
if(_evictor->deadlockWarning())
{
Warning out(_communicator->getLogger());
- out << "Deadlock in Freeze::ObjectStoreBase::load while searching \""
+ out << "Deadlock in Freeze::ObjectStoreBase::load while searching \""
<< _evictor->filename() + "/" + _dbName << "\"";
}
throw DeadlockException(__FILE__, __LINE__, dx.what(), transaction);
@@ -467,7 +462,7 @@ Freeze::ObjectStoreBase::load(const Identity& ident, const TransactionIPtr& tran
handleDbException(dx, value, dbValue, __FILE__, __LINE__);
}
}
-
+
unmarshal(rec, value, _communicator, _encoding, _keepStats);
_evictor->initialize(ident, _facet, rec.servant);
return true;
@@ -482,22 +477,20 @@ Freeze::ObjectStoreBase::update(const Identity& ident, const ObjectRecord& rec,
}
DbTxn* txn = transaction->dbTxn();
-
+
if(txn == 0)
{
throw DatabaseException(__FILE__, __LINE__, "inactive transaction");
}
- Key key;
- marshal(ident, key, _communicator, _encoding);
-
- Value value;
- marshal(rec, value, _communicator, _encoding, _keepStats);
-
Dbt dbKey;
+ KeyMarshaler km(ident, _communicator, _encoding);
+ km.getDbt(dbKey);
+
Dbt dbValue;
- initializeInDbt(key, dbKey);
- initializeInDbt(value, dbValue);
+ ValueMarshaler vm(rec, _communicator, _encoding, _keepStats);
+ vm.getDbt(dbValue);
+
u_int32_t flags = 0;
try
@@ -509,7 +502,7 @@ Freeze::ObjectStoreBase::update(const Identity& ident, const ObjectRecord& rec,
if(_evictor->deadlockWarning())
{
Warning out(_communicator->getLogger());
- out << "Deadlock in Freeze::ObjectStoreBase::update while updating \""
+ out << "Deadlock in Freeze::ObjectStoreBase::update while updating \""
<< _evictor->filename() + "/" + _dbName << "\"";
}
throw DeadlockException(__FILE__, __LINE__, dx.what(), transaction);
@@ -533,16 +526,14 @@ Freeze::ObjectStoreBase::insert(const Identity& ident, const ObjectRecord& rec,
}
}
- Key key;
- marshal(ident, key, _communicator, _encoding);
-
- Value value;
- marshal(rec, value, _communicator, _encoding, _keepStats);
-
Dbt dbKey;
+ KeyMarshaler km(ident, _communicator, _encoding);
+ km.getDbt(dbKey);
+
Dbt dbValue;
- initializeInDbt(key, dbKey);
- initializeInDbt(value, dbValue);
+ ValueMarshaler vm(rec, _communicator, _encoding, _keepStats);
+ vm.getDbt(dbValue);
+
u_int32_t flags = DB_NOOVERWRITE;
if(tx == 0)
{
@@ -560,7 +551,7 @@ Freeze::ObjectStoreBase::insert(const Identity& ident, const ObjectRecord& rec,
if(_evictor->deadlockWarning())
{
Warning out(_communicator->getLogger());
- out << "Deadlock in Freeze::ObjectStoreBase::insert while updating \""
+ out << "Deadlock in Freeze::ObjectStoreBase::insert while updating \""
<< _evictor->filename() + "/" + _dbName << "\"";
}
if(tx != 0)
@@ -591,11 +582,9 @@ Freeze::ObjectStoreBase::remove(const Identity& ident, const TransactionIPtr& tr
}
}
- Key key;
- marshal(ident, key, _communicator, _encoding);
-
Dbt dbKey;
- initializeInDbt(key, dbKey);
+ KeyMarshaler km(ident, _communicator, _encoding);
+ km.getDbt(dbKey);
for(;;)
{
@@ -608,7 +597,7 @@ Freeze::ObjectStoreBase::remove(const Identity& ident, const TransactionIPtr& tr
if(_evictor->deadlockWarning())
{
Warning out(_communicator->getLogger());
- out << "Deadlock in Freeze::ObjectStoreBase::remove while updating \""
+ out << "Deadlock in Freeze::ObjectStoreBase::remove while updating \""
<< _evictor->filename() + "/" + _dbName << "\"";
}
if(tx != 0)
@@ -639,11 +628,9 @@ Freeze::ObjectStoreBase::dbName() const
bool
Freeze::ObjectStoreBase::loadImpl(const Identity& ident, ObjectRecord& rec)
{
- Key key;
- marshal(ident, key, _communicator, _encoding);
-
Dbt dbKey;
- initializeInDbt(key, dbKey);
+ KeyMarshaler km(ident, _communicator, _encoding);
+ km.getDbt(dbKey);
const size_t defaultValueSize = 4096;
Value value(defaultValueSize);
@@ -672,7 +659,7 @@ Freeze::ObjectStoreBase::loadImpl(const Identity& ident, ObjectRecord& rec)
if(_evictor->deadlockWarning())
{
Warning out(_communicator->getLogger());
- out << "Deadlock in Freeze::ObjectStoreBase::load while searching \""
+ out << "Deadlock in Freeze::ObjectStoreBase::load while searching \""
<< _evictor->filename() + "/" + _dbName << "\"; retrying ...";
}
//
@@ -684,7 +671,7 @@ Freeze::ObjectStoreBase::loadImpl(const Identity& ident, ObjectRecord& rec)
handleDbException(dx, value, dbValue, __FILE__, __LINE__);
}
}
-
+
unmarshal(rec, value, _communicator, _encoding, _keepStats);
_evictor->initialize(ident, _facet, rec.servant);
return true;