summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceBox/Makefile2
-rw-r--r--cpp/src/IceGrid/AdminSessionI.cpp11
-rw-r--r--cpp/src/IceGrid/AdminSessionI.h3
-rw-r--r--cpp/src/IceGrid/Allocatable.cpp187
-rw-r--r--cpp/src/IceGrid/Allocatable.h86
-rw-r--r--cpp/src/IceGrid/Cache.h1
-rw-r--r--cpp/src/IceGrid/Database.cpp43
-rw-r--r--cpp/src/IceGrid/Database.h4
-rw-r--r--cpp/src/IceGrid/DescriptorBuilder.cpp2
-rw-r--r--cpp/src/IceGrid/DescriptorHelper.cpp1
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp9
-rw-r--r--cpp/src/IceGrid/Makefile1
-rw-r--r--cpp/src/IceGrid/ObjectCache.cpp11
-rw-r--r--cpp/src/IceGrid/ObjectCache.h33
-rw-r--r--cpp/src/IceGrid/QueryI.cpp73
-rw-r--r--cpp/src/IceGrid/QueryI.h27
-rw-r--r--cpp/src/IceGrid/RegistryI.cpp18
-rw-r--r--cpp/src/IceGrid/SessionI.cpp115
-rw-r--r--cpp/src/IceGrid/SessionI.h17
19 files changed, 570 insertions, 74 deletions
diff --git a/cpp/src/IceBox/Makefile b/cpp/src/IceBox/Makefile
index 3d616795891..be838688b9f 100644
--- a/cpp/src/IceBox/Makefile
+++ b/cpp/src/IceBox/Makefile
@@ -57,7 +57,7 @@ $(libdir)/$(LIBNAME): $(libdir)/$(SONAME)
$(SERVER): $(SOBJS) $(LIBTARGETS)
rm -f $@
- $(CXX) $(LDFLAGS) -o $@ $(SOBJS) -lIceBox $(LIBS)
+ $(CXX) $(LDFLAGS) -o $@ $(SOBJS) -lIceStorm -lIceBox $(LIBS)
$(ADMIN): $(AOBJS) $(LIBTARGETS)
rm -f $@
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp
index a07ee5b4b67..243ce14a8f4 100644
--- a/cpp/src/IceGrid/AdminSessionI.cpp
+++ b/cpp/src/IceGrid/AdminSessionI.cpp
@@ -16,10 +16,11 @@ using namespace IceGrid;
AdminSessionI::AdminSessionI(const string& userId,
const DatabasePtr& database,
+ const Ice::ObjectAdapterPtr& adapter,
RegistryObserverTopic& registryObserverTopic,
NodeObserverTopic& nodeObserverTopic,
int timeout) :
- SessionI(userId, "admin", database, timeout),
+ SessionI(userId, "admin", database, adapter, timeout),
_updating(false),
_registryObserverTopic(registryObserverTopic),
_nodeObserverTopic(nodeObserverTopic)
@@ -267,16 +268,16 @@ AdminSessionManagerI::create(const string& userId, const Glacier2::SessionContro
// We don't add the session to the reaper thread, Glacier2 takes
// care of reaping the session.
//
- SessionIPtr session =
- new AdminSessionI(userId, _database, _registryObserverTopic, _nodeObserverTopic, _sessionTimeout);
+ SessionIPtr session = new AdminSessionI(userId, _database, current.adapter, _registryObserverTopic,
+ _nodeObserverTopic, _sessionTimeout);
return Glacier2::SessionPrx::uncheckedCast(current.adapter->addWithUUID(session));
}
SessionPrx
AdminSessionManagerI::createLocalSession(const string& userId, const Ice::Current& current)
{
- SessionIPtr session =
- new AdminSessionI(userId, _database, _registryObserverTopic, _nodeObserverTopic, _sessionTimeout);
+ SessionIPtr session = new AdminSessionI(userId, _database, current.adapter, _registryObserverTopic,
+ _nodeObserverTopic, _sessionTimeout);
SessionPrx proxy = SessionPrx::uncheckedCast(current.adapter->addWithUUID(session));
_reaper->add(new SessionReapable(session, proxy));
return proxy;
diff --git a/cpp/src/IceGrid/AdminSessionI.h b/cpp/src/IceGrid/AdminSessionI.h
index 3b3c4e831dd..38eb00653b9 100644
--- a/cpp/src/IceGrid/AdminSessionI.h
+++ b/cpp/src/IceGrid/AdminSessionI.h
@@ -20,7 +20,8 @@ class AdminSessionI : public SessionI, public AdminSession
{
public:
- AdminSessionI(const std::string&, const DatabasePtr&, RegistryObserverTopic&, NodeObserverTopic&, int);
+ AdminSessionI(const std::string&, const DatabasePtr&, const Ice::ObjectAdapterPtr&, RegistryObserverTopic&,
+ NodeObserverTopic&, int);
virtual ~AdminSessionI();
virtual AdminPrx getAdmin(const Ice::Current&) const;
diff --git a/cpp/src/IceGrid/Allocatable.cpp b/cpp/src/IceGrid/Allocatable.cpp
new file mode 100644
index 00000000000..4760bafe09b
--- /dev/null
+++ b/cpp/src/IceGrid/Allocatable.cpp
@@ -0,0 +1,187 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceGrid/Allocatable.h>
+#include <IceGrid/SessionI.h>
+
+using namespace std;
+using namespace IceGrid;
+
+AllocationRequest::~AllocationRequest()
+{
+}
+
+bool
+AllocationRequest::setAllocatable(const AllocatablePtr& allocatable)
+{
+ Lock sync(*this);
+ assert(!_allocatable);
+ if(_canceled)
+ {
+ return false;
+ }
+ _allocatable = allocatable;
+ allocated(_allocatable);
+ return true;
+}
+
+void
+AllocationRequest::cancel()
+{
+ Lock sync(*this);
+ if(_canceled)
+ {
+ return;
+ }
+ _canceled = true;
+ canceled();
+}
+
+bool
+AllocationRequest::checkTimeout(const IceUtil::Time& now)
+{
+ assert(_timeout > 0);
+ {
+ Lock sync(*this);
+ if(_canceled)
+ {
+ return true;
+ }
+ _canceled = _expiration < now;
+ if(!_canceled)
+ {
+ return false;
+ }
+ timeout();
+ }
+ _session->removeAllocationRequest(this);
+ return true;
+}
+
+void
+AllocationRequest::allocate()
+{
+ _session->addAllocationRequest(this);
+}
+
+void
+AllocationRequest::release(const SessionIPtr& session)
+{
+ //
+ // Check if the session releasing the object is indeed the session
+ // which initiated the allocation request.
+ //
+ if(_session != session)
+ {
+ throw AllocationException("can't release object which is not allocated");
+ }
+ _session->removeAllocationRequest(this);
+}
+
+bool
+AllocationRequest::operator<(const AllocationRequest& r) const
+{
+ return this < &r;
+}
+
+AllocationRequest::AllocationRequest(const SessionIPtr& session) :
+ _session(session),
+ _timeout(_session->getAllocationTimeout()),
+ _expiration(_timeout > 0 ? (IceUtil::Time::now() + IceUtil::Time::milliSeconds(_timeout)) : IceUtil::Time()),
+ _canceled(false)
+{
+}
+
+Allocatable::Allocatable(bool allocatable) : _allocatable(allocatable), _allocated(false)
+{
+}
+
+Allocatable::~Allocatable()
+{
+}
+
+void
+Allocatable::allocate(const AllocationRequestPtr& request, bool allocateOnce)
+{
+ IceUtil::Mutex::Lock sync(_allocateMutex);
+ if(_allocatable)
+ {
+ if(_allocated)
+ {
+ if(_allocated->getSession() == request->getSession())
+ {
+ if(allocateOnce)
+ {
+ throw AllocationException("object already allocated by the session");
+ }
+ request->setAllocatable(this);
+ }
+ else if(request->getTimeout())
+ {
+ request->allocate();
+ _requests.push_back(request); // TODO: XXX: monitor request timeout if timeout != -1
+ }
+ else
+ {
+ request->timeout();
+ }
+ }
+ else if(request->setAllocatable(this))
+ {
+ _allocated = request;
+ _allocated->allocate();
+ }
+ }
+ else
+ {
+ if(allocateOnce)
+ {
+ throw AllocationException("can't allocate non allocatable object");
+ }
+ bool rc = request->setAllocatable(this);
+ assert(rc);
+ }
+}
+
+bool
+Allocatable::tryAllocate(const AllocationRequestPtr& request)
+{
+ IceUtil::Mutex::Lock sync(_allocateMutex);
+ if(_allocatable && _allocated)
+ {
+ return false;
+ }
+ else if(request->setAllocatable(this))
+ {
+ _allocated = request;
+ _allocated->allocate();
+ }
+ return true;
+}
+
+void
+Allocatable::release(const SessionIPtr& session)
+{
+ IceUtil::Mutex::Lock sync(_allocateMutex);
+ if(!_allocated)
+ {
+ throw AllocationException("object not allocated");
+ }
+ _allocated->release(session);
+ while(!_requests.empty())
+ {
+ _allocated = _requests.front();
+ _requests.pop_front();
+ if(_allocated->setAllocatable(this))
+ {
+ return;
+ }
+ }
+ _allocated = 0;
+}
diff --git a/cpp/src/IceGrid/Allocatable.h b/cpp/src/IceGrid/Allocatable.h
new file mode 100644
index 00000000000..0e315036223
--- /dev/null
+++ b/cpp/src/IceGrid/Allocatable.h
@@ -0,0 +1,86 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef ICE_GRID_ALLOCATABLE_H
+#define ICE_GRID_ALLOCATABLE_H
+
+#include <IceUtil/Handle.h>
+#include <IceUtil/Mutex.h>
+#include <IceUtil/Shared.h>
+#include <IceUtil/Time.h>
+#include <list>
+
+namespace IceGrid
+{
+
+class SessionI;
+typedef IceUtil::Handle<SessionI> SessionIPtr;
+
+class Allocatable;
+typedef IceUtil::Handle<Allocatable> AllocatablePtr;
+
+class AllocationRequest : public IceUtil::Mutex, public IceUtil::Shared
+{
+public:
+
+ virtual ~AllocationRequest();
+
+ virtual void timeout() = 0;
+ virtual void allocated(const AllocatablePtr&) = 0;
+ virtual void canceled() = 0;
+
+ bool setAllocatable(const AllocatablePtr&);
+ bool checkTimeout(const IceUtil::Time&);
+ void cancel();
+ void allocate();
+ void release(const SessionIPtr&);
+
+ int getTimeout() const { return _timeout; }
+ const SessionIPtr& getSession() const { return _session; }
+
+ bool operator<(const AllocationRequest&) const;
+
+protected:
+
+ AllocationRequest(const SessionIPtr&);
+
+private:
+
+ const SessionIPtr _session;
+ const int _timeout;
+ const IceUtil::Time _expiration;
+ bool _canceled;
+ AllocatablePtr _allocatable;
+};
+typedef IceUtil::Handle<AllocationRequest> AllocationRequestPtr;
+
+class Allocatable : public IceUtil::Shared
+{
+public:
+
+ Allocatable(bool);
+ virtual ~Allocatable();
+
+ void allocate(const AllocationRequestPtr&, bool);
+ bool tryAllocate(const AllocationRequestPtr&);
+ void release(const SessionIPtr&);
+
+ bool allocatable() const { return _allocatable; }
+
+protected:
+
+ bool _allocatable;
+ IceUtil::Mutex _allocateMutex;
+ std::list<AllocationRequestPtr> _requests;
+ AllocationRequestPtr _allocated;
+};
+
+};
+
+#endif
diff --git a/cpp/src/IceGrid/Cache.h b/cpp/src/IceGrid/Cache.h
index ab50cfdce44..2fe35ff48ef 100644
--- a/cpp/src/IceGrid/Cache.h
+++ b/cpp/src/IceGrid/Cache.h
@@ -18,7 +18,6 @@
namespace IceGrid
{
-
template<typename Key, typename Value>
class Cache : public IceUtil::Mutex
{
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index f0b9dcb3dfe..a81250ce712 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -950,12 +950,53 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
}
}
+void
+Database::allocateObject(const Ice::Identity& id, const ObjectAllocationRequestPtr& request, bool allocateOnce)
+{
+ try
+ {
+ _objectCache.get(id)->allocate(request, allocateOnce);
+ return;
+ }
+ catch(ObjectNotRegisteredException&)
+ {
+ }
+
+ Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ IdentityObjectInfoDict objects(connection, _objectDbName);
+ IdentityObjectInfoDict::const_iterator p = objects.find(id);
+ if(p == objects.end())
+ {
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
+ }
+ request->response(p->second.proxy);
+}
+
+void
+Database::releaseObject(const Ice::Identity& id, const SessionIPtr& session)
+{
+ try
+ {
+ _objectCache.get(id)->release(session);
+ return;
+ }
+ catch(ObjectNotRegisteredException&)
+ {
+ }
+}
+
Ice::ObjectPrx
Database::getObjectProxy(const Ice::Identity& id)
{
try
{
- return _objectCache.get(id)->getProxy();
+ //
+ // Only return proxies for non allocatable objects.
+ //
+ ObjectEntryPtr entry = _objectCache.get(id);
+ return entry->allocatable() ? Ice::ObjectPrx() : entry->getProxy();
}
catch(ObjectNotRegisteredException&)
{
diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h
index ee0dc448650..791268fd8d4 100644
--- a/cpp/src/IceGrid/Database.h
+++ b/cpp/src/IceGrid/Database.h
@@ -87,6 +87,10 @@ public:
void addObject(const ObjectInfo&);
void removeObject(const Ice::Identity&);
void updateObject(const Ice::ObjectPrx&);
+
+ void allocateObject(const Ice::Identity&, const ObjectAllocationRequestPtr&, bool);
+ void releaseObject(const Ice::Identity&, const SessionIPtr&);
+
Ice::ObjectPrx getObjectProxy(const Ice::Identity&);
Ice::ObjectPrx getObjectByType(const std::string&);
Ice::ObjectPrx getObjectByTypeOnLeastLoadedNode(const std::string&, LoadSample);
diff --git a/cpp/src/IceGrid/DescriptorBuilder.cpp b/cpp/src/IceGrid/DescriptorBuilder.cpp
index 87482df7a8c..6f46b9f8bd1 100644
--- a/cpp/src/IceGrid/DescriptorBuilder.cpp
+++ b/cpp/src/IceGrid/DescriptorBuilder.cpp
@@ -293,6 +293,7 @@ ApplicationDescriptorBuilder::addObject(const XmlAttributesHelper& attrs)
ObjectDescriptor object;
object.type = attrs("type", "");
object.id = Ice::stringToIdentity(attrs("identity"));
+ object.allocatable = attrs.asBool("allocatable", false);
_descriptor.replicaGroups.back().objects.push_back(object);
}
@@ -611,6 +612,7 @@ CommunicatorDescriptorBuilder::addObject(const XmlAttributesHelper& attrs)
ObjectDescriptor object;
object.type = attrs("type", "");
object.id = Ice::stringToIdentity(attrs("identity"));
+ object.allocatable = attrs.asBool("allocatable", false);
_descriptor->adapters.back().objects.push_back(object);
}
diff --git a/cpp/src/IceGrid/DescriptorHelper.cpp b/cpp/src/IceGrid/DescriptorHelper.cpp
index c1c93f30e9b..c50e54667d6 100644
--- a/cpp/src/IceGrid/DescriptorHelper.cpp
+++ b/cpp/src/IceGrid/DescriptorHelper.cpp
@@ -735,6 +735,7 @@ CommunicatorHelper::instantiateImpl(const CommunicatorDescriptorPtr& instance, c
ObjectDescriptor obj;
obj.type = resolve(q->type, "object type");
obj.id = Ice::stringToIdentity(resolve(Ice::identityToString(q->id), "object identity", false));
+ obj.allocatable = q->allocatable;
if(obj.id.name.empty())
{
resolve.exception("invalid object identity `" + Ice::identityToString(q->id) + "': name empty");
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index ee19b2c19f1..bcc2a893df7 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -310,6 +310,15 @@ LocatorI::findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& cb,
}
//
+ // If the proxy is 0, this means that the object is allocatable.
+ //
+ if(!proxy)
+ {
+ cb->ice_response(0);
+ return;
+ }
+
+ //
// OPTIMIZATION: If the object is registered with an adapter id,
// try to get the adapter direct proxy (which might caused the
// server activation). This will avoid the client to lookup for
diff --git a/cpp/src/IceGrid/Makefile b/cpp/src/IceGrid/Makefile
index 7de85c965f9..cfb3f845ef8 100644
--- a/cpp/src/IceGrid/Makefile
+++ b/cpp/src/IceGrid/Makefile
@@ -56,6 +56,7 @@ REGISTRY_OBJS = RegistryI.o \
IdentityObjectInfoDict.o \
StringAdapterInfoDict.o \
Database.o \
+ Allocatable.o \
AdapterCache.o \
ObjectCache.o \
ServerCache.o \
diff --git a/cpp/src/IceGrid/ObjectCache.cpp b/cpp/src/IceGrid/ObjectCache.cpp
index a14e2b46928..d8d89d8583f 100644
--- a/cpp/src/IceGrid/ObjectCache.cpp
+++ b/cpp/src/IceGrid/ObjectCache.cpp
@@ -33,6 +33,7 @@ ObjectCache::add(const string& app, const string& adapterId, const string& endpo
ObjectInfo info;
info.type = desc.type;
+ info.allocatable = desc.allocatable;
if(adapterId.empty())
{
info.proxy = _communicator->stringToProxy(Ice::identityToString(desc.id) + ":" + endpoints);
@@ -106,7 +107,11 @@ ObjectCache::getObjectsByType(const string& type)
}
for(set<Ice::Identity>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
{
- proxies.push_back(getImpl(*q)->getProxy());
+ ObjectEntryPtr entry = getImpl(*q);
+ if(!entry->allocatable())
+ {
+ proxies.push_back(entry->getProxy());
+ }
}
return proxies;
}
@@ -126,7 +131,8 @@ ObjectCache::getAll(const string& expression)
return infos;
}
-ObjectEntry::ObjectEntry(Cache<Ice::Identity, ObjectEntry>&, const Ice::Identity&)
+ObjectEntry::ObjectEntry(Cache<Ice::Identity, ObjectEntry>&, const Ice::Identity&) :
+ Allocatable(false)
{
}
@@ -135,6 +141,7 @@ ObjectEntry::set(const string& app, const ObjectInfo& info)
{
_application = app;
_info = info;
+ _allocatable = info.allocatable;
}
Ice::ObjectPrx
diff --git a/cpp/src/IceGrid/ObjectCache.h b/cpp/src/IceGrid/ObjectCache.h
index d89cd5858b8..cbc31e9405d 100644
--- a/cpp/src/IceGrid/ObjectCache.h
+++ b/cpp/src/IceGrid/ObjectCache.h
@@ -15,6 +15,7 @@
#include <Ice/CommunicatorF.h>
#include <IceGrid/Cache.h>
#include <IceGrid/Internal.h>
+#include <IceGrid/Allocatable.h>
namespace IceGrid
{
@@ -24,10 +25,7 @@ class ObjectCache;
class ServerEntry;
typedef IceUtil::Handle<ServerEntry> ServerEntryPtr;
-class ObjectEntry;
-typedef IceUtil::Handle<ObjectEntry> ObjectEntryPtr;
-
-class ObjectEntry : public IceUtil::Shared
+class ObjectEntry : public Allocatable
{
public:
@@ -67,6 +65,33 @@ private:
std::map<std::string, std::set<Ice::Identity> > _types;
};
+class ObjectAllocationRequest : public AllocationRequest
+{
+public:
+
+ ObjectAllocationRequest(const SessionIPtr& session) : AllocationRequest(session) { }
+
+ virtual void response(const Ice::ObjectPrx&) = 0;
+
+private:
+
+ virtual void allocated(const AllocatablePtr& allocatable)
+ {
+ response(ObjectEntryPtr::dynamicCast(allocatable)->getProxy());
+ }
+
+ virtual void timeout()
+ {
+ response(0);
+ }
+
+ virtual void canceled()
+ {
+ response(0);
+ }
+};
+typedef IceUtil::Handle<ObjectAllocationRequest> ObjectAllocationRequestPtr;
+
};
#endif
diff --git a/cpp/src/IceGrid/QueryI.cpp b/cpp/src/IceGrid/QueryI.cpp
index 74133e0a64d..e317bc4eee4 100644
--- a/cpp/src/IceGrid/QueryI.cpp
+++ b/cpp/src/IceGrid/QueryI.cpp
@@ -10,14 +10,39 @@
#include <IceGrid/Internal.h>
#include <IceGrid/QueryI.h>
#include <IceGrid/Database.h>
+#include <IceGrid/ObjectCache.h>
+#include <IceGrid/SessionI.h>
using namespace std;
using namespace Ice;
using namespace IceGrid;
-QueryI::QueryI(const CommunicatorPtr& communicator, const DatabasePtr& database) :
+class GetObjectProxy : public ObjectAllocationRequest
+{
+public:
+
+ GetObjectProxy(const SessionIPtr& session, const AMD_Query_findObjectByIdPtr& cb) :
+ ObjectAllocationRequest(session), _cb(cb)
+ {
+ }
+
+ virtual void
+ response(const Ice::ObjectPrx& proxy)
+ {
+ assert(_cb);
+ _cb->ice_response(proxy);
+ _cb = 0;
+ }
+
+private:
+
+ AMD_Query_findObjectByIdPtr _cb;
+};
+
+QueryI::QueryI(const CommunicatorPtr& communicator, const DatabasePtr& database, const SessionIPtr& session) :
_communicator(communicator),
- _database(database)
+ _database(database),
+ _session(session)
{
}
@@ -25,55 +50,67 @@ QueryI::~QueryI()
{
}
-Ice::ObjectPrx
-QueryI::findObjectById(const Ice::Identity& id, const Ice::Current&) const
+void
+QueryI::findObjectById_async(const AMD_Query_findObjectByIdPtr& cb, const Ice::Identity& id, const Ice::Current&) const
{
try
{
- return _database->getObjectProxy(id);
+ if(_session)
+ {
+ _database->allocateObject(id, new GetObjectProxy(_session, cb), false);
+ }
+ else
+ {
+ cb->ice_response(_database->getObjectProxy(id));
+ }
}
catch(const ObjectNotRegisteredException&)
{
- return 0;
+ cb->ice_response(0);
}
}
-Ice::ObjectPrx
-QueryI::findObjectByType(const string& type, const Ice::Current&) const
+void
+QueryI::findObjectByType_async(const AMD_Query_findObjectByTypePtr& cb, const string& type, const Ice::Current&) const
{
try
{
- return _database->getObjectByType(type);
+ cb->ice_response(_database->getObjectByType(type));
}
catch(const ObjectNotRegisteredException&)
{
- return 0;
+ cb->ice_response(0);
}
}
-Ice::ObjectPrx
-QueryI::findObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::Current&) const
+void
+QueryI::findObjectByTypeOnLeastLoadedNode_async(const AMD_Query_findObjectByTypeOnLeastLoadedNodePtr& cb,
+ const string& type,
+ LoadSample sample,
+ const Ice::Current&) const
{
try
{
- return _database->getObjectByTypeOnLeastLoadedNode(type, sample);
+ cb->ice_response(_database->getObjectByTypeOnLeastLoadedNode(type, sample));
}
catch(const ObjectNotRegisteredException&)
{
- return 0;
+ cb->ice_response(0);
}
}
-Ice::ObjectProxySeq
-QueryI::findAllObjectsByType(const string& type, const Ice::Current&) const
+void
+QueryI::findAllObjectsByType_async(const AMD_Query_findAllObjectsByTypePtr& cb,
+ const string& type,
+ const Ice::Current&) const
{
try
{
- return _database->getObjectsByType(type);
+ cb->ice_response(_database->getObjectsByType(type));
}
catch(const ObjectNotRegisteredException&)
{
- return Ice::ObjectProxySeq();
+ cb->ice_response(Ice::ObjectProxySeq());
}
}
diff --git a/cpp/src/IceGrid/QueryI.h b/cpp/src/IceGrid/QueryI.h
index 423dcd23c64..072345eda1a 100644
--- a/cpp/src/IceGrid/QueryI.h
+++ b/cpp/src/IceGrid/QueryI.h
@@ -19,23 +19,34 @@ namespace IceGrid
class Database;
typedef IceUtil::Handle<Database> DatabasePtr;
+class SessionI;
+typedef IceUtil::Handle<SessionI> SessionIPtr;
+
class QueryI : public Query, public IceUtil::Mutex
{
public:
- QueryI(const Ice::CommunicatorPtr&, const DatabasePtr&);
+ QueryI(const Ice::CommunicatorPtr&, const DatabasePtr&, const SessionIPtr&);
virtual ~QueryI();
- virtual ::Ice::ObjectPrx findObjectById(const ::Ice::Identity&, const ::Ice::Current&) const;
- virtual ::Ice::ObjectPrx findObjectByType(const ::std::string&, const ::Ice::Current&) const;
- virtual ::Ice::ObjectPrx findObjectByTypeOnLeastLoadedNode(const ::std::string&, LoadSample,
- const ::Ice::Current&) const;
- virtual ::Ice::ObjectProxySeq findAllObjectsByType(const ::std::string&, const ::Ice::Current&) const;
+ virtual void findObjectById_async(const AMD_Query_findObjectByIdPtr&, const ::Ice::Identity&,
+ const ::Ice::Current&) const;
+
+ virtual void findObjectByType_async(const AMD_Query_findObjectByTypePtr&, const ::std::string&,
+ const ::Ice::Current&) const;
+
+ virtual void findObjectByTypeOnLeastLoadedNode_async(const AMD_Query_findObjectByTypeOnLeastLoadedNodePtr&,
+ const ::std::string&, LoadSample,
+ const ::Ice::Current&) const;
+
+ virtual void findAllObjectsByType_async(const AMD_Query_findAllObjectsByTypePtr&, const ::std::string&,
+ const ::Ice::Current&) const;
private:
- Ice::CommunicatorPtr _communicator;
- DatabasePtr _database;
+ const Ice::CommunicatorPtr _communicator;
+ const DatabasePtr _database;
+ const SessionIPtr _session;
};
}
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp
index bb8ef76e42d..6691fd1d1f3 100644
--- a/cpp/src/IceGrid/RegistryI.cpp
+++ b/cpp/src/IceGrid/RegistryI.cpp
@@ -234,23 +234,29 @@ RegistryI::start(bool nowarn)
// Create the query, admin, session manager interfaces
//
Identity queryId = stringToIdentity(instanceName + "/Query");
- clientAdapter->add(new QueryI(_communicator, _database), queryId);
+ clientAdapter->add(new QueryI(_communicator, _database, 0), queryId);
+
+ ReapThreadPtr reaper = _adminReaper ? _adminReaper : _reaper; // TODO: XXX
+
+ Identity sessionMgrId = stringToIdentity(instanceName + "/SessionManager");
+ ObjectPtr sessionMgr = new ClientSessionManagerI(_database, reaper, adminSessionTimeout); // TODO: XXX
+ clientAdapter->add(sessionMgr, sessionMgrId);
Identity adminId = stringToIdentity(instanceName + "/Admin");
adminAdapter->add(new AdminI(_database, this, traceLevels), adminId);
- Identity sessionManagerId = stringToIdentity(instanceName + "/SessionManager");
- ReapThreadPtr reaper = _adminReaper ? _adminReaper : _reaper;
- ObjectPtr sessionMgr = new AdminSessionManagerI(*regTopic, *nodeTopic, _database, reaper, adminSessionTimeout);
- adminAdapter->add(sessionMgr, sessionManagerId);
+ Identity admSessionMgrId = stringToIdentity(instanceName + "/AdminSessionManager");
+ ObjectPtr admSessionMgr = new AdminSessionManagerI(*regTopic, *nodeTopic, _database, reaper, adminSessionTimeout);
+ adminAdapter->add(admSessionMgr, admSessionMgrId);
//
// Register well known objects with the object registry.
//
addWellKnownObject(registryAdapter->createProxy(registryId), Registry::ice_staticId());
addWellKnownObject(clientAdapter->createProxy(queryId), Query::ice_staticId());
+ addWellKnownObject(clientAdapter->createProxy(sessionMgrId), SessionManager::ice_staticId());
addWellKnownObject(adminAdapter->createProxy(adminId), Admin::ice_staticId());
- addWellKnownObject(adminAdapter->createProxy(sessionManagerId), SessionManager::ice_staticId());
+ addWellKnownObject(adminAdapter->createProxy(admSessionMgrId), SessionManager::ice_staticId());
//
// We are ready to go!
diff --git a/cpp/src/IceGrid/SessionI.cpp b/cpp/src/IceGrid/SessionI.cpp
index 75210a23b22..fd4c003978f 100644
--- a/cpp/src/IceGrid/SessionI.cpp
+++ b/cpp/src/IceGrid/SessionI.cpp
@@ -9,11 +9,44 @@
#include <Ice/Ice.h>
#include <IceGrid/SessionI.h>
+#include <IceGrid/QueryI.h>
#include <IceGrid/Database.h>
using namespace std;
using namespace IceGrid;
+class AllocateObject : public ObjectAllocationRequest
+{
+public:
+
+ AllocateObject(const SessionIPtr& session, const AMD_Session_allocateObjectPtr& cb) :
+ ObjectAllocationRequest(session), _cb(cb)
+ {
+ }
+
+ virtual void response(const Ice::ObjectPrx& proxy)
+ {
+ assert(_cb);
+ if(proxy)
+ {
+ _cb->ice_response();
+ }
+ else
+ {
+ //
+ // TODO: The request might also have been canceled!
+ //
+
+ _cb->ice_exception(AllocationTimeoutException());
+ }
+ _cb = 0;
+ }
+
+private:
+
+ AMD_Session_allocateObjectPtr _cb;
+};
+
SessionReapable::SessionReapable(const SessionIPtr& session, const SessionPrx& proxy) :
_session(session),
_proxy(proxy)
@@ -36,19 +69,30 @@ SessionReapable::destroy()
_proxy->destroy();
}
-SessionI::SessionI(const string& userId, const string& prefix, const DatabasePtr& database, int timeout) :
+SessionI::SessionI(const string& userId,
+ const string& prefix,
+ const DatabasePtr& database,
+ const Ice::ObjectAdapterPtr& adapter,
+ int timeout) :
_userId(userId),
_prefix(prefix),
_timeout(timeout),
_traceLevels(database->getTraceLevels()),
_database(database),
- _destroyed(false)
+ _destroyed(false),
+ _allocationTimeout(-1)
{
if(_traceLevels && _traceLevels->session > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->sessionCat);
out << _prefix << " session `" << _userId << "' created";
}
+
+ //
+ // Register session based query and locator interfaces
+ //
+ _query = QueryPrx::uncheckedCast(adapter->addWithUUID(new QueryI(adapter->getCommunicator(), _database, this)));
+ //_locator = adapter->addWithUUID(new LocatorI());
}
SessionI::~SessionI()
@@ -84,45 +128,46 @@ SessionI::getTimeout(const Ice::Current&) const
QueryPrx
SessionI::getQuery(const Ice::Current& current) const
{
- //
- // TODO: XXX
- //
- return QueryPrx::uncheckedCast(
- current.adapter->getCommunicator()->stringToProxy(_database->getInstanceName() + "/Query"));
+ return _query;
}
Ice::LocatorPrx
SessionI::getLocator(const Ice::Current& current) const
{
- //
- // TODO: XXX
- //
- return Ice::LocatorPrx::uncheckedCast(
- current.adapter->getCommunicator()->stringToProxy(_database->getInstanceName() + "/Locator"));
+ return _locator;
}
void
-SessionI::allocateObject(const Ice::ObjectPrx& proxy, const Ice::Current&)
+SessionI::allocateObject_async(const AMD_Session_allocateObjectPtr& cb, const Ice::ObjectPrx& prx, const Ice::Current&)
{
//
- // TODO: XXX
+ // TODO: Check if the proxy points to a replicated object and eventually throw if that's the case.
//
+ if(!prx)
+ {
+ throw AllocationException("proxy is null");
+ }
+ _database->allocateObject(prx->ice_getIdentity(), new AllocateObject(this, cb), true);
}
void
-SessionI::releaseObject(const Ice::ObjectPrx& proxy, const Ice::Current&)
+SessionI::releaseObject(const Ice::ObjectPrx& prx, const Ice::Current&)
{
//
- // TODO: XXX
+ // TODO: Check if the proxy points to a replicated object and eventually throw if that's the case.
//
+ if(!prx)
+ {
+ throw AllocationException("proxy is null");
+ }
+ _database->releaseObject(prx->ice_getIdentity(), this);
}
void
SessionI::setAllocationTimeout(int timeout, const Ice::Current&)
{
- //
- // TODO: XXX
- //
+ Lock sync(*this);
+ _allocationTimeout = timeout;
}
void
@@ -152,8 +197,30 @@ SessionI::timestamp() const
return _timestamp;
}
-ClientSessionI::ClientSessionI(const string& userId, const DatabasePtr& database, int timeout) :
- SessionI(userId, "client", database, timeout)
+int
+SessionI::getAllocationTimeout() const
+{
+ Lock sync(*this);
+ return _allocationTimeout;
+}
+
+void
+SessionI::addAllocationRequest(const AllocationRequestPtr& request)
+{
+ Lock sync(*this);
+ _allocations.insert(request);
+}
+
+void
+SessionI::removeAllocationRequest(const AllocationRequestPtr& request)
+{
+ Lock sync(*this);
+ _allocations.erase(request);
+}
+
+ClientSessionI::ClientSessionI(const string& userId, const DatabasePtr& database, const Ice::ObjectAdapterPtr& adapter,
+ int timeout) :
+ SessionI(userId, "client", database, adapter, timeout)
{
}
@@ -173,14 +240,14 @@ ClientSessionManagerI::create(const string& userId, const Glacier2::SessionContr
// We don't add the session to the reaper thread, Glacier2 takes
// care of reaping the session.
//
- SessionIPtr session = new ClientSessionI(userId, _database, _sessionTimeout);
- return Glacier2::SessionPrx::uncheckedCast(current.adapter->addWithUUID(session));
+ SessionIPtr session = new ClientSessionI(userId, _database, current.adapter, _sessionTimeout);
+ return Glacier2::SessionPrx::uncheckedCast(current.adapter->addWithUUID(session)); // TODO: XXX: category = userid?
}
SessionPrx
ClientSessionManagerI::createLocalSession(const string& userId, const Ice::Current& current)
{
- SessionIPtr session = new ClientSessionI(userId, _database, _sessionTimeout);
+ SessionIPtr session = new ClientSessionI(userId, _database, current.adapter, _sessionTimeout);
SessionPrx proxy = SessionPrx::uncheckedCast(current.adapter->addWithUUID(session));
_reaper->add(new SessionReapable(session, proxy));
return proxy;
diff --git a/cpp/src/IceGrid/SessionI.h b/cpp/src/IceGrid/SessionI.h
index 8e72d5b9cf5..2d8216e23b9 100644
--- a/cpp/src/IceGrid/SessionI.h
+++ b/cpp/src/IceGrid/SessionI.h
@@ -24,6 +24,9 @@ typedef IceUtil::Handle<Database> DatabasePtr;
class TraceLevels;
typedef IceUtil::Handle<TraceLevels> TraceLevelsPtr;
+class AllocationRequest;
+typedef IceUtil::Handle<AllocationRequest> AllocationRequestPtr;
+
class SessionI;
typedef IceUtil::Handle<SessionI> SessionIPtr;
@@ -53,31 +56,39 @@ public:
virtual int getTimeout(const Ice::Current&) const;
virtual QueryPrx getQuery(const Ice::Current&) const;
virtual Ice::LocatorPrx getLocator(const Ice::Current&) const;
- virtual void allocateObject(const Ice::ObjectPrx&, const Ice::Current&);
+ virtual void allocateObject_async(const AMD_Session_allocateObjectPtr&, const Ice::ObjectPrx&,const Ice::Current&);
virtual void releaseObject(const Ice::ObjectPrx&, const Ice::Current&);
virtual void setAllocationTimeout(int, const Ice::Current&);
virtual void destroy(const Ice::Current&);
virtual IceUtil::Time timestamp() const;
+ virtual int getAllocationTimeout() const;
+
+ void addAllocationRequest(const AllocationRequestPtr&);
+ void removeAllocationRequest(const AllocationRequestPtr&);
protected:
- SessionI(const std::string&, const std::string&, const DatabasePtr&, int);
+ SessionI(const std::string&, const std::string&, const DatabasePtr&, const Ice::ObjectAdapterPtr&, int);
const std::string _userId;
const std::string _prefix;
const int _timeout;
const TraceLevelsPtr _traceLevels;
const DatabasePtr _database;
+ IceGrid::QueryPrx _query;
+ Ice::LocatorPrx _locator;
bool _destroyed;
IceUtil::Time _timestamp;
+ int _allocationTimeout;
+ std::set<AllocationRequestPtr> _allocations;
};
class ClientSessionI : public SessionI
{
public:
- ClientSessionI(const std::string&, const DatabasePtr&, int);
+ ClientSessionI(const std::string&, const DatabasePtr&, const Ice::ObjectAdapterPtr&, int);
};
class ClientSessionManagerI : virtual public SessionManager