YARP
Yet Another Robot Platform
MpiP2PStream.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include "MpiP2PStream.h"
11 
12 using namespace yarp::os;
13 
14 
16 // InputStream
17 
19  if (readAvail == 0) {
20  // get new data
21  reset();
22  int size;
23  int available = 0;
24  int tag = 0;
25  int rank = comm->rank();
26  MPI_Status status;
27  while (true) {
28  if (terminate)
29  return -1;
30  // Check for a message
31  MPI_Iprobe(!rank, tag, comm->comm, &available, &status);
32  if (available)
33  break;
34  // Prevent the busy polling which hurts
35  // performance in the oversubscription scenario
36  Time::yield();
37  }
38  MPI_Get_count(&status, MPI_BYTE, &size);
39  if (size == (int)b.length()) {
40  // size of received data matches expected data
41  // do not use buffer, but write directly
42  MPI_Recv(b.get(), size, MPI_BYTE, !rank, tag, comm->comm, &status);
43  return size;
44  }
45  else {
46  // allocate new buffer
47  readBuffer = new char[size];
48  MPI_Recv(readBuffer, size, MPI_BYTE, !rank, tag, comm->comm, &status);
49  yCDebug(MPI_CARRIER, "got new msg of size %d", size);
50  readAvail = size;
51  readAt = 0;
52  }
53  }
54  if (readAvail>0) {
55  // copy data from buffer to destination object
56  int take = readAvail;
57  if (take>(int)b.length()) {
58  take = (int)b.length();
59  }
60  memcpy(b.get(),readBuffer+readAt,take);
61  yCDebug(MPI_CARRIER, "read %d of %d", take, readAvail);
62  readAt += take;
63  readAvail -= take;
64  return take;
65  }
66  return 0;
67 }
68 
70 // OutputStream
71 
72 void MpiP2PStream::write(const Bytes& b) {
73  int size = b.length();
74  //MPI_Bcast(&size, 1, MPI_INT, MPI_ROOT, intercomm );
75  MPI_Request request;
76  MPI_Status status;
77  int flag = 0;
78  int rank = comm->rank();
79  //MPI_Send(b.get(), size, MPI_BYTE, 0, 0, intercomm);
80 
81  MPI_Isend(b.get(), size, MPI_BYTE, !rank , 0, comm->comm, &request );
82  while(true) {
83  /*
84  // TODO: Need to implement a mechanism for breaking!!
85  if (terminate)
86  break;
87  */
88  // Check if message has been received
89  MPI_Test(&request, &flag, &status);
90  if (flag)
91  break;
92  // Prevent the busy polling which hurts
93  // performance in the oversubscription scenario
94  Time::yield();
95  }
96 }
yarp::os::Bytes::get
const char * get() const
Definition: Bytes.cpp:30
yarp::os::Bytes::length
size_t length() const
Definition: Bytes.cpp:25
yarp::os::Time::yield
void yield()
The calling thread releases its remaining quantum upon calling this function.
Definition: Time.cpp:141
yarp::conf::ssize_t
::ssize_t ssize_t
Definition: numeric.h:60
yarp::os::Bytes
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
MpiP2PStream::write
void write(const yarp::os::Bytes &b) override=0
MpiP2PStream.h
yarp::os
An interface to the operating system, including Port based communication.
Definition: AbstractCarrier.h:17
yCDebug
#define yCDebug(component,...)
Definition: LogComponent.h:112
MPI_CARRIER
const yarp::os::LogComponent & MPI_CARRIER()
Definition: MpiLogComponent.cpp:16
MpiP2PStream::read
ssize_t read(yarp::os::Bytes &b) override=0
Read a block of bytes into b and return the number of bytes read.