YARP
Yet Another Robot Platform
PriorityCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
9 #include "PriorityCarrier.h"
10 
11 #include <yarp/os/Log.h>
12 #include <yarp/os/LogComponent.h>
14 #include <yarp/os/Route.h>
15 
16 #include <yarp/math/Math.h>
17 #include <yarp/math/SVD.h>
18 #include <string>
19 
20 
21 using namespace yarp::os;
22 using namespace yarp::math;
23 
24 namespace {
25 YARP_LOG_COMPONENT(PRIORITYCARRIER,
26  "yarp.carrier.priority",
30  nullptr)
31 }
32 
33 
38 ElectionOf<PriorityGroup> *PriorityCarrier::peers = nullptr;
39 
40 // Make a singleton manager for finding peer carriers.
41 ElectionOf<PriorityGroup>& PriorityCarrier::getPeers() {
43  if (peers==nullptr) {
44  peers = new ElectionOf<PriorityGroup>;
46  yCAssert(PRIORITYCARRIER, peers);
47  } else {
49  }
50  return *peers;
51 }
52 
53 // Decide whether data should be accepted.
55  getPeers().lock();
56  yCAssert(PRIORITYCARRIER, group);
57  bool result = group->acceptIncomingData(reader,this);
58  getPeers().unlock();
59  return result;
60 }
61 
62 // Read connection settings.
64  portName = proto.getRoute().getToName();
65  sourceName = proto.getRoute().getFromName();
66  group = getPeers().add(portName,this);
67  if (!group) return false;
68 
69  Property options;
70  options.fromString(proto.getSenderSpecifier());
71 
72  timeConstant = fabs(options.check("tc",Value(1.0)).asFloat64());
73  timeResting = fabs(options.check("tr",Value(0.0)).asFloat64());
74  stimulation = fabs(options.check("st",Value(STIMUL_THRESHOLD*10)).asFloat64());
75  // Zero stimulation is undefined and will be interpreted as S=Thresould.
76  if(stimulation == 0)
77  stimulation = STIMUL_THRESHOLD*10;
78  stimulation /= 10.0;
79 
80  bias = options.check("bs",Value(STIMUL_THRESHOLD*10)).asFloat64();
81  bias /= 10.0;
82 
83  excitation = options.findGroup("ex");
84  isVirtual = options.check("virtual");
85 
86 #ifdef WITH_PRIORITY_DEBUG
87  if(options.check("debug"))
88  {
89  std::string msg;
90  char dummy[1024];
91  std::snprintf(dummy, 1024, "\n%s:\n", sourceName.c_str());
92  msg+= dummy;
93  std::snprintf(dummy, 1024, " stimulation: %.2f\n", stimulation);
94  msg+= dummy;
95  std::snprintf(dummy, 1024, " bias: %.2f\n", bias);
96  msg+= dummy;
97  std::snprintf(dummy, 1024, " tc: %.2fs\n", timeConstant);
98  msg+= dummy;
99  std::snprintf(dummy, 1024, " tr: %.2fs\n", timeResting);
100  msg+= dummy;
101  std::snprintf(dummy, 1024, " ex: ");
102  msg+= dummy;
103  for(size_t i=0; i<excitation.size(); i++)
104  {
105  Value v = excitation.get(i);
106  if(v.isList() && (v.asList()->size()>=2))
107  {
108  Bottle* b = v.asList();
109  std::snprintf(dummy, 1024, "(%s, %.2f) ",
110  b->get(0).asString().c_str(),
111  b->get(1).asFloat64()/10.0 );
112  msg+= dummy;
113  }
114  }
115  //std::snprintf(dummy, 1024, "\n");
116  msg+= "\n";
117  std::snprintf(dummy, 1024, " virtual: %s\n",
118  (isVirtual)?"yes":"no");
119  msg+= dummy;
120  double rate = options.check("rate", Value(10)).asInt32() / 1000.0;
121  std::snprintf(dummy, 1024, " db.rate: %fs\n", rate);
122  msg+= dummy;
123  yCInfo(PRIORITYCARRIER, "%s", msg.c_str());
124  debugger.stop();
125  debugger.setPeriod(rate);
126  debugger.start();
127  }
128 #endif
129  return true;
130 }
131 
132 
134 {
135  //
136  // +P| ---___
137  // | | -__
138  // | | -_
139  // 0|------------------> (Active state)
140  // Ta Tc
141  //
142  // Ta Tr
143  // 0|-------------_----> (Resting state && P<0)
144  // | | __-
145  // | | ___-
146  // -P| ---
147  //
148  //
149  // P(t) = Pi * (1-exp((t-Tc-Ta)/Tc*5) + exp(-5))
150  // t:time, Pi: temporal Priority level
151  // Tc: reset time, Ta: arrival time
152  //
153  // we do not consider ports which has not seen any message
154  // from them yet.
155  //if(timeArrival == 0)
156  // return 0;
157 
158  double dt = t - timeArrival;
159  // Temporal priority is inverted if this is a neuron model and the temporal
160  // stimulation has already reached to STIMUL_THRESHOLD and waited for Tc.
161  if((timeResting > 0)
162  && (dt >= fabs(timeConstant))
163  && (temporalStimulation >= STIMUL_THRESHOLD))
164  temporalStimulation = -temporalStimulation;
165 
166  double actualStimulation;
167  if(!isResting(temporalStimulation)) // behavior is in stimulation state
168  {
169  // After a gap bigger than Tc, the
170  // priority is set to zero to avoid redundant calculation.
171  if(dt > fabs(timeConstant))
172  actualStimulation = 0;
173  else
174  actualStimulation = temporalStimulation *
175  (1.0 - exp((dt-timeConstant)/timeConstant*5.0) + exp(-5.0));
176  }
177  else // behavior is in resting state
178  {
179  // it is in waiting state for Tc
180  if(temporalStimulation > 0)
181  actualStimulation = temporalStimulation;
182  else
183  {
184  dt -= fabs(timeConstant);
185  // After a gap bigger than Tr, the
186  // priority is set to zero to avoid redundant calculation.
187  if(dt > fabs(timeResting))
188  actualStimulation = 0;
189  else
190  actualStimulation = temporalStimulation *
191  (1.0 - exp((dt-timeResting)/timeResting*5.0) + exp(-5.0));
192  }
193  }
194 
195  if(actualStimulation <= 0)
196  isActive = false;
197 
198  return actualStimulation;
199 }
200 
202 {
203  // calculating E(t) = Sum(e.I(t)) + b
204  if(!isActive)
205  return 0.0;
206 
207  double E = 0;
208  for (auto& it : group->peerSet)
209  {
210  PriorityCarrier *peer = it.first;
211  if(peer != this)
212  {
213  for(size_t i=0; i<peer->excitation.size(); i++)
214  {
215  Value v = peer->excitation.get(i);
216  if(v.isList() && (v.asList()->size()>=2))
217  {
218  Bottle* b = v.asList();
219  // an exitatory to this priority carrier
220  if(sourceName == b->get(0).asString())
221  E += peer->getActualInput(t) * (b->get(1).asFloat64()/10.0);
222  }
223  }
224 
225  }
226  }
227  E += bias;
228  double I = E * getActualStimulation(t);
229  return ((I<0) ? 0 : I); //I'(t)
230 }
231 
232 
238 {
239  //TODO: find the correct way to get the size of peerSet
240  int nConnections = 0;
241  for(auto it=peerSet.begin(); it!=peerSet.end(); it++)
242  nConnections++;
243 
244  // calculate matrices X, B, InvA and Y
245  X.resize(nConnections, 1);
246  B.resize(nConnections, 1);
247  Y.resize(nConnections, 1);
248  InvA.resize(nConnections, nConnections);
249  InvA.eye();
250 
251  int row = 0;
252  for(auto& rowItr : peerSet)
253  {
254  PriorityCarrier* peer = rowItr.first;
255  // call 'getActualStimulation' to update 'isActive'
256  peer->getActualStimulation(t);
257  double xi = (peer->isActive) ? STIMUL_THRESHOLD : 0.0;
258  B(row,0) = peer->bias * xi;
259  X(row,0) = xi;
260 
261  int col = 0;
262  for(auto& it : peerSet)
263  {
264  PriorityCarrier *peerCol = it.first;
265  for(size_t i=0; i<peerCol->excitation.size(); i++)
266  {
267  Value v = peerCol->excitation.get(i);
268  if(v.isList() && (v.asList()->size()>=2))
269  {
270  Bottle* b = v.asList();
271  // an exitatory link to this connection
272  if(peer->sourceName == b->get(0).asString())
273  InvA(row,col) = -(b->get(1).asFloat64()/10.0)*xi;
274  }
275  }
276  col++;
277  }
278  row++;
279  }
280 
281  yCTrace(PRIORITYCARRIER, "A:\n %s", InvA.toString(1).c_str());
282 
283  // calclulating the determinant
284  double determinant = yarp::math::det(InvA);
285  if(determinant == 0)
286  {
287  yCError(PRIORITYCARRIER, "Inconsistent regulation! non-invertible weight matrix");
288  return false;
289  }
290 
291  // inverting the weight matrix
292  InvA = yarp::math::luinv(InvA);
293  Y = InvA * B;
294 
295  yCTrace(PRIORITYCARRIER, "X:\n %s", X.toString(1).c_str());
296  yCTrace(PRIORITYCARRIER, "B:\n %s", B.toString(1).c_str());
297  yCTrace(PRIORITYCARRIER, "Y:\n %s", Y.toString(1).c_str());
298 
299  return true;
300 }
301 
302 // Decide whether data should be accepted, for real.
304  PriorityCarrier *source) {
305 
306  bool accept;
307  // updates message's arrival time
308  double tNow = yarp::os::Time::now();
309  source->stimulate(tNow);
310 
311  if(!recalculate(tNow))
312  return false;
313 
314  int row = 0;
315  PriorityCarrier *maxPeer = nullptr;
316  double maxStimuli = 0.0;
317  for(auto& it : peerSet)
318  {
319  PriorityCarrier *peer = it.first;
320  double output = Y(row,0) * X(row,0);
321  peer->yi = output; // only for debug purpose
322 
323  if(!peer->isVirtual)
324  {
325  if(output > maxStimuli)
326  {
327  maxStimuli = output;
328  maxPeer = peer;
329  }
330  }
331  row++;
332  }
333  accept = (maxPeer == source);
334 
335  // a virtual message will never be delivered. It will be only
336  // used for the coordination
337  if(source->isVirtual)
338  accept = false;
339 
340  return accept;
341 }
342 
343 
348 #ifdef WITH_PRIORITY_DEBUG
350 {
351  pcarrier = carrier;
352  count = 0;
353 }
354 
356 {
357  if(isRunning()) stop();
358 }
359 
361 {
363  v.resize(4);
364  // a vector of [t, S(t), S'(t), I'(t)]
365  double t = yarp::os::Time::now();
366  v[0] = t;
368  v[2] = pcarrier->yi;
369  debugPort.write();
370 }
371 
373 {
374  debugPortName = pcarrier->portName + pcarrier->sourceName + std::string(":debug");
375  return debugPort.open(debugPortName);
376 }
377 
379 {
380  debugPort.close();
381 }
382 
383 #endif //WITH_PRIORITY_DEBUG
SVD.h
yarp::os::Bottle
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
yarp::sig::VectorOf::resize
void resize(size_t size) override
Resize the vector.
Definition: Vector.h:254
yarp::os::Bottle::size
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:254
PriorityCarrier::sourceName
std::string sourceName
Definition: PriorityCarrier.h:186
PriorityDebugThread::threadInit
bool threadInit() override
Initialization method.
Definition: PriorityCarrier.cpp:372
t
float t
Definition: FfmpegWriter.cpp:74
yarp::os::NetworkBase::lock
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1462
yarp::os::Property::fromString
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1046
PriorityCarrier::stimulate
void stimulate(double t)
Definition: PriorityCarrier.h:127
yarp::os::Log::LogTypeReserved
@ LogTypeReserved
Definition: Log.h:83
YARP_LOG_COMPONENT
#define YARP_LOG_COMPONENT(name,...)
Definition: LogComponent.h:80
PriorityGroup::recalculate
bool recalculate(double t)
Class PriorityGroup.
Definition: PriorityCarrier.cpp:237
PriorityGroup::acceptIncomingData
virtual bool acceptIncomingData(yarp::os::ConnectionReader &reader, PriorityCarrier *source)
Definition: PriorityCarrier.cpp:303
yarp::math
Definition: FrameTransform.h:18
yarp::math::luinv
yarp::sig::Matrix luinv(const yarp::sig::Matrix &in)
Invert a square matrix using LU-decomposition (defined in Math.h).
Definition: math.cpp:588
yarp::os::PeriodicThread::isRunning
bool isRunning() const
Returns true when the thread is started, false otherwise.
Definition: PeriodicThread.cpp:316
yarp::os::Time::now
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:124
PriorityCarrier::configure
bool configure(yarp::os::ConnectionState &proto) override
Give carrier a shot at looking at how the connection is set up.
Definition: PriorityCarrier.cpp:63
PriorityCarrier::bias
double bias
Definition: PriorityCarrier.h:184
PriorityDebugThread::debugPort
yarp::os::BufferedPort< yarp::sig::Vector > debugPort
Definition: PriorityCarrier.h:67
PriorityCarrier::isActive
bool isActive
Definition: PriorityCarrier.h:183
yarp::os::BufferedPort::prepare
T & prepare()
Access the object which will be transmitted by the next call to yarp::os::BufferedPort::write.
Definition: BufferedPort-inl.h:114
yarp::sig::VectorOf< double >
Log.h
yarp::os::ConnectionState::getRoute
virtual const Route & getRoute() const =0
Get the route associated with this connection.
Route.h
yarp::os::Bottle::get
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:249
ConnectionState.h
PriorityCarrier
Allow priority-based message selection.
Definition: PriorityCarrier.h:79
Math.h
PriorityDebugThread::~PriorityDebugThread
~PriorityDebugThread() override
Definition: PriorityCarrier.cpp:355
yarp::os::Log::minimumPrintLevel
static LogType minimumPrintLevel()
Get current minimum print level.
Definition: Log.cpp:805
PriorityCarrier::acceptIncomingData
bool acceptIncomingData(yarp::os::ConnectionReader &reader) override
Determine whether incoming data should be accepted.
Definition: PriorityCarrier.cpp:54
yarp::os::Value::asString
virtual std::string asString() const
Get string value.
Definition: Value.cpp:237
yarp::os::BufferedPort::open
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition: BufferedPort-inl.h:41
PriorityCarrier::yi
double yi
Definition: PriorityCarrier.h:188
yarp::os::Property::check
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1024
yarp::os::PeriodicThread
An abstraction for a periodic thread.
Definition: PeriodicThread.h:25
yarp::math::det
double det(const yarp::sig::Matrix &in)
Computes the determinant of a matrix (defined in Math.h).
Definition: math.cpp:583
PriorityDebugThread::count
int count
Definition: PriorityCarrier.h:64
LogComponent.h
yarp::os::Value::isList
virtual bool isList() const
Checks if value is a list.
Definition: Value.cpp:165
yCAssert
#define yCAssert(component, x)
Definition: LogComponent.h:172
yarp::os::ElectionOf< PriorityGroup >
yarp::os::ConnectionReader
An interface for reading from a network connection.
Definition: ConnectionReader.h:40
PriorityDebugThread::PriorityDebugThread
PriorityDebugThread(PriorityCarrier *carrier)
Class PriorityDebugThread.
Definition: PriorityCarrier.cpp:349
yarp::os::ConnectionState
The basic state of a connection - route, streams in use, etc.
Definition: ConnectionState.h:31
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::ConnectionState::getSenderSpecifier
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
yarp::os::BufferedPort::write
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
Definition: BufferedPort-inl.h:126
PriorityCarrier::getActualInput
double getActualInput(double t)
Definition: PriorityCarrier.cpp:201
yCInfo
#define yCInfo(component,...)
Definition: LogComponent.h:135
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
PriorityCarrier::excitation
yarp::os::Bottle excitation
Definition: PriorityCarrier.h:185
yarp::os::NetworkBase::unlock
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1467
PriorityCarrier::getActualStimulation
double getActualStimulation(double t)
Definition: PriorityCarrier.cpp:133
PriorityDebugThread::threadRelease
void threadRelease() override
Release method.
Definition: PriorityCarrier.cpp:378
yarp::os::BufferedPort::close
void close() override
Stop port activity.
Definition: BufferedPort-inl.h:73
yarp::os::Route::getToName
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
yarp::os::Value::asList
virtual Bottle * asList() const
Get list value.
Definition: Value.cpp:243
yarp::os::PeriodicThread::stop
void stop()
Call this to stop the thread, this call blocks until the thread is terminated (and releaseThread() ca...
Definition: PeriodicThread.cpp:296
yarp::os::Property::findGroup
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Property.cpp:1125
yarp::os::Log::printCallback
static LogCallback printCallback()
Get current print callback.
Definition: Log.cpp:852
PriorityDebugThread::debugPortName
std::string debugPortName
Definition: PriorityCarrier.h:66
PriorityCarrier.h
STIMUL_THRESHOLD
#define STIMUL_THRESHOLD
Definition: PriorityCarrier.h:24
yCTrace
#define yCTrace(component,...)
Definition: LogComponent.h:88
PriorityDebugThread::pcarrier
PriorityCarrier * pcarrier
Definition: PriorityCarrier.h:65
PriorityDebugThread::run
void run() override
Loop function.
Definition: PriorityCarrier.cpp:360
yarp::os::Value
A single value (typically within a Bottle).
Definition: Value.h:47
yarp::os::Route::getFromName
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
yarp::os::Value::asFloat64
virtual yarp::conf::float64_t asFloat64() const
Get 64-bit floating point value.
Definition: Value.cpp:225
yarp::os::Property
A class for storing options and configuration information.
Definition: Property.h:37
PriorityCarrier::isVirtual
bool isVirtual
Definition: PriorityCarrier.h:182