summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2001-11-30 18:40:03 +0000
committerMatthew Newhook <matthew@zeroc.com>2001-11-30 18:40:03 +0000
commit8ba83da8375d13409786c8a7829311114a69cac3 (patch)
tree9bf020000d7bbdff4760676ffb413784f88ca7b8 /cpp/src
parentice_invoke (diff)
downloadice-8ba83da8375d13409786c8a7829311114a69cac3.tar.bz2
ice-8ba83da8375d13409786c8a7829311114a69cac3.tar.xz
ice-8ba83da8375d13409786c8a7829311114a69cac3.zip
Added DBCursor to the Freeze module.
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Freeze/DBI.cpp314
-rw-r--r--cpp/src/Freeze/DBI.h32
-rw-r--r--cpp/src/IceStorm/Parser.cpp4
3 files changed, 346 insertions, 4 deletions
diff --git a/cpp/src/Freeze/DBI.cpp b/cpp/src/Freeze/DBI.cpp
index 6eae037d8b4..f70b7d44c36 100644
--- a/cpp/src/Freeze/DBI.cpp
+++ b/cpp/src/Freeze/DBI.cpp
@@ -294,6 +294,248 @@ Freeze::DBTransactionI::abort()
_tid = 0;
}
+DBCursorI::DBCursorI(const ::Ice::CommunicatorPtr& communicator, const std::string& name, DBC* cursor,
+ bool hasCurrentValue) :
+ _communicator(communicator),
+ _name(name),
+ _canRemove(false),
+ _hasCurrentValue(hasCurrentValue),
+ _cursor(cursor)
+{
+ PropertiesPtr properties = _communicator->getProperties();
+ string value;
+
+ value = properties->getProperty("Freeze.Trace.DBCursor");
+ if (!value.empty())
+ {
+ _trace = atoi(value.c_str());
+ }
+
+ _errorPrefix = "Freeze::DBCursor(\"" + _name += "\"): ";
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "creating cursor for \"" << _name << "\"";
+ _communicator->getLogger()->trace("DB", s.str());
+ }
+}
+
+DBCursorI::~DBCursorI()
+{
+ if (_cursor != 0)
+ {
+ ostringstream s;
+ s << _errorPrefix << "\"" << _name << "\" has not been closed";
+ _communicator->getLogger()->warning(s.str());
+ }
+}
+
+Ice::CommunicatorPtr
+DBCursorI::getCommunicator()
+{
+ // immutable
+ return _communicator;
+}
+
+bool
+DBCursorI::hasNext()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (!_cursor)
+ {
+ ostringstream s;
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ //
+ // Note that the reads are partial reads since this method only
+ // verifies that there is a next value
+ //
+ DBT dbKey, dbData;
+ memset(&dbKey, 0, sizeof(dbKey));
+ dbKey.flags = DB_DBT_PARTIAL;
+
+ memset(&dbData, 0, sizeof(dbData));
+ dbData.flags = DB_DBT_PARTIAL;
+
+ //
+ // If we've already verified that there is a next record then
+ // verify that the current record still exists.
+ //
+ if (_hasCurrentValue)
+ {
+ try
+ {
+ checkBerkeleyDBReturn(_cursor->c_get(_cursor, &dbKey, &dbData, DB_CURRENT), _errorPrefix,
+ "DBcursor->c_get");\
+ }
+ catch(const DBNotFoundException&)
+ {
+ //
+ // There is no next record.
+ //
+ return false;
+ }
+ return true;
+ }
+
+ //
+ // Otherwise, move to the next record.
+ //
+ try
+ {
+ checkBerkeleyDBReturn(_cursor->c_get(_cursor, &dbKey, &dbData, DB_NEXT), _errorPrefix,
+ "DBcursor->c_get");\
+ }
+ catch(const DBNotFoundException&)
+ {
+ //
+ // There is no next record
+ //
+ return false;
+ }
+
+ //
+ // We now have a current value.
+ //
+ _hasCurrentValue = true;
+ return true;
+}
+
+void
+DBCursorI::next(Key& key, Value& value)
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (!_cursor)
+ {
+ ostringstream s;
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ DBT dbKey, dbData;
+ memset(&dbKey, 0, sizeof(dbKey));
+ memset(&dbData, 0, sizeof(dbData));
+
+ u_int32_t getFlags;
+ string desc;
+
+ //
+ // Do we need to move to the next record?
+ //
+ if (!_hasCurrentValue)
+ {
+ getFlags = DB_NEXT;
+ desc = "next";
+ }
+ else
+ {
+ getFlags = DB_CURRENT;
+ desc = "current";
+ }
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "reading " << desc << " value from database \"" << _name << "\"";
+ _communicator->getLogger()->trace("DBCursor", s.str());
+ }
+
+ checkBerkeleyDBReturn(_cursor->c_get(_cursor, &dbKey, &dbData, getFlags), _errorPrefix, "DBcursor->c_get");
+
+ //
+ // Copy the data from the read key & data
+ //
+ key = Key(static_cast<Byte*>(dbKey.data), static_cast<Byte*>(dbKey.data) + dbKey.size);
+ value = Value(static_cast<Byte*>(dbData.data), static_cast<Byte*>(dbData.data) + dbData.size);
+
+ _canRemove = true;
+ _hasCurrentValue = false;
+}
+
+void
+DBCursorI::remove()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (!_cursor)
+ {
+ ostringstream s;
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ if (!_canRemove)
+ {
+ DBNotFoundException ex;
+ ex.message = "The next method has not yet been called, or the remove method has already been called "
+ "after the last call to the next method.";
+ throw ex;
+ }
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "deleting value from database \"" << _name << "\"";
+ _communicator->getLogger()->trace("DBCursor", s.str());
+ }
+
+ checkBerkeleyDBReturn( _cursor->c_del(_cursor, 0), _errorPrefix, "DBcursor->c_del");
+
+ _hasCurrentValue = false;
+ _canRemove = false;
+}
+
+DBCursorPtr
+DBCursorI::clone()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (!_cursor)
+ {
+ ostringstream s;
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ DBC* cursor;
+ _cursor->c_dup(_cursor, &cursor, DB_POSITION);
+ return new DBCursorI(_communicator, _name, cursor, _hasCurrentValue);
+}
+
+void
+DBCursorI::close()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (!_cursor)
+ {
+ return;
+ }
+
+ if (_trace >= 1)
+ {
+ ostringstream s;
+ s << "closing cursor \"" << _name << "\"";
+ _communicator->getLogger()->trace("DBCursor", s.str());
+ }
+
+ _cursor->c_close(_cursor);
+ _cursor = 0;
+}
+
Freeze::DBI::DBI(const CommunicatorPtr& communicator, const DBEnvironmentIPtr& dbEnvObj, ::DB* db,
const string& name) :
_communicator(communicator),
@@ -350,6 +592,78 @@ Freeze::DBI::getCommunicator()
return _communicator;
}
+DBCursorPtr
+Freeze::DBI::getCursor()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (!_db)
+ {
+ ostringstream s;
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ DBC* cursor;
+
+ checkBerkeleyDBReturn(_db->cursor(_db, 0, &cursor, 0), _errorPrefix, "DB->cursor");
+
+ return new DBCursorI(_communicator, _name, cursor, false);
+}
+
+DBCursorPtr
+Freeze::DBI::getCursorForKey(const Key& key)
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (!_db)
+ {
+ ostringstream s;
+ s << _errorPrefix << "\"" << _name << "\" has been closed";
+ DBException ex;
+ ex.message = s.str();
+ throw ex;
+ }
+
+ DBC* cursor;
+
+ checkBerkeleyDBReturn(_db->cursor(_db, 0, &cursor, 0), _errorPrefix, "DB->cursor");
+
+ //
+ // Move to the requested record
+ //
+ DBT dbKey;
+ memset(&dbKey, 0, sizeof(dbKey));
+
+ //
+ // Note that the read of the data is partial (that is the data
+ // will not actually be read into memory since it isn't needed
+ // yet).
+ //
+ DBT dbData;
+ memset(&dbData, 0, sizeof(dbData));
+ dbData.flags = DB_DBT_PARTIAL;
+
+ dbKey.data = const_cast<void*>(static_cast<const void*>(key.begin()));
+ dbKey.size = key.size();
+ try
+ {
+ checkBerkeleyDBReturn(cursor->c_get(cursor, &dbKey, &dbData, DB_SET), _errorPrefix, "DBcursor->c_get");
+ }
+ catch(const DBNotFoundException&)
+ {
+ //
+ // Cleanup on failure.
+ //
+ cursor->c_close(cursor);
+ throw;
+ }
+
+ return new DBCursorI(_communicator, _name, cursor, true);
+}
+
void
Freeze::DBI::put(const Key& key, const Value& value)
{
diff --git a/cpp/src/Freeze/DBI.h b/cpp/src/Freeze/DBI.h
index 6afe2d63cc4..08987603dd9 100644
--- a/cpp/src/Freeze/DBI.h
+++ b/cpp/src/Freeze/DBI.h
@@ -83,6 +83,35 @@ private:
std::string _errorPrefix;
};
+class DBCursorI : public DBCursor, public JTCMutex
+{
+public:
+
+ DBCursorI(const ::Ice::CommunicatorPtr&, const std::string&, DBC*, bool);
+ ~DBCursorI();
+
+ virtual ::Ice::CommunicatorPtr getCommunicator();
+
+ virtual bool hasNext();
+ virtual void next(Key& key, Value& value);
+ virtual void remove();
+
+ virtual DBCursorPtr clone();
+ virtual void close();
+
+private:
+
+ ::Ice::CommunicatorPtr _communicator;
+ int _trace;
+ std::string _name;
+ std::string _errorPrefix;
+
+ bool _canRemove; // Can remove be called?
+ bool _hasCurrentValue; // Have we already verified that there is a next value?
+
+ DBC* _cursor;
+};
+
class DBI : public DB, public JTCMutex
{
public:
@@ -93,6 +122,9 @@ public:
virtual std::string getName();
virtual ::Ice::CommunicatorPtr getCommunicator();
+ virtual DBCursorPtr getCursor();
+ virtual DBCursorPtr getCursorForKey(const Key&);
+
virtual void put(const Key&, const Value&);
virtual Value get(const Key&);
virtual void del(const Key&);
diff --git a/cpp/src/IceStorm/Parser.cpp b/cpp/src/IceStorm/Parser.cpp
index f640e9890e9..3e40c537dd9 100644
--- a/cpp/src/IceStorm/Parser.cpp
+++ b/cpp/src/IceStorm/Parser.cpp
@@ -63,10 +63,6 @@ Parser::create(const list<string>& args)
try
{
- //
- // TODO: How is this supposed to work?
- //
- //for_each(args.begin(), args.end(), bind1st(IceUtil::memFun1(&TopicManager::create), _admin));
for (list<string>::const_iterator i = args.begin(); i != args.end() ; ++i)
{
_admin->create(*i);