diff options
Diffstat (limited to 'cpp/src/IceGrid/Database.h')
-rw-r--r-- | cpp/src/IceGrid/Database.h | 175 |
1 files changed, 84 insertions, 91 deletions
diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h index 877f1d25a6f..f1e032be3c3 100644 --- a/cpp/src/IceGrid/Database.h +++ b/cpp/src/IceGrid/Database.h @@ -5,8 +5,6 @@ #ifndef ICE_GRID_DATABASE_H #define ICE_GRID_DATABASE_H -#include <IceUtil/Mutex.h> -#include <IceUtil/Shared.h> #include <IceUtil/FileUtil.h> #include <Ice/CommunicatorF.h> #include <IceGrid/Admin.h> @@ -25,126 +23,117 @@ namespace IceGrid { -class TraceLevels; -typedef IceUtil::Handle<TraceLevels> TraceLevelsPtr; - +class AdminSessionI; +class ApplicationHelper; class NodeSessionI; -typedef IceUtil::Handle<NodeSessionI> NodeSessionIPtr; - class ReplicaSessionI; -typedef IceUtil::Handle<ReplicaSessionI> ReplicaSessionIPtr; - -class AdminSessionI; - class ServerEntry; -typedef IceUtil::Handle<ServerEntry> ServerEntryPtr; - -class ApplicationHelper; - -typedef IceDB::Dbi<std::string, IceGrid::ApplicationInfo, IceDB::IceContext, Ice::OutputStream> - StringApplicationInfoMap; - -typedef IceDB::Dbi<Ice::Identity, IceGrid::ObjectInfo, IceDB::IceContext, Ice::OutputStream> IdentityObjectInfoMap; -typedef IceDB::Dbi<std::string, Ice::Identity, IceDB::IceContext, Ice::OutputStream> StringIdentityMap; - -typedef IceDB::Dbi<std::string, IceGrid::AdapterInfo, IceDB::IceContext, Ice::OutputStream> StringAdapterInfoMap; -typedef IceDB::Dbi<std::string, std::string, IceDB::IceContext, Ice::OutputStream> StringStringMap; - -typedef IceDB::Dbi<std::string, Ice::Long, IceDB::IceContext, Ice::OutputStream> StringLongMap; +class TraceLevels; -class Database : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex> +using StringApplicationInfoMap = IceDB::Dbi<std::string, + IceGrid::ApplicationInfo, + IceDB::IceContext, Ice::OutputStream>; +using IdentityObjectInfoMap = IceDB::Dbi<Ice::Identity, + IceGrid::ObjectInfo, + IceDB::IceContext, + Ice::OutputStream>; +using StringIdentityMap = IceDB::Dbi<std::string, Ice::Identity, IceDB::IceContext, Ice::OutputStream>; +using StringAdapterInfoMap = IceDB::Dbi<std::string, IceGrid::AdapterInfo, IceDB::IceContext, Ice::OutputStream>; +using StringStringMap = IceDB::Dbi<std::string, std::string, IceDB::IceContext, Ice::OutputStream>; +using StringLongMap = IceDB::Dbi<std::string, Ice::Long, IceDB::IceContext, Ice::OutputStream>; + +class Database final { public: -#ifdef __SUNPRO_CC - using IceUtil::Monitor<IceUtil::Mutex>::lock; - using IceUtil::Monitor<IceUtil::Mutex>::unlock; -#endif - - Database(const Ice::ObjectAdapterPtr&, const IceStorm::TopicManagerPrx&, const std::string&, const TraceLevelsPtr&, - const RegistryInfo&, bool); + static std::shared_ptr<Database> + create(const std::shared_ptr<Ice::ObjectAdapter>&, const std::shared_ptr<IceStorm::TopicManagerPrx>&, + const std::string&, const std::shared_ptr<TraceLevels>&, const RegistryInfo&, bool); std::string getInstanceName() const; bool isReadOnly() const { return _readonly; } - const TraceLevelsPtr& getTraceLevels() const { return _traceLevels; } - const Ice::CommunicatorPtr& getCommunicator() const { return _communicator; } - const Ice::ObjectAdapterPtr& getInternalAdapter() { return _internalAdapter; } + const std::shared_ptr<TraceLevels>& getTraceLevels() const { return _traceLevels; } + const std::shared_ptr<Ice::Communicator>& getCommunicator() const { return _communicator; } + const std::shared_ptr<Ice::ObjectAdapter>& getInternalAdapter() { return _internalAdapter; } void destroy(); - ObserverTopicPtr getObserverTopic(TopicName) const; + std::shared_ptr<ObserverTopic> getObserverTopic(TopicName) const; int lock(AdminSessionI*, const std::string&); void unlock(AdminSessionI*); - void syncApplications(const ApplicationInfoSeq&, Ice::Long); - void syncAdapters(const AdapterInfoSeq&, Ice::Long); - void syncObjects(const ObjectInfoSeq&, Ice::Long); + void syncApplications(const ApplicationInfoSeq&, long long); + void syncAdapters(const AdapterInfoSeq&, long long); + void syncObjects(const ObjectInfoSeq&, long long); - ApplicationInfoSeq getApplications(Ice::Long&); - AdapterInfoSeq getAdapters(Ice::Long&); - ObjectInfoSeq getObjects(Ice::Long&); + ApplicationInfoSeq getApplications(long long&); + AdapterInfoSeq getAdapters(long long&); + ObjectInfoSeq getObjects(long long&); StringLongDict getSerials() const; - void addApplication(const ApplicationInfo&, AdminSessionI*, Ice::Long = 0); - void updateApplication(const ApplicationUpdateInfo&, bool, AdminSessionI*, Ice::Long = 0); + void addApplication(const ApplicationInfo&, AdminSessionI*, long long = 0); + void updateApplication(const ApplicationUpdateInfo&, bool, AdminSessionI*, long long = 0); void syncApplicationDescriptor(const ApplicationDescriptor&, bool, AdminSessionI*); void instantiateServer(const std::string&, const std::string&, const ServerInstanceDescriptor&, AdminSessionI*); - void removeApplication(const std::string&, AdminSessionI*, Ice::Long = 0); + void removeApplication(const std::string&, AdminSessionI*, long long = 0); ApplicationInfo getApplicationInfo(const std::string&); Ice::StringSeq getAllApplications(const std::string& = std::string()); - void waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr&, const std::string&, int); + void waitForApplicationUpdate(const std::string&, int, std::function<void()>, + std::function<void(std::exception_ptr)>); NodeCache& getNodeCache(); - NodeEntryPtr getNode(const std::string&, bool = false) const; + std::shared_ptr<NodeEntry> getNode(const std::string&, bool = false) const; ReplicaCache& getReplicaCache(); - ReplicaEntryPtr getReplica(const std::string&) const; + std::shared_ptr<ReplicaEntry> getReplica(const std::string&) const; ServerCache& getServerCache(); - ServerEntryPtr getServer(const std::string&) const; + std::shared_ptr<ServerEntry> getServer(const std::string&) const; AllocatableObjectCache& getAllocatableObjectCache(); - AllocatableObjectEntryPtr getAllocatableObject(const Ice::Identity&) const; + std::shared_ptr<AllocatableObjectEntry> getAllocatableObject(const Ice::Identity&) const; - void setAdapterDirectProxy(const std::string&, const std::string&, const Ice::ObjectPrx&, Ice::Long = 0); - Ice::ObjectPrx getAdapterDirectProxy(const std::string&, const Ice::EncodingVersion&, const Ice::ConnectionPtr&, - const Ice::Context&); + void setAdapterDirectProxy(const std::string&, const std::string&, const std::shared_ptr<Ice::ObjectPrx>&, + long long = 0); + std::shared_ptr<Ice::ObjectPrx> getAdapterDirectProxy(const std::string&, const Ice::EncodingVersion&, + const std::shared_ptr<Ice::Connection>&, + const Ice::Context&); void removeAdapter(const std::string&); - AdapterPrx getAdapterProxy(const std::string&, const std::string&, bool); - void getLocatorAdapterInfo(const std::string&, const Ice::ConnectionPtr&, const Ice::Context&, + std::shared_ptr<AdapterPrx> getAdapterProxy(const std::string&, const std::string&, bool); + void getLocatorAdapterInfo(const std::string&, const std::shared_ptr<Ice::Connection>&, const Ice::Context&, LocatorAdapterInfoSeq&, int&, bool&, bool&, const std::set<std::string>& = std::set<std::string>()); - bool addAdapterSyncCallback(const std::string&, const SynchronizationCallbackPtr&, + bool addAdapterSyncCallback(const std::string&, const std::shared_ptr<SynchronizationCallback>&, const std::set<std::string>& = std::set<std::string>()); - std::vector<std::pair<std::string, AdapterPrx> > getAdapters(const std::string&, int&, bool&); + std::vector<std::pair<std::string, std::shared_ptr<AdapterPrx>>> getAdapters(const std::string&, int&, bool&); AdapterInfoSeq getAdapterInfo(const std::string&); - AdapterInfoSeq getFilteredAdapterInfo(const std::string&, const Ice::ConnectionPtr&, const Ice::Context&); + AdapterInfoSeq getFilteredAdapterInfo(const std::string&, const std::shared_ptr<Ice::Connection>&, const Ice::Context&); std::string getAdapterServer(const std::string&) const; std::string getAdapterApplication(const std::string&) const; std::string getAdapterNode(const std::string&) const; Ice::StringSeq getAllAdapters(const std::string& = std::string()); void addObject(const ObjectInfo&); - void addOrUpdateObject(const ObjectInfo&, Ice::Long = 0); - void removeObject(const Ice::Identity&, Ice::Long = 0); - void updateObject(const Ice::ObjectPrx&); + void addOrUpdateObject(const ObjectInfo&, long long = 0); + void removeObject(const Ice::Identity&, long long = 0); + void updateObject(const std::shared_ptr<Ice::ObjectPrx>&); int addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq&); int removeRegistryWellKnownObjects(const ObjectInfoSeq&); - Ice::ObjectPrx getObjectProxy(const Ice::Identity&); - Ice::ObjectPrx getObjectByType(const std::string&, - const Ice::ConnectionPtr& = Ice::ConnectionPtr(), - const Ice::Context& = Ice::Context()); - Ice::ObjectPrx getObjectByTypeOnLeastLoadedNode(const std::string&, LoadSample, - const Ice::ConnectionPtr& = Ice::ConnectionPtr(), + std::shared_ptr<Ice::ObjectPrx> getObjectProxy(const Ice::Identity&); + std::shared_ptr<Ice::ObjectPrx> getObjectByType(const std::string&, + const std::shared_ptr<Ice::Connection>& = nullptr, const Ice::Context& = Ice::Context()); + std::shared_ptr<Ice::ObjectPrx> getObjectByTypeOnLeastLoadedNode(const std::string&, LoadSample, + const std::shared_ptr<Ice::Connection>& = nullptr, + const Ice::Context& = Ice::Context()); Ice::ObjectProxySeq getObjectsByType(const std::string&, - const Ice::ConnectionPtr& = Ice::ConnectionPtr(), + const std::shared_ptr<Ice::Connection>& = nullptr, const Ice::Context& = Ice::Context()); ObjectInfo getObjectInfo(const Ice::Identity&); ObjectInfoSeq getObjectInfosByType(const std::string&); @@ -156,6 +145,9 @@ public: private: + Database(const std::shared_ptr<Ice::ObjectAdapter>&, const std::shared_ptr<IceStorm::TopicManagerPrx>&, + const std::string&, const std::shared_ptr<TraceLevels>&, const RegistryInfo&, bool); + void checkForAddition(const ApplicationHelper&, const IceDB::ReadWriteTxn&); void checkForUpdate(const ApplicationHelper&, const ApplicationHelper&, const IceDB::ReadWriteTxn&); void checkForRemove(const ApplicationHelper&); @@ -172,20 +164,20 @@ private: void checkUpdate(const ApplicationHelper&, const ApplicationHelper&, const std::string&, int, bool); - Ice::Long saveApplication(const ApplicationInfo&, const IceDB::ReadWriteTxn&, Ice::Long = 0); - Ice::Long removeApplication(const std::string&, const IceDB::ReadWriteTxn&, Ice::Long = 0); + long long saveApplication(const ApplicationInfo&, const IceDB::ReadWriteTxn&, long long = 0); + long long removeApplication(const std::string&, const IceDB::ReadWriteTxn&, long long = 0); void finishApplicationUpdate(const ApplicationUpdateInfo&, const ApplicationInfo&, const ApplicationHelper&, - const ApplicationHelper&, AdminSessionI*, bool, Ice::Long = 0); + const ApplicationHelper&, AdminSessionI*, bool, long long = 0); void checkSessionLock(AdminSessionI*); - void waitForUpdate(const std::string&); + void waitForUpdate(std::unique_lock<std::mutex>&, const std::string&); void startUpdating(const std::string&, const std::string&, int); void finishUpdating(const std::string&); - Ice::Long getSerial(const IceDB::Txn&, const std::string&); - Ice::Long updateSerial(const IceDB::ReadWriteTxn&, const std::string&, Ice::Long = 0); + long long getSerial(const IceDB::Txn&, const std::string&); + long long updateSerial(const IceDB::ReadWriteTxn&, const std::string&, long long = 0); void addAdapter(const IceDB::ReadWriteTxn&, const AdapterInfo&); void deleteAdapter(const IceDB::ReadWriteTxn&, const AdapterInfo&); @@ -201,11 +193,11 @@ private: static const std::string _adapterDbName; static const std::string _replicaGroupDbName; - const Ice::CommunicatorPtr _communicator; - const Ice::ObjectAdapterPtr _internalAdapter; - const IceStorm::TopicManagerPrx _topicManager; + const std::shared_ptr<Ice::Communicator> _communicator; + const std::shared_ptr<Ice::ObjectAdapter> _internalAdapter; + const std::shared_ptr<IceStorm::TopicManagerPrx> _topicManager; const std::string _instanceName; - const TraceLevelsPtr _traceLevels; + const std::shared_ptr<TraceLevels> _traceLevels; const bool _master; const bool _readonly; @@ -216,11 +208,11 @@ private: AllocatableObjectCache _allocatableObjectCache; ServerCache _serverCache; - RegistryObserverTopicPtr _registryObserverTopic; - NodeObserverTopicPtr _nodeObserverTopic; - ApplicationObserverTopicPtr _applicationObserverTopic; - AdapterObserverTopicPtr _adapterObserverTopic; - ObjectObserverTopicPtr _objectObserverTopic; + std::shared_ptr<RegistryObserverTopic> _registryObserverTopic; + std::shared_ptr<NodeObserverTopic> _nodeObserverTopic; + std::shared_ptr<ApplicationObserverTopic> _applicationObserverTopic; + std::shared_ptr<AdapterObserverTopic> _adapterObserverTopic; + std::shared_ptr<ObjectObserverTopic> _objectObserverTopic; IceUtilInternal::FileLock _dbLock; IceDB::Env _env; @@ -238,7 +230,7 @@ private: StringLongMap _serials; - RegistryPluginFacadeIPtr _pluginFacade; + std::shared_ptr<RegistryPluginFacadeI> _pluginFacade; AdminSessionI* _lock; std::string _lockUserId; @@ -248,7 +240,7 @@ private: std::string name; std::string uuid; int revision; - std::vector<AMD_NodeSession_waitForApplicationUpdatePtr> cbs; + std::vector<std::pair<std::function<void()>, std::function<void(std::exception_ptr)>>> cbs; bool updated; UpdateInfo(const std::string& n, const std::string& u, int r) : @@ -268,10 +260,9 @@ private: void markUpdated() { updated = true; - std::vector<AMD_NodeSession_waitForApplicationUpdatePtr>::const_iterator q; - for(q = cbs.begin(); q != cbs.end(); ++q) + for(const auto& cb: cbs) { - (*q)->ice_response(); + cb.first(); } cbs.clear(); } @@ -282,9 +273,11 @@ private: } }; std::vector<UpdateInfo> _updating; -}; -typedef IceUtil::Handle<Database> DatabasePtr; + mutable std::mutex _mutex; + std::condition_variable _condVar; }; +} + #endif |