YARP
Yet Another Robot Platform
ShmemHybridStream.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include "ShmemHybridStream.h"
11 #include "ShmemLogComponent.h"
12 
14  m_bLinked(false)
15 {
16 }
17 
19 {
20  close();
21 }
22 
23 int ShmemHybridStream::open(const yarp::os::Contact& yarp_address, bool sender)
24 {
25  m_bLinked = false;
26 
27  ACE_INET_Addr ace_address(yarp_address.getPort(), yarp_address.getHost().c_str());
28 
29  if (sender) {
30  return connect(ace_address);
31  } else {
32  ACE_INET_Addr ace_server_addr(ace_address.get_port_number());
33 
34  int result = m_Acceptor.open(ace_server_addr);
35 
36  if (result < 0) {
37  yCError(SHMEMCARRIER, "ShmemHybridStream open result %d", result);
38  return result;
39  }
40 
41  m_Acceptor.get_local_addr(ace_server_addr);
42 
43  m_LocalAddress = yarp::os::Contact(ace_server_addr.get_host_addr(), ace_server_addr.get_port_number());
44  m_RemoteAddress = m_LocalAddress; // finalized in call to accept()
45 
46  return result;
47  }
48 
49  return 1;
50 }
51 
53 {
54  if (m_bLinked) {
55  return -1;
56  }
57 
58  yarp::conf::ssize_t result = m_Acceptor.accept(m_SockStream);
59 
60  if (result < 0) {
61  yCError(SHMEMCARRIER, "ShmemHybridStream server returned %zd", result);
62  close();
63  return -1;
64  }
65 
66  ACE_INET_Addr local, remote;
67  m_SockStream.get_local_addr(local);
68  m_SockStream.get_remote_addr(remote);
69  m_LocalAddress = yarp::os::Contact(local.get_host_addr(), local.get_port_number());
70  m_RemoteAddress = yarp::os::Contact(remote.get_host_addr(), remote.get_port_number());
71 
72  ShmemPacket_t recv_conn_data;
73  result = m_SockStream.recv_n(&recv_conn_data, sizeof recv_conn_data);
74  if (result <= 0) {
75  yCError(SHMEMCARRIER, "Socket returned %zd", result);
76  close();
77  return -1;
78  }
79 
80  if (!in.open(m_RemoteAddress.getPort(), &m_SockStream)) {
81  yCError(SHMEMCARRIER, "ShmemHybridStream can't create shared memory");
82  close();
83  return -1;
84  }
85 
86  if (!out.open(m_LocalAddress.getPort())) {
87  yCError(SHMEMCARRIER, "ShmemHybridStream can't create shared memory");
88  close();
89  return -1;
90  }
91 
92  ShmemPacket_t send_conn_data;
93  send_conn_data.command = ACKNOWLEDGE;
94  if (m_SockStream.send_n(&send_conn_data, sizeof send_conn_data) <= 0) {
95  yCError(SHMEMCARRIER, "ShmemHybridStream socket writing error");
96  close();
97  return -1;
98  }
99 
100  m_bLinked = true;
101 
102  m_SockStream.enable(ACE_NONBLOCK);
103 
104  return 0;
105 }
106 
107 int ShmemHybridStream::connect(const ACE_INET_Addr& ace_address)
108 {
109  if (m_bLinked) {
110  return -1;
111  }
112 
113  ACE_SOCK_Connector connector;
114  yarp::conf::ssize_t result = connector.connect(m_SockStream, ace_address);
115  if (result < 0) {
116  yCError(SHMEMCARRIER, "ShmemHybridStream client returned %zd", result);
117  close();
118  return -1;
119  }
120 
121  ACE_INET_Addr local, remote;
122  m_SockStream.get_local_addr(local);
123  m_SockStream.get_remote_addr(remote);
124  m_LocalAddress = yarp::os::Contact(local.get_host_addr(), local.get_port_number());
125  m_RemoteAddress = yarp::os::Contact(remote.get_host_addr(), remote.get_port_number());
126 
127  out.open(m_LocalAddress.getPort());
128 
129  ShmemPacket_t send_conn_data;
130  send_conn_data.command = CONNECT;
131  send_conn_data.size = SHMEM_DEFAULT_SIZE;
132  result = m_SockStream.send_n(&send_conn_data, sizeof send_conn_data);
133  if (result <= 0) {
134  yCError(SHMEMCARRIER, "Socket returned %zd", result);
135  close();
136  return -1;
137  }
138 
139  ShmemPacket_t recv_conn_data;
140  result = m_SockStream.recv_n(&recv_conn_data, sizeof recv_conn_data);
141  if (result <= 0) {
142  yCError(SHMEMCARRIER, "Socket returned %zd", result);
143  close();
144  return -1;
145  }
146 
147  in.open(m_RemoteAddress.getPort(), &m_SockStream);
148 
149  m_bLinked = true;
150 
151  m_SockStream.enable(ACE_NONBLOCK);
152 
153  return 0;
154 }
155 
157 {
158  m_bLinked = false;
159  in.close();
160  out.close();
161 }
162 
164 {
165  yCDebug(SHMEMCARRIER, "INTERRUPT");
166  close();
167 }
168 
170 {
171  if (!out.write(b)) {
172  close();
173  }
174 }
175 
177 {
178  yarp::conf::ssize_t ret = in.read(b);
179  if (ret == -1) {
180  close();
181  }
182  return ret;
183 }
184 
186 {
187  return *this;
188 }
189 
191 {
192  return *this;
193 }
194 
196 {
197  return m_bLinked && in.isOk() && out.isOk();
198 }
199 
201 {
202  yCDebug(SHMEMCARRIER, "RECEIVED RESET COMMAND");
203  close();
204 }
205 
207 {
208 }
209 
211 {
212 }
213 
215 {
216  return m_LocalAddress;
217 }
218 
220 {
221  return m_RemoteAddress;
222 }
ShmemHybridStream::beginPacket
void beginPacket() override
Mark the beginning of a logical packet.
Definition: ShmemHybridStream.cpp:206
ShmemHybridStream::interrupt
void interrupt() override
Interrupt the stream.
Definition: ShmemHybridStream.cpp:163
ShmemPacket_t
Definition: ShmemTypes.h:30
ShmemHybridStream::ShmemHybridStream
ShmemHybridStream()
Definition: ShmemHybridStream.cpp:13
ShmemOutputStreamImpl::write
bool write(const yarp::os::Bytes &b)
Definition: ShmemOutputStream.cpp:187
yarp::os::OutputStream
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:25
ShmemInputStreamImpl::read
yarp::conf::ssize_t read(yarp::os::Bytes &b)
Definition: ShmemInputStream.cpp:196
ret
bool ret
Definition: ImplementAxisInfo.cpp:72
ShmemHybridStream::getOutputStream
yarp::os::OutputStream & getOutputStream() override
Get an OutputStream to write to.
Definition: ShmemHybridStream.cpp:190
ShmemOutputStreamImpl::close
void close()
Definition: ShmemOutputStream.cpp:232
ShmemHybridStream::getLocalAddress
const yarp::os::Contact & getLocalAddress() const override
Get the address of the local side of the stream.
Definition: ShmemHybridStream.cpp:214
SHMEM_DEFAULT_SIZE
#define SHMEM_DEFAULT_SIZE
Definition: ShmemTypes.h:13
ShmemHybridStream::isOk
bool isOk() const override
Check if the stream is ok or in an error state.
Definition: ShmemHybridStream.cpp:195
ShmemHybridStream::accept
int accept()
Definition: ShmemHybridStream.cpp:52
ShmemInputStreamImpl::close
void close()
Definition: ShmemInputStream.cpp:233
yarp::os::Contact::getPort
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:242
ShmemHybridStream::close
void close() override
Terminate the stream.
Definition: ShmemHybridStream.cpp:156
ShmemInputStreamImpl::isOk
bool isOk() const
Definition: ShmemInputStream.cpp:42
ShmemOutputStreamImpl::open
bool open(int port, int size=4096)
Definition: ShmemOutputStream.cpp:45
ShmemOutputStreamImpl::isOk
bool isOk() const
Definition: ShmemOutputStream.cpp:40
SHMEMCARRIER
const yarp::os::LogComponent & SHMEMCARRIER()
Definition: ShmemLogComponent.cpp:16
ShmemHybridStream::endPacket
void endPacket() override
Mark the end of a logical packet (see beginPacket).
Definition: ShmemHybridStream.cpp:210
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
ShmemInputStreamImpl::open
bool open(int port, ACE_SOCK_Stream *pSock, int size=4096)
Definition: ShmemInputStream.cpp:47
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
ShmemHybridStream::~ShmemHybridStream
virtual ~ShmemHybridStream()
Definition: ShmemHybridStream.cpp:18
ShmemLogComponent.h
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
ShmemPacket_t::size
int size
Definition: ShmemTypes.h:32
yarp::os::InputStream::read
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:23
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
yarp::os::Contact::getHost
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
ShmemHybridStream.h
ShmemHybridStream::write
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
Definition: ShmemHybridStream.cpp:169
yarp::os::InputStream
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:29
ShmemHybridStream::reset
void reset() override
Reset the stream.
Definition: ShmemHybridStream.cpp:200
ShmemHybridStream::getRemoteAddress
const yarp::os::Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
Definition: ShmemHybridStream.cpp:219
ShmemHybridStream::getInputStream
yarp::os::InputStream & getInputStream() override
Get an InputStream to read from.
Definition: ShmemHybridStream.cpp:185
ShmemPacket_t::command
int command
Definition: ShmemTypes.h:31
ShmemHybridStream::open
int open(const yarp::os::Contact &yarp_address, bool sender)
Definition: ShmemHybridStream.cpp:23