YARP
Yet Another Robot Platform
SubscriberOnSql.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include <cstdlib>
11 #include <cstdio>
12 
13 #include <sqlite3.h>
14 
15 #include <yarp/os/RosNameSpace.h>
19 
20 #if !defined(_WIN32)
21 #include <unistd.h>
22 #else
23 #include <io.h>
24 #define access(f,a) _access(f,a)
25 #endif
26 
27 #include <vector>
28 #include <string>
29 
30 #ifndef F_OK
31 #define F_OK 0
32 #endif
33 
34 #define SQLDB(x) ((sqlite3*)(x))
35 
36 using namespace yarp::os;
37 using namespace yarp::serversql::impl;
38 using namespace std;
39 
40 namespace {
41 YARP_SERVERSQL_LOG_COMPONENT(SUBSCRIBERONSQL, "yarp.serversql.impl.SubscriberOnSql")
42 } // namespace
43 
44 
45 bool SubscriberOnSql::open(const std::string& filename, bool fresh) {
46  sqlite3 *db = nullptr;
47  if (fresh) {
48  int result = access(filename.c_str(),F_OK);
49  if (result==0) {
50  yCWarning(SUBSCRIBERONSQL, "Database needs to be recreated.");
51  yCWarning(SUBSCRIBERONSQL, "Please move %s out of the way.", filename.c_str());
52  return false;
53  }
54 
55  }
56  int result = sqlite3_open_v2(filename.c_str(),
57  &db,
58  SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE|SQLITE_OPEN_NOMUTEX,
59  nullptr);
60  if (result!=SQLITE_OK) {
61  yCError(SUBSCRIBERONSQL, "Failed to open database %s", filename.c_str());
62  if (db != nullptr) {
63  sqlite3_close(db);
64  }
65  return false;
66  }
67 
68  const char *create_subscribe_table = "CREATE TABLE IF NOT EXISTS subscriptions (\n\
69  id INTEGER PRIMARY KEY,\n\
70  src TEXT,\n\
71  dest TEXT,\n\
72  srcFull TEXT,\n\
73  destFull TEXT,\n\
74  mode TEXT);";
75 
76  result = sqlite3_exec(db, create_subscribe_table, nullptr, nullptr, nullptr);
77  if (result!=SQLITE_OK) {
78  sqlite3_close(db);
79  yCError(SUBSCRIBERONSQL, "Failed to set up subscriptions table");
80  std::exit(1);
81  }
82 
83  const char *check_subscriptions_size = "PRAGMA table_info(subscriptions)";
84 
85  sqlite3_stmt *statement = nullptr;
86  result = sqlite3_prepare_v2(db, check_subscriptions_size, -1, &statement, nullptr);
87  if (result!=SQLITE_OK) {
88  yCError(SUBSCRIBERONSQL, "Failed to set up subscriptions table");
89  std::exit(1);
90  }
91 
92  int count = 0;
93  while (sqlite3_step(statement) == SQLITE_ROW) {
94  count++;
95  }
96  sqlite3_finalize(statement);
97 
98  if (count==5) {
99  const char *add_structure = "ALTER TABLE subscriptions ADD COLUMN mode";
100  result = sqlite3_exec(db, add_structure, nullptr, nullptr, nullptr);
101  if (result!=SQLITE_OK) {
102  sqlite3_close(db);
103  yCError(SUBSCRIBERONSQL, "Failed to set up subscriptions table");
104  std::exit(1);
105  }
106  }
107 
108  const char *create_topic_table = "CREATE TABLE IF NOT EXISTS topics (\n\
109  id INTEGER PRIMARY KEY,\n\
110  topic TEXT,\n\
111  structure TEXT);";
112 
113  result = sqlite3_exec(db, create_topic_table, nullptr, nullptr, nullptr);
114  if (result!=SQLITE_OK) {
115  sqlite3_close(db);
116  yCError(SUBSCRIBERONSQL, "Failed to set up topics table");
117  std::exit(1);
118  }
119 
120  const char *check_topic_size = "PRAGMA table_info(topics)";
121 
122  statement = nullptr;
123  result = sqlite3_prepare_v2(db, check_topic_size, -1, &statement, nullptr);
124  if (result!=SQLITE_OK) {
125  yCError(SUBSCRIBERONSQL, "Failed to set up topics table");
126  std::exit(1);
127  }
128 
129  count = 0;
130  while (sqlite3_step(statement) == SQLITE_ROW) {
131  //sqlite3_column_text(statement,1);
132  count++;
133  }
134  sqlite3_finalize(statement);
135 
136  if (count==2) {
137  const char *add_structure = "ALTER TABLE topics ADD COLUMN structure";
138  result = sqlite3_exec(db, add_structure, nullptr, nullptr, nullptr);
139  if (result!=SQLITE_OK) {
140  sqlite3_close(db);
141  yCError(SUBSCRIBERONSQL, "Failed to set up topics table");
142  std::exit(1);
143  }
144  }
145 
146  const char *create_live_table = "CREATE TABLE IF NOT EXISTS live (\n\
147  id INTEGER PRIMARY KEY,\n\
148  name TEXT UNIQUE,\n\
149  stamp DATETIME);";
150 
151  result = sqlite3_exec(db, create_live_table, nullptr, nullptr, nullptr);
152  if (result!=SQLITE_OK) {
153  sqlite3_close(db);
154  yCError(SUBSCRIBERONSQL, "Failed to set up live table");
155  std::exit(1);
156  }
157 
158  const char *create_struct_table = "CREATE TABLE IF NOT EXISTS structures (\n\
159  name TEXT PRIMARY KEY,\n\
160  yarp TEXT);";
161 
162  result = sqlite3_exec(db, create_struct_table, nullptr, nullptr, nullptr);
163  if (result!=SQLITE_OK) {
164  sqlite3_close(db);
165  yCError(SUBSCRIBERONSQL, "Failed to set up structures table");
166  std::exit(1);
167  }
168 
169  implementation = db;
170  return true;
171 }
172 
173 
174 bool SubscriberOnSql::close() {
175  if (implementation != nullptr) {
176  auto* db = (sqlite3 *)implementation;
177  sqlite3_close(db);
178  implementation = nullptr;
179  }
180  return true;
181 }
182 
183 bool SubscriberOnSql::addSubscription(const std::string& src,
184  const std::string& dest,
185  const std::string& mode) {
186  removeSubscription(src,dest);
187  ParseName psrc, pdest;
188  psrc.apply(src);
189  pdest.apply(dest);
190  if (psrc.getCarrier()=="topic") {
191  setTopic(psrc.getPortName(),"",true);
192  }
193  if (pdest.getCarrier()=="topic") {
194  setTopic(pdest.getPortName(),"",true);
195  }
196  char *msg = nullptr;
197  const char *zmode = mode.c_str();
198  if (mode == "") zmode = nullptr;
199  char *query = sqlite3_mprintf("INSERT INTO subscriptions (src,dest,srcFull,destFull,mode) VALUES(%Q,%Q,%Q,%Q,%Q)",
200  psrc.getPortName().c_str(),
201  pdest.getPortName().c_str(),
202  src.c_str(),
203  dest.c_str(),
204  zmode);
205  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
206 
207  bool ok = true;
208  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
209  if (result!=SQLITE_OK) {
210  ok = false;
211  if (msg != nullptr) {
212  yCError(SUBSCRIBERONSQL, "%s", msg);
213  sqlite3_free(msg);
214  }
215  }
216  sqlite3_free(query);
217  if (ok) {
218  if (psrc.getCarrier()!="topic") {
219  if (pdest.getCarrier()!="topic") {
220  checkSubscription(psrc.getPortName(),
221  pdest.getPortName(),
222  src,
223  dest,
224  mode);
225  } else {
226  hookup(psrc.getPortName());
227  }
228  } else {
229  if (pdest.getCarrier()!="topic") {
230  hookup(pdest.getPortName());
231  }
232  }
233  }
234  return ok;
235 }
236 
237 bool SubscriberOnSql::removeSubscription(const std::string& src,
238  const std::string& dest) {
239  ParseName psrc, pdest;
240  psrc.apply(src);
241  pdest.apply(dest);
242  char *query = sqlite3_mprintf("DELETE FROM subscriptions WHERE src = %Q AND dest = %Q",
243  psrc.getPortName().c_str(),
244  pdest.getPortName().c_str());
245  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
246 
247  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, nullptr);
248  bool ok = true;
249  if (result!=SQLITE_OK) {
250  yCError(SUBSCRIBERONSQL, "Error in query");
251  ok = false;
252  }
253  sqlite3_free(query);
254 
255  return ok;
256 }
257 
258 
259 bool SubscriberOnSql::welcome(const std::string& port, int activity) {
260  mutex.lock();
261 
262  NameSpace *ns = getDelegate();
263  if (ns) {
264  NestedContact nc(port);
265  if (nc.getNestedName().size()>0) {
266  NameStore *store = getStore();
267  if (store != nullptr) {
268  Contact node = store->query(nc.getNodeName());
269  Contact me = store->query(port);
270  if (node.isValid() && me.isValid()) {
271  if (activity>0) {
272  ns->registerAdvanced(me,store);
273  } else {
274  ns->unregisterAdvanced(port,store);
275  }
276  }
277  }
278  }
279  }
280 
281  char *msg = nullptr;
282  char *query;
283  if (activity>0) {
284  query = sqlite3_mprintf("INSERT OR IGNORE INTO live (name,stamp) VALUES(%Q,DATETIME('now'))",
285  port.c_str());
286  } else {
287  // Port not responding. Mark as non-live.
288  if (activity==0) {
289  query = sqlite3_mprintf("DELETE FROM live WHERE name=%Q AND stamp < DATETIME('now','-30 seconds')",
290  port.c_str());
291  } else {
292  // activity = -1 -- definite dodo
293  query = sqlite3_mprintf("DELETE FROM live WHERE name=%Q",
294  port.c_str());
295  }
296  }
297  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
298 
299  bool ok = true;
300  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
301  if (result!=SQLITE_OK) {
302  ok = false;
303  if (msg != nullptr) {
304  yCError(SUBSCRIBERONSQL, "%s", msg);
305  sqlite3_free(msg);
306  }
307  }
308  sqlite3_free(query);
309  mutex.unlock();
310 
311  if (activity>0) {
312  hookup(port);
313  } else if (activity<0) {
314  breakdown(port);
315  }
316  return ok;
317 }
318 
319 bool SubscriberOnSql::hookup(const std::string& port) {
320  if (getDelegate()) {
321  NestedContact nc(port);
322  if (nc.getNestedName().size()>0) {
323  return false;
324  }
325  }
326  mutex.lock();
327  sqlite3_stmt *statement = nullptr;
328  char *query = nullptr;
329  //query = sqlite3_mprintf("SELECT * FROM subscriptions WHERE src = %Q OR dest= %Q",port, port);
330  query = sqlite3_mprintf("SELECT src,dest,srcFull,destFull FROM subscriptions WHERE (src = %Q OR dest= %Q) AND EXISTS (SELECT NULL FROM live WHERE name=src) AND EXISTS (SELECT NULL FROM live WHERE name=dest) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic) AND (s1.src = %Q OR s2.dest = %Q) AND EXISTS (SELECT NULL FROM live WHERE name=s1.src) AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)",port.c_str(), port.c_str(), port.c_str(), port.c_str());
331  //
332  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
333 
334  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
335  if (result!=SQLITE_OK) {
336  const char *msg = sqlite3_errmsg(SQLDB(implementation));
337  if (msg != nullptr) {
338  yCError(SUBSCRIBERONSQL, "%s", msg);
339  }
340  }
341  while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
342  char *src = (char *)sqlite3_column_text(statement,0);
343  char *dest = (char *)sqlite3_column_text(statement,1);
344  char *srcFull = (char *)sqlite3_column_text(statement,2);
345  char *destFull = (char *)sqlite3_column_text(statement,3);
346  char *mode = (char *)sqlite3_column_text(statement,4);
347  checkSubscription(src,dest,srcFull,destFull,mode?mode:"");
348  }
349  sqlite3_finalize(statement);
350  sqlite3_free(query);
351  mutex.unlock();
352 
353  return false;
354 }
355 
356 
357 bool SubscriberOnSql::breakdown(const std::string& port) {
358  if (getDelegate()) {
359  NestedContact nc(port);
360  if (nc.getNestedName().size()>0) {
361  return false;
362  }
363  }
364  mutex.lock();
365  sqlite3_stmt *statement = nullptr;
366  char *query = nullptr;
367  // query = sqlite3_mprintf("SELECT src,dest,srcFull,destFull,mode FROM subscriptions WHERE ((src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=dest)) OR (dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=src))) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull, NULL FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic AND ((s1.src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)) OR (s2.dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s1.src))))",port, port, port, port);
368  query = sqlite3_mprintf("SELECT src,dest,srcFull,destFull,mode FROM subscriptions WHERE ((src = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=dest))) OR (dest = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=src)))) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull, NULL FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic AND ((s1.src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)) OR (s2.dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s1.src))))",port.c_str(), port.c_str(), port.c_str(), port.c_str());
369  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
370 
371  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
372  if (result!=SQLITE_OK) {
373  const char *msg = sqlite3_errmsg(SQLDB(implementation));
374  if (msg != nullptr) {
375  yCError(SUBSCRIBERONSQL, "%s", msg);
376  }
377  }
378  while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
379  char *src = (char *)sqlite3_column_text(statement,0);
380  char *dest = (char *)sqlite3_column_text(statement,1);
381  char *srcFull = (char *)sqlite3_column_text(statement,2);
382  char *destFull = (char *)sqlite3_column_text(statement,3);
383  char *mode = (char *)sqlite3_column_text(statement,4);
384  breakSubscription(port,src,dest,srcFull,destFull,mode?mode:"");
385  }
386  sqlite3_finalize(statement);
387  sqlite3_free(query);
388  mutex.unlock();
389 
390  return false;
391 }
392 
393 
394 bool SubscriberOnSql::checkSubscription(const std::string& src,const std::string& dest,
395  const std::string& srcFull,
396  const std::string& destFull,
397  const std::string& mode) {
398  if (getDelegate()) {
399  NestedContact nc(src);
400  if (nc.getNestedName().size()>0) {
401  NestedContact nc(dest);
402  if (nc.getNestedName().size()>0) {
403  return false;
404  }
405  }
406  }
407  yCDebug(SUBSCRIBERONSQL,
408  "+++ Checking %s %s / %s %s",
409  src.c_str(),
410  dest.c_str(),
411  srcFull.c_str(),
412  destFull.c_str());
413 
414  NameStore *store = getStore();
415  if (store != nullptr) {
416  Contact csrc = store->query(src);
417  Contact cdest = store->query(dest);
418  if (csrc.isValid()&&cdest.isValid()) {
419  bool srcTopic = (csrc.getCarrier()=="topic");
420  bool destTopic = (cdest.getCarrier()=="topic");
421  if (!(srcTopic||destTopic)) {
422  yCDebug(SUBSCRIBERONSQL,
423  "++> check connection %s %s",
424  srcFull.c_str(),
425  destFull.c_str());
426  connect(srcFull,destFull);
427  }
428  }
429  if (mode!="") {
430  std::string mode_name = mode;
431  if (mode_name=="from") {
432  if (!csrc.isValid()) {
433  removeSubscription(src,dest);
434  }
435  } else if (mode_name=="to") {
436  if (!cdest.isValid()) {
437  removeSubscription(src,dest);
438  }
439  }
440  }
441  }
442  return false;
443 }
444 
445 
446 bool SubscriberOnSql::breakSubscription(const std::string& dropper,
447  const std::string& src, const std::string& dest,
448  const std::string& srcFull,
449  const std::string& destFull,
450  const std::string& mode) {
451  if (getDelegate()) {
452  NestedContact nc(src);
453  if (nc.getNestedName().size()>0) {
454  NestedContact nc(dest);
455  if (nc.getNestedName().size()>0) {
456  return false;
457  }
458  }
459  }
460  yCDebug(SUBSCRIBERONSQL,
461  "--- Checking %s %s / %s %s",
462  src.c_str(),
463  dest.c_str(),
464  srcFull.c_str(),
465  destFull.c_str());
466  NameStore *store = getStore();
467  if (store != nullptr) {
468  bool srcDrop = std::string(dropper) == src;
469  Contact contact;
470  if (srcDrop) {
471  contact = store->query(src);
472  } else {
473  contact = store->query(dest);
474  }
475  if (contact.isValid()) {
476  yCDebug(SUBSCRIBERONSQL,
477  "--> check connection %s %s",
478  srcFull.c_str(),
479  destFull.c_str());
480  disconnect(srcFull,destFull,srcDrop);
481  }
482  if (mode!="") {
483  std::string mode_name = mode;
484  if (mode_name=="from") {
485  if (srcDrop) {
486  removeSubscription(src,dest);
487  }
488  } else if (mode_name=="to") {
489  if (!srcDrop) {
490  removeSubscription(src,dest);
491  }
492  }
493  }
494  }
495  return false;
496 }
497 
498 
499 
500 bool SubscriberOnSql::listSubscriptions(const std::string& port,
501  yarp::os::Bottle& reply) {
502  mutex.lock();
503  sqlite3_stmt *statement = nullptr;
504  char *query = nullptr;
505  if (std::string(port)!="") {
506  query = sqlite3_mprintf("SELECT s.srcFull, s.DestFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s WHERE s.src = %Q OR s.dest= %Q ORDER BY s.src, s.dest",port.c_str(),port.c_str());
507  } else {
508  query = sqlite3_mprintf("SELECT s.srcFull, s.destFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s ORDER BY s.src, s.dest");
509  }
510  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
511 
512  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
513  if (result!=SQLITE_OK) {
514  const char *msg = sqlite3_errmsg(SQLDB(implementation));
515  if (msg != nullptr) {
516  yCError(SUBSCRIBERONSQL, "%s", msg);
517  }
518  }
519  reply.addString("subscriptions");
520  while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
521  char *src = (char *)sqlite3_column_text(statement,0);
522  char *dest = (char *)sqlite3_column_text(statement,1);
523  int srcTopic = sqlite3_column_int(statement,2);
524  int destTopic = sqlite3_column_int(statement,3);
525  char *mode = (char *)sqlite3_column_text(statement,4);
526  Bottle& b = reply.addList();
527  b.addString("subscription");
528  Bottle bsrc;
529  bsrc.addString("src");
530  bsrc.addString(src);
531  Bottle bdest;
532  bdest.addString("dest");
533  bdest.addString(dest);
534  b.addList() = bsrc;
535  b.addList() = bdest;
536  if (mode != nullptr) {
537  if (mode[0]!='\0') {
538  Bottle bmode;
539  bmode.addString("mode");
540  bmode.addString(mode);
541  b.addList() = bmode;
542  }
543  }
544  if (srcTopic||destTopic) {
545  Bottle btopic;
546  btopic.addString("topic");
547  btopic.addInt32(srcTopic);
548  btopic.addInt32(destTopic);
549  b.addList() = btopic;
550  }
551  }
552  sqlite3_finalize(statement);
553  sqlite3_free(query);
554  mutex.unlock();
555 
556  return true;
557 }
558 
559 
560 bool SubscriberOnSql::setTopic(const std::string& port, const std::string& structure,
561  bool active) {
562  if (structure!="" || !active) {
563  mutex.lock();
564  char *query = sqlite3_mprintf("DELETE FROM topics WHERE topic = %Q",
565  port.c_str());
566  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
567 
568  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, nullptr);
569  bool ok = true;
570  if (result!=SQLITE_OK) {
571  yCError(SUBSCRIBERONSQL, "Error in query");
572  ok = false;
573  }
574  sqlite3_free(query);
575  mutex.unlock();
576  if (!ok) return false;
577  if (!active) return true;
578  }
579 
580  bool have_topic = false;
581  if (structure=="") {
582  mutex.lock();
583  sqlite3_stmt *statement = nullptr;
584  char *query = nullptr;
585  query = sqlite3_mprintf("SELECT topic FROM topics WHERE topic = %Q",
586  port.c_str());
587  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
588 
589  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
590  if (result!=SQLITE_OK) {
591  yCError(SUBSCRIBERONSQL, "Error in query");
592  }
593  if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
594  have_topic = true;
595  }
596  sqlite3_finalize(statement);
597  sqlite3_free(query);
598  mutex.unlock();
599  }
600 
601  if (structure!="" || !have_topic) {
602  mutex.lock();
603  char *msg = nullptr;
604  const char *pstructure = structure.c_str();
605  if (structure=="") pstructure = nullptr;
606  char *query = sqlite3_mprintf("INSERT INTO topics (topic,structure) VALUES(%Q,%Q)",
607  port.c_str(),pstructure);
608  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
609 
610  bool ok = true;
611  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
612  if (result!=SQLITE_OK) {
613  ok = false;
614  if (msg != nullptr) {
615  yCError(SUBSCRIBERONSQL, "%s", msg);
616  sqlite3_free(msg);
617  }
618  }
619  sqlite3_free(query);
620  mutex.unlock();
621  if (!ok) return false;
622  }
623 
624  vector<vector<std::string> > subs;
625 
626  // go ahead and connect anything needed
627  mutex.lock();
628  sqlite3_stmt *statement = nullptr;
629  char *query = sqlite3_mprintf("SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (t.topic = %Q AND s1.dest = t.topic AND s2.src = t.topic)", port.c_str());
630  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
631 
632  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
633  if (result!=SQLITE_OK) {
634  const char *msg = sqlite3_errmsg(SQLDB(implementation));
635  if (msg != nullptr) {
636  yCError(SUBSCRIBERONSQL, "%s", msg);
637  }
638  }
639  while (result == SQLITE_OK &&
640  sqlite3_step(statement) == SQLITE_ROW) {
641  char *src = (char *)sqlite3_column_text(statement,0);
642  char *dest = (char *)sqlite3_column_text(statement,1);
643  char *srcFull = (char *)sqlite3_column_text(statement,2);
644  char *destFull = (char *)sqlite3_column_text(statement,3);
645  char *mode = (char *)sqlite3_column_text(statement,4);
646  vector<std::string> sub;
647  sub.emplace_back(src);
648  sub.emplace_back(dest);
649  sub.emplace_back(srcFull);
650  sub.emplace_back(destFull);
651  sub.emplace_back(mode?mode:"");
652  subs.push_back(sub);
653  }
654  sqlite3_finalize(statement);
655  sqlite3_free(query);
656  mutex.unlock();
657 
658  for (auto& sub : subs) {
659  checkSubscription(sub[0],sub[1],sub[2],sub[3],sub[4]);
660  }
661 
662  return true;
663 }
664 
665 
666 bool SubscriberOnSql::listTopics(yarp::os::Bottle& topics) {
667  mutex.lock();
668  sqlite3_stmt *statement = nullptr;
669  char *query = nullptr;
670  query = sqlite3_mprintf("SELECT topic FROM topics");
671  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
672 
673  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
674  if (result!=SQLITE_OK) {
675  yCError(SUBSCRIBERONSQL, "Error in query");
676  }
677  while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
678  char *topic = (char *)sqlite3_column_text(statement,0);
679  topics.addString(topic);
680  }
681  sqlite3_finalize(statement);
682  sqlite3_free(query);
683  mutex.unlock();
684 
685  return true;
686 }
687 
688 
689 bool SubscriberOnSql::setType(const std::string& family,
690  const std::string& structure,
691  const std::string& value) {
692  mutex.lock();
693  char *msg = nullptr;
694  char *query = sqlite3_mprintf("INSERT OR REPLACE INTO structures (name,%Q) VALUES(%Q,%Q)",
695  family.c_str(),
696  (structure=="") ? nullptr : structure.c_str(),
697  value.c_str());
698  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
699 
700  bool ok = true;
701  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
702  if (result!=SQLITE_OK) {
703  ok = false;
704  if (msg != nullptr) {
705  yCError(SUBSCRIBERONSQL, "%s", msg);
706  sqlite3_free(msg);
707  }
708  }
709  sqlite3_free(query);
710  mutex.unlock();
711  return ok;
712 }
713 
714 std::string SubscriberOnSql::getType(const std::string& family,
715  const std::string& structure) {
716  mutex.lock();
717  sqlite3_stmt *statement = nullptr;
718  char *query = nullptr;
719  query = sqlite3_mprintf("SELECT %s FROM structures WHERE name = %Q",
720  family.c_str(), structure.c_str());
721  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
722 
723  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
724  std::string sresult;
725  if (result!=SQLITE_OK) {
726  yCError(SUBSCRIBERONSQL, "Error in query");
727  }
728  if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
729  sresult = (const char *)sqlite3_column_text(statement,0);
730  }
731  sqlite3_finalize(statement);
732  sqlite3_free(query);
733  mutex.unlock();
734 
735  return sresult;
736 }
yarp::os::Bottle
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
RosNameSpace.h
YARP_SERVERSQL_LOG_COMPONENT
#define YARP_SERVERSQL_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
yarp::os::NestedContact
A placeholder for rich contact information.
Definition: NestedContact.h:27
ParseName.h
yarp::os::NameStore
Abstract interface for a database of port names.
Definition: NameStore.h:23
yCWarning
#define yCWarning(component,...)
Definition: LogComponent.h:146
yarp::serversql::impl::ParseName::getPortName
std::string getPortName()
Definition: ParseName.h:29
yarp::serversql::impl::ParseName
Definition: ParseName.h:21
yarp::os::NestedContact::getNodeName
std::string getNodeName() const
Definition: NestedContact.cpp:184
F_OK
#define F_OK
Definition: SubscriberOnSql.cpp:31
yarp::os::NameSpace::unregisterAdvanced
virtual Contact unregisterAdvanced(const std::string &name, NameStore *store)
Remove contact information, with access to the contact information of other ports for cross-referenci...
Definition: NameSpace.h:107
LogComponent.h
yarp::os::Bottle::addList
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition: Bottle.cpp:185
yarp::os::Contact::getCarrier
std::string getCarrier() const
Get the carrier associated with this Contact for socket communication.
Definition: Contact.cpp:253
yarp::os::Bottle::addInt32
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition: Bottle.cpp:143
yarp::os::NameStore::query
virtual Contact query(const std::string &name)=0
yarp::os::Bottle::addString
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:173
SubscriberOnSql.h
yarp::os::NameSpace
An abstract name space for ports.
Definition: NameSpace.h:26
yarp::serversql::impl::ParseName::getCarrier
std::string getCarrier()
Definition: ParseName.h:34
yarp::os::NameSpace::registerAdvanced
virtual Contact registerAdvanced(const Contact &contact, NameStore *store)
Record contact information, with access to the contact information of other ports for cross-referenci...
Definition: NameSpace.h:96
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
SQLDB
#define SQLDB(x)
Definition: SubscriberOnSql.cpp:34
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::Contact::isValid
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:301
yarp::serversql::impl::ParseName::apply
void apply(const std::string &str)
Definition: ParseName.cpp:17
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
implementation
RandScalar * implementation(void *t)
Definition: RandnScalar.cpp:20
yarp::serversql::impl
Definition: Allocator.h:18
yarp::os::NestedContact::getNestedName
std::string getNestedName() const
Definition: NestedContact.cpp:189