YARP
Yet Another Robot Platform
TcpRosStream.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
9 #include "TcpRosStream.h"
10 #include "TcpRosLogComponent.h"
11 
12 #include <cstdio>
13 #include <cstring>
14 #include <cstdlib>
15 #include <yarp/os/NetType.h>
16 #include <yarp/os/NetInt32.h>
17 #include <yarp/os/Bottle.h>
18 #include <yarp/os/Port.h>
19 
21 
22 using namespace yarp::os;
23 using namespace yarp::wire_rep_utils;
24 using namespace std;
25 
27  if (!setInitiative) {
28  initiative = false;
29  setInitiative = true;
30  }
31 
32  if (kind!="") {
33  return twiddlerReader.read(b);
34  }
35 
36  if (phase==-1) return -1;
37  if (remaining==0) {
38  if (phase==1) {
39  phase = 2;
40  if (scan.length()>0) {
41  cursor = scan.get();
42  remaining = scan.length();
43  } else {
44  cursor = nullptr;
45  remaining = header.blobLen;
46  }
47  } else {
48  scan.clear();
49  phase = 0;
50  }
51  }
52  if (phase==0) {
53  if (expectTwiddle&&initiative) {
54  // There's a success/failure byte to check.
55  // Here we just consume it, but if it shows failure
56  // we should in fact expect a string afterwards.
57  char twiddle[1];
58  Bytes twiddle_buf(twiddle,1);
59  delegate->getInputStream().readFull(twiddle_buf);
60  }
61 
62  char mlen[4];
63  Bytes mlen_buf(mlen,4);
64  int res = delegate->getInputStream().readFull(mlen_buf);
65  if (res<4) {
66  yCTrace(TCPROSCARRIER, "tcpros_carrier failed, %s %d\n", __FILE__, __LINE__);
67  phase = -1;
68  return -1;
69  }
70  int len = NetType::netInt(mlen_buf);
71  yCTrace(TCPROSCARRIER, "Unit length %d\n", len);
72 
73  // inhibit type scanning for now, it is unreliable
74  if (raw==-1) raw = 2;
75  if (raw==-2) {
76  scan.allocate(len);
77  int res = delegate->getInputStream().readFull(scan.bytes());
78  yCTrace(TCPROSCARRIER, "Read %d bytes with raw==-2\n", res);
79  if (res<0) {
80  yCTrace(TCPROSCARRIER, "tcpros_carrier failed, %s %d\n", __FILE__, __LINE__);
81  phase = -1;
82  return -1;
83  }
84  int len_scan = scan.length();
85  if (WireBottle::checkBottle(scan.get(),len_scan)) {
86  yCTrace(TCPROSCARRIER, "Looks YARP-compatible\n");
87  raw = 1;
88  } else {
89  yCTrace(TCPROSCARRIER, "Looks strange, blobbing...\n");
90  raw = 0;
91  }
92  }
93 
94  header.init(len);
95  if (raw==1) {
96  phase = 2;
97  if (scan.length()>0) {
98  cursor = scan.get();
99  remaining = scan.length();
100  } else {
101  cursor = nullptr;
102  remaining = header.blobLen;
103  }
104  } else if (raw==2) {
105  cursor = nullptr;
106  remaining = header.blobLen;
107  phase = 2;
108  } else {
109  phase = 1;
110  cursor = (char*) &header;
111  remaining = sizeof(header);
112  }
113  }
114  yCTrace(TCPROSCARRIER, "phase %d remaining %d\n", phase, remaining);
115  if (remaining>0) {
116  if (cursor!=nullptr) {
117  int allow = remaining;
118  if ((int)b.length()<allow) {
119  allow = b.length();
120  }
121  memcpy(b.get(),cursor,allow);
122  cursor+=allow;
123  remaining-=allow;
124  yCTrace(TCPROSCARRIER, "%d bytes of header\n", allow);
125  return allow;
126  } else {
127  int result = delegate->getInputStream().read(b);
128  yCTrace(TCPROSCARRIER, "Read %d bytes\n", result);
129  if (result>0) {
130  remaining-=result;
131  yCTrace(TCPROSCARRIER, "%d bytes of meat\n", result);
132  return result;
133  }
134  }
135  }
136  phase = -1;
137  return -1;
138 }
139 
140 
141 void TcpRosStream::write(const Bytes& b) {
142  if (!setInitiative) {
143  initiative = true;
144  setInitiative = true;
145  }
146  yCTrace(TCPROSCARRIER, " [[[[[ write %d bytes ]]]]]\n", (int)b.length());
147  delegate->getOutputStream().write(b);
148 }
149 
150 
151 void TcpRosStream::updateKind(const char *kind, bool sender, bool reply) {
152  string code = rosToKind(kind);
153  if (code!="") {
154  configureTwiddler(twiddler,code.c_str(),kind,sender,reply);
155  this->kind = code;
156  } else {
157  this->kind = "";
158  }
159 }
160 
161 
162 std::map<std::string, std::string> TcpRosStream::rosToKind() {
163  std::map<std::string, std::string> kinds;
164  kinds["std_msgs/String"] = "vector string 1 *";
165  kinds["std_msgs/Int32"] = "vector int32 1 *";
166  kinds["std_msgs/Float64"] = "vector float64 1 *";
167 
168  // these two are specialized, TODO link them specifically to
169  // yarp/image and yarp/vector
170  kinds["sensor_msgs/Image"] = "list 4 skip uint32 * skip uint32 * skip uint32 * skip string * >height uint32 * >width uint32 * >encoding string * skip int8 * >step int32 * compute image_params <=[mat] vocab * <translated_encoding vocab * item_vector int32 5 <depth item * <img_size item * <quantum item * <width item * <height item * blob *";
171 
172  kinds["test_roscpp/TestStringString"] = "vector string 1 * --- vector string 1 *";
173  // kinds["rospy_tutorials/AddTwoInts"] = "vector int64 2 * --- vector int64 1 *";
174  return kinds;
175 }
176 
177 std::string TcpRosStream::rosToKind(const char *rosname) {
178  if (std::string(rosname)=="") return {};
179  std::map<std::string, std::string> kinds = rosToKind();
180 
181  if (kinds.find(rosname)!=kinds.end()) {
182  return kinds[rosname];
183  }
184  Port port;
185  port.openFake("yarpidl_rosmsg");
186  if (port.addOutput("/typ")) {
187  Bottle cmd, resp;
188  cmd.addString(std::string("twiddle ") + rosname);
189  yCTrace(TCPROSCARRIER, "QUERY yarpidl_rosmsg %s\n", cmd.toString().c_str());
190  port.write(cmd,resp);
191  yCTrace(TCPROSCARRIER, "GOT yarpidl_rosmsg %s\n", resp.toString().c_str());
192  std::string txt = resp.get(0).asString();
193  if (txt!="?") return txt;
194  }
195  port.close();
196  if (std::string(rosname)!="") {
197  yCError(TCPROSCARRIER, "Do not know anything about type '%s'\n", rosname);
198  yCError(TCPROSCARRIER, "Could not connect to a type server to look up type '%s'\n", rosname);
199  ::exit(1);
200  }
201  return {};
202 }
203 
204 
205 bool TcpRosStream::configureTwiddler(WireTwiddler& twiddler, const char *txt, const char *prompt, bool sender, bool reply) {
206  yCTrace(TCPROSCARRIER, "CONFIGURE AS %s [%s/%s]\n", txt,
207  sender?"sender":"receiver",
208  reply?"reply":"main");
209  std::string str(txt);
210  if (reply) {
211  size_t idx = str.find("---");
212  if (idx!=std::string::npos) {
213  str = str.substr(idx+3,str.length());
214  }
215  }
216  str = std::string("skip int32 * ") + str;
217  if (reply) {
218  str = std::string("skip int8 * ") + str;
219  }
220  return twiddler.configure(str.c_str(),prompt);
221 }
yarp::os::Port::close
void close() override
Stop port activity.
Definition: Port.cpp:357
yarp::os::Bottle
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
yarp::os::Bottle::toString
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:214
TcpRosStream::rosToKind
static std::map< std::string, std::string > rosToKind()
NetInt32.h
Port.h
TcpRosStream::updateKind
void updateKind(const char *kind, bool sender, bool reply)
TcpRosStream::configureTwiddler
static bool configureTwiddler(yarp::wire_rep_utils::WireTwiddler &twiddler, const char *txt, const char *prompt, bool sender, bool reply)
NetType.h
yarp::os::Port::addOutput
bool addOutput(const std::string &name) override
Add an output connection to the specified port.
Definition: Port.cpp:347
yarp::os::Port
A mini-server for network communication.
Definition: Port.h:50
yarp::os::Bottle::get
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:249
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
yarp::os::Value::asString
virtual std::string asString() const
Get string value.
Definition: Value.cpp:237
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
checkBottle
static char * checkBottle(char *cursor, int &remaining, int ct, int list_tag)
Definition: WireBottle.cpp:25
yarp::wire_rep_utils::WireTwiddler::configure
bool configure(const char *txt, const char *prompt)
Definition: WireTwiddler.cpp:216
TcpRosStream::write
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
Definition: TcpRosStream.cpp:141
yarp::os::Bottle::addString
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:173
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
yarp::os::Port::write
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition: Port.cpp:430
yarp::wire_rep_utils::WireTwiddler
Definition: WireTwiddler.h:72
yarp::os::InputStream::read
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:23
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
TcpRosLogComponent.h
yarp::os::Port::openFake
bool openFake(const std::string &name)
Start port without making it accessible from the network.
Definition: Port.cpp:77
WireBottle.h
yCTrace
#define yCTrace(component,...)
Definition: LogComponent.h:88
yarp::wire_rep_utils
Definition: BlobNetworkHeader.h:18
TcpRosStream.h
yarp::os::NetType::netInt
static int netInt(const yarp::os::Bytes &code)
Definition: NetType.cpp:96
Bottle.h
TCPROSCARRIER
const yarp::os::LogComponent & TCPROSCARRIER()
Definition: TcpRosLogComponent.cpp:16