YARP
Yet Another Robot Platform
UnixSocketCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
9 #include "UnixSocketCarrier.h"
10 
11 #include <yarp/conf/environment.h>
12 #include <yarp/conf/filesystem.h>
13 
15 #include <yarp/os/Log.h>
16 #include <yarp/os/LogStream.h>
17 #include <yarp/os/Os.h>
18 
19 #include "UnixSocketLogComponent.h"
20 
21 #include <array>
22 #include <mutex>
23 
24 using namespace yarp::os;
25 namespace fs = yarp::conf::filesystem;
26 
27 namespace {
28 
29 // FIXME: This method should be available somewhere in YARP
30 std::string getYARPRuntimeDir()
31 {
32  static std::mutex m;
33  std::lock_guard<std::mutex> lock(m);
34 
35  static std::string yarp_runtime_dir;
36  bool found = false;
37 
38  // If already populated, there is nothing to do
39  if (!yarp_runtime_dir.empty()) {
40  return yarp_runtime_dir;
41  }
42 
43  // Check YARP_RUNTIME_DIR
44  yarp_runtime_dir = yarp::conf::environment::getEnvironment("YARP_RUNTIME_DIR", &found);
45  if (found) {
46  return yarp_runtime_dir;
47  }
48 
49  // Check XDG_RUNTIME_DIR
50  std::string xdg_runtime_dir = yarp::conf::environment::getEnvironment("XDG_RUNTIME_DIR", &found);
51  if (found) {
52  yarp_runtime_dir = xdg_runtime_dir + fs::preferred_separator + "yarp";
53  return yarp_runtime_dir;
54  }
55 
56  // Use /tmp/runtime-user
57  std::string user = yarp::conf::environment::getEnvironment("USER", &found);
58  if (found) {
59  yarp_runtime_dir = "/tmp/runtime-" + user + fs::preferred_separator + "yarp";
60  return yarp_runtime_dir;
61  }
62 
63  // ERROR
64  return {};
65 }
66 
72 bool isUnixSockSupported(ConnectionState& proto) // FIXME Why is this method unused?
73 {
76 
77  if (remote.getHost() != local.getHost()) {
79  "The ports are on different machines, unix socket not supported...");
80  return false;
81  }
82  return true;
83 }
84 
85 } // namespace
86 
88 {
89  return new UnixSocketCarrier();
90 }
91 
92 std::string UnixSocketCarrier::getName() const
93 {
94  return name;
95 }
96 
98 {
99  return requireAckFlag;
100 }
101 
103 {
104  return false;
105 }
106 
108 {
109  if (header.length() != headerSize) {
110  return false;
111  }
112 
113  bool isUnix = true;
114  bool isUnix_ack = true;
115 
116  const char* target = headerCode;
117  const char* target_ack = headerCode_ack;
118  for (size_t i = 0; i < headerSize; i++) {
119  if (header.get()[i] != target[i]) {
120  isUnix = false;
121  }
122  if (header.get()[i] != target_ack[i]) {
123  isUnix_ack = false;
124  }
125  }
126 
127  return (isUnix || isUnix_ack);
128 }
129 
131 {
132  const char* target = requireAckFlag ? headerCode_ack : headerCode;
133  for (size_t i = 0; i < headerSize && i < header.length(); i++) {
134  header.get()[i] = target[i];
135  }
136 }
137 
139 {
140  YARP_UNUSED(proto);
141  YARP_UNUSED(writer);
142 
143  return true;
144 }
145 
147 {
148  YARP_UNUSED(proto);
149 
150  return true;
151 }
152 
154 {
155  if (requireAckFlag) {
156  const Bytes ack_bytes(const_cast<char*>(ack_string), ack_string_size);
157  proto.os().write(ack_bytes);
158  }
159  return true;
160 }
161 
163 {
164  if (requireAckFlag) {
165  std::array<char, ack_string_size> buf;
166  Bytes ack(buf.data(), buf.size());
167  yarp::conf::ssize_t hdr = proto.is().readFull(ack);
168  if (static_cast<size_t>(hdr) != ack.length()) {
169  yCDebug(UNIXSOCK_CARRIER, "Did not get ack");
170  return false;
171  }
172 
173  const char* target = ack_string;
174  for (size_t i = 0; i < ack_string_size; i++) {
175  if (ack.get()[i] != target[i]) {
176  yCDebug(UNIXSOCK_CARRIER, "Bad ack");
177  return false;
178  }
179  }
180  }
181  return true;
182 }
183 
185 {
186  // I am the receiver
187  return becomeUnixSocket(proto, false);
188 }
189 
191 {
192  // I am the sender
193  return becomeUnixSocket(proto, true);
194 }
195 
196 bool UnixSocketCarrier::becomeUnixSocket(ConnectionState& proto, bool sender)
197 {
198  if (!isUnixSockSupported(proto)) {
199  return false;
200  }
201 
202  Contact remote = proto.getStreams().getRemoteAddress();
203  Contact local = proto.getStreams().getLocalAddress();
204 
205  proto.takeStreams(nullptr); // free up port from tcp
206 
207  std::string runtime_dir = getYARPRuntimeDir();
208 
209  // Make sure that the path exists
210  if (runtime_dir.empty() || yarp::os::mkdir_p(runtime_dir.c_str(), 0) != 0) {
211  yCError(UNIXSOCK_CARRIER, "Failed to create directory %s", runtime_dir.c_str());
212  return false;
213  }
214 
215  if (sender) {
216  socketPath = runtime_dir + fs::preferred_separator + std::to_string(remote.getPort()) + "_" + std::to_string(local.getPort()) + ".sock";
217  } else {
218  socketPath = runtime_dir + fs::preferred_separator + std::to_string(local.getPort()) + "_" + std::to_string(remote.getPort()) + ".sock";
219  }
220 
221  stream = new UnixSockTwoWayStream(socketPath);
222  stream->setLocalAddress(local);
223  stream->setRemoteAddress(remote);
224 
225  if (!stream->open(sender)) {
226  delete stream;
227  stream = nullptr;
228  yCError(UNIXSOCK_CARRIER, "Failed to open stream on socket %s as %s", socketPath.c_str(), (sender ? "sender" : "receiver"));
229  return false;
230  }
231  yAssert(stream != nullptr);
232 
233  proto.takeStreams(stream);
234 
235  yCDebug(UNIXSOCK_CARRIER, "Connected on socket %s as %s", socketPath.c_str(), (sender ? "sender" : "receiver"));
236  return true;
237 }
238 
239 
241 {
242  Property options;
243  options.fromString(proto.getSenderSpecifier());
244  return configureFromProperty(options);
245 }
246 
248 {
249  if (options.check("ack")) {
250  yCInfo(UNIXSOCK_CARRIER, "ACK Enabled");
251  requireAckFlag = true;
252  }
253  return true;
254 }
255 
257 {
258  const char* target_ack = headerCode_ack;
259  for (size_t i = 0; i < headerSize; i++) {
260  if (header.get()[i] != target_ack[i]) {
261  return;
262  }
263  }
264  yCInfo(UNIXSOCK_CARRIER, "ACK Enabled");
265  requireAckFlag = true;
266 }
LogStream.h
yarp::os::TwoWayStream::getLocalAddress
virtual const Contact & getLocalAddress() const =0
Get the address of the local side of the stream.
filesystem.h
UnixSocketCarrier
Communicating between two ports(IPC) via Unix Socket.
Definition: UnixSocketCarrier.h:28
UnixSocketCarrier::getHeader
void getHeader(yarp::os::Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to se...
Definition: UnixSocketCarrier.cpp:130
yarp::conf::filesystem
Definition: filesystem.h:15
UnixSocketCarrier::expectAck
bool expectAck(yarp::os::ConnectionState &proto) override
Receive an acknowledgement, if expected for this carrier.
Definition: UnixSocketCarrier.cpp:162
UnixSocketCarrier::checkHeader
bool checkHeader(const yarp::os::Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for...
Definition: UnixSocketCarrier.cpp:107
UnixSocketCarrier::expectIndex
bool expectIndex(yarp::os::ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
Definition: UnixSocketCarrier.cpp:146
yarp::os::Carrier
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:48
UnixSocketLogComponent.h
yarp::os::OutputStream::write
virtual void write(char ch)
Write a single byte to the stream.
Definition: OutputStream.cpp:17
yarp::conf::environment::getEnvironment
std::string getEnvironment(const char *key, bool *found=nullptr)
Read a variable from the environment.
Definition: environment.h:31
yarp::os::Property::fromString
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1046
UnixSocketCarrier::create
yarp::os::Carrier * create() const override
Factory method.
Definition: UnixSocketCarrier.cpp:87
UnixSocketCarrier::configure
bool configure(yarp::os::ConnectionState &proto) override
Give carrier a shot at looking at how the connection is set up.
Definition: UnixSocketCarrier.cpp:240
UnixSocketCarrier::expectReplyToHeader
bool expectReplyToHeader(yarp::os::ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
Definition: UnixSocketCarrier.cpp:190
YARP_UNUSED
#define YARP_UNUSED(var)
Definition: api.h:159
UnixSocketCarrier::isConnectionless
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
Definition: UnixSocketCarrier.cpp:102
Log.h
yarp::os::ConnectionState::takeStreams
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
yarp::os::ConnectionState::os
OutputStream & os()
Shorthand for getOutputStream()
Definition: ConnectionState.h:117
ConnectionState.h
yarp::os::Contact::getPort
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:242
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
Os.h
UnixSocketCarrier::setParameters
void setParameters(const yarp::os::Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
Definition: UnixSocketCarrier.cpp:256
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
yarp::os::TwoWayStream::getRemoteAddress
virtual const Contact & getRemoteAddress() const =0
Get the address of the remote side of the stream.
yarp::os::Property::check
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1024
yarp::os::ConnectionState::getStreams
virtual TwoWayStream & getStreams()=0
Access the streams associated with the connection.
UnixSocketCarrier.h
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
yarp::os::mkdir_p
int mkdir_p(const char *p, int ignoreLevels=0)
Create a directory and all parent directories needed.
Definition: Os.cpp:45
yarp::os::ConnectionState
The basic state of a connection - route, streams in use, etc.
Definition: ConnectionState.h:31
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::ConnectionState::getSenderSpecifier
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
yarp::os::InputStream::readFull
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:99
yCInfo
#define yCInfo(component,...)
Definition: LogComponent.h:135
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
UnixSocketCarrier::sendAck
bool sendAck(yarp::os::ConnectionState &proto) override
Send an acknowledgement, if needed for this carrier.
Definition: UnixSocketCarrier.cpp:153
environment.h
yarp::os::Contact::getHost
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
yarp::os::ConnectionState::is
InputStream & is()
Shorthand for getInputStream()
Definition: ConnectionState.h:125
UNIXSOCK_CARRIER
const yarp::os::LogComponent & UNIXSOCK_CARRIER()
Definition: UnixSocketLogComponent.cpp:16
yarp::conf::filesystem::preferred_separator
static constexpr value_type preferred_separator
Definition: filesystem.h:28
yarp::os::Contact
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
UnixSocketCarrier::sendIndex
bool sendIndex(yarp::os::ConnectionState &proto, yarp::os::SizedWriter &writer) override
Definition: UnixSocketCarrier.cpp:138
UnixSockTwoWayStream
A stream abstraction for unix socket communication.
Definition: UnixSockTwoWayStream.h:25
UnixSocketCarrier::configureFromProperty
bool configureFromProperty(yarp::os::Property &options) override
Definition: UnixSocketCarrier.cpp:247
yAssert
#define yAssert(x)
Definition: Log.h:297
UnixSocketCarrier::respondToHeader
bool respondToHeader(yarp::os::ConnectionState &proto) override
Respond to the header.
Definition: UnixSocketCarrier.cpp:184
yarp::os::SizedWriter
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:36
yarp::os::Property
A class for storing options and configuration information.
Definition: Property.h:37
UnixSocketCarrier::requireAck
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
Definition: UnixSocketCarrier.cpp:97
UnixSocketCarrier::getName
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
Definition: UnixSocketCarrier.cpp:92