YARP
Yet Another Robot Platform
DgramTwoWayStream.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/conf/system.h>
13 #include <yarp/conf/environment.h>
14 
15 #include <yarp/os/NetType.h>
16 #include <yarp/os/Time.h>
18 
19 #if defined(YARP_HAS_ACE)
20 # include <ace/ACE.h>
21 # include <ace/Handle_Set.h>
22 # include <ace/INET_Addr.h>
23 # include <ace/Log_Msg.h>
24 # include <ace/OS_Memory.h>
25 # include <ace/OS_NS_sys_select.h>
26 # include <ace/SOCK_Dgram.h>
27 # include <ace/SOCK_Dgram_Mcast.h>
28 # include <ace/os_include/net/os_if.h>
29 // In one of the ACE headers there is a definition of "main" for WIN32
30 # ifdef main
31 # undef main
32 # endif
33 #else
34 # include <arpa/inet.h>
35 # include <netinet/in.h>
36 # include <sys/socket.h>
37 # include <sys/types.h>
38 # include <unistd.h>
39 #endif
40 
41 #include <cerrno>
42 #include <cstring>
43 
44 using namespace yarp::os::impl;
45 using namespace yarp::os;
46 
47 #define CRC_SIZE 8
48 #define UDP_MAX_DATAGRAM_SIZE (65507 - CRC_SIZE)
49 
50 
51 namespace {
52 YARP_OS_LOG_COMPONENT(DGRAMTWOWAYSTREAM, "yarp.os.impl.DgramTwoWayStream")
53 } // namespace
54 
55 
56 static bool checkCrc(char* buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int* store_altPct = nullptr)
57 {
58  auto alt = (NetInt32)NetType::getCrc(buf + crcLength,
59  (length > crcLength) ? (length - crcLength) : 0);
60  Bytes b(buf, 4);
61  Bytes b2(buf + 4, 4);
62  NetInt32 curr = NetType::netInt(b);
63  int altPct = NetType::netInt(b2);
64  bool ok = (alt == curr && pct == altPct);
65  if (!ok) {
66  if (alt != curr) {
67  yCDebug(DGRAMTWOWAYSTREAM, "crc mismatch");
68  }
69  if (pct != altPct) {
70  yCDebug(DGRAMTWOWAYSTREAM, "packet code broken");
71  }
72  }
73  if (store_altPct != nullptr) {
74  *store_altPct = altPct;
75  }
76 
77  return ok;
78 }
79 
80 
81 static void addCrc(char* buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
82 {
83  auto alt = (NetInt32)NetType::getCrc(buf + crcLength,
84  (length > crcLength) ? (length - crcLength) : 0);
85  Bytes b(buf, 4);
86  Bytes b2(buf + 4, 4);
87  NetType::netInt((NetInt32)alt, b);
88  NetType::netInt((NetInt32)pct, b2);
89 }
90 
91 
92 bool DgramTwoWayStream::open(const Contact& remote)
93 {
94 #if defined(YARP_HAS_ACE)
95  ACE_INET_Addr anywhere((u_short)0, (ACE_UINT32)INADDR_ANY);
96  Contact local(anywhere.get_host_addr(),
97  anywhere.get_port_number());
98 #else
99  Contact local("localhost", -1);
100 #endif
101  return open(local, remote);
102 }
103 
104 bool DgramTwoWayStream::open(const Contact& local, const Contact& remote)
105 {
106  localAddress = local;
107  remoteAddress = remote;
108 
109 #if defined(YARP_HAS_ACE)
110  localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()), (ACE_UINT32)INADDR_ANY);
111  if (remote.isValid()) {
112  remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
113  }
114  dgram = new ACE_SOCK_Dgram;
115  yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
116 
117  int result = dgram->open(localHandle,
118  ACE_PROTOCOL_FAMILY_INET,
119  0,
120  1);
121 #else
122  dgram = nullptr;
123  dgram_sockfd = -1;
124 
125  int s = -1;
126  if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
127  std::exit(1);
128  }
129  struct sockaddr_in dgram_sin;
130  memset((char*)&dgram_sin, 0, sizeof(dgram_sin));
131  dgram_sin.sin_family = AF_INET;
132  dgram_sin.sin_addr.s_addr = htonl(INADDR_ANY);
133  dgram_sin.sin_port = htons(remote.getPort());
134  if (local.isValid()) {
135  if (inet_pton(AF_INET, remote.getHost().c_str(), &dgram_sin.sin_addr) == 0) {
136  yCError(DGRAMTWOWAYSTREAM, "could not set up udp client");
137  std::exit(1);
138  }
139  if (connect(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
140  yCError(DGRAMTWOWAYSTREAM, "could not connect udp client");
141  std::exit(1);
142  }
143  } else {
144  if (bind(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
145  yCError(DGRAMTWOWAYSTREAM, "could not create udp server");
146  std::exit(1);
147  }
148  }
149  dgram_sockfd = s;
150  dgram = this;
151  int result = -1;
152  int local_port = -1;
153 
154  struct sockaddr_in sin;
155  socklen_t len = sizeof(sin);
156  if (getsockname(dgram_sockfd, (struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
157  result = 0;
158  local_port = ntohs(sin.sin_port);
159  }
160 #endif
161 
162  if (result != 0) {
163  yCError(DGRAMTWOWAYSTREAM, "could not open datagram socket");
164  return false;
165  }
166 
167  configureSystemBuffers();
168 
169 #if defined(YARP_HAS_ACE)
170  dgram->get_local_addr(localHandle);
171  yCDebug(DGRAMTWOWAYSTREAM, "starting DGRAM entity on port number %u",localHandle.get_port_number());
172  localAddress = Contact("127.0.0.1", localHandle.get_port_number());
173 #else
174  localAddress = Contact("127.0.0.1", local_port);
175 #endif
176 
177  yCDebug(DGRAMTWOWAYSTREAM, "Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
178 
179  allocate();
180 
181  return true;
182 }
183 
184 void DgramTwoWayStream::allocate(int readSize, int writeSize)
185 {
186  //These are only as another default. We should modify the method to return bool
187  //and fail if we cannot read the socket size.
188 
189  int _read_size = -1;
190  int _write_size = -1;
191 
192  std::string _env_dgram = yarp::conf::environment::getEnvironment("YARP_DGRAM_SIZE");
193  std::string _env_mode;
194  if (multiMode) {
195  _env_mode = yarp::conf::environment::getEnvironment("YARP_MCAST_SIZE");
196  } else {
197  _env_mode = yarp::conf::environment::getEnvironment("YARP_UDP_SIZE");
198  }
199  if (!_env_mode.empty()) {
200  _env_dgram = _env_mode;
201  }
202  if (!_env_dgram.empty()) {
203  int sz = NetType::toInt(_env_dgram);
204  if (sz != 0) {
205  _read_size = _write_size = sz;
206  }
207  yCInfo(DGRAMTWOWAYSTREAM, "Datagram packet size set to %d", _read_size);
208  }
209  if (readSize != 0) {
210  _read_size = readSize;
211  yCInfo(DGRAMTWOWAYSTREAM, "Datagram read size reset to %d", _read_size);
212  }
213  if (writeSize != 0) {
214  _write_size = writeSize;
215  yCInfo(DGRAMTWOWAYSTREAM, "Datagram write size reset to %d", _write_size);
216  }
217 
218  // force the size of the write buffer to be under the max size of a udp datagram.
219  if (_write_size > UDP_MAX_DATAGRAM_SIZE || _write_size < 0) {
220  _write_size = UDP_MAX_DATAGRAM_SIZE;
221  }
222 
223  if (_read_size < 0) {
224 #if defined(YARP_HAS_ACE)
225  //Defaults to socket size
226  if (dgram != nullptr) {
227  int len = sizeof(_read_size);
228  int result = dgram->get_option(SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
229  if (result < 0) {
230  yCError(DGRAMTWOWAYSTREAM, "Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
231  strerror(errno));
232  _read_size = UDP_MAX_DATAGRAM_SIZE;
233  }
234  }
235 #else
236  socklen_t len = sizeof(_read_size);
237 
238  int result = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
239  if (result < 0) {
240  yCError(DGRAMTWOWAYSTREAM, "Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
241  strerror(errno));
242  _read_size = UDP_MAX_DATAGRAM_SIZE;
243  }
244 #endif
245  }
246 
247  readBuffer.allocate(_read_size);
248  writeBuffer.allocate(_write_size);
249  readAt = 0;
250  readAvail = 0;
251  writeAvail = CRC_SIZE;
252  //happy = true;
253  pct = 0;
254 }
255 
256 
257 void DgramTwoWayStream::configureSystemBuffers()
258 {
259  //By default the buffers are forced to the datagram size limit.
260  //These can be overwritten by environment variables
261  //Generic variable
262  std::string socketBufferSize = yarp::conf::environment::getEnvironment("YARP_DGRAM_BUFFER_SIZE");
263  //Specific read
264  std::string socketReadBufferSize = yarp::conf::environment::getEnvironment("YARP_DGRAM_RECV_BUFFER_SIZE");
265  //Specific write
266  std::string socketSendBufferSize = yarp::conf::environment::getEnvironment("YARP_DGRAM_SND_BUFFER_SIZE");
267 
268  int readBufferSize = -1;
269  if (!socketReadBufferSize.empty()) {
270  readBufferSize = NetType::toInt(socketReadBufferSize);
271  } else if (!socketBufferSize.empty()) {
272  readBufferSize = NetType::toInt(socketBufferSize);
273  }
274 
275  int writeBufferSize = -1;
276  if (!socketSendBufferSize.empty()) {
277  writeBufferSize = NetType::toInt(socketSendBufferSize);
278  } else if (!socketBufferSize.empty()) {
279  writeBufferSize = NetType::toInt(socketBufferSize);
280  }
281  // The writeBufferSize can't be set greater than udp datagram
282  // maximum size
283  if (writeBufferSize < 0 || writeBufferSize > UDP_MAX_DATAGRAM_SIZE) {
284  if (writeBufferSize > UDP_MAX_DATAGRAM_SIZE) {
285  yCWarning(DGRAMTWOWAYSTREAM, "The desired SND buffer size is too big. It is set to the max datagram size : %d",
287  }
288  writeBufferSize = UDP_MAX_DATAGRAM_SIZE;
289  }
290 
291  if (readBufferSize > 0) {
292  int actualReadSize = -1;
293 
294 #if defined(YARP_HAS_ACE)
295  int intSize = sizeof(readBufferSize);
296  int setResult = dgram->set_option(SOL_SOCKET, SO_RCVBUF, (void*)&readBufferSize, intSize);
297 
298  int getResult = dgram->get_option(SOL_SOCKET, SO_RCVBUF, (void*)&actualReadSize, &intSize);
299 #else
300  socklen_t intSize = sizeof(readBufferSize);
301  int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (void*)&readBufferSize, intSize);
302  int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (void*)&actualReadSize, &intSize);
303 #endif
304  // in linux the value returned by getsockopt is "doubled"
305  // for some unknown reasons (see https://linux.die.net/man/7/socket)
306 #if defined(__linux__)
307  actualReadSize /= 2;
308 #endif
309  if (setResult < 0 || getResult < 0 || readBufferSize != actualReadSize) {
310  bufferAlertNeeded = true;
311  bufferAlerted = false;
312  yCWarning(DGRAMTWOWAYSTREAM, "Failed to set RECV socket buffer to desired size. Actual: %d, Desired %d",
313  actualReadSize,
314  readBufferSize);
315  }
316  }
317  if (writeBufferSize > 0) {
318  int actualWriteSize = -1;
319 #if defined(YARP_HAS_ACE)
320  int intSize = sizeof(writeBufferSize);
321  int setResult = dgram->set_option(SOL_SOCKET, SO_SNDBUF, (void*)&writeBufferSize, intSize);
322  int getResult = dgram->get_option(SOL_SOCKET, SO_SNDBUF, (void*)&actualWriteSize, &intSize);
323 #else
324  socklen_t intSize = sizeof(writeBufferSize);
325  int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (void*)&writeBufferSize, intSize);
326  int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (void*)&actualWriteSize, &intSize);
327 #endif
328  // in linux the value returned by getsockopt is "doubled"
329  // for some unknown reasons (see https://linux.die.net/man/7/socket)
330 #if defined(__linux__)
331  actualWriteSize /= 2;
332 #endif
333  if (setResult < 0 || getResult < 0 || writeBufferSize != actualWriteSize) {
334  bufferAlertNeeded = true;
335  bufferAlerted = false;
336  yCWarning(DGRAMTWOWAYSTREAM, "Failed to set SND socket buffer to desired size. Actual: %d, Desired: %d",
337  actualWriteSize,
338  writeBufferSize);
339  }
340  }
341 }
342 
343 
344 #if defined(YARP_HAS_ACE)
345 int DgramTwoWayStream::restrictMcast(ACE_SOCK_Dgram_Mcast* dmcast,
346  const Contact& group,
347  const Contact& ipLocal,
348  bool add)
349 {
350  restrictInterfaceIp = ipLocal;
351 
352  yCInfo(DGRAMTWOWAYSTREAM, "multicast connection %s on network interface for %s", group.getHost().c_str(), ipLocal.getHost().c_str());
353  int result = -1;
354  // There's some major damage in ACE mcast interfaces.
355  // Most require interface names, yet provide no way to query
356  // these names - and in the end, convert to IP addresses.
357  // Here we try to do an end run around ACE.
358 
359  // based on: ACE_SOCK_Dgram::set_nic
360 
361  ip_mreq multicast_address;
362  ACE_INET_Addr group_addr(group.getPort(),
363  group.getHost().c_str());
364  ACE_INET_Addr interface_addr(ipLocal.getPort(),
365  ipLocal.getHost().c_str());
366  multicast_address.imr_interface.s_addr = htonl(interface_addr.get_ip_address());
367  multicast_address.imr_multiaddr.s_addr = htonl(group_addr.get_ip_address());
368 
369  if (add) {
370  yCDebug(DGRAMTWOWAYSTREAM, "Trying to correct mcast membership...");
371  result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_ADD_MEMBERSHIP, &multicast_address, sizeof(struct ip_mreq));
372  } else {
373  yCDebug(DGRAMTWOWAYSTREAM, "Trying to correct mcast output...");
374  result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_MULTICAST_IF, &multicast_address.imr_interface.s_addr, sizeof(struct in_addr));
375  }
376  if (result != 0) {
377  int num = errno;
378  yCDebug(DGRAMTWOWAYSTREAM, "mcast result: %s", strerror(num));
379  if (num == 98) {
380  // our membership is already correct / Address already in use
381  result = 0;
382  }
383  result = 0; // in fact, best to proceed for Windows.
384  }
385 
386  return result;
387 }
388 #endif
389 
390 
392  const Contact& ipLocal)
393 {
394 
395  multiMode = true;
396 
397  localAddress = ipLocal;
398 
399 #if defined(YARP_HAS_ACE)
400  localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()),
401  (ACE_UINT32)INADDR_ANY);
402 
403  ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
404 # if defined(__APPLE__)
405  mcastOptions = static_cast<ACE_SOCK_Dgram_Mcast::options>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
406 # endif
407 
408  auto* dmcast = new ACE_SOCK_Dgram_Mcast(mcastOptions);
409  dgram = dmcast;
410  mgram = dmcast;
411  yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
412 
413  int result = -1;
414  ACE_INET_Addr addr(group.getPort(), group.getHost().c_str());
415  result = dmcast->open(addr, nullptr, 1);
416  if (result == 0) {
417  result = restrictMcast(dmcast, group, ipLocal, false);
418  }
419 
420  if (result != 0) {
421  yCError(DGRAMTWOWAYSTREAM, "could not open multicast datagram socket");
422  return false;
423  }
424 
425 #else
426  dgram = nullptr;
427  dgram_sockfd = -1;
428 
429  int s = -1;
430  struct sockaddr_in dgram_sin;
431  // create what looks like an ordinary UDP socket
432  if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
433  yCError(DGRAMTWOWAYSTREAM, "could not create sender socket");
434  std::exit(1);
435  }
436  // set up destination address
437  memset((char*)&dgram_sin, 0, sizeof(dgram_sin));
438  dgram_sin.sin_family = AF_INET;
439  dgram_sin.sin_port = htons(group.getPort());
440 
441 
442  if (inet_pton(AF_INET, group.getHost().c_str(), &dgram_sin.sin_addr) == 0) {
443  yCError(DGRAMTWOWAYSTREAM, "could not set up mcast client");
444  std::exit(1);
445  }
446  if (connect(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
447  yCError(DGRAMTWOWAYSTREAM, "could not connect mcast client");
448  std::exit(1);
449  }
450 
451 
452  dgram_sockfd = s;
453  dgram = this;
454  int local_port = -1;
455 
456  struct sockaddr_in sin;
457  socklen_t len = sizeof(sin);
458  if (getsockname(dgram_sockfd, (struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
459  local_port = ntohs(sin.sin_port);
460  }
461 
462 
463 #endif
464  configureSystemBuffers();
465  remoteAddress = group;
466 #ifdef YARP_HAS_ACE
467 
468  localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
469  remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
470 #else
471 
472  remoteAddress = group;
473  localAddress = Contact("127.0.0.1", local_port);
474  localHandle = local_port;
475  remoteHandle = remoteAddress.getPort();
476 
477 
478 #endif
479  yCDebug(DGRAMTWOWAYSTREAM, "Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
480  allocate();
481 
482  return true;
483 }
484 
485 
486 bool DgramTwoWayStream::join(const Contact& group, bool sender, const Contact& ipLocal)
487 {
488  yCDebug(DGRAMTWOWAYSTREAM, "subscribing to mcast address %s for %s", group.toURI().c_str(), (sender ? "writing" : "reading"));
489 
490  multiMode = true;
491 
492  if (sender) {
493  if (ipLocal.isValid()) {
494  return openMcast(group, ipLocal);
495  }
496  // just use udp as normal
497  return open(group);
498  }
499 
500 #if defined(YARP_HAS_ACE)
501  ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
502 # if defined(__APPLE__)
503  mcastOptions = static_cast<ACE_SOCK_Dgram_Mcast::options>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
504 # endif
505 
506  auto* dmcast = new ACE_SOCK_Dgram_Mcast(mcastOptions);
507 
508  dgram = dmcast;
509  mgram = dmcast;
510  yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
511 
512  ACE_INET_Addr addr(group.getPort(), group.getHost().c_str());
513 
514  int result = -1;
515  if (ipLocal.isValid()) {
516  result = dmcast->join(addr, 1);
517 
518  if (result == 0) {
519  result = restrictMcast(dmcast, group, ipLocal, true);
520  }
521  } else {
522  result = dmcast->join(addr, 1);
523  }
524 
525  if (result != 0) {
526  yCError(DGRAMTWOWAYSTREAM, "cannot connect to multi-cast address");
527  happy = false;
528  return false;
529  }
530 #else
531  struct ip_mreq mreq;
532  int s = -1;
533  if ((s = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
534  yCError(DGRAMTWOWAYSTREAM, "could not create receiver socket");
535  happy = false;
536  return false;
537  }
538  struct sockaddr_in addr;
539  u_int yes = 1;
540 
541  /* set up destination address */
542  memset(&addr, 0, sizeof(addr));
543  addr.sin_family = AF_INET;
544  addr.sin_addr.s_addr = htonl(INADDR_ANY);
545  addr.sin_port = htons(group.getPort());
546 
547  // allow multiple sockets to use the same PORT number
548  if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(u_int)) < 0) {
549  yCError(DGRAMTWOWAYSTREAM, "could not allow sockets use the same ADDRESS");
550  happy = false;
551  return false;
552  }
553 
554 # if defined(__APPLE__)
555  if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(u_int)) < 0) {
556  yCError(DGRAMTWOWAYSTREAM, "could not allow sockets use the same PORT number");
557  happy = false;
558  return false;
559  }
560 # endif
561 
562  // bind to receive address
563  if (bind(s, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
564  yCError(DGRAMTWOWAYSTREAM, "could not create mcast server");
565  happy = false;
566  return false;
567  }
568 
569  // use setsockopt() to request that the kernel join a multicast group
570  if (inet_pton(AF_INET, group.getHost().c_str(), &mreq.imr_multiaddr) == 0) {
571  yCError(DGRAMTWOWAYSTREAM, "Could not set up the mcast server");
572  std::exit(1);
573  }
574  mreq.imr_interface.s_addr = htonl(INADDR_ANY);
575  if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
576  yCError(DGRAMTWOWAYSTREAM, "could not join the multicast group");
577  yCError(DGRAMTWOWAYSTREAM, "sendto: %d, %s", errno, strerror(errno));
578  happy = false;
579  return false;
580  }
581 
582  dgram_sockfd = s;
583  dgram = this;
584 #endif
585  configureSystemBuffers();
586 
587  localAddress = group;
588  remoteAddress = group;
589 #ifdef YARP_HAS_ACE
590  localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
591  remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
592 #else
593  localHandle = localAddress.getPort();
594  remoteHandle = remoteAddress.getPort();
595 #endif
596  allocate();
597  return true;
598 }
599 
601 {
602  closeMain();
603 }
604 
606 {
 // Body of DgramTwoWayStream::interrupt().
 // NOTE(review): the signature line (listing line 605) and the statements
 // at listing lines 646 and 659 are absent from this listing - restore them
 // from the upstream file before building.
 //
 // Only the first caller on a stream that is neither closed nor already
 // being interrupted, and is still happy, performs the interruption; every
 // other caller waits until `interrupting` is cleared.
607  bool act = false;
608  mutex.lock();
609  if ((!closed) && (!interrupting) && happy) {
610  act = true;
611  interrupting = true;
612  closed = true;
613  }
614  mutex.unlock();
615 
616  if (act) {
617  if (reader) {
 // Up to three attempts, while the stream is still happy: a temporary
 // stream is aimed at our own local address and sends a small zeroed
 // message (presumably to wake up a receive blocked on this socket -
 // TODO confirm).
618  int ct = 3;
619  while (happy && ct > 0) {
620  ct--;
621  DgramTwoWayStream tmp;
622  if (mgram != nullptr) {
623  yCDebug(DGRAMTWOWAYSTREAM, "* mcast interrupt, interface %s", restrictInterfaceIp.toString().c_str());
624  tmp.join(localAddress, true, restrictInterfaceIp);
625  } else {
626  yCDebug(DGRAMTWOWAYSTREAM, "* dgram interrupt");
627  tmp.open(Contact(localAddress.getHost(), 0),
628  localAddress);
629  }
630  yCDebug(DGRAMTWOWAYSTREAM, "* interrupt state %s %s %s",
631  (interrupting ? "true" : "false"),
632  (closed ? "true" : "false"),
633  (happy ? "true" : "false"));
634  ManagedBytes empty(10);
635  for (size_t i = 0; i < empty.length(); i++) {
636  empty.get()[i] = 0;
637  }
638 
639  // don't want this message getting into a valid packet
640  tmp.pct = -1;
641 
642  tmp.write(empty.bytes());
643  tmp.flush();
644  tmp.close();
645  if (happy) {
 // NOTE(review): the body of this branch (listing line 646) is missing.
647  }
648  }
649  yCDebug(DGRAMTWOWAYSTREAM, "dgram interrupt done");
650  }
 // Interruption finished: release any caller waiting in the else branch.
651  mutex.lock();
652  interrupting = false;
653  mutex.unlock();
654  } else {
655  // wait for interruption to be done
656  if (interrupting) {
657  while (interrupting) {
658  yCDebug(DGRAMTWOWAYSTREAM, "waiting for dgram interrupt to be finished...");
 // NOTE(review): one statement of this loop (listing line 659) is missing.
660  }
661  }
662  }
663 }
664 
666 {
 // Body of DgramTwoWayStream::closeMain().
 // NOTE(review): the signature line (listing line 665) and one statement
 // of the wait loop (listing line 677) are absent from this listing -
 // restore them from the upstream file before building.
 //
 // Interrupts the stream, marks it closed and not happy, waits for any
 // interruption in progress to finish, then releases the socket.
667  if (dgram != nullptr) {
668  //printf("Dgram closing, interrupt state %d\n", interrupting);
669  interrupt();
670  mutex.lock();
671  closed = true;
672  happy = false;
673  //printf("Dgram closinger, interrupt state %d\n", interrupting);
674  mutex.unlock();
675  while (interrupting) {
676  happy = false;
 // NOTE(review): one statement of this loop (listing line 677) is missing.
678  }
679  mutex.lock();
 // Re-checked under the mutex before the socket is released.
680  if (dgram != nullptr) {
681 #if defined(YARP_HAS_ACE)
682  dgram->close();
683  delete dgram;
684 #else
 // Without ACE only the file descriptor is closed; dgram is not deleted.
685  if (dgram_sockfd >= 0) {
686  ::close(dgram_sockfd);
687  }
688  dgram_sockfd = -1;
689 #endif
690  dgram = nullptr;
691  mgram = nullptr;
692  }
693  happy = false;
694  mutex.unlock();
695  }
696  happy = false;
697 }
698 
700 {
 // Body of the read routine of DgramTwoWayStream; it copies received bytes
 // into a byte block named `b`.
 // NOTE(review): the signature line (listing line 699) is absent from this
 // listing - restore it from the upstream file before building.
 //
 // Returns the number of bytes copied into `b`, or -1 when the stream is
 // closed, the receive fails, or a datagram fails its header check.
701  reader = true;
702  bool done = false;
703 
704  while (!done) {
705 
706  if (closed) {
707  happy = false;
708  return -1;
709  }
710 
711  // if nothing is available, try to grab stuff
712  if (readAvail == 0) {
713  readAt = 0;
714 
715 
716  //yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
717  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM Waiting for something!");
718  yarp::conf::ssize_t result = -1;
719 #if defined(YARP_HAS_ACE)
720  if ((dgram != nullptr) && restrictInterfaceIp.isValid()) {
721  yCTrace(DGRAMTWOWAYSTREAM, "Consider remote mcast");
722  yCTrace(DGRAMTWOWAYSTREAM, "What we know:");
723  yCTrace(DGRAMTWOWAYSTREAM, " %s", restrictInterfaceIp.toString().c_str());
724  yCTrace(DGRAMTWOWAYSTREAM, " %s", localAddress.toString().c_str());
725  yCTrace(DGRAMTWOWAYSTREAM, " %s", remoteAddress.toString().c_str());
726 
 // NOTE(review): `iface` is constructed but not used below.
727  ACE_INET_Addr iface(restrictInterfaceIp.getPort(),
728  restrictInterfaceIp.getHost().c_str());
729  ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
730  result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
731  yCDebug(DGRAMTWOWAYSTREAM, "MCAST Got %zd bytes", result);
732 
733  } else
734 #endif
735  if (dgram != nullptr) {
736  yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
737 #if defined(YARP_HAS_ACE)
738  ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
739  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM Waiting for something!");
740  result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
741 #else
742  result = recv(dgram_sockfd, readBuffer.get(), readBuffer.length(), 0);
743 #endif
744  yCDebug(DGRAMTWOWAYSTREAM, "DGRAM Got %zd bytes", result);
745  } else {
 // No socket: the datagram is taken from the `monitor` buffer after
 // onMonitorInput() has been called.
746  onMonitorInput();
747  //printf("Monitored input of %d bytes\n", monitor.length());
748  if (monitor.length() > readBuffer.length()) {
749  printf("Too big!\n");
750  std::exit(1);
751  }
752  memcpy(readBuffer.get(), monitor.get(), monitor.length());
753  result = monitor.length();
754  }
755 
756  if (closed || (result < 0)) {
757  happy = false;
758  return -1;
759  }
760  readAvail = result;
761 
762  // deal with CRC
763  int altPct = 0;
764  bool crcOk = checkCrc(readBuffer.get(), readAvail, CRC_SIZE, pct, &altPct);
 // A packet code of -1 in the header marks a datagram to be discarded
 // (interrupt() sends such datagrams); it is dropped without touching pct.
765  if (altPct != -1) {
766  pct++;
767  if (!crcOk) {
 // The buffer-size hint is printed once, and only if configuring the
 // system buffers raised bufferAlertNeeded; otherwise dropped packets
 // are counted and reported when more than 1 has elapsed (in the units
 // of SystemClock::nowSystem()) since the last report.
768  if (bufferAlertNeeded && !bufferAlerted) {
769  yCError(DGRAMTWOWAYSTREAM, "*** Multicast/UDP packet dropped - checksum error ***");
770  yCInfo(DGRAMTWOWAYSTREAM, "The UDP/MCAST system buffer limit on your system is low.");
771  yCInfo(DGRAMTWOWAYSTREAM, "You may get packet loss under heavy conditions.");
772 #ifdef __linux__
773  yCInfo(DGRAMTWOWAYSTREAM, "To change the buffer limit on linux: sysctl -w net.core.rmem_max=8388608");
774  yCInfo(DGRAMTWOWAYSTREAM, "(Might be something like: sudo /sbin/sysctl -w net.core.rmem_max=8388608)");
775 #else
776  yCInfo(DGRAMTWOWAYSTREAM, "To change the limit use: sysctl for Linux/FreeBSD, ndd for Solaris, no for AIX");
777 #endif
778  bufferAlerted = true;
779  } else {
780  errCount++;
781  double now = SystemClock::nowSystem();
782  if (now - lastReportTime > 1) {
783  yCError(DGRAMTWOWAYSTREAM, "*** %d datagram packet(s) dropped - checksum error ***", errCount);
784  lastReportTime = now;
785  errCount = 0;
786  }
787  }
788  reset();
789  return -1;
790  }
 // Header accepted: skip it so that only the payload is handed out.
791  readAt += CRC_SIZE;
792  readAvail -= CRC_SIZE;
793  done = true;
794  } else {
795  readAvail = 0;
796  }
797  }
798 
799  // if stuff is available, take it
800  if (readAvail > 0) {
 // Hand out at most b.length() bytes; the rest stays buffered for the
 // next call.
801  size_t take = readAvail;
802  if (take > b.length()) {
803  take = b.length();
804  }
805  memcpy(b.get(), readBuffer.get() + readAt, take);
806  readAt += take;
807  readAvail -= take;
808  return take;
809  }
810  }
811 
812  return 0;
813 }
814 
816 {
817  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM prep writing");
818  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM write %zu bytes", b.length());
819 
820  if (reader) {
821  return;
822  }
823  if (writeBuffer.get() == nullptr) {
824  return;
825  }
826 
827  Bytes local = b;
828  while (local.length() > 0) {
829  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM prep writing");
830  yarp::conf::ssize_t rem = local.length();
831  yarp::conf::ssize_t space = writeBuffer.length() - writeAvail;
832  bool shouldFlush = false;
833  if (rem >= space) {
834  rem = space;
835  shouldFlush = true;
836  }
837  memcpy(writeBuffer.get() + writeAvail, local.get(), rem);
838  writeAvail += rem;
839  local = Bytes(local.get() + rem, local.length() - rem);
840  if (shouldFlush) {
841  flush();
842  }
843  }
844 }
845 
846 
848 {
 // Body of DgramTwoWayStream::flush().
 // NOTE(review): the signature line (listing line 847) and two statements
 // of the busy-wait loop (listing lines 900-901) are absent from this
 // listing - restore them from the upstream file before building.
 //
 // Stamps the header (CRC + packet code) on the pending datagram, sends it,
 // and resets the write cursor to just after the header.
849  if (writeBuffer.get() == nullptr) {
850  return;
851  }
852 
853  // should set CRC
 // Nothing beyond the reserved header has been written: nothing to send.
854  if (writeAvail <= CRC_SIZE) {
855  return;
856  }
857  addCrc(writeBuffer.get(), writeAvail, CRC_SIZE, pct);
858  pct++;
859 
860  if (writeAvail > 0) {
861  //yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
862  yarp::conf::ssize_t len = 0;
863 
864 #if defined(YARP_HAS_ACE)
865  if (mgram != nullptr) {
866  len = mgram->send(writeBuffer.get(), writeAvail);
867  yCDebug(DGRAMTWOWAYSTREAM, "MCAST - wrote %zd bytes", len);
868  } else
869 #endif
870  if (dgram != nullptr) {
871 #if defined(YARP_HAS_ACE)
872  len = dgram->send(writeBuffer.get(), writeAvail, remoteHandle);
873 #else
874  len = send(dgram_sockfd, writeBuffer.get(), writeAvail, 0);
875 #endif
876  yCDebug(DGRAMTWOWAYSTREAM, "DGRAM - wrote %zd bytes to %s", len, remoteAddress.toString().c_str());
877  } else {
 // No socket: the datagram is copied into the `monitor` buffer and
 // onMonitorOutput() is called.
878  Bytes b(writeBuffer.get(), writeAvail);
879  monitor = ManagedBytes(b, false);
880  monitor.copy();
881  //printf("Monitored output of %d bytes\n", monitor.length());
882  len = monitor.length();
883  onMonitorOutput();
884  }
 // Datagrams that fill more than 3/4 of the write buffer are followed
 // by a short wait.
885  if (len > writeBuffer.length() * 0.75) {
886  yCDebug(DGRAMTWOWAYSTREAM, "long dgrams might need a little time");
887 
888  // Under heavy loads, packets could get dropped
889  // 640x480x3 images correspond to about 15 datagrams
890  // so there's not much time possible between them
891  // looked at iperf, it just does a busy-waiting delay
892  // there's an implementation below, but commented out -
893  // better solution was to increase recv buffer size
894 
895  double first = yarp::os::SystemClock::nowSystem();
896  double now;
897  int ct = 0;
898  do {
899  //printf("Busy wait... %d\n", ct);
 // NOTE(review): listing lines 900-901 are missing here; as shown, `now`
 // is never assigned before the loop condition reads it.
902  ct++;
903  } while (now - first < 0.001);
904  }
905 
906  if (len < 0) {
 // Send failed: the stream is flagged as broken and the pending data is
 // left in the buffer (writeAvail is not reset).
907  happy = false;
908  yCDebug(DGRAMTWOWAYSTREAM, "DGRAM failed to send message with error: %s", strerror(errno));
909  return;
910  }
911  writeAvail -= len;
912 
913  if (writeAvail != 0) {
914  // well, we have a problem
915  // checksums will cause dumping
916  yCDebug(DGRAMTWOWAYSTREAM, "dgram/mcast send behaving badly");
917  }
918  }
919  // finally: writeAvail should be 0
920 
921  // make space for CRC
922  writeAvail = CRC_SIZE;
923 }
924 
925 
927 {
928  return happy;
929 }
930 
931 
933 {
934  readAt = 0;
935  readAvail = 0;
936  writeAvail = CRC_SIZE;
937  pct = 0;
938 }
939 
940 
942 {
943 // yCError(DGRAMTWOWAYSTREAM, "Packet begins: %s", (reader ? "reader" : "writer"));
944  pct = 0;
945 }
946 
948 {
949 // yCError(DGRAMTWOWAYSTREAM, "Packet ends: %s", (reader ? "reader" : "writer"));
950  if (!reader) {
951  pct = 0;
952  }
953 }
954 
956 {
957  return monitor.bytes();
958 }
959 
960 
962 {
963  monitor.clear();
964 }
965 
966 
968 {
969  if (dgram == nullptr) {
970  return false;
971  }
972 #if defined(YARP_HAS_ACE)
973  return (dgram->set_option(IPPROTO_IP, IP_TOS, (int*)&tos, (int)sizeof(tos)) == 0);
974 #else
975  return (setsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (int*)&tos, (int)sizeof(tos)) == 0);
976 #endif
977 }
978 
980 {
981  int tos = -1;
982  if (dgram == nullptr) {
983  return tos;
984  }
985 #if defined(YARP_HAS_ACE)
986  int optlen;
987  dgram->get_option(IPPROTO_IP, IP_TOS, (int*)&tos, &optlen);
988 #else
989  socklen_t optlen;
990  getsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (int*)&tos, &optlen);
991 #endif
992  return tos;
993 }
yarp::os::impl::DgramTwoWayStream::openMcast
virtual bool openMcast(const Contact &group, const Contact &ipLocal)
Definition: DgramTwoWayStream.cpp:391
yarp::os::impl::DgramTwoWayStream::endPacket
void endPacket() override
Mark the end of a logical packet (see beginPacket).
Definition: DgramTwoWayStream.cpp:947
yarp::os::impl::DgramTwoWayStream::join
virtual bool join(const Contact &group, bool sender, const Contact &ipLocal)
Definition: DgramTwoWayStream.cpp:486
yCWarning
#define yCWarning(component,...)
Definition: LogComponent.h:146
yarp::conf::environment::getEnvironment
std::string getEnvironment(const char *key, bool *found=nullptr)
Read a variable from the environment.
Definition: environment.h:31
yarp::os::NetType::getCrc
static unsigned long int getCrc(char *buf, size_t len)
Definition: NetType.cpp:250
checkCrc
static bool checkCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int *store_altPct=nullptr)
Definition: DgramTwoWayStream.cpp:56
yarp::os::impl::DgramTwoWayStream::reset
void reset() override
Reset the stream.
Definition: DgramTwoWayStream.cpp:932
LogComponent.h
yarp::os::Time::now
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:124
yarp::os::SystemClock::nowSystem
static double nowSystem()
Definition: SystemClock.cpp:37
NetType.h
yarp::os::Contact::toURI
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:316
yarp::os::ManagedBytes::bytes
const Bytes & bytes() const
Definition: ManagedBytes.cpp:177
yarp::os::impl::DgramTwoWayStream::setTypeOfService
bool setTypeOfService(int tos) override
Definition: DgramTwoWayStream.cpp:967
CRC_SIZE
#define CRC_SIZE
Definition: DgramTwoWayStream.cpp:47
yarp::os::impl::DgramTwoWayStream::close
void close() override
Terminate the stream.
Definition: DgramTwoWayStream.h:117
yarp::os::ManagedBytes
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:25
yarp::os::impl::DgramTwoWayStream::interrupt
void interrupt() override
Interrupt the stream.
Definition: DgramTwoWayStream.cpp:605
yarp::os::Contact::getPort
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:242
yarp::os::SystemClock::delaySystem
static void delaySystem(double seconds)
Definition: SystemClock.cpp:32
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
yarp::os::impl::DgramTwoWayStream::beginPacket
void beginPacket() override
Mark the beginning of a logical packet.
Definition: DgramTwoWayStream.cpp:941
yarp::os::impl::DgramTwoWayStream
A stream abstraction for datagram communication.
Definition: DgramTwoWayStream.h:40
yarp::os::impl::DgramTwoWayStream::closeMain
virtual void closeMain()
Definition: DgramTwoWayStream.cpp:665
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
system.h
yarp::os::impl::DgramTwoWayStream::~DgramTwoWayStream
virtual ~DgramTwoWayStream()
Definition: DgramTwoWayStream.cpp:600
yCAssert
#define yCAssert(component, x)
Definition: LogComponent.h:172
DgramTwoWayStream.h
yarp::os::impl::DgramTwoWayStream::open
virtual bool open(const Contact &remote)
Definition: DgramTwoWayStream.cpp:92
yarp::os::impl::DgramTwoWayStream::getMonitor
yarp::os::Bytes getMonitor()
Definition: DgramTwoWayStream.cpp:955
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::impl::DgramTwoWayStream::getTypeOfService
int getTypeOfService() override
Definition: DgramTwoWayStream.cpp:979
yarp::os::impl::DgramTwoWayStream::removeMonitor
void removeMonitor()
Definition: DgramTwoWayStream.cpp:961
yCInfo
#define yCInfo(component,...)
Definition: LogComponent.h:135
UDP_MAX_DATAGRAM_SIZE
#define UDP_MAX_DATAGRAM_SIZE
Definition: DgramTwoWayStream.cpp:48
yarp::os::InputStream::read
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:23
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::Contact::isValid
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:301
addCrc
static void addCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
Definition: DgramTwoWayStream.cpp:81
environment.h
yarp::os::Contact::getHost
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
yarp::os::ManagedBytes::get
const char * get() const
Definition: ManagedBytes.cpp:154
Time.h
yarp::os::ManagedBytes::length
size_t length() const
Definition: ManagedBytes.cpp:144
yCTrace
#define yCTrace(component,...)
Definition: LogComponent.h:88
yarp::os::impl::DgramTwoWayStream::isOk
bool isOk() const override
Check if the stream is ok or in an error state.
Definition: DgramTwoWayStream.cpp:926
YARP_OS_LOG_COMPONENT
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
yarp::os::impl
The components from which ports and connections are built.
yarp::os::impl::DgramTwoWayStream::write
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
Definition: DgramTwoWayStream.cpp:815
yarp::os::impl::DgramTwoWayStream::flush
void flush() override
Make sure all pending write operations are finished.
Definition: DgramTwoWayStream.cpp:847
yarp::os::NetType::netInt
static int netInt(const yarp::os::Bytes &code)
Definition: NetType.cpp:96
yarp::os::NetType::toInt
static int toInt(const std::string &x)
Definition: NetType.cpp:160
yarp::os::NetInt32
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:33