libvisiontransfer  6.1.0
asynctransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2018 Nerian Vision GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #if __GNUC__ == 4 && __GNUC_MINOR__ < 9
16 // This is a very ugly workaround for GCC bug 54562. If omitted,
17 // passing timeouts to collectReceivedImage() is broken.
18 #include <bits/c++config.h>
19 #undef _GLIBCXX_USE_CLOCK_MONOTONIC
20 #endif
21 
22 #include <iostream>
23 #include <functional>
24 #include <stdexcept>
25 #include <thread>
26 #include <condition_variable>
27 #include <chrono>
28 #include <mutex>
29 #include <vector>
30 #include <cstring>
31 #include <algorithm>
32 #include "visiontransfer/asynctransfer.h"
33 #include "visiontransfer/alignedallocator.h"
34 
35 
36 using namespace std;
37 
38 /*************** Pimpl class containing all private members ***********/
39 
40 class AsyncTransfer::Pimpl {
41 public:
42  Pimpl(const char* address, const char* service,
43  ImageProtocol::ProtocolType protType, bool server,
44  int bufferSize, int maxUdpPacketSize);
45  ~Pimpl();
46 
47  // Redeclaration of public members
48  void sendImagePairAsync(const ImagePair& imagePair, bool deleteData);
49  bool collectReceivedImagePair(ImagePair& imagePair, double timeout);
50  int getNumDroppedFrames() const;
51  bool isConnected() const;
52  void disconnect();
53  std::string getRemoteAddress() const;
54  bool tryAccept();
55 
56 private:
57  static constexpr int NUM_BUFFERS = 6;
58  static constexpr int SEND_THREAD_SHORT_WAIT_MS = 1;
59  static constexpr int SEND_THREAD_LONG_WAIT_MS = 10;
60 
61  // The encapsulated image transfer object
62  ImageTransfer imgTrans;
63 
64  // Variable for controlling thread termination
65  volatile bool terminate;
66 
67  // There are two threads, one for sending and one for receiving.
68  // Each has a mutex and condition variable for synchronization.
69  std::thread sendThread;
70  std::mutex sendMutex;
71  std::condition_variable sendCond;
72  std::condition_variable sendWaitCond;
73 
74  std::thread receiveThread;
75  std::timed_mutex receiveMutex;
76  std::condition_variable_any receiveCond;
77  std::condition_variable_any receiveWaitCond;
78 
79  // Objects for exchanging images with the send and receive threads
80  ImagePair receivedPair;
81  std::vector<unsigned char, AlignedAllocator<unsigned char> > receivedData[NUM_BUFFERS];
82  bool newDataReceived;
83 
84  ImagePair sendImagePair;
85  bool sendPairValid;
86  bool deleteSendData;
87 
88  // Exception occurred in one of the threads
89  std::exception_ptr receiveException;
90  std::exception_ptr sendException;
91 
92  bool sendThreadCreated;
93  bool receiveThreadCreated;
94 
95  // Main loop for sending thread
96  void sendLoop();
97 
98  // Main loop for receiving;
99  void receiveLoop();
100 
101  void createSendThread();
102 };
103 
104 /******************** Stubs for all public members ********************/
105 
106 AsyncTransfer::AsyncTransfer(const char* address, const char* service,
107  ImageProtocol::ProtocolType protType, bool server,
108  int bufferSize, int maxUdpPacketSize)
109  : pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize)) {
110 }
111 
112 AsyncTransfer::AsyncTransfer(const DeviceInfo& device, int bufferSize, int maxUdpPacketSize)
113  : pimpl(new Pimpl(device.getIpAddress().c_str(), "7681", static_cast<ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
114  false, bufferSize, maxUdpPacketSize)) {
115 }
116 
117 AsyncTransfer::~AsyncTransfer() {
118  delete pimpl;
119 }
120 
121 void AsyncTransfer::sendImagePairAsync(const ImagePair& imagePair, bool deleteData) {
122  pimpl->sendImagePairAsync(imagePair, deleteData);
123 }
124 
125 bool AsyncTransfer::collectReceivedImagePair(ImagePair& imagePair, double timeout) {
126  return pimpl->collectReceivedImagePair(imagePair, timeout);
127 }
128 
130  return pimpl->getNumDroppedFrames();
131 }
132 
134  return pimpl->isConnected();
135 }
136 
138  return pimpl->disconnect();
139 }
140 
141 std::string AsyncTransfer::getRemoteAddress() const {
142  return pimpl->getRemoteAddress();
143 }
144 
146  return pimpl->tryAccept();
147 }
148 
149 /******************** Implementation in pimpl class *******************/
150 
151 AsyncTransfer::Pimpl::Pimpl(const char* address, const char* service,
152  ImageProtocol::ProtocolType protType, bool server,
153  int bufferSize, int maxUdpPacketSize)
154  : imgTrans(address, service, protType, server, bufferSize, maxUdpPacketSize),
155  terminate(false), newDataReceived(false), sendPairValid(false),
156  deleteSendData(false), sendThreadCreated(false),
157  receiveThreadCreated(false) {
158 
159  if(server) {
160  createSendThread();
161  }
162 }
163 
164 AsyncTransfer::Pimpl::~Pimpl() {
165  terminate = true;
166 
167  sendCond.notify_all();
168  receiveCond.notify_all();
169  sendWaitCond.notify_all();
170  receiveWaitCond.notify_all();
171 
172  if(sendThreadCreated && sendThread.joinable()) {
173  sendThread.join();
174  }
175 
176  if(receiveThreadCreated && receiveThread.joinable()) {
177  receiveThread.join();
178  }
179 
180  if(sendPairValid && deleteSendData) {
181  delete[] sendImagePair.getPixelData(0);
182  delete[] sendImagePair.getPixelData(1);
183  }
184 }
185 
186 void AsyncTransfer::Pimpl::createSendThread() {
187  if(!sendThreadCreated) {
188  // Lazy initialization of the send thread as it is not always needed
189  unique_lock<mutex> lock(sendMutex);
190  sendThread = thread(bind(&AsyncTransfer::Pimpl::sendLoop, this));
191  sendThreadCreated = true;
192  }
193 }
194 
195 void AsyncTransfer::Pimpl::sendImagePairAsync(const ImagePair& imagePair, bool deleteData) {
196  createSendThread();
197 
198  while(true) {
199  unique_lock<mutex> lock(sendMutex);
200 
201  // Test for errors
202  if(sendException) {
203  std::rethrow_exception(sendException);
204  }
205 
206  if(!sendPairValid) {
207  sendImagePair = imagePair;
208  sendPairValid = true;
209  deleteSendData = deleteData;
210 
211  // Wake up the sender thread
212  sendCond.notify_one();
213 
214  return;
215  } else {
216  // Wait for old data to be processed first
217  sendWaitCond.wait(lock);
218  }
219  }
220 }
221 
222 bool AsyncTransfer::Pimpl::collectReceivedImagePair(ImagePair& imagePair, double timeout) {
223  if(!receiveThreadCreated) {
224  // Lazy initialization of receive thread
225  unique_lock<timed_mutex> lock(receiveMutex);
226  receiveThreadCreated = true;
227  receiveThread = thread(bind(&AsyncTransfer::Pimpl::receiveLoop, this));
228  }
229 
230  // Acquire mutex
231  unique_lock<timed_mutex> lock(receiveMutex, std::defer_lock);
232  if(timeout < 0) {
233  lock.lock();
234  } else {
235  std::chrono::steady_clock::time_point lockStart =
236  std::chrono::steady_clock::now();
237  if(!lock.try_lock_for(std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)))) {
238  // Timed out
239  return false;
240  }
241 
242  // Update timeout
243  unsigned int lockDuration = static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::microseconds>(
244  std::chrono::steady_clock::now() - lockStart).count());
245  timeout = std::max(0.0, timeout - lockDuration*1e-6);
246  }
247 
248  // Test for errors
249  if(receiveException) {
250  std::rethrow_exception(receiveException);
251  }
252 
253  if(timeout == 0 && !newDataReceived) {
254  // No image has been received and we are not blocking
255  return false;
256  }
257 
258  // If there is no data yet then keep on waiting
259  if(!newDataReceived) {
260  if(timeout < 0) {
261  while(!terminate && !receiveException && !newDataReceived) {
262  receiveCond.wait(lock);
263  }
264  } else {
265  receiveCond.wait_for(lock, std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)));
266  }
267  }
268 
269  // Test for errors again
270  if(receiveException) {
271  std::rethrow_exception(receiveException);
272  }
273 
274  if(newDataReceived) {
275  // Get the received image
276  imagePair = receivedPair;
277 
278  newDataReceived = false;
279  receiveWaitCond.notify_one();
280 
281  return true;
282  } else {
283  return false;
284  }
285 }
286 
287 void AsyncTransfer::Pimpl::sendLoop() {
288  {
289  // Delay the thread start
290  unique_lock<mutex> lock(sendMutex);
291  }
292 
293  ImagePair pair;
294  bool deletePair = false;
295 
296  try {
297  while(!terminate) {
298  // Wait for next image
299  {
300  unique_lock<mutex> lock(sendMutex);
301  // Wait for next frame to be queued
302  bool firstWait = true;
303  while(!terminate && !sendPairValid) {
304  imgTrans.transferData();
305  sendCond.wait_for(lock, std::chrono::milliseconds(
306  firstWait ? SEND_THREAD_SHORT_WAIT_MS : SEND_THREAD_LONG_WAIT_MS));
307  firstWait = false;
308  }
309  if(!sendPairValid) {
310  continue;
311  }
312 
313  pair = sendImagePair;
314  deletePair = deleteSendData;
315  sendPairValid = false;
316 
317  sendWaitCond.notify_one();
318  }
319 
320  if(!terminate) {
321  imgTrans.setTransferImagePair(pair);
322  imgTrans.transferData();
323  }
324 
325  if(deletePair) {
326  delete[] pair.getPixelData(0);
327  delete[] pair.getPixelData(1);
328  deletePair = false;
329  }
330  }
331  } catch(...) {
332  // Store the exception for later
333  if(!sendException) {
334  sendException = std::current_exception();
335  }
336  sendWaitCond.notify_all();
337 
338  // Don't forget to free the memory
339  if(deletePair) {
340  delete[] pair.getPixelData(0);
341  delete[] pair.getPixelData(1);
342  deletePair = false;
343  }
344  }
345 }
346 
347 void AsyncTransfer::Pimpl::receiveLoop() {
348  {
349  // Delay the thread start
350  unique_lock<timed_mutex> lock(receiveMutex);
351  }
352 
353  try {
354  ImagePair currentPair;
355  int bufferIndex = 0;
356 
357  while(!terminate) {
358  // Receive new image
359  if(!imgTrans.receiveImagePair(currentPair)) {
360  // No image available
361  continue;
362  }
363 
364  // Copy the pixel data
365  for(int i=0;i<2;i++) {
366  int bytesPerPixel = currentPair.getBytesPerPixel(i);
367  int newStride = currentPair.getWidth() * bytesPerPixel;
368  int totalSize = currentPair.getHeight() * newStride;
369  if(static_cast<int>(receivedData[i + bufferIndex].size()) < totalSize) {
370  receivedData[i + bufferIndex].resize(totalSize);
371  }
372  if(newStride == currentPair.getRowStride(i)) {
373  memcpy(&receivedData[i + bufferIndex][0], currentPair.getPixelData(i),
374  newStride*currentPair.getHeight());
375  } else {
376  for(int y = 0; y<currentPair.getHeight(); y++) {
377  memcpy(&receivedData[i + bufferIndex][y*newStride],
378  &currentPair.getPixelData(i)[y*currentPair.getRowStride(i)],
379  newStride);
380  }
381  currentPair.setRowStride(i, newStride);
382  }
383  currentPair.setPixelData(i, &receivedData[i + bufferIndex][0]);
384  }
385 
386  {
387  unique_lock<timed_mutex> lock(receiveMutex);
388 
389  // Wait for previously received data to be processed
390  while(newDataReceived) {
391  receiveWaitCond.wait_for(lock, std::chrono::milliseconds(100));
392  if(terminate) {
393  return;
394  }
395  }
396 
397  // Notify that a new image pair has been received
398  newDataReceived = true;
399  receivedPair = currentPair;
400  receiveCond.notify_one();
401  }
402 
403  // Increment index for data buffers
404  bufferIndex = (bufferIndex + 2) % NUM_BUFFERS;
405  }
406  } catch(...) {
407  // Store the exception for later
408  if(!receiveException) {
409  receiveException = std::current_exception();
410  }
411  receiveCond.notify_all();
412  }
413 }
414 
415 bool AsyncTransfer::Pimpl::isConnected() const {
416  return imgTrans.isConnected();
417 }
418 
419 void AsyncTransfer::Pimpl::disconnect() {
420  imgTrans.disconnect();
421 }
422 
423 std::string AsyncTransfer::Pimpl::getRemoteAddress() const {
424  return imgTrans.getRemoteAddress();
425 }
426 
427 int AsyncTransfer::Pimpl::getNumDroppedFrames() const {
428  return imgTrans.getNumDroppedFrames();
429 }
430 
431 bool AsyncTransfer::Pimpl::tryAccept() {
432  return imgTrans.tryAccept();
433 }
void disconnect()
Terminates the current connection.
A lightweight protocol for transferring image pairs.
Definition: imageprotocol.h:36
int getHeight() const
Returns the height of each image.
Definition: imagepair.h:178
bool tryAccept()
Tries to accept a client connection.
AsyncTransfer(const char *address, const char *service="7681", ImageProtocol::ProtocolType protType=ImageProtocol::PROTOCOL_UDP, bool server=false, int bufferSize=1048576, int maxUdpPacketSize=1472)
Creates a new transfer object.
int getRowStride(int imageNumber) const
Returns the row stride for the pixel data of one image.
Definition: imagepair.h:186
unsigned char * getPixelData(int imageNumber) const
Returns the pixel data for the given image.
Definition: imagepair.h:208
void sendImagePairAsync(const ImagePair &imagePair, bool deleteData=false)
Starts an asynchronous transmission of the given image pair.
void setRowStride(int imageNumber, int stride)
Sets a new row stride for the pixel data of one image.
Definition: imagepair.h:86
std::string getRemoteAddress() const
Returns the address of the remote host.
Class for synchronous transfer of image pairs.
Definition: imagetransfer.h:34
void setPixelData(int imageNumber, unsigned char *pixelData)
Sets the pixel data for the given image.
Definition: imagepair.h:116
ProtocolType
Supported network protocols.
Definition: imageprotocol.h:39
int getWidth() const
Returns the width of each image.
Definition: imagepair.h:173
A set of two images, which are usually the left camera image and the disparity map.
Definition: imagepair.h:31
int getNumDroppedFrames() const
Returns the number of frames that have been dropped since connecting to the current remote host...
Aggregates information about a discovered device.
Definition: deviceinfo.h:23
bool isConnected() const
Returns true if a remote connection is established.
bool collectReceivedImagePair(ImagePair &imagePair, double timeout=-1)
Collects the asynchronously received image.
int getBytesPerPixel(int imageNumber) const
Returns the number of bytes that are required to store one image pixel.
Definition: imagepair.h:277
Nerian Vision Technologies