YARP
Yet Another Robot Platform
McastCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/conf/system.h>
13 
15 #include <yarp/os/Network.h>
16 #include <yarp/os/Route.h>
18 
19 #include <cstdlib>
20 
21 using namespace yarp::os::impl;
22 using namespace yarp::os;
23 
24 namespace {
25 YARP_OS_LOG_COMPONENT(MCASTCARRIER, "yarp.os.impl.McastCarrier")
26 } // namespace
27 
29 
31 {
33  if (caster == nullptr) {
36  if (caster == nullptr) {
37  yCError(MCASTCARRIER, "No memory for McastCarrier::caster");
38  std::exit(1);
39  }
40  } else {
42  }
43  return *caster;
44 }
45 
46 
48 {
49  stream = nullptr;
50  key = "";
51 }
52 
54 {
55  if (!key.empty()) {
56  bool elect = isElect();
57  removeSender(key);
58  if (elect) {
59  McastCarrier* peer = getCaster().getElect(key);
60  if (peer == nullptr) {
61  // time to remove registration
62  NetworkBase::unregisterName(mcastName);
63  } else {
64  if (!peer->takeElection()) {
65  yCError(MCASTCARRIER, "Something went wrong during the shift of the election...");
66  }
67  }
68  }
69  }
70 }
71 
73 {
74  return new McastCarrier();
75 }
76 
78 {
79  return "mcast";
80 }
81 
83 {
84  return 1;
85 }
86 
87 
89 {
90  // need to do more than the default
91  bool ok = defaultSendHeader(proto);
92  if (!ok) {
93  return false;
94  }
95 
96  yCDebug(MCASTCARRIER, "Adding extra mcast header");
97 
98  Contact addr;
99 
100  Contact alt = proto.getStreams().getLocalAddress();
101  std::string altKey = proto.getRoute().getFromName() + "/net=" + alt.getHost();
102  McastCarrier* elect = getCaster().getElect(altKey);
103  if (elect != nullptr) {
104  yCDebug(MCASTCARRIER, "picking up peer mcast name");
105  addr = elect->mcastAddress;
106  mcastName = elect->mcastName;
107  } else {
108 
109  // fetch an mcast address
110  Contact target("...", "mcast", "...", 0);
111  addr = NetworkBase::registerContact(target);
112  mcastName = addr.getRegName();
113  if (addr.isValid()) {
114  // mark owner of mcast address
116  "owns",
117  Value(mcastName));
118  }
119  }
120 
121  int ip[] = {224, 3, 1, 1};
122  int port = 11000;
123  if (addr.isValid()) {
124  SplitString ss(addr.getHost().c_str(), '.');
125  if (ss.size() != 4) {
126  addr = Contact();
127  } else {
128  yCAssert(MCASTCARRIER, ss.size() == 4);
129  for (int i = 0; i < 4; i++) {
130  ip[i] = NetType::toInt(ss.get(i));
131  }
132  port = addr.getPort();
133  }
134  }
135 
136  if (!addr.isValid()) {
137  yCError(MCASTCARRIER, "Name server not responding helpfully, setting mcast name arbitrarily.");
138  yCError(MCASTCARRIER, "Only a single mcast address supported in this mode.");
139  addr = Contact("/tmp/mcast", "mcast", "224.3.1.1", 11000);
140  }
141 
142  ManagedBytes block(6);
143  for (int i = 0; i < 4; i++) {
144  ((unsigned char*)block.get())[i] = (unsigned char)ip[i];
145  }
146  block.get()[5] = (char)(port % 256);
147  block.get()[4] = (char)(port / 256);
148  proto.os().write(block.bytes());
149  mcastAddress = addr;
150  return true;
151 }
152 
154 {
155  yCDebug(MCASTCARRIER, "Expecting extra mcast header");
156  ManagedBytes block(6);
157  yarp::conf::ssize_t len = proto.is().readFull(block.bytes());
158  if ((size_t)len != block.length()) {
159  yCError(MCASTCARRIER, "problem with MCAST header");
160  return false;
161  }
162 
163  int ip[] = {0, 0, 0, 0};
164  int port = -1;
165 
166  auto* base = (unsigned char*)block.get();
167  std::string add;
168  for (int i = 0; i < 4; i++) {
169  ip[i] = base[i];
170  if (i != 0) {
171  add += ".";
172  }
173  char buf[100];
174  sprintf(buf, "%d", ip[i]);
175  add += buf;
176  }
177  port = 256 * base[4] + base[5];
178  Contact addr("mcast", add, port);
179  yCDebug(MCASTCARRIER, "got mcast header %s", addr.toURI().c_str());
180  mcastAddress = addr;
181 
182  return true;
183 }
184 
185 
187 {
188  stream = new DgramTwoWayStream();
189  yCAssert(MCASTCARRIER, stream != nullptr);
190  Contact remote = proto.getStreams().getRemoteAddress();
191  local = proto.getStreams().getLocalAddress();
192  //(yarp::NameConfig::getEnv("YARP_MCAST_TEST")!="");
193  proto.takeStreams(nullptr); // free up port from tcp
194 
195  if (sender) {
196  /*
197  Multicast behavior seems a bit variable.
198  We assume here that if packages need to be broadcast
199  to targets via different network interfaces, that
200  we'll need to send independently on those two
201  interfaces. This may or may not always be the case,
202  the author doesn't know, so is being cautious.
203  */
204  key = proto.getRoute().getFromName();
205  key += "/net=";
206  key += local.getHost();
207 
208  yCDebug(MCASTCARRIER, "multicast key: %s", key.c_str());
209  addSender(key);
210  }
211 
212  bool ok = true;
213  if (isElect() || !sender) {
214  ok = stream->join(mcastAddress, sender, local);
215  }
216 
217  if (!ok) {
218  delete stream;
219  return false;
220  }
221  proto.takeStreams(stream);
222  return true;
223 }
224 
226 {
227  return becomeMcast(proto, false);
228 }
229 
230 
232 {
233  return becomeMcast(proto, true);
234 }
235 
236 void yarp::os::impl::McastCarrier::addSender(const std::string& key)
237 {
238  getCaster().add(key, this);
239 }
240 
241 void yarp::os::impl::McastCarrier::removeSender(const std::string& key)
242 {
243  getCaster().remove(key, this);
244 }
245 
247 {
248  void* elect = getCaster().getElect(key);
249  //void *elect = caster.getElect(mcastAddress.toString());
250  return elect == this || elect == nullptr;
251 }
252 
254 {
255  if (stream != nullptr) {
256  return stream->join(mcastAddress, true, local);
257  }
258  return false;
259 }
260 
261 
263 {
264  return isElect();
265 }
266 
268 {
269  return true;
270 }
yarp::os::TwoWayStream::getLocalAddress
virtual const Contact & getLocalAddress() const =0
Get the address of the local side of the stream.
yarp::os::NetworkBase::registerContact
static Contact registerContact(const Contact &contact)
Register contact information with the name server.
Definition: Network.cpp:1020
Network.h
yarp::os::impl::McastCarrier::mcastName
std::string mcastName
Definition: McastCarrier.h:33
yarp::os::impl::McastCarrier::removeSender
void removeSender(const std::string &key)
Definition: McastCarrier.cpp:241
yarp::os::impl::McastCarrier::sendHeader
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
Definition: McastCarrier.cpp:88
McastCarrier.h
yarp::os::impl::McastCarrier::getName
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
Definition: McastCarrier.cpp:77
yarp::os::Carrier
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:48
yarp::os::NetworkBase::lock
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1462
yarp::os::OutputStream::write
virtual void write(char ch)
Write a single byte to the stream.
Definition: OutputStream.cpp:17
yarp::os::impl::SplitString
Split a string into pieces.
Definition: SplitString.h:27
yarp::os::NetworkBase::setProperty
static bool setProperty(const char *name, const char *key, const Value &value)
Names registered with the nameserver can have arbitrary key->value properties associated with them.
Definition: Network.cpp:1038
yarp::os::impl::McastCarrier::respondToHeader
bool respondToHeader(ConnectionState &proto) override
Respond to the header.
Definition: McastCarrier.cpp:225
yarp::os::impl::SplitString::size
int size()
Definition: SplitString.cpp:31
yarp::os::impl::McastCarrier::isActive
bool isActive() const override
Check if carrier is alive and error free.
Definition: McastCarrier.cpp:262
yarp::os::impl::McastCarrier::create
Carrier * create() const override
Factory method.
Definition: McastCarrier.cpp:72
LogComponent.h
yarp::os::Contact::getRegName
std::string getRegName() const
Get the name associated with this Contact.
Definition: Contact.cpp:220
yarp::os::impl::McastCarrier::becomeMcast
bool becomeMcast(ConnectionState &proto, bool sender)
Definition: McastCarrier.cpp:186
yarp::os::impl::McastCarrier::mcastAddress
Contact mcastAddress
Definition: McastCarrier.h:32
yarp::os::impl::McastCarrier::getSpecifierCode
int getSpecifierCode() const override
Definition: McastCarrier.cpp:82
yarp::os::Contact::toURI
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:316
yarp::os::ManagedBytes::bytes
const Bytes & bytes() const
Definition: ManagedBytes.cpp:177
yarp::os::ConnectionState::takeStreams
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
yarp::os::ConnectionState::os
OutputStream & os()
Shorthand for getOutputStream()
Definition: ConnectionState.h:117
yarp::os::ConnectionState::getRoute
virtual const Route & getRoute() const =0
Get the route associated with this connection.
yarp::os::ManagedBytes
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
Definition: ManagedBytes.h:25
Route.h
yarp::os::impl::McastCarrier::addSender
void addSender(const std::string &key)
Definition: McastCarrier.cpp:236
ConnectionState.h
yarp::os::Contact::getPort
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:242
yarp::os::impl::McastCarrier::expectExtraHeader
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
Definition: McastCarrier.cpp:153
yarp::os::impl::McastCarrier::caster
static ElectionOf< PeerRecord< McastCarrier > > * caster
Definition: McastCarrier.h:38
yarp::os::NetworkBase::unregisterName
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition: Network.cpp:1026
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
yarp::os::TwoWayStream::getRemoteAddress
virtual const Contact & getRemoteAddress() const =0
Get the address of the remote side of the stream.
yarp::os::impl::McastCarrier::getCaster
static ElectionOf< PeerRecord< McastCarrier > > & getCaster()
Definition: McastCarrier.cpp:30
yarp::os::impl::McastCarrier::isBroadcast
bool isBroadcast() const override
Check if this carrier uses a broadcast mechanism.
Definition: McastCarrier.cpp:267
yarp::os::impl::McastCarrier::takeElection
bool takeElection()
takeElection, this function is called when the elect mcast carrier dies and pass the write buffers to...
Definition: McastCarrier.cpp:253
yarp::os::impl::DgramTwoWayStream
A stream abstraction for datagram communication.
Definition: DgramTwoWayStream.h:40
yarp::os::ConnectionState::getStreams
virtual TwoWayStream & getStreams()=0
Access the streams associated with the connection.
system.h
yCAssert
#define yCAssert(component, x)
Definition: LogComponent.h:172
yarp::os::ElectionOf
Pick one of a set of peers to be "active".
Definition: Election.h:64
yarp::os::ConnectionState
The basic state of a connection - route, streams in use, etc.
Definition: ConnectionState.h:31
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::impl::McastCarrier::~McastCarrier
virtual ~McastCarrier()
Definition: McastCarrier.cpp:53
yarp::os::InputStream::readFull
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:99
yarp::os::impl::McastCarrier::expectReplyToHeader
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
Definition: McastCarrier.cpp:231
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::NetworkBase::unlock
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1467
yarp::os::impl::McastCarrier
Communicating between two ports via MCAST.
Definition: McastCarrier.h:30
yarp::os::Contact::isValid
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:301
yarp::os::Contact::getHost
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
yarp::os::ConnectionState::is
InputStream & is()
Shorthand for getInputStream()
Definition: ConnectionState.h:125
yarp::os::impl::McastCarrier::McastCarrier
McastCarrier()
Definition: McastCarrier.cpp:47
yarp::os::impl::McastCarrier::isElect
bool isElect() const
Definition: McastCarrier.cpp:246
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
yarp::os::ManagedBytes::get
const char * get() const
Definition: ManagedBytes.cpp:154
yarp::os::ManagedBytes::length
size_t length() const
Definition: ManagedBytes.cpp:144
yarp::os::Value
A single value (typically within a Bottle).
Definition: Value.h:47
yarp::os::Route::getFromName
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
yarp::os::impl::SplitString::get
const char * get(int idx)
Definition: SplitString.cpp:44
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
yarp::os::impl
The components from which ports and connections are built.
yarp::os::NetType::toInt
static int toInt(const std::string &x)
Definition: NetType.cpp:160