diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-09-12 09:01:35 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-09-12 09:01:35 +0000 |
commit | d89514da858be06f7e44129bcd6675a46181208e (patch) | |
tree | 662073ca1e8a9b6e56e9914087da66a824b88d6c /cpp/src/IceGrid/ReplicaCache.cpp | |
parent | Minor Makefile tweak. (diff) | |
download | ice-d89514da858be06f7e44129bcd6675a46181208e.tar.bz2 ice-d89514da858be06f7e44129bcd6675a46181208e.tar.xz ice-d89514da858be06f7e44129bcd6675a46181208e.zip |
Fixes
Diffstat (limited to 'cpp/src/IceGrid/ReplicaCache.cpp')
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.cpp | 130 |
1 files changed, 105 insertions, 25 deletions
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index e60fd1a21e8..8eb940fa669 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -88,24 +88,11 @@ ReplicaCache::remove(const string& name) Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat); out << "replica `" << name << "' down"; } - } - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_waitForUpdatesMonitor); - map<string, set<string> >::iterator p = _waitForUpdates.begin(); - while(p != _waitForUpdates.end()) - { - p->second.erase(name); - if(p->second.empty()) - { - _waitForUpdates.erase(p++); - _waitForUpdatesMonitor.notifyAll(); - } - else - { - ++p; - } - } + // + // Remove the replica expected updates. + // + removeReplicaUpdates(name); } try @@ -193,13 +180,17 @@ ReplicaCache::getEndpoints(const string& name, const Ice::ObjectPrx& proxy) cons void ReplicaCache::waitForUpdateReplication(const string& name, int serial) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_waitForUpdatesMonitor); - - vector<string> replicas = getAll(""); - if(replicas.empty()) + Lock sync(*this); + if(_entries.empty()) { return; } + + vector<string> replicas; + for(map<string, ReplicaEntryPtr>::const_iterator s = _entries.begin(); s != _entries.end(); ++s) + { + replicas.push_back(s->first); + } ostringstream os; os << name << "-" << serial; @@ -215,19 +206,33 @@ ReplicaCache::waitForUpdateReplication(const string& name, int serial) map<string, set<string> >::const_iterator p = _waitForUpdates.find(key); if(p == _waitForUpdates.end()) { + map<string, map<string, string> >::iterator q = _updateFailures.find(key); + if(q != _updateFailures.end()) + { + map<string, string> failures = q->second; + _updateFailures.erase(q); + + ostringstream os; + for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r) + { + os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n"; + } + Ice::Error err(_traceLevels->logger); + err << os.str(); + } return; } else { - _waitForUpdatesMonitor.wait(); + wait(); } } } void -ReplicaCache::replicaReceivedUpdate(const string& name, const string& update, int serial) +ReplicaCache::replicaReceivedUpdate(const string& name, const string& update, int serial, const string& failure) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_waitForUpdatesMonitor); + Lock sync(*this); ostringstream os; os << update << "-" << serial; @@ -237,12 +242,87 @@ ReplicaCache::replicaReceivedUpdate(const string& name, const string& update, in if(p != _waitForUpdates.end()) { p->second.erase(name); + + if(!failure.empty()) + { + map<string, map<string, string> >::iterator q = _updateFailures.find(key); + if(q == _updateFailures.end()) + { + q = _updateFailures.insert(make_pair(key, map<string ,string>())).first; + } + q->second.insert(make_pair(name, failure)); + } + if(p->second.empty()) { _waitForUpdates.erase(p); - _waitForUpdatesMonitor.notifyAll(); + notifyAll(); + } + } +} + +void +ReplicaCache::startApplicationReplication(const string& application, int revision) +{ + // + // Add the given application to the set of application being + // replicated. + // + Lock sync(*this); + _applicationReplication.insert(application); +} + + +void +ReplicaCache::finishApplicationReplication(const string& application, int revision) +{ + // + // Notify waiting threads that the given application replication + // is completed. + // + Lock sync(*this); + _applicationReplication.erase(application); + notifyAll(); +} + +void +ReplicaCache::waitForApplicationReplication(const string& application, int revision) +{ + // + // Wait for the given application to be replicated. + // + Lock sync(*this); + while(_applicationReplication.find(application) != _applicationReplication.end()) + { + wait(); + } +} + +void +ReplicaCache::removeReplicaUpdates(const string& name) +{ + // Must b called within the synchronization. + + map<string, set<string> >::iterator p = _waitForUpdates.begin(); + bool notifyMonitor = false; + while(p != _waitForUpdates.end()) + { + p->second.erase(name); + if(p->second.empty()) + { + _waitForUpdates.erase(p++); + notifyMonitor = true; + } + else + { + ++p; } } + + if(notifyMonitor) + { + notifyAll(); + } } ReplicaEntry::ReplicaEntry(const std::string& name, const ReplicaSessionIPtr& session) : |