libvisiontransfer  5.2.0
asynctransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2018 Nerian Vision Technologies
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #if __GNUC__ == 4 && __GNUC_MINOR__ < 9
16 // This is a very ugly workaround for GCC bug 54562. If omitted,
17 // passing timeouts to collectReceivedImage() is broken.
18 #include <bits/c++config.h>
19 #undef _GLIBCXX_USE_CLOCK_MONOTONIC
20 #endif
21 
#include <iostream>
#include <functional>
#include <stdexcept>
#include <thread>
#include <condition_variable>
#include <chrono>
#include <mutex>
#include <atomic>
#include <vector>
#include <cstring>
#include <algorithm>
#include "visiontransfer/asynctransfer.h"
#include "visiontransfer/alignedallocator.h"
34 
35 
36 using namespace std;
37 
38 /*************** Pimpl class containing all private members ***********/
39 
40 class AsyncTransfer::Pimpl {
41 public:
42  Pimpl(ImageTransfer::OperationMode mode, const char* remoteAddress, const char* remoteService,
43  const char* localAddress, const char* localService, int bufferSize, int maxUdpPacketSize);
44  ~Pimpl();
45 
46  // Redeclaration of public members
47  void sendImagePairAsync(const ImagePair& imagePair, bool deleteData);
48  bool collectReceivedImagePair(ImagePair& imagePair, double timeout);
49 
50 private:
51  static constexpr int NUM_BUFFERS = 6;
52 
53  // The encapsulated image transfer object
54  ImageTransfer imgTrans;
55 
56  // Variable for controlling thread termination
57  volatile bool terminate;
58 
59  // There are two threads, one for sending and one for receiving.
60  // Each has a mutex and condition variable for synchronization.
61  std::thread sendThread;
62  std::mutex sendMutex;
63  std::condition_variable sendCond;
64  std::condition_variable sendWaitCond;
65 
66  std::thread receiveThread;
67  std::timed_mutex receiveMutex;
68  std::condition_variable_any receiveCond;
69  std::condition_variable_any receiveWaitCond;
70 
71  // Objects for exchanging images with the send and receive threads
72  ImagePair receivedPair;
73  std::vector<unsigned char, AlignedAllocator<unsigned char> > receivedData[NUM_BUFFERS];
74  bool newDataReceived;
75 
76  ImagePair sendImagePair;
77  bool sendPairValid;
78  bool deleteSendData;
79 
80  // Exception occurred in one of the threads
81  std::exception_ptr receiveException;
82  std::exception_ptr sendException;
83 
84  bool sendThreadCreated;
85  bool receiveThreadCreated;
86 
87  // Main loop for sending thread
88  void sendLoop();
89 
90  // Main loop for receiving;
91  void receiveLoop();
92 };
93 
94 /******************** Stubs for all public members ********************/
95 
96 AsyncTransfer::AsyncTransfer(ImageTransfer::OperationMode mode, const char* remoteAddress, const char* remoteService,
97  const char* localAddress, const char* localService, int bufferSize, int maxUdpPacketSize)
98  : pimpl(new Pimpl(mode, remoteAddress, remoteService, localAddress, localService, bufferSize, maxUdpPacketSize)) {
99 }
100 
101 AsyncTransfer::~AsyncTransfer() {
102  delete pimpl;
103 }
104 
105 void AsyncTransfer::sendImagePairAsync(const ImagePair& imagePair, bool deleteData) {
106  pimpl->sendImagePairAsync(imagePair, deleteData);
107 }
108 
109 bool AsyncTransfer::collectReceivedImagePair(ImagePair& imagePair, double timeout) {
110  return pimpl->collectReceivedImagePair(imagePair, timeout);
111 }
112 
113 /******************** Implementation in pimpl class *******************/
114 
115 AsyncTransfer::Pimpl::Pimpl(ImageTransfer::OperationMode mode, const char* remoteAddress, const char* remoteService,
116  const char* localAddress, const char* localService, int bufferSize, int maxUdpPacketSize)
117  : imgTrans(mode, remoteAddress, remoteService, localAddress, localService, bufferSize, maxUdpPacketSize), terminate(false),
118  newDataReceived(false), sendPairValid(false), deleteSendData(false), sendThreadCreated(false),
119  receiveThreadCreated(false) {
120 
121  if(mode == ImageTransfer::TCP_SERVER) {
122  // Wait for a connection. This is only a debugging feature
123  while(!imgTrans.tryAccept()) {
124  std::chrono::milliseconds duration(10);
125  std::this_thread::sleep_for(duration);
126  }
127  }
128 }
129 
130 AsyncTransfer::Pimpl::~Pimpl() {
131  terminate = true;
132 
133  sendCond.notify_all();
134  receiveCond.notify_all();
135  sendWaitCond.notify_all();
136  receiveWaitCond.notify_all();
137 
138  if(sendThreadCreated && sendThread.joinable()) {
139  sendThread.join();
140  }
141 
142  if(receiveThreadCreated && receiveThread.joinable()) {
143  receiveThread.join();
144  }
145 
146  if(sendPairValid && deleteSendData) {
147  delete[] sendImagePair.getPixelData(0);
148  delete[] sendImagePair.getPixelData(1);
149  }
150 }
151 
152 void AsyncTransfer::Pimpl::sendImagePairAsync(const ImagePair& imagePair, bool deleteData) {
153  if(!sendThreadCreated) {
154  // Lazy initialization of the send thread as it is not always needed
155  unique_lock<mutex> lock(sendMutex);
156  sendThread = thread(bind(&AsyncTransfer::Pimpl::sendLoop, this));
157  sendThreadCreated = true;
158  }
159 
160  while(true) {
161  unique_lock<mutex> lock(sendMutex);
162 
163  // Test for errors
164  if(sendException) {
165  std::rethrow_exception(sendException);
166  }
167 
168  if(!sendPairValid) {
169  sendImagePair = imagePair;
170  sendPairValid = true;
171  deleteSendData = deleteData;
172 
173  // Wake up the sender thread
174  sendCond.notify_one();
175 
176  return;
177  } else {
178  // Wait for old data to be processed first
179  sendWaitCond.wait(lock);
180  }
181  }
182 }
183 
184 bool AsyncTransfer::Pimpl::collectReceivedImagePair(ImagePair& imagePair, double timeout) {
185  if(!receiveThreadCreated) {
186  // Lazy initialization of receive thread
187  unique_lock<timed_mutex> lock(receiveMutex);
188  receiveThreadCreated = true;
189  receiveThread = thread(bind(&AsyncTransfer::Pimpl::receiveLoop, this));
190  }
191 
192  // Acquire mutex
193  unique_lock<timed_mutex> lock(receiveMutex, std::defer_lock);
194  if(timeout < 0) {
195  lock.lock();
196  } else {
197  std::chrono::steady_clock::time_point lockStart =
198  std::chrono::steady_clock::now();
199  if(!lock.try_lock_for(std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)))) {
200  // Timed out
201  return false;
202  }
203 
204  // Update timeout
205  unsigned int lockDuration = static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::microseconds>(
206  std::chrono::steady_clock::now() - lockStart).count());
207  timeout = std::max(0.0, timeout - lockDuration*1e-6);
208  }
209 
210  // Test for errors
211  if(receiveException) {
212  std::rethrow_exception(receiveException);
213  }
214 
215  if(timeout == 0 && !newDataReceived) {
216  // No image has been received and we are not blocking
217  return false;
218  }
219 
220  // If there is no data yet then keep on waiting
221  if(!newDataReceived) {
222  if(timeout < 0) {
223  while(!terminate && !receiveException && !newDataReceived) {
224  receiveCond.wait(lock);
225  }
226  } else {
227  receiveCond.wait_for(lock, std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)));
228  }
229  }
230 
231  // Test for errors again
232  if(receiveException) {
233  std::rethrow_exception(receiveException);
234  }
235 
236  if(newDataReceived) {
237  // Get the received image
238  imagePair = receivedPair;
239 
240  newDataReceived = false;
241  receiveWaitCond.notify_one();
242 
243  return true;
244  } else {
245  return false;
246  }
247 }
248 
249 void AsyncTransfer::Pimpl::sendLoop() {
250  {
251  // Delay the thread start
252  unique_lock<mutex> lock(sendMutex);
253  }
254 
255  ImagePair pair;
256  bool deletePair = false;
257 
258  try {
259  while(!terminate) {
260  // Wait for next image
261  {
262  unique_lock<mutex> lock(sendMutex);
263  // Wait for next frame to be queued
264  while(!terminate && !sendPairValid) {
265  sendCond.wait(lock);
266  }
267  if(!sendPairValid) {
268  continue;
269  }
270 
271  pair = sendImagePair;
272  deletePair = deleteSendData;
273  sendPairValid = false;
274 
275  sendWaitCond.notify_one();
276  }
277 
278  if(!terminate) {
279  imgTrans.setTransferImagePair(pair);
280  imgTrans.transferData(true);
281  }
282 
283  if(deletePair) {
284  delete[] pair.getPixelData(0);
285  delete[] pair.getPixelData(1);
286  deletePair = false;
287  }
288  }
289  } catch(...) {
290  // Store the exception for later
291  if(!sendException) {
292  sendException = std::current_exception();
293  }
294  sendWaitCond.notify_all();
295 
296  // Don't forget to free the memory
297  if(deletePair) {
298  delete[] pair.getPixelData(0);
299  delete[] pair.getPixelData(1);
300  deletePair = false;
301  }
302  }
303 }
304 
305 void AsyncTransfer::Pimpl::receiveLoop() {
306  {
307  // Delay the thread start
308  unique_lock<timed_mutex> lock(receiveMutex);
309  }
310 
311  try {
312  ImagePair currentPair;
313  int bufferIndex = 0;
314 
315  while(!terminate) {
316  // Receive new image
317  if(!imgTrans.receiveImagePair(currentPair)) {
318  // No image available
319  continue;
320  }
321 
322  // Copy the pixel data
323  for(int i=0;i<2;i++) {
324  int bytesPerPixel = currentPair.getPixelFormat(i) == ImagePair::FORMAT_8_BIT ? 1 : 2;
325  int newStride = currentPair.getWidth() * bytesPerPixel;
326  int totalSize = currentPair.getHeight() * newStride;
327  if(static_cast<int>(receivedData[i + bufferIndex].size()) < totalSize) {
328  receivedData[i + bufferIndex].resize(totalSize);
329  }
330  if(newStride == currentPair.getRowStride(i)) {
331  memcpy(&receivedData[i + bufferIndex][0], currentPair.getPixelData(i),
332  newStride*currentPair.getHeight());
333  } else {
334  for(int y = 0; y<currentPair.getHeight(); y++) {
335  memcpy(&receivedData[i + bufferIndex][y*newStride],
336  &currentPair.getPixelData(i)[y*currentPair.getRowStride(i)],
337  newStride);
338  }
339  currentPair.setRowStride(i, newStride);
340  }
341  currentPair.setPixelData(i, &receivedData[i + bufferIndex][0]);
342  }
343 
344  {
345  unique_lock<timed_mutex> lock(receiveMutex);
346 
347  // Wait for previously received data to be processed
348  while(newDataReceived) {
349  receiveWaitCond.wait_for(lock, std::chrono::milliseconds(100));
350  if(terminate) {
351  return;
352  }
353  }
354 
355  // Notify that a new image pair has been received
356  newDataReceived = true;
357  receivedPair = currentPair;
358  receiveCond.notify_one();
359  }
360 
361  // Increment index for data buffers
362  bufferIndex = (bufferIndex + 2) % NUM_BUFFERS;
363  }
364  } catch(...) {
365  // Store the exception for later
366  if(!receiveException) {
367  receiveException = std::current_exception();
368  }
369  receiveCond.notify_all();
370  }
371 }
AsyncTransfer(ImageTransfer::OperationMode mode, const char *remoteAddress, const char *remoteService, const char *localAddress, const char *localService, int bufferSize=1048576, int maxUdpPacketSize=1472)
Creates a new transfer object.
Using TCP and acting as communication server.
Definition: imagetransfer.h:42
OperationMode
Supported transfer modes.
Definition: imagetransfer.h:33
unsigned char * getPixelData(int imageNumber) const
Returns the pixel data for the given image.
Definition: imagepair.h:190
int getWidth() const
Returns the width of each image.
Definition: imagepair.h:155
void setRowStride(int imageNumber, int stride)
Sets a new row stride for the pixel data of one image.
Definition: imagepair.h:74
Class for synchronous transfer of image pairs.
Definition: imagetransfer.h:30
void setPixelData(int imageNumber, unsigned char *pixelData)
Sets the pixel data for the given image.
Definition: imagepair.h:98
A set of two images, which are usually the left camera image and the disparity map.
Definition: imagepair.h:30
ImageFormat getPixelFormat(int imageNumber) const
Returns the pixel format for the given image.
Definition: imagepair.h:179
void sendImagePairAsync(const ImagePair &imagePair, bool deleteData)
Starts an asynchronous transmission of the given image pair.
8-bit greyscale format
Definition: imagepair.h:37
int getHeight() const
Returns the height of each image.
Definition: imagepair.h:160
int getRowStride(int imageNumber) const
Returns the row stride for the pixel data of one image.
Definition: imagepair.h:168
bool collectReceivedImagePair(ImagePair &imagePair, double timeout=-1)
Collects the asynchronously received image.
Nerian Vision Technologies