libvisiontransfer  6.4.0
asynctransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2019 Nerian Vision GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #if __GNUC__ == 4 && __GNUC_MINOR__ < 9
16 // This is a very ugly workaround for GCC bug 54562. If omitted,
17 // passing timeouts to collectReceivedImage() is broken.
18 #include <bits/c++config.h>
19 #undef _GLIBCXX_USE_CLOCK_MONOTONIC
20 #endif
21 
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstring>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>
#include "visiontransfer/asynctransfer.h"
#include "visiontransfer/alignedallocator.h"
34 
35 using namespace std;
36 using namespace visiontransfer;
37 using namespace visiontransfer::internal;
38 
39 /*************** Pimpl class containing all private members ***********/
40 
41 class AsyncTransfer::Pimpl {
42 public:
43  Pimpl(const char* address, const char* service,
44  ImageProtocol::ProtocolType protType, bool server,
45  int bufferSize, int maxUdpPacketSize);
46  ~Pimpl();
47 
48  // Redeclaration of public members
49  void sendImagePairAsync(const ImagePair& imagePair, bool deleteData);
50  bool collectReceivedImagePair(ImagePair& imagePair, double timeout);
51  int getNumDroppedFrames() const;
52  bool isConnected() const;
53  void disconnect();
54  std::string getRemoteAddress() const;
55  bool tryAccept();
56 
57 private:
58  static constexpr int NUM_BUFFERS = 6;
59  static constexpr int SEND_THREAD_SHORT_WAIT_MS = 1;
60  static constexpr int SEND_THREAD_LONG_WAIT_MS = 10;
61 
62  // The encapsulated image transfer object
63  ImageTransfer imgTrans;
64 
65  // Variable for controlling thread termination
66  volatile bool terminate;
67 
68  // There are two threads, one for sending and one for receiving.
69  // Each has a mutex and condition variable for synchronization.
70  std::thread sendThread;
71  std::mutex sendMutex;
72  std::condition_variable sendCond;
73  std::condition_variable sendWaitCond;
74 
75  std::thread receiveThread;
76  std::timed_mutex receiveMutex;
77  std::condition_variable_any receiveCond;
78  std::condition_variable_any receiveWaitCond;
79 
80  // Objects for exchanging images with the send and receive threads
81  ImagePair receivedPair;
82  std::vector<unsigned char, AlignedAllocator<unsigned char> > receivedData[NUM_BUFFERS];
83  bool newDataReceived;
84 
85  ImagePair sendImagePair;
86  bool sendPairValid;
87  bool deleteSendData;
88 
89  // Exception occurred in one of the threads
90  std::exception_ptr receiveException;
91  std::exception_ptr sendException;
92 
93  bool sendThreadCreated;
94  bool receiveThreadCreated;
95 
96  // Main loop for sending thread
97  void sendLoop();
98 
99  // Main loop for receiving;
100  void receiveLoop();
101 
102  void createSendThread();
103 };
104 
105 /******************** Stubs for all public members ********************/
106 
107 AsyncTransfer::AsyncTransfer(const char* address, const char* service,
108  ImageProtocol::ProtocolType protType, bool server,
109  int bufferSize, int maxUdpPacketSize)
110  : pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize)) {
111 }
112 
113 AsyncTransfer::AsyncTransfer(const DeviceInfo& device, int bufferSize, int maxUdpPacketSize)
114  : pimpl(new Pimpl(device.getIpAddress().c_str(), "7681", static_cast<ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
115  false, bufferSize, maxUdpPacketSize)) {
116 }
117 
118 AsyncTransfer::~AsyncTransfer() {
119  delete pimpl;
120 }
121 
122 void AsyncTransfer::sendImagePairAsync(const ImagePair& imagePair, bool deleteData) {
123  pimpl->sendImagePairAsync(imagePair, deleteData);
124 }
125 
126 bool AsyncTransfer::collectReceivedImagePair(ImagePair& imagePair, double timeout) {
127  return pimpl->collectReceivedImagePair(imagePair, timeout);
128 }
129 
131  return pimpl->getNumDroppedFrames();
132 }
133 
135  return pimpl->isConnected();
136 }
137 
139  return pimpl->disconnect();
140 }
141 
142 std::string AsyncTransfer::getRemoteAddress() const {
143  return pimpl->getRemoteAddress();
144 }
145 
147  return pimpl->tryAccept();
148 }
149 
150 /******************** Implementation in pimpl class *******************/
151 
152 AsyncTransfer::Pimpl::Pimpl(const char* address, const char* service,
153  ImageProtocol::ProtocolType protType, bool server,
154  int bufferSize, int maxUdpPacketSize)
155  : imgTrans(address, service, protType, server, bufferSize, maxUdpPacketSize),
156  terminate(false), newDataReceived(false), sendPairValid(false),
157  deleteSendData(false), sendThreadCreated(false),
158  receiveThreadCreated(false) {
159 
160  if(server) {
161  createSendThread();
162  }
163 }
164 
165 AsyncTransfer::Pimpl::~Pimpl() {
166  terminate = true;
167 
168  sendCond.notify_all();
169  receiveCond.notify_all();
170  sendWaitCond.notify_all();
171  receiveWaitCond.notify_all();
172 
173  if(sendThreadCreated && sendThread.joinable()) {
174  sendThread.join();
175  }
176 
177  if(receiveThreadCreated && receiveThread.joinable()) {
178  receiveThread.join();
179  }
180 
181  if(sendPairValid && deleteSendData) {
182  delete[] sendImagePair.getPixelData(0);
183  delete[] sendImagePair.getPixelData(1);
184  }
185 }
186 
187 void AsyncTransfer::Pimpl::createSendThread() {
188  if(!sendThreadCreated) {
189  // Lazy initialization of the send thread as it is not always needed
190  unique_lock<mutex> lock(sendMutex);
191  sendThread = thread(bind(&AsyncTransfer::Pimpl::sendLoop, this));
192  sendThreadCreated = true;
193  }
194 }
195 
196 void AsyncTransfer::Pimpl::sendImagePairAsync(const ImagePair& imagePair, bool deleteData) {
197  createSendThread();
198 
199  while(true) {
200  unique_lock<mutex> lock(sendMutex);
201 
202  // Test for errors
203  if(sendException) {
204  std::rethrow_exception(sendException);
205  }
206 
207  if(!sendPairValid) {
208  sendImagePair = imagePair;
209  sendPairValid = true;
210  deleteSendData = deleteData;
211 
212  // Wake up the sender thread
213  sendCond.notify_one();
214 
215  return;
216  } else {
217  // Wait for old data to be processed first
218  sendWaitCond.wait(lock);
219  }
220  }
221 }
222 
223 bool AsyncTransfer::Pimpl::collectReceivedImagePair(ImagePair& imagePair, double timeout) {
224  if(!receiveThreadCreated) {
225  // Lazy initialization of receive thread
226  unique_lock<timed_mutex> lock(receiveMutex);
227  receiveThreadCreated = true;
228  receiveThread = thread(bind(&AsyncTransfer::Pimpl::receiveLoop, this));
229  }
230 
231  // Acquire mutex
232  unique_lock<timed_mutex> lock(receiveMutex, std::defer_lock);
233  if(timeout < 0) {
234  lock.lock();
235  } else {
236  std::chrono::steady_clock::time_point lockStart =
237  std::chrono::steady_clock::now();
238  if(!lock.try_lock_for(std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)))) {
239  // Timed out
240  return false;
241  }
242 
243  // Update timeout
244  unsigned int lockDuration = static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::microseconds>(
245  std::chrono::steady_clock::now() - lockStart).count());
246  timeout = std::max(0.0, timeout - lockDuration*1e-6);
247  }
248 
249  // Test for errors
250  if(receiveException) {
251  std::rethrow_exception(receiveException);
252  }
253 
254  if(timeout == 0 && !newDataReceived) {
255  // No image has been received and we are not blocking
256  return false;
257  }
258 
259  // If there is no data yet then keep on waiting
260  if(!newDataReceived) {
261  if(timeout < 0) {
262  while(!terminate && !receiveException && !newDataReceived) {
263  receiveCond.wait(lock);
264  }
265  } else {
266  receiveCond.wait_for(lock, std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)));
267  }
268  }
269 
270  // Test for errors again
271  if(receiveException) {
272  std::rethrow_exception(receiveException);
273  }
274 
275  if(newDataReceived) {
276  // Get the received image
277  imagePair = receivedPair;
278 
279  newDataReceived = false;
280  receiveWaitCond.notify_one();
281 
282  return true;
283  } else {
284  return false;
285  }
286 }
287 
288 void AsyncTransfer::Pimpl::sendLoop() {
289  {
290  // Delay the thread start
291  unique_lock<mutex> lock(sendMutex);
292  }
293 
294  ImagePair pair;
295  bool deletePair = false;
296 
297  try {
298  while(!terminate) {
299  // Wait for next image
300  {
301  unique_lock<mutex> lock(sendMutex);
302  // Wait for next frame to be queued
303  bool firstWait = true;
304  while(!terminate && !sendPairValid) {
305  imgTrans.transferData();
306  sendCond.wait_for(lock, std::chrono::milliseconds(
307  firstWait ? SEND_THREAD_SHORT_WAIT_MS : SEND_THREAD_LONG_WAIT_MS));
308  firstWait = false;
309  }
310  if(!sendPairValid) {
311  continue;
312  }
313 
314  pair = sendImagePair;
315  deletePair = deleteSendData;
316  sendPairValid = false;
317 
318  sendWaitCond.notify_one();
319  }
320 
321  if(!terminate) {
322  imgTrans.setTransferImagePair(pair);
323  imgTrans.transferData();
324  }
325 
326  if(deletePair) {
327  delete[] pair.getPixelData(0);
328  delete[] pair.getPixelData(1);
329  deletePair = false;
330  }
331  }
332  } catch(...) {
333  // Store the exception for later
334  if(!sendException) {
335  sendException = std::current_exception();
336  }
337  sendWaitCond.notify_all();
338 
339  // Don't forget to free the memory
340  if(deletePair) {
341  delete[] pair.getPixelData(0);
342  delete[] pair.getPixelData(1);
343  deletePair = false;
344  }
345  }
346 }
347 
348 void AsyncTransfer::Pimpl::receiveLoop() {
349  {
350  // Delay the thread start
351  unique_lock<timed_mutex> lock(receiveMutex);
352  }
353 
354  try {
355  ImagePair currentPair;
356  int bufferIndex = 0;
357 
358  while(!terminate) {
359  // Receive new image
360  if(!imgTrans.receiveImagePair(currentPair)) {
361  // No image available
362  continue;
363  }
364 
365  // Copy the pixel data
366  for(int i=0;i<2;i++) {
367  int bytesPerPixel = currentPair.getBytesPerPixel(i);
368  int newStride = currentPair.getWidth() * bytesPerPixel;
369  int totalSize = currentPair.getHeight() * newStride;
370  if(static_cast<int>(receivedData[i + bufferIndex].size()) < totalSize) {
371  receivedData[i + bufferIndex].resize(totalSize);
372  }
373  if(newStride == currentPair.getRowStride(i)) {
374  memcpy(&receivedData[i + bufferIndex][0], currentPair.getPixelData(i),
375  newStride*currentPair.getHeight());
376  } else {
377  for(int y = 0; y<currentPair.getHeight(); y++) {
378  memcpy(&receivedData[i + bufferIndex][y*newStride],
379  &currentPair.getPixelData(i)[y*currentPair.getRowStride(i)],
380  newStride);
381  }
382  currentPair.setRowStride(i, newStride);
383  }
384  currentPair.setPixelData(i, &receivedData[i + bufferIndex][0]);
385  }
386 
387  {
388  unique_lock<timed_mutex> lock(receiveMutex);
389 
390  // Wait for previously received data to be processed
391  while(newDataReceived) {
392  receiveWaitCond.wait_for(lock, std::chrono::milliseconds(100));
393  if(terminate) {
394  return;
395  }
396  }
397 
398  // Notify that a new image pair has been received
399  newDataReceived = true;
400  receivedPair = currentPair;
401  receiveCond.notify_one();
402  }
403 
404  // Increment index for data buffers
405  bufferIndex = (bufferIndex + 2) % NUM_BUFFERS;
406  }
407  } catch(...) {
408  // Store the exception for later
409  if(!receiveException) {
410  receiveException = std::current_exception();
411  }
412  receiveCond.notify_all();
413  }
414 }
415 
416 bool AsyncTransfer::Pimpl::isConnected() const {
417  return imgTrans.isConnected();
418 }
419 
420 void AsyncTransfer::Pimpl::disconnect() {
421  imgTrans.disconnect();
422 }
423 
424 std::string AsyncTransfer::Pimpl::getRemoteAddress() const {
425  return imgTrans.getRemoteAddress();
426 }
427 
428 int AsyncTransfer::Pimpl::getNumDroppedFrames() const {
429  return imgTrans.getNumDroppedFrames();
430 }
431 
432 bool AsyncTransfer::Pimpl::tryAccept() {
433  return imgTrans.tryAccept();
434 }
int getHeight() const
Returns the height of each image.
Definition: imagepair.h:180
void disconnect()
Terminates the current connection.
void sendImagePairAsync(const ImagePair &imagePair, bool deleteData=false)
Starts an asynchronous transmission of the given image pair.
bool collectReceivedImagePair(ImagePair &imagePair, double timeout=-1)
Collects the asynchronously received image.
bool tryAccept()
Tries to accept a client connection.
std::string getRemoteAddress() const
Returns the address of the remote host.
Class for synchronous transfer of image pairs.
Definition: imagetransfer.h:36
AsyncTransfer(const char *address, const char *service="7681", ImageProtocol::ProtocolType protType=ImageProtocol::PROTOCOL_UDP, bool server=false, int bufferSize=1048576, int maxUdpPacketSize=1472)
Creates a new transfer object.
bool isConnected() const
Returns true if a remote connection is established.
A lightweight protocol for transferring image pairs.
Definition: imageprotocol.h:38
void setPixelData(int imageNumber, unsigned char *pixelData)
Sets the pixel data for the given image.
Definition: imagepair.h:118
void setRowStride(int imageNumber, int stride)
Sets a new row stride for the pixel data of one image.
Definition: imagepair.h:88
int getNumDroppedFrames() const
Returns the number of frames that have been dropped since connecting to the current remote host...
int getRowStride(int imageNumber) const
Returns the row stride for the pixel data of one image.
Definition: imagepair.h:188
ProtocolType
Supported network protocols.
Definition: imageprotocol.h:41
int getWidth() const
Returns the width of each image.
Definition: imagepair.h:175
Aggregates information about a discovered device.
Definition: deviceinfo.h:25
unsigned char * getPixelData(int imageNumber) const
Returns the pixel data for the given image.
Definition: imagepair.h:210
A set of two images, which are usually the left camera image and the disparity map.
Definition: imagepair.h:33
int getBytesPerPixel(int imageNumber) const
Returns the number of bytes that are required to store one image pixel.
Definition: imagepair.h:279
Nerian Vision Technologies