39 # include <ace/INET_Addr.h>
40 # include <ace/Sched_Params.h>
47 #if defined(__linux__) // used for configuring scheduling properties
49 # include <sys/types.h>
73 yCDebug(PORTCORE,
"Starting listening on %s", address.
toURI().c_str());
77 yCError(PORTCORE,
"YARP not initialized; create a yarp::os::Network object before using ports");
85 std::lock_guard<std::mutex> lock(m_stateMutex);
93 yCAssert(PORTCORE, !m_closing.load());
94 yCAssert(PORTCORE, !m_finished.load());
95 yCAssert(PORTCORE, m_face ==
nullptr);
103 m_address.setTimeout(m_timeout);
108 if (m_face ==
nullptr) {
113 if (m_address.getPort() <= 0) {
114 m_address = m_face->getLocalAddress();
115 if (m_address.getRegName() ==
"...") {
116 m_address.setName(std::string(
"/") + m_address.getHost() +
"_" +
NetType::toString(m_address.getPort()));
117 setName(m_address.getRegName());
122 m_listening.store(
true);
126 if (shouldAnnounce) {
146 yCAssert(PORTCORE, !m_running.load());
147 yCAssert(PORTCORE, m_reader ==
nullptr);
154 yCAssert(PORTCORE, !m_running.load());
155 yCAssert(PORTCORE, m_adminReader ==
nullptr);
156 m_adminReader = &reader;
162 yCAssert(PORTCORE, !m_running.load());
163 yCAssert(PORTCORE, m_readableCreator ==
nullptr);
164 m_readableCreator = &creator;
181 yCAssert(PORTCORE, m_listening.load());
182 yCAssert(PORTCORE, !m_running.load());
183 yCAssert(PORTCORE, !m_closing.load());
184 yCAssert(PORTCORE, !m_finished.load());
185 yCAssert(PORTCORE, m_starting.load());
190 std::lock_guard<std::mutex> lock(m_stateMutex);
191 m_running.store(
true);
192 m_starting.store(
false);
196 m_stateCv.notify_one();
198 yCTrace(PORTCORE,
"run running");
204 bool shouldStop =
false;
205 while (!shouldStop) {
212 std::lock_guard<std::mutex> lock(m_stateMutex);
217 yCDebug(PORTCORE,
"received something");
224 shouldStop |= m_closing.load();
237 yCDebug(PORTCORE,
"spun off a connection");
252 std::lock_guard<std::mutex> lock(m_stateMutex);
253 m_connectionListeners = 0;
254 m_connectionChangeCv.notify_all();
257 yCTrace(PORTCORE,
"run closing");
260 std::lock_guard<std::mutex> lock(m_stateMutex);
261 m_connectionListeners = 0;
262 m_connectionChangeCv.notify_all();
263 m_finished.store(
true);
271 if (m_prop !=
nullptr) {
275 m_modifier.releaseOutModifier();
276 m_modifier.releaseInModifier();
285 std::unique_lock<std::mutex> lock(m_stateMutex);
288 yCAssert(PORTCORE, m_listening.load());
289 yCAssert(PORTCORE, !m_running.load());
290 yCAssert(PORTCORE, !m_starting.load());
291 yCAssert(PORTCORE, !m_finished.load());
292 yCAssert(PORTCORE, !m_closing.load());
294 m_starting.store(
true);
304 m_stateCv.wait(lock, [&]{
return m_running.load(); });
305 yCAssert(PORTCORE, m_running.load());
313 yCTrace(PORTCORE,
"manualStart");
321 m_interruptable =
false;
331 m_interrupted =
false;
336 yCTrace(PORTCORE,
"interrupt");
339 if (!m_listening.load()) {
344 m_interrupted =
true;
349 if (!m_interruptable) {
359 std::lock_guard<std::mutex> lock(m_stateMutex);
360 if (m_reader !=
nullptr) {
361 yCDebug(PORTCORE,
"sending update-state message to listener");
371 void PortCore::closeMain()
373 yCTrace(PORTCORE,
"closeMain");
377 std::lock_guard<std::mutex> lock(m_stateMutex);
380 if (m_finishing || !(m_running.load() || m_manual)) {
381 yCTrace(PORTCORE,
"closeMain - nothing to do");
385 yCTrace(PORTCORE,
"closeMain - Central");
389 yCDebug(PORTCORE,
"now preparing to shut down port");
399 std::string prevName;
402 std::string removeName;
405 std::lock_guard<std::mutex> lock(m_stateMutex);
406 for (
auto* unit : m_units) {
407 if ((unit !=
nullptr) && unit->isInput() && !unit->isDoomed()) {
408 Route r = unit->getRoute();
410 if (s.length() >= 1 && s[0] ==
'/' && s != getName() && s != prevName) {
419 yCDebug(PORTCORE,
"requesting removal of connection from %s", removeName.c_str());
428 prevName = removeName;
440 std::lock_guard<std::mutex> lock(m_stateMutex);
441 for (
auto* unit : m_units) {
442 if ((unit !=
nullptr) && unit->isOutput() && !unit->isFinished()) {
443 removeRoute = unit->getRoute();
452 removeUnit(removeRoute,
true);
456 bool stopRunning = m_running.load();
461 m_closing.store(
true);
475 yCAssert(PORTCORE, m_finished.load());
484 std::lock_guard<std::mutex> lock(m_stateMutex);
485 m_finished.store(
false);
486 m_closing.store(
false);
487 m_running.store(
false);
488 yCInfo(PORTCORE,
"Resetting. m_running = %s", m_running.load() ?
"true" :
"false");
494 if (m_listening.load()) {
495 yCAssert(PORTCORE, m_face !=
nullptr);
499 m_listening.store(
false);
505 if (m_reader !=
nullptr) {
506 yCDebug(PORTCORE,
"sending end-of-port message to listener");
514 std::string name = getName();
515 if (name != std::string(
"")) {
516 if (m_controlRegistration) {
526 yCAssert(PORTCORE, !m_listening.load());
527 yCAssert(PORTCORE, !m_running.load());
528 yCAssert(PORTCORE, !m_starting.load());
529 yCAssert(PORTCORE, !m_closing.load());
530 yCAssert(PORTCORE, !m_finished.load());
532 yCAssert(PORTCORE, m_face ==
nullptr);
539 std::lock_guard<std::mutex> lock(m_stateMutex);
545 void PortCore::closeUnits()
549 yCAssert(PORTCORE, m_finished.load());
553 for (
auto& i : m_units) {
555 if (unit !=
nullptr) {
556 yCDebug(PORTCORE,
"closing a unit");
558 yCDebug(PORTCORE,
"joining a unit");
561 yCDebug(PORTCORE,
"deleting a unit");
570 void PortCore::reapUnits()
574 if (!m_finished.load()) {
575 std::lock_guard<std::mutex> lock(m_stateMutex);
576 for (
auto* unit : m_units) {
579 yCDebug(PORTCORE,
"Informing connection %s that it is doomed", s.c_str());
581 yCDebug(PORTCORE,
"Closed connection %s", s.c_str());
583 yCDebug(PORTCORE,
"Joined thread of connection %s", s.c_str());
590 void PortCore::cleanUnits(
bool blocking)
597 std::unique_lock<std::mutex> lock(m_stateMutex, std::defer_lock);
601 bool have_lock = lock.try_lock();
609 int updatedInputCount = 0;
610 int updatedOutputCount = 0;
611 int updatedDataOutputCount = 0;
612 yCDebug(PORTCORE,
"/ routine check of connections to this port begins");
613 if (!m_finished.load()) {
616 for (
auto& i : m_units) {
618 if (unit !=
nullptr) {
622 yCDebug(PORTCORE,
"| removing connection %s", con.c_str());
627 yCDebug(PORTCORE,
"| removed connection %s", con.c_str());
632 updatedOutputCount++;
634 updatedDataOutputCount++;
651 for (
size_t i2 = 0; i2 < m_units.size(); i2++) {
652 if (m_units[i2] !=
nullptr) {
654 m_units[rem] = m_units[i2];
655 m_units[i2] =
nullptr;
662 for (
size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
668 m_dataOutputCount = updatedDataOutputCount;
671 m_packetMutex.lock();
672 m_inputCount = updatedInputCount;
673 m_outputCount = updatedOutputCount;
674 m_packetMutex.unlock();
675 yCDebug(PORTCORE,
"\\ routine check of connections to this port ends");
689 std::lock_guard<std::mutex> lock(m_stateMutex);
694 yCAssert(PORTCORE, unit !=
nullptr);
696 m_units.push_back(unit);
697 yCTrace(PORTCORE,
"there are now %zu units", m_units.size());
703 yCTrace(PORTCORE,
"addOutput");
713 if (!m_finished.load()) {
714 std::lock_guard<std::mutex> lock(m_stateMutex);
716 yCAssert(PORTCORE, unit !=
nullptr);
718 m_units.push_back(unit);
723 bool PortCore::isUnit(
const Route& route,
int index)
726 bool needReap =
false;
727 if (!m_finished.load()) {
728 for (
auto* unit : m_units) {
729 if (unit !=
nullptr) {
731 std::string wild =
"*";
734 ok = ok && (unit->
getIndex() == index);
758 bool PortCore::removeUnit(
const Route& route,
bool synch,
bool* except)
763 if (except !=
nullptr) {
764 yCDebug(PORTCORE,
"asked to remove connection in the way of %s", route.
toString().c_str());
767 yCDebug(PORTCORE,
"asked to remove connection %s", route.
toString().c_str());
772 std::vector<int> removals;
774 bool needReap =
false;
775 if (!m_finished.load()) {
776 std::lock_guard<std::mutex> lock(m_stateMutex);
777 for (
auto* unit : m_units) {
778 if (unit !=
nullptr) {
780 std::string wild =
"*";
789 if (except ==
nullptr) {
801 removals.push_back(unit->
getIndex());
814 yCDebug(PORTCORE,
"one or more connections need prodding to die");
826 yCDebug(PORTCORE,
"sent message to prod connection death");
830 yCDebug(PORTCORE,
"synchronizing with connection death");
833 std::unique_lock<std::mutex> lock(m_stateMutex);
834 while (std::any_of(removals.begin(), removals.end(), [&](
int removal){ return isUnit(route, removal); })) {
835 m_connectionListeners++;
836 m_connectionChangeCv.wait(lock, [&]{
return m_connectionListeners == 0; });
852 yCDebug(PORTCORE,
"asked to add output to %s", dest.c_str());
865 bw.
appendLine(std::string(
"Do not know how to connect to ") + dest);
882 yCDebug(PORTCORE,
"output already present to %s", dest.c_str());
883 bw.
appendLine(std::string(
"Desired connection already present from ") + getName() +
" to " + dest);
897 aname = address.
toURI(
false);
909 unsigned int f = getFlags();
914 bool is_log = (!mode.empty());
917 err =
"Logger configured as log." + mode +
", but only log.in is supported";
920 append =
"; " + r.
getFromName() +
" will forward messages and replies (if any) to " + r.
getToName();
931 err =
"Outputs not allowed";
936 if (m_dataOutputCount >= 1 && !is_log) {
937 err =
"RPC output already connected";
963 bool ok = op->
open(r);
965 yCDebug(PORTCORE,
"open route error");
973 bw.
appendLine(std::string(
"Cannot connect to ") + dest);
994 if (!m_finished.load()) {
995 std::lock_guard<std::mutex> lock(m_stateMutex);
1000 yCAssert(PORTCORE, unit !=
nullptr);
1002 m_units.push_back(unit);
1007 bw.
appendLine(std::string(
"Added connection from ") + getName() +
" to " + dest + append);
1008 if (os !=
nullptr) {
1019 yCDebug(PORTCORE,
"asked to remove output to %s", dest.c_str());
1023 if (removeUnit(
Route(
"*", dest,
"*"),
true)) {
1024 bw.
appendLine(std::string(
"Removed connection from ") + getName() +
" to " + dest);
1026 bw.
appendLine(std::string(
"Could not find an outgoing connection to ") + dest);
1028 if (os !=
nullptr) {
1037 yCDebug(PORTCORE,
"asked to remove input to %s", src.c_str());
1041 if (removeUnit(
Route(src,
"*",
"*"),
true)) {
1042 bw.
appendLine(std::string(
"Removing connection from ") + src +
" to " + getName());
1044 bw.
appendLine(std::string(
"Could not find an incoming connection from ") + src);
1046 if (os !=
nullptr) {
1063 std::lock_guard<std::mutex> lock(m_stateMutex);
1066 bw.
appendLine(std::string(
"This is ") + m_address.getRegName() +
" at " + m_address.toURI());
1070 for (
auto* unit : m_units) {
1079 bw.
appendLine(
"There are no outgoing connections");
1084 for (
auto* unit : m_units) {
1095 bw.
appendLine(
"There are no incoming connections");
1100 if (os !=
nullptr) {
1105 printf(
"%s\n", sos.
toString().c_str());
1114 std::lock_guard<std::mutex> lock(m_stateMutex);
1119 std::string portName = m_address.getRegName();
1120 baseInfo.
message = std::string(
"This is ") + portName +
" at " + m_address.toURI();
1121 reporter.
report(baseInfo);
1125 for (
auto* unit : m_units) {
1144 info.
message =
"There are no outgoing connections";
1150 for (
auto* unit : m_units) {
1169 info.
message =
"There are no incoming connections";
1177 std::lock_guard<std::mutex> lock(m_stateMutex);
1178 if (reporter !=
nullptr) {
1179 m_eventReporter = reporter;
1185 std::lock_guard<std::mutex> lock(m_stateMutex);
1186 m_eventReporter =
nullptr;
1197 if (m_eventReporter !=
nullptr) {
1198 m_eventReporter->report(info);
1215 if (m_reader !=
nullptr && !m_interrupted) {
1216 m_interruptable =
false;
1218 bool haveOutputs = (m_outputCount != 0);
1220 if (m_logNeeded && haveOutputs) {
1227 recorder.
init(&reader);
1229 result = m_reader->read(recorder);
1237 result = m_reader->read(reader);
1241 m_interruptable =
true;
1244 yCDebug(PORTCORE,
"data received, no reader for it");
1246 result = b.
read(reader);
1259 m_modifier.outputMutex.lock();
1260 if (m_modifier.outputModifier !=
nullptr) {
1261 if (!m_modifier.outputModifier->acceptOutgoingData(writer)) {
1262 m_modifier.outputMutex.unlock();
1265 m_modifier.outputModifier->modifyOutgoingData(writer);
1267 m_modifier.outputMutex.unlock();
1281 if (m_interrupted || m_finishing) {
1286 bool gotReply =
false;
1288 std::string envelopeString = m_envelope;
1298 yCTrace(PORTCORE,
"------- send in real");
1310 std::lock_guard<std::mutex> lock(m_stateMutex);
1313 if (m_finished.load()) {
1317 yCTrace(PORTCORE,
"------- send in");
1320 m_packetMutex.lock();
1322 yCAssert(PORTCORE, packet !=
nullptr);
1323 packet->
setContent(&writer,
false, callback);
1324 m_packetMutex.unlock();
1327 for (
auto* unit : m_units) {
1329 bool log = (!unit->
getMode().empty());
1339 yCTrace(PORTCORE,
"------- -- inc");
1340 m_packetMutex.lock();
1342 m_packetMutex.unlock();
1343 yCTrace(PORTCORE,
"------- -- pre-send");
1344 bool gotReplyOne =
false;
1346 void* out = unit->
send(writer,
1348 (callback !=
nullptr) ? callback : (&writer),
1349 reinterpret_cast<void*
>(packet),
1354 gotReply = gotReply || gotReplyOne;
1355 yCTrace(PORTCORE,
"------- -- send");
1356 if (out !=
nullptr) {
1358 m_packetMutex.lock();
1361 m_packetMutex.unlock();
1368 yCTrace(PORTCORE,
"------- -- dec");
1371 yCTrace(PORTCORE,
"------- pack check");
1372 m_packetMutex.lock();
1379 m_packets.checkPacket(packet);
1380 m_packetMutex.unlock();
1381 yCTrace(PORTCORE,
"------- packed");
1382 yCTrace(PORTCORE,
"------- send out");
1384 if (logCount == 0) {
1385 m_logNeeded =
false;
1389 yCTrace(PORTCORE,
"------- send out real");
1391 if (m_waitAfterSend && reader !=
nullptr) {
1392 all_ok = all_ok && gotReply;
1401 bool writing =
false;
1405 if (!m_finished.load()) {
1406 std::lock_guard<std::mutex> lock(m_stateMutex);
1407 for (
auto* unit : m_units) {
1421 m_packetMutex.lock();
1422 int result = m_inputCount;
1423 m_packetMutex.unlock();
1430 m_packetMutex.lock();
1431 int result = m_outputCount;
1432 m_packetMutex.unlock();
1439 yCTrace(PORTCORE,
"starting notifyCompletion");
1440 m_packetMutex.lock();
1441 if (tracker !=
nullptr) {
1445 m_packetMutex.unlock();
1446 yCTrace(PORTCORE,
"stopping notifyCompletion");
1452 m_envelopeWriter.restart();
1453 bool ok = envelope.
write(m_envelopeWriter);
1455 setEnvelope(m_envelopeWriter.toString());
1463 m_envelope = envelope;
1464 for (
size_t i = 0; i < m_envelope.length(); i++) {
1467 if (m_envelope[i] < 32) {
1468 m_envelope = m_envelope.substr(0, i);
1472 yCDebug(PORTCORE,
"set envelope to %s", m_envelope.c_str());
1483 sis.
add(m_envelope);
1487 sbr.
reset(sis,
nullptr, route, 0,
true);
1488 return envelope.
read(sbr);
1496 const char* carrier,
1502 style.
quiet = !verbose;
1519 result = addr.set(c.
getPort(),
"127.0.0.1");
1523 result = addr.set(c.
getPort(),
"127.0.1.1");
1572 if (cmd ==
"publisherUpdate") {
1573 return PortCoreCommand::RosPublisherUpdate;
1575 if (cmd ==
"requestTopic") {
1576 return PortCoreCommand::RosRequestTopic;
1578 if (cmd ==
"getPid") {
1579 return PortCoreCommand::RosGetPid;
1581 if (cmd ==
"getBusInfo") {
1582 return PortCoreCommand::RosGetBusInfo;
1586 auto cmd =
static_cast<PortCoreCommand
>(v.
asVocab());
1588 case PortCoreCommand::Help:
1589 case PortCoreCommand::Ver:
1590 case PortCoreCommand::Pray:
1591 case PortCoreCommand::Add:
1592 case PortCoreCommand::Del:
1593 case PortCoreCommand::Atch:
1594 case PortCoreCommand::Dtch:
1595 case PortCoreCommand::List:
1596 case PortCoreCommand::Set:
1597 case PortCoreCommand::Get:
1598 case PortCoreCommand::Prop:
1599 case PortCoreCommand::RosPublisherUpdate:
1600 case PortCoreCommand::RosRequestTopic:
1601 case PortCoreCommand::RosGetPid:
1602 case PortCoreCommand::RosGetBusInfo:
1605 return PortCoreCommand::Unknown;
1609 PortCoreConnectionDirection parseConnectionDirection(
yarp::conf::vocab32_t v,
bool errorIsOut =
false)
1611 auto dir =
static_cast<PortCoreConnectionDirection
>(v);
1613 case PortCoreConnectionDirection::In:
1614 case PortCoreConnectionDirection::Out:
1617 return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1623 auto action =
static_cast<PortCorePropertyAction
>(v);
1625 case PortCorePropertyAction::Get:
1626 case PortCorePropertyAction::Set:
1629 return PortCorePropertyAction::Error;
1633 void describeRoute(
const Route& route,
Bottle& result)
1650 bconnectionless.
addString(
"connectionless");
1653 if (!carrier->
isPush()) {
1675 yCDebug(PORTCORE,
"Port %s received command %s", getName().c_str(), cmd.
toString().c_str());
1677 auto handleAdminHelpCmd = []() {
1681 result.
addString(
"[help] # give this help");
1682 result.
addString(
"[ver] # report protocol version information");
1683 result.
addString(
"[add] $portname # add an output connection");
1684 result.
addString(
"[add] $portname $car # add an output with a given protocol");
1685 result.
addString(
"[del] $portname # remove an input or output connection");
1686 result.
addString(
"[list] [in] # list input connections");
1687 result.
addString(
"[list] [out] # list output connections");
1688 result.
addString(
"[list] [in] $portname # give details for input");
1689 result.
addString(
"[list] [out] $portname # give details for output");
1690 result.
addString(
"[prop] [get] # get all user-defined port properties");
1691 result.
addString(
"[prop] [get] $prop # get a user-defined port property (prop, val)");
1692 result.
addString(
"[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1693 result.
addString(
"[prop] [get] $portname # get Qos properties of a connection to/from a port");
1694 result.
addString(
"[prop] [set] $portname # set Qos properties of a connection to/from a port");
1695 result.
addString(
"[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1696 result.
addString(
"[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1697 result.
addString(
"[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1698 result.
addString(
"[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1699 result.
addString(
"[dtch] [out] # detach portmonitor plug-in from the port's output");
1700 result.
addString(
"[dtch] [in] # detach portmonitor plug-in from the port's input");
1706 auto handleAdminVerCmd = []() {
1717 auto handleAdminPrayCmd = [
this]() {
1729 while (name[0] ==
'/') {
1730 name = name.substr(1);
1733 auto i = name.find(
'/');
1734 if (i != std::string::npos) {
1735 name = name.substr(0, i);
1739 std::random_device rd;
1740 std::mt19937 mt(rd());
1741 std::uniform_int_distribution<int> dist2(0,1);
1742 auto d2 = std::bind(dist2, mt);
1744 result.
addString(
"You begin praying to " + name +
".");
1745 result.
addString(
"You finish your prayer.");
1747 static const char* godvoices[] = {
1753 std::uniform_int_distribution<int> godvoices_dist(0, (
sizeof(godvoices) /
sizeof(godvoices[0])) - 1);
1754 auto godvoice = [&]() {
1755 return std::string(godvoices[godvoices_dist(mt)]);
1758 static const char* creatures[] = {
1763 std::uniform_int_distribution<int> creatures_dist(0, (
sizeof(creatures) /
sizeof(creatures[0])) - 1);
1764 auto creature = [&]() {
1765 return std::string(creatures[creatures_dist(mt)]);
1768 static const char* auras[] = {
1776 std::uniform_int_distribution<int> auras_dist(0, (
sizeof(auras) /
sizeof(auras[0])) - 1);
1778 return std::string(auras[auras_dist(mt)]);
1781 static const char* items[] = {
1791 std::uniform_int_distribution<int> items_dist(0, (
sizeof(items) /
sizeof(items[0])) - 1);
1793 return std::string(items[items_dist(mt)]);
1796 static const char* blessings[] = {
1797 "You feel more limber.",
1798 "The slime disappears.",
1799 "Your amulet vanishes! You can breathe again.",
1800 "You can breathe again.",
1801 "You are back on solid ground.",
1802 "Your stomach feels content.",
1804 "You feel much better.",
1805 "Your surroundings change.",
1806 "Your shape becomes uncertain.",
1807 "Your chain disappears.",
1808 "There's a tiger in your tank.",
1809 "You feel in good health again.",
1810 "Your eye feels better.",
1811 "Your eyes feel better.",
1812 "Looks like you are back in Kansas.",
1813 "Your <ITEM> softly glows <AURA>.",
1815 std::uniform_int_distribution<int> blessings_dist(0, (
sizeof(blessings) /
sizeof(blessings[0])) - 1);
1816 auto blessing = [&](){
1817 auto blessing = std::string(blessings[blessings_dist(mt)]);
1818 blessing = std::regex_replace(blessing, std::regex(
"<ITEM>"), item());
1819 blessing = std::regex_replace(blessing, std::regex(
"<AURA>"), aura());
1823 std::uniform_int_distribution<int> dist13(0,12);
1824 switch(dist13(mt)) {
1827 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"bummed" :
"displeased") +
".");
1831 result.
addString(
"The voice of " + name +
" " + godvoice() +
1832 ": \"Thou " + (d2() ?
"hast strayed from the path" :
"art arrogant") +
1833 ", " + creature() +
". Thou must relearn thy lessons!\"");
1837 result.
addString(
"The voice of " + name +
" " + godvoice() +
1838 ": \"Thou hast angered me.\"");
1839 result.
addString(
"A black glow surrounds you.");
1842 result.
addString(
"The voice of " + name +
" " + godvoice() +
1843 ": \"Thou hast angered me.\"");
1847 result.
addString(
"The voice of " + name +
" " + godvoice() +
1848 ": \"Thou durst " + (d2() ?
"scorn" :
"call upon") +
1849 " me? Then die, " + creature() +
"!\"");
1852 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"pleased as punch" :
"well-pleased") +
".");
1856 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"ticklish" :
"pleased") +
".");
1860 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"full" :
"satisfied") +
".");
1864 result.
addString(
"The voice of " + name +
" " + godvoice() +
1865 ": \"Thou hast angered me.\"");
1866 result.
addString(
"Suddenly, a bolt of lightning strikes you!");
1867 result.
addString(
"You fry to a crisp!");
1874 auto handleAdminAddCmd = [
this, id](std::string output,
1875 const std::string& carrier) {
1879 if (!carrier.empty()) {
1880 output = carrier +
":/" + output;
1882 addOutput(output,
id, &cache,
false);
1884 int v = (r[0] ==
'A') ? 0 : -1;
1890 auto handleAdminDelCmd = [
this, id](
const std::string& dest) {
1894 removeOutput(dest,
id, &cache);
1897 removeInput(dest,
id, &cache);
1899 int v = (r1[0] ==
'R' || r2[0] ==
'R') ? 0 : -1;
1901 if (r1[0] ==
'R' && r2[0] !=
'R') {
1903 }
else if (r1[0] !=
'R' && r2[0] ==
'R') {
1911 auto handleAdminAtchCmd = [
this](PortCoreConnectionDirection direction,
1914 switch (direction) {
1915 case PortCoreConnectionDirection::Out: {
1917 if (!attachPortMonitor(prop,
true, errMsg)) {
1924 case PortCoreConnectionDirection::In: {
1926 if (!attachPortMonitor(prop,
false, errMsg)) {
1933 case PortCoreConnectionDirection::Error:
1935 result.
addString(
"attach command must be followed by [out] or [in]");
1940 auto handleAdminDtchCmd = [
this](PortCoreConnectionDirection direction) {
1942 switch (direction) {
1943 case PortCoreConnectionDirection::Out: {
1944 if (detachPortMonitor(
true)) {
1950 case PortCoreConnectionDirection::In: {
1951 if (detachPortMonitor(
false)) {
1957 case PortCoreConnectionDirection::Error:
1959 result.
addString(
"detach command must be followed by [out] or [in]");
1964 auto handleAdminListCmd = [
this](
const PortCoreConnectionDirection direction,
1965 const std::string& target) {
1967 switch (direction) {
1968 case PortCoreConnectionDirection::In: {
1970 std::lock_guard<std::mutex> lock(m_stateMutex);
1971 for (
auto* unit : m_units) {
1974 if (target.empty()) {
1976 if (!name.empty()) {
1980 describeRoute(route, result);
1985 case PortCoreConnectionDirection::Out: {
1987 std::lock_guard<std::mutex> lock(m_stateMutex);
1988 for (
auto* unit : m_units) {
1991 if (target.empty()) {
1993 }
else if (route.
getToName() == target) {
1994 describeRoute(route, result);
1999 case PortCoreConnectionDirection::Error:
2007 auto handleAdminSetInCmd = [
this](
const std::string& target,
2011 std::lock_guard<std::mutex> lock(m_stateMutex);
2012 if (target.empty()) {
2014 result.
addString(
"target port is not specified.\r\n");
2016 if (target == getName()) {
2018 if (!setParamPortMonitor(property,
false, errMsg)) {
2025 for (
auto* unit : m_units) {
2031 std::string msg =
"Configured connection from ";
2040 if (result.
size() == 0) {
2042 std::string msg =
"Could not find an incoming connection from ";
2051 auto handleAdminSetOutCmd = [
this](
const std::string& target,
2055 std::lock_guard<std::mutex> lock(m_stateMutex);
2056 if (target.empty()) {
2058 result.
addString(
"target port is not specified.\r\n");
2060 if (target == getName()) {
2062 if (!setParamPortMonitor(property,
true, errMsg)) {
2069 for (
auto* unit : m_units) {
2075 std::string msg =
"Configured connection to ";
2084 if (result.
size() == 0) {
2086 std::string msg =
"Could not find an incoming connection to ";
2095 auto handleAdminGetInCmd = [
this](
const std::string& target) {
2098 std::lock_guard<std::mutex> lock(m_stateMutex);
2099 if (target.empty()) {
2101 result.
addString(
"target port is not specified.\r\n");
2102 }
else if (target == getName()) {
2105 if (!getParamPortMonitor(property,
false, errMsg)) {
2112 for (
auto* unit : m_units) {
2123 if (result.
size() == 0) {
2125 std::string msg =
"Could not find an incoming connection from ";
2134 auto handleAdminGetOutCmd = [
this](
const std::string& target) {
2137 std::lock_guard<std::mutex> lock(m_stateMutex);
2138 if (target.empty()) {
2140 result.
addString(
"target port is not specified.\r\n");
2141 }
else if (target == getName()) {
2144 if (!getParamPortMonitor(property,
true, errMsg)) {
2151 for (
auto* unit : m_units) {
2162 if (result.
size() == 0) {
2164 std::string msg =
"Could not find an incoming connection to ";
2173 auto handleAdminPropGetCmd = [
this](
const std::string& key) {
2175 Property* p = acquireProperties(
false);
2181 if (key[0] ==
'/') {
2182 bool bFound =
false;
2184 if (key == getName()) {
2189 sched_prop.
put(
"tid",
static_cast<int>(this->getTid()));
2190 sched_prop.
put(
"priority", this->getPriority());
2191 sched_prop.
put(
"policy", this->getPolicy());
2197 proc_prop.
put(
"pid", info.
pid);
2198 proc_prop.
put(
"name", (info.
pid != -1) ? info.
name :
"unknown");
2199 proc_prop.
put(
"arguments", (info.
pid != -1) ? info.
arguments :
"unknown");
2207 platform_prop.
put(
"os", pinfo.
name);
2208 platform_prop.
put(
"hostname", m_address.getHost());
2210 unsigned int f = getFlags();
2217 port_prop.
put(
"is_input", is_input);
2218 port_prop.
put(
"is_output", is_output);
2219 port_prop.
put(
"is_rpc", is_rpc);
2220 port_prop.
put(
"type", getType().getName());
2222 for (
auto* unit : m_units) {
2223 if ((unit !=
nullptr) && !unit->
isFinished()) {
2226 if (key == coreName) {
2230 int tos = getTypeOfService(unit);
2231 int tid =
static_cast<int>(unit->
getTid());
2235 sched_prop.
put(
"tid", tid);
2236 sched_prop.
put(
"priority", priority);
2237 sched_prop.
put(
"policy", policy);
2241 qos_prop.
put(
"tos", tos);
2249 std::string msg =
"cannot find any connection to/from ";
2259 releaseProperties(p);
2263 auto handleAdminPropSetCmd = [
this](
const std::string& key,
2269 Property* p = acquireProperties(
false);
2276 if (!process.isNull()) {
2277 std::string portName = key;
2278 if ((!portName.empty()) && (portName[0] ==
'/')) {
2280 if (portName == getName()) {
2283 if (process_prop !=
nullptr) {
2286 if (process_prop->
check(
"priority")) {
2289 if (process_prop->
check(
"policy")) {
2292 bOk = setProcessSchedulingParam(prio, policy);
2303 if (!sched.isNull()) {
2304 if ((!key.empty()) && (key[0] ==
'/')) {
2306 for (
auto* unit : m_units) {
2307 if ((unit !=
nullptr) && !unit->
isFinished()) {
2311 if (portName == key) {
2313 if (sched_prop !=
nullptr) {
2316 if (sched_prop->
check(
"priority")) {
2319 if (sched_prop->
check(
"policy")) {
2336 if (!qos.isNull()) {
2337 if ((!key.empty()) && (key[0] ==
'/')) {
2339 for (
auto* unit : m_units) {
2340 if ((unit !=
nullptr) && !unit->
isFinished()) {
2343 if (portName == key) {
2345 if (qos_prop !=
nullptr) {
2347 if (qos_prop->
check(
"priority")) {
2372 }
else if (qos_prop->
check(
"dscp")) {
2377 auto dscp_val = qos_prop->
find(
"dscp");
2378 if (dscp_val.isInt32()) {
2382 dscp =
static_cast<int>(dscp_class);
2384 if ((dscp >= 0) && (dscp < 64)) {
2387 }
else if (qos_prop->
check(
"tos")) {
2389 auto tos_val = qos_prop->
find(
"tos");
2390 if (tos_val.isInt32()) {
2395 bOk = setTypeOfService(unit, tos);
2407 releaseProperties(p);
2414 auto handleAdminRosPublisherUpdateCmd = [
this](
const std::string& topic,
Bottle* pubs) {
2423 if (pubs !=
nullptr) {
2425 for (
size_t i = 0; i < pubs->size(); i++) {
2426 std::string pub = pubs->get(i).asString();
2432 std::lock_guard<std::mutex> lock(m_stateMutex);
2433 for (
auto* unit : m_units) {
2434 if ((unit !=
nullptr) && unit->
isPupped()) {
2437 if (!listed.
check(me)) {
2443 for (
size_t i = 0; i < pubs->size(); i++) {
2444 std::string pub = pubs->get(i).asString();
2445 if (!present.
check(pub)) {
2446 yCDebug(PORTCORE,
"ROS ADD %s", pub.c_str());
2456 yCDebug(PORTCORE,
"Sending [%s] to %s", req.
toString().c_str(), pub.c_str());
2458 if (!
__pc_rpc(c,
"xmlrpc", req, reply,
false)) {
2459 fprintf(stderr,
"Cannot connect to ROS subscriber %s\n", pub.c_str());
2461 __pc_rpc(c,
"xmlrpc", req, reply,
true);
2465 std::string hostname;
2466 std::string carrier;
2469 fprintf(stderr,
"Failure looking up topic %s: %s\n", topic.c_str(), reply.
toString().c_str());
2470 }
else if (pref ==
nullptr) {
2471 fprintf(stderr,
"Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2473 fprintf(stderr,
"Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(), pref->
get(0).
asString().c_str());
2479 carrier =
"tcpros+role.pub+topic.";
2481 yCDebug(PORTCORE,
"topic %s available at %s:%d", topic.c_str(), hostname.c_str(), portnum);
2484 Contact addr(hostname, portnum);
2488 if (op ==
nullptr) {
2489 fprintf(stderr,
"NO CONNECTION\n");
2501 std::lock_guard<std::mutex> lock(m_stateMutex);
2506 yCAssert(PORTCORE, unit !=
nullptr);
2509 m_units.push_back(unit);
2521 auto handleAdminRosRequestTopicCmd = [
this]() {
2535 auto handleAdminRosGetPidCmd = []() {
2544 auto handleAdminRosGetBusInfoCmd = []() {
2554 auto handleAdminUnknownCmd = [
this](
const Bottle& cmd) {
2557 if (m_adminReader !=
nullptr) {
2561 ok = m_adminReader->read(con.
getReader());
2569 result.
addString(
"send [help] for list of valid commands");
2574 const PortCoreCommand command = parseCommand(cmd.
get(0));
2576 case PortCoreCommand::Help:
2577 result = handleAdminHelpCmd();
2579 case PortCoreCommand::Ver:
2580 result = handleAdminVerCmd();
2582 case PortCoreCommand::Pray:
2583 result = handleAdminPrayCmd();
2585 case PortCoreCommand::Add: {
2588 result = handleAdminAddCmd(std::move(output), carrier);
2590 case PortCoreCommand::Del: {
2592 result = handleAdminDelCmd(dest);
2594 case PortCoreCommand::Atch: {
2595 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab());
2597 result = handleAdminAtchCmd(direction, std::move(prop));
2599 case PortCoreCommand::Dtch: {
2600 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab());
2601 result = handleAdminDtchCmd(direction);
2603 case PortCoreCommand::List: {
2604 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab(),
true);
2606 result = handleAdminListCmd(direction, target);
2608 case PortCoreCommand::Set: {
2609 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab(),
true);
2613 switch (direction) {
2614 case PortCoreConnectionDirection::In:
2615 result = handleAdminSetInCmd(target, property);
2617 case PortCoreConnectionDirection::Out:
2618 result = handleAdminSetOutCmd(target, property);
2620 case PortCoreConnectionDirection::Error:
2625 case PortCoreCommand::Get: {
2626 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab(),
true);
2628 switch (direction) {
2629 case PortCoreConnectionDirection::In:
2630 result = handleAdminGetInCmd(target);
2632 case PortCoreConnectionDirection::Out:
2633 result = handleAdminGetOutCmd(target);
2635 case PortCoreConnectionDirection::Error:
2640 case PortCoreCommand::Prop: {
2641 PortCorePropertyAction action = parsePropertyAction(cmd.
get(1).
asVocab());
2645 case PortCorePropertyAction::Get:
2646 result = handleAdminPropGetCmd(key);
2648 case PortCorePropertyAction::Set: {
2653 result = handleAdminPropSetCmd(key, value, process, sched, qos);
2655 case PortCorePropertyAction::Error:
2657 result.
addString(
"property action not known");
2661 case PortCoreCommand::RosPublisherUpdate: {
2666 result = handleAdminRosPublisherUpdateCmd(topic, pubs);
2669 case PortCoreCommand::RosRequestTopic:
2674 result = handleAdminRosRequestTopicCmd();
2677 case PortCoreCommand::RosGetPid:
2679 result = handleAdminRosGetPidCmd();
2682 case PortCoreCommand::RosGetBusInfo:
2684 result = handleAdminRosGetBusInfoCmd();
2687 case PortCoreCommand::Unknown:
2688 result = handleAdminUnknownCmd(cmd);
2693 if (writer !=
nullptr) {
2694 result.
write(*writer);
// Tries to apply the type-of-service value `tos` to `unit`.
// Each attempt is logged at debug level; a warning is logged when the
// output-side or input-side attempt is reported as failed.
// NOTE(review): outUnit, op and inUnit are defined on lines not shown in
// this excerpt - presumably derived from `unit`; confirm.
2701 bool PortCore::setTypeOfService(
PortCoreUnit* unit,
int tos)
// A null unit is handled up front, before anything is logged.
2703 if (unit ==
nullptr) {
2707 yCDebug(PORTCORE,
"Trying to set TOS = %d", tos);
// Output side: only attempted when both outUnit and op are non-null.
2711 if (outUnit !=
nullptr) {
2713 if (op !=
nullptr) {
2714 yCDebug(PORTCORE,
"Trying to set TOS = %d on output unit", tos);
2717 yCWarning(PORTCORE,
"Setting TOS on output unit failed");
// Input side: attempted when inUnit is non-null.
2731 if (inUnit !=
nullptr) {
2734 yCDebug(PORTCORE,
"Trying to set TOS = %d on input unit", tos);
2737 yCWarning(PORTCORE,
"Setting TOS on input unit failed");
2749 if (unit ==
nullptr) {
2755 if (outUnit !=
nullptr) {
2757 if (op !=
nullptr) {
2770 if (inUnit !=
nullptr) {
// Attaches the port-monitor carrier modifier `portmonitor` to one side of
// this port and configures it from `prop`.
//   prop     - configuration; this function overwrites its "source",
//              "destination", "sender_side", "receiver_side" and "carrier"
//              entries before handing it to the modifier.
//   isOutput - NOTE(review): presumably selects the output (true) or input
//              (false) path below; the branch itself is on lines not shown.
//   errMsg   - receives a description when the modifier is unavailable or
//              cannot be configured.
2781 bool PortCore::attachPortMonitor(
yarp::os::Property& prop,
bool isOutput, std::string& errMsg)
// No port-monitor modifier could be obtained: report it through errMsg.
2785 if (portmonitor ==
nullptr) {
2786 errMsg =
"Portmonitor carrier modifier cannot be found or it is not enabled in Yarp!";
// Output path: drop any modifier already attached to the output, then
// describe this port as the sending end ("source" is the port's own name).
2791 detachPortMonitor(
true);
2792 prop.
put(
"source", getName());
2793 prop.
put(
"destination",
"");
2794 prop.
put(
"sender_side", 1);
2795 prop.
put(
"receiver_side", 0);
2796 prop.
put(
"carrier",
"");
// outputMutex is held while the modifier is installed and configured; it is
// released on both the failure path and the success path.
2797 m_modifier.outputMutex.lock();
2798 m_modifier.outputModifier = portmonitor;
// Configuration failed: release the freshly installed modifier again.
2799 if (!m_modifier.outputModifier->configureFromProperty(prop)) {
2800 m_modifier.releaseOutModifier();
2801 errMsg =
"Failed to configure the portmonitor plug-in";
2802 m_modifier.outputMutex.unlock();
2805 m_modifier.outputMutex.unlock();
// Input path: mirror image of the output path, with this port described as
// the receiving end ("destination" is the port's own name).
2807 detachPortMonitor(
false);
2808 prop.
put(
"source",
"");
2809 prop.
put(
"destination", getName());
2810 prop.
put(
"sender_side", 0);
2811 prop.
put(
"receiver_side", 1);
2812 prop.
put(
"carrier",
"");
// inputMutex is held while the modifier is installed and configured; it is
// released on both the failure path and the success path.
2813 m_modifier.inputMutex.lock();
2814 m_modifier.inputModifier = portmonitor;
// Configuration failed: release the freshly installed modifier again.
2815 if (!m_modifier.inputModifier->configureFromProperty(prop)) {
2816 m_modifier.releaseInModifier();
2817 errMsg =
"Failed to configure the portmonitor plug-in";
2818 m_modifier.inputMutex.unlock();
2821 m_modifier.inputMutex.unlock();
// Releases the port-monitor modifier attached to one side of this port.
// Each release is performed while holding that side's mutex.
// NOTE(review): the branch on `isOutput` is on lines not shown in this
// excerpt - presumably true selects the output side; confirm.
2827 bool PortCore::detachPortMonitor(
bool isOutput)
// Output side.
2830 m_modifier.outputMutex.lock();
2831 m_modifier.releaseOutModifier();
2832 m_modifier.outputMutex.unlock();
// Input side.
2834 m_modifier.inputMutex.lock();
2835 m_modifier.releaseInModifier();
2836 m_modifier.inputMutex.unlock();
2843 std::string& errMsg)
// Passes `param` to the attached port modifier via setCarrierParams().
// When no modifier is attached on the relevant side, errMsg is filled in
// and that side's mutex is released.
// Output side.
2846 m_modifier.outputMutex.lock();
2847 if (m_modifier.outputModifier ==
nullptr) {
2848 errMsg =
"No port modifier is attached to the output";
2849 m_modifier.outputMutex.unlock();
// NOTE(review): no outputModifier->setCarrierParams(param) call is visible
// before this unlock, whereas the input side below does make the call -
// confirm the output modifier really receives `param`.
2853 m_modifier.outputMutex.unlock();
// Input side.
2855 m_modifier.inputMutex.lock();
2856 if (m_modifier.inputModifier ==
nullptr) {
2857 errMsg =
"No port modifier is attached to the input";
2858 m_modifier.inputMutex.unlock();
// Modifier present: hand it the parameters while still holding inputMutex.
2861 m_modifier.inputModifier->setCarrierParams(param);
2862 m_modifier.inputMutex.unlock();
2869 std::string& errMsg)
// Asks the attached port modifier for its parameters via
// getCarrierParams(param). When no modifier is attached on the relevant
// side, errMsg is filled in and that side's mutex is released.
// Output side.
2872 m_modifier.outputMutex.lock();
2873 if (m_modifier.outputModifier ==
nullptr) {
2874 errMsg =
"No port modifier is attached to the output";
2875 m_modifier.outputMutex.unlock();
// Modifier present: query it while still holding outputMutex.
2878 m_modifier.outputModifier->getCarrierParams(param);
2879 m_modifier.outputMutex.unlock();
// Input side.
2881 m_modifier.inputMutex.lock();
2882 if (m_modifier.inputModifier ==
nullptr) {
2883 errMsg =
"No port modifier is attached to the input";
2884 m_modifier.inputMutex.unlock();
// Modifier present: query it while still holding inputMutex.
2887 m_modifier.inputModifier->getCarrierParams(param);
2888 m_modifier.inputMutex.unlock();
2896 if (unit !=
nullptr) {
2897 bool isLog = (!unit->
getMode().empty());
// Applies the scheduling `policy` and `priority` to this process.
//   Linux: walks the entries of a directory opened with opendir(path) and
//          calls sched_setscheduler() for every entry whose name is a
//          decimal number, using that number as the thread id.
//          NOTE(review): `path` is filled in on lines not shown here -
//          presumably the process's task directory; confirm.
//   ACE:   on other platforms, builds an ACE_Sched_Params with process
//          scope from the same policy and priority.
2904 bool PortCore::setProcessSchedulingParam(
int priority,
int policy)
2906 #if defined(__linux__)
2908 struct sched_param sch_param;
// Use the POSIX member name: __sched_priority is a glibc-internal spelling
// that other Linux C libraries do not provide.
2909 sch_param.sched_priority = priority;
2912 char path[PATH_MAX];
2915 dir = opendir(path);
2916 if (dir ==
nullptr) {
2924 while ((d = readdir(dir)) !=
nullptr) {
// Cheap filter: entries whose name does not start with a digit are not ids.
2925 if (isdigit(
static_cast<unsigned char>(*d->d_name)) == 0) {
2929 tid = strtol(d->d_name, &end, 10);
// Reject names that are empty or have trailing non-numeric characters.
2930 if (d->d_name == end || ((end !=
nullptr) && (*end != 0))) {
// ret stays true only while every sched_setscheduler() call returns 0.
2934 ret &= (sched_setscheduler(
static_cast<pid_t
>(tid), policy, &sch_param) == 0);
2938 #elif defined(YARP_HAS_ACE) // for other platforms
2940 ACE_Sched_Params param(policy, static_cast<ACE_Sched_Priority>(priority), ACE_SCOPE_PROCESS);
2950 m_stateMutex.lock();
2952 if (m_prop ==
nullptr) {
2962 m_stateMutex.unlock();
2967 return removeUnit(route, synch);
// Hands out an index based on the m_counter member.
// NOTE(review): the counter update and the return statement are on lines
// not shown here - presumably `result` is what gets returned; confirm.
2980 int PortCore::getNextIndex()
// Snapshot the counter value before it is changed.
2982 int result = m_counter;
// Guard against the counter having become negative.
2984 if (m_counter < 0) {
2997 m_address.setName(str);
3002 return m_readableCreator;
3007 m_controlRegistration = flag;
3012 return m_listening.load();
3022 return m_interrupted;
3027 m_timeout = timeout;
3030 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3035 removeCallbackLock();
3036 if (mutex !=
nullptr) {
3037 m_old_mutex = mutex;
3038 m_mutexOwned =
false;
3041 m_mutexOwned =
true;
3046 #endif // YARP_NO_DEPRECATED
3050 removeCallbackLock();
3051 if (mutex !=
nullptr) {
3053 m_mutexOwned =
false;
3055 m_mutex =
new std::mutex;
3056 m_mutexOwned =
true;
3063 if (m_mutexOwned && (m_mutex !=
nullptr)) {
3067 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3068 m_old_mutex =
nullptr;
3069 #endif // YARP_NO_DEPRECATED
3070 m_mutexOwned =
false;
3076 if (m_mutex ==
nullptr) {
3077 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3078 if (m_old_mutex ==
nullptr) {
3081 m_old_mutex->lock();
3083 #else // YARP_NO_DEPRECATED
3085 #endif // YARP_NO_DEPRECATED
3093 if (m_mutex ==
nullptr) {
3094 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3095 if (m_old_mutex ==
nullptr) {
3098 return m_old_mutex->try_lock();
3099 #else // YARP_NO_DEPRECATED
3101 #endif // YARP_NO_DEPRECATED
3103 return m_mutex->try_lock();
3108 if (m_mutex ==
nullptr) {
3109 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3110 if (m_old_mutex ==
nullptr) {
3113 return m_old_mutex->unlock();
3114 #else // YARP_NO_DEPRECATED
3116 #endif // YARP_NO_DEPRECATED
3129 if (!m_checkedType) {
3130 if (!m_type.isValid()) {
3133 m_checkedType =
true;
3135 m_typeMutex.unlock();
3142 m_typeMutex.unlock();
3150 m_typeMutex.unlock();