diff options
Diffstat (limited to 'cpp/src/IceGrid/AllocatableObjectCache.cpp')
-rw-r--r-- | cpp/src/IceGrid/AllocatableObjectCache.cpp | 352 |
1 files changed, 352 insertions, 0 deletions
diff --git a/cpp/src/IceGrid/AllocatableObjectCache.cpp b/cpp/src/IceGrid/AllocatableObjectCache.cpp new file mode 100644 index 00000000000..78bac949bbf --- /dev/null +++ b/cpp/src/IceGrid/AllocatableObjectCache.cpp @@ -0,0 +1,352 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <IceUtil/Random.h> +#include <Ice/Communicator.h> +#include <Ice/LoggerUtil.h> +#include <Ice/LocalException.h> +#include <IceGrid/AllocatableObjectCache.h> +#include <IceGrid/SessionI.h> + +using namespace std; +using namespace IceGrid; + +pointer_to_unary_function<int, int> AllocatableObjectCache::_rand(IceUtil::random); + +namespace IceGrid +{ + +struct AllocatableObjectEntryCI : binary_function<AllocatableObjectEntryPtr&, AllocatableObjectEntryPtr&, bool> +{ + + bool + operator()(const AllocatableObjectEntryPtr& lhs, const AllocatableObjectEntryPtr& rhs) + { + return ::Ice::proxyIdentityLess(lhs->getProxy(), rhs->getProxy()); + } +}; + +}; + +AllocatableObjectCache::TypeEntry::TypeEntry() +{ +} + +void +AllocatableObjectCache::TypeEntry::add(const AllocatableObjectEntryPtr& obj) +{ + // + // No mutex protection here, this is called with the cache locked. + // + _objects.insert(lower_bound(_objects.begin(), _objects.end(), obj, AllocatableObjectEntryCI()), obj); + if(!_requests.empty()) + { + canTryAllocate(obj, false); + } +} + +bool +AllocatableObjectCache::TypeEntry::remove(const AllocatableObjectEntryPtr& obj) +{ + // + // No mutex protection here, this is called with the cache locked. + // + vector<AllocatableObjectEntryPtr>::iterator q; + q = lower_bound(_objects.begin(), _objects.end(), obj, AllocatableObjectEntryCI()); + assert(q->get() == obj.get()); + _objects.erase(q); + + if(!_requests.empty() && _objects.empty()) + { + for(list<ObjectAllocationRequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + (*p)->cancel(AllocationException("no allocatable objects with type `" + obj->getType() + "' registered")); + } + } + return _objects.empty(); +} + +void +AllocatableObjectCache::TypeEntry::addAllocationRequest(const ObjectAllocationRequestPtr& request) +{ + // + // No mutex protection here, this is called with the cache locked. + // + if(request->pending()) + { + _requests.push_back(request); + } +} + +bool +AllocatableObjectCache::TypeEntry::canTryAllocate(const AllocatableObjectEntryPtr& entry, bool fromRelease) +{ + // + // No mutex protection here, this is called with the cache locked. + // + list<ObjectAllocationRequestPtr>::iterator p = _requests.begin(); + while(p != _requests.end()) + { + AllocationRequestPtr request = *p; + try + { + if(request->isCanceled()) // If the request has been canceled, we just remove it. + { + p = _requests.erase(p); + } + else if(entry->tryAllocate(request, fromRelease)) + { + p = _requests.erase(p); + return true; // The request successfully allocated the entry! + } + else if(entry->getSession()) // If entry is allocated, we're done + { + return false; + } + else + { + ++p; + } + } + catch(const SessionDestroyedException&) + { + p = _requests.erase(p); + } + } + return false; +} + +AllocatableObjectCache::AllocatableObjectCache(const Ice::CommunicatorPtr& communicator) : + _communicator(communicator) +{ +} + +void +AllocatableObjectCache::add(const ObjectInfo& info, const AllocatablePtr& parent) +{ + const Ice::Identity& id = info.proxy->ice_getIdentity(); + + Lock sync(*this); + assert(!getImpl(id)); + + AllocatableObjectEntryPtr entry = new AllocatableObjectEntry(*this, info, parent); + addImpl(id, entry); + + map<string, TypeEntry>::iterator p = _types.find(entry->getType()); + if(p == _types.end()) + { + p = _types.insert(p, map<string, TypeEntry>::value_type(entry->getType(), TypeEntry())); + } + p->second.add(entry); + + if(_traceLevels && _traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << "added allocatable object `" << _communicator->identityToString(id) << "'"; + } +} + +AllocatableObjectEntryPtr +AllocatableObjectCache::get(const Ice::Identity& id) const +{ + Lock sync(*this); + AllocatableObjectEntryPtr entry = getImpl(id); + if(!entry) + { + throw ObjectNotRegisteredException(id); + } + return entry; +} + +AllocatableObjectEntryPtr +AllocatableObjectCache::remove(const Ice::Identity& id) +{ + Lock sync(*this); + AllocatableObjectEntryPtr entry = removeImpl(id); + assert(entry); + + map<string, TypeEntry>::iterator p = _types.find(entry->getType()); + assert(p != _types.end()); + if(p->second.remove(entry)) + { + _types.erase(p); + } + + if(_traceLevels && _traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << "removed allocatable object `" << _communicator->identityToString(id) << "'"; + } + + return entry; +} + +void +AllocatableObjectCache::allocateByType(const string& type, const ObjectAllocationRequestPtr& request) +{ + Lock sync(*this); + map<string, TypeEntry>::iterator p = _types.find(type); + if(p == _types.end()) + { + throw AllocationException("no allocatable objects with type `" + type + "' registered"); + } + + vector<AllocatableObjectEntryPtr> objects = p->second.getObjects(); + random_shuffle(objects.begin(), objects.end(), _rand); // TODO: OPTIMIZE + try + { + for(vector<AllocatableObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q) + { + if((*q)->tryAllocate(request)) + { + return; + } + } + } + catch(const SessionDestroyedException&) + { + return; + } + + p->second.addAllocationRequest(request); +} + +bool +AllocatableObjectCache::canTryAllocate(const AllocatableObjectEntryPtr& entry) +{ + // + // Notify the type entry that an object was released. + // + Lock sync(*this); + map<string, TypeEntry>::iterator p = _types.find(entry->getType()); + if(p == _types.end()) + { + return false; + } + return p->second.canTryAllocate(entry, true); +} + +AllocatableObjectEntry::AllocatableObjectEntry(AllocatableObjectCache& cache, + const ObjectInfo& info, + const AllocatablePtr& parent) : + Allocatable(true, parent), + _cache(cache), + _info(info), + _destroyed(false) +{ +} + +Ice::ObjectPrx +AllocatableObjectEntry::getProxy() const +{ + return _info.proxy; +} + +string +AllocatableObjectEntry::getType() const +{ + return _info.type; +} + +bool +AllocatableObjectEntry::canRemove() +{ + return true; +} + +void +AllocatableObjectEntry::allocated(const SessionIPtr& session) +{ + // + // Add the object allocation to the session. The object will be + // released once the session is destroyed. + // + session->addAllocation(this); + + TraceLevelsPtr traceLevels = _cache.getTraceLevels(); + if(traceLevels && traceLevels->object > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->objectCat); + out << "object `" << _info.proxy << "' allocated by `" << session->getId() << "' (" << _count << ")"; + } + + Glacier2::SessionControlPrx ctl = session->getSessionControl(); + if(ctl) + { + try + { + Ice::IdentitySeq seq(1); + seq.push_back(_info.proxy->ice_getIdentity()); + ctl->identities()->add(seq); + } + catch(const Ice::ObjectNotExistException&) + { + } + } +} + +void +AllocatableObjectEntry::released(const SessionIPtr& session) +{ + // + // Remove the object allocation from the session. + // + session->removeAllocation(this); + + Glacier2::SessionControlPrx ctl = session->getSessionControl(); + if(ctl) + { + try + { + Ice::IdentitySeq seq(1); + seq.push_back(_info.proxy->ice_getIdentity()); + ctl->identities()->remove(seq); + } + catch(const Ice::ObjectNotExistException&) + { + } + } + + TraceLevelsPtr traceLevels = _cache.getTraceLevels(); + if(traceLevels && traceLevels->object > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->objectCat); + out << "object `" << _info.proxy << "' released by `" << session->getId() << "' (" << _count << ")"; + } +} + +void +AllocatableObjectEntry::destroy() +{ + SessionIPtr session; + { + Lock sync(*this); + _destroyed = true; + session = _session; + } + release(session); +} + +void +AllocatableObjectEntry::checkAllocatable() +{ + if(_destroyed) + { + throw ObjectNotRegisteredException(_info.proxy->ice_getIdentity()); + } + + Allocatable::checkAllocatable(); +} + +bool +AllocatableObjectEntry::canTryAllocate() +{ + return _cache.canTryAllocate(this); +} + |