YARP
Yet Another Robot Platform
ShmemInputStream.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include "ShmemInputStream.h"
11 #include "ShmemLogComponent.h"
12 
13 #include <yarp/os/Bytes.h>
15 
16 #include <ace/Lib_Find.h>
17 // In one of the ACE headers there is a definition of "main" for WIN32
18 # ifdef main
19 # undef main
20 # endif
21 
22 
23 
// Member-initializer list and (empty) body of the ShmemInputStreamImpl
// default constructor. The signature line (listing line 24) is absent from
// this extract. Every member starts in the "not open" state: flags/counters
// cleared, port set to -1, and all pointers null until open() assigns them.
25  m_bOpen(false),
26  m_ResizeNum(0),
27  m_Port(-1),
28  m_pAccessMutex(nullptr),
29  m_pWaitDataMutex(nullptr),
30  m_pMap(nullptr),
31  m_pData(nullptr),
32  m_pHeader(nullptr),
33  m_pSock(nullptr)
34 {
35 }
36 
// Body of the ShmemInputStreamImpl destructor (signature, listing line 37, is
// absent from this extract). Delegates all teardown to close(), which returns
// immediately when the stream is not open.
38 {
39  close();
40 }
41 
// Body of ShmemInputStreamImpl::isOk() (signature, listing line 42, is absent
// from this extract). Reports the m_bOpen flag, which open() sets to true on
// success and close() sets back to false.
43 {
44  return m_bOpen;
45 }
46 
47 bool ShmemInputStreamImpl::open(int port, ACE_SOCK_Stream* pSock, int size)
48 {
49  m_pSock = pSock;
50 
51  m_pAccessMutex = m_pWaitDataMutex = nullptr;
52  m_pMap = nullptr;
53  m_pData = nullptr;
54  m_pHeader = nullptr;
55  m_ResizeNum = 0;
56 
57  m_Port = port;
58 
59  char obj_name[2048];
60  char temp_dir_path[1024];
61 
62  if (ACE::get_temp_dir(temp_dir_path, 1024) == -1) {
63  yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
64  return false;
65  }
66 
67 #ifdef ACE_LACKS_SYSV_SHMEM
68 
69  snprintf(obj_name, 2048, "%sSHMEM_FILE_%d_%d", temp_dir_path, port, 0);
70 
71  m_pMap = new ACE_Shared_Memory_MM(obj_name, //const ACE_TCHAR *filename,
72  size + sizeof(ShmemHeader_t), //int len = -1,
73  O_RDWR, //int flags = O_RDWR | O_CREAT,
74  ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
75  PROT_RDWR, //int prot = PROT_RDWR,
76  ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
77 
78 #else
79 
80  m_pMap = new ACE_Shared_Memory_SV(port, size + sizeof(ShmemHeader_t));
81 
82 #endif
83 
84  m_pHeader = (ShmemHeader_t*)m_pMap->malloc();
85  m_pData = (char*)(m_pHeader + 1);
86 
87 #ifdef _ACE_USE_SV_SEM
88  snprintf(obj_name, 2048, "%sSHMEM_ACCESS_MUTEX_%d", temp_dir_path, port);
89  m_pAccessMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
90  snprintf(obj_name, 2048, "%sSHMEM_WAITDATA_MUTEX_%d", temp_dir_path, port);
91  m_pWaitDataMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
92 #else
93  snprintf(obj_name, 2048, "SHMEM_ACCESS_MUTEX_%d", port);
94  m_pAccessMutex = new ACE_Process_Mutex(obj_name);
95  snprintf(obj_name, 2048, "SHMEM_WAITDATA_MUTEX_%d", port);
96  m_pWaitDataMutex = new ACE_Process_Mutex(obj_name);
97 #endif
98 
99  m_pWaitDataMutex->acquire();
100 
101  m_bOpen = true;
102 
103  return true;
104 }
105 
// Body of ShmemInputStreamImpl::Resize() (signature, listing line 106, is
// absent from this extract).
//
// Switches this stream to a new shared-memory region: bumps the resize
// generation m_ResizeNum, attaches to the region identified by
// (m_Port, m_ResizeNum), then closes and deletes the old map and repoints
// m_pMap / m_pHeader / m_pData at the new one.
//
// Returns true on success. Returns false if the temp directory lookup fails
// (memory-mapped-file build only) or if pNewMap is null; in both cases
// m_ResizeNum has already been incremented and the old map is left in place.
107 {
108  ++m_ResizeNum;
109 
110  ACE_Shared_Memory* pNewMap;
111 
112  yCDebug(SHMEMCARRIER, "input stream resize %d to %d", m_ResizeNum, m_pHeader->newsize);
113 
114 #ifdef ACE_LACKS_SYSV_SHMEM
115 
116  char file_path[1024];
117 
118  if (ACE::get_temp_dir(file_path, 1024) == -1) {
119  yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
120  return false;
121  }
122 
123  char file_name[2048];
// Backing file name embeds both the port and the resize generation.
124  snprintf(file_name, 2048, "%sSHMEM_FILE_%d_%d", file_path, m_Port, m_ResizeNum);
125 
126  pNewMap = new ACE_Shared_Memory_MM(file_name, //const ACE_TCHAR *filename,
127  m_pHeader->newsize + sizeof(ShmemHeader_t), //int len = -1,
128  O_RDWR, //int flags = O_RDWR | O_CREAT,
129  ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
130  PROT_RDWR, //int prot = PROT_RDWR,
131  ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
132 
133 #else
134 
// System V key: resize generation in the bits above 16, port number added on.
135  int shmemkey = (m_ResizeNum << 16) + m_Port;
136 
// NOTE(review): this branch sizes the segment from m_pHeader->size, whereas the
// memory-mapped branch above and the debug message use m_pHeader->newsize —
// confirm which one is intended here.
137  pNewMap = new ACE_Shared_Memory_SV(shmemkey, m_pHeader->size + sizeof(ShmemHeader_t));
138 
139 #endif
140 
// NOTE(review): a plain `new` expression throws rather than returning null, so
// this check is presumably never true; a failed attach is not detected and the
// result of pNewMap->malloc() below is used unchecked. The message also names
// "ShmemOutputStream" although this is the input stream — likely copy/paste.
141  if (!pNewMap) {
142  yCError(SHMEMCARRIER, "ShmemOutputStream can't create shared memory");
143  return false;
144  }
145 
// The data area starts immediately after the header in the mapped region.
146  auto* pNewHeader = (ShmemHeader_t*)pNewMap->malloc();
147  char* pNewData = (char*)(pNewHeader + 1);
148 
// Drop the old region only after the new one has been obtained.
149  m_pMap->close();
150  delete m_pMap;
151 
152  m_pMap = pNewMap;
153  m_pHeader = pNewHeader;
154  m_pData = pNewData;
155 
156  return true;
157 }
158 
// Single, non-waiting attempt to copy exactly `len` bytes from the shared
// data area into `data`, performed while holding m_pAccessMutex.
//
// Returns:
//   len  when the bytes were copied (avail and tail are updated accordingly);
//   0    when fewer than `len` bytes are available; the header's `waiting`
//        counter is incremented before returning;
//   -1   when the header's `close` flag is set; this stream is closed first.
159 int ShmemInputStreamImpl::read(char* data, int len)
160 {
161  m_pAccessMutex->acquire();
162 
// Header flags a shutdown: release the lock before close(), which acquires
// and then destroys the same mutex.
163  if (m_pHeader->close) {
164  m_pAccessMutex->release();
165  close();
166  return -1;
167  }
168 
// Follow any pending resize; Resize() repoints m_pHeader at the new region.
// NOTE(review): Resize()'s return value is ignored. If it fails it leaves
// m_pHeader untouched, so this loop would presumably spin — confirm.
169  while (m_pHeader->resize)
170  Resize();
171 
// Not enough data yet: register as a waiter and let the caller retry.
172  if (m_pHeader->avail < len) {
173  ++m_pHeader->waiting;
174  m_pAccessMutex->release();
175  return 0;
176  }
177 
// The requested span runs past the end of the data area: copy the part up to
// the end first, then the remainder from the start of the area.
178  if (m_pHeader->tail + len > m_pHeader->size) {
179  int first_block_size = m_pHeader->size - m_pHeader->tail;
180 
181  memcpy((void*)data, (void*)(m_pData + m_pHeader->tail), first_block_size);
182  memcpy((void*)(data + first_block_size), (void*)m_pData, len - first_block_size);
183  } else {
184  memcpy((void*)data, (void*)(m_pData + m_pHeader->tail), len);
185  }
186 
187  m_pHeader->avail -= len;
188  m_pHeader->tail += len;
// NOTE(review): listing line 189 is missing from this extract. Presumably it
// wraps `tail` back into [0, size) after the increment above — verify against
// the upstream file before relying on this listing.
190 
191  m_pAccessMutex->release();
192 
193  return len;
194 }
195 
// Body of ShmemInputStreamImpl::read(yarp::os::Bytes& b) (signature, listing
// line 196, is absent from this extract).
//
// Waiting read: repeatedly calls read(char*, int) for b.length() bytes until
// it returns non-zero, pausing between attempts with a timed acquire on
// m_pWaitDataMutex. The whole call is serialised by m_ReadSerializerMutex.
//
// Returns -1 if the stream is not open or if the socket recv() returns 0;
// otherwise returns the non-zero value produced by read(char*, int).
197 {
198  m_ReadSerializerMutex.lock();
199 
200  if (!m_bOpen) {
201  m_ReadSerializerMutex.unlock();
202  return -1;
203  }
204 
205  char* data = b.get();
206  char* buf;
207  size_t len = b.length();
// NOTE(review): listing line 208 is missing from this extract; presumably it
// declares `ret`, which is used below — confirm against the upstream file.
209 
// A zero result means "not enough data yet": wait, then try again.
210  while (!(ret = read(data, (int)len))) {
211 #ifdef _ACE_USE_SV_SEM
// Timeout expressed as the current time plus one second.
212  yarp::os::impl::YARP_timeval tv = ACE_OS::gettimeofday();
213  tv.sec(tv.sec() + 1);
214 #else
// NOTE(review): listing line 215 is missing from this extract; presumably it
// declares `tv` for this branch — confirm against the upstream file.
216 #endif
217 
// Timed wait; the result of acquire() is not inspected.
218  m_pWaitDataMutex->acquire(tv);
219 
// A recv() result of 0 is treated as a broken connection.
// NOTE(review): `&buf` is the address of the `char*` variable itself, so the
// single received byte lands in the pointer's own storage. Looks like a
// one-byte scratch buffer was intended — confirm.
220  if (!m_pSock->recv(&buf, 1)) {
221  yCDebug(SHMEMCARRIER, "STREAM IS BROKEN");
222  close();
223  m_ReadSerializerMutex.unlock();
224  return -1;
225  }
226  }
227 
228  m_ReadSerializerMutex.unlock();
229 
230  return ret;
231 }
232 
// Body of ShmemInputStreamImpl::close() (signature, listing line 233, is
// absent from this extract).
//
// Tears the stream down: marks it closed, releases m_pWaitDataMutex once per
// registered waiter, raises the `close` flag in the shared header, then
// removes and deletes both mutexes and closes and deletes the map.
// Does nothing when the stream is not open, so repeated calls are harmless.
234 {
235  if (!m_bOpen)
236  return;
237 
238  m_bOpen = false;
239 
// Under the access lock: release the wait-data mutex once for each waiter
// counted in the header, then publish the close flag.
240  m_pAccessMutex->acquire();
241  while (m_pHeader->waiting > 0) {
242  --m_pHeader->waiting;
243  m_pWaitDataMutex->release();
244  }
245  m_pHeader->close = true;
246  m_pAccessMutex->release();
247 
248  m_pAccessMutex->remove();
249  delete m_pAccessMutex;
250  m_pAccessMutex = nullptr;
251 
252  m_pWaitDataMutex->remove();
253  delete m_pWaitDataMutex;
254  m_pWaitDataMutex = nullptr;
255 
// NOTE(review): m_pHeader and m_pData still point into the region released
// below; they are not reset here (open() resets them on the next call).
256  m_pMap->close();
257  delete m_pMap;
258  m_pMap = nullptr;
259 }
ShmemInputStreamImpl::m_pAccessMutex
ACE_Process_Mutex * m_pAccessMutex
Definition: ShmemInputStream.h:63
ShmemInputStreamImpl::read
yarp::conf::ssize_t read(yarp::os::Bytes &b)
Definition: ShmemInputStream.cpp:196
ShmemHeader_t::size
int size
Definition: ShmemTypes.h:20
ShmemInputStreamImpl::m_pSock
ACE_SOCK_Stream * m_pSock
Definition: ShmemInputStream.h:73
ret
bool ret
Definition: ImplementAxisInfo.cpp:72
ShmemHeader_t
Definition: ShmemTypes.h:16
ShmemHeader_t::close
bool close
Definition: ShmemTypes.h:18
ShmemHeader_t::resize
bool resize
Definition: ShmemTypes.h:17
ShmemInputStreamImpl::Resize
bool Resize()
Definition: ShmemInputStream.cpp:106
ShmemInputStreamImpl::close
void close()
Definition: ShmemInputStream.cpp:233
ShmemInputStreamImpl::isOk
bool isOk() const
Definition: ShmemInputStream.cpp:42
ShmemHeader_t::avail
int avail
Definition: ShmemTypes.h:25
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
SHMEMCARRIER
const yarp::os::LogComponent & SHMEMCARRIER()
Definition: ShmemLogComponent.cpp:16
ShmemInputStreamImpl::m_pData
char * m_pData
Definition: ShmemInputStream.h:70
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
ShmemInputStreamImpl::open
bool open(int port, ACE_SOCK_Stream *pSock, int size=4096)
Definition: ShmemInputStream.cpp:47
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
ShmemHeader_t::tail
int tail
Definition: ShmemTypes.h:24
ShmemInputStreamImpl::m_pMap
ACE_Shared_Memory * m_pMap
Definition: ShmemInputStream.h:69
ShmemInputStreamImpl::m_ReadSerializerMutex
std::mutex m_ReadSerializerMutex
Definition: ShmemInputStream.h:67
ShmemLogComponent.h
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
ShmemHeader_t::newsize
int newsize
Definition: ShmemTypes.h:21
ShmemInputStreamImpl::m_pWaitDataMutex
ACE_Process_Mutex * m_pWaitDataMutex
Definition: ShmemInputStream.h:64
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
ShmemInputStreamImpl::m_ResizeNum
int m_ResizeNum
Definition: ShmemInputStream.h:56
ShmemInputStreamImpl::m_pHeader
ShmemHeader_t * m_pHeader
Definition: ShmemInputStream.h:71
yarp::os::impl::YARP_timeval
struct timeval YARP_timeval
Definition: PlatformTime.h:35
ShmemInputStreamImpl::m_bOpen
bool m_bOpen
Definition: ShmemInputStream.h:54
Bytes.h
ShmemInputStreamImpl::~ShmemInputStreamImpl
~ShmemInputStreamImpl()
Definition: ShmemInputStream.cpp:37
ShmemInputStream.h
PlatformTime.h
ShmemInputStreamImpl::ShmemInputStreamImpl
ShmemInputStreamImpl()
Definition: ShmemInputStream.cpp:24
ShmemInputStreamImpl::m_Port
int m_Port
Definition: ShmemInputStream.h:57
ShmemHeader_t::waiting
int waiting
Definition: ShmemTypes.h:26