libvisiontransfer  4.1.5
asynctransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2017 Nerian Vision Technologies
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #if __GNUC__ == 4 && __GNUC_MINOR__ < 9
16 // This is a very ugly workaround for GCC bug 54562. If omitted,
17 // passing timeouts to collectReceivedImage() is broken.
18 #include <bits/c++config.h>
19 #undef _GLIBCXX_USE_CLOCK_MONOTONIC
20 #endif
21 
22 #include <iostream>
23 #include <functional>
24 #include <stdexcept>
25 #include <thread>
26 #include <condition_variable>
27 #include <chrono>
28 #include <mutex>
29 #include <vector>
30 #include <cstring>
31 #include <algorithm>
32 #include "visiontransfer/asynctransfer.h"
33 #include "visiontransfer/alignedallocator.h"
34 
35 
36 using namespace std;
37 
38 /*************** Pimpl class containing all private members ***********/
39 
40 class AsyncTransfer::Pimpl {
41 public:
42  Pimpl(ImageTransfer::OperationMode mode, const char* remoteAddress, const char* remoteService,
43  const char* localAddress, const char* localService, int bufferSize);
44  ~Pimpl();
45 
46  // Redeclaration of public members
47  void sendImagePairAsync(const ImagePair& imagePair, bool deleteData);
48  bool collectReceivedImagePair(ImagePair& imagePair, double timeout);
49 
50 private:
51  // The encapsulated image transfer object
52  ImageTransfer imgTrans;
53 
54  // Variable for controlling thread termination
55  volatile bool terminate;
56 
57  // There are two threads, one for sending and one for receiving.
58  // Each has a mutex and condition variable for synchronization.
59  std::thread sendThread;
60  std::mutex sendMutex;
61  std::condition_variable sendCond;
62  std::condition_variable sendWaitCond;
63 
64  std::thread receiveThread;
65  std::timed_mutex receiveMutex;
66  std::condition_variable_any receiveCond;
67  std::condition_variable_any receiveWaitCond;
68 
69  // Objects for exchanging images with the send and receive threads
70  ImagePair receivedPair;
71  std::vector<unsigned char, AlignedAllocator<unsigned char> > receivedData[4];
72  bool newDataReceived;
73 
74  ImagePair sendImagePair;
75  bool sendPairValid;
76  bool deleteSendData;
77 
78  // Exception occurred in one of the threads
79  std::exception_ptr receiveException;
80  std::exception_ptr sendException;
81 
82  bool sendThreadCreated;
83  bool receiveThreadCreated;
84 
85  // Main loop for sending thread
86  void sendLoop();
87 
88  // Main loop for receiving;
89  void receiveLoop();
90 };
91 
92 /******************** Stubs for all public members ********************/
93 
94 AsyncTransfer::AsyncTransfer(ImageTransfer::OperationMode mode, const char* remoteAddress, const char* remoteService,
95  const char* localAddress, const char* localService, int bufferSize)
96  : pimpl(new Pimpl(mode, remoteAddress, remoteService, localAddress, localService, bufferSize)) {
97 }
98 
99 AsyncTransfer::~AsyncTransfer() {
100  delete pimpl;
101 }
102 
103 void AsyncTransfer::sendImagePairAsync(const ImagePair& imagePair, bool deleteData) {
104  pimpl->sendImagePairAsync(imagePair, deleteData);
105 }
106 
107 bool AsyncTransfer::collectReceivedImagePair(ImagePair& imagePair, double timeout) {
108  return pimpl->collectReceivedImagePair(imagePair, timeout);
109 }
110 
111 /******************** Implementation in pimpl class *******************/
112 
113 AsyncTransfer::Pimpl::Pimpl(ImageTransfer::OperationMode mode, const char* remoteAddress, const char* remoteService,
114  const char* localAddress, const char* localService, int bufferSize)
115  : imgTrans(mode, remoteAddress, remoteService, localAddress, localService, bufferSize), terminate(false),
116  newDataReceived(false), sendPairValid(false), deleteSendData(false), sendThreadCreated(false),
117  receiveThreadCreated(false) {
118 
119  if(mode == ImageTransfer::TCP_SERVER) {
120  // Wait for a connection. This is only a debugging feature
121  while(!imgTrans.tryAccept()) {
122  std::chrono::milliseconds duration(10);
123  std::this_thread::sleep_for(duration);
124  }
125  }
126 }
127 
128 AsyncTransfer::Pimpl::~Pimpl() {
129  terminate = true;
130 
131  sendCond.notify_all();
132  receiveCond.notify_all();
133  sendWaitCond.notify_all();
134  receiveWaitCond.notify_all();
135 
136  if(sendThreadCreated && sendThread.joinable()) {
137  sendThread.join();
138  }
139 
140  if(receiveThreadCreated && receiveThread.joinable()) {
141  receiveThread.join();
142  }
143 
144  if(sendPairValid && deleteSendData) {
145  delete[] sendImagePair.getPixelData(0);
146  delete[] sendImagePair.getPixelData(1);
147  }
148 }
149 
150 void AsyncTransfer::Pimpl::sendImagePairAsync(const ImagePair& imagePair, bool deleteData) {
151  if(!sendThreadCreated) {
152  // Lazy initialization of the send thread as it is not always needed
153  unique_lock<mutex> lock(sendMutex);
154  sendThread = thread(bind(&AsyncTransfer::Pimpl::sendLoop, this));
155  sendThreadCreated = true;
156  }
157 
158  while(true) {
159  unique_lock<mutex> lock(sendMutex);
160 
161  // Test for errors
162  if(sendException) {
163  std::rethrow_exception(sendException);
164  }
165 
166  if(!sendPairValid) {
167  sendImagePair = imagePair;
168  sendPairValid = true;
169  deleteSendData = deleteData;
170 
171  // Wake up the sender thread
172  sendCond.notify_one();
173 
174  return;
175  } else {
176  // Wait for old data to be processed first
177  sendWaitCond.wait(lock);
178  }
179  }
180 }
181 
182 bool AsyncTransfer::Pimpl::collectReceivedImagePair(ImagePair& imagePair, double timeout) {
183  if(!receiveThreadCreated) {
184  // Lazy initialization of receive thread
185  unique_lock<timed_mutex> lock(receiveMutex);
186  receiveThreadCreated = true;
187  receiveThread = thread(bind(&AsyncTransfer::Pimpl::receiveLoop, this));
188  }
189 
190  // Acquire mutex
191  unique_lock<timed_mutex> lock(receiveMutex, std::defer_lock);
192  if(timeout < 0) {
193  lock.lock();
194  } else {
195  std::chrono::steady_clock::time_point lockStart =
196  std::chrono::steady_clock::now();
197  if(!lock.try_lock_for(std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)))) {
198  // Timed out
199  return false;
200  }
201 
202  // Update timeout
203  unsigned int lockDuration = static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::microseconds>(
204  std::chrono::steady_clock::now() - lockStart).count());
205  timeout = std::max(0.0, timeout - lockDuration*1e-6);
206  }
207 
208  // Test for errors
209  if(receiveException) {
210  std::rethrow_exception(receiveException);
211  }
212 
213  if(timeout == 0 && !newDataReceived) {
214  // No image has been received and we are not blocking
215  return false;
216  }
217 
218  // If there is no data yet then keep on waiting
219  if(!newDataReceived) {
220  if(timeout < 0) {
221  while(!terminate && !receiveException && !newDataReceived) {
222  receiveCond.wait(lock);
223  }
224  } else {
225  receiveCond.wait_for(lock, std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)));
226  }
227  }
228 
229  // Test for errors again
230  if(receiveException) {
231  std::rethrow_exception(receiveException);
232  }
233 
234  if(newDataReceived) {
235  // Get the received image
236  imagePair = receivedPair;
237 
238  newDataReceived = false;
239  receiveWaitCond.notify_one();
240 
241  return true;
242  } else {
243  return false;
244  }
245 }
246 
247 void AsyncTransfer::Pimpl::sendLoop() {
248  {
249  // Delay the thread start
250  unique_lock<mutex> lock(sendMutex);
251  }
252 
253  ImagePair pair;
254  bool deletePair = false;
255 
256  try {
257  while(!terminate) {
258  // Wait for next image
259  {
260  unique_lock<mutex> lock(sendMutex);
261  // Wait for next frame to be queued
262  while(!terminate && !sendPairValid) {
263  sendCond.wait(lock);
264  }
265  if(!sendPairValid) {
266  continue;
267  }
268 
269  pair = sendImagePair;
270  deletePair = deleteSendData;
271  sendPairValid = false;
272 
273  sendWaitCond.notify_one();
274  }
275 
276  if(!terminate) {
277  imgTrans.setTransferImagePair(pair);
278  imgTrans.transferData(true);
279  }
280 
281  if(deletePair) {
282  delete[] pair.getPixelData(0);
283  delete[] pair.getPixelData(1);
284  deletePair = false;
285  }
286  }
287  } catch(...) {
288  // Store the exception for later
289  if(!sendException) {
290  sendException = std::current_exception();
291  }
292  sendWaitCond.notify_all();
293 
294  // Don't forget to free the memory
295  if(deletePair) {
296  delete[] pair.getPixelData(0);
297  delete[] pair.getPixelData(1);
298  deletePair = false;
299  }
300  }
301 }
302 
303 void AsyncTransfer::Pimpl::receiveLoop() {
304  {
305  // Delay the thread start
306  unique_lock<timed_mutex> lock(receiveMutex);
307  }
308 
309  try {
310  ImagePair currentPair;
311  int bufferIndex = 0;
312 
313  while(!terminate) {
314  // Receive new image
315  if(!imgTrans.receiveImagePair(currentPair)) {
316  // No image available
317  continue;
318  }
319 
320  // Copy the pixel data
321  for(int i=0;i<2;i++) {
322  int bytesPerPixel = currentPair.getPixelFormat(i) == ImagePair::FORMAT_8_BIT ? 1 : 2;
323  int newStride = currentPair.getWidth() * bytesPerPixel;
324  int totalSize = currentPair.getHeight() * newStride;
325  if(static_cast<int>(receivedData[i + bufferIndex].size()) < totalSize) {
326  receivedData[i + bufferIndex].resize(totalSize);
327  }
328  if(newStride == currentPair.getRowStride(i)) {
329  memcpy(&receivedData[i + bufferIndex][0], currentPair.getPixelData(i),
330  newStride*currentPair.getHeight());
331  } else {
332  for(int y = 0; y<currentPair.getHeight(); y++) {
333  memcpy(&receivedData[i + bufferIndex][y*newStride],
334  &currentPair.getPixelData(i)[y*currentPair.getRowStride(i)],
335  newStride);
336  }
337  currentPair.setRowStride(i, newStride);
338  }
339  currentPair.setPixelData(i, &receivedData[i + bufferIndex][0]);
340  }
341 
342  {
343  unique_lock<timed_mutex> lock(receiveMutex);
344 
345  // Wait for previously received data to be processed
346  while(newDataReceived) {
347  receiveWaitCond.wait_for(lock, std::chrono::milliseconds(100));
348  if(terminate) {
349  return;
350  }
351  }
352 
353  // Notify that a new image pair has been received
354  newDataReceived = true;
355  receivedPair = currentPair;
356  receiveCond.notify_one();
357  }
358 
359  // Increment index for data buffers
360  bufferIndex = (bufferIndex + 2) % 4;
361  }
362  } catch(...) {
363  // Store the exception for later
364  if(!receiveException) {
365  receiveException = std::current_exception();
366  }
367  receiveCond.notify_all();
368  }
369 }
Using TCP and acting as communication server.
Definition: imagetransfer.h:42
OperationMode
Supported transfer modes.
Definition: imagetransfer.h:33
unsigned char * getPixelData(int imageNumber) const
Returns the pixel data for the given image.
Definition: imagepair.h:182
int getWidth() const
Returns the width of each image.
Definition: imagepair.h:147
AsyncTransfer(ImageTransfer::OperationMode mode, const char *remoteAddress, const char *remoteService, const char *localAddress, const char *localService, int bufferSize=800000)
Creates a new transfer object.
void setRowStride(int imageNumber, int stride)
Sets a new row stride for the pixel data of one image.
Definition: imagepair.h:74
Class for synchronous transfer of image pairs.
Definition: imagetransfer.h:30
void setPixelData(int imageNumber, unsigned char *pixelData)
Sets the pixel data for the given image.
Definition: imagepair.h:98
A set of two images, which are usually the left camera image and the disparity map.
Definition: imagepair.h:30
ImageFormat getPixelFormat(int imageNumber) const
Returns the pixel format for the given image.
Definition: imagepair.h:171
void sendImagePairAsync(const ImagePair &imagePair, bool deleteData)
Starts an asynchronous transmission of the given image pair.
8-bit greyscale format
Definition: imagepair.h:37
int getHeight() const
Returns the height of each image.
Definition: imagepair.h:152
int getRowStride(int imageNumber) const
Returns the row stride for the pixel data of one image.
Definition: imagepair.h:160
bool collectReceivedImagePair(ImagePair &imagePair, double timeout=-1)
Collects the asynchronously received image.
Nerian Vision Technologies