YARP
Yet Another Robot Platform
SocketTwoWayStream.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #ifndef YARP_OS_IMPL_SOCKETTWOWAYSTREAM_H
11 #define YARP_OS_IMPL_SOCKETTWOWAYSTREAM_H
12 
13 #include <yarp/conf/system.h>
14 
15 #include <yarp/os/Bytes.h>
16 #include <yarp/os/LogComponent.h>
17 #include <yarp/os/TwoWayStream.h>
20 #include <yarp/os/impl/TcpStream.h>
21 
22 #ifdef YARP_HAS_ACE // For TCP_CORK definition
23 # include <ace/os_include/netinet/os_tcp.h>
24 // In one the ACE headers there is a definition of "main" for WIN32
25 # ifdef main
26 # undef main
27 # endif
28 #else
29 # include <netinet/tcp.h>
30 #endif
31 
33 
34 namespace yarp {
35 namespace os {
36 namespace impl {
37 
42  public TwoWayStream,
43  public InputStream,
44  public OutputStream
45 {
46 public:
48  haveWriteTimeout(false),
49  haveReadTimeout(false),
50  happy(false)
51  {
52  }
53 
54  int open(const Contact& address);
55 
56  int open(yarp::os::impl::TcpAcceptor& acceptor);
57 
59  {
60  close();
61  }
62 
64  {
65  return *this;
66  }
67 
69  {
70  return *this;
71  }
72 
73  const Contact& getLocalAddress() const override
74  {
75  return localAddress;
76  }
77 
78  const Contact& getRemoteAddress() const override
79  {
80  return remoteAddress;
81  }
82 
83  void interrupt() override
84  {
85  yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket");
86  if (happy) {
87  happy = false;
88  stream.close_reader();
89  yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket reader");
90  stream.close_writer();
91  yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket writer");
92  stream.close();
93  yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket fully");
94  }
95  }
96 
97  void close() override
98  {
99  stream.close();
100  happy = false;
101  }
102 
105  {
106  if (!isOk()) {
107  return -1;
108  }
109  yarp::conf::ssize_t result;
110  if (haveReadTimeout) {
111  result = stream.recv_n(b.get(), b.length(), &readTimeout);
112  } else {
113  result = stream.recv_n(b.get(), b.length());
114  }
115  if (!happy) {
116  return -1;
117  }
118  if (result <= 0) {
119  happy = false;
120  yCDebug(SOCKETTWOWAYSTREAM, "bad socket read");
121  }
122  return result;
123  }
124 
126  {
127  if (!isOk()) {
128  return -1;
129  }
130  yarp::conf::ssize_t result;
131  if (haveReadTimeout) {
132  result = stream.recv(b.get(), b.length(), &readTimeout);
133  } else {
134  result = stream.recv(b.get(), b.length());
135  }
136  if (!happy) {
137  return -1;
138  }
139  if (result <= 0) {
140  happy = false;
141  yCDebug(SOCKETTWOWAYSTREAM, "bad socket read");
142  }
143  return result;
144  }
145 
147  void write(const Bytes& b) override
148  {
149  if (!isOk()) {
150  return;
151  }
152  yarp::conf::ssize_t result;
153  if (haveWriteTimeout) {
154  result = stream.send_n(b.get(), b.length(), &writeTimeout);
155  } else {
156  result = stream.send_n(b.get(), b.length());
157  }
158  if (result < 0) {
159  happy = false;
160  yCDebug(SOCKETTWOWAYSTREAM, "bad socket write");
161  }
162  }
163 
164  void flush() override
165  {
166 #ifdef TCP_CORK
167  int status = 0;
168  int sizeInt = sizeof(int);
169  stream.get_option(IPPROTO_TCP, TCP_CORK, &status, &sizeInt);
170  if (status == 1) {
171  // Remove CORK
172  int zero = 0;
173  stream.set_option(IPPROTO_TCP, TCP_CORK, &zero, sizeof(int));
174  // Set CORK
175  int one = 1;
176  stream.set_option(IPPROTO_TCP, TCP_CORK, &one, sizeof(int));
177  }
178 #endif
179  }
180 
181  bool isOk() const override
182  {
183  return happy;
184  }
185 
186  void reset() override
187  {
188  }
189 
190  void beginPacket() override
191  {
192 #ifdef TCP_CORK
193  // Set CORK
194  int one = 1;
195  stream.set_option(IPPROTO_TCP, TCP_CORK, &one, sizeof(int));
196 #endif
197  }
198 
199  void endPacket() override
200  {
201 #ifdef TCP_CORK
202  // Remove CORK
203  int zero = 0;
204  stream.set_option(IPPROTO_TCP, TCP_CORK, &zero, sizeof(int));
205 #endif
206  }
207 
208  bool setWriteTimeout(double timeout) override
209  {
210  if (timeout < 1e-12) {
211  haveWriteTimeout = false;
212  } else {
213  PLATFORM_TIME_SET(writeTimeout, timeout);
214  haveWriteTimeout = true;
215  }
216  return true;
217  }
218 
219  bool setReadTimeout(double timeout) override
220  {
221  if (timeout < 1e-12) {
222  haveReadTimeout = false;
223  } else {
224  PLATFORM_TIME_SET(readTimeout, timeout);
225  haveReadTimeout = true;
226  }
227  return true;
228  }
229 
230  bool setTypeOfService(int tos) override;
231  int getTypeOfService() override;
232 
233 private:
234  yarp::os::impl::TcpStream stream;
235  bool haveWriteTimeout;
236  bool haveReadTimeout;
237  YARP_timeval writeTimeout;
238  YARP_timeval readTimeout;
239  Contact localAddress, remoteAddress;
240  bool happy;
241  void updateAddresses();
242 };
243 
244 } // namespace impl
245 } // namespace os
246 } // namespace yarp
247 
248 #endif // YARP_OS_IMPL_SOCKETTWOWAYSTREAM_H
yarp::os::impl::SocketTwoWayStream::isOk
bool isOk() const override
Check if the stream is ok or in an error state.
Definition: SocketTwoWayStream.h:181
yarp::os::impl::SocketTwoWayStream::setReadTimeout
bool setReadTimeout(double timeout) override
Set activity timeout.
Definition: SocketTwoWayStream.h:219
yarp::os::TwoWayStream
A stream which can be asked to perform bidirectional communication.
Definition: TwoWayStream.h:29
yarp::os::impl::SocketTwoWayStream::getRemoteAddress
const Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
Definition: SocketTwoWayStream.h:78
YARP_DECLARE_LOG_COMPONENT
#define YARP_DECLARE_LOG_COMPONENT(name)
Definition: LogComponent.h:77
yarp::os::OutputStream::write
virtual void write(char ch)
Write a single byte to the stream.
Definition: OutputStream.cpp:17
yarp::os::impl::SocketTwoWayStream::beginPacket
void beginPacket() override
Mark the beginning of a logical packet.
Definition: SocketTwoWayStream.h:190
yarp::os::OutputStream
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:25
yarp::os::impl::SocketTwoWayStream::write
void write(const Bytes &b) override
Write a block of bytes to the stream.
Definition: SocketTwoWayStream.h:147
yarp::os::impl::SocketTwoWayStream::setWriteTimeout
bool setWriteTimeout(double timeout) override
Set activity timeout.
Definition: SocketTwoWayStream.h:208
TwoWayStream.h
yarp::os::impl::SocketTwoWayStream::getLocalAddress
const Contact & getLocalAddress() const override
Get the address of the local side of the stream.
Definition: SocketTwoWayStream.h:73
yarp::os::impl::SocketTwoWayStream::interrupt
void interrupt() override
Interrupt the stream.
Definition: SocketTwoWayStream.h:83
yarp::os::impl::SocketTwoWayStream
A stream abstraction for socket communication.
Definition: SocketTwoWayStream.h:45
yarp::os::impl::SocketTwoWayStream::SocketTwoWayStream
SocketTwoWayStream()
Definition: SocketTwoWayStream.h:47
TcpStream.h
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::impl::SocketTwoWayStream::endPacket
void endPacket() override
Mark the end of a logical packet (see beginPacket).
Definition: SocketTwoWayStream.h:199
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
TcpAcceptor.h
SOCKETTWOWAYSTREAM
const yarp::os::LogComponent & SOCKETTWOWAYSTREAM()
Definition: SocketTwoWayStream.cpp:33
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
yarp::os::impl::SocketTwoWayStream::getInputStream
InputStream & getInputStream() override
Get an InputStream to read from.
Definition: SocketTwoWayStream.h:63
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
PLATFORM_TIME_SET
#define PLATFORM_TIME_SET(x, y)
Definition: PlatformTime.h:25
LogComponent.h
system.h
yarp::os::impl::SocketTwoWayStream::read
yarp::conf::ssize_t read(Bytes &b) override
Read a block of data from the stream.
Definition: SocketTwoWayStream.h:104
yarp::os::impl::SocketTwoWayStream::close
void close() override
Terminate the stream.
Definition: SocketTwoWayStream.h:97
yarp::os::InputStream::read
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:23
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::impl::YARP_timeval
struct timeval YARP_timeval
Definition: PlatformTime.h:35
yarp::os::impl::SocketTwoWayStream::reset
void reset() override
Reset the stream.
Definition: SocketTwoWayStream.h:186
yarp
The main, catch-all namespace for YARP.
Definition: environment.h:18
Bytes.h
yarp::os::impl::SocketTwoWayStream::~SocketTwoWayStream
~SocketTwoWayStream() override
Definition: SocketTwoWayStream.h:58
YARP_os_impl_API
#define YARP_os_impl_API
Definition: api.h:45
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
yarp::os::impl::SocketTwoWayStream::flush
void flush() override
Make sure all pending write operations are finished.
Definition: SocketTwoWayStream.h:164
yarp::os::impl::SocketTwoWayStream::getOutputStream
OutputStream & getOutputStream() override
Get an OutputStream to write to.
Definition: SocketTwoWayStream.h:68
yarp::os::InputStream
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:29
PlatformTime.h
yarp::os::impl::SocketTwoWayStream::partialRead
yarp::conf::ssize_t partialRead(Bytes &b) override
Like read, but solicit partial responses.
Definition: SocketTwoWayStream.h:125