summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaCache.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-09-12 09:01:35 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-09-12 09:01:35 +0000
commitd89514da858be06f7e44129bcd6675a46181208e (patch)
tree662073ca1e8a9b6e56e9914087da66a824b88d6c /cpp/src/IceGrid/ReplicaCache.cpp
parentMinor Makefile tweak. (diff)
downloadice-d89514da858be06f7e44129bcd6675a46181208e.tar.bz2
ice-d89514da858be06f7e44129bcd6675a46181208e.tar.xz
ice-d89514da858be06f7e44129bcd6675a46181208e.zip
Fixes
Diffstat (limited to 'cpp/src/IceGrid/ReplicaCache.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp130
1 files changed, 105 insertions, 25 deletions
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
index e60fd1a21e8..8eb940fa669 100644
--- a/cpp/src/IceGrid/ReplicaCache.cpp
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -88,24 +88,11 @@ ReplicaCache::remove(const string& name)
Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
out << "replica `" << name << "' down";
}
- }
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_waitForUpdatesMonitor);
- map<string, set<string> >::iterator p = _waitForUpdates.begin();
- while(p != _waitForUpdates.end())
- {
- p->second.erase(name);
- if(p->second.empty())
- {
- _waitForUpdates.erase(p++);
- _waitForUpdatesMonitor.notifyAll();
- }
- else
- {
- ++p;
- }
- }
+ //
+ // Remove the replica expected updates.
+ //
+ removeReplicaUpdates(name);
}
try
@@ -193,13 +180,17 @@ ReplicaCache::getEndpoints(const string& name, const Ice::ObjectPrx& proxy) cons
void
ReplicaCache::waitForUpdateReplication(const string& name, int serial)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_waitForUpdatesMonitor);
-
- vector<string> replicas = getAll("");
- if(replicas.empty())
+ Lock sync(*this);
+ if(_entries.empty())
{
return;
}
+
+ vector<string> replicas;
+ for(map<string, ReplicaEntryPtr>::const_iterator s = _entries.begin(); s != _entries.end(); ++s)
+ {
+ replicas.push_back(s->first);
+ }
ostringstream os;
os << name << "-" << serial;
@@ -215,19 +206,33 @@ ReplicaCache::waitForUpdateReplication(const string& name, int serial)
map<string, set<string> >::const_iterator p = _waitForUpdates.find(key);
if(p == _waitForUpdates.end())
{
+ map<string, map<string, string> >::iterator q = _updateFailures.find(key);
+ if(q != _updateFailures.end())
+ {
+ map<string, string> failures = q->second;
+ _updateFailures.erase(q);
+
+ ostringstream os;
+ for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
+ {
+ os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n";
+ }
+ Ice::Error err(_traceLevels->logger);
+ err << os.str();
+ }
return;
}
else
{
- _waitForUpdatesMonitor.wait();
+ wait();
}
}
}
void
-ReplicaCache::replicaReceivedUpdate(const string& name, const string& update, int serial)
+ReplicaCache::replicaReceivedUpdate(const string& name, const string& update, int serial, const string& failure)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_waitForUpdatesMonitor);
+ Lock sync(*this);
ostringstream os;
os << update << "-" << serial;
@@ -237,12 +242,87 @@ ReplicaCache::replicaReceivedUpdate(const string& name, const string& update, in
if(p != _waitForUpdates.end())
{
p->second.erase(name);
+
+ if(!failure.empty())
+ {
+ map<string, map<string, string> >::iterator q = _updateFailures.find(key);
+ if(q == _updateFailures.end())
+ {
+ q = _updateFailures.insert(make_pair(key, map<string ,string>())).first;
+ }
+ q->second.insert(make_pair(name, failure));
+ }
+
if(p->second.empty())
{
_waitForUpdates.erase(p);
- _waitForUpdatesMonitor.notifyAll();
+ notifyAll();
+ }
+ }
+}
+
+void
+ReplicaCache::startApplicationReplication(const string& application, int revision)
+{
+ //
+ // Add the given application to the set of application being
+ // replicated.
+ //
+ Lock sync(*this);
+ _applicationReplication.insert(application);
+}
+
+
+void
+ReplicaCache::finishApplicationReplication(const string& application, int revision)
+{
+ //
+ // Notify waiting threads that the given application replication
+ // is completed.
+ //
+ Lock sync(*this);
+ _applicationReplication.erase(application);
+ notifyAll();
+}
+
+void
+ReplicaCache::waitForApplicationReplication(const string& application, int revision)
+{
+ //
+ // Wait for the given application to be replicated.
+ //
+ Lock sync(*this);
+ while(_applicationReplication.find(application) != _applicationReplication.end())
+ {
+ wait();
+ }
+}
+
+void
+ReplicaCache::removeReplicaUpdates(const string& name)
+{
+ // Must b called within the synchronization.
+
+ map<string, set<string> >::iterator p = _waitForUpdates.begin();
+ bool notifyMonitor = false;
+ while(p != _waitForUpdates.end())
+ {
+ p->second.erase(name);
+ if(p->second.empty())
+ {
+ _waitForUpdates.erase(p++);
+ notifyMonitor = true;
+ }
+ else
+ {
+ ++p;
}
}
+
+ if(notifyMonitor)
+ {
+ notifyAll();
+ }
}
ReplicaEntry::ReplicaEntry(const std::string& name, const ReplicaSessionIPtr& session) :