15 #if __GNUC__ == 4 && __GNUC_MINOR__ < 9
18 #include <bits/c++config.h>
19 #undef _GLIBCXX_USE_CLOCK_MONOTONIC
26 #include <condition_variable>
32 #include "visiontransfer/asynctransfer.h"
33 #include "visiontransfer/alignedallocator.h"
36 using namespace visiontransfer;
37 using namespace visiontransfer::internal;
39 namespace visiontransfer {
// Implementation class that the public AsyncTransfer methods forward to.
// NOTE(review): this listing is incomplete -- several members used by the
// method definitions further down (e.g. imgTrans, sendMutex, sendImageSet,
// sendSetValid, deleteSendData, receivedSet, sendLoop, receiveLoop,
// disconnect, tryAccept) are not declared in the lines visible here.
43 class AsyncTransfer::Pimpl {
// Creates the underlying transfer object from the connection parameters.
// NOTE(review): the definition also passes protType and server; their
// parameter declarations are not visible in this excerpt.
45 Pimpl(
const char* address,
const char* service,
47 int bufferSize,
int maxUdpPacketSize);
// Stores imageSet and the deleteData flag for the send thread and wakes it.
51 void sendImageSetAsync(
const ImageSet& imageSet,
bool deleteData);
// Copies the latest received image set into imageSet if one is available.
// timeout is given in seconds (the implementation multiplies it by 1e6 to
// obtain microseconds). The return statements are not visible here;
// presumably true means a set was delivered -- confirm.
52 bool collectReceivedImageSet(
ImageSet& imageSet,
double timeout);
// Returns imgTrans' dropped-frame count plus uncollectedDroppedFrames.
53 int getNumDroppedFrames()
const;
// Forwards to imgTrans.isConnected().
54 bool isConnected()
const;
// Forwards to imgTrans.getRemoteAddress().
56 std::string getRemoteAddress()
const;
// Number of entries in receivedData; buffer indices wrap modulo this value.
60 static constexpr
int NUM_BUFFERS = ImageSet::MAX_SUPPORTED_IMAGES * 3;
// Wait durations in milliseconds used by the send thread: the short one for
// its first wait on sendCond, the long one for later waits and for its sleep.
61 static constexpr
int SEND_THREAD_SHORT_WAIT_MS = 1;
62 static constexpr
int SEND_THREAD_LONG_WAIT_MS = 10;
// Checked by the wait loops of the worker threads and of
// collectReceivedImageSet().
// NOTE(review): volatile gives no inter-thread synchronization guarantees in
// C++; std::atomic<bool> would be the portable choice.
68 volatile bool terminate;
// Thread running sendLoop(); started lazily by createSendThread().
72 std::thread sendThread;
// Notified by sendImageSetAsync() after a new set has been stored.
74 std::condition_variable sendCond;
// Notified by the send thread after it has taken over the stored set (and
// after it has recorded an exception); sendImageSetAsync() waits on it.
75 std::condition_variable sendWaitCond;
// Thread running receiveLoop(); started on the first call to
// collectReceivedImageSet().
77 std::thread receiveThread;
// Timed mutex so that collectReceivedImageSet() can use try_lock_for().
78 std::timed_mutex receiveMutex;
// condition_variable_any because it is waited on with a
// unique_lock<timed_mutex>.
79 std::condition_variable_any receiveCond;
// Buffers that receiveLoop() copies incoming pixel data into.
83 std::vector<unsigned char, AlignedAllocator<unsigned char> > receivedData[NUM_BUFFERS];
// Base index into receivedData for the next received set; advanced by the
// number of images each time a set is collected.
84 volatile int receiveBufferIndex;
// Set by receiveLoop() when receivedSet holds a new set, cleared on collect.
85 volatile bool newDataReceived;
// Exceptions captured inside the worker threads; rethrown to the caller by
// collectReceivedImageSet() and sendImageSetAsync() respectively.
92 std::exception_ptr receiveException;
93 std::exception_ptr sendException;
// Record whether the respective worker thread has been started.
95 bool sendThreadCreated;
96 bool receiveThreadCreated;
// Count of received sets that arrived while the previous one was still
// uncollected. Starts at -1; receiveLoop() only counts once it is >= 0,
// which collectReceivedImageSet() establishes when it delivers a set.
100 int uncollectedDroppedFrames;
// Starts the send thread if it has not been started yet.
108 void createSendThread();
// Constructs an AsyncTransfer by creating its Pimpl with the given connection
// parameters; all arguments are passed through unchanged.
// NOTE(review): protType and server are forwarded below, but their parameter
// declarations are not visible in this excerpt.
113 AsyncTransfer::AsyncTransfer(
const char* address,
const char* service,
115 int bufferSize,
int maxUdpPacketSize)
116 : pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize)) {
// Member initializer of a second constructor whose signature is not visible
// in this excerpt. It builds the Pimpl from `device`: the device's IP address,
// the service string "7681", the device's network protocol cast to
// ImageProtocol::ProtocolType, and `false` in the argument position that the
// other constructor uses for `server`.
120 : pimpl(new Pimpl(device.getIpAddress().c_str(),
"7681", static_cast<
ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
121 false, bufferSize, maxUdpPacketSize)) {
// Destructor; its body is not visible in this excerpt.
124 AsyncTransfer::~AsyncTransfer() {
// The statements below are the bodies of the public AsyncTransfer methods
// (their signatures are not visible in this excerpt). Each one forwards its
// arguments to the Pimpl method of the same name.
129 pimpl->sendImageSetAsync(imageSet, deleteData);
133 return pimpl->collectReceivedImageSet(imageSet, timeout);
137 return pimpl->getNumDroppedFrames();
141 return pimpl->isConnected();
145 return pimpl->disconnect();
149 return pimpl->getRemoteAddress();
153 return pimpl->tryAccept();
// Constructs the underlying transfer object (imgTrans) from the connection
// parameters and sets the bookkeeping members to their initial values.
// NOTE(review): protType and server are passed to imgTrans, but their
// parameter declarations are not visible in this excerpt; the constructor
// body is not visible either.
158 AsyncTransfer::Pimpl::Pimpl(
const char* address,
const char* service,
160 int bufferSize,
int maxUdpPacketSize)
161 : imgTrans(address, service, protType, server, bufferSize, maxUdpPacketSize),
162 terminate(false), receiveBufferIndex(0), newDataReceived(false), sendSetValid(false),
// Both thread-created flags start out false. uncollectedDroppedFrames starts
// at -1: receiveLoop() only increments it while it is > -1, and
// collectReceivedImageSet() raises it to 0 when it delivers a set.
163 deleteSendData(false), sendThreadCreated(false),
164 receiveThreadCreated(false), uncollectedDroppedFrames(-1) {
// Wakes every thread waiting on the three condition variables, joins the
// worker threads that were started, and frees send data that is still owned
// by this object.
// NOTE(review): lines are missing from this excerpt (for example the statement
// inside the first if below, and anything preceding the notify calls).
171 AsyncTransfer::Pimpl::~Pimpl() {
174 sendCond.notify_all();
175 receiveCond.notify_all();
176 sendWaitCond.notify_all();
// Only join threads that were actually started.
178 if(sendThreadCreated && sendThread.joinable()) {
182 if(receiveThreadCreated && receiveThread.joinable()) {
183 receiveThread.join();
// sendSetValid is cleared by sendLoop() when it takes over a set, so a set
// that is still valid here was stored but never picked up; free its pixel
// data if it was stored with deleteData set.
186 if(sendSetValid && deleteSendData) {
// NOTE(review): only images 0 and 1 are freed, although an ImageSet reports
// its own size via getNumberOfImages(). Confirm whether sets with a different
// number of images can reach this point.
187 delete[] sendImageSet.getPixelData(0);
188 delete[] sendImageSet.getPixelData(1);
// Starts the thread running sendLoop() the first time it is called; once
// sendThreadCreated is set, further calls do nothing.
192 void AsyncTransfer::Pimpl::createSendThread() {
193 if(!sendThreadCreated) {
// sendMutex is held while the thread object is created and the flag is set;
// sendLoop() also locks sendMutex when it starts.
195 unique_lock<mutex> lock(sendMutex);
196 sendThread = thread(bind(&AsyncTransfer::Pimpl::sendLoop,
this));
197 sendThreadCreated =
true;
// Stores imageSet and the deleteData flag for the send thread, wakes that
// thread, and waits on sendWaitCond.
// Rethrows an exception previously recorded in sendException.
// NOTE(review): lines are missing from this excerpt; the conditions guarding
// the rethrow and the wait are not visible.
201 void AsyncTransfer::Pimpl::sendImageSetAsync(
const ImageSet& imageSet,
bool deleteData) {
// All shared send state below is accessed under sendMutex.
205 unique_lock<mutex> lock(sendMutex);
// Surface a failure recorded by the send thread to the caller.
209 std::rethrow_exception(sendException);
213 sendImageSet = imageSet;
215 deleteSendData = deleteData;
// Wake the send thread, which waits on sendCond in sendLoop().
218 sendCond.notify_one();
// sendLoop() notifies sendWaitCond after it has copied the stored set.
// NOTE(review): wait() without a predicate can return spuriously; confirm
// that the (not visible) surrounding code re-checks the condition.
223 sendWaitCond.wait(lock);
// Delivers the most recently received image set to the caller.
// On first use it starts the receive thread. It then acquires receiveMutex,
// waits for receiveLoop() to flag new data, copies receivedSet into imageSet
// and advances the receive buffer index.
// timeout is in seconds (it is multiplied by 1e6 to obtain microseconds).
// Rethrows an exception recorded by the receive thread.
// NOTE(review): lines are missing from this excerpt, including all return
// statements and the branch that chooses between the two waits below.
228 bool AsyncTransfer::Pimpl::collectReceivedImageSet(
ImageSet& imageSet,
double timeout) {
// Start the receive thread the first time a set is requested.
229 if(!receiveThreadCreated) {
231 unique_lock<timed_mutex> lock(receiveMutex);
232 receiveThreadCreated =
true;
233 receiveThread = thread(bind(&AsyncTransfer::Pimpl::receiveLoop,
this));
// Lock is acquired explicitly below so that acquisition can time out.
237 unique_lock<timed_mutex> lock(receiveMutex, std::defer_lock);
// Measure how long acquiring the lock takes so that it can be deducted from
// the remaining timeout.
241 std::chrono::steady_clock::time_point lockStart =
242 std::chrono::steady_clock::now();
// NOTE(review): converting timeout*1e6 to unsigned int is only defined when
// the value fits (non-negative and below about 4295 seconds); confirm that
// the code not visible here keeps other values away from this cast.
243 if(!lock.try_lock_for(std::chrono::microseconds(
static_cast<unsigned int>(timeout*1e6)))) {
249 unsigned int lockDuration =
static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::microseconds>(
250 std::chrono::steady_clock::now() - lockStart).count());
// Remaining time after lock acquisition, never below zero.
251 timeout = std::max(0.0, timeout - lockDuration*1e-6);
// Surface a failure recorded by the receive thread.
255 if(receiveException) {
256 std::rethrow_exception(receiveException);
// No time left to wait and nothing has been received.
259 if(timeout == 0 && !newDataReceived) {
265 if(!newDataReceived) {
// Untimed wait: loops until data arrives, an exception is recorded, or
// terminate is set.
267 while(!terminate && !receiveException && !newDataReceived) {
268 receiveCond.wait(lock);
// Timed wait for the remaining timeout.
// NOTE(review): this is a single wait_for without a predicate, so a spurious
// wakeup ends the wait before the timeout has elapsed.
271 receiveCond.wait_for(lock, std::chrono::microseconds(
static_cast<unsigned int>(timeout*1e6)));
// The receive thread may have failed while this call was waiting.
276 if(receiveException) {
277 std::rethrow_exception(receiveException);
280 if(newDataReceived) {
282 imageSet = receivedSet;
284 newDataReceived =
false;
// Move the base index past the buffers used by the set just handed out;
// receiveLoop() writes the next set starting at this index (presumably so
// that the pixel data of the returned set is not overwritten right away).
287 receiveBufferIndex = (receiveBufferIndex + receivedSet.getNumberOfImages()) % NUM_BUFFERS;
// Leave the initial -1 state so that receiveLoop() starts counting sets that
// arrive while an earlier one is still uncollected.
290 if (uncollectedDroppedFrames < 0) uncollectedDroppedFrames = 0;
// Body of the send thread. It waits until a set has been stored by
// sendImageSetAsync(), copies it, signals the waiting caller, and hands the
// set to imgTrans. An exception is stored in sendException and all waiters
// on sendWaitCond are woken.
// NOTE(review): lines are missing from this excerpt (loop headers, scopes,
// the try/catch structure and the declaration of imgSet are not visible).
298 void AsyncTransfer::Pimpl::sendLoop() {
// createSendThread() holds sendMutex while it creates this thread, so this
// lock cannot be acquired until that function has released it.
301 unique_lock<mutex> lock(sendMutex);
305 bool deleteSet =
false;
311 unique_lock<mutex> lock(sendMutex);
313 bool firstWait =
true;
// Until a set is available (or terminate is set), keep calling
// imgTrans.transferData() and wait on sendCond: a short wait while firstWait
// is true, a long wait otherwise.
314 while(!terminate && !sendSetValid) {
315 imgTrans.transferData();
316 sendCond.wait_for(lock, std::chrono::milliseconds(
317 firstWait ? SEND_THREAD_SHORT_WAIT_MS : SEND_THREAD_LONG_WAIT_MS));
// Take a local copy of the stored set and its delete flag, then mark the
// shared slot as empty.
324 imgSet = sendImageSet;
325 deleteSet = deleteSendData;
326 sendSetValid =
false;
// Wake the caller blocked in sendImageSetAsync().
328 sendWaitCond.notify_one();
331 imgTrans.setTransferImageSet(imgSet);
337 std::this_thread::sleep_for(std::chrono::milliseconds(SEND_THREAD_LONG_WAIT_MS));
// Record the failure for sendImageSetAsync() to rethrow, and wake any
// waiting caller.
350 sendException = std::current_exception();
352 sendWaitCond.notify_all();
// Body of the receive thread. It asks imgTrans for image sets; for each new
// set it copies the pixel data of every image into receivedData, points the
// set at those buffers, publishes it as receivedSet and notifies receiveCond.
// An exception is stored in receiveException and all waiters are woken.
// NOTE(review): lines are missing from this excerpt (loop headers, the
// try/catch structure, the declarations of currentSet, i and bytesPerPixel,
// and the tails of both memcpy calls are not visible).
364 void AsyncTransfer::Pimpl::receiveLoop() {
// collectReceivedImageSet() holds receiveMutex while it creates this thread,
// so this lock cannot be acquired until that scope has been left.
367 unique_lock<timed_mutex> lock(receiveMutex);
375 bool newImageSetArrived = imgTrans.receiveImageSet(currentSet);
377 if (newImageSetArrived) {
// Shared receive state is modified under receiveMutex.
378 unique_lock<timed_mutex> lock(receiveMutex);
// The previous set has not been collected yet and is about to be replaced.
// It is counted only after uncollectedDroppedFrames has left its initial -1
// state.
379 if (newDataReceived) {
381 if (uncollectedDroppedFrames > -1) uncollectedDroppedFrames++;
// Size of image i when rows are stored back to back in the destination.
386 int newStride = currentSet.
getWidth() * bytesPerPixel;
387 int totalSize = currentSet.
getHeight() * newStride;
// Destination buffer: image index offset by the current base index, wrapped.
388 int bufIdxHere = (i + receiveBufferIndex) % NUM_BUFFERS;
// Buffers only grow; a buffer that is already large enough is reused.
389 if(
static_cast<int>(receivedData[bufIdxHere].size()) < totalSize) {
390 receivedData[bufIdxHere].resize(totalSize);
// Copy the image either with a single memcpy ...
393 memcpy(&receivedData[bufIdxHere][0], currentSet.
getPixelData(i),
// ... or row by row, writing each row at y*newStride in the destination.
// The condition selecting between the two is not visible; presumably it
// depends on the source row stride -- confirm.
396 for(
int y = 0; y<currentSet.
getHeight(); y++) {
397 memcpy(&receivedData[bufIdxHere][y*newStride],
// From here on the set refers to the copy held in receivedData.
403 currentSet.
setPixelData(i, &receivedData[bufIdxHere][0]);
// Publish the set and wake a caller waiting in collectReceivedImageSet().
408 newDataReceived =
true;
409 receivedSet = currentSet;
410 receiveCond.notify_one();
// Keep only the first recorded exception, then wake all waiters so that
// they can rethrow it.
416 if(!receiveException) {
417 receiveException = std::current_exception();
419 receiveCond.notify_all();
423 bool AsyncTransfer::Pimpl::isConnected()
const {
424 return imgTrans.isConnected();
427 void AsyncTransfer::Pimpl::disconnect() {
428 imgTrans.disconnect();
431 std::string AsyncTransfer::Pimpl::getRemoteAddress()
const {
432 return imgTrans.getRemoteAddress();
435 int AsyncTransfer::Pimpl::getNumDroppedFrames()
const {
436 return imgTrans.getNumDroppedFrames() + uncollectedDroppedFrames;
439 bool AsyncTransfer::Pimpl::tryAccept() {
440 return imgTrans.tryAccept();
443 constexpr
int AsyncTransfer::Pimpl::NUM_BUFFERS;
444 constexpr
int AsyncTransfer::Pimpl::SEND_THREAD_SHORT_WAIT_MS;
445 constexpr
int AsyncTransfer::Pimpl::SEND_THREAD_LONG_WAIT_MS;