diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-04-24 09:46:12 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-04-24 09:46:12 +0000 |
commit | a5fdbf7412b96cecd45314f9d9eb37aaf77a2bf1 (patch) | |
tree | a2f26b3e3207ac925239385101f18a5459da1664 /cpp/src/IceGrid | |
parent | Unix Fix. (diff) | |
download | ice-a5fdbf7412b96cecd45314f9d9eb37aaf77a2bf1.tar.bz2 ice-a5fdbf7412b96cecd45314f9d9eb37aaf77a2bf1.tar.xz ice-a5fdbf7412b96cecd45314f9d9eb37aaf77a2bf1.zip |
Added first cut of the allocation mechanism.
Diffstat (limited to 'cpp/src/IceGrid')
-rw-r--r-- | cpp/src/IceGrid/AdminSessionI.cpp | 11 | ||||
-rw-r--r-- | cpp/src/IceGrid/AdminSessionI.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/Allocatable.cpp | 187 | ||||
-rw-r--r-- | cpp/src/IceGrid/Allocatable.h | 86 | ||||
-rw-r--r-- | cpp/src/IceGrid/Cache.h | 1 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 43 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.h | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/DescriptorBuilder.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/DescriptorHelper.cpp | 1 | ||||
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 9 | ||||
-rw-r--r-- | cpp/src/IceGrid/Makefile | 1 | ||||
-rw-r--r-- | cpp/src/IceGrid/ObjectCache.cpp | 11 | ||||
-rw-r--r-- | cpp/src/IceGrid/ObjectCache.h | 33 | ||||
-rw-r--r-- | cpp/src/IceGrid/QueryI.cpp | 73 | ||||
-rw-r--r-- | cpp/src/IceGrid/QueryI.h | 27 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 18 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionI.cpp | 115 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionI.h | 17 |
18 files changed, 569 insertions, 73 deletions
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp index a07ee5b4b67..243ce14a8f4 100644 --- a/cpp/src/IceGrid/AdminSessionI.cpp +++ b/cpp/src/IceGrid/AdminSessionI.cpp @@ -16,10 +16,11 @@ using namespace IceGrid; AdminSessionI::AdminSessionI(const string& userId, const DatabasePtr& database, + const Ice::ObjectAdapterPtr& adapter, RegistryObserverTopic& registryObserverTopic, NodeObserverTopic& nodeObserverTopic, int timeout) : - SessionI(userId, "admin", database, timeout), + SessionI(userId, "admin", database, adapter, timeout), _updating(false), _registryObserverTopic(registryObserverTopic), _nodeObserverTopic(nodeObserverTopic) @@ -267,16 +268,16 @@ AdminSessionManagerI::create(const string& userId, const Glacier2::SessionContro // We don't add the session to the reaper thread, Glacier2 takes // care of reaping the session. // - SessionIPtr session = - new AdminSessionI(userId, _database, _registryObserverTopic, _nodeObserverTopic, _sessionTimeout); + SessionIPtr session = new AdminSessionI(userId, _database, current.adapter, _registryObserverTopic, + _nodeObserverTopic, _sessionTimeout); return Glacier2::SessionPrx::uncheckedCast(current.adapter->addWithUUID(session)); } SessionPrx AdminSessionManagerI::createLocalSession(const string& userId, const Ice::Current& current) { - SessionIPtr session = - new AdminSessionI(userId, _database, _registryObserverTopic, _nodeObserverTopic, _sessionTimeout); + SessionIPtr session = new AdminSessionI(userId, _database, current.adapter, _registryObserverTopic, + _nodeObserverTopic, _sessionTimeout); SessionPrx proxy = SessionPrx::uncheckedCast(current.adapter->addWithUUID(session)); _reaper->add(new SessionReapable(session, proxy)); return proxy; diff --git a/cpp/src/IceGrid/AdminSessionI.h b/cpp/src/IceGrid/AdminSessionI.h index 3b3c4e831dd..38eb00653b9 100644 --- a/cpp/src/IceGrid/AdminSessionI.h +++ b/cpp/src/IceGrid/AdminSessionI.h @@ -20,7 +20,8 @@ class AdminSessionI : public SessionI, public AdminSession { public: - AdminSessionI(const std::string&, const DatabasePtr&, RegistryObserverTopic&, NodeObserverTopic&, int); + AdminSessionI(const std::string&, const DatabasePtr&, const Ice::ObjectAdapterPtr&, RegistryObserverTopic&, + NodeObserverTopic&, int); virtual ~AdminSessionI(); virtual AdminPrx getAdmin(const Ice::Current&) const; diff --git a/cpp/src/IceGrid/Allocatable.cpp b/cpp/src/IceGrid/Allocatable.cpp new file mode 100644 index 00000000000..4760bafe09b --- /dev/null +++ b/cpp/src/IceGrid/Allocatable.cpp @@ -0,0 +1,187 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <IceGrid/Allocatable.h> +#include <IceGrid/SessionI.h> + +using namespace std; +using namespace IceGrid; + +AllocationRequest::~AllocationRequest() +{ +} + +bool +AllocationRequest::setAllocatable(const AllocatablePtr& allocatable) +{ + Lock sync(*this); + assert(!_allocatable); + if(_canceled) + { + return false; + } + _allocatable = allocatable; + allocated(_allocatable); + return true; +} + +void +AllocationRequest::cancel() +{ + Lock sync(*this); + if(_canceled) + { + return; + } + _canceled = true; + canceled(); +} + +bool +AllocationRequest::checkTimeout(const IceUtil::Time& now) +{ + assert(_timeout > 0); + { + Lock sync(*this); + if(_canceled) + { + return true; + } + _canceled = _expiration < now; + if(!_canceled) + { + return false; + } + timeout(); + } + _session->removeAllocationRequest(this); + return true; +} + +void +AllocationRequest::allocate() +{ + _session->addAllocationRequest(this); +} + +void +AllocationRequest::release(const SessionIPtr& session) +{ + // + // Check if the session releasing the object is indeed the session + // which initiated the allocation request. + // + if(_session != session) + { + throw AllocationException("can't release object which is not allocated"); + } + _session->removeAllocationRequest(this); +} + +bool +AllocationRequest::operator<(const AllocationRequest& r) const +{ + return this < &r; +} + +AllocationRequest::AllocationRequest(const SessionIPtr& session) : + _session(session), + _timeout(_session->getAllocationTimeout()), + _expiration(_timeout > 0 ? (IceUtil::Time::now() + IceUtil::Time::milliSeconds(_timeout)) : IceUtil::Time()), + _canceled(false) +{ +} + +Allocatable::Allocatable(bool allocatable) : _allocatable(allocatable), _allocated(false) +{ +} + +Allocatable::~Allocatable() +{ +} + +void +Allocatable::allocate(const AllocationRequestPtr& request, bool allocateOnce) +{ + IceUtil::Mutex::Lock sync(_allocateMutex); + if(_allocatable) + { + if(_allocated) + { + if(_allocated->getSession() == request->getSession()) + { + if(allocateOnce) + { + throw AllocationException("object already allocated by the session"); + } + request->setAllocatable(this); + } + else if(request->getTimeout()) + { + request->allocate(); + _requests.push_back(request); // TODO: XXX: monitor request timeout if timeout != -1 + } + else + { + request->timeout(); + } + } + else if(request->setAllocatable(this)) + { + _allocated = request; + _allocated->allocate(); + } + } + else + { + if(allocateOnce) + { + throw AllocationException("can't allocate non allocatable object"); + } + bool rc = request->setAllocatable(this); + assert(rc); + } +} + +bool +Allocatable::tryAllocate(const AllocationRequestPtr& request) +{ + IceUtil::Mutex::Lock sync(_allocateMutex); + if(_allocatable && _allocated) + { + return false; + } + else if(request->setAllocatable(this)) + { + _allocated = request; + _allocated->allocate(); + } + return true; +} + +void +Allocatable::release(const SessionIPtr& session) +{ + IceUtil::Mutex::Lock sync(_allocateMutex); + if(!_allocated) + { + throw AllocationException("object not allocated"); + } + _allocated->release(session); + while(!_requests.empty()) + { + _allocated = _requests.front(); + _requests.pop_front(); + if(_allocated->setAllocatable(this)) + { + return; + } + } + _allocated = 0; +} diff --git a/cpp/src/IceGrid/Allocatable.h b/cpp/src/IceGrid/Allocatable.h new file mode 100644 index 00000000000..0e315036223 --- /dev/null +++ b/cpp/src/IceGrid/Allocatable.h @@ -0,0 +1,86 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_GRID_ALLOCATABLE_H +#define ICE_GRID_ALLOCATABLE_H + +#include <IceUtil/Handle.h> +#include <IceUtil/Mutex.h> +#include <IceUtil/Shared.h> +#include <IceUtil/Time.h> +#include <list> + +namespace IceGrid +{ + +class SessionI; +typedef IceUtil::Handle<SessionI> SessionIPtr; + +class Allocatable; +typedef IceUtil::Handle<Allocatable> AllocatablePtr; + +class AllocationRequest : public IceUtil::Mutex, public IceUtil::Shared +{ +public: + + virtual ~AllocationRequest(); + + virtual void timeout() = 0; + virtual void allocated(const AllocatablePtr&) = 0; + virtual void canceled() = 0; + + bool setAllocatable(const AllocatablePtr&); + bool checkTimeout(const IceUtil::Time&); + void cancel(); + void allocate(); + void release(const SessionIPtr&); + + int getTimeout() const { return _timeout; } + const SessionIPtr& getSession() const { return _session; } + + bool operator<(const AllocationRequest&) const; + +protected: + + AllocationRequest(const SessionIPtr&); + +private: + + const SessionIPtr _session; + const int _timeout; + const IceUtil::Time _expiration; + bool _canceled; + AllocatablePtr _allocatable; +}; +typedef IceUtil::Handle<AllocationRequest> AllocationRequestPtr; + +class Allocatable : public IceUtil::Shared +{ +public: + + Allocatable(bool); + virtual ~Allocatable(); + + void allocate(const AllocationRequestPtr&, bool); + bool tryAllocate(const AllocationRequestPtr&); + void release(const SessionIPtr&); + + bool allocatable() const { return _allocatable; } + +protected: + + bool _allocatable; + IceUtil::Mutex _allocateMutex; + std::list<AllocationRequestPtr> _requests; + AllocationRequestPtr _allocated; +}; + +}; + +#endif diff --git a/cpp/src/IceGrid/Cache.h b/cpp/src/IceGrid/Cache.h index ab50cfdce44..2fe35ff48ef 100644 --- a/cpp/src/IceGrid/Cache.h +++ b/cpp/src/IceGrid/Cache.h @@ -18,7 +18,6 @@ namespace IceGrid { - template<typename Key, typename Value> class Cache : public IceUtil::Mutex { diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index f0b9dcb3dfe..a81250ce712 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -950,12 +950,53 @@ Database::updateObject(const Ice::ObjectPrx& proxy) } } +void +Database::allocateObject(const Ice::Identity& id, const ObjectAllocationRequestPtr& request, bool allocateOnce) +{ + try + { + _objectCache.get(id)->allocate(request, allocateOnce); + return; + } + catch(ObjectNotRegisteredException&) + { + } + + Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + IdentityObjectInfoDict objects(connection, _objectDbName); + IdentityObjectInfoDict::const_iterator p = objects.find(id); + if(p == objects.end()) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; + } + request->response(p->second.proxy); +} + +void +Database::releaseObject(const Ice::Identity& id, const SessionIPtr& session) +{ + try + { + _objectCache.get(id)->release(session); + return; + } + catch(ObjectNotRegisteredException&) + { + } +} + Ice::ObjectPrx Database::getObjectProxy(const Ice::Identity& id) { try { - return _objectCache.get(id)->getProxy(); + // + // Only return proxies for non allocatable objects. + // + ObjectEntryPtr entry = _objectCache.get(id); + return entry->allocatable() ? Ice::ObjectPrx() : entry->getProxy(); } catch(ObjectNotRegisteredException&) { diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h index ee0dc448650..791268fd8d4 100644 --- a/cpp/src/IceGrid/Database.h +++ b/cpp/src/IceGrid/Database.h @@ -87,6 +87,10 @@ public: void addObject(const ObjectInfo&); void removeObject(const Ice::Identity&); void updateObject(const Ice::ObjectPrx&); + + void allocateObject(const Ice::Identity&, const ObjectAllocationRequestPtr&, bool); + void releaseObject(const Ice::Identity&, const SessionIPtr&); + Ice::ObjectPrx getObjectProxy(const Ice::Identity&); Ice::ObjectPrx getObjectByType(const std::string&); Ice::ObjectPrx getObjectByTypeOnLeastLoadedNode(const std::string&, LoadSample); diff --git a/cpp/src/IceGrid/DescriptorBuilder.cpp b/cpp/src/IceGrid/DescriptorBuilder.cpp index 87482df7a8c..6f46b9f8bd1 100644 --- a/cpp/src/IceGrid/DescriptorBuilder.cpp +++ b/cpp/src/IceGrid/DescriptorBuilder.cpp @@ -293,6 +293,7 @@ ApplicationDescriptorBuilder::addObject(const XmlAttributesHelper& attrs) ObjectDescriptor object; object.type = attrs("type", ""); object.id = Ice::stringToIdentity(attrs("identity")); + object.allocatable = attrs.asBool("allocatable", false); _descriptor.replicaGroups.back().objects.push_back(object); } @@ -611,6 +612,7 @@ CommunicatorDescriptorBuilder::addObject(const XmlAttributesHelper& attrs) ObjectDescriptor object; object.type = attrs("type", ""); object.id = Ice::stringToIdentity(attrs("identity")); + object.allocatable = attrs.asBool("allocatable", false); _descriptor->adapters.back().objects.push_back(object); } diff --git a/cpp/src/IceGrid/DescriptorHelper.cpp b/cpp/src/IceGrid/DescriptorHelper.cpp index c1c93f30e9b..c50e54667d6 100644 --- a/cpp/src/IceGrid/DescriptorHelper.cpp +++ b/cpp/src/IceGrid/DescriptorHelper.cpp @@ -735,6 +735,7 @@ CommunicatorHelper::instantiateImpl(const CommunicatorDescriptorPtr& instance, c ObjectDescriptor obj; obj.type = resolve(q->type, "object type"); obj.id = Ice::stringToIdentity(resolve(Ice::identityToString(q->id), "object identity", false)); + obj.allocatable = q->allocatable; if(obj.id.name.empty()) { resolve.exception("invalid object identity `" + Ice::identityToString(q->id) + "': name empty"); diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index ee19b2c19f1..bcc2a893df7 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -310,6 +310,15 @@ LocatorI::findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& cb, } // + // If the proxy is 0, this means that the object is allocatable. + // + if(!proxy) + { + cb->ice_response(0); + return; + } + + // // OPTIMIZATION: If the object is registered with an adapter id, // try to get the adapter direct proxy (which might caused the // server activation). This will avoid the client to lookup for diff --git a/cpp/src/IceGrid/Makefile b/cpp/src/IceGrid/Makefile index 7de85c965f9..cfb3f845ef8 100644 --- a/cpp/src/IceGrid/Makefile +++ b/cpp/src/IceGrid/Makefile @@ -56,6 +56,7 @@ REGISTRY_OBJS = RegistryI.o \ IdentityObjectInfoDict.o \ StringAdapterInfoDict.o \ Database.o \ + Allocatable.o \ AdapterCache.o \ ObjectCache.o \ ServerCache.o \ diff --git a/cpp/src/IceGrid/ObjectCache.cpp b/cpp/src/IceGrid/ObjectCache.cpp index a14e2b46928..d8d89d8583f 100644 --- a/cpp/src/IceGrid/ObjectCache.cpp +++ b/cpp/src/IceGrid/ObjectCache.cpp @@ -33,6 +33,7 @@ ObjectCache::add(const string& app, const string& adapterId, const string& endpo ObjectInfo info; info.type = desc.type; + info.allocatable = desc.allocatable; if(adapterId.empty()) { info.proxy = _communicator->stringToProxy(Ice::identityToString(desc.id) + ":" + endpoints); @@ -106,7 +107,11 @@ ObjectCache::getObjectsByType(const string& type) } for(set<Ice::Identity>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) { - proxies.push_back(getImpl(*q)->getProxy()); + ObjectEntryPtr entry = getImpl(*q); + if(!entry->allocatable()) + { + proxies.push_back(entry->getProxy()); + } } return proxies; } @@ -126,7 +131,8 @@ ObjectCache::getAll(const string& expression) return infos; } -ObjectEntry::ObjectEntry(Cache<Ice::Identity, ObjectEntry>&, const Ice::Identity&) +ObjectEntry::ObjectEntry(Cache<Ice::Identity, ObjectEntry>&, const Ice::Identity&) : + Allocatable(false) { } @@ -135,6 +141,7 @@ ObjectEntry::set(const string& app, const ObjectInfo& info) { _application = app; _info = info; + _allocatable = info.allocatable; } Ice::ObjectPrx diff --git a/cpp/src/IceGrid/ObjectCache.h b/cpp/src/IceGrid/ObjectCache.h index d89cd5858b8..cbc31e9405d 100644 --- a/cpp/src/IceGrid/ObjectCache.h +++ b/cpp/src/IceGrid/ObjectCache.h @@ -15,6 +15,7 @@ #include <Ice/CommunicatorF.h> #include <IceGrid/Cache.h> #include <IceGrid/Internal.h> +#include <IceGrid/Allocatable.h> namespace IceGrid { @@ -24,10 +25,7 @@ class ObjectCache; class ServerEntry; typedef IceUtil::Handle<ServerEntry> ServerEntryPtr; -class ObjectEntry; -typedef IceUtil::Handle<ObjectEntry> ObjectEntryPtr; - -class ObjectEntry : public IceUtil::Shared +class ObjectEntry : public Allocatable { public: @@ -67,6 +65,33 @@ private: std::map<std::string, std::set<Ice::Identity> > _types; }; +class ObjectAllocationRequest : public AllocationRequest +{ +public: + + ObjectAllocationRequest(const SessionIPtr& session) : AllocationRequest(session) { } + + virtual void response(const Ice::ObjectPrx&) = 0; + +private: + + virtual void allocated(const AllocatablePtr& allocatable) + { + response(ObjectEntryPtr::dynamicCast(allocatable)->getProxy()); + } + + virtual void timeout() + { + response(0); + } + + virtual void canceled() + { + response(0); + } +}; +typedef IceUtil::Handle<ObjectAllocationRequest> ObjectAllocationRequestPtr; + }; #endif diff --git a/cpp/src/IceGrid/QueryI.cpp b/cpp/src/IceGrid/QueryI.cpp index 74133e0a64d..e317bc4eee4 100644 --- a/cpp/src/IceGrid/QueryI.cpp +++ b/cpp/src/IceGrid/QueryI.cpp @@ -10,14 +10,39 @@ #include <IceGrid/Internal.h> #include <IceGrid/QueryI.h> #include <IceGrid/Database.h> +#include <IceGrid/ObjectCache.h> +#include <IceGrid/SessionI.h> using namespace std; using namespace Ice; using namespace IceGrid; -QueryI::QueryI(const CommunicatorPtr& communicator, const DatabasePtr& database) : +class GetObjectProxy : public ObjectAllocationRequest +{ +public: + + GetObjectProxy(const SessionIPtr& session, const AMD_Query_findObjectByIdPtr& cb) : + ObjectAllocationRequest(session), _cb(cb) + { + } + + virtual void + response(const Ice::ObjectPrx& proxy) + { + assert(_cb); + _cb->ice_response(proxy); + _cb = 0; + } + +private: + + AMD_Query_findObjectByIdPtr _cb; +}; + +QueryI::QueryI(const CommunicatorPtr& communicator, const DatabasePtr& database, const SessionIPtr& session) : _communicator(communicator), - _database(database) + _database(database), + _session(session) { } @@ -25,55 +50,67 @@ QueryI::~QueryI() { } -Ice::ObjectPrx -QueryI::findObjectById(const Ice::Identity& id, const Ice::Current&) const +void +QueryI::findObjectById_async(const AMD_Query_findObjectByIdPtr& cb, const Ice::Identity& id, const Ice::Current&) const { try { - return _database->getObjectProxy(id); + if(_session) + { + _database->allocateObject(id, new GetObjectProxy(_session, cb), false); + } + else + { + cb->ice_response(_database->getObjectProxy(id)); + } } catch(const ObjectNotRegisteredException&) { - return 0; + cb->ice_response(0); } } -Ice::ObjectPrx -QueryI::findObjectByType(const string& type, const Ice::Current&) const +void +QueryI::findObjectByType_async(const AMD_Query_findObjectByTypePtr& cb, const string& type, const Ice::Current&) const { try { - return _database->getObjectByType(type); + cb->ice_response(_database->getObjectByType(type)); } catch(const ObjectNotRegisteredException&) { - return 0; + cb->ice_response(0); } } -Ice::ObjectPrx -QueryI::findObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::Current&) const +void +QueryI::findObjectByTypeOnLeastLoadedNode_async(const AMD_Query_findObjectByTypeOnLeastLoadedNodePtr& cb, + const string& type, + LoadSample sample, + const Ice::Current&) const { try { - return _database->getObjectByTypeOnLeastLoadedNode(type, sample); + cb->ice_response(_database->getObjectByTypeOnLeastLoadedNode(type, sample)); } catch(const ObjectNotRegisteredException&) { - return 0; + cb->ice_response(0); } } -Ice::ObjectProxySeq -QueryI::findAllObjectsByType(const string& type, const Ice::Current&) const +void +QueryI::findAllObjectsByType_async(const AMD_Query_findAllObjectsByTypePtr& cb, + const string& type, + const Ice::Current&) const { try { - return _database->getObjectsByType(type); + cb->ice_response(_database->getObjectsByType(type)); } catch(const ObjectNotRegisteredException&) { - return Ice::ObjectProxySeq(); + cb->ice_response(Ice::ObjectProxySeq()); } } diff --git a/cpp/src/IceGrid/QueryI.h b/cpp/src/IceGrid/QueryI.h index 423dcd23c64..072345eda1a 100644 --- a/cpp/src/IceGrid/QueryI.h +++ b/cpp/src/IceGrid/QueryI.h @@ -19,23 +19,34 @@ namespace IceGrid class Database; typedef IceUtil::Handle<Database> DatabasePtr; +class SessionI; +typedef IceUtil::Handle<SessionI> SessionIPtr; + class QueryI : public Query, public IceUtil::Mutex { public: - QueryI(const Ice::CommunicatorPtr&, const DatabasePtr&); + QueryI(const Ice::CommunicatorPtr&, const DatabasePtr&, const SessionIPtr&); virtual ~QueryI(); - virtual ::Ice::ObjectPrx findObjectById(const ::Ice::Identity&, const ::Ice::Current&) const; - virtual ::Ice::ObjectPrx findObjectByType(const ::std::string&, const ::Ice::Current&) const; - virtual ::Ice::ObjectPrx findObjectByTypeOnLeastLoadedNode(const ::std::string&, LoadSample, - const ::Ice::Current&) const; - virtual ::Ice::ObjectProxySeq findAllObjectsByType(const ::std::string&, const ::Ice::Current&) const; + virtual void findObjectById_async(const AMD_Query_findObjectByIdPtr&, const ::Ice::Identity&, + const ::Ice::Current&) const; + + virtual void findObjectByType_async(const AMD_Query_findObjectByTypePtr&, const ::std::string&, + const ::Ice::Current&) const; + + virtual void findObjectByTypeOnLeastLoadedNode_async(const AMD_Query_findObjectByTypeOnLeastLoadedNodePtr&, + const ::std::string&, LoadSample, + const ::Ice::Current&) const; + + virtual void findAllObjectsByType_async(const AMD_Query_findAllObjectsByTypePtr&, const ::std::string&, + const ::Ice::Current&) const; private: - Ice::CommunicatorPtr _communicator; - DatabasePtr _database; + const Ice::CommunicatorPtr _communicator; + const DatabasePtr _database; + const SessionIPtr _session; }; } diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index bb8ef76e42d..6691fd1d1f3 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -234,23 +234,29 @@ RegistryI::start(bool nowarn) // Create the query, admin, session manager interfaces // Identity queryId = stringToIdentity(instanceName + "/Query"); - clientAdapter->add(new QueryI(_communicator, _database), queryId); + clientAdapter->add(new QueryI(_communicator, _database, 0), queryId); + + ReapThreadPtr reaper = _adminReaper ? _adminReaper : _reaper; // TODO: XXX + + Identity sessionMgrId = stringToIdentity(instanceName + "/SessionManager"); + ObjectPtr sessionMgr = new ClientSessionManagerI(_database, reaper, adminSessionTimeout); // TODO: XXX + clientAdapter->add(sessionMgr, sessionMgrId); Identity adminId = stringToIdentity(instanceName + "/Admin"); adminAdapter->add(new AdminI(_database, this, traceLevels), adminId); - Identity sessionManagerId = stringToIdentity(instanceName + "/SessionManager"); - ReapThreadPtr reaper = _adminReaper ? _adminReaper : _reaper; - ObjectPtr sessionMgr = new AdminSessionManagerI(*regTopic, *nodeTopic, _database, reaper, adminSessionTimeout); - adminAdapter->add(sessionMgr, sessionManagerId); + Identity admSessionMgrId = stringToIdentity(instanceName + "/AdminSessionManager"); + ObjectPtr admSessionMgr = new AdminSessionManagerI(*regTopic, *nodeTopic, _database, reaper, adminSessionTimeout); + adminAdapter->add(admSessionMgr, admSessionMgrId); // // Register well known objects with the object registry. // addWellKnownObject(registryAdapter->createProxy(registryId), Registry::ice_staticId()); addWellKnownObject(clientAdapter->createProxy(queryId), Query::ice_staticId()); + addWellKnownObject(clientAdapter->createProxy(sessionMgrId), SessionManager::ice_staticId()); addWellKnownObject(adminAdapter->createProxy(adminId), Admin::ice_staticId()); - addWellKnownObject(adminAdapter->createProxy(sessionManagerId), SessionManager::ice_staticId()); + addWellKnownObject(adminAdapter->createProxy(admSessionMgrId), SessionManager::ice_staticId()); // // We are ready to go! diff --git a/cpp/src/IceGrid/SessionI.cpp b/cpp/src/IceGrid/SessionI.cpp index 75210a23b22..fd4c003978f 100644 --- a/cpp/src/IceGrid/SessionI.cpp +++ b/cpp/src/IceGrid/SessionI.cpp @@ -9,11 +9,44 @@ #include <Ice/Ice.h> #include <IceGrid/SessionI.h> +#include <IceGrid/QueryI.h> #include <IceGrid/Database.h> using namespace std; using namespace IceGrid; +class AllocateObject : public ObjectAllocationRequest +{ +public: + + AllocateObject(const SessionIPtr& session, const AMD_Session_allocateObjectPtr& cb) : + ObjectAllocationRequest(session), _cb(cb) + { + } + + virtual void response(const Ice::ObjectPrx& proxy) + { + assert(_cb); + if(proxy) + { + _cb->ice_response(); + } + else + { + // + // TODO: The request might also have been canceled! + // + + _cb->ice_exception(AllocationTimeoutException()); + } + _cb = 0; + } + +private: + + AMD_Session_allocateObjectPtr _cb; +}; + SessionReapable::SessionReapable(const SessionIPtr& session, const SessionPrx& proxy) : _session(session), _proxy(proxy) @@ -36,19 +69,30 @@ SessionReapable::destroy() _proxy->destroy(); } -SessionI::SessionI(const string& userId, const string& prefix, const DatabasePtr& database, int timeout) : +SessionI::SessionI(const string& userId, + const string& prefix, + const DatabasePtr& database, + const Ice::ObjectAdapterPtr& adapter, + int timeout) : _userId(userId), _prefix(prefix), _timeout(timeout), _traceLevels(database->getTraceLevels()), _database(database), - _destroyed(false) + _destroyed(false), + _allocationTimeout(-1) { if(_traceLevels && _traceLevels->session > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->sessionCat); out << _prefix << " session `" << _userId << "' created"; } + + // + // Register session based query and locator interfaces + // + _query = QueryPrx::uncheckedCast(adapter->addWithUUID(new QueryI(adapter->getCommunicator(), _database, this))); + //_locator = adapter->addWithUUID(new LocatorI()); } SessionI::~SessionI() @@ -84,45 +128,46 @@ SessionI::getTimeout(const Ice::Current&) const QueryPrx SessionI::getQuery(const Ice::Current& current) const { - // - // TODO: XXX - // - return QueryPrx::uncheckedCast( - current.adapter->getCommunicator()->stringToProxy(_database->getInstanceName() + "/Query")); + return _query; } Ice::LocatorPrx SessionI::getLocator(const Ice::Current& current) const { - // - // TODO: XXX - // - return Ice::LocatorPrx::uncheckedCast( - current.adapter->getCommunicator()->stringToProxy(_database->getInstanceName() + "/Locator")); + return _locator; } void -SessionI::allocateObject(const Ice::ObjectPrx& proxy, const Ice::Current&) +SessionI::allocateObject_async(const AMD_Session_allocateObjectPtr& cb, const Ice::ObjectPrx& prx, const Ice::Current&) { // - // TODO: XXX + // TODO: Check if the proxy points to a replicated object and eventually throw if that's the case. // + if(!prx) + { + throw AllocationException("proxy is null"); + } + _database->allocateObject(prx->ice_getIdentity(), new AllocateObject(this, cb), true); } void -SessionI::releaseObject(const Ice::ObjectPrx& proxy, const Ice::Current&) +SessionI::releaseObject(const Ice::ObjectPrx& prx, const Ice::Current&) { // - // TODO: XXX + // TODO: Check if the proxy points to a replicated object and eventually throw if that's the case. // + if(!prx) + { + throw AllocationException("proxy is null"); + } + _database->releaseObject(prx->ice_getIdentity(), this); } void SessionI::setAllocationTimeout(int timeout, const Ice::Current&) { - // - // TODO: XXX - // + Lock sync(*this); + _allocationTimeout = timeout; } void @@ -152,8 +197,30 @@ SessionI::timestamp() const return _timestamp; } -ClientSessionI::ClientSessionI(const string& userId, const DatabasePtr& database, int timeout) : - SessionI(userId, "client", database, timeout) +int +SessionI::getAllocationTimeout() const +{ + Lock sync(*this); + return _allocationTimeout; +} + +void +SessionI::addAllocationRequest(const AllocationRequestPtr& request) +{ + Lock sync(*this); + _allocations.insert(request); +} + +void +SessionI::removeAllocationRequest(const AllocationRequestPtr& request) +{ + Lock sync(*this); + _allocations.erase(request); +} + +ClientSessionI::ClientSessionI(const string& userId, const DatabasePtr& database, const Ice::ObjectAdapterPtr& adapter, + int timeout) : + SessionI(userId, "client", database, adapter, timeout) { } @@ -173,14 +240,14 @@ ClientSessionManagerI::create(const string& userId, const Glacier2::SessionContr // We don't add the session to the reaper thread, Glacier2 takes // care of reaping the session. // - SessionIPtr session = new ClientSessionI(userId, _database, _sessionTimeout); - return Glacier2::SessionPrx::uncheckedCast(current.adapter->addWithUUID(session)); + SessionIPtr session = new ClientSessionI(userId, _database, current.adapter, _sessionTimeout); + return Glacier2::SessionPrx::uncheckedCast(current.adapter->addWithUUID(session)); // TODO: XXX: category = userid? } SessionPrx ClientSessionManagerI::createLocalSession(const string& userId, const Ice::Current& current) { - SessionIPtr session = new ClientSessionI(userId, _database, _sessionTimeout); + SessionIPtr session = new ClientSessionI(userId, _database, current.adapter, _sessionTimeout); SessionPrx proxy = SessionPrx::uncheckedCast(current.adapter->addWithUUID(session)); _reaper->add(new SessionReapable(session, proxy)); return proxy; diff --git a/cpp/src/IceGrid/SessionI.h b/cpp/src/IceGrid/SessionI.h index 8e72d5b9cf5..2d8216e23b9 100644 --- a/cpp/src/IceGrid/SessionI.h +++ b/cpp/src/IceGrid/SessionI.h @@ -24,6 +24,9 @@ typedef IceUtil::Handle<Database> DatabasePtr; class TraceLevels; typedef IceUtil::Handle<TraceLevels> TraceLevelsPtr; +class AllocationRequest; +typedef IceUtil::Handle<AllocationRequest> AllocationRequestPtr; + class SessionI; typedef IceUtil::Handle<SessionI> SessionIPtr; @@ -53,31 +56,39 @@ public: virtual int getTimeout(const Ice::Current&) const; virtual QueryPrx getQuery(const Ice::Current&) const; virtual Ice::LocatorPrx getLocator(const Ice::Current&) const; - virtual void allocateObject(const Ice::ObjectPrx&, const Ice::Current&); + virtual void allocateObject_async(const AMD_Session_allocateObjectPtr&, const Ice::ObjectPrx&,const Ice::Current&); virtual void releaseObject(const Ice::ObjectPrx&, const Ice::Current&); virtual void setAllocationTimeout(int, const Ice::Current&); virtual void destroy(const Ice::Current&); virtual IceUtil::Time timestamp() const; + virtual int getAllocationTimeout() const; + + void addAllocationRequest(const AllocationRequestPtr&); + void removeAllocationRequest(const AllocationRequestPtr&); protected: - SessionI(const std::string&, const std::string&, const DatabasePtr&, int); + SessionI(const std::string&, const std::string&, const DatabasePtr&, const Ice::ObjectAdapterPtr&, int); const std::string _userId; const std::string _prefix; const int _timeout; const TraceLevelsPtr _traceLevels; const DatabasePtr _database; + IceGrid::QueryPrx _query; + Ice::LocatorPrx _locator; bool _destroyed; IceUtil::Time _timestamp; + int _allocationTimeout; + std::set<AllocationRequestPtr> _allocations; }; class ClientSessionI : public SessionI { public: - ClientSessionI(const std::string&, const DatabasePtr&, int); + ClientSessionI(const std::string&, const DatabasePtr&, const Ice::ObjectAdapterPtr&, int); }; class ClientSessionManagerI : virtual public SessionManager |