libvisiontransfer  10.6.0
asynctransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2023 Allied Vision Technologies GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #if __GNUC__ == 4 && __GNUC_MINOR__ < 9
16 // This is a very ugly workaround for GCC bug 54562. If omitted,
17 // passing timeouts to collectReceivedImage() is broken.
18 #include <bits/c++config.h>
19 #undef _GLIBCXX_USE_CLOCK_MONOTONIC
20 #endif
21 
22 #include <iostream>
23 #include <functional>
24 #include <stdexcept>
25 #include <thread>
26 #include <condition_variable>
27 #include <chrono>
28 #include <mutex>
29 #include <vector>
30 #include <cstring>
31 #include <algorithm>
32 #include "visiontransfer/asynctransfer.h"
33 #include "visiontransfer/alignedallocator.h"
34 
35 using namespace std;
36 using namespace visiontransfer;
37 using namespace visiontransfer::internal;
38 
39 namespace visiontransfer {
40 
41 /*************** Pimpl class containing all private members ***********/
42 
43 class AsyncTransfer::Pimpl {
44 public:
45  Pimpl(const char* address, const char* service,
46  ImageProtocol::ProtocolType protType, bool server,
47  int bufferSize, int maxUdpPacketSize);
48  ~Pimpl();
49 
50  // Redeclaration of public members
51  void sendImageSetAsync(const ImageSet& imageSet, bool deleteData);
52  bool collectReceivedImageSet(ImageSet& imageSet, double timeout);
53  int getNumDroppedFrames() const;
54  bool isConnected() const;
55  void disconnect();
56  std::string getRemoteAddress() const;
57  bool tryAccept();
58 
59 private:
60  static constexpr int NUM_BUFFERS = ImageSet::MAX_SUPPORTED_IMAGES * 3;
61  static constexpr int SEND_THREAD_SHORT_WAIT_MS = 1;
62  static constexpr int SEND_THREAD_LONG_WAIT_MS = 10;
63 
64  // The encapsulated image transfer object
65  ImageTransfer imgTrans;
66 
67  // Variable for controlling thread termination
68  volatile bool terminate;
69 
70  // There are two threads, one for sending and one for receiving.
71  // Each has a mutex and condition variable for synchronization.
72  std::thread sendThread;
73  std::mutex sendMutex;
74  std::condition_variable sendCond;
75  std::condition_variable sendWaitCond;
76 
77  std::thread receiveThread;
78  std::timed_mutex receiveMutex;
79  std::condition_variable_any receiveCond;
80 
81  // Objects for exchanging images with the send and receive threads
82  ImageSet receivedSet;
83  std::vector<unsigned char, AlignedAllocator<unsigned char> > receivedData[NUM_BUFFERS];
84  volatile int receiveBufferIndex;
85  volatile bool newDataReceived;
86 
87  ImageSet sendImageSet;
88  bool sendSetValid;
89  bool deleteSendData;
90 
91  // Exception occurred in one of the threads
92  std::exception_ptr receiveException;
93  std::exception_ptr sendException;
94 
95  bool sendThreadCreated;
96  bool receiveThreadCreated;
97 
98  // Count of additional locally dropped frames (due to not being collected in time)
99  // Only starts counting with the first call of collectReceivedImagePair()
100  int uncollectedDroppedFrames;
101 
102  // Main loop for sending thread
103  void sendLoop();
104 
105  // Main loop for receiving;
106  void receiveLoop();
107 
108  void createSendThread();
109 };
110 
111 /******************** Stubs for all public members ********************/
112 
113 AsyncTransfer::AsyncTransfer(const char* address, const char* service,
114  ImageProtocol::ProtocolType protType, bool server,
115  int bufferSize, int maxUdpPacketSize)
116  : pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize)) {
117 }
118 
119 AsyncTransfer::AsyncTransfer(const DeviceInfo& device, int bufferSize, int maxUdpPacketSize)
120  : pimpl(new Pimpl(device.getIpAddress().c_str(), "7681", static_cast<ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
121  false, bufferSize, maxUdpPacketSize)) {
122 }
123 
124 AsyncTransfer::~AsyncTransfer() {
125  delete pimpl;
126 }
127 
128 void AsyncTransfer::sendImageSetAsync(const ImageSet& imageSet, bool deleteData) {
129  pimpl->sendImageSetAsync(imageSet, deleteData);
130 }
131 
132 bool AsyncTransfer::collectReceivedImageSet(ImageSet& imageSet, double timeout) {
133  return pimpl->collectReceivedImageSet(imageSet, timeout);
134 }
135 
137  return pimpl->getNumDroppedFrames();
138 }
139 
141  return pimpl->isConnected();
142 }
143 
145  return pimpl->disconnect();
146 }
147 
148 std::string AsyncTransfer::getRemoteAddress() const {
149  return pimpl->getRemoteAddress();
150 }
151 
153  return pimpl->tryAccept();
154 }
155 
156 /******************** Implementation in pimpl class *******************/
157 
158 AsyncTransfer::Pimpl::Pimpl(const char* address, const char* service,
159  ImageProtocol::ProtocolType protType, bool server,
160  int bufferSize, int maxUdpPacketSize)
161  : imgTrans(address, service, protType, server, bufferSize, maxUdpPacketSize),
162  terminate(false), receiveBufferIndex(0), newDataReceived(false), sendSetValid(false),
163  deleteSendData(false), sendThreadCreated(false),
164  receiveThreadCreated(false), uncollectedDroppedFrames(-1) {
165 
166  if(server) {
167  createSendThread();
168  }
169 }
170 
171 AsyncTransfer::Pimpl::~Pimpl() {
172  terminate = true;
173 
174  sendCond.notify_all();
175  receiveCond.notify_all();
176  sendWaitCond.notify_all();
177 
178  if(sendThreadCreated && sendThread.joinable()) {
179  sendThread.join();
180  }
181 
182  if(receiveThreadCreated && receiveThread.joinable()) {
183  receiveThread.join();
184  }
185 
186  if(sendSetValid && deleteSendData) {
187  delete[] sendImageSet.getPixelData(0);
188  delete[] sendImageSet.getPixelData(1);
189  }
190 }
191 
192 void AsyncTransfer::Pimpl::createSendThread() {
193  if(!sendThreadCreated) {
194  // Lazy initialization of the send thread as it is not always needed
195  unique_lock<mutex> lock(sendMutex);
196  sendThread = thread(bind(&AsyncTransfer::Pimpl::sendLoop, this));
197  sendThreadCreated = true;
198  }
199 }
200 
201 void AsyncTransfer::Pimpl::sendImageSetAsync(const ImageSet& imageSet, bool deleteData) {
202  createSendThread();
203 
204  while(true) {
205  unique_lock<mutex> lock(sendMutex);
206 
207  // Test for errors
208  if(sendException) {
209  std::rethrow_exception(sendException);
210  }
211 
212  if(!sendSetValid) {
213  sendImageSet = imageSet;
214  sendSetValid = true;
215  deleteSendData = deleteData;
216 
217  // Wake up the sender thread
218  sendCond.notify_one();
219 
220  return;
221  } else {
222  // Wait for old data to be processed first
223  sendWaitCond.wait(lock);
224  }
225  }
226 }
227 
228 bool AsyncTransfer::Pimpl::collectReceivedImageSet(ImageSet& imageSet, double timeout) {
229  if(!receiveThreadCreated) {
230  // Lazy initialization of receive thread
231  unique_lock<timed_mutex> lock(receiveMutex);
232  receiveThreadCreated = true;
233  receiveThread = thread(bind(&AsyncTransfer::Pimpl::receiveLoop, this));
234  }
235 
236  // Acquire mutex
237  unique_lock<timed_mutex> lock(receiveMutex, std::defer_lock);
238  if(timeout < 0) {
239  lock.lock();
240  } else {
241  std::chrono::steady_clock::time_point lockStart =
242  std::chrono::steady_clock::now();
243  if(!lock.try_lock_for(std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)))) {
244  // Timed out
245  return false;
246  }
247 
248  // Update timeout
249  unsigned int lockDuration = static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::microseconds>(
250  std::chrono::steady_clock::now() - lockStart).count());
251  timeout = std::max(0.0, timeout - lockDuration*1e-6);
252  }
253 
254  // Test for errors
255  if(receiveException) {
256  std::rethrow_exception(receiveException);
257  }
258 
259  if(timeout == 0 && !newDataReceived) {
260  // No image has been received and we are not blocking
261  return false;
262  }
263 
264  // If there is no data yet then keep on waiting
265  if(!newDataReceived) {
266  if(timeout < 0) {
267  while(!terminate && !receiveException && !newDataReceived) {
268  receiveCond.wait(lock);
269  }
270  } else {
271  receiveCond.wait_for(lock, std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)));
272  }
273  }
274 
275  // Test for errors again
276  if(receiveException) {
277  std::rethrow_exception(receiveException);
278  }
279 
280  if(newDataReceived) {
281  // Get the received image
282  imageSet = receivedSet;
283 
284  newDataReceived = false;
285 
286  // Increment index for data buffers
287  receiveBufferIndex = (receiveBufferIndex + receivedSet.getNumberOfImages()) % NUM_BUFFERS;
288 
289  // Start counting uncollected frames at the time of first collection
290  if (uncollectedDroppedFrames < 0) uncollectedDroppedFrames = 0;
291 
292  return true;
293  } else {
294  return false;
295  }
296 }
297 
298 void AsyncTransfer::Pimpl::sendLoop() {
299  {
300  // Delay the thread start
301  unique_lock<mutex> lock(sendMutex);
302  }
303 
304  ImageSet imgSet;
305  bool deleteSet = false;
306 
307  try {
308  while(!terminate) {
309  // Wait for next image
310  {
311  unique_lock<mutex> lock(sendMutex);
312  // Wait for next frame to be queued
313  bool firstWait = true;
314  while(!terminate && !sendSetValid) {
315  imgTrans.transferData();
316  sendCond.wait_for(lock, std::chrono::milliseconds(
317  firstWait ? SEND_THREAD_SHORT_WAIT_MS : SEND_THREAD_LONG_WAIT_MS));
318  firstWait = false;
319  }
320  if(!sendSetValid) {
321  continue;
322  }
323 
324  imgSet = sendImageSet;
325  deleteSet = deleteSendData;
326  sendSetValid = false;
327 
328  sendWaitCond.notify_one();
329  }
330 
331  imgTrans.setTransferImageSet(imgSet);
332  while(!terminate) {
333  ImageTransfer::TransferStatus status = imgTrans.transferData();
335  break;
336  }
337  std::this_thread::sleep_for(std::chrono::milliseconds(SEND_THREAD_LONG_WAIT_MS));
338  }
339 
340  if(deleteSet) {
341  for (int i=0; i<imgSet.getNumberOfImages(); ++i) {
342  delete[] imgSet.getPixelData(i);
343  }
344  deleteSet = false;
345  }
346  }
347  } catch(...) {
348  // Store the exception for later
349  if(!sendException) {
350  sendException = std::current_exception();
351  }
352  sendWaitCond.notify_all();
353 
354  // Don't forget to free the memory
355  if(deleteSet) {
356  for (int i=0; i<imgSet.getNumberOfImages(); ++i) {
357  delete[] imgSet.getPixelData(i);
358  }
359  deleteSet = false;
360  }
361  }
362 }
363 
364 void AsyncTransfer::Pimpl::receiveLoop() {
365  {
366  // Delay the thread start
367  unique_lock<timed_mutex> lock(receiveMutex);
368  }
369 
370  try {
371  ImageSet currentSet;
372 
373  while(!terminate) {
374  // Receive new image (blocks internally)
375  bool newImageSetArrived = imgTrans.receiveImageSet(currentSet);
376 
377  if (newImageSetArrived) {
378  unique_lock<timed_mutex> lock(receiveMutex);
379  if (newDataReceived) {
380  // collectReceivedImageSet() frequency was too low; previous frame lost
381  if (uncollectedDroppedFrames > -1) uncollectedDroppedFrames++;
382  }
383  // Copy the pixel data
384  for(int i=0;i<currentSet.getNumberOfImages();i++) {
385  int bytesPerPixel = currentSet.getBytesPerPixel(i);
386  int newStride = currentSet.getWidth() * bytesPerPixel;
387  int totalSize = currentSet.getHeight() * newStride;
388  int bufIdxHere = (i + receiveBufferIndex) % NUM_BUFFERS;
389  if(static_cast<int>(receivedData[bufIdxHere].size()) < totalSize) {
390  receivedData[bufIdxHere].resize(totalSize);
391  }
392  if(newStride == currentSet.getRowStride(i)) {
393  memcpy(&receivedData[bufIdxHere][0], currentSet.getPixelData(i),
394  newStride*currentSet.getHeight());
395  } else {
396  for(int y = 0; y<currentSet.getHeight(); y++) {
397  memcpy(&receivedData[bufIdxHere][y*newStride],
398  &currentSet.getPixelData(i)[y*currentSet.getRowStride(i)],
399  newStride);
400  }
401  currentSet.setRowStride(i, newStride);
402  }
403  currentSet.setPixelData(i, &receivedData[bufIdxHere][0]);
404  }
405  // N.B. receiveBufferIndex is only increased at collection time
406 
407  // Notify that a new image set has been received
408  newDataReceived = true;
409  receivedSet = currentSet;
410  receiveCond.notify_one();
411  }
412 
413  }
414  } catch(...) {
415  // Store the exception for later
416  if(!receiveException) {
417  receiveException = std::current_exception();
418  }
419  receiveCond.notify_all();
420  }
421 }
422 
423 bool AsyncTransfer::Pimpl::isConnected() const {
424  return imgTrans.isConnected();
425 }
426 
427 void AsyncTransfer::Pimpl::disconnect() {
428  imgTrans.disconnect();
429 }
430 
431 std::string AsyncTransfer::Pimpl::getRemoteAddress() const {
432  return imgTrans.getRemoteAddress();
433 }
434 
435 int AsyncTransfer::Pimpl::getNumDroppedFrames() const {
436  return imgTrans.getNumDroppedFrames() + uncollectedDroppedFrames;
437 }
438 
439 bool AsyncTransfer::Pimpl::tryAccept() {
440  return imgTrans.tryAccept();
441 }
442 
443 constexpr int AsyncTransfer::Pimpl::NUM_BUFFERS;
444 constexpr int AsyncTransfer::Pimpl::SEND_THREAD_SHORT_WAIT_MS;
445 constexpr int AsyncTransfer::Pimpl::SEND_THREAD_LONG_WAIT_MS;
446 
447 } // namespace
448 
visiontransfer::ImageSet::getHeight
int getHeight() const
Returns the height of each image.
Definition: imageset.h:234
visiontransfer::AsyncTransfer::tryAccept
bool tryAccept()
Tries to accept a client connection.
Definition: asynctransfer.cpp:152
visiontransfer::ImageSet::getWidth
int getWidth() const
Returns the width of each image.
Definition: imageset.h:229
visiontransfer::ImageSet::getRowStride
int getRowStride(int imageNumber) const
Returns the row stride for the pixel data of one image.
Definition: imageset.h:245
visiontransfer::ImageSet::getNumberOfImages
int getNumberOfImages() const
Returns the number of images in this set.
Definition: imageset.h:431
visiontransfer::AsyncTransfer::isConnected
bool isConnected() const
Returns true if a remote connection is established.
Definition: asynctransfer.cpp:140
visiontransfer::ImageSet::setPixelData
void setPixelData(int imageNumber, unsigned char *pixelData)
Sets the pixel data for the given image.
Definition: imageset.h:161
visiontransfer::ImageTransfer::TransferStatus
TransferStatus
The result of a partial image transfer.
Definition: imagetransfer.h:67
visiontransfer::DeviceInfo
Aggregates information about a discovered device.
Definition: deviceinfo.h:59
visiontransfer::ImageSet
A set of one to three images, but usually two (the left camera image and the disparity map)....
Definition: imageset.h:50
visiontransfer::AsyncTransfer::getRemoteAddress
std::string getRemoteAddress() const
Returns the address of the remote host.
Definition: asynctransfer.cpp:148
visiontransfer::ImageTransfer::WOULD_BLOCK
@ WOULD_BLOCK
The operation would block and blocking as been disabled.
Definition: imagetransfer.h:91
visiontransfer::ImageTransfer::PARTIAL_TRANSFER
@ PARTIAL_TRANSFER
Definition: imagetransfer.h:85
visiontransfer::ImageSet::setRowStride
void setRowStride(int imageNumber, int stride)
Sets a new row stride for the pixel data of one image.
Definition: imageset.h:131
visiontransfer::ImageSet::getBytesPerPixel
int getBytesPerPixel(int imageNumber) const
Returns the number of bytes that are required to store one image pixel.
Definition: imageset.h:399
visiontransfer::ImageProtocol::ProtocolType
ProtocolType
Supported network protocols.
Definition: imageprotocol.h:67
visiontransfer::AsyncTransfer::sendImageSetAsync
void sendImageSetAsync(const ImageSet &imageSet, bool deleteData=false)
Starts an asynchronous transmission of the given image set.
Definition: asynctransfer.cpp:128
visiontransfer::ImageProtocol
A lightweight protocol for transferring image sets.
Definition: imageprotocol.h:52
visiontransfer::ImageTransfer
Class for synchronous transfer of image sets.
Definition: imagetransfer.h:52
visiontransfer::AsyncTransfer::disconnect
void disconnect()
Terminates the current connection.
Definition: asynctransfer.cpp:144
visiontransfer::AsyncTransfer::collectReceivedImageSet
bool collectReceivedImageSet(ImageSet &imageSet, double timeout=-1)
Collects the asynchronously received image.
Definition: asynctransfer.cpp:132
visiontransfer::ImageSet::getPixelData
unsigned char * getPixelData(int imageNumber) const
Returns the pixel data for the given image.
Definition: imageset.h:299
visiontransfer::AsyncTransfer::AsyncTransfer
AsyncTransfer(const char *address, const char *service="7681", ImageProtocol::ProtocolType protType=ImageProtocol::PROTOCOL_UDP, bool server=false, int bufferSize=16 *1048576, int maxUdpPacketSize=1472)
Creates a new transfer object.
Definition: asynctransfer.cpp:113
visiontransfer::AsyncTransfer::getNumDroppedFrames
int getNumDroppedFrames() const
Returns the number of frames that have been dropped since connecting to the current remote host.
Definition: asynctransfer.cpp:136
Allied Vision