diff options
author | Matthew Newhook <matthew@zeroc.com> | 2001-11-30 18:40:03 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2001-11-30 18:40:03 +0000 |
commit | 8ba83da8375d13409786c8a7829311114a69cac3 (patch) | |
tree | 9bf020000d7bbdff4760676ffb413784f88ca7b8 /cpp/src | |
parent | ice_invoke (diff) | |
download | ice-8ba83da8375d13409786c8a7829311114a69cac3.tar.bz2 ice-8ba83da8375d13409786c8a7829311114a69cac3.tar.xz ice-8ba83da8375d13409786c8a7829311114a69cac3.zip |
Added DBCursor to the Freeze module.
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Freeze/DBI.cpp | 314 | ||||
-rw-r--r-- | cpp/src/Freeze/DBI.h | 32 | ||||
-rw-r--r-- | cpp/src/IceStorm/Parser.cpp | 4 |
3 files changed, 346 insertions, 4 deletions
diff --git a/cpp/src/Freeze/DBI.cpp b/cpp/src/Freeze/DBI.cpp index 6eae037d8b4..f70b7d44c36 100644 --- a/cpp/src/Freeze/DBI.cpp +++ b/cpp/src/Freeze/DBI.cpp @@ -294,6 +294,248 @@ Freeze::DBTransactionI::abort() _tid = 0; } +DBCursorI::DBCursorI(const ::Ice::CommunicatorPtr& communicator, const std::string& name, DBC* cursor, + bool hasCurrentValue) : + _communicator(communicator), + _name(name), + _canRemove(false), + _hasCurrentValue(hasCurrentValue), + _cursor(cursor) +{ + PropertiesPtr properties = _communicator->getProperties(); + string value; + + value = properties->getProperty("Freeze.Trace.DBCursor"); + if (!value.empty()) + { + _trace = atoi(value.c_str()); + } + + _errorPrefix = "Freeze::DBCursor(\"" + _name += "\"): "; + + if (_trace >= 1) + { + ostringstream s; + s << "creating cursor for \"" << _name << "\""; + _communicator->getLogger()->trace("DB", s.str()); + } +} + +DBCursorI::~DBCursorI() +{ + if (_cursor != 0) + { + ostringstream s; + s << _errorPrefix << "\"" << _name << "\" has not been closed"; + _communicator->getLogger()->warning(s.str()); + } +} + +Ice::CommunicatorPtr +DBCursorI::getCommunicator() +{ + // immutable + return _communicator; +} + +bool +DBCursorI::hasNext() +{ + JTCSyncT<JTCMutex> sync(*this); + + if (!_cursor) + { + ostringstream s; + s << _errorPrefix << "\"" << _name << "\" has been closed"; + DBException ex; + ex.message = s.str(); + throw ex; + } + + // + // Note that the reads are partial reads since this method only + // verifies that there is a next value + // + DBT dbKey, dbData; + memset(&dbKey, 0, sizeof(dbKey)); + dbKey.flags = DB_DBT_PARTIAL; + + memset(&dbData, 0, sizeof(dbData)); + dbData.flags = DB_DBT_PARTIAL; + + // + // If we've already verified that there is a next record then + // verify that the current record still exists. + // + if (_hasCurrentValue) + { + try + { + checkBerkeleyDBReturn(_cursor->c_get(_cursor, &dbKey, &dbData, DB_CURRENT), _errorPrefix, + "DBcursor->c_get");\ + } + catch(const DBNotFoundException&) + { + // + // There is no next record. + // + return false; + } + return true; + } + + // + // Otherwise, move to the next record. + // + try + { + checkBerkeleyDBReturn(_cursor->c_get(_cursor, &dbKey, &dbData, DB_NEXT), _errorPrefix, + "DBcursor->c_get");\ + } + catch(const DBNotFoundException&) + { + // + // There is no next record + // + return false; + } + + // + // We now have a current value. + // + _hasCurrentValue = true; + return true; +} + +void +DBCursorI::next(Key& key, Value& value) +{ + JTCSyncT<JTCMutex> sync(*this); + + if (!_cursor) + { + ostringstream s; + s << _errorPrefix << "\"" << _name << "\" has been closed"; + DBException ex; + ex.message = s.str(); + throw ex; + } + + DBT dbKey, dbData; + memset(&dbKey, 0, sizeof(dbKey)); + memset(&dbData, 0, sizeof(dbData)); + + u_int32_t getFlags; + string desc; + + // + // Do we need to move to the next record? + // + if (!_hasCurrentValue) + { + getFlags = DB_NEXT; + desc = "next"; + } + else + { + getFlags = DB_CURRENT; + desc = "current"; + } + + if (_trace >= 1) + { + ostringstream s; + s << "reading " << desc << " value from database \"" << _name << "\""; + _communicator->getLogger()->trace("DBCursor", s.str()); + } + + checkBerkeleyDBReturn(_cursor->c_get(_cursor, &dbKey, &dbData, getFlags), _errorPrefix, "DBcursor->c_get"); + + // + // Copy the data from the read key & data + // + key = Key(static_cast<Byte*>(dbKey.data), static_cast<Byte*>(dbKey.data) + dbKey.size); + value = Value(static_cast<Byte*>(dbData.data), static_cast<Byte*>(dbData.data) + dbData.size); + + _canRemove = true; + _hasCurrentValue = false; +} + +void +DBCursorI::remove() +{ + JTCSyncT<JTCMutex> sync(*this); + + if (!_cursor) + { + ostringstream s; + s << _errorPrefix << "\"" << _name << "\" has been closed"; + DBException ex; + ex.message = s.str(); + throw ex; + } + + if (!_canRemove) + { + DBNotFoundException ex; + ex.message = "The next method has not yet been called, or the remove method has already been called " + "after the last call to the next method."; + throw ex; + } + + if (_trace >= 1) + { + ostringstream s; + s << "deleting value from database \"" << _name << "\""; + _communicator->getLogger()->trace("DBCursor", s.str()); + } + + checkBerkeleyDBReturn( _cursor->c_del(_cursor, 0), _errorPrefix, "DBcursor->c_del"); + + _hasCurrentValue = false; + _canRemove = false; +} + +DBCursorPtr +DBCursorI::clone() +{ + JTCSyncT<JTCMutex> sync(*this); + + if (!_cursor) + { + ostringstream s; + s << _errorPrefix << "\"" << _name << "\" has been closed"; + DBException ex; + ex.message = s.str(); + throw ex; + } + + DBC* cursor; + _cursor->c_dup(_cursor, &cursor, DB_POSITION); + return new DBCursorI(_communicator, _name, cursor, _hasCurrentValue); +} + +void +DBCursorI::close() +{ + JTCSyncT<JTCMutex> sync(*this); + + if (!_cursor) + { + return; + } + + if (_trace >= 1) + { + ostringstream s; + s << "closing cursor \"" << _name << "\""; + _communicator->getLogger()->trace("DBCursor", s.str()); + } + + _cursor->c_close(_cursor); + _cursor = 0; +} + Freeze::DBI::DBI(const CommunicatorPtr& communicator, const DBEnvironmentIPtr& dbEnvObj, ::DB* db, const string& name) : _communicator(communicator), @@ -350,6 +592,78 @@ Freeze::DBI::getCommunicator() return _communicator; } +DBCursorPtr +Freeze::DBI::getCursor() +{ + JTCSyncT<JTCMutex> sync(*this); + + if (!_db) + { + ostringstream s; + s << _errorPrefix << "\"" << _name << "\" has been closed"; + DBException ex; + ex.message = s.str(); + throw ex; + } + + DBC* cursor; + + checkBerkeleyDBReturn(_db->cursor(_db, 0, &cursor, 0), _errorPrefix, "DB->cursor"); + + return new DBCursorI(_communicator, _name, cursor, false); +} + +DBCursorPtr +Freeze::DBI::getCursorForKey(const Key& key) +{ + JTCSyncT<JTCMutex> sync(*this); + + if (!_db) + { + ostringstream s; + s << _errorPrefix << "\"" << _name << "\" has been closed"; + DBException ex; + ex.message = s.str(); + throw ex; + } + + DBC* cursor; + + checkBerkeleyDBReturn(_db->cursor(_db, 0, &cursor, 0), _errorPrefix, "DB->cursor"); + + // + // Move to the requested record + // + DBT dbKey; + memset(&dbKey, 0, sizeof(dbKey)); + + // + // Note that the read of the data is partial (that is the data + // will not actually be read into memory since it isn't needed + // yet). + // + DBT dbData; + memset(&dbData, 0, sizeof(dbData)); + dbData.flags = DB_DBT_PARTIAL; + + dbKey.data = const_cast<void*>(static_cast<const void*>(key.begin())); + dbKey.size = key.size(); + try + { + checkBerkeleyDBReturn(cursor->c_get(cursor, &dbKey, &dbData, DB_SET), _errorPrefix, "DBcursor->c_get"); + } + catch(const DBNotFoundException&) + { + // + // Cleanup on failure. + // + cursor->c_close(cursor); + throw; + } + + return new DBCursorI(_communicator, _name, cursor, true); +} + void Freeze::DBI::put(const Key& key, const Value& value) { diff --git a/cpp/src/Freeze/DBI.h b/cpp/src/Freeze/DBI.h index 6afe2d63cc4..08987603dd9 100644 --- a/cpp/src/Freeze/DBI.h +++ b/cpp/src/Freeze/DBI.h @@ -83,6 +83,35 @@ private: std::string _errorPrefix; }; +class DBCursorI : public DBCursor, public JTCMutex +{ +public: + + DBCursorI(const ::Ice::CommunicatorPtr&, const std::string&, DBC*, bool); + ~DBCursorI(); + + virtual ::Ice::CommunicatorPtr getCommunicator(); + + virtual bool hasNext(); + virtual void next(Key& key, Value& value); + virtual void remove(); + + virtual DBCursorPtr clone(); + virtual void close(); + +private: + + ::Ice::CommunicatorPtr _communicator; + int _trace; + std::string _name; + std::string _errorPrefix; + + bool _canRemove; // Can remove be called? + bool _hasCurrentValue; // Have we already verified that there is a next value? + + DBC* _cursor; +}; + class DBI : public DB, public JTCMutex { public: @@ -93,6 +122,9 @@ public: virtual std::string getName(); virtual ::Ice::CommunicatorPtr getCommunicator(); + virtual DBCursorPtr getCursor(); + virtual DBCursorPtr getCursorForKey(const Key&); + virtual void put(const Key&, const Value&); virtual Value get(const Key&); virtual void del(const Key&); diff --git a/cpp/src/IceStorm/Parser.cpp b/cpp/src/IceStorm/Parser.cpp index f640e9890e9..3e40c537dd9 100644 --- a/cpp/src/IceStorm/Parser.cpp +++ b/cpp/src/IceStorm/Parser.cpp @@ -63,10 +63,6 @@ Parser::create(const list<string>& args) try { - // - // TODO: How is this supposed to work? - // - //for_each(args.begin(), args.end(), bind1st(IceUtil::memFun1(&TopicManager::create), _admin)); for (list<string>::const_iterator i = args.begin(); i != args.end() ; ++i) { _admin->create(*i); |