summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/SqlSubscriberMap.cpp
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2009-09-28 11:05:44 -0230
committerDwayne Boone <dwayne@zeroc.com>2009-09-28 11:05:44 -0230
commit7d20430028f05cc26c412465176a75ce4ea5af9e (patch)
tree593695acf366f7e3a7081d15af8f474683ce4af7 /cpp/src/IceStorm/SqlSubscriberMap.cpp
parentRemoved unused __checkTwoway(const char*) from Proxy (diff)
downloadice-7d20430028f05cc26c412465176a75ce4ea5af9e.tar.bz2
ice-7d20430028f05cc26c412465176a75ce4ea5af9e.tar.xz
ice-7d20430028f05cc26c412465176a75ce4ea5af9e.zip
Bug 3231 - alternative storage for IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceStorm/SqlSubscriberMap.cpp')
-rw-r--r--cpp/src/IceStorm/SqlSubscriberMap.cpp406
1 files changed, 406 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/SqlSubscriberMap.cpp b/cpp/src/IceStorm/SqlSubscriberMap.cpp
new file mode 100644
index 00000000000..08ca87ffa2f
--- /dev/null
+++ b/cpp/src/IceStorm/SqlSubscriberMap.cpp
@@ -0,0 +1,406 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Communicator.h>
+#include <IceStorm/SqlSubscriberMap.h>
+#include <QtSql/QSqlQuery>
+#include <QtCore/QVariant>
+#include <QtCore/QStringList>
+
+using namespace IceStorm;
+using namespace IceSQL;
+using namespace std;
+
+SqlSubscriberMap::QoSMap::QoSMap(const DatabaseConnectionPtr& connection,
+ const string& table,
+ const Ice::CommunicatorPtr& communicator) :
+ _table(table),
+ _communicator(communicator)
+{
+ QStringList tables = connection->connection.tables(QSql::Tables);
+ if(!tables.contains(_table.c_str(), Qt::CaseInsensitive))
+ {
+ QSqlQuery query(connection->connection);
+ string queryString = "CREATE TABLE ";
+ queryString += _table;
+ queryString += " (topic VARCHAR(255), id VARCHAR(255), name ";
+ if(connection->connection.driverName() == "QODBC")
+ {
+ queryString += "N";
+ }
+ queryString += "TEXT, value ";
+ if(connection->connection.driverName() == "QODBC")
+ {
+ queryString += "N";
+ }
+ queryString += "TEXT);";
+
+ if(!query.exec(queryString.c_str()))
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+
+ QSqlQuery idx1Query(connection->connection);
+ queryString = "CREATE INDEX IDX_";
+ queryString += _table;
+ queryString += "_TOPIC_SUBSCRIBER ON ";
+ queryString += _table;
+ queryString += " (topic, id);";
+
+ if(!idx1Query.exec(queryString.c_str()))
+ {
+ throwDatabaseException(__FILE__, __LINE__, idx1Query.lastError());
+ }
+
+ QSqlQuery idx2Query(connection->connection);
+ queryString = "CREATE INDEX IDX_";
+ queryString += _table;
+ queryString += "_TOPIC ON ";
+ queryString += _table;
+ queryString += " (topic);";
+
+ if(!idx2Query.exec(queryString.c_str()))
+ {
+ throwDatabaseException(__FILE__, __LINE__, idx2Query.lastError());
+ }
+ }
+}
+
+void
+SqlSubscriberMap::QoSMap::put(const DatabaseConnectionPtr& connection,
+ const SubscriberRecordKey& key,
+ const QoS& qos)
+{
+ erase(connection, key);
+
+ for(QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
+ {
+ QSqlQuery query(connection->connection);
+ string queryString = "INSERT INTO ";
+ queryString += _table;
+ queryString += " VALUES(?, ?, ?, ?)";
+ query.prepare(queryString.c_str());
+ query.bindValue(0, _communicator->identityToString(key.topic).c_str());
+ query.bindValue(1, _communicator->identityToString(key.id).c_str());
+ query.bindValue(2, p->first.c_str());
+ query.bindValue(3, p->second.c_str());
+
+ if(!query.exec())
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+ }
+}
+
+QoS
+SqlSubscriberMap::QoSMap::find(const DatabaseConnectionPtr& connection,
+ const SubscriberRecordKey& key)
+{
+ QSqlQuery query(connection->connection);
+ string queryString = "SELECT * FROM ";
+ queryString += _table;
+ queryString += " WHERE topic = ? AND id = ?;";
+
+ query.prepare(queryString.c_str());
+ query.bindValue(0, _communicator->identityToString(key.topic).c_str());
+ query.bindValue(1, _communicator->identityToString(key.id).c_str());
+
+ if(!query.exec())
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+
+ QoS qos;
+ while(query.next())
+ {
+ qos[query.value(2).toString().toStdString()] = query.value(3).toString().toStdString();
+ }
+ return qos;
+}
+
+void
+SqlSubscriberMap::QoSMap::erase(const DatabaseConnectionPtr& connection,
+ const SubscriberRecordKey& key)
+{
+ QSqlQuery query(connection->connection);
+ string queryString = "DELETE FROM ";
+ queryString += _table;
+ queryString += " WHERE topic = ? AND id = ?;";
+
+ query.prepare(queryString.c_str());
+ query.bindValue(0, _communicator->identityToString(key.topic).c_str());
+ query.bindValue(1, _communicator->identityToString(key.id).c_str());
+
+ if(!query.exec())
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+}
+
+void
+SqlSubscriberMap::QoSMap::eraseTopic(const DatabaseConnectionPtr& connection,
+ const Ice::Identity& topic)
+{
+ QSqlQuery query(connection->connection);
+ string queryString = "DELETE FROM ";
+ queryString += _table;
+ queryString += " WHERE topic = ?;";
+
+ query.prepare(queryString.c_str());
+ query.bindValue(0, _communicator->identityToString(topic).c_str());
+
+ if(!query.exec())
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+}
+
+void
+SqlSubscriberMap::QoSMap::clear(const DatabaseConnectionPtr& connection)
+{
+ QSqlQuery query(connection->connection);
+ string queryString = "DELETE FROM ";
+ queryString += _table;
+ queryString += ";";
+
+ if(!query.exec(queryString.c_str()))
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+}
+
+SqlSubscriberMap::SqlSubscriberMap(const DatabaseConnectionPtr& connection,
+ const string& table,
+ const Ice::CommunicatorPtr& communicator) :
+ _table(table),
+ _communicator(communicator),
+ _qosMap(connection, table + "_QoS", communicator)
+{
+ QStringList tables = connection->connection.tables(QSql::Tables);
+ if(!tables.contains(_table.c_str(), Qt::CaseInsensitive))
+ {
+ QSqlQuery query(connection->connection);
+ string queryString = "CREATE TABLE ";
+ queryString += _table;
+ queryString += " (topic VARCHAR(255), id VARCHAR(255), topicName ";
+ if(connection->connection.driverName() == "QODBC")
+ {
+ queryString += "N";
+ }
+ queryString += "TEXT, link TEXT, obj TEXT, cost INT, theTopic TEXT, PRIMARY KEY (topic, id));";
+
+ if(!query.exec(queryString.c_str()))
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+
+ QSqlQuery idxQuery(connection->connection);
+ queryString = "CREATE INDEX IDX_";
+ queryString += _table;
+ queryString += "_TOPIC ON ";
+ queryString += _table;
+ queryString += " (topic);";
+
+ if(!idxQuery.exec(queryString.c_str()))
+ {
+ throwDatabaseException(__FILE__, __LINE__, idxQuery.lastError());
+ }
+ }
+}
+
+void
+SqlSubscriberMap::put(const DatabaseConnectionPtr& connection,
+ const SubscriberRecordKey& key,
+ const SubscriberRecord& record)
+{
+ QString driver = connection->connection.driverName();
+
+ QSqlQuery query(connection->connection);
+ ostringstream queryString;
+ queryString << "UPDATE " << _table
+ << " SET topicName = ?, link = '" << (record.link ? "true" : "false")
+ << "', obj = ?, cost = '" << record.cost
+ << "', theTopic = ? WHERE topic = ? AND id = ?;";
+
+ query.prepare(queryString.str().c_str());
+ query.bindValue(0, record.topicName.c_str());
+ query.bindValue(1, _communicator->proxyToString(record.obj).c_str());
+ query.bindValue(2, _communicator->proxyToString(record.theTopic).c_str());
+ query.bindValue(3, _communicator->identityToString(key.topic).c_str());
+ query.bindValue(4, _communicator->identityToString(key.id).c_str());
+
+ if(!query.exec())
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+
+ if(query.numRowsAffected() == 0)
+ {
+ //
+ // We do a find since some databases (MySQL) return 0 for number of rows affected
+ // if row exists but data was not changed from previous values.
+ //
+ try
+ {
+ find(connection, key);
+ }
+ catch(const NotFoundException&)
+ {
+ QSqlQuery insertQuery(connection->connection);
+ queryString.str("");
+ queryString << "INSERT INTO " << _table << " VALUES(?, ?, ?, '" << (record.link ? "true" : "false")
+ << "', ?, '" << record.cost << "', ?);";
+
+ insertQuery.prepare(queryString.str().c_str());
+ insertQuery.bindValue(0, _communicator->identityToString(key.topic).c_str());
+ insertQuery.bindValue(1, _communicator->identityToString(key.id).c_str());
+ insertQuery.bindValue(2, record.topicName.c_str());
+ insertQuery.bindValue(3, _communicator->proxyToString(record.obj).c_str());
+ insertQuery.bindValue(4, _communicator->proxyToString(record.theTopic).c_str());
+
+ if(!insertQuery.exec())
+ {
+ throwDatabaseException(__FILE__, __LINE__, insertQuery.lastError());
+ }
+ }
+ }
+
+ _qosMap.put(connection, key, record.theQoS);
+}
+
+SubscriberRecord
+SqlSubscriberMap::find(const DatabaseConnectionPtr& connection,
+ const SubscriberRecordKey& key)
+{
+ QSqlQuery query(connection->connection);
+ string queryString = "SELECT * FROM ";
+ queryString += _table;
+ queryString += " WHERE topic = ? AND id = ?;";
+
+ query.prepare(queryString.c_str());
+ query.bindValue(0, _communicator->identityToString(key.topic).c_str());
+ query.bindValue(1, _communicator->identityToString(key.id).c_str());
+
+ if(!query.exec())
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+
+ if(query.next())
+ {
+ SubscriberRecord record;
+ record.topicName = query.value(2).toString().toStdString();
+ record.id = key.id;
+ record.link = query.value(3).toString().toStdString() == "true" ? true : false;
+ record.obj = _communicator->stringToProxy(query.value(4).toString().toStdString());
+ record.cost = query.value(5).toInt();
+ record.theTopic =
+ TopicPrx::uncheckedCast(_communicator->stringToProxy(query.value(6).toString().toStdString()));
+
+ record.theQoS = _qosMap.find(connection, key);
+
+ return record;
+ }
+ else
+ {
+ throw NotFoundException(__FILE__, __LINE__);
+ }
+}
+
+void
+SqlSubscriberMap::getMap(const DatabaseConnectionPtr& connection,
+ SubscriberMap& smap)
+{
+ QSqlQuery query(connection->connection);
+ string queryString = "SELECT * FROM ";
+ queryString += _table;
+ queryString += ";";
+
+ if(!query.exec(queryString.c_str()))
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+
+ while(query.next())
+ {
+ SubscriberRecordKey key;
+ key.topic = _communicator->stringToIdentity(query.value(0).toString().toStdString());
+ key.id = _communicator->stringToIdentity(query.value(1).toString().toStdString());
+
+ SubscriberRecord record;
+ record.topicName = query.value(2).toString().toStdString();
+ record.id = key.id;
+ record.link = query.value(3).toString().toStdString() == "true" ? true : false;
+ record.obj = _communicator->stringToProxy(query.value(4).toString().toStdString());
+ record.cost = query.value(5).toInt();
+ record.theTopic =
+ TopicPrx::uncheckedCast(_communicator->stringToProxy(query.value(6).toString().toStdString()));
+
+ record.theQoS = _qosMap.find(connection, key);
+
+ smap[key] = record;
+ }
+}
+
+void
+SqlSubscriberMap::erase(const DatabaseConnectionPtr& connection,
+ const SubscriberRecordKey& key)
+{
+ _qosMap.erase(connection, key);
+
+ QSqlQuery query(connection->connection);
+ string queryString = "DELETE FROM ";
+ queryString += _table;
+ queryString += " WHERE topic = ? AND id = ?;";
+
+ query.prepare(queryString.c_str());
+ query.bindValue(0, _communicator->identityToString(key.topic).c_str());
+ query.bindValue(1, _communicator->identityToString(key.id).c_str());
+
+ if(!query.exec())
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+}
+
+void
+SqlSubscriberMap::eraseTopic(const DatabaseConnectionPtr& connection,
+ const Ice::Identity& topic)
+{
+ _qosMap.eraseTopic(connection, topic);
+
+ QSqlQuery query(connection->connection);
+ string queryString = "DELETE FROM ";
+ queryString += _table;
+ queryString += " WHERE topic = ?;";
+
+ query.prepare(queryString.c_str());
+ query.bindValue(0, _communicator->identityToString(topic).c_str());
+
+ if(!query.exec())
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+}
+
+void
+SqlSubscriberMap::clear(const DatabaseConnectionPtr& connection)
+{
+ _qosMap.clear(connection);
+
+ QSqlQuery query(connection->connection);
+ string queryString = "DELETE FROM ";
+ queryString += _table;
+ queryString += ";";
+
+ if(!query.exec(queryString.c_str()))
+ {
+ throwDatabaseException(__FILE__, __LINE__, query.lastError());
+ }
+}