33 #ifndef DOXYGEN_SHOULD_SKIP_THIS
34 class PortReaderPacket
37 PortReaderPacket *prev_, *next_;
51 prev_ = next_ =
nullptr;
58 virtual ~PortReaderPacket()
66 if (reader !=
nullptr) {
83 this->reader = reader;
94 this->external = reader;
95 this->writer = writer;
// Stores a copy of the given bytes in `envelope`.
// The copy is built with std::string's (pointer, length) constructor from
// bytes.get() and bytes.length().
// NOTE(review): assumes Bytes::get() yields a char pointer valid for length() bytes — confirm.
98 void setEnvelope(
const Bytes& bytes)
100 envelope = std::string(bytes.
get(), bytes.
length());
106 if (writer !=
nullptr) {
118 std::list<PortReaderPacket*> inactive;
119 std::list<PortReaderPacket*> active;
124 return active.size();
129 return inactive.size();
// Takes a packet from the inactive list, allocating a new one first when the
// list is empty.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `next` is handed to the caller — confirm against the full file.
132 PortReaderPacket* getInactivePacket()
// Nothing to reuse: create a packet with `new` and append it to `inactive`.
134 if (inactive.empty()) {
135 PortReaderPacket* obj =
nullptr;
136 obj =
new PortReaderPacket();
137 inactive.push_back(obj);
// Pick the packet at the head of the inactive list and check it is not null.
139 PortReaderPacket* next = inactive.front();
140 yCAssert(PORTREADERBUFFERBASE, next !=
nullptr);
// std::list::remove erases every element equal to `next`, not just the head.
141 inactive.remove(next);
// Reads the packet at the head of the active list when getCount() reports at
// least one packet.
// NOTE(review): the lines following the assertion are not visible in this
// excerpt; whether the packet is also removed from `active`, and what is
// returned, must be checked in the full file.
145 PortReaderPacket* getActivePacket()
// In the visible lines `next` is only assigned inside the count check below.
147 PortReaderPacket* next =
nullptr;
148 if (getCount() >= 1) {
149 next = active.front();
150 yCAssert(PORTREADERBUFFERBASE, next !=
nullptr);
// Appends `packet` to the end of the active list.
// A null `packet` is ignored.
156 void addActivePacket(PortReaderPacket* packet)
158 if (packet !=
nullptr) {
159 active.push_back(packet);
// Appends `packet` to the end of the inactive list.
// A null `packet` is ignored. No check for an existing entry is visible here,
// so the same pointer can end up in the list more than once.
163 void addInactivePacket(PortReaderPacket* packet)
165 if (packet !=
nullptr) {
166 inactive.push_back(packet);
172 while (!active.empty()) {
173 delete active.back();
176 while (!inactive.empty()) {
177 delete inactive.back();
184 class PortReaderBufferBase::Private
188 PortReaderPacket* prev;
192 unsigned int maxBuffer;
204 std::mutex stateMutex;
210 maxBuffer(maxBuffer),
225 Port* closePort =
nullptr;
227 if (port !=
nullptr) {
231 if (closePort !=
nullptr) {
241 if (prev !=
nullptr) {
242 pool.addInactivePacket(prev);
250 std::string getName()
252 if (port !=
nullptr) {
258 PortReaderPacket* get()
260 PortReaderPacket* result =
nullptr;
262 if (pool.getFree() == 0) {
264 if (maxBuffer == 0 || pool.getCount() < maxBuffer) {
272 result = pool.getInactivePacket();
280 return (
int)pool.getCount();
// Advances `prev` to the next active packet held by the pool.
// NOTE(review): the return statement and the lines between the two `if`
// statements are not visible in this excerpt — confirm what is returned and
// whether `prev` is cleared in between.
283 PortReaderPacket* getContent()
// A packet remembered from an earlier call goes back to the pool's inactive list.
285 if (prev !=
nullptr) {
286 pool.addInactivePacket(prev);
// With at least one packet counted by the pool, remember its active packet in `prev`.
289 if (pool.getCount() >= 1) {
290 prev = pool.getActivePacket();
299 if (prev ==
nullptr) {
303 sis.
add(prev->envelope);
307 sbr.
reset(sis,
nullptr, route, 0,
true);
308 return envelope.
read(sbr);
// Moves one active packet, if any, straight to the pool's inactive list.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `drop` is returned so callers can tell whether a packet was
// dropped — confirm.
311 PortReaderPacket* dropContent()
// In the visible lines `drop` is only assigned inside the count check below.
314 PortReaderPacket* drop =
nullptr;
316 if (pool.getCount() >= 1) {
317 drop = pool.getActivePacket();
// Only a non-null packet is handed back to the inactive list.
318 if (drop !=
nullptr) {
319 pool.addInactivePacket(drop);
326 void attach(
Port& port)
334 if (prev !=
nullptr) {
// Hands the packet identified by `key` back to the pool's inactive list.
// A null `key` is ignored. A non-null `key` is cast to PortReaderPacket*
// without any type check, so it must really point to a PortReaderPacket.
342 void release(
void* key)
344 if (key !=
nullptr) {
345 pool.addInactivePacket((PortReaderPacket*)key);
349 #endif // DOXYGEN_SHOULD_SKIP_THIS
354 mPriv(new Private(*this, maxBuffer))
365 if (mPriv->creator !=
nullptr) {
366 return mPriv->creator->create();
373 mPriv->stateMutex.lock();
374 int count = mPriv->checkContent();
375 mPriv->stateMutex.unlock();
383 mPriv->contentSema.post();
389 if (mPriv->period < 0 || cleanup) {
390 mPriv->contentSema.wait();
394 double target =
now + mPriv->period;
395 if (mPriv->last_recv > 0) {
396 target = mPriv->last_recv + mPriv->period;
398 double diff = target -
now;
400 ok = mPriv->contentSema.waitWithTimeout(diff);
402 ok = mPriv->contentSema.check();
404 mPriv->contentSema.wait();
409 if (mPriv->last_recv > 0) {
410 mPriv->last_recv += mPriv->period;
415 if (mPriv->last_recv < 0) {
416 mPriv->last_recv =
now;
422 mPriv->last_recv = target;
425 mPriv->stateMutex.lock();
426 PortReaderPacket* readerPacket = mPriv->getContent();
428 if (readerPacket !=
nullptr) {
429 PortReader* external = readerPacket->getExternal();
430 if (external ==
nullptr) {
431 reader = readerPacket->getReader();
436 mPriv->stateMutex.unlock();
437 if (reader !=
nullptr) {
438 mPriv->consumeSema.post();
452 if (mPriv->replier !=
nullptr) {
454 return mPriv->replier->read(connection);
457 PortReaderPacket* reader =
nullptr;
458 while (reader ==
nullptr) {
459 mPriv->stateMutex.lock();
460 reader = mPriv->get();
461 if ((reader !=
nullptr) && reader->getReader() ==
nullptr) {
463 yCAssert(PORTREADERBUFFERBASE, next !=
nullptr);
464 reader->setReader(next);
467 mPriv->stateMutex.unlock();
468 if (reader ==
nullptr) {
469 mPriv->consumeSema.wait();
474 yCAssert(PORTREADERBUFFERBASE, reader->getReader() !=
nullptr);
475 ok = reader->getReader()->read(connection);
480 mPriv->port =
nullptr;
483 mPriv->stateMutex.lock();
485 if (mPriv->ct > 0 && mPriv->prune) {
486 PortReaderPacket* readerPacket = mPriv->dropContent();
487 pruned = (readerPacket !=
nullptr);
490 mPriv->pool.addActivePacket(reader);
492 mPriv->stateMutex.unlock();
494 mPriv->contentSema.post();
496 yCTrace(PORTREADERBUFFERBASE,
">>>>>>>>>>>>>>>>> adding data");
498 mPriv->stateMutex.lock();
499 mPriv->pool.addInactivePacket(reader);
500 mPriv->stateMutex.unlock();
501 yCTrace(PORTREADERBUFFERBASE,
">>>>>>>>>>>>>>>>> skipping data");
504 yCDebug(PORTREADERBUFFERBASE,
"giving PortReaderBuffer chance to close");
505 mPriv->contentSema.post();
513 mPriv->creator = creator;
518 mPriv->replier = &reader;
528 mPriv->period = period;
533 return mPriv->getName();
538 return mPriv->maxBuffer;
543 return mPriv->port ==
nullptr;
564 PortReaderPacket* reader =
nullptr;
565 while (reader ==
nullptr) {
566 mPriv->stateMutex.lock();
567 reader = mPriv->get();
568 mPriv->stateMutex.unlock();
569 if (reader ==
nullptr) {
570 mPriv->consumeSema.wait();
574 reader->setExternal(obj, wrapper);
576 mPriv->stateMutex.lock();
578 if (mPriv->ct > 0 && mPriv->prune) {
579 PortReaderPacket* readerPacket = mPriv->dropContent();
580 pruned = (readerPacket !=
nullptr);
583 mPriv->pool.addActivePacket(reader);
585 mPriv->stateMutex.unlock();
587 mPriv->contentSema.post();
589 yCTrace(PORTREADERBUFFERBASE,
">>>>>>>>>>>>>>>>> adding data");
600 printf(
"Sorry, forgetting not implemented yet\n");
607 return mPriv->acquire();
612 mPriv->stateMutex.lock();
614 mPriv->stateMutex.unlock();
620 return mPriv->getEnvelope(envelope);
630 yCError(PORTREADERBUFFERBASE,
"Missing or incorrectly typed onRead function");