diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.cpp | 37 | ||||
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.h | 8 | ||||
-rw-r--r-- | cpp/src/IceGrid/Allocatable.cpp | 141 | ||||
-rw-r--r-- | cpp/src/IceGrid/Allocatable.h | 13 | ||||
-rw-r--r-- | cpp/src/IceGrid/ObjectCache.cpp | 49 | ||||
-rw-r--r-- | cpp/src/IceGrid/ObjectCache.h | 5 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReapThread.cpp | 64 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerCache.cpp | 5 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerCache.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionI.cpp | 15 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionI.h | 4 | ||||
-rw-r--r-- | cpp/test/IceGrid/allocation/AllTests.cpp | 340 | ||||
-rw-r--r-- | cpp/test/IceGrid/allocation/application.xml | 23 |
13 files changed, 541 insertions, 165 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp index 0366997fc1e..8e1fbdd0d48 100644 --- a/cpp/src/IceGrid/AdapterCache.cpp +++ b/cpp/src/IceGrid/AdapterCache.cpp @@ -177,8 +177,7 @@ AdapterCache::removeImpl(const string& id) return Cache<string, AdapterEntry>::removeImpl(id); } -AdapterEntry::AdapterEntry(AdapterCache& cache, const string& id, bool allocatable, const AllocatablePtr& parent) : - Allocatable(allocatable, parent), +AdapterEntry::AdapterEntry(AdapterCache& cache, const string& id) : _cache(cache), _id(id) { @@ -195,30 +194,35 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache, const string& replicaGroupId, bool allocatable, const ServerEntryPtr& server) : - AdapterEntry(cache, id, allocatable, server), + AdapterEntry(cache, id), + Allocatable(allocatable, server), _replicaGroupId(replicaGroupId), _server(server) { } vector<pair<string, AdapterPrx> > -ServerAdapterEntry::getProxies(int& nReplicas, const SessionIPtr& session) +ServerAdapterEntry::getProxies(int& nReplicas, const SessionIPtr&) { vector<pair<string, AdapterPrx> > adapters; try { nReplicas = 1; - if(allocatable()) - { - if(session == getSession()) - { - adapters.push_back(make_pair(_id, getProxy())); - } - } - else - { + // + // TODO: Remove this code if we really don't want to check the + // session for allocatable adapters. + // +// if(allocatable()) +// { +// if(session == getSession()) +// { +// adapters.push_back(make_pair(_id, getProxy())); +// } +// } +// else +// { adapters.push_back(make_pair(_id, getProxy())); - } +// } } catch(const NodeUnreachableException&) { @@ -288,7 +292,7 @@ ServerAdapterEntry::getServer() const return _server; } -void +bool ServerAdapterEntry::allocated(const SessionIPtr& session) { TraceLevelsPtr traceLevels = _cache.getTraceLevels(); @@ -297,6 +301,7 @@ ServerAdapterEntry::allocated(const SessionIPtr& session) Ice::Trace out(traceLevels->logger, traceLevels->adapterCat); out << "adapter `" << _id << "' allocated by `" << session->getUserId() << "' (" << _count << ")"; } + return true; } void @@ -314,7 +319,7 @@ ReplicaGroupEntry::ReplicaGroupEntry(AdapterCache& cache, const string& id, const string& application, const LoadBalancingPolicyPtr& policy) : - AdapterEntry(cache, id, false, 0), + AdapterEntry(cache, id), _application(application), _lastReplica(0) { diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h index 8a70c23ddca..be7203955de 100644 --- a/cpp/src/IceGrid/AdapterCache.h +++ b/cpp/src/IceGrid/AdapterCache.h @@ -29,11 +29,11 @@ typedef std::vector<ServerEntryPtr> ServerEntrySeq; class AdapterEntry; typedef IceUtil::Handle<AdapterEntry> AdapterEntryPtr; -class AdapterEntry : public Allocatable, public IceUtil::Mutex +class AdapterEntry : virtual public IceUtil::Shared, public IceUtil::Mutex { public: - AdapterEntry(AdapterCache&, const std::string&, bool, const AllocatablePtr&); + AdapterEntry(AdapterCache&, const std::string&); virtual std::vector<std::pair<std::string, AdapterPrx> > getProxies(int&, const SessionIPtr&) = 0; virtual float getLeastLoadedNodeLoad(LoadSample) const = 0; @@ -49,7 +49,7 @@ protected: }; typedef IceUtil::Handle<AdapterEntry> AdapterEntryPtr; -class ServerAdapterEntry : public AdapterEntry +class ServerAdapterEntry : public AdapterEntry, public Allocatable { public: @@ -63,7 +63,7 @@ public: AdapterPrx getProxy(const std::string& = std::string()) const; - virtual void allocated(const SessionIPtr&); + virtual bool allocated(const SessionIPtr&); virtual void released(const SessionIPtr&); private: diff --git a/cpp/src/IceGrid/Allocatable.cpp b/cpp/src/IceGrid/Allocatable.cpp index 21d81cc06ce..3149f97b996 100644 --- a/cpp/src/IceGrid/Allocatable.cpp +++ b/cpp/src/IceGrid/Allocatable.cpp @@ -29,12 +29,18 @@ AllocationRequest::pending() canceled(AllocationTimeoutException()); return false; } - else if(_timeout > 0) + else if(!_session->addAllocationRequest(this)) + { + _state = Canceled; + canceled(AllocationException("session destroyed")); + return false; + } + + if(_timeout > 0) { _session->getWaitQueue()->add(this, IceUtil::Time::milliSeconds(_timeout)); } _state = Pending; - _session->addAllocationRequest(this); return true; } @@ -71,12 +77,16 @@ AllocationRequest::finish(const AllocatablePtr& allocatable, const SessionIPtr& canceled(AllocationException("already allocated by the session")); return false; } - else + else if(allocated(allocatable, _session)) { _state = Allocated; - allocated(allocatable, _session); return true; } + else + { + _state = Canceled; + return false; + } } void @@ -124,6 +134,13 @@ AllocationRequest::expired(bool destroyed) } bool +AllocationRequest::isCanceled() const +{ + Lock sync(*this); + return _state == Canceled; +} + +bool AllocationRequest::operator<(const AllocationRequest& r) const { return this < &r; @@ -144,17 +161,22 @@ ParentAllocationRequest::ParentAllocationRequest(const AllocationRequestPtr& req { } -void +bool ParentAllocationRequest::allocated(const AllocatablePtr& allocatable, const SessionIPtr& session) { try { - _allocatable->allocate(_request, false); - assert(_allocatable->getSession() == _request->getSession()); + if(_allocatable->allocate(_request, false)) + { + assert(_allocatable->getSession() == _request->getSession()); + return true; + } + return false; } catch(const AllocationException& ex) { _request->cancel(ex); + return false; } } @@ -176,7 +198,7 @@ Allocatable::~Allocatable() { } -void +bool Allocatable::allocate(const AllocationRequestPtr& request, bool checkParent) { IceUtil::RecMutex::Lock sync(_allocateMutex); @@ -184,20 +206,19 @@ Allocatable::allocate(const AllocationRequestPtr& request, bool checkParent) { throw NotAllocatableException("not allocatable"); } - - if(_session == request->getSession()) + else if(_session == request->getSession()) { if(request->finish(this, _session)) { ++_count; + return true; // Allocated } - return; + return false; } if(_parent && checkParent) { - _parent->allocate(new ParentAllocationRequest(request, this), true); - return; + return _parent->allocate(new ParentAllocationRequest(request, this), true); } if(_session) @@ -207,13 +228,14 @@ Allocatable::allocate(const AllocationRequestPtr& request, bool checkParent) _requests.push_back(request); } } - else if(request->finish(this, _session)) + else if(request->finish(this, _session) && allocated(request->getSession())) { assert(_count == 0); _session = request->getSession(); ++_count; - allocated(_session); + return true; // Allocated } + return false; } bool @@ -223,56 +245,99 @@ Allocatable::tryAllocateWithSession(const SessionIPtr& session, const Allocatabl assert(_allocatable); if(_session && _session != session) { - _attempts.insert(child); // Remember the allocation attempts of a child + // + // The allocatable is already allocated by another session. + // + // We keep track of the allocation attempt of a child. E.g.: + // if a session tries to allocate an object and it can't be + // allocated because the adapter is already allocated, we keep + // track of the object here. This will be used by release() to + // notify the object cache when the adapter is released that + // the object is potentially available. + // + _attempts.insert(child); return false; } else if(_session == session) { + // + // The allocatable is allocated by this session, we just + // increment the allocation count and return a true to + // indicate a successfull allocation. + // ++_count; return true; } + // + // This allocatable isn't allocated, so we now have to check if + // the parent is allocated or not. If the allocation of the parent + // is succsefull (returns "true"), we allocate this allocatable. + // if(_parent && !_parent->tryAllocateWithSession(session, child)) { return false; } - assert(_count == 0); - - _session = session; - ++_count; - allocated(_session); - return true; + if(allocated(session)) + { + assert(_count == 0); + _session = session; + ++_count; + return true; // Successfull allocation + } + return false; } bool Allocatable::tryAllocate(const AllocationRequestPtr& request) { IceUtil::RecMutex::Lock sync(_allocateMutex); - + // - // If not allocatable or already allocated or if the parent is - // allocated by a session other than the session from the given - // request, we can't allocate the allocatable. + // If not allocatable or already allocated, the allocation attempt + // fails. // if(!_allocatable || _session) { return false; } - + + // + // Try to allocate the parent. This should succeed if the parent + // is not already allocated or if it's already allocated by the + // session. + // if(_parent && !_parent->tryAllocateWithSession(request->getSession(), this)) { return false; } - - if(request->finish(this, _session)) + + // + // The parent could be allocated, we allocate this allocatable. + // + if(request->finish(this, _session) && allocated(request->getSession())) { assert(_count == 0); _session = request->getSession(); ++_count; - allocated(_session); + return true; // The allocatable was allocated. + } + + // + // If we reach here, either the request was canceled or the session + // destroyed. If that's the case, we need to release the parent. + // + if(_parent) + { + set<AllocatablePtr> releasedAllocatables; + if(_parent->release(request->getSession(), false, releasedAllocatables)) + { + assert(releasedAllocatables.empty()); + } } - return true; // The allocatable was allocated or the request was canceled. + + return true; // The request was canceled. } bool @@ -286,10 +351,15 @@ bool Allocatable::release(const SessionIPtr& session, bool all, set<AllocatablePtr>& releasedAllocatables) { IceUtil::RecMutex::Lock sync(_allocateMutex); - if(!_session || _session != session) + if(!_allocatable) + { + throw NotAllocatableException("not allocatable"); + } + else if(!_session || _session != session) { throw AllocationException("can't release object which is not allocated"); } + if(!all && --_count) { return false; @@ -313,11 +383,10 @@ Allocatable::release(const SessionIPtr& session, bool all, set<AllocatablePtr>& { AllocationRequestPtr request = _requests.front(); _requests.pop_front(); - if(request->finish(this, _session)) + if(request->finish(this, _session) && allocated(request->getSession())) { _session = request->getSession(); ++_count; - allocated(_session); // // Check if there's other requests from the session @@ -339,14 +408,14 @@ Allocatable::release(const SessionIPtr& session, bool all, set<AllocatablePtr>& ++p; } } - return false; + return false; // Not released yet! } } } releasedAllocatables.insert(_attempts.begin(), _attempts.end()); _attempts.clear(); - return true; + return true; // The allocatable is released. } bool diff --git a/cpp/src/IceGrid/Allocatable.h b/cpp/src/IceGrid/Allocatable.h index f12ecb06658..6ff439948d8 100644 --- a/cpp/src/IceGrid/Allocatable.h +++ b/cpp/src/IceGrid/Allocatable.h @@ -36,7 +36,7 @@ public: virtual ~AllocationRequest(); - virtual void allocated(const AllocatablePtr&, const SessionIPtr&) = 0; + virtual bool allocated(const AllocatablePtr&, const SessionIPtr&) = 0; virtual void canceled(const AllocationException&) = 0; virtual bool allocateOnce() { return false; } @@ -48,6 +48,7 @@ public: int getTimeout() const { return _timeout; } const SessionIPtr& getSession() const { return _session; } + bool isCanceled() const; bool operator<(const AllocationRequest&) const; @@ -77,7 +78,7 @@ public: ParentAllocationRequest(const AllocationRequestPtr&, const AllocatablePtr&); - virtual void allocated(const AllocatablePtr&, const SessionIPtr&); + virtual bool allocated(const AllocatablePtr&, const SessionIPtr&); virtual void canceled(const AllocationException&); private: @@ -86,14 +87,14 @@ private: const AllocatablePtr _allocatable; }; -class Allocatable : public IceUtil::Shared +class Allocatable : virtual public IceUtil::Shared { public: Allocatable(bool, const AllocatablePtr&); virtual ~Allocatable(); - virtual void allocate(const AllocationRequestPtr&, bool = true); + virtual bool allocate(const AllocationRequestPtr&, bool = true); virtual bool tryAllocate(const AllocationRequestPtr&); virtual bool release(const SessionIPtr&); @@ -101,8 +102,8 @@ public: bool isAllocated() const; SessionIPtr getSession() const; - virtual void allocated(const SessionIPtr&) { } - virtual void released(const SessionIPtr&) { } + virtual bool allocated(const SessionIPtr&) = 0; + virtual void released(const SessionIPtr&) = 0; bool operator<(const Allocatable&) const; diff --git a/cpp/src/IceGrid/ObjectCache.cpp b/cpp/src/IceGrid/ObjectCache.cpp index bf91f458d18..bb0ab56432a 100644 --- a/cpp/src/IceGrid/ObjectCache.cpp +++ b/cpp/src/IceGrid/ObjectCache.cpp @@ -40,12 +40,18 @@ ObjectCache::TypeEntry::TypeEntry(ObjectCache& cache) : _cache(cache) void ObjectCache::TypeEntry::add(const Ice::ObjectPrx& obj) { + // + // No mutex protection here, this is called with the cache locked. + // _objects.insert(lower_bound(_objects.begin(), _objects.end(), obj, ::Ice::proxyIdentityLess), obj); } bool ObjectCache::TypeEntry::remove(const Ice::ObjectPrx& obj) { + // + // No mutex protection here, this is called with the cache locked. + // Ice::ObjectProxySeq::iterator q = lower_bound(_objects.begin(), _objects.end(), obj, ::Ice::proxyIdentityLess); assert((*q)->ice_getIdentity() == obj->ice_getIdentity()); _objects.erase(q); @@ -55,6 +61,9 @@ ObjectCache::TypeEntry::remove(const Ice::ObjectPrx& obj) void ObjectCache::TypeEntry::addAllocationRequest(const ObjectAllocationRequestPtr& request) { + // + // No mutex protection here, this is called with the cache locked. + // if(request->pending()) { _requests.push_back(request); @@ -64,6 +73,9 @@ ObjectCache::TypeEntry::addAllocationRequest(const ObjectAllocationRequestPtr& r void ObjectCache::TypeEntry::released(const ObjectEntryPtr& entry) { + // + // No mutex protection here, this is called with the cache locked. + // while(!_requests.empty() && !entry->isAllocated()) { if(entry->tryAllocate(_requests.front())) @@ -220,13 +232,15 @@ ObjectCache::allocateByTypeOnLeastLoadedNode(const string& type, void ObjectCache::released(const ObjectEntryPtr& entry) { + // + // Notify the type entry that an object was released. + // Lock sync(*this); map<string, TypeEntry>::iterator p = _types.find(entry->getType()); if(p == _types.end()) { return; } - p->second.released(entry); } @@ -244,7 +258,7 @@ ObjectCache::getObjectsByType(const string& type) for(Ice::ObjectProxySeq::const_iterator q = objects.begin(); q != objects.end(); ++q) { ObjectEntryPtr entry = getImpl((*q)->ice_getIdentity()); - if(!entry->allocatable()) + if(!entry->allocatable()) // Only return non-allocatable objects. { proxies.push_back(*q); } @@ -298,16 +312,20 @@ ObjectEntry::ObjectEntry(ObjectCache& cache, } Ice::ObjectPrx -ObjectEntry::getProxy(const SessionIPtr& session) const +ObjectEntry::getProxy(const SessionIPtr&) const { - if(allocatable()) - { - return getSession() == session ? _info.proxy : Ice::ObjectPrx(); - } - else - { - return _info.proxy; - } + // + // TODO: Remove this code if we really don't want to check the + // session for allocatable objects. + // +// if(allocatable()) +// { +// return getSession() == session ? _info.proxy : Ice::ObjectPrx(); +// } +// else +// { + return _info.proxy; +// } } string @@ -363,14 +381,17 @@ ObjectEntry::release(const SessionIPtr& session) return false; } -void +bool ObjectEntry::allocated(const SessionIPtr& session) { // // Add the object allocation to the session. The object will be // released once the session is destroyed. // - session->addAllocation(this); + if(!session->addAllocation(this)) + { + return false; + } TraceLevelsPtr traceLevels = _cache.getTraceLevels(); if(traceLevels && traceLevels->object > 1) @@ -379,6 +400,8 @@ ObjectEntry::allocated(const SessionIPtr& session) const Ice::Identity id = _info.proxy->ice_getIdentity(); out << "object `" << id << "' allocated by `" << session->getUserId() << "' (" << _count << ")"; } + + return true; } void diff --git a/cpp/src/IceGrid/ObjectCache.h b/cpp/src/IceGrid/ObjectCache.h index 866350111fa..f19f602b7d4 100644 --- a/cpp/src/IceGrid/ObjectCache.h +++ b/cpp/src/IceGrid/ObjectCache.h @@ -39,7 +39,7 @@ public: bool canRemove(); virtual bool release(const SessionIPtr&); - virtual void allocated(const SessionIPtr&); + virtual bool allocated(const SessionIPtr&); virtual void released(const SessionIPtr&); private: @@ -61,9 +61,10 @@ public: private: - virtual void allocated(const AllocatablePtr& allocatable, const SessionIPtr& session) + virtual bool allocated(const AllocatablePtr& allocatable, const SessionIPtr& session) { response(ObjectEntryPtr::dynamicCast(allocatable)->getObjectInfo().proxy); + return true; } virtual void canceled(const AllocationException& ex) diff --git a/cpp/src/IceGrid/ReapThread.cpp b/cpp/src/IceGrid/ReapThread.cpp index 1a41eba89c4..7a312c8dbe6 100644 --- a/cpp/src/IceGrid/ReapThread.cpp +++ b/cpp/src/IceGrid/ReapThread.cpp @@ -22,49 +22,66 @@ ReapThread::ReapThread(int timeout) : void ReapThread::run() { - Lock sync(*this); - - while(!_terminated) + vector<ReapablePtr> reap; + while(true) { - list<ReapablePtr>::iterator p = _sessions.begin(); - while(p != _sessions.end()) { - try + Lock sync(*this); + if(_terminated) + { + break; + } + + timedWait(_timeout); + + list<ReapablePtr>::iterator p = _sessions.begin(); + while(p != _sessions.end()) { - if((IceUtil::Time::now() - (*p)->timestamp()) > _timeout) + try { - try + if((IceUtil::Time::now() - (*p)->timestamp()) > _timeout) { - (*p)->destroy(); + reap.push_back(*p); + p = _sessions.erase(p); } - catch(const Ice::LocalException&) + else { + ++p; } - p = _sessions.erase(p); } - else + catch(const Ice::ObjectNotExistException&) { - ++p; + p = _sessions.erase(p); } } - catch(const Ice::ObjectNotExistException&) + } + + for(vector<ReapablePtr>::const_iterator p = reap.begin(); p != reap.end(); ++p) + { + try + { + (*p)->destroy(); + } + catch(const Ice::LocalException&) { - p = _sessions.erase(p); } } - - timedWait(_timeout); + reap.clear(); } } void ReapThread::terminate() { - Lock sync(*this); - - _terminated = true; - notify(); + { + Lock sync(*this); + _terminated = true; + notify(); + } + // + // _sessions is immutable once the reap thread is terminated. + // for(list<ReapablePtr>::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p) { try @@ -76,7 +93,6 @@ ReapThread::terminate() // Ignore. } } - _sessions.clear(); } @@ -84,6 +100,10 @@ void ReapThread::add(const ReapablePtr& reapable) { Lock sync(*this); + if(_terminated) + { + return; + } _sessions.push_back(reapable); } diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp index ce0790d2b3f..ccc018edfe7 100644 --- a/cpp/src/IceGrid/ServerCache.cpp +++ b/cpp/src/IceGrid/ServerCache.cpp @@ -745,7 +745,7 @@ ServerEntry::canRemove() } -void +bool ServerEntry::allocated(const SessionIPtr& session) { TraceLevelsPtr traceLevels = _cache.getTraceLevels(); @@ -753,7 +753,8 @@ ServerEntry::allocated(const SessionIPtr& session) { Ice::Trace out(traceLevels->logger, traceLevels->serverCat); out << "server `" << _id << "' allocated by `" << session->getUserId() << "' (" << _count << ")"; - } + } + return true; } void diff --git a/cpp/src/IceGrid/ServerCache.h b/cpp/src/IceGrid/ServerCache.h index ce0da237349..6f6b04443f9 100644 --- a/cpp/src/IceGrid/ServerCache.h +++ b/cpp/src/IceGrid/ServerCache.h @@ -54,7 +54,7 @@ public: void destroyCallback(); void exception(const Ice::Exception&); - virtual void allocated(const SessionIPtr&); + virtual bool allocated(const SessionIPtr&); virtual void released(const SessionIPtr&); private: diff --git a/cpp/src/IceGrid/SessionI.cpp b/cpp/src/IceGrid/SessionI.cpp index e4cc9d55913..17576ae0534 100644 --- a/cpp/src/IceGrid/SessionI.cpp +++ b/cpp/src/IceGrid/SessionI.cpp @@ -102,6 +102,7 @@ SessionI::SessionI(const string& userId, _database(database), _waitQueue(waitQueue), _destroyed(false), + _timestamp(IceUtil::Time::now()), _allocationTimeout(-1) { if(_traceLevels && _traceLevels->session > 0) @@ -239,11 +240,16 @@ SessionI::getAllocationTimeout() const return _allocationTimeout; } -void +bool SessionI::addAllocationRequest(const AllocationRequestPtr& request) { Lock sync(*this); + if(_destroyed) + { + return false; + } _requests.insert(request); + return true; } void @@ -253,11 +259,16 @@ SessionI::removeAllocationRequest(const AllocationRequestPtr& request) _requests.erase(request); } -void +bool SessionI::addAllocation(const AllocatablePtr& allocatable) { Lock sync(*this); + if(_destroyed) + { + return false; + } _allocations.insert(allocatable); + return true; } void diff --git a/cpp/src/IceGrid/SessionI.h b/cpp/src/IceGrid/SessionI.h index 3ca54b2953b..e378f3c8318 100644 --- a/cpp/src/IceGrid/SessionI.h +++ b/cpp/src/IceGrid/SessionI.h @@ -74,9 +74,9 @@ public: const WaitQueuePtr& getWaitQueue() const { return _waitQueue; } const std::string& getUserId() const { return _userId; } - void addAllocationRequest(const AllocationRequestPtr&); + bool addAllocationRequest(const AllocationRequestPtr&); void removeAllocationRequest(const AllocationRequestPtr&); - void addAllocation(const AllocatablePtr&); + bool addAllocation(const AllocatablePtr&); void removeAllocation(const AllocatablePtr&); protected: diff --git a/cpp/test/IceGrid/allocation/AllTests.cpp b/cpp/test/IceGrid/allocation/AllTests.cpp index c05449071a9..8413044ab63 100644 --- a/cpp/test/IceGrid/allocation/AllTests.cpp +++ b/cpp/test/IceGrid/allocation/AllTests.cpp @@ -8,6 +8,7 @@ // ********************************************************************** #include <IceUtil/Thread.h> +#include <IceUtil/Random.h> #include <Ice/Ice.h> #include <IceGrid/Session.h> #include <IceGrid/Admin.h> @@ -98,6 +99,162 @@ public: }; typedef IceUtil::Handle<AllocateObjectByTypeCallback> AllocateObjectByTypeCallbackPtr; +class StressClient : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> +{ +public: + + StressClient(int id, const SessionManagerPrx& manager, bool destroySession) : + _id(id), + _manager(manager), + _notified(false), + _terminated(false), + _destroySession(destroySession) + { + } + + virtual + void run() + { + { + Lock sync(*this); + while(!_notified) + { + wait(); + } + } + + SessionPrx session; + while(true) + { + { + Lock sync(*this); + if(_terminated) + { + if(session) + { + session->destroy(); + } + return; + } + } + + if(!session) + { + ostringstream os; + os << "Client-" << _id; + session = _manager->createLocalSession(os.str()); + session->setAllocationTimeout(IceUtil::random(200)); // 200ms timeout + } + + assert(session); + session->keepAlive(); + + Ice::ObjectPrx object; + switch(IceUtil::random(_destroySession ? 4 : 2)) + { + case 0: + object = allocate(session); + break; + case 1: + object = allocateByType(session); + break; + case 2: + allocateAndDestroy(session); + session = 0; + break; + case 3: + allocateByTypeAndDestroy(session); + session = 0; + break; + } + + if(object) + { + IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(IceUtil::random(20))); + switch(IceUtil::random(_destroySession ? 2 : 1)) + { + case 0: + session->releaseObject(object->ice_getIdentity()); + break; + case 1: + session->destroy(); + session = 0; + break; + } + } + } + } + + Ice::ObjectPrx + allocate(const SessionPrx& session) + { + ostringstream os; + os << "stress-" << IceUtil::random(6) + 1; + try + { + return session->allocateObjectById(Ice::stringToIdentity(os.str())); + } + catch(const AllocationTimeoutException&) + { + } + return 0; + } + + Ice::ObjectPrx + allocateByType(const SessionPrx& session) + { + try + { + return session->allocateObjectByType("::StressTest"); + } + catch(const AllocationTimeoutException&) + { + } + return 0; + } + + void + allocateAndDestroy(const SessionPrx& session) + { + ostringstream os; + os << "stress-" << IceUtil::random(3); + session->allocateObjectById_async(new AllocateObjectByIdCallback(), Ice::stringToIdentity(os.str())); + session->destroy(); + } + + void + allocateByTypeAndDestroy(const SessionPrx& session) + { + session->allocateObjectByType_async(new AllocateObjectByTypeCallback(), "::StressTest"); + session->destroy(); + } + + void + notifyThread() + { + Lock sync(*this); + _notified = true; + notify(); + } + + void + terminate() + { + Lock sync(*this); + _terminated = true; + notify(); + } + +protected: + + const int _id; + const SessionManagerPrx _manager; + bool _notified; + bool _terminated; + const bool _destroySession; +}; +typedef IceUtil::Handle<StressClient> StressClientPtr; + class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> { public: @@ -180,8 +337,8 @@ allTests(const Ice::CommunicatorPtr& communicator) try { cout << "testing create session... " << flush; - SessionPrx session1 = SessionPrx::uncheckedCast(manager->createLocalSession("Client1")); - SessionPrx session2 = SessionPrx::uncheckedCast(manager->createLocalSession("Client2")); + SessionPrx session1 = manager->createLocalSession("Client1"); + SessionPrx session2 = manager->createLocalSession("Client2"); keepAlive->add(session1); keepAlive->add(session2); @@ -213,7 +370,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session1->allocateObjectById(Ice::stringToIdentity("nonallocatable")); test(false); } - catch(const AllocationException& ex) + catch(const NotAllocatableException& ex) { } try @@ -221,7 +378,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session2->allocateObjectById(Ice::stringToIdentity("nonallocatable")); test(false); } - catch(const AllocationException& ex) + catch(const NotAllocatableException& ex) { } try @@ -229,7 +386,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session1->releaseObject(Ice::stringToIdentity("nonallocatable")); test(false); } - catch(const AllocationException& ex) + catch(const NotAllocatableException& ex) { } try @@ -237,7 +394,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session2->releaseObject(Ice::stringToIdentity("nonallocatable")); test(false); } - catch(const AllocationException& ex) + catch(const NotAllocatableException& ex) { } @@ -335,7 +492,7 @@ allTests(const Ice::CommunicatorPtr& communicator) cout << "testing allocate object by type... " << flush; session1->setAllocationTimeout(0); - session2->setAllocationTimeout(obj); + session2->setAllocationTimeout(0); obj = session1->allocateObjectByType("::Test"); test(obj && obj->ice_getIdentity().name == "allocatable"); @@ -352,7 +509,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session2->allocateObjectByType("::Test"); test(false); } - catch(const AllocationException&) + catch(const AllocationTimeoutException&) { } try @@ -387,7 +544,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session1->allocateObjectByType("::Test"); test(false); } - catch(const AllocationException&) + catch(const AllocationTimeoutException&) { } session1->allocateObjectByType("::TestBis"); @@ -396,7 +553,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session2->allocateObjectByType("::TestBis"); test(false); } - catch(const AllocationException&) + catch(const AllocationTimeoutException&) { } session1->releaseObject(allocatablebis); @@ -406,7 +563,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session1->allocateObjectByType("::TestBis"); test(false); } - catch(const AllocationException&) + catch(const AllocationTimeoutException&) { } session2->releaseObject(allocatablebis); @@ -508,7 +665,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session2->allocateObjectById(allocatable); test(false); } - catch(const AllocationException&) + catch(const AllocationTimeoutException&) { } test(time + IceUtil::Time::milliSeconds(100) < IceUtil::Time::now()); @@ -518,7 +675,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session2->allocateObjectByType("::Test"); test(false); } - catch(const AllocationException&) + catch(const AllocationTimeoutException&) { } test(time + IceUtil::Time::milliSeconds(100) < IceUtil::Time::now()); @@ -554,27 +711,31 @@ allTests(const Ice::CommunicatorPtr& communicator) { } - Ice::ObjectPrx session1obj1 = communicator->stringToProxy("allocatable1@AdapterAlloc")->ice_locator(locator1); - Ice::ObjectPrx session1obj2 = communicator->stringToProxy("allocatable2@AdapterAlloc")->ice_locator(locator1); - Ice::ObjectPrx session2obj1 = communicator->stringToProxy("allocatable1@AdapterAlloc")->ice_locator(locator2); - Ice::ObjectPrx session2obj2 = communicator->stringToProxy("allocatable2@AdapterAlloc")->ice_locator(locator2); - session1obj1->ice_locatorCacheTimeout(0)->ice_ping(); - try - { - session2obj1->ice_locatorCacheTimeout(0)->ice_ping(); - test(false); - } - catch(const Ice::NoEndpointException&) - { - } - try - { - session2obj2->ice_locatorCacheTimeout(0)->ice_ping(); - test(false); - } - catch(const Ice::NoEndpointException&) - { - } + // + // Remove this code if we're sure we don't want to disallow + // resolving endpoints of allocatable objects. + // +// Ice::ObjectPrx session1obj1 = communicator->stringToProxy("allocatable1@AdapterAlloc")->ice_locator(locator1); +// Ice::ObjectPrx session1obj2 = communicator->stringToProxy("allocatable2@AdapterAlloc")->ice_locator(locator1); +// Ice::ObjectPrx session2obj1 = communicator->stringToProxy("allocatable1@AdapterAlloc")->ice_locator(locator2); +// Ice::ObjectPrx session2obj2 = communicator->stringToProxy("allocatable2@AdapterAlloc")->ice_locator(locator2); +// session1obj1->ice_locatorCacheTimeout(0)->ice_ping(); +// try +// { +// session2obj1->ice_locatorCacheTimeout(0)->ice_ping(); +// test(false); +// } +// catch(const Ice::NoEndpointException&) +// { +// } +// try +// { +// session2obj2->ice_locatorCacheTimeout(0)->ice_ping(); +// test(false); +// } +// catch(const Ice::NoEndpointException&) +// { +// } session1->allocateObjectById(allocatable2); session1->releaseObject(allocatable1); @@ -619,7 +780,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session1->allocateObjectByType("::TestAdapter1"); test(false); } - catch(const AllocationException&) + catch(const AllocationTimeoutException&) { } try @@ -627,7 +788,7 @@ allTests(const Ice::CommunicatorPtr& communicator) session1->allocateObjectByType("::TestAdapter2"); test(false); } - catch(AllocationException&) + catch(AllocationTimeoutException&) { } test(session2->allocateObjectByType("::TestAdapter1")); @@ -731,28 +892,31 @@ allTests(const Ice::CommunicatorPtr& communicator) { } - Ice::ObjectPrx session1obj3 = communicator->stringToProxy("allocatable3@ServerAlloc")->ice_locator(locator1); - Ice::ObjectPrx session1obj4 = communicator->stringToProxy("allocatable4@ServerAlloc")->ice_locator(locator1); - Ice::ObjectPrx session2obj3 = communicator->stringToProxy("allocatable3@ServerAlloc")->ice_locator(locator2); - Ice::ObjectPrx session2obj4 = communicator->stringToProxy("allocatable4@ServerAlloc")->ice_locator(locator2); - session1obj3->ice_locatorCacheTimeout(0)->ice_ping(); - try - { - session2obj3->ice_locatorCacheTimeout(0)->ice_ping(); - test(false); - } - catch(const Ice::NoEndpointException&) - { - } - try - { - session2obj4->ice_locatorCacheTimeout(0)->ice_ping(); - test(false); - } - catch(const Ice::NoEndpointException&) - { - } - + // + // Remove this code if we're sure we don't want to disallow + // resolving endpoints of allocatable objects. + // +// Ice::ObjectPrx session1obj3 = communicator->stringToProxy("allocatable3@ServerAlloc")->ice_locator(locator1); +// Ice::ObjectPrx session1obj4 = communicator->stringToProxy("allocatable4@ServerAlloc")->ice_locator(locator1); +// Ice::ObjectPrx session2obj3 = communicator->stringToProxy("allocatable3@ServerAlloc")->ice_locator(locator2); +// Ice::ObjectPrx session2obj4 = communicator->stringToProxy("allocatable4@ServerAlloc")->ice_locator(locator2); +// session1obj3->ice_locatorCacheTimeout(0)->ice_ping(); +// try +// { +// session2obj3->ice_locatorCacheTimeout(0)->ice_ping(); +// test(false); +// } +// catch(const Ice::NoEndpointException&) +// { +// } +// try +// { +// session2obj4->ice_locatorCacheTimeout(0)->ice_ping(); +// test(false); +// } +// catch(const Ice::NoEndpointException&) +// { +// } session1->allocateObjectById(allocatable4); session1->releaseObject(allocatable3); @@ -881,6 +1045,31 @@ allTests(const Ice::CommunicatorPtr& communicator) session2->releaseObject(Ice::stringToIdentity("allocatable31")); session2->releaseObject(Ice::stringToIdentity("allocatable41")); + obj1 = session1->allocateObjectByType("::TestMultipleServer"); + test(obj1); + obj2 = session2->allocateObjectByType("::TestMultipleServer"); + test(obj2); + try + { + session1->allocateObjectByType("::TestMultipleServer"); + test(false); + } + catch(AllocationTimeoutException&) + { + } + try + { + session2->allocateObjectByType("::TestMultipleServer"); + test(false); + } + catch(AllocationTimeoutException&) + { + } + session1->releaseObject(obj1->ice_getIdentity()); + obj1 = session2->allocateObjectByType("::TestMultipleServer"); + session2->releaseObject(obj1->ice_getIdentity()); + session2->releaseObject(obj2->ice_getIdentity()); + cout << "ok" << endl; cout << "testing concurrent allocations... " << flush; @@ -950,6 +1139,41 @@ allTests(const Ice::CommunicatorPtr& communicator) session2->destroy(); cout << "ok" << endl; + + cout << "stress test... " << flush; + const int nClients = 6; + int i; + vector<StressClientPtr> clients; + for(i = 0; i < nClients - 2; ++i) + { + clients.push_back(new StressClient(i, manager, false)); + clients.back()->start(); + } + clients.push_back(new StressClient(i++, manager, true)); + clients.back()->start(); + clients.push_back(new StressClient(i++, manager, true)); + clients.back()->start(); + + for(vector<StressClientPtr>::const_iterator p = clients.begin(); p != clients.end(); ++p) + { + (*p)->notifyThread(); + } + + // + // Let the stress client run for a bit. + // + IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(5)); + + // + // Terminate the stress clients. + // + for(vector<StressClientPtr>::const_iterator q = clients.begin(); q != clients.end(); ++q) + { + (*q)->terminate(); + (*q)->getThreadControl().join(); + } + + cout << "ok" << endl; } catch(const NotAllocatableException& ex) { diff --git a/cpp/test/IceGrid/allocation/application.xml b/cpp/test/IceGrid/allocation/application.xml index 874e11bbb4a..46b1a045110 100644 --- a/cpp/test/IceGrid/allocation/application.xml +++ b/cpp/test/IceGrid/allocation/application.xml @@ -5,7 +5,7 @@ <node name="localnode"> <server id="ObjectAllocation" exe="${test.dir}/server" activation="on-demand" pwd="."> - <adapter name="Server" endpoints="default" allocatable="false"> + <adapter name="Server" endpoints="default"> <object identity="nonallocatable" type="::Test" allocatable="false"/> <object identity="allocatable" type="::Test" allocatable="true"/> <object identity="allocatablebis" type="::TestBis" allocatable="true"/> @@ -43,6 +43,27 @@ <object identity="allocatable6" type="::TestMultipleServer"/> </adapter> </server> + + <server id="ServerStress-1" exe="${test.dir}/server" activation="on-demand" pwd="."> + <adapter name="Server" endpoints="default"> + <object identity="stress-1" type="::StressTest" allocatable="true"/> + <object identity="stress-2" type="::StressTest" allocatable="true"/> + </adapter> + </server> + + <server id="ServerStress-2" exe="${test.dir}/server" activation="on-demand" pwd="."> + <adapter name="Server" endpoints="default" allocatable="true"> + <object identity="stress-3" type="::StressTest"/> + <object identity="stress-4" type="::StressTest"/> + </adapter> + </server> + + <server id="ServerStress-3" exe="${test.dir}/server" activation="on-demand" pwd="." allocatable="true"> + <adapter name="Server" endpoints="default"> + <object identity="stress-5" type="::StressTest"/> + <object identity="stress-6" type="::StressTest"/> + </adapter> + </server> </node> |