YARP
Yet Another Robot Platform
ShmemOutputStream.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include "ShmemOutputStream.h"
11 #include "ShmemLogComponent.h"
12 
13 #include <yarp/conf/numeric.h>
14 #include <yarp/os/Bytes.h>
15 #include <ace/Lib_Find.h>
16 // In one the ACE headers there is a definition of "main" for WIN32
17 # ifdef main
18 # undef main
19 # endif
20 
21 
22 
24  m_bOpen(false),
25  m_ResizeNum(0),
26  m_Port(0),
27  m_pAccessMutex(nullptr),
28  m_pWaitDataMutex(nullptr),
29  m_pMap(nullptr),
30  m_pData(nullptr),
31  m_pHeader(nullptr)
32 {
33 }
34 
36 {
37  close();
38 }
39 
41 {
42  return m_bOpen;
43 }
44 
45 bool ShmemOutputStreamImpl::open(int port, int size)
46 {
47  m_pAccessMutex = m_pWaitDataMutex = nullptr;
48  m_pMap = nullptr;
49  m_pData = nullptr;
50  m_pHeader = nullptr;
51  m_ResizeNum = 0;
52  m_Port = port;
53 
54  char obj_name[2048];
55  char temp_dir_path[1024];
56 
57  if (ACE::get_temp_dir(temp_dir_path, 1024) == -1) {
58  yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
59  return false;
60  }
61 
62 #ifdef ACE_LACKS_SYSV_SHMEM
63 
64  snprintf(obj_name, 2048, "%sSHMEM_FILE_%d_0", temp_dir_path, port);
65 
66  m_pMap = new ACE_Shared_Memory_MM(obj_name, //const ACE_TCHAR *filename,
67  size + sizeof(ShmemHeader_t), //int len = -1,
68  O_RDWR | O_CREAT, //int flags = O_RDWR | O_CREAT,
69  ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
70  PROT_RDWR, //int prot = PROT_RDWR,
71  ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
72 
73 #else
74 
75  m_pMap = new ACE_Shared_Memory_SV(port, size + sizeof(ShmemHeader_t), ACE_Shared_Memory_SV::ACE_CREATE);
76 
77 #endif
78 
79  m_pHeader = (ShmemHeader_t*)m_pMap->malloc();
80  m_pData = (char*)(m_pHeader + 1);
81 
82 #ifdef _ACE_USE_SV_SEM
83  snprintf(obj_name, 2048, "%sSHMEM_ACCESS_MUTEX_%d", temp_dir_path, port);
84  m_pAccessMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
85  snprintf(obj_name, 2048, "%sSHMEM_WAITDATA_MUTEX_%d", temp_dir_path, port);
86  m_pWaitDataMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
87 #else
88  snprintf(obj_name, 2048, "SHMEM_ACCESS_MUTEX_%d", port);
89  m_pAccessMutex = new ACE_Process_Mutex(obj_name);
90  snprintf(obj_name, 2048, "SHMEM_WAITDATA_MUTEX_%d", port);
91  m_pWaitDataMutex = new ACE_Process_Mutex(obj_name);
92 #endif
93 
94  m_pAccessMutex->acquire();
95 
96  m_pHeader->resize = false;
97  m_pHeader->close = false;
98 
99  m_pHeader->avail = 0;
100  m_pHeader->head = 0;
101  m_pHeader->size = size;
102  m_pHeader->tail = 0;
103  m_pHeader->waiting = 0;
104 
105  m_pAccessMutex->release();
106 
107  m_bOpen = true;
108 
109  return true;
110 }
111 
113 {
114  ++m_ResizeNum;
115 
116  yCDebug(SHMEMCARRIER, "output stream resize %d to %d\n", m_ResizeNum, newsize);
117 
118  ACE_Shared_Memory* pNewMap;
119 
120  m_pHeader->resize = true;
121  m_pHeader->newsize = newsize;
122 
123 #ifdef ACE_LACKS_SYSV_SHMEM
124 
125  char file_path[1024];
126 
127  if (ACE::get_temp_dir(file_path, 1024) == -1) {
128  yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
129  return false;
130  }
131 
132  char file_name[2048];
133  snprintf(file_name, 2048, "%sSHMEM_FILE_%d_%d", file_path, m_Port, m_ResizeNum);
134 
135  pNewMap = new ACE_Shared_Memory_MM(file_name, //const ACE_TCHAR *filename,
136  newsize + sizeof(ShmemHeader_t), //int len = -1,
137  O_RDWR | O_CREAT, //int flags = O_RDWR | O_CREAT,
138  ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
139  PROT_RDWR, //int prot = PROT_RDWR,
140  ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
141 
142 #else
143 
144  int shmemkey = (m_ResizeNum << 16) + m_Port;
145 
146  pNewMap = new ACE_Shared_Memory_SV(shmemkey, newsize + sizeof(ShmemHeader_t), ACE_Shared_Memory_SV::ACE_CREATE);
147 
148 #endif
149 
150  if (!pNewMap) {
151  yCError(SHMEMCARRIER, "ShmemOutputStream can't create shared memory");
152  return false;
153  }
154 
155  auto* pNewHeader = (ShmemHeader_t*)pNewMap->malloc();
156  char* pNewData = (char*)(pNewHeader + 1);
157 
158  pNewHeader->size = newsize;
159  pNewHeader->resize = false;
160  pNewHeader->close = m_pHeader->close;
161 
162  pNewHeader->tail = 0;
163  pNewHeader->head = pNewHeader->avail = m_pHeader->avail;
164  pNewHeader->waiting = m_pHeader->waiting;
165 
166  if (m_pHeader->avail) {
167  // one or two blocks in circular queue?
168  if (m_pHeader->tail < m_pHeader->head) {
169  memcpy(pNewData, m_pData + m_pHeader->tail, m_pHeader->avail);
170  } else {
171  int firstchunk = m_pHeader->size - m_pHeader->tail;
172  memcpy(pNewData, m_pData + m_pHeader->tail, firstchunk);
173  memcpy(pNewData + firstchunk, m_pData, m_pHeader->head);
174  }
175  }
176 
177  m_pMap->close();
178  delete m_pMap;
179  m_pMap = pNewMap;
180 
181  m_pHeader = pNewHeader;
182  m_pData = pNewData;
183 
184  return true;
185 }
186 
188 {
189  if (!m_bOpen) {
190  return false;
191  }
192 
193  m_pAccessMutex->acquire();
194 
195  if (!m_bOpen) {
196  return false;
197  }
198 
199  if (m_pHeader->close) {
200  m_pAccessMutex->release();
201  close();
202  return false;
203  }
204 
205  if ((int)m_pHeader->size - (int)m_pHeader->avail < (int)b.length()) {
206  yarp::conf::ssize_t required = m_pHeader->size + 2 * b.length();
207  Resize((int)required);
208  }
209 
210  if ((int)m_pHeader->head + (int)b.length() <= (int)m_pHeader->size) {
211  memcpy(m_pData + m_pHeader->head, b.get(), b.length());
212  } else {
213  int first_block_size = m_pHeader->size - m_pHeader->head;
214  memcpy(m_pData + m_pHeader->head, b.get(), first_block_size);
215  memcpy(m_pData, b.get() + first_block_size, b.length() - first_block_size);
216  }
217 
218  m_pHeader->avail += (int)b.length();
219  m_pHeader->head += (int)b.length();
221 
222  while (m_pHeader->waiting > 0) {
223  --m_pHeader->waiting;
224  m_pWaitDataMutex->release();
225  }
226 
227  m_pAccessMutex->release();
228 
229  return true;
230 }
231 
233 {
234  if (!m_bOpen)
235  return;
236 
237  m_bOpen = false;
238 
239  m_pAccessMutex->acquire();
240  while (m_pHeader->waiting > 0) {
241  --m_pHeader->waiting;
242  m_pWaitDataMutex->release();
243  }
244  m_pHeader->close = true;
245  m_pAccessMutex->release();
246 
247  m_pAccessMutex->remove();
248  delete m_pAccessMutex;
249  m_pAccessMutex = nullptr;
250 
251  m_pWaitDataMutex->remove();
252  delete m_pWaitDataMutex;
253  m_pWaitDataMutex = nullptr;
254 
255  m_pMap->close();
256  delete m_pMap;
257  m_pMap = nullptr;
258 }
ShmemOutputStreamImpl::write
bool write(const yarp::os::Bytes &b)
Definition: ShmemOutputStream.cpp:187
numeric.h
ShmemHeader_t::size
int size
Definition: ShmemTypes.h:20
ShmemOutputStreamImpl::m_ResizeNum
int m_ResizeNum
Definition: ShmemOutputStream.h:55
ShmemOutputStreamImpl::close
void close()
Definition: ShmemOutputStream.cpp:232
ShmemOutputStream.h
ShmemOutputStreamImpl::m_pWaitDataMutex
ACE_Process_Mutex * m_pWaitDataMutex
Definition: ShmemOutputStream.h:63
ShmemHeader_t
Definition: ShmemTypes.h:16
ShmemHeader_t::close
bool close
Definition: ShmemTypes.h:18
ShmemHeader_t::resize
bool resize
Definition: ShmemTypes.h:17
ShmemHeader_t::avail
int avail
Definition: ShmemTypes.h:25
ShmemOutputStreamImpl::m_pData
char * m_pData
Definition: ShmemOutputStream.h:67
ShmemOutputStreamImpl::open
bool open(int port, int size=4096)
Definition: ShmemOutputStream.cpp:45
ShmemOutputStreamImpl::m_bOpen
bool m_bOpen
Definition: ShmemOutputStream.h:53
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
ShmemOutputStreamImpl::Resize
bool Resize(int newsize)
Definition: ShmemOutputStream.cpp:112
ShmemOutputStreamImpl::isOk
bool isOk() const
Definition: ShmemOutputStream.cpp:40
ShmemOutputStreamImpl::~ShmemOutputStreamImpl
~ShmemOutputStreamImpl()
Definition: ShmemOutputStream.cpp:35
SHMEMCARRIER
const yarp::os::LogComponent & SHMEMCARRIER()
Definition: ShmemLogComponent.cpp:16
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
ShmemHeader_t::tail
int tail
Definition: ShmemTypes.h:24
ShmemOutputStreamImpl::m_pHeader
ShmemHeader_t * m_pHeader
Definition: ShmemOutputStream.h:68
ShmemLogComponent.h
yCError
#define yCError(component,...)
Definition: LogComponent.h:157
ShmemHeader_t::head
int head
Definition: ShmemTypes.h:23
ShmemHeader_t::newsize
int newsize
Definition: ShmemTypes.h:21
ShmemOutputStreamImpl::m_pAccessMutex
ACE_Process_Mutex * m_pAccessMutex
Definition: ShmemOutputStream.h:62
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
Bytes.h
ShmemOutputStreamImpl::ShmemOutputStreamImpl
ShmemOutputStreamImpl()
Definition: ShmemOutputStream.cpp:23
ShmemOutputStreamImpl::m_pMap
ACE_Shared_Memory * m_pMap
Definition: ShmemOutputStream.h:66
ShmemOutputStreamImpl::m_Port
int m_Port
Definition: ShmemOutputStream.h:56
ShmemHeader_t::waiting
int waiting
Definition: ShmemTypes.h:26