summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/IceGrid/AdapterCache.cpp37
-rw-r--r--cpp/src/IceGrid/AdapterCache.h8
-rw-r--r--cpp/src/IceGrid/Allocatable.cpp141
-rw-r--r--cpp/src/IceGrid/Allocatable.h13
-rw-r--r--cpp/src/IceGrid/ObjectCache.cpp49
-rw-r--r--cpp/src/IceGrid/ObjectCache.h5
-rw-r--r--cpp/src/IceGrid/ReapThread.cpp64
-rw-r--r--cpp/src/IceGrid/ServerCache.cpp5
-rw-r--r--cpp/src/IceGrid/ServerCache.h2
-rw-r--r--cpp/src/IceGrid/SessionI.cpp15
-rw-r--r--cpp/src/IceGrid/SessionI.h4
-rw-r--r--cpp/test/IceGrid/allocation/AllTests.cpp340
-rw-r--r--cpp/test/IceGrid/allocation/application.xml23
13 files changed, 541 insertions, 165 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp
index 0366997fc1e..8e1fbdd0d48 100644
--- a/cpp/src/IceGrid/AdapterCache.cpp
+++ b/cpp/src/IceGrid/AdapterCache.cpp
@@ -177,8 +177,7 @@ AdapterCache::removeImpl(const string& id)
return Cache<string, AdapterEntry>::removeImpl(id);
}
-AdapterEntry::AdapterEntry(AdapterCache& cache, const string& id, bool allocatable, const AllocatablePtr& parent) :
- Allocatable(allocatable, parent),
+AdapterEntry::AdapterEntry(AdapterCache& cache, const string& id) :
_cache(cache),
_id(id)
{
@@ -195,30 +194,35 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache,
const string& replicaGroupId,
bool allocatable,
const ServerEntryPtr& server) :
- AdapterEntry(cache, id, allocatable, server),
+ AdapterEntry(cache, id),
+ Allocatable(allocatable, server),
_replicaGroupId(replicaGroupId),
_server(server)
{
}
vector<pair<string, AdapterPrx> >
-ServerAdapterEntry::getProxies(int& nReplicas, const SessionIPtr& session)
+ServerAdapterEntry::getProxies(int& nReplicas, const SessionIPtr&)
{
vector<pair<string, AdapterPrx> > adapters;
try
{
nReplicas = 1;
- if(allocatable())
- {
- if(session == getSession())
- {
- adapters.push_back(make_pair(_id, getProxy()));
- }
- }
- else
- {
+ //
+ // TODO: Remove this code if we really don't want to check the
+ // session for allocatable adapters.
+ //
+// if(allocatable())
+// {
+// if(session == getSession())
+// {
+// adapters.push_back(make_pair(_id, getProxy()));
+// }
+// }
+// else
+// {
adapters.push_back(make_pair(_id, getProxy()));
- }
+// }
}
catch(const NodeUnreachableException&)
{
@@ -288,7 +292,7 @@ ServerAdapterEntry::getServer() const
return _server;
}
-void
+bool
ServerAdapterEntry::allocated(const SessionIPtr& session)
{
TraceLevelsPtr traceLevels = _cache.getTraceLevels();
@@ -297,6 +301,7 @@ ServerAdapterEntry::allocated(const SessionIPtr& session)
Ice::Trace out(traceLevels->logger, traceLevels->adapterCat);
out << "adapter `" << _id << "' allocated by `" << session->getUserId() << "' (" << _count << ")";
}
+ return true;
}
void
@@ -314,7 +319,7 @@ ReplicaGroupEntry::ReplicaGroupEntry(AdapterCache& cache,
const string& id,
const string& application,
const LoadBalancingPolicyPtr& policy) :
- AdapterEntry(cache, id, false, 0),
+ AdapterEntry(cache, id),
_application(application),
_lastReplica(0)
{
diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h
index 8a70c23ddca..be7203955de 100644
--- a/cpp/src/IceGrid/AdapterCache.h
+++ b/cpp/src/IceGrid/AdapterCache.h
@@ -29,11 +29,11 @@ typedef std::vector<ServerEntryPtr> ServerEntrySeq;
class AdapterEntry;
typedef IceUtil::Handle<AdapterEntry> AdapterEntryPtr;
-class AdapterEntry : public Allocatable, public IceUtil::Mutex
+class AdapterEntry : virtual public IceUtil::Shared, public IceUtil::Mutex
{
public:
- AdapterEntry(AdapterCache&, const std::string&, bool, const AllocatablePtr&);
+ AdapterEntry(AdapterCache&, const std::string&);
virtual std::vector<std::pair<std::string, AdapterPrx> > getProxies(int&, const SessionIPtr&) = 0;
virtual float getLeastLoadedNodeLoad(LoadSample) const = 0;
@@ -49,7 +49,7 @@ protected:
};
typedef IceUtil::Handle<AdapterEntry> AdapterEntryPtr;
-class ServerAdapterEntry : public AdapterEntry
+class ServerAdapterEntry : public AdapterEntry, public Allocatable
{
public:
@@ -63,7 +63,7 @@ public:
AdapterPrx getProxy(const std::string& = std::string()) const;
- virtual void allocated(const SessionIPtr&);
+ virtual bool allocated(const SessionIPtr&);
virtual void released(const SessionIPtr&);
private:
diff --git a/cpp/src/IceGrid/Allocatable.cpp b/cpp/src/IceGrid/Allocatable.cpp
index 21d81cc06ce..3149f97b996 100644
--- a/cpp/src/IceGrid/Allocatable.cpp
+++ b/cpp/src/IceGrid/Allocatable.cpp
@@ -29,12 +29,18 @@ AllocationRequest::pending()
canceled(AllocationTimeoutException());
return false;
}
- else if(_timeout > 0)
+ else if(!_session->addAllocationRequest(this))
+ {
+ _state = Canceled;
+ canceled(AllocationException("session destroyed"));
+ return false;
+ }
+
+ if(_timeout > 0)
{
_session->getWaitQueue()->add(this, IceUtil::Time::milliSeconds(_timeout));
}
_state = Pending;
- _session->addAllocationRequest(this);
return true;
}
@@ -71,12 +77,16 @@ AllocationRequest::finish(const AllocatablePtr& allocatable, const SessionIPtr&
canceled(AllocationException("already allocated by the session"));
return false;
}
- else
+ else if(allocated(allocatable, _session))
{
_state = Allocated;
- allocated(allocatable, _session);
return true;
}
+ else
+ {
+ _state = Canceled;
+ return false;
+ }
}
void
@@ -124,6 +134,13 @@ AllocationRequest::expired(bool destroyed)
}
bool
+AllocationRequest::isCanceled() const
+{
+ Lock sync(*this);
+ return _state == Canceled;
+}
+
+bool
AllocationRequest::operator<(const AllocationRequest& r) const
{
return this < &r;
@@ -144,17 +161,22 @@ ParentAllocationRequest::ParentAllocationRequest(const AllocationRequestPtr& req
{
}
-void
+bool
ParentAllocationRequest::allocated(const AllocatablePtr& allocatable, const SessionIPtr& session)
{
try
{
- _allocatable->allocate(_request, false);
- assert(_allocatable->getSession() == _request->getSession());
+ if(_allocatable->allocate(_request, false))
+ {
+ assert(_allocatable->getSession() == _request->getSession());
+ return true;
+ }
+ return false;
}
catch(const AllocationException& ex)
{
_request->cancel(ex);
+ return false;
}
}
@@ -176,7 +198,7 @@ Allocatable::~Allocatable()
{
}
-void
+bool
Allocatable::allocate(const AllocationRequestPtr& request, bool checkParent)
{
IceUtil::RecMutex::Lock sync(_allocateMutex);
@@ -184,20 +206,19 @@ Allocatable::allocate(const AllocationRequestPtr& request, bool checkParent)
{
throw NotAllocatableException("not allocatable");
}
-
- if(_session == request->getSession())
+ else if(_session == request->getSession())
{
if(request->finish(this, _session))
{
++_count;
+ return true; // Allocated
}
- return;
+ return false;
}
if(_parent && checkParent)
{
- _parent->allocate(new ParentAllocationRequest(request, this), true);
- return;
+ return _parent->allocate(new ParentAllocationRequest(request, this), true);
}
if(_session)
@@ -207,13 +228,14 @@ Allocatable::allocate(const AllocationRequestPtr& request, bool checkParent)
_requests.push_back(request);
}
}
- else if(request->finish(this, _session))
+ else if(request->finish(this, _session) && allocated(request->getSession()))
{
assert(_count == 0);
_session = request->getSession();
++_count;
- allocated(_session);
+ return true; // Allocated
}
+ return false;
}
bool
@@ -223,56 +245,99 @@ Allocatable::tryAllocateWithSession(const SessionIPtr& session, const Allocatabl
assert(_allocatable);
if(_session && _session != session)
{
- _attempts.insert(child); // Remember the allocation attempts of a child
+ //
+ // The allocatable is already allocated by another session.
+ //
+ // We keep track of the allocation attempt of a child. E.g.:
+ // if a session tries to allocate an object and it can't be
+ // allocated because the adapter is already allocated, we keep
+ // track of the object here. This will be used by release() to
+ // notify the object cache when the adapter is released that
+ // the object is potentially available.
+ //
+ _attempts.insert(child);
return false;
}
else if(_session == session)
{
+ //
+ // The allocatable is allocated by this session, we just
+ // increment the allocation count and return a true to
+ // indicate a successfull allocation.
+ //
++_count;
return true;
}
+ //
+ // This allocatable isn't allocated, so we now have to check if
+ // the parent is allocated or not. If the allocation of the parent
+ // is succsefull (returns "true"), we allocate this allocatable.
+ //
if(_parent && !_parent->tryAllocateWithSession(session, child))
{
return false;
}
- assert(_count == 0);
-
- _session = session;
- ++_count;
- allocated(_session);
- return true;
+ if(allocated(session))
+ {
+ assert(_count == 0);
+ _session = session;
+ ++_count;
+ return true; // Successfull allocation
+ }
+ return false;
}
bool
Allocatable::tryAllocate(const AllocationRequestPtr& request)
{
IceUtil::RecMutex::Lock sync(_allocateMutex);
-
+
//
- // If not allocatable or already allocated or if the parent is
- // allocated by a session other than the session from the given
- // request, we can't allocate the allocatable.
+ // If not allocatable or already allocated, the allocation attempt
+ // fails.
//
if(!_allocatable || _session)
{
return false;
}
-
+
+ //
+ // Try to allocate the parent. This should succeed if the parent
+ // is not already allocated or if it's already allocated by the
+ // session.
+ //
if(_parent && !_parent->tryAllocateWithSession(request->getSession(), this))
{
return false;
}
-
- if(request->finish(this, _session))
+
+ //
+ // The parent could be allocated, we allocate this allocatable.
+ //
+ if(request->finish(this, _session) && allocated(request->getSession()))
{
assert(_count == 0);
_session = request->getSession();
++_count;
- allocated(_session);
+ return true; // The allocatable was allocated.
+ }
+
+ //
+ // If we reach here, either the request was canceled or the session
+ // destroyed. If that's the case, we need to release the parent.
+ //
+ if(_parent)
+ {
+ set<AllocatablePtr> releasedAllocatables;
+ if(_parent->release(request->getSession(), false, releasedAllocatables))
+ {
+ assert(releasedAllocatables.empty());
+ }
}
- return true; // The allocatable was allocated or the request was canceled.
+
+ return true; // The request was canceled.
}
bool
@@ -286,10 +351,15 @@ bool
Allocatable::release(const SessionIPtr& session, bool all, set<AllocatablePtr>& releasedAllocatables)
{
IceUtil::RecMutex::Lock sync(_allocateMutex);
- if(!_session || _session != session)
+ if(!_allocatable)
+ {
+ throw NotAllocatableException("not allocatable");
+ }
+ else if(!_session || _session != session)
{
throw AllocationException("can't release object which is not allocated");
}
+
if(!all && --_count)
{
return false;
@@ -313,11 +383,10 @@ Allocatable::release(const SessionIPtr& session, bool all, set<AllocatablePtr>&
{
AllocationRequestPtr request = _requests.front();
_requests.pop_front();
- if(request->finish(this, _session))
+ if(request->finish(this, _session) && allocated(request->getSession()))
{
_session = request->getSession();
++_count;
- allocated(_session);
//
// Check if there's other requests from the session
@@ -339,14 +408,14 @@ Allocatable::release(const SessionIPtr& session, bool all, set<AllocatablePtr>&
++p;
}
}
- return false;
+ return false; // Not released yet!
}
}
}
releasedAllocatables.insert(_attempts.begin(), _attempts.end());
_attempts.clear();
- return true;
+ return true; // The allocatable is released.
}
bool
diff --git a/cpp/src/IceGrid/Allocatable.h b/cpp/src/IceGrid/Allocatable.h
index f12ecb06658..6ff439948d8 100644
--- a/cpp/src/IceGrid/Allocatable.h
+++ b/cpp/src/IceGrid/Allocatable.h
@@ -36,7 +36,7 @@ public:
virtual ~AllocationRequest();
- virtual void allocated(const AllocatablePtr&, const SessionIPtr&) = 0;
+ virtual bool allocated(const AllocatablePtr&, const SessionIPtr&) = 0;
virtual void canceled(const AllocationException&) = 0;
virtual bool allocateOnce() { return false; }
@@ -48,6 +48,7 @@ public:
int getTimeout() const { return _timeout; }
const SessionIPtr& getSession() const { return _session; }
+ bool isCanceled() const;
bool operator<(const AllocationRequest&) const;
@@ -77,7 +78,7 @@ public:
ParentAllocationRequest(const AllocationRequestPtr&, const AllocatablePtr&);
- virtual void allocated(const AllocatablePtr&, const SessionIPtr&);
+ virtual bool allocated(const AllocatablePtr&, const SessionIPtr&);
virtual void canceled(const AllocationException&);
private:
@@ -86,14 +87,14 @@ private:
const AllocatablePtr _allocatable;
};
-class Allocatable : public IceUtil::Shared
+class Allocatable : virtual public IceUtil::Shared
{
public:
Allocatable(bool, const AllocatablePtr&);
virtual ~Allocatable();
- virtual void allocate(const AllocationRequestPtr&, bool = true);
+ virtual bool allocate(const AllocationRequestPtr&, bool = true);
virtual bool tryAllocate(const AllocationRequestPtr&);
virtual bool release(const SessionIPtr&);
@@ -101,8 +102,8 @@ public:
bool isAllocated() const;
SessionIPtr getSession() const;
- virtual void allocated(const SessionIPtr&) { }
- virtual void released(const SessionIPtr&) { }
+ virtual bool allocated(const SessionIPtr&) = 0;
+ virtual void released(const SessionIPtr&) = 0;
bool operator<(const Allocatable&) const;
diff --git a/cpp/src/IceGrid/ObjectCache.cpp b/cpp/src/IceGrid/ObjectCache.cpp
index bf91f458d18..bb0ab56432a 100644
--- a/cpp/src/IceGrid/ObjectCache.cpp
+++ b/cpp/src/IceGrid/ObjectCache.cpp
@@ -40,12 +40,18 @@ ObjectCache::TypeEntry::TypeEntry(ObjectCache& cache) : _cache(cache)
void
ObjectCache::TypeEntry::add(const Ice::ObjectPrx& obj)
{
+ //
+ // No mutex protection here, this is called with the cache locked.
+ //
_objects.insert(lower_bound(_objects.begin(), _objects.end(), obj, ::Ice::proxyIdentityLess), obj);
}
bool
ObjectCache::TypeEntry::remove(const Ice::ObjectPrx& obj)
{
+ //
+ // No mutex protection here, this is called with the cache locked.
+ //
Ice::ObjectProxySeq::iterator q = lower_bound(_objects.begin(), _objects.end(), obj, ::Ice::proxyIdentityLess);
assert((*q)->ice_getIdentity() == obj->ice_getIdentity());
_objects.erase(q);
@@ -55,6 +61,9 @@ ObjectCache::TypeEntry::remove(const Ice::ObjectPrx& obj)
void
ObjectCache::TypeEntry::addAllocationRequest(const ObjectAllocationRequestPtr& request)
{
+ //
+ // No mutex protection here, this is called with the cache locked.
+ //
if(request->pending())
{
_requests.push_back(request);
@@ -64,6 +73,9 @@ ObjectCache::TypeEntry::addAllocationRequest(const ObjectAllocationRequestPtr& r
void
ObjectCache::TypeEntry::released(const ObjectEntryPtr& entry)
{
+ //
+ // No mutex protection here, this is called with the cache locked.
+ //
while(!_requests.empty() && !entry->isAllocated())
{
if(entry->tryAllocate(_requests.front()))
@@ -220,13 +232,15 @@ ObjectCache::allocateByTypeOnLeastLoadedNode(const string& type,
void
ObjectCache::released(const ObjectEntryPtr& entry)
{
+ //
+ // Notify the type entry that an object was released.
+ //
Lock sync(*this);
map<string, TypeEntry>::iterator p = _types.find(entry->getType());
if(p == _types.end())
{
return;
}
-
p->second.released(entry);
}
@@ -244,7 +258,7 @@ ObjectCache::getObjectsByType(const string& type)
for(Ice::ObjectProxySeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
{
ObjectEntryPtr entry = getImpl((*q)->ice_getIdentity());
- if(!entry->allocatable())
+ if(!entry->allocatable()) // Only return non-allocatable objects.
{
proxies.push_back(*q);
}
@@ -298,16 +312,20 @@ ObjectEntry::ObjectEntry(ObjectCache& cache,
}
Ice::ObjectPrx
-ObjectEntry::getProxy(const SessionIPtr& session) const
+ObjectEntry::getProxy(const SessionIPtr&) const
{
- if(allocatable())
- {
- return getSession() == session ? _info.proxy : Ice::ObjectPrx();
- }
- else
- {
- return _info.proxy;
- }
+ //
+ // TODO: Remove this code if we really don't want to check the
+ // session for allocatable objects.
+ //
+// if(allocatable())
+// {
+// return getSession() == session ? _info.proxy : Ice::ObjectPrx();
+// }
+// else
+// {
+ return _info.proxy;
+// }
}
string
@@ -363,14 +381,17 @@ ObjectEntry::release(const SessionIPtr& session)
return false;
}
-void
+bool
ObjectEntry::allocated(const SessionIPtr& session)
{
//
// Add the object allocation to the session. The object will be
// released once the session is destroyed.
//
- session->addAllocation(this);
+ if(!session->addAllocation(this))
+ {
+ return false;
+ }
TraceLevelsPtr traceLevels = _cache.getTraceLevels();
if(traceLevels && traceLevels->object > 1)
@@ -379,6 +400,8 @@ ObjectEntry::allocated(const SessionIPtr& session)
const Ice::Identity id = _info.proxy->ice_getIdentity();
out << "object `" << id << "' allocated by `" << session->getUserId() << "' (" << _count << ")";
}
+
+ return true;
}
void
diff --git a/cpp/src/IceGrid/ObjectCache.h b/cpp/src/IceGrid/ObjectCache.h
index 866350111fa..f19f602b7d4 100644
--- a/cpp/src/IceGrid/ObjectCache.h
+++ b/cpp/src/IceGrid/ObjectCache.h
@@ -39,7 +39,7 @@ public:
bool canRemove();
virtual bool release(const SessionIPtr&);
- virtual void allocated(const SessionIPtr&);
+ virtual bool allocated(const SessionIPtr&);
virtual void released(const SessionIPtr&);
private:
@@ -61,9 +61,10 @@ public:
private:
- virtual void allocated(const AllocatablePtr& allocatable, const SessionIPtr& session)
+ virtual bool allocated(const AllocatablePtr& allocatable, const SessionIPtr& session)
{
response(ObjectEntryPtr::dynamicCast(allocatable)->getObjectInfo().proxy);
+ return true;
}
virtual void canceled(const AllocationException& ex)
diff --git a/cpp/src/IceGrid/ReapThread.cpp b/cpp/src/IceGrid/ReapThread.cpp
index 1a41eba89c4..7a312c8dbe6 100644
--- a/cpp/src/IceGrid/ReapThread.cpp
+++ b/cpp/src/IceGrid/ReapThread.cpp
@@ -22,49 +22,66 @@ ReapThread::ReapThread(int timeout) :
void
ReapThread::run()
{
- Lock sync(*this);
-
- while(!_terminated)
+ vector<ReapablePtr> reap;
+ while(true)
{
- list<ReapablePtr>::iterator p = _sessions.begin();
- while(p != _sessions.end())
{
- try
+ Lock sync(*this);
+ if(_terminated)
+ {
+ break;
+ }
+
+ timedWait(_timeout);
+
+ list<ReapablePtr>::iterator p = _sessions.begin();
+ while(p != _sessions.end())
{
- if((IceUtil::Time::now() - (*p)->timestamp()) > _timeout)
+ try
{
- try
+ if((IceUtil::Time::now() - (*p)->timestamp()) > _timeout)
{
- (*p)->destroy();
+ reap.push_back(*p);
+ p = _sessions.erase(p);
}
- catch(const Ice::LocalException&)
+ else
{
+ ++p;
}
- p = _sessions.erase(p);
}
- else
+ catch(const Ice::ObjectNotExistException&)
{
- ++p;
+ p = _sessions.erase(p);
}
}
- catch(const Ice::ObjectNotExistException&)
+ }
+
+ for(vector<ReapablePtr>::const_iterator p = reap.begin(); p != reap.end(); ++p)
+ {
+ try
+ {
+ (*p)->destroy();
+ }
+ catch(const Ice::LocalException&)
{
- p = _sessions.erase(p);
}
}
-
- timedWait(_timeout);
+ reap.clear();
}
}
void
ReapThread::terminate()
{
- Lock sync(*this);
-
- _terminated = true;
- notify();
+ {
+ Lock sync(*this);
+ _terminated = true;
+ notify();
+ }
+ //
+ // _sessions is immutable once the reap thread is terminated.
+ //
for(list<ReapablePtr>::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p)
{
try
@@ -76,7 +93,6 @@ ReapThread::terminate()
// Ignore.
}
}
-
_sessions.clear();
}
@@ -84,6 +100,10 @@ void
ReapThread::add(const ReapablePtr& reapable)
{
Lock sync(*this);
+ if(_terminated)
+ {
+ return;
+ }
_sessions.push_back(reapable);
}
diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp
index ce0790d2b3f..ccc018edfe7 100644
--- a/cpp/src/IceGrid/ServerCache.cpp
+++ b/cpp/src/IceGrid/ServerCache.cpp
@@ -745,7 +745,7 @@ ServerEntry::canRemove()
}
-void
+bool
ServerEntry::allocated(const SessionIPtr& session)
{
TraceLevelsPtr traceLevels = _cache.getTraceLevels();
@@ -753,7 +753,8 @@ ServerEntry::allocated(const SessionIPtr& session)
{
Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
out << "server `" << _id << "' allocated by `" << session->getUserId() << "' (" << _count << ")";
- }
+ }
+ return true;
}
void
diff --git a/cpp/src/IceGrid/ServerCache.h b/cpp/src/IceGrid/ServerCache.h
index ce0da237349..6f6b04443f9 100644
--- a/cpp/src/IceGrid/ServerCache.h
+++ b/cpp/src/IceGrid/ServerCache.h
@@ -54,7 +54,7 @@ public:
void destroyCallback();
void exception(const Ice::Exception&);
- virtual void allocated(const SessionIPtr&);
+ virtual bool allocated(const SessionIPtr&);
virtual void released(const SessionIPtr&);
private:
diff --git a/cpp/src/IceGrid/SessionI.cpp b/cpp/src/IceGrid/SessionI.cpp
index e4cc9d55913..17576ae0534 100644
--- a/cpp/src/IceGrid/SessionI.cpp
+++ b/cpp/src/IceGrid/SessionI.cpp
@@ -102,6 +102,7 @@ SessionI::SessionI(const string& userId,
_database(database),
_waitQueue(waitQueue),
_destroyed(false),
+ _timestamp(IceUtil::Time::now()),
_allocationTimeout(-1)
{
if(_traceLevels && _traceLevels->session > 0)
@@ -239,11 +240,16 @@ SessionI::getAllocationTimeout() const
return _allocationTimeout;
}
-void
+bool
SessionI::addAllocationRequest(const AllocationRequestPtr& request)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return false;
+ }
_requests.insert(request);
+ return true;
}
void
@@ -253,11 +259,16 @@ SessionI::removeAllocationRequest(const AllocationRequestPtr& request)
_requests.erase(request);
}
-void
+bool
SessionI::addAllocation(const AllocatablePtr& allocatable)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return false;
+ }
_allocations.insert(allocatable);
+ return true;
}
void
diff --git a/cpp/src/IceGrid/SessionI.h b/cpp/src/IceGrid/SessionI.h
index 3ca54b2953b..e378f3c8318 100644
--- a/cpp/src/IceGrid/SessionI.h
+++ b/cpp/src/IceGrid/SessionI.h
@@ -74,9 +74,9 @@ public:
const WaitQueuePtr& getWaitQueue() const { return _waitQueue; }
const std::string& getUserId() const { return _userId; }
- void addAllocationRequest(const AllocationRequestPtr&);
+ bool addAllocationRequest(const AllocationRequestPtr&);
void removeAllocationRequest(const AllocationRequestPtr&);
- void addAllocation(const AllocatablePtr&);
+ bool addAllocation(const AllocatablePtr&);
void removeAllocation(const AllocatablePtr&);
protected:
diff --git a/cpp/test/IceGrid/allocation/AllTests.cpp b/cpp/test/IceGrid/allocation/AllTests.cpp
index c05449071a9..8413044ab63 100644
--- a/cpp/test/IceGrid/allocation/AllTests.cpp
+++ b/cpp/test/IceGrid/allocation/AllTests.cpp
@@ -8,6 +8,7 @@
// **********************************************************************
#include <IceUtil/Thread.h>
+#include <IceUtil/Random.h>
#include <Ice/Ice.h>
#include <IceGrid/Session.h>
#include <IceGrid/Admin.h>
@@ -98,6 +99,162 @@ public:
};
typedef IceUtil::Handle<AllocateObjectByTypeCallback> AllocateObjectByTypeCallbackPtr;
+class StressClient : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex>
+{
+public:
+
+ StressClient(int id, const SessionManagerPrx& manager, bool destroySession) :
+ _id(id),
+ _manager(manager),
+ _notified(false),
+ _terminated(false),
+ _destroySession(destroySession)
+ {
+ }
+
+ virtual
+ void run()
+ {
+ {
+ Lock sync(*this);
+ while(!_notified)
+ {
+ wait();
+ }
+ }
+
+ SessionPrx session;
+ while(true)
+ {
+ {
+ Lock sync(*this);
+ if(_terminated)
+ {
+ if(session)
+ {
+ session->destroy();
+ }
+ return;
+ }
+ }
+
+ if(!session)
+ {
+ ostringstream os;
+ os << "Client-" << _id;
+ session = _manager->createLocalSession(os.str());
+ session->setAllocationTimeout(IceUtil::random(200)); // 200ms timeout
+ }
+
+ assert(session);
+ session->keepAlive();
+
+ Ice::ObjectPrx object;
+ switch(IceUtil::random(_destroySession ? 4 : 2))
+ {
+ case 0:
+ object = allocate(session);
+ break;
+ case 1:
+ object = allocateByType(session);
+ break;
+ case 2:
+ allocateAndDestroy(session);
+ session = 0;
+ break;
+ case 3:
+ allocateByTypeAndDestroy(session);
+ session = 0;
+ break;
+ }
+
+ if(object)
+ {
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(IceUtil::random(20)));
+ switch(IceUtil::random(_destroySession ? 2 : 1))
+ {
+ case 0:
+ session->releaseObject(object->ice_getIdentity());
+ break;
+ case 1:
+ session->destroy();
+ session = 0;
+ break;
+ }
+ }
+ }
+ }
+
+ Ice::ObjectPrx
+ allocate(const SessionPrx& session)
+ {
+ ostringstream os;
+ os << "stress-" << IceUtil::random(6) + 1;
+ try
+ {
+ return session->allocateObjectById(Ice::stringToIdentity(os.str()));
+ }
+ catch(const AllocationTimeoutException&)
+ {
+ }
+ return 0;
+ }
+
+ Ice::ObjectPrx
+ allocateByType(const SessionPrx& session)
+ {
+ try
+ {
+ return session->allocateObjectByType("::StressTest");
+ }
+ catch(const AllocationTimeoutException&)
+ {
+ }
+ return 0;
+ }
+
+ void
+ allocateAndDestroy(const SessionPrx& session)
+ {
+ ostringstream os;
+ os << "stress-" << IceUtil::random(3);
+ session->allocateObjectById_async(new AllocateObjectByIdCallback(), Ice::stringToIdentity(os.str()));
+ session->destroy();
+ }
+
+ void
+ allocateByTypeAndDestroy(const SessionPrx& session)
+ {
+ session->allocateObjectByType_async(new AllocateObjectByTypeCallback(), "::StressTest");
+ session->destroy();
+ }
+
+ void
+ notifyThread()
+ {
+ Lock sync(*this);
+ _notified = true;
+ notify();
+ }
+
+ void
+ terminate()
+ {
+ Lock sync(*this);
+ _terminated = true;
+ notify();
+ }
+
+protected:
+
+ const int _id;
+ const SessionManagerPrx _manager;
+ bool _notified;
+ bool _terminated;
+ const bool _destroySession;
+};
+typedef IceUtil::Handle<StressClient> StressClientPtr;
+
class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex>
{
public:
@@ -180,8 +337,8 @@ allTests(const Ice::CommunicatorPtr& communicator)
try
{
cout << "testing create session... " << flush;
- SessionPrx session1 = SessionPrx::uncheckedCast(manager->createLocalSession("Client1"));
- SessionPrx session2 = SessionPrx::uncheckedCast(manager->createLocalSession("Client2"));
+ SessionPrx session1 = manager->createLocalSession("Client1");
+ SessionPrx session2 = manager->createLocalSession("Client2");
keepAlive->add(session1);
keepAlive->add(session2);
@@ -213,7 +370,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session1->allocateObjectById(Ice::stringToIdentity("nonallocatable"));
test(false);
}
- catch(const AllocationException& ex)
+ catch(const NotAllocatableException& ex)
{
}
try
@@ -221,7 +378,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session2->allocateObjectById(Ice::stringToIdentity("nonallocatable"));
test(false);
}
- catch(const AllocationException& ex)
+ catch(const NotAllocatableException& ex)
{
}
try
@@ -229,7 +386,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session1->releaseObject(Ice::stringToIdentity("nonallocatable"));
test(false);
}
- catch(const AllocationException& ex)
+ catch(const NotAllocatableException& ex)
{
}
try
@@ -237,7 +394,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session2->releaseObject(Ice::stringToIdentity("nonallocatable"));
test(false);
}
- catch(const AllocationException& ex)
+ catch(const NotAllocatableException& ex)
{
}
@@ -335,7 +492,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
cout << "testing allocate object by type... " << flush;
session1->setAllocationTimeout(0);
- session2->setAllocationTimeout(obj);
+ session2->setAllocationTimeout(0);
obj = session1->allocateObjectByType("::Test");
test(obj && obj->ice_getIdentity().name == "allocatable");
@@ -352,7 +509,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session2->allocateObjectByType("::Test");
test(false);
}
- catch(const AllocationException&)
+ catch(const AllocationTimeoutException&)
{
}
try
@@ -387,7 +544,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session1->allocateObjectByType("::Test");
test(false);
}
- catch(const AllocationException&)
+ catch(const AllocationTimeoutException&)
{
}
session1->allocateObjectByType("::TestBis");
@@ -396,7 +553,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session2->allocateObjectByType("::TestBis");
test(false);
}
- catch(const AllocationException&)
+ catch(const AllocationTimeoutException&)
{
}
session1->releaseObject(allocatablebis);
@@ -406,7 +563,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session1->allocateObjectByType("::TestBis");
test(false);
}
- catch(const AllocationException&)
+ catch(const AllocationTimeoutException&)
{
}
session2->releaseObject(allocatablebis);
@@ -508,7 +665,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session2->allocateObjectById(allocatable);
test(false);
}
- catch(const AllocationException&)
+ catch(const AllocationTimeoutException&)
{
}
test(time + IceUtil::Time::milliSeconds(100) < IceUtil::Time::now());
@@ -518,7 +675,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session2->allocateObjectByType("::Test");
test(false);
}
- catch(const AllocationException&)
+ catch(const AllocationTimeoutException&)
{
}
test(time + IceUtil::Time::milliSeconds(100) < IceUtil::Time::now());
@@ -554,27 +711,31 @@ allTests(const Ice::CommunicatorPtr& communicator)
{
}
- Ice::ObjectPrx session1obj1 = communicator->stringToProxy("allocatable1@AdapterAlloc")->ice_locator(locator1);
- Ice::ObjectPrx session1obj2 = communicator->stringToProxy("allocatable2@AdapterAlloc")->ice_locator(locator1);
- Ice::ObjectPrx session2obj1 = communicator->stringToProxy("allocatable1@AdapterAlloc")->ice_locator(locator2);
- Ice::ObjectPrx session2obj2 = communicator->stringToProxy("allocatable2@AdapterAlloc")->ice_locator(locator2);
- session1obj1->ice_locatorCacheTimeout(0)->ice_ping();
- try
- {
- session2obj1->ice_locatorCacheTimeout(0)->ice_ping();
- test(false);
- }
- catch(const Ice::NoEndpointException&)
- {
- }
- try
- {
- session2obj2->ice_locatorCacheTimeout(0)->ice_ping();
- test(false);
- }
- catch(const Ice::NoEndpointException&)
- {
- }
+ //
+ // Remove this code if we're sure we don't want to disallow
+ // resolving endpoints of allocatable objects.
+ //
+// Ice::ObjectPrx session1obj1 = communicator->stringToProxy("allocatable1@AdapterAlloc")->ice_locator(locator1);
+// Ice::ObjectPrx session1obj2 = communicator->stringToProxy("allocatable2@AdapterAlloc")->ice_locator(locator1);
+// Ice::ObjectPrx session2obj1 = communicator->stringToProxy("allocatable1@AdapterAlloc")->ice_locator(locator2);
+// Ice::ObjectPrx session2obj2 = communicator->stringToProxy("allocatable2@AdapterAlloc")->ice_locator(locator2);
+// session1obj1->ice_locatorCacheTimeout(0)->ice_ping();
+// try
+// {
+// session2obj1->ice_locatorCacheTimeout(0)->ice_ping();
+// test(false);
+// }
+// catch(const Ice::NoEndpointException&)
+// {
+// }
+// try
+// {
+// session2obj2->ice_locatorCacheTimeout(0)->ice_ping();
+// test(false);
+// }
+// catch(const Ice::NoEndpointException&)
+// {
+// }
session1->allocateObjectById(allocatable2);
session1->releaseObject(allocatable1);
@@ -619,7 +780,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session1->allocateObjectByType("::TestAdapter1");
test(false);
}
- catch(const AllocationException&)
+ catch(const AllocationTimeoutException&)
{
}
try
@@ -627,7 +788,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
session1->allocateObjectByType("::TestAdapter2");
test(false);
}
- catch(AllocationException&)
+ catch(AllocationTimeoutException&)
{
}
test(session2->allocateObjectByType("::TestAdapter1"));
@@ -731,28 +892,31 @@ allTests(const Ice::CommunicatorPtr& communicator)
{
}
- Ice::ObjectPrx session1obj3 = communicator->stringToProxy("allocatable3@ServerAlloc")->ice_locator(locator1);
- Ice::ObjectPrx session1obj4 = communicator->stringToProxy("allocatable4@ServerAlloc")->ice_locator(locator1);
- Ice::ObjectPrx session2obj3 = communicator->stringToProxy("allocatable3@ServerAlloc")->ice_locator(locator2);
- Ice::ObjectPrx session2obj4 = communicator->stringToProxy("allocatable4@ServerAlloc")->ice_locator(locator2);
- session1obj3->ice_locatorCacheTimeout(0)->ice_ping();
- try
- {
- session2obj3->ice_locatorCacheTimeout(0)->ice_ping();
- test(false);
- }
- catch(const Ice::NoEndpointException&)
- {
- }
- try
- {
- session2obj4->ice_locatorCacheTimeout(0)->ice_ping();
- test(false);
- }
- catch(const Ice::NoEndpointException&)
- {
- }
-
+ //
+ // Remove this code if we're sure we don't want to disallow
+ // resolving endpoints of allocatable objects.
+ //
+// Ice::ObjectPrx session1obj3 = communicator->stringToProxy("allocatable3@ServerAlloc")->ice_locator(locator1);
+// Ice::ObjectPrx session1obj4 = communicator->stringToProxy("allocatable4@ServerAlloc")->ice_locator(locator1);
+// Ice::ObjectPrx session2obj3 = communicator->stringToProxy("allocatable3@ServerAlloc")->ice_locator(locator2);
+// Ice::ObjectPrx session2obj4 = communicator->stringToProxy("allocatable4@ServerAlloc")->ice_locator(locator2);
+// session1obj3->ice_locatorCacheTimeout(0)->ice_ping();
+// try
+// {
+// session2obj3->ice_locatorCacheTimeout(0)->ice_ping();
+// test(false);
+// }
+// catch(const Ice::NoEndpointException&)
+// {
+// }
+// try
+// {
+// session2obj4->ice_locatorCacheTimeout(0)->ice_ping();
+// test(false);
+// }
+// catch(const Ice::NoEndpointException&)
+// {
+// }
session1->allocateObjectById(allocatable4);
session1->releaseObject(allocatable3);
@@ -881,6 +1045,31 @@ allTests(const Ice::CommunicatorPtr& communicator)
session2->releaseObject(Ice::stringToIdentity("allocatable31"));
session2->releaseObject(Ice::stringToIdentity("allocatable41"));
+ obj1 = session1->allocateObjectByType("::TestMultipleServer");
+ test(obj1);
+ obj2 = session2->allocateObjectByType("::TestMultipleServer");
+ test(obj2);
+ try
+ {
+ session1->allocateObjectByType("::TestMultipleServer");
+ test(false);
+ }
+ catch(AllocationTimeoutException&)
+ {
+ }
+ try
+ {
+ session2->allocateObjectByType("::TestMultipleServer");
+ test(false);
+ }
+ catch(AllocationTimeoutException&)
+ {
+ }
+ session1->releaseObject(obj1->ice_getIdentity());
+ obj1 = session2->allocateObjectByType("::TestMultipleServer");
+ session2->releaseObject(obj1->ice_getIdentity());
+ session2->releaseObject(obj2->ice_getIdentity());
+
cout << "ok" << endl;
cout << "testing concurrent allocations... " << flush;
@@ -950,6 +1139,41 @@ allTests(const Ice::CommunicatorPtr& communicator)
session2->destroy();
cout << "ok" << endl;
+
+ cout << "stress test... " << flush;
+ const int nClients = 6;
+ int i;
+ vector<StressClientPtr> clients;
+ for(i = 0; i < nClients - 2; ++i)
+ {
+ clients.push_back(new StressClient(i, manager, false));
+ clients.back()->start();
+ }
+ clients.push_back(new StressClient(i++, manager, true));
+ clients.back()->start();
+ clients.push_back(new StressClient(i++, manager, true));
+ clients.back()->start();
+
+ for(vector<StressClientPtr>::const_iterator p = clients.begin(); p != clients.end(); ++p)
+ {
+ (*p)->notifyThread();
+ }
+
+ //
+ // Let the stress client run for a bit.
+ //
+ IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(5));
+
+ //
+ // Terminate the stress clients.
+ //
+ for(vector<StressClientPtr>::const_iterator q = clients.begin(); q != clients.end(); ++q)
+ {
+ (*q)->terminate();
+ (*q)->getThreadControl().join();
+ }
+
+ cout << "ok" << endl;
}
catch(const NotAllocatableException& ex)
{
diff --git a/cpp/test/IceGrid/allocation/application.xml b/cpp/test/IceGrid/allocation/application.xml
index 874e11bbb4a..46b1a045110 100644
--- a/cpp/test/IceGrid/allocation/application.xml
+++ b/cpp/test/IceGrid/allocation/application.xml
@@ -5,7 +5,7 @@
<node name="localnode">
<server id="ObjectAllocation" exe="${test.dir}/server" activation="on-demand" pwd=".">
- <adapter name="Server" endpoints="default" allocatable="false">
+ <adapter name="Server" endpoints="default">
<object identity="nonallocatable" type="::Test" allocatable="false"/>
<object identity="allocatable" type="::Test" allocatable="true"/>
<object identity="allocatablebis" type="::TestBis" allocatable="true"/>
@@ -43,6 +43,27 @@
<object identity="allocatable6" type="::TestMultipleServer"/>
</adapter>
</server>
+
+ <server id="ServerStress-1" exe="${test.dir}/server" activation="on-demand" pwd=".">
+ <adapter name="Server" endpoints="default">
+ <object identity="stress-1" type="::StressTest" allocatable="true"/>
+ <object identity="stress-2" type="::StressTest" allocatable="true"/>
+ </adapter>
+ </server>
+
+ <server id="ServerStress-2" exe="${test.dir}/server" activation="on-demand" pwd=".">
+ <adapter name="Server" endpoints="default" allocatable="true">
+ <object identity="stress-3" type="::StressTest"/>
+ <object identity="stress-4" type="::StressTest"/>
+ </adapter>
+ </server>
+
+ <server id="ServerStress-3" exe="${test.dir}/server" activation="on-demand" pwd="." allocatable="true">
+ <adapter name="Server" endpoints="default">
+ <object identity="stress-5" type="::StressTest"/>
+ <object identity="stress-6" type="::StressTest"/>
+ </adapter>
+ </server>
</node>