diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-10-07 18:18:37 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-10-07 18:18:37 +0200 |
commit | 5fc2dc27228263e4c56ba3a49852ab3f8c724299 (patch) | |
tree | a1340491094705a1e604a3df22ec4dad0c8d1a8e /cpp/src/IceStorm/SqlDB/SqlSubscriberMap.cpp | |
parent | Bug 4251 - add IceUtil::Time double initializers (diff) | |
download | ice-5fc2dc27228263e4c56ba3a49852ab3f8c724299.tar.bz2 ice-5fc2dc27228263e4c56ba3a49852ab3f8c724299.tar.xz ice-5fc2dc27228263e4c56ba3a49852ab3f8c724299.zip |
- Bug 4286: added support for IceStorm/IceGrid database plugins
- Fixed IceGrid database code to first save to the database and then
do state changes.
Diffstat (limited to 'cpp/src/IceStorm/SqlDB/SqlSubscriberMap.cpp')
-rw-r--r-- | cpp/src/IceStorm/SqlDB/SqlSubscriberMap.cpp | 406 |
1 files changed, 406 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/SqlDB/SqlSubscriberMap.cpp b/cpp/src/IceStorm/SqlDB/SqlSubscriberMap.cpp new file mode 100644 index 00000000000..958766e25e2 --- /dev/null +++ b/cpp/src/IceStorm/SqlDB/SqlSubscriberMap.cpp @@ -0,0 +1,406 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/Communicator.h> +#include <IceStorm/SqlDB/SqlSubscriberMap.h> +#include <QtSql/QSqlQuery> +#include <QtCore/QVariant> +#include <QtCore/QStringList> + +using namespace IceStorm; +using namespace SqlDB; +using namespace std; + +SqlSubscriberMap::QoSMap::QoSMap(const DatabaseConnectionPtr& connection, + const string& table, + const Ice::CommunicatorPtr& communicator) : + _table(table), + _communicator(communicator) +{ + QStringList tables = connection->sqlConnection().tables(QSql::Tables); + if(!tables.contains(_table.c_str(), Qt::CaseInsensitive)) + { + QSqlQuery query(connection->sqlConnection()); + string queryString = "CREATE TABLE "; + queryString += _table; + queryString += " (topic VARCHAR(255), id VARCHAR(255), name "; + if(connection->sqlConnection().driverName() == "QODBC") + { + queryString += "N"; + } + queryString += "TEXT, value "; + if(connection->sqlConnection().driverName() == "QODBC") + { + queryString += "N"; + } + queryString += "TEXT);"; + + if(!query.exec(queryString.c_str())) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } + + QSqlQuery idx1Query(connection->sqlConnection()); + queryString = "CREATE INDEX IDX_"; + queryString += _table; + queryString += "_TOPIC_SUBSCRIBER ON "; + queryString += _table; + queryString += " (topic, id);"; + + if(!idx1Query.exec(queryString.c_str())) + { + throwDatabaseException(__FILE__, __LINE__, idx1Query.lastError()); + } + + QSqlQuery idx2Query(connection->sqlConnection()); + queryString = "CREATE INDEX IDX_"; + queryString += _table; + queryString += "_TOPIC ON "; + queryString += _table; + queryString += " (topic);"; + + if(!idx2Query.exec(queryString.c_str())) + { + throwDatabaseException(__FILE__, __LINE__, idx2Query.lastError()); + } + } +} + +void +SqlSubscriberMap::QoSMap::put(const DatabaseConnectionPtr& connection, + const SubscriberRecordKey& key, + const QoS& qos) +{ + erase(connection, key); + + for(QoS::const_iterator p = qos.begin(); p != qos.end(); ++p) + { + QSqlQuery query(connection->sqlConnection()); + string queryString = "INSERT INTO "; + queryString += _table; + queryString += " VALUES(?, ?, ?, ?)"; + query.prepare(queryString.c_str()); + query.bindValue(0, _communicator->identityToString(key.topic).c_str()); + query.bindValue(1, _communicator->identityToString(key.id).c_str()); + query.bindValue(2, p->first.c_str()); + query.bindValue(3, p->second.c_str()); + + if(!query.exec()) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } + } +} + +QoS +SqlSubscriberMap::QoSMap::find(const DatabaseConnectionPtr& connection, + const SubscriberRecordKey& key) +{ + QSqlQuery query(connection->sqlConnection()); + string queryString = "SELECT * FROM "; + queryString += _table; + queryString += " WHERE topic = ? AND id = ?;"; + + query.prepare(queryString.c_str()); + query.bindValue(0, _communicator->identityToString(key.topic).c_str()); + query.bindValue(1, _communicator->identityToString(key.id).c_str()); + + if(!query.exec()) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } + + QoS qos; + while(query.next()) + { + qos[query.value(2).toString().toStdString()] = query.value(3).toString().toStdString(); + } + return qos; +} + +void +SqlSubscriberMap::QoSMap::erase(const DatabaseConnectionPtr& connection, + const SubscriberRecordKey& key) +{ + QSqlQuery query(connection->sqlConnection()); + string queryString = "DELETE FROM "; + queryString += _table; + queryString += " WHERE topic = ? AND id = ?;"; + + query.prepare(queryString.c_str()); + query.bindValue(0, _communicator->identityToString(key.topic).c_str()); + query.bindValue(1, _communicator->identityToString(key.id).c_str()); + + if(!query.exec()) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } +} + +void +SqlSubscriberMap::QoSMap::eraseTopic(const DatabaseConnectionPtr& connection, + const Ice::Identity& topic) +{ + QSqlQuery query(connection->sqlConnection()); + string queryString = "DELETE FROM "; + queryString += _table; + queryString += " WHERE topic = ?;"; + + query.prepare(queryString.c_str()); + query.bindValue(0, _communicator->identityToString(topic).c_str()); + + if(!query.exec()) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } +} + +void +SqlSubscriberMap::QoSMap::clear(const DatabaseConnectionPtr& connection) +{ + QSqlQuery query(connection->sqlConnection()); + string queryString = "DELETE FROM "; + queryString += _table; + queryString += ";"; + + if(!query.exec(queryString.c_str())) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } +} + +SqlSubscriberMap::SqlSubscriberMap(const DatabaseConnectionPtr& connection, + const string& table, + const Ice::CommunicatorPtr& communicator) : + _table(table), + _communicator(communicator), + _qosMap(connection, table + "_QoS", communicator) +{ + QStringList tables = connection->sqlConnection().tables(QSql::Tables); + if(!tables.contains(_table.c_str(), Qt::CaseInsensitive)) + { + QSqlQuery query(connection->sqlConnection()); + string queryString = "CREATE TABLE "; + queryString += _table; + queryString += " (topic VARCHAR(255), id VARCHAR(255), topicName "; + if(connection->sqlConnection().driverName() == "QODBC") + { + queryString += "N"; + } + queryString += "TEXT, link TEXT, obj TEXT, cost INT, theTopic TEXT, PRIMARY KEY (topic, id));"; + + if(!query.exec(queryString.c_str())) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } + + QSqlQuery idxQuery(connection->sqlConnection()); + queryString = "CREATE INDEX IDX_"; + queryString += _table; + queryString += "_TOPIC ON "; + queryString += _table; + queryString += " (topic);"; + + if(!idxQuery.exec(queryString.c_str())) + { + throwDatabaseException(__FILE__, __LINE__, idxQuery.lastError()); + } + } +} + +void +SqlSubscriberMap::put(const DatabaseConnectionPtr& connection, + const SubscriberRecordKey& key, + const SubscriberRecord& record) +{ + QString driver = connection->sqlConnection().driverName(); + + QSqlQuery query(connection->sqlConnection()); + ostringstream queryString; + queryString << "UPDATE " << _table + << " SET topicName = ?, link = '" << (record.link ? "true" : "false") + << "', obj = ?, cost = '" << record.cost + << "', theTopic = ? WHERE topic = ? AND id = ?;"; + + query.prepare(queryString.str().c_str()); + query.bindValue(0, record.topicName.c_str()); + query.bindValue(1, _communicator->proxyToString(record.obj).c_str()); + query.bindValue(2, _communicator->proxyToString(record.theTopic).c_str()); + query.bindValue(3, _communicator->identityToString(key.topic).c_str()); + query.bindValue(4, _communicator->identityToString(key.id).c_str()); + + if(!query.exec()) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } + + if(query.numRowsAffected() == 0) + { + // + // We do a find since some databases (MySQL) return 0 for number of rows affected + // if row exists but data was not changed from previous values. + // + try + { + find(connection, key); + } + catch(const NotFoundException&) + { + QSqlQuery insertQuery(connection->sqlConnection()); + queryString.str(""); + queryString << "INSERT INTO " << _table << " VALUES(?, ?, ?, '" << (record.link ? "true" : "false") + << "', ?, '" << record.cost << "', ?);"; + + insertQuery.prepare(queryString.str().c_str()); + insertQuery.bindValue(0, _communicator->identityToString(key.topic).c_str()); + insertQuery.bindValue(1, _communicator->identityToString(key.id).c_str()); + insertQuery.bindValue(2, record.topicName.c_str()); + insertQuery.bindValue(3, _communicator->proxyToString(record.obj).c_str()); + insertQuery.bindValue(4, _communicator->proxyToString(record.theTopic).c_str()); + + if(!insertQuery.exec()) + { + throwDatabaseException(__FILE__, __LINE__, insertQuery.lastError()); + } + } + } + + _qosMap.put(connection, key, record.theQoS); +} + +SubscriberRecord +SqlSubscriberMap::find(const DatabaseConnectionPtr& connection, + const SubscriberRecordKey& key) +{ + QSqlQuery query(connection->sqlConnection()); + string queryString = "SELECT * FROM "; + queryString += _table; + queryString += " WHERE topic = ? AND id = ?;"; + + query.prepare(queryString.c_str()); + query.bindValue(0, _communicator->identityToString(key.topic).c_str()); + query.bindValue(1, _communicator->identityToString(key.id).c_str()); + + if(!query.exec()) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } + + if(query.next()) + { + SubscriberRecord record; + record.topicName = query.value(2).toString().toStdString(); + record.id = key.id; + record.link = query.value(3).toString().toStdString() == "true" ? true : false; + record.obj = _communicator->stringToProxy(query.value(4).toString().toStdString()); + record.cost = query.value(5).toInt(); + record.theTopic = + TopicPrx::uncheckedCast(_communicator->stringToProxy(query.value(6).toString().toStdString())); + + record.theQoS = _qosMap.find(connection, key); + + return record; + } + else + { + throw NotFoundException(__FILE__, __LINE__); + } +} + +void +SqlSubscriberMap::getMap(const DatabaseConnectionPtr& connection, + SubscriberMap& smap) +{ + QSqlQuery query(connection->sqlConnection()); + string queryString = "SELECT * FROM "; + queryString += _table; + queryString += ";"; + + if(!query.exec(queryString.c_str())) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } + + while(query.next()) + { + SubscriberRecordKey key; + key.topic = _communicator->stringToIdentity(query.value(0).toString().toStdString()); + key.id = _communicator->stringToIdentity(query.value(1).toString().toStdString()); + + SubscriberRecord record; + record.topicName = query.value(2).toString().toStdString(); + record.id = key.id; + record.link = query.value(3).toString().toStdString() == "true" ? true : false; + record.obj = _communicator->stringToProxy(query.value(4).toString().toStdString()); + record.cost = query.value(5).toInt(); + record.theTopic = + TopicPrx::uncheckedCast(_communicator->stringToProxy(query.value(6).toString().toStdString())); + + record.theQoS = _qosMap.find(connection, key); + + smap[key] = record; + } +} + +void +SqlSubscriberMap::erase(const DatabaseConnectionPtr& connection, + const SubscriberRecordKey& key) +{ + _qosMap.erase(connection, key); + + QSqlQuery query(connection->sqlConnection()); + string queryString = "DELETE FROM "; + queryString += _table; + queryString += " WHERE topic = ? AND id = ?;"; + + query.prepare(queryString.c_str()); + query.bindValue(0, _communicator->identityToString(key.topic).c_str()); + query.bindValue(1, _communicator->identityToString(key.id).c_str()); + + if(!query.exec()) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } +} + +void +SqlSubscriberMap::eraseTopic(const DatabaseConnectionPtr& connection, + const Ice::Identity& topic) +{ + _qosMap.eraseTopic(connection, topic); + + QSqlQuery query(connection->sqlConnection()); + string queryString = "DELETE FROM "; + queryString += _table; + queryString += " WHERE topic = ?;"; + + query.prepare(queryString.c_str()); + query.bindValue(0, _communicator->identityToString(topic).c_str()); + + if(!query.exec()) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } +} + +void +SqlSubscriberMap::clear(const DatabaseConnectionPtr& connection) +{ + _qosMap.clear(connection); + + QSqlQuery query(connection->sqlConnection()); + string queryString = "DELETE FROM "; + queryString += _table; + queryString += ";"; + + if(!query.exec(queryString.c_str())) + { + throwDatabaseException(__FILE__, __LINE__, query.lastError()); + } +} |