diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 194 |
1 files changed, 106 insertions, 88 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index e799b5fe532..77b6ae00542 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -14,26 +14,32 @@ #include <IceStorm/TraceLevels.h> #include <IceStorm/NodeI.h> #include <IceStorm/Observers.h> - #include <Ice/LoggerUtil.h> -#include <Freeze/Freeze.h> - #include <algorithm> using namespace std; using namespace IceStorm; using namespace IceStormElection; +#ifdef QTSQL +using namespace IceSQL; +#else +using namespace Freeze; +#endif + namespace { void -halt(const Ice::CommunicatorPtr& com, const Freeze::DatabaseException& ex) +halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) { { Ice::Error error(com->getLogger()); error << "fatal exception: " << ex << "\n*** Aborting application ***"; +#ifdef __GNUC__ + error << "\n" << ex.ice_stackTrace(); +#endif } abort(); @@ -400,18 +406,19 @@ TopicImpl::TopicImpl( const InstancePtr& instance, const string& name, const Ice::Identity& id, - const SubscriberRecordSeq& subscribers) : + const SubscriberRecordSeq& subscribers, + const IceStorm::DatabaseCachePtr& databaseCache + ) : _instance(instance), _name(name), _id(id), - _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())), - _subscriberMap(_connection, "subscribers"), - _llumap(_connection, "llu"), + _databaseCache(databaseCache), _destroyed(false) { try { __setNoDelete(true); + // TODO: If we want to improve the performance of the // non-replicated case we could allocate a null-topic impl here. _servant = new TopicI(this, instance); @@ -644,6 +651,8 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) LogUpdate llu; + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); if(p != _subscribers.end()) { @@ -656,28 +665,28 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) { try { - Freeze::TransactionHolder txn(_connection); + TransactionHolder txn(connection); + SubscriberRecordKey key; key.topic = _id; key.id = record.id; - SubscriberMap::iterator e = _subscriberMap.find(key); - if(e != _subscriberMap.end()) - { - _subscriberMap.erase(e); - } - LLUMap::iterator ci = _llumap.find("_manager"); - assert(ci != _llumap.end()); - llu = ci->second; + + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.erase(key); + + LLUWrapper llumap(_databaseCache, connection); + llu = llumap.get(); llu.iteration++; - ci.set(llu); + llumap.put(llu); + txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -692,25 +701,29 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) { try { - Freeze::TransactionHolder txn(_connection); + TransactionHolder txn(connection); + SubscriberRecordKey key; key.topic = _id; key.id = subscriber->id(); - _subscriberMap.put(SubscriberMap::value_type(key, record)); + + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.put(key, record); + // Update the LLU. - LLUMap::iterator ci = _llumap.find("_manager"); - assert(ci != _llumap.end()); - llu = ci->second; + LLUWrapper llumap(_databaseCache, connection); + llu = llumap.get(); llu.iteration++; - ci.set(llu); + llumap.put(llu); + txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -770,26 +783,29 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) { try { - Freeze::TransactionHolder txn(_connection); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = subscriber->id(); - _subscriberMap.put(SubscriberMap::value_type(key, record)); - LLUMap::iterator ci = _llumap.find("_manager"); - assert(ci != _llumap.end()); - llu = ci->second; + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.put(key, record); + + LLUWrapper llumap(_databaseCache, connection); + llu = llumap.get(); llu.iteration++; - ci.set(llu); + llumap.put(llu); + txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -889,26 +905,29 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) { try { - Freeze::TransactionHolder txn(_connection); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = id; - _subscriberMap.put(SubscriberMap::value_type(key, record)); - LLUMap::iterator ci = _llumap.find("_manager"); - assert(ci != _llumap.end()); - llu = ci->second; + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.put(key, record); + + LLUWrapper llumap(_databaseCache, connection); + llu = llumap.get(); llu.iteration++; - ci.set(llu); + llumap.put(llu); + txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -1283,25 +1302,28 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r { try { - Freeze::TransactionHolder txn(_connection); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); SubscriberRecordKey key; key.topic = _id; key.id = subscriber->id(); - _subscriberMap.put(SubscriberMap::value_type(key, record)); + + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.put(key, record); // Update the LLU. - LLUMap::iterator ci = _llumap.find("_manager"); - assert(ci != _llumap.end()); - ci.set(llu); + LLUWrapper llumap(_databaseCache, connection); + llumap.put(llu); + txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -1349,29 +1371,30 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq { try { - Freeze::TransactionHolder txn(_connection); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); + for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) { SubscriberRecordKey key; key.topic = _id; key.id = *id; - SubscriberMap::iterator e = _subscriberMap.find(key); - if(e != _subscriberMap.end()) - { - _subscriberMap.erase(e); - } + + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.erase(key); } - LLUMap::iterator ci = _llumap.find("_manager"); - assert(ci != _llumap.end()); - ci.set(llu); + + LLUWrapper llumap(_databaseCache, connection); + llumap.put(llu); + txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -1424,38 +1447,34 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) { try { - SubscriberRecordKey key; - key.topic = _id; + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); - Freeze::TransactionHolder txn(_connection); // Erase all subscriber records and the topic record. - SubscriberMap::iterator p = _subscriberMap.find(key); - while(p != _subscriberMap.end() && p->first.topic == key.topic) - { - _subscriberMap.erase(p++); - } + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.eraseTopic(_id); // Update the LLU. - LLUMap::iterator ci = _llumap.find("_manager"); - assert(ci != _llumap.end()); + LLUWrapper llumap(_databaseCache, connection); if(master) { - llu = ci->second; + llu = llumap.get(); llu.iteration++; } else { llu = origLLU; } - ci.set(llu); + llumap.put(llu); + txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -1503,33 +1522,32 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) { try { - Freeze::TransactionHolder txn(_connection); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) { SubscriberRecordKey key; key.topic = _id; key.id = *id; - SubscriberMap::iterator e = _subscriberMap.find(key); - if(e != _subscriberMap.end()) - { - _subscriberMap.erase(e); - } + + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.erase(key); } - LLUMap::iterator ci = _llumap.find("_manager"); - assert(ci != _llumap.end()); - llu = ci->second; + LLUWrapper llumap(_databaseCache, connection); + llu = llumap.get(); llu.iteration++; - ci.set(llu); + llumap.put(llu); + txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } |