YARP
Yet Another Robot Platform
MpiBcastStream.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
#include "MpiBcastStream.h"

#include <vector>
11 
12 using namespace yarp::os;
13 
14 
16  comm->sema.wait();
17  int cmd = CMD_JOIN;
18  MPI_Bcast(&cmd, 1, MPI_INT, 0,comm->comm);
19 }
20 
21 
22 
23 // Connection commands
24 
25 void MpiBcastStream::execCmd(int cmd) {
26  switch (cmd) {
27  case CMD_JOIN:
28  // Connect:
29  // Let a new port join the broadcast group
30  comm->accept();
31  break;
32  case CMD_DISCONNECT:
33  // Disconnect:
34  // Let a port leave the broadcast group
35  int length;
36  MPI_Bcast(&length, 1, MPI_INT, 0,comm->comm);
37  char* remote = new char[length];
38  MPI_Bcast(remote, length, MPI_CHAR, 0,comm->comm);
39  terminate = !strcmp(remote, name.c_str());
40  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] Got disconnect : %s => %d", name.c_str(), remote, terminate);
41  delete [] remote;
42  comm->disconnect(terminate);
43  break;
44  }
45 }
46 
47 
48 
50 // InputStream
51 
53  if (terminate) {
54  return -1;
55  }
56  if (readAvail == 0) {
57  // get new data
58  reset();
59  int size;
60  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] Trying to read", name.c_str());
61 
62  MPI_Bcast(&size, 1, MPI_INT, 0,comm->comm);
63  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] got size %d", name.c_str(), size);
64  if (size < 0) {
65  execCmd(size);
66  return 0;
67  }
68  if ((size_t)size == b.length()) {
69  // size of received data matches expected data
70  // do not use buffer, but write directly
71  MPI_Bcast(b.get(), size, MPI_BYTE, 0, comm->comm);
72  return size;
73  }
74  else {
75  // allocate new buffer
76  readBuffer = new char[size];
77  MPI_Bcast(readBuffer, size, MPI_BYTE, 0, comm->comm);
78  yCDebug(MPI_CARRIER, "got new msg of size %d", size);
79  readAvail = size;
80  readAt = 0;
81  }
82  }
83  if (readAvail>0) {
84  // copy data from buffer to destination object
85  int take = readAvail;
86  if (take>(int)b.length()) {
87  take = b.length();
88  }
89  memcpy(b.get(),readBuffer+readAt,take);
90  yCDebug(MPI_CARRIER, "read %d of %d", take, readAvail);
91  readAt += take;
92  readAvail -= take;
93  return take;
94  }
95  return 0;
96 }
97 
99 // OutputStream
100 
101 void MpiBcastStream::write(const Bytes& b) {
102  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] getting sema for write", name.c_str());
103  comm->sema.wait();
104 
105  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] trying to write", name.c_str());
106  int size = b.length();
107  MPI_Bcast(&size, 1, MPI_INT, 0, comm->comm );
108  MPI_Bcast((void*)b.get(), size, MPI_BYTE, 0, comm->comm );
109  comm->sema.post();
110 
111  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] done writing", name.c_str());
112 }
CMD_JOIN
#define CMD_JOIN
Definition: MpiBcastStream.h:17
MpiBcastStream::write
void write(const yarp::os::Bytes &b) override=0
CMD_DISCONNECT
#define CMD_DISCONNECT
Definition: MpiBcastStream.h:18
MpiBcastStream::execCmd
void execCmd(int cmd)
Definition: MpiBcastStream.cpp:25
MpiBcastStream::read
ssize_t read(yarp::os::Bytes &b) override=0
Read a block of data into b and return the number of bytes read.
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
MpiBcastStream::startJoin
void startJoin()
Definition: MpiBcastStream.cpp:15
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
MPI_CARRIER
const yarp::os::LogComponent & MPI_CARRIER()
Definition: MpiLogComponent.cpp:16
MpiBcastStream.h