24 #define access(f,a) _access(f,a)
34 #define SQLDB(x) ((sqlite3*)(x))
45 bool SubscriberOnSql::open(
const std::string& filename,
bool fresh) {
46 sqlite3 *db =
nullptr;
48 int result = access(filename.c_str(),
F_OK);
50 yCWarning(SUBSCRIBERONSQL,
"Database needs to be recreated.");
51 yCWarning(SUBSCRIBERONSQL,
"Please move %s out of the way.", filename.c_str());
56 int result = sqlite3_open_v2(filename.c_str(),
58 SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE|SQLITE_OPEN_NOMUTEX,
60 if (result!=SQLITE_OK) {
61 yCError(SUBSCRIBERONSQL,
"Failed to open database %s", filename.c_str());
68 const char *create_subscribe_table =
"CREATE TABLE IF NOT EXISTS subscriptions (\n\
69 id INTEGER PRIMARY KEY,\n\
76 result = sqlite3_exec(db, create_subscribe_table,
nullptr,
nullptr,
nullptr);
77 if (result!=SQLITE_OK) {
79 yCError(SUBSCRIBERONSQL,
"Failed to set up subscriptions table");
83 const char *check_subscriptions_size =
"PRAGMA table_info(subscriptions)";
85 sqlite3_stmt *statement =
nullptr;
86 result = sqlite3_prepare_v2(db, check_subscriptions_size, -1, &statement,
nullptr);
87 if (result!=SQLITE_OK) {
88 yCError(SUBSCRIBERONSQL,
"Failed to set up subscriptions table");
93 while (sqlite3_step(statement) == SQLITE_ROW) {
96 sqlite3_finalize(statement);
99 const char *add_structure =
"ALTER TABLE subscriptions ADD COLUMN mode";
100 result = sqlite3_exec(db, add_structure,
nullptr,
nullptr,
nullptr);
101 if (result!=SQLITE_OK) {
103 yCError(SUBSCRIBERONSQL,
"Failed to set up subscriptions table");
108 const char *create_topic_table =
"CREATE TABLE IF NOT EXISTS topics (\n\
109 id INTEGER PRIMARY KEY,\n\
113 result = sqlite3_exec(db, create_topic_table,
nullptr,
nullptr,
nullptr);
114 if (result!=SQLITE_OK) {
116 yCError(SUBSCRIBERONSQL,
"Failed to set up topics table");
120 const char *check_topic_size =
"PRAGMA table_info(topics)";
123 result = sqlite3_prepare_v2(db, check_topic_size, -1, &statement,
nullptr);
124 if (result!=SQLITE_OK) {
125 yCError(SUBSCRIBERONSQL,
"Failed to set up topics table");
130 while (sqlite3_step(statement) == SQLITE_ROW) {
134 sqlite3_finalize(statement);
137 const char *add_structure =
"ALTER TABLE topics ADD COLUMN structure";
138 result = sqlite3_exec(db, add_structure,
nullptr,
nullptr,
nullptr);
139 if (result!=SQLITE_OK) {
141 yCError(SUBSCRIBERONSQL,
"Failed to set up topics table");
146 const char *create_live_table =
"CREATE TABLE IF NOT EXISTS live (\n\
147 id INTEGER PRIMARY KEY,\n\
151 result = sqlite3_exec(db, create_live_table,
nullptr,
nullptr,
nullptr);
152 if (result!=SQLITE_OK) {
154 yCError(SUBSCRIBERONSQL,
"Failed to set up live table");
158 const char *create_struct_table =
"CREATE TABLE IF NOT EXISTS structures (\n\
159 name TEXT PRIMARY KEY,\n\
162 result = sqlite3_exec(db, create_struct_table,
nullptr,
nullptr,
nullptr);
163 if (result!=SQLITE_OK) {
165 yCError(SUBSCRIBERONSQL,
"Failed to set up structures table");
174 bool SubscriberOnSql::close() {
183 bool SubscriberOnSql::addSubscription(
const std::string& src,
184 const std::string& dest,
185 const std::string& mode) {
186 removeSubscription(src,dest);
197 const char *zmode = mode.c_str();
198 if (mode ==
"") zmode =
nullptr;
199 char *query = sqlite3_mprintf(
"INSERT INTO subscriptions (src,dest,srcFull,destFull,mode) VALUES(%Q,%Q,%Q,%Q,%Q)",
205 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
209 if (result!=SQLITE_OK) {
211 if (msg !=
nullptr) {
212 yCError(SUBSCRIBERONSQL,
"%s", msg);
237 bool SubscriberOnSql::removeSubscription(
const std::string& src,
238 const std::string& dest) {
242 char *query = sqlite3_mprintf(
"DELETE FROM subscriptions WHERE src = %Q AND dest = %Q",
245 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
249 if (result!=SQLITE_OK) {
250 yCError(SUBSCRIBERONSQL,
"Error in query");
259 bool SubscriberOnSql::welcome(
const std::string& port,
int activity) {
267 if (store !=
nullptr) {
284 query = sqlite3_mprintf(
"INSERT OR IGNORE INTO live (name,stamp) VALUES(%Q,DATETIME('now'))",
289 query = sqlite3_mprintf(
"DELETE FROM live WHERE name=%Q AND stamp < DATETIME('now','-30 seconds')",
293 query = sqlite3_mprintf(
"DELETE FROM live WHERE name=%Q",
297 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
301 if (result!=SQLITE_OK) {
303 if (msg !=
nullptr) {
304 yCError(SUBSCRIBERONSQL,
"%s", msg);
313 }
else if (activity<0) {
319 bool SubscriberOnSql::hookup(
const std::string& port) {
327 sqlite3_stmt *statement =
nullptr;
328 char *query =
nullptr;
330 query = sqlite3_mprintf(
"SELECT src,dest,srcFull,destFull FROM subscriptions WHERE (src = %Q OR dest= %Q) AND EXISTS (SELECT NULL FROM live WHERE name=src) AND EXISTS (SELECT NULL FROM live WHERE name=dest) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic) AND (s1.src = %Q OR s2.dest = %Q) AND EXISTS (SELECT NULL FROM live WHERE name=s1.src) AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)",port.c_str(), port.c_str(), port.c_str(), port.c_str());
332 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
335 if (result!=SQLITE_OK) {
337 if (msg !=
nullptr) {
338 yCError(SUBSCRIBERONSQL,
"%s", msg);
341 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
342 char *src = (
char *)sqlite3_column_text(statement,0);
343 char *dest = (
char *)sqlite3_column_text(statement,1);
344 char *srcFull = (
char *)sqlite3_column_text(statement,2);
345 char *destFull = (
char *)sqlite3_column_text(statement,3);
346 char *mode = (
char *)sqlite3_column_text(statement,4);
347 checkSubscription(src,dest,srcFull,destFull,mode?mode:
"");
349 sqlite3_finalize(statement);
357 bool SubscriberOnSql::breakdown(
const std::string& port) {
365 sqlite3_stmt *statement =
nullptr;
366 char *query =
nullptr;
368 query = sqlite3_mprintf(
"SELECT src,dest,srcFull,destFull,mode FROM subscriptions WHERE ((src = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=dest))) OR (dest = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=src)))) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull, NULL FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic AND ((s1.src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)) OR (s2.dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s1.src))))",port.c_str(), port.c_str(), port.c_str(), port.c_str());
369 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
372 if (result!=SQLITE_OK) {
374 if (msg !=
nullptr) {
375 yCError(SUBSCRIBERONSQL,
"%s", msg);
378 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
379 char *src = (
char *)sqlite3_column_text(statement,0);
380 char *dest = (
char *)sqlite3_column_text(statement,1);
381 char *srcFull = (
char *)sqlite3_column_text(statement,2);
382 char *destFull = (
char *)sqlite3_column_text(statement,3);
383 char *mode = (
char *)sqlite3_column_text(statement,4);
384 breakSubscription(port,src,dest,srcFull,destFull,mode?mode:
"");
386 sqlite3_finalize(statement);
394 bool SubscriberOnSql::checkSubscription(
const std::string& src,
const std::string& dest,
395 const std::string& srcFull,
396 const std::string& destFull,
397 const std::string& mode) {
408 "+++ Checking %s %s / %s %s",
415 if (store !=
nullptr) {
420 bool destTopic = (cdest.
getCarrier()==
"topic");
421 if (!(srcTopic||destTopic)) {
423 "++> check connection %s %s",
426 connect(srcFull,destFull);
430 std::string mode_name = mode;
431 if (mode_name==
"from") {
433 removeSubscription(src,dest);
435 }
else if (mode_name==
"to") {
437 removeSubscription(src,dest);
446 bool SubscriberOnSql::breakSubscription(
const std::string& dropper,
447 const std::string& src,
const std::string& dest,
448 const std::string& srcFull,
449 const std::string& destFull,
450 const std::string& mode) {
461 "--- Checking %s %s / %s %s",
467 if (store !=
nullptr) {
468 bool srcDrop = std::string(dropper) == src;
471 contact = store->
query(src);
473 contact = store->
query(dest);
477 "--> check connection %s %s",
480 disconnect(srcFull,destFull,srcDrop);
483 std::string mode_name = mode;
484 if (mode_name==
"from") {
486 removeSubscription(src,dest);
488 }
else if (mode_name==
"to") {
490 removeSubscription(src,dest);
500 bool SubscriberOnSql::listSubscriptions(
const std::string& port,
503 sqlite3_stmt *statement =
nullptr;
504 char *query =
nullptr;
505 if (std::string(port)!=
"") {
506 query = sqlite3_mprintf(
"SELECT s.srcFull, s.DestFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s WHERE s.src = %Q OR s.dest= %Q ORDER BY s.src, s.dest",port.c_str(),port.c_str());
508 query = sqlite3_mprintf(
"SELECT s.srcFull, s.destFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s ORDER BY s.src, s.dest");
510 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
513 if (result!=SQLITE_OK) {
515 if (msg !=
nullptr) {
516 yCError(SUBSCRIBERONSQL,
"%s", msg);
520 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
521 char *src = (
char *)sqlite3_column_text(statement,0);
522 char *dest = (
char *)sqlite3_column_text(statement,1);
523 int srcTopic = sqlite3_column_int(statement,2);
524 int destTopic = sqlite3_column_int(statement,3);
525 char *mode = (
char *)sqlite3_column_text(statement,4);
536 if (mode !=
nullptr) {
544 if (srcTopic||destTopic) {
552 sqlite3_finalize(statement);
560 bool SubscriberOnSql::setTopic(
const std::string& port,
const std::string& structure,
562 if (structure!=
"" || !active) {
564 char *query = sqlite3_mprintf(
"DELETE FROM topics WHERE topic = %Q",
566 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
570 if (result!=SQLITE_OK) {
571 yCError(SUBSCRIBERONSQL,
"Error in query");
576 if (!ok)
return false;
577 if (!active)
return true;
580 bool have_topic =
false;
583 sqlite3_stmt *statement =
nullptr;
584 char *query =
nullptr;
585 query = sqlite3_mprintf(
"SELECT topic FROM topics WHERE topic = %Q",
587 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
590 if (result!=SQLITE_OK) {
591 yCError(SUBSCRIBERONSQL,
"Error in query");
593 if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
596 sqlite3_finalize(statement);
601 if (structure!=
"" || !have_topic) {
604 const char *pstructure = structure.c_str();
605 if (structure==
"") pstructure =
nullptr;
606 char *query = sqlite3_mprintf(
"INSERT INTO topics (topic,structure) VALUES(%Q,%Q)",
607 port.c_str(),pstructure);
608 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
612 if (result!=SQLITE_OK) {
614 if (msg !=
nullptr) {
615 yCError(SUBSCRIBERONSQL,
"%s", msg);
621 if (!ok)
return false;
624 vector<vector<std::string> > subs;
628 sqlite3_stmt *statement =
nullptr;
629 char *query = sqlite3_mprintf(
"SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (t.topic = %Q AND s1.dest = t.topic AND s2.src = t.topic)", port.c_str());
630 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
633 if (result!=SQLITE_OK) {
635 if (msg !=
nullptr) {
636 yCError(SUBSCRIBERONSQL,
"%s", msg);
639 while (result == SQLITE_OK &&
640 sqlite3_step(statement) == SQLITE_ROW) {
641 char *src = (
char *)sqlite3_column_text(statement,0);
642 char *dest = (
char *)sqlite3_column_text(statement,1);
643 char *srcFull = (
char *)sqlite3_column_text(statement,2);
644 char *destFull = (
char *)sqlite3_column_text(statement,3);
645 char *mode = (
char *)sqlite3_column_text(statement,4);
646 vector<std::string> sub;
647 sub.emplace_back(src);
648 sub.emplace_back(dest);
649 sub.emplace_back(srcFull);
650 sub.emplace_back(destFull);
651 sub.emplace_back(mode?mode:
"");
654 sqlite3_finalize(statement);
658 for (
auto& sub : subs) {
659 checkSubscription(sub[0],sub[1],sub[2],sub[3],sub[4]);
668 sqlite3_stmt *statement =
nullptr;
669 char *query =
nullptr;
670 query = sqlite3_mprintf(
"SELECT topic FROM topics");
671 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
674 if (result!=SQLITE_OK) {
675 yCError(SUBSCRIBERONSQL,
"Error in query");
677 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
678 char *topic = (
char *)sqlite3_column_text(statement,0);
681 sqlite3_finalize(statement);
689 bool SubscriberOnSql::setType(
const std::string& family,
690 const std::string& structure,
691 const std::string& value) {
694 char *query = sqlite3_mprintf(
"INSERT OR REPLACE INTO structures (name,%Q) VALUES(%Q,%Q)",
696 (structure==
"") ?
nullptr : structure.c_str(),
698 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
702 if (result!=SQLITE_OK) {
704 if (msg !=
nullptr) {
705 yCError(SUBSCRIBERONSQL,
"%s", msg);
714 std::string SubscriberOnSql::getType(
const std::string& family,
715 const std::string& structure) {
717 sqlite3_stmt *statement =
nullptr;
718 char *query =
nullptr;
719 query = sqlite3_mprintf(
"SELECT %s FROM structures WHERE name = %Q",
720 family.c_str(), structure.c_str());
721 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
725 if (result!=SQLITE_OK) {
726 yCError(SUBSCRIBERONSQL,
"Error in query");
728 if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
729 sresult = (
const char *)sqlite3_column_text(statement,0);
731 sqlite3_finalize(statement);