libvisiontransfer  6.4.0
imagetransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2019 Nerian Vision GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #include <cstdio>
16 #include <iostream>
17 #include <cstring>
18 #include <memory>
19 #include <string>
20 #include <vector>
21 #include <mutex>
22 #include "visiontransfer/imagetransfer.h"
23 #include "visiontransfer/exceptions.h"
24 #include "visiontransfer/datablockprotocol.h"
25 #include "visiontransfer/networking.h"
26 
27 using namespace std;
28 using namespace visiontransfer;
29 using namespace visiontransfer::internal;
30 
31 /*************** Pimpl class containing all private members ***********/
32 
33 class ImageTransfer::Pimpl {
34 public:
35  Pimpl(const char* address, const char* service, ImageProtocol::ProtocolType protType,
36  bool server, int bufferSize, int maxUdpPacketSize);
37  ~Pimpl();
38 
39  // Redeclaration of public members
40  void setRawTransferData(const ImagePair& metaData, unsigned char* rawData,
41  int secondTileWidth = 0, int validBytes = 0x7FFFFFFF);
42  void setRawValidBytes(int validBytes);
43  void setTransferImagePair(const ImagePair& imagePair);
44  TransferStatus transferData();
45  bool receiveImagePair(ImagePair& imagePair);
46  bool receivePartialImagePair(ImagePair& imagePair, int& validRows, bool& complete);
47  int getNumDroppedFrames() const;
48  bool isConnected() const;
49  void disconnect();
50  std::string getRemoteAddress() const;
51  bool tryAccept();
52 
53 private:
54  // Configuration parameters
56  bool isServer;
57  int bufferSize;
58  int maxUdpPacketSize;
59 
60  // Thread synchronization
61  std::recursive_mutex receiveMutex;
62  std::recursive_mutex sendMutex;
63 
64  // Transfer related members
65  SOCKET clientSocket;
66  SOCKET tcpServerSocket;
67  sockaddr_in remoteAddress;
68 
69  // Object for encoding and decoding the network protocol
70  std::unique_ptr<ImageProtocol> protocol;
71 
72  // Outstanding network message that still has to be transferred
73  int currentMsgLen;
74  int currentMsgOffset;
75  const unsigned char* currentMsg;
76 
77  // Socket configuration
78  void setSocketOptions();
79 
80  // Network socket initialization
81  void initTcpServer(const addrinfo* addressInfo);
82  void initTcpClient(const addrinfo* addressInfo);
83  void initUdp(const addrinfo* addressInfo);
84 
85  // Data reception
86  bool receiveNetworkData(bool block);
87 
88  // Data transmission
89  bool sendNetworkMessage(const unsigned char* msg, int length);
90  void sendPendingControlMessages();
91 
92  bool selectSocket(bool read, bool wait);
93 };
94 
95 /******************** Stubs for all public members ********************/
96 
97 ImageTransfer::ImageTransfer(const char* address, const char* service,
98  ImageProtocol::ProtocolType protType, bool server, int bufferSize, int maxUdpPacketSize):
99  pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize)) {
100  // All initialization in the pimpl class
101 }
102 
103 ImageTransfer::ImageTransfer(const DeviceInfo& device, int bufferSize, int maxUdpPacketSize):
104  pimpl(new Pimpl(device.getIpAddress().c_str(), "7681", static_cast<ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
105  false, bufferSize, maxUdpPacketSize)) {
106  // All initialization in the pimpl class
107 }
108 
109 ImageTransfer::~ImageTransfer() {
110  delete pimpl;
111 }
112 
113 void ImageTransfer::setRawTransferData(const ImagePair& metaData, unsigned char* rawData,
114  int secondTileWidth, int validBytes) {
115  pimpl->setRawTransferData(metaData, rawData, secondTileWidth, validBytes);
116 }
117 
118 void ImageTransfer::setRawValidBytes(int validBytes) {
119  pimpl->setRawValidBytes(validBytes);
120 }
121 
123  pimpl->setTransferImagePair(imagePair);
124 }
125 
127  return pimpl->transferData();
128 }
129 
131  return pimpl->receiveImagePair(imagePair);
132 }
133 
134 bool ImageTransfer::receivePartialImagePair(ImagePair& imagePair, int& validRows, bool& complete) {
135  return pimpl->receivePartialImagePair(imagePair, validRows, complete);
136 }
137 
139  return pimpl->getNumDroppedFrames();
140 }
141 
143  return pimpl->isConnected();
144 }
145 
147  pimpl->disconnect();
148 }
149 
150 std::string ImageTransfer::getRemoteAddress() const {
151  return pimpl->getRemoteAddress();
152 }
153 
155  return pimpl->tryAccept();
156 }
157 
158 /******************** Implementation in pimpl class *******************/
159 ImageTransfer::Pimpl::Pimpl(const char* address, const char* service,
160  ImageProtocol::ProtocolType protType, bool server, int
161  bufferSize, int maxUdpPacketSize)
162  : protType(protType), isServer(server), bufferSize(bufferSize),
163  maxUdpPacketSize(maxUdpPacketSize),
164  clientSocket(INVALID_SOCKET), tcpServerSocket(INVALID_SOCKET),
165  currentMsgLen(0), currentMsgOffset(0), currentMsg(nullptr) {
166 
167  Networking::initNetworking();
168 #ifndef _WIN32
169  // We don't want to be interrupted by the pipe signal
170  signal(SIGPIPE, SIG_IGN);
171 #endif
172 
173  memset(&remoteAddress, 0, sizeof(remoteAddress));
174 
175  // If address is null we use the any address
176  if(address == nullptr || string(address) == "") {
177  address = "0.0.0.0";
178  }
179 
180  addrinfo* addressInfo = Networking::resolveAddress(address, service);
181 
182  try {
183  if(protType == ImageProtocol::PROTOCOL_UDP) {
184  initUdp(addressInfo);
185  } else if(protType == ImageProtocol::PROTOCOL_TCP && isServer) {
186  initTcpServer(addressInfo);
187  } else {
188  initTcpClient(addressInfo);
189  }
190  } catch(...) {
191  freeaddrinfo(addressInfo);
192  throw;
193  }
194 
195  if(addressInfo != nullptr) {
196  freeaddrinfo(addressInfo);
197  }
198 }
199 
200 ImageTransfer::Pimpl::~Pimpl() {
201  if(clientSocket != INVALID_SOCKET) {
202  Networking::closeSocket(clientSocket);
203  }
204  if(tcpServerSocket != INVALID_SOCKET) {
205  Networking::closeSocket(tcpServerSocket);
206  }
207 }
208 
209 void ImageTransfer::Pimpl::initTcpClient(const addrinfo* addressInfo) {
210  protocol.reset(new ImageProtocol(isServer, ImageProtocol::PROTOCOL_TCP));
211  clientSocket = Networking::connectTcpSocket(addressInfo);
212  memcpy(&remoteAddress, addressInfo->ai_addr, sizeof(remoteAddress));
213 
214  // Set special socket options
215  setSocketOptions();
216 }
217 
218 void ImageTransfer::Pimpl::initTcpServer(const addrinfo* addressInfo) {
219  protocol.reset(new ImageProtocol(isServer, ImageProtocol::PROTOCOL_TCP));
220 
221  // Create socket
222  tcpServerSocket = ::socket(addressInfo->ai_family, addressInfo->ai_socktype,
223  addressInfo->ai_protocol);
224  if (tcpServerSocket == INVALID_SOCKET) {
225  TransferException ex("Error opening socket: " + string(strerror(errno)));
226  throw ex;
227  }
228 
229  // Enable reuse address
230  Networking::enableReuseAddress(tcpServerSocket, true);
231 
232  // Open a server port
233  Networking::bindSocket(tcpServerSocket, addressInfo);
234  clientSocket = INVALID_SOCKET;
235 
236  // Make the server socket non-blocking
237  Networking::setSocketBlocking(tcpServerSocket, false);
238 
239  // Listen on port
240  listen(tcpServerSocket, 1);
241 }
242 
243 void ImageTransfer::Pimpl::initUdp(const addrinfo* addressInfo) {
244  protocol.reset(new ImageProtocol(isServer, ImageProtocol::PROTOCOL_UDP, maxUdpPacketSize));
245  // Create sockets
246  clientSocket = socket(AF_INET, SOCK_DGRAM, 0);
247  if(clientSocket == INVALID_SOCKET) {
248  TransferException ex("Error creating receive socket: " + string(strerror(errno)));
249  throw ex;
250  }
251 
252  // Enable reuse address
253  Networking::enableReuseAddress(clientSocket, true);
254 
255  // Bind socket to port
256  if(isServer && addressInfo != nullptr) {
257  Networking::bindSocket(clientSocket, addressInfo);
258  }
259 
260  if(!isServer) {
261  memcpy(&remoteAddress, addressInfo->ai_addr, sizeof(remoteAddress));
262  }
263 
264  // Set special socket options
265  setSocketOptions();
266 }
267 
268 bool ImageTransfer::Pimpl::tryAccept() {
269  if(protType != ImageProtocol::PROTOCOL_TCP || ! isServer) {
270  throw TransferException("Connections can only be accepted in tcp server mode");
271  }
272 
273  unique_lock<recursive_mutex> recvLock(receiveMutex);
274  unique_lock<recursive_mutex> sendLock(sendMutex);
275 
276  // Accept one connection
277  SOCKET newSocket = Networking::acceptConnection(tcpServerSocket, remoteAddress);
278  if(newSocket == INVALID_SOCKET) {
279  // No connection
280  return false;
281  }
282 
283  if(clientSocket != INVALID_SOCKET) {
284  Networking::closeSocket(clientSocket);
285  }
286  clientSocket = newSocket;
287 
288  // Set special socket options
289  setSocketOptions();
290 
291  // Reset connection data
292  protocol->resetTransfer();
293  protocol->resetReception();
294  currentMsg = nullptr;
295 
296  return true;
297 }
298 
299 std::string ImageTransfer::Pimpl::getRemoteAddress() const {
300  unique_lock<recursive_mutex> lock(const_cast<recursive_mutex&>(sendMutex)); // either mutex will work
301 
302  if(remoteAddress.sin_family != AF_INET) {
303  return "";
304  }
305 
306  char strPort[11];
307  snprintf(strPort, sizeof(strPort), ":%d", remoteAddress.sin_port);
308 
309  return string(inet_ntoa(remoteAddress.sin_addr)) + strPort;
310 }
311 
312 void ImageTransfer::Pimpl::setSocketOptions() {
313  // Set the socket buffer sizes
314  if(bufferSize > 0) {
315  setsockopt(clientSocket, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&bufferSize), sizeof(bufferSize));
316  setsockopt(clientSocket, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&bufferSize), sizeof(bufferSize));
317  }
318 
319  Networking::setSocketTimeout(clientSocket, 500);
320  Networking::setSocketBlocking(clientSocket, true);
321 }
322 
323 void ImageTransfer::Pimpl::setRawTransferData(const ImagePair& metaData,
324  unsigned char* rawData, int secondTileWidth, int validBytes) {
325  unique_lock<recursive_mutex> sendLock(sendMutex);
326  protocol->setRawTransferData(metaData, rawData, secondTileWidth, validBytes);
327  currentMsg = nullptr;
328 }
329 
330 void ImageTransfer::Pimpl::setRawValidBytes(int validBytes) {
331  unique_lock<recursive_mutex> sendLock(sendMutex);
332  protocol->setRawValidBytes(validBytes);
333 }
334 
335 void ImageTransfer::Pimpl::setTransferImagePair(const ImagePair& imagePair) {
336  unique_lock<recursive_mutex> sendLock(sendMutex);
337  protocol->setTransferImagePair(imagePair);
338  currentMsg = nullptr;
339 }
340 
341 ImageTransfer::TransferStatus ImageTransfer::Pimpl::transferData() {
342  unique_lock<recursive_mutex> lock(sendMutex);
343 
344  // First receive data in case a control message arrives
345  if(protType == ImageProtocol::PROTOCOL_UDP) {
346  receiveNetworkData(false);
347  }
348 
349  if(remoteAddress.sin_family != AF_INET || !protocol->isConnected()) {
350  return NOT_CONNECTED;
351  }
352 
353  // Get first message to transfer
354  if(currentMsg == nullptr) {
355  currentMsgOffset = 0;
356  currentMsg = protocol->getTransferMessage(currentMsgLen);
357 
358  if(currentMsg == nullptr) {
359  if(protocol->transferComplete()) {
360  return ALL_TRANSFERRED;
361  } else {
362  return NO_VALID_DATA;
363  }
364  }
365  }
366 
367  // Try transferring messages
368  bool dataTransferred = (currentMsg != nullptr);
369  while(currentMsg != nullptr) {
370  int writing = (int)(currentMsgLen - currentMsgOffset);
371 
372  if(sendNetworkMessage(&currentMsg[currentMsgOffset], writing)) {
373  // Get next message
374  currentMsgOffset = 0;
375  currentMsg = protocol->getTransferMessage(currentMsgLen);
376  } else {
377  return WOULD_BLOCK;
378  }
379  }
380 
381  if(dataTransferred && protType == ImageProtocol::PROTOCOL_TCP && protocol->transferComplete()) {
382  // Force a flush for TCP by turning the nagle algorithm off and on
383  int flag = 1;
384  setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
385  flag = 0;
386  setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
387  }
388 
389  // Also check for control messages at the end
390  if(protType == ImageProtocol::PROTOCOL_UDP) {
391  receiveNetworkData(false);
392  }
393 
394  if(protocol->transferComplete()) {
395  return ALL_TRANSFERRED;
396  } else {
397  return PARTIAL_TRANSFER;
398  }
399 }
400 
401 bool ImageTransfer::Pimpl::receiveImagePair(ImagePair& imagePair) {
402  int validRows = 0;
403  bool complete = false;
404 
405  std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
406  while(!complete) {
407  if(!receivePartialImagePair(imagePair, validRows, complete)) {
408  return false;
409  }
410 
411  unsigned int time = static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::milliseconds>(
412  std::chrono::steady_clock::now() - startTime).count());
413  if(time > 1000) {
414  return false;
415  }
416  }
417 
418  return true;
419 }
420 
421 bool ImageTransfer::Pimpl::receivePartialImagePair(ImagePair& imagePair,
422  int& validRows, bool& complete) {
423  unique_lock<recursive_mutex> lock(receiveMutex);
424 
425  // Try to receive further image data if needed
426  bool block = true;
427  while(!protocol->imagesReceived() && receiveNetworkData(block)) {
428  block = false;
429  }
430 
431  // Get received image
432  return protocol->getPartiallyReceivedImagePair(imagePair, validRows, complete);
433 }
434 
435 bool ImageTransfer::Pimpl::receiveNetworkData(bool block) {
436  unique_lock<recursive_mutex> lock = block ?
437  unique_lock<recursive_mutex>(receiveMutex) : unique_lock<recursive_mutex>(receiveMutex, std::try_to_lock);
438 
439  if(clientSocket == INVALID_SOCKET) {
440  return false; // Not connected
441  }
442 
443  // First send control messages if necessary
444  sendPendingControlMessages();
445 
446  if(!lock.owns_lock()) {
447  // Waiting for the lock would block this call
448  return false;
449  }
450 
451  // Test if the socket has data available
452  if(!block && !selectSocket(true, false)) {
453  return 0;
454  }
455 
456  int maxLength = 0;
457  char* buffer = reinterpret_cast<char*>(protocol->getNextReceiveBuffer(maxLength));
458 
459  // Receive data
460  sockaddr_in fromAddress;
461  socklen_t fromSize = sizeof(fromAddress);
462 
463  int bytesReceived = recvfrom(clientSocket, buffer, maxLength,
464  0, reinterpret_cast<sockaddr*>(&fromAddress), &fromSize);
465 
466  if(bytesReceived == 0 || (protType == ImageProtocol::PROTOCOL_TCP && bytesReceived < 0 && errno == WSAECONNRESET)) {
467  // Connection closed
468  disconnect();
469  } else if(bytesReceived < 0 && errno != EWOULDBLOCK && errno != EINTR &&
470  errno != ETIMEDOUT && errno != WSA_IO_PENDING && errno != WSAECONNRESET) {
471  TransferException ex("Error reading from socket: " + string(strerror(errno)));
472  throw ex;
473  } else if(bytesReceived > 0) {
474  protocol->processReceivedMessage(bytesReceived);
475  if(protocol->newClientConnected()) {
476  // We have just established a new connection
477  memcpy(&remoteAddress, &fromAddress, sizeof(remoteAddress));
478  }
479  }
480 
481  return bytesReceived > 0;
482 }
483 
484 void ImageTransfer::Pimpl::disconnect() {
485  // We just need to forget the remote address in order to
486  // disconnect
487  unique_lock<recursive_mutex> recvLock(receiveMutex);
488  unique_lock<recursive_mutex> sendLock(sendMutex);
489 
490  if(clientSocket != INVALID_SOCKET && protType == ImageProtocol::PROTOCOL_TCP) {
491  Networking::closeSocket(clientSocket);
492  }
493  memset(&remoteAddress, 0, sizeof(remoteAddress));
494 }
495 
496 bool ImageTransfer::Pimpl::isConnected() const {
497  unique_lock<recursive_mutex> lock(const_cast<recursive_mutex&>(sendMutex)); //either mutex will work
498 
499  return remoteAddress.sin_family == AF_INET && protocol->isConnected();
500 }
501 
502 bool ImageTransfer::Pimpl::sendNetworkMessage(const unsigned char* msg, int length) {
503  int written = 0;
504  if(protType == ImageProtocol::PROTOCOL_UDP) {
505  sockaddr_in destAddr;
506  SOCKET destSocket;
507  {
508  unique_lock<recursive_mutex> lock(sendMutex);
509  destAddr = remoteAddress;
510  destSocket = clientSocket;
511  }
512 
513  if(destAddr.sin_family != AF_INET) {
514  return false; // Not connected
515  }
516 
517  written = sendto(destSocket, reinterpret_cast<const char*>(msg), length, 0,
518  reinterpret_cast<sockaddr*>(&destAddr), sizeof(destAddr));
519  } else {
520  SOCKET destSocket;
521  {
522  unique_lock<recursive_mutex> lock(sendMutex);
523  destSocket = clientSocket;
524  }
525  written = send(destSocket, reinterpret_cast<const char*>(msg), length, 0);
526  }
527 
528  unsigned long sendError = errno;
529 
530  if(written < 0) {
531  if(sendError == EAGAIN || sendError == EWOULDBLOCK || sendError == ETIMEDOUT) {
532  // The socket is not yet ready for a new transfer
533  return false;
534  } else if(sendError == EPIPE) {
535  // The connection has been closed
536  disconnect();
537  return false;
538  } else {
539  TransferException ex("Error sending network packet: " + string(strerror(sendError)));
540  throw ex;
541  }
542  } else if(written != length) {
543  if(protType == ImageProtocol::PROTOCOL_UDP) {
544  // The message has been transmitted partially
545  throw TransferException("Unable to transmit complete UDP message");
546  } else {
547  // For TCP we can transmit the remaining data later
548  currentMsgOffset += written;
549  return false;
550  }
551  } else {
552  return true;
553  }
554 }
555 
556 void ImageTransfer::Pimpl::sendPendingControlMessages() {
557  const unsigned char* controlMsgData = nullptr;
558  int controlMsgLen = 0;
559 
560  while(true) {
561  unique_lock<recursive_mutex> lock(sendMutex);
562  if(remoteAddress.sin_family != AF_INET) {
563  return;
564  }
565 
566  controlMsgData = protocol->getNextControlMessage(controlMsgLen);
567 
568  if(controlMsgData != nullptr) {
569  sendNetworkMessage(controlMsgData, controlMsgLen);
570  } else {
571  break;
572  }
573  }
574 }
575 
576 int ImageTransfer::Pimpl::getNumDroppedFrames() const {
577  return protocol->getNumDroppedFrames();
578 }
579 
580 bool ImageTransfer::Pimpl::selectSocket(bool read, bool wait) {
581  SOCKET sock;
582  {
583  unique_lock<recursive_mutex> lock(sendMutex); // Either mutex will do
584  sock = clientSocket;
585  }
586 
587  fd_set fds;
588  struct timeval tv;
589  FD_ZERO(&fds);
590  FD_SET(sock, &fds);
591  tv.tv_sec = 0;
592  if(wait) {
593  tv.tv_usec = 100000;
594  } else {
595  tv.tv_usec = 0;
596  }
597 
598  if(select(sock+1, (read ? &fds : nullptr), (!read ? &fds : nullptr), nullptr, &tv) <= 0) {
599  // The socket is currently not ready
600  return false;
601  } else {
602  return true;
603  }
604 }
void disconnect()
Terminates the current connection.
TransferStatus transferData()
Performs a partial (or full) image transmission.
The operation would block and blocking as been disabled.
Definition: imagetransfer.h:51
bool tryAccept()
Tries to accept a client connection.
void setRawTransferData(const ImagePair &metaData, unsigned char *rawData, int secondTileWidth=0, int validBytes=0x7FFFFFFF)
Sets the raw pixel data for a partial image transmission.
The connection-less UDP transport protocol.
Definition: imageprotocol.h:46
A lightweight protocol for transferring image pairs.
Definition: imageprotocol.h:38
bool receivePartialImagePair(ImagePair &imagePair, int &validRows, bool &complete)
Returns the received image pair, even if it is not yet complete.
void setRawValidBytes(int validBytes)
Updates the number of valid bytes in a partial raw transmission.
void setTransferImagePair(const ImagePair &imagePair)
Sets a new image pair that shall be transmitted.
bool isConnected() const
Returns true if a remote connection is established.
No network connection has been established.
Definition: imagetransfer.h:54
std::string getRemoteAddress() const
Returns the address of the remote host.
ProtocolType
Supported network protocols.
Definition: imageprotocol.h:41
Aggregates information about a discovered device.
Definition: deviceinfo.h:25
There is currently no more data that could be transmitted.
Definition: imagetransfer.h:48
The image pair has been transferred completely.
Definition: imagetransfer.h:41
ImageTransfer(const char *address, const char *service="7681", ImageProtocol::ProtocolType protType=ImageProtocol::PROTOCOL_UDP, bool server=false, int bufferSize=1048576, int maxUdpPacketSize=1472)
Creates a new transfer object by manually specifying the target address.
int getNumDroppedFrames() const
Returns the number of frames that have been dropped since connecting to the current remote host...
Exception class that is used for all transfer exceptions.
Definition: exceptions.h:33
The connection oriented TCP transport protocol.
Definition: imageprotocol.h:43
A set of two images, which are usually the left camera image and the disparity map.
Definition: imagepair.h:33
bool receiveImagePair(ImagePair &imagePair)
Waits for and receives a new image pair.
TransferStatus
The result of a partial image transfer.
Definition: imagetransfer.h:39
Nerian Vision Technologies