// ********************************************************************** // // Copyright (c) 2003-2010 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. // // ********************************************************************** #include #include #include #include #include #include #include using namespace IceStorm; using namespace std; extern "C" { ICE_DECLSPEC_EXPORT ::Ice::Plugin* createSqlDB(const Ice::CommunicatorPtr& communicator, const string& name, const Ice::StringSeq& args) { IceUtilInternal::ArgVector av(args); return new IceStorm::SqlDBPlugin(communicator, av.argc, av.argv); } } namespace { // // SQL wrappers for SQL tables // class SqlLLUWrapper : public LLUWrapper { public: SqlLLUWrapper(const SqlDB::DatabaseConnectionPtr& connection, const SqlLLUPtr& llu) : _connection(connection), _table(llu) { } virtual void put(const IceStormElection::LogUpdate& update) { _table->put(_connection, update); } virtual IceStormElection::LogUpdate get() { return _table->get(_connection); } private: const SqlDB::DatabaseConnectionPtr _connection; const SqlLLUPtr _table; }; class SqlSubscribersWrapper : public SqlDB::Wrapper, public SubscribersWrapper { public: SqlSubscribersWrapper(const SqlDB::DatabaseConnectionPtr& connection, const SqlSubscriberMapPtr& subscribers) : SqlDB::Wrapper(connection, subscribers) { } void eraseTopic(const Ice::Identity& topic) { _table->eraseTopic(_connection, topic); } }; } SqlDatabaseCache::SqlDatabaseCache(const Ice::CommunicatorPtr& communicator, const string& databaseType, const string& databaseName, const string& hostname, int port, const string& username, const string& password, const string& tablePrefix) : SqlDB::DatabaseCache(communicator, databaseType, databaseName, hostname, port, username, password, false) { IceDB::DatabaseConnectionPtr connection = getConnection(); IceDB::TransactionHolder txn(connection); SqlDB::DatabaseConnectionPtr c = SqlDB::DatabaseConnectionPtr::dynamicCast(connection.get()); const_cast(_llu) = new SqlLLU(c, tablePrefix + "LLU"); const_cast(_subscribers) = new SqlSubscriberMap(c, tablePrefix + "Subscribers", communicator); txn.commit(); } SqlDatabaseCache::~SqlDatabaseCache() { } LLUWrapperPtr SqlDatabaseCache::getLLU(const IceDB::DatabaseConnectionPtr& connection) { return new SqlLLUWrapper(SqlDB::DatabaseConnectionPtr::dynamicCast(connection.get()), _llu); } SubscribersWrapperPtr SqlDatabaseCache::getSubscribers(const IceDB::DatabaseConnectionPtr& connection) { return new SqlSubscribersWrapper(SqlDB::DatabaseConnectionPtr::dynamicCast(connection.get()), _subscribers); } SqlDBPlugin::SqlDBPlugin(const Ice::CommunicatorPtr& communicator, int argc, char** argv) : _communicator(communicator) { // // In order to load SQL drivers it is necessary for an instance of // QCoreApplication to be instantiated. However only one can be instantiated // per process. Therefore we do not destroy _qtApp as it may be required // by other services that are also using QT. // if(QCoreApplication::instance() == 0) { new QCoreApplication(argc, argv); QTextCodec::setCodecForCStrings(QTextCodec::codecForName("UTF-8")); } Ice::PluginPtr(new Ice::ThreadHookPlugin(_communicator, new SqlDB::ThreadHook())); } SqlDBPlugin::~SqlDBPlugin() { } void SqlDBPlugin::initialize() { } void SqlDBPlugin::destroy() { // // Break cyclic reference count (thread hook holds a reference on the cache and the cache holds // a reference on the communicator through the SQL dictionaries). // SqlDB::ThreadHookPtr threadHook = SqlDB::ThreadHookPtr::dynamicCast(IceInternal::getInstance(_communicator)->initializationData().threadHook); if(threadHook) { threadHook->setDatabaseCache(0); } } DatabaseCachePtr SqlDBPlugin::getDatabaseCache(const string& name) { Ice::PropertiesPtr properties = _communicator->getProperties(); string tablePrefix = properties->getPropertyWithDefault(name + ".InstanceName", "IceStorm"); int id = properties->getPropertyAsIntWithDefault(name + ".NodeId", -1); if(id != -1) { ostringstream os; os << tablePrefix << "_" << id; tablePrefix = os.str(); replace(tablePrefix.begin(), tablePrefix.end(), '.', '_'); replace(tablePrefix.begin(), tablePrefix.end(), '-', '_'); replace(tablePrefix.begin(), tablePrefix.end(), ' ', '_'); replace(tablePrefix.begin(), tablePrefix.end(), ';', '_'); } tablePrefix += "_"; SqlDatabaseCachePtr databaseCache = new SqlDatabaseCache(_communicator, properties->getProperty(name + ".SQL.DatabaseType"), properties->getProperty(name + ".SQL.DatabaseName"), properties->getProperty(name + ".SQL.HostName"), properties->getPropertyAsInt(name + ".SQL.Port"), properties->getProperty(name + ".SQL.UserName"), properties->getProperty(name + ".SQL.Password"), tablePrefix); SqlDB::ThreadHookPtr threadHook = SqlDB::ThreadHookPtr::dynamicCast(IceInternal::getInstance(_communicator)->initializationData().threadHook); assert(threadHook); threadHook->setDatabaseCache(databaseCache); return databaseCache; }