19 #if defined(YARP_HAS_ACE)
21 # include <ace/Handle_Set.h>
22 # include <ace/INET_Addr.h>
23 # include <ace/Log_Msg.h>
24 # include <ace/OS_Memory.h>
25 # include <ace/OS_NS_sys_select.h>
26 # include <ace/SOCK_Dgram.h>
27 # include <ace/SOCK_Dgram_Mcast.h>
28 # include <ace/os_include/net/os_if.h>
34 # include <arpa/inet.h>
35 # include <netinet/in.h>
36 # include <sys/socket.h>
37 # include <sys/types.h>
48 #define UDP_MAX_DATAGRAM_SIZE (65507 - CRC_SIZE)
59 (length > crcLength) ? (length - crcLength) : 0);
64 bool ok = (alt == curr && pct == altPct);
67 yCDebug(DGRAMTWOWAYSTREAM,
"crc mismatch");
70 yCDebug(DGRAMTWOWAYSTREAM,
"packet code broken");
73 if (store_altPct !=
nullptr) {
74 *store_altPct = altPct;
84 (length > crcLength) ? (length - crcLength) : 0);
94 #if defined(YARP_HAS_ACE)
95 ACE_INET_Addr anywhere((u_short)0, (ACE_UINT32)INADDR_ANY);
96 Contact local(anywhere.get_host_addr(),
97 anywhere.get_port_number());
101 return open(local, remote);
106 localAddress = local;
107 remoteAddress = remote;
109 #if defined(YARP_HAS_ACE)
110 localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()), (ACE_UINT32)INADDR_ANY);
112 remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
114 dgram =
new ACE_SOCK_Dgram;
115 yCAssert(DGRAMTWOWAYSTREAM, dgram !=
nullptr);
117 int result = dgram->open(localHandle,
118 ACE_PROTOCOL_FAMILY_INET,
126 if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
129 struct sockaddr_in dgram_sin;
130 memset((
char*)&dgram_sin, 0,
sizeof(dgram_sin));
131 dgram_sin.sin_family = AF_INET;
132 dgram_sin.sin_addr.s_addr = htonl(INADDR_ANY);
133 dgram_sin.sin_port = htons(remote.
getPort());
135 if (inet_pton(AF_INET, remote.
getHost().c_str(), &dgram_sin.sin_addr) == 0) {
136 yCError(DGRAMTWOWAYSTREAM,
"could not set up udp client");
139 if (connect(s, (
struct sockaddr*)&dgram_sin,
sizeof(dgram_sin)) == -1) {
140 yCError(DGRAMTWOWAYSTREAM,
"could not connect udp client");
144 if (bind(s, (
struct sockaddr*)&dgram_sin,
sizeof(dgram_sin)) == -1) {
145 yCError(DGRAMTWOWAYSTREAM,
"could not create udp server");
154 struct sockaddr_in sin;
155 socklen_t len =
sizeof(sin);
156 if (getsockname(dgram_sockfd, (
struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
158 local_port = ntohs(sin.sin_port);
163 yCError(DGRAMTWOWAYSTREAM,
"could not open datagram socket");
167 configureSystemBuffers();
169 #if defined(YARP_HAS_ACE)
170 dgram->get_local_addr(localHandle);
171 yCDebug(DGRAMTWOWAYSTREAM,
"starting DGRAM entity on port number %u",localHandle.get_port_number());
172 localAddress =
Contact(
"127.0.0.1", localHandle.get_port_number());
174 localAddress =
Contact(
"127.0.0.1", local_port);
177 yCDebug(DGRAMTWOWAYSTREAM,
"Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
184 void DgramTwoWayStream::allocate(
int readSize,
int writeSize)
190 int _write_size = -1;
193 std::string _env_mode;
199 if (!_env_mode.empty()) {
200 _env_dgram = _env_mode;
202 if (!_env_dgram.empty()) {
205 _read_size = _write_size = sz;
207 yCInfo(DGRAMTWOWAYSTREAM,
"Datagram packet size set to %d", _read_size);
210 _read_size = readSize;
211 yCInfo(DGRAMTWOWAYSTREAM,
"Datagram read size reset to %d", _read_size);
213 if (writeSize != 0) {
214 _write_size = writeSize;
215 yCInfo(DGRAMTWOWAYSTREAM,
"Datagram write size reset to %d", _write_size);
223 if (_read_size < 0) {
224 #if defined(YARP_HAS_ACE)
226 if (dgram !=
nullptr) {
227 int len =
sizeof(_read_size);
228 int result = dgram->get_option(SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
230 yCError(DGRAMTWOWAYSTREAM,
"Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
236 socklen_t len =
sizeof(_read_size);
238 int result = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
240 yCError(DGRAMTWOWAYSTREAM,
"Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
247 readBuffer.allocate(_read_size);
248 writeBuffer.allocate(_write_size);
257 void DgramTwoWayStream::configureSystemBuffers()
268 int readBufferSize = -1;
269 if (!socketReadBufferSize.empty()) {
271 }
else if (!socketBufferSize.empty()) {
275 int writeBufferSize = -1;
276 if (!socketSendBufferSize.empty()) {
278 }
else if (!socketBufferSize.empty()) {
285 yCWarning(DGRAMTWOWAYSTREAM,
"The desired SND buffer size is too big. It is set to the max datagram size : %d",
291 if (readBufferSize > 0) {
292 int actualReadSize = -1;
294 #if defined(YARP_HAS_ACE)
295 int intSize =
sizeof(readBufferSize);
296 int setResult = dgram->set_option(SOL_SOCKET, SO_RCVBUF, (
void*)&readBufferSize, intSize);
298 int getResult = dgram->get_option(SOL_SOCKET, SO_RCVBUF, (
void*)&actualReadSize, &intSize);
300 socklen_t intSize =
sizeof(readBufferSize);
301 int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (
void*)&readBufferSize, intSize);
302 int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (
void*)&actualReadSize, &intSize);
306 #if defined(__linux__)
309 if (setResult < 0 || getResult < 0 || readBufferSize != actualReadSize) {
310 bufferAlertNeeded =
true;
311 bufferAlerted =
false;
312 yCWarning(DGRAMTWOWAYSTREAM,
"Failed to set RECV socket buffer to desired size. Actual: %d, Desired %d",
317 if (writeBufferSize > 0) {
318 int actualWriteSize = -1;
319 #if defined(YARP_HAS_ACE)
320 int intSize =
sizeof(writeBufferSize);
321 int setResult = dgram->set_option(SOL_SOCKET, SO_SNDBUF, (
void*)&writeBufferSize, intSize);
322 int getResult = dgram->get_option(SOL_SOCKET, SO_SNDBUF, (
void*)&actualWriteSize, &intSize);
324 socklen_t intSize =
sizeof(writeBufferSize);
325 int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (
void*)&writeBufferSize, intSize);
326 int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (
void*)&actualWriteSize, &intSize);
330 #if defined(__linux__)
331 actualWriteSize /= 2;
333 if (setResult < 0 || getResult < 0 || writeBufferSize != actualWriteSize) {
334 bufferAlertNeeded =
true;
335 bufferAlerted =
false;
336 yCWarning(DGRAMTWOWAYSTREAM,
"Failed to set SND socket buffer to desired size. Actual: %d, Desired: %d",
344 #if defined(YARP_HAS_ACE)
345 int DgramTwoWayStream::restrictMcast(ACE_SOCK_Dgram_Mcast* dmcast,
350 restrictInterfaceIp = ipLocal;
352 yCInfo(DGRAMTWOWAYSTREAM,
"multicast connection %s on network interface for %s", group.
getHost().c_str(), ipLocal.
getHost().c_str());
361 ip_mreq multicast_address;
362 ACE_INET_Addr group_addr(group.
getPort(),
364 ACE_INET_Addr interface_addr(ipLocal.
getPort(),
366 multicast_address.imr_interface.s_addr = htonl(interface_addr.get_ip_address());
367 multicast_address.imr_multiaddr.s_addr = htonl(group_addr.get_ip_address());
370 yCDebug(DGRAMTWOWAYSTREAM,
"Trying to correct mcast membership...");
371 result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_ADD_MEMBERSHIP, &multicast_address,
sizeof(
struct ip_mreq));
373 yCDebug(DGRAMTWOWAYSTREAM,
"Trying to correct mcast output...");
374 result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_MULTICAST_IF, &multicast_address.imr_interface.s_addr,
sizeof(
struct in_addr));
378 yCDebug(DGRAMTWOWAYSTREAM,
"mcast result: %s", strerror(num));
397 localAddress = ipLocal;
399 #if defined(YARP_HAS_ACE)
400 localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()),
401 (ACE_UINT32)INADDR_ANY);
403 ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
404 # if defined(__APPLE__)
405 mcastOptions =
static_cast<ACE_SOCK_Dgram_Mcast::options
>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
408 auto* dmcast =
new ACE_SOCK_Dgram_Mcast(mcastOptions);
411 yCAssert(DGRAMTWOWAYSTREAM, dgram !=
nullptr);
415 result = dmcast->open(addr,
nullptr, 1);
417 result = restrictMcast(dmcast, group, ipLocal,
false);
421 yCError(DGRAMTWOWAYSTREAM,
"could not open multicast datagram socket");
430 struct sockaddr_in dgram_sin;
432 if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
433 yCError(DGRAMTWOWAYSTREAM,
"could not create sender socket");
437 memset((
char*)&dgram_sin, 0,
sizeof(dgram_sin));
438 dgram_sin.sin_family = AF_INET;
439 dgram_sin.sin_port = htons(group.
getPort());
442 if (inet_pton(AF_INET, group.
getHost().c_str(), &dgram_sin.sin_addr) == 0) {
443 yCError(DGRAMTWOWAYSTREAM,
"could not set up mcast client");
446 if (connect(s, (
struct sockaddr*)&dgram_sin,
sizeof(dgram_sin)) == -1) {
447 yCError(DGRAMTWOWAYSTREAM,
"could not connect mcast client");
456 struct sockaddr_in sin;
457 socklen_t len =
sizeof(sin);
458 if (getsockname(dgram_sockfd, (
struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
459 local_port = ntohs(sin.sin_port);
464 configureSystemBuffers();
465 remoteAddress = group;
468 localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
469 remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
472 remoteAddress = group;
473 localAddress =
Contact(
"127.0.0.1", local_port);
474 localHandle = local_port;
475 remoteHandle = remoteAddress.getPort();
479 yCDebug(DGRAMTWOWAYSTREAM,
"Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
488 yCDebug(DGRAMTWOWAYSTREAM,
"subscribing to mcast address %s for %s", group.
toURI().c_str(), (sender ?
"writing" :
"reading"));
494 return openMcast(group, ipLocal);
500 #if defined(YARP_HAS_ACE)
501 ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
502 # if defined(__APPLE__)
503 mcastOptions =
static_cast<ACE_SOCK_Dgram_Mcast::options
>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
506 auto* dmcast =
new ACE_SOCK_Dgram_Mcast(mcastOptions);
510 yCAssert(DGRAMTWOWAYSTREAM, dgram !=
nullptr);
516 result = dmcast->join(addr, 1);
519 result = restrictMcast(dmcast, group, ipLocal,
true);
522 result = dmcast->join(addr, 1);
526 yCError(DGRAMTWOWAYSTREAM,
"cannot connect to multi-cast address");
533 if ((s = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
534 yCError(DGRAMTWOWAYSTREAM,
"could not create receiver socket");
538 struct sockaddr_in addr;
542 memset(&addr, 0,
sizeof(addr));
543 addr.sin_family = AF_INET;
544 addr.sin_addr.s_addr = htonl(INADDR_ANY);
545 addr.sin_port = htons(group.
getPort());
548 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes,
sizeof(u_int)) < 0) {
549 yCError(DGRAMTWOWAYSTREAM,
"could not allow sockets use the same ADDRESS");
554 # if defined(__APPLE__)
555 if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &yes,
sizeof(u_int)) < 0) {
556 yCError(DGRAMTWOWAYSTREAM,
"could not allow sockets use the same PORT number");
563 if (bind(s, (
struct sockaddr*)&addr,
sizeof(addr)) == -1) {
564 yCError(DGRAMTWOWAYSTREAM,
"could not create mcast server");
570 if (inet_pton(AF_INET, group.
getHost().c_str(), &mreq.imr_multiaddr) == 0) {
571 yCError(DGRAMTWOWAYSTREAM,
"Could not set up the mcast server");
574 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
575 if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0) {
576 yCError(DGRAMTWOWAYSTREAM,
"could not join the multicast group");
577 yCError(DGRAMTWOWAYSTREAM,
"sendto: %d, %s", errno, strerror(errno));
585 configureSystemBuffers();
587 localAddress = group;
588 remoteAddress = group;
590 localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
591 remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
593 localHandle = localAddress.
getPort();
594 remoteHandle = remoteAddress.getPort();
609 if ((!closed) && (!interrupting) && happy) {
619 while (happy && ct > 0) {
622 if (mgram !=
nullptr) {
623 yCDebug(DGRAMTWOWAYSTREAM,
"* mcast interrupt, interface %s", restrictInterfaceIp.toString().c_str());
624 tmp.
join(localAddress,
true, restrictInterfaceIp);
626 yCDebug(DGRAMTWOWAYSTREAM,
"* dgram interrupt");
630 yCDebug(DGRAMTWOWAYSTREAM,
"* interrupt state %s %s %s",
631 (interrupting ?
"true" :
"false"),
632 (closed ?
"true" :
"false"),
633 (happy ?
"true" :
"false"));
635 for (
size_t i = 0; i < empty.
length(); i++) {
649 yCDebug(DGRAMTWOWAYSTREAM,
"dgram interrupt done");
652 interrupting =
false;
657 while (interrupting) {
658 yCDebug(DGRAMTWOWAYSTREAM,
"waiting for dgram interrupt to be finished...");
667 if (dgram !=
nullptr) {
675 while (interrupting) {
680 if (dgram !=
nullptr) {
681 #if defined(YARP_HAS_ACE)
685 if (dgram_sockfd >= 0) {
686 ::close(dgram_sockfd);
712 if (readAvail == 0) {
717 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM Waiting for something!");
719 #if defined(YARP_HAS_ACE)
720 if ((dgram !=
nullptr) && restrictInterfaceIp.isValid()) {
721 yCTrace(DGRAMTWOWAYSTREAM,
"Consider remote mcast");
722 yCTrace(DGRAMTWOWAYSTREAM,
"What we know:");
723 yCTrace(DGRAMTWOWAYSTREAM,
" %s", restrictInterfaceIp.toString().c_str());
724 yCTrace(DGRAMTWOWAYSTREAM,
" %s", localAddress.toString().c_str());
725 yCTrace(DGRAMTWOWAYSTREAM,
" %s", remoteAddress.toString().c_str());
727 ACE_INET_Addr iface(restrictInterfaceIp.getPort(),
728 restrictInterfaceIp.getHost().c_str());
729 ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
730 result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
731 yCDebug(DGRAMTWOWAYSTREAM,
"MCAST Got %zd bytes", result);
735 if (dgram !=
nullptr) {
736 yCAssert(DGRAMTWOWAYSTREAM, dgram !=
nullptr);
737 #if defined(YARP_HAS_ACE)
738 ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
739 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM Waiting for something!");
740 result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
742 result = recv(dgram_sockfd, readBuffer.get(), readBuffer.length(), 0);
744 yCDebug(DGRAMTWOWAYSTREAM,
"DGRAM Got %zd bytes", result);
748 if (monitor.length() > readBuffer.length()) {
749 printf(
"Too big!\n");
752 memcpy(readBuffer.get(), monitor.get(), monitor.length());
753 result = monitor.length();
756 if (closed || (result < 0)) {
768 if (bufferAlertNeeded && !bufferAlerted) {
769 yCError(DGRAMTWOWAYSTREAM,
"*** Multicast/UDP packet dropped - checksum error ***");
770 yCInfo(DGRAMTWOWAYSTREAM,
"The UDP/MCAST system buffer limit on your system is low.");
771 yCInfo(DGRAMTWOWAYSTREAM,
"You may get packet loss under heavy conditions.");
773 yCInfo(DGRAMTWOWAYSTREAM,
"To change the buffer limit on linux: sysctl -w net.core.rmem_max=8388608");
774 yCInfo(DGRAMTWOWAYSTREAM,
"(Might be something like: sudo /sbin/sysctl -w net.core.rmem_max=8388608)");
776 yCInfo(DGRAMTWOWAYSTREAM,
"To change the limit use: sysctl for Linux/FreeBSD, ndd for Solaris, no for AIX");
778 bufferAlerted =
true;
782 if (
now - lastReportTime > 1) {
783 yCError(DGRAMTWOWAYSTREAM,
"*** %d datagram packet(s) dropped - checksum error ***", errCount);
784 lastReportTime =
now;
801 size_t take = readAvail;
805 memcpy(b.
get(), readBuffer.get() + readAt, take);
817 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM prep writing");
818 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM write %zu bytes", b.
length());
823 if (writeBuffer.get() ==
nullptr) {
828 while (local.
length() > 0) {
829 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM prep writing");
832 bool shouldFlush =
false;
837 memcpy(writeBuffer.get() + writeAvail, local.
get(), rem);
849 if (writeBuffer.get() ==
nullptr) {
860 if (writeAvail > 0) {
864 #if defined(YARP_HAS_ACE)
865 if (mgram !=
nullptr) {
866 len = mgram->send(writeBuffer.get(), writeAvail);
867 yCDebug(DGRAMTWOWAYSTREAM,
"MCAST - wrote %zd bytes", len);
870 if (dgram !=
nullptr) {
871 #if defined(YARP_HAS_ACE)
872 len = dgram->send(writeBuffer.get(), writeAvail, remoteHandle);
874 len = send(dgram_sockfd, writeBuffer.get(), writeAvail, 0);
876 yCDebug(DGRAMTWOWAYSTREAM,
"DGRAM - wrote %zd bytes to %s", len, remoteAddress.toString().c_str());
878 Bytes b(writeBuffer.get(), writeAvail);
882 len = monitor.length();
885 if (len > writeBuffer.length() * 0.75) {
886 yCDebug(DGRAMTWOWAYSTREAM,
"long dgrams might need a little time");
903 }
while (
now - first < 0.001);
908 yCDebug(DGRAMTWOWAYSTREAM,
"DGRAM failed to send message with error: %s", strerror(errno));
913 if (writeAvail != 0) {
916 yCDebug(DGRAMTWOWAYSTREAM,
"dgram/mcast send behaving badly");
957 return monitor.bytes();
969 if (dgram ==
nullptr) {
972 #if defined(YARP_HAS_ACE)
973 return (dgram->set_option(IPPROTO_IP, IP_TOS, (
int*)&tos, (
int)
sizeof(tos)) == 0);
975 return (setsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (
int*)&tos, (
int)
sizeof(tos)) == 0);
982 if (dgram ==
nullptr) {
985 #if defined(YARP_HAS_ACE)
987 dgram->get_option(IPPROTO_IP, IP_TOS, (
int*)&tos, &optlen);
990 getsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (
int*)&tos, &optlen);