libvisiontransfer  10.6.0
datablockprotocol.h
1 /*******************************************************************************
2  * Copyright (c) 2023 Allied Vision Technologies GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #ifndef VISIONTRANSFER_DATABLOCKPROTOCOL_H
16 #define VISIONTRANSFER_DATABLOCKPROTOCOL_H
17 
18 #include <map>
19 #include <vector>
20 #include <memory>
21 #include <chrono>
22 #include <deque>
23 
24 #include "visiontransfer/alignedallocator.h"
25 #include "visiontransfer/exceptions.h"
26 
27 namespace visiontransfer {
28 namespace internal {
29 
46 class DataBlockProtocol {
47 public:
48  enum ProtocolType {
49  PROTOCOL_TCP,
50  PROTOCOL_UDP
51  };
52 
53  //
54  static const int MAX_DATA_BLOCKS = 8;
55 
56  // Constants that are also used in other places.
57  static const int MAX_TCP_BYTES_TRANSFER = 0xFFFF; //64K - 1
58  static const int MAX_UDP_RECEPTION = 0x4000; //16K
59  static const int MAX_OUTSTANDING_BYTES = 2*MAX_TCP_BYTES_TRANSFER;
60 
61 #pragma pack(push,1)
62  // Extends previous one-channel 6-byte raw header buffer
63  // Legacy transfers can be detected via non-zero netTransferSizeDummy
64  struct HeaderPreamble {
65  uint16_t netHeaderSize;
66  int32_t netTransferSizeDummy; // layout compatibility, legacy detection
67  uint32_t netTransferSizes[MAX_DATA_BLOCKS]; // per-block total size
68  };
69  struct SegmentHeaderUDP {
70  uint32_t segmentOffset;
71  };
72  struct SegmentHeaderTCP {
73  uint32_t fragmentSize;
74  uint32_t segmentOffset;
75  };
76 #pragma pack(pop)
77 
86  DataBlockProtocol(bool server, ProtocolType protType, int maxUdpPacketSize);
87 
92  int getProtocolOverhead() const {
93  return protType == PROTOCOL_UDP ? sizeof(int) : 0;
94  }
95 
99  int getMaxReceptionSize() const;
100 
104  void resetTransfer();
105 
120  void setTransferHeader(unsigned char* data, int headerSize, int blocks);
121 
130  void setTransferBytes(int block, long bytes);
131 
146  void setTransferData(int block, unsigned char* data, int validBytes = 0x7FFFFFFF);
147 
157  void setTransferValidBytes(int block, int validBytes);
158 
168  const unsigned char* getTransferMessage(int& length);
169 
173  bool transferComplete();
174 
183  unsigned char* getNextReceiveBuffer(int maxLength);
184 
191  void resetReception(bool dropped);
192 
202  void processReceivedMessage(int length, bool& transferComplete);
203 
213  unsigned char* getReceivedData(int& length);
214 
226  unsigned char* getReceivedHeader(int& length);
227 
232  int getDroppedReceptions() const {
233  return droppedReceptions;
234  }
235 
243  bool newClientConnected();
244 
251  bool isConnected() const;
252 
263  const unsigned char* getNextControlMessage(int& length);
264 
265  unsigned char* getBlockReceiveBuffer(int block) {
266  if (block >= numReceptionBlocks) {
267  throw ProtocolException("Tried to get receive buffer beyond initialized block range");
268  }
269  return &blockReceiveBuffers[block][0];
270  }
271  int getBlockValidSize(int block) {
272  if (block >= numReceptionBlocks) {
273  throw ProtocolException("Tried to get valid buffer index beyond initialized block range");
274  }
275  return blockValidSize[block];
276  }
277  bool isBlockDone(int block) {
278  if (block >= numReceptionBlocks) {
279  throw ProtocolException("Tried to get completion status of uninitialized block");
280  }
281  return blockValidSize[block] >= blockReceiveSize[block];
282  }
283  bool allBlocksDone() {
284  for (int i=0; i<numReceptionBlocks; ++i) {
285  if (!isBlockDone(i)) return false;
286  }
287  return true;
288  }
289  bool anyPayloadReceived() {
290  for (int i=0; i<numReceptionBlocks; ++i) {
291  if (blockReceiveOffsets[i] > 0) return true;
292  }
293  return false;
294  }
295 
296  std::string statusReport();
297 
298  bool wasHeaderReceived() const {
299  return headerReceived;
300  }
301 
302 private:
303  // The pimpl idiom is not necessary here, as this class is usually not
304  // used directly
305 
306  struct MissingReceiveSegment {
307  int offset;
308  int length;
309  bool isEof;
310  unsigned char subsequentData[4];
311  };
312 
313  static constexpr int HEARTBEAT_INTERVAL_MS = 1000;
314  static constexpr int RECONNECT_TIMEOUT_MS = 2000;
315 
316  static constexpr unsigned char CONNECTION_MESSAGE = 0x01;
317  static constexpr unsigned char CONFIRM_MESSAGE = 0x02;
318  static constexpr unsigned char HEADER_MESSAGE = 0x03;
319  static constexpr unsigned char RESEND_MESSAGE = 0x04;
320  static constexpr unsigned char EOF_MESSAGE = 0x05;
321  static constexpr unsigned char HEARTBEAT_MESSAGE = 0x06;
322 
323  bool isServer;
324  ProtocolType protType;
325  int maxPayloadSize;
326  int minPayloadSize;
327 
328  // Transfer related variables
329  bool transferDone;
330  unsigned char* rawDataArr[MAX_DATA_BLOCKS];
331  int rawDataArrStrideHackOrig[MAX_DATA_BLOCKS];
332  int rawDataArrStrideHackRepl[MAX_DATA_BLOCKS];
333  int rawValidBytes[MAX_DATA_BLOCKS];
334  int transferOffset[MAX_DATA_BLOCKS];
335  int transferSize[MAX_DATA_BLOCKS];
336  char overwrittenTransferData[sizeof(SegmentHeaderTCP)];
337  int overwrittenTransferIndex;
338  int overwrittenTransferBlock;
339  unsigned char* transferHeaderData;
340  int transferHeaderSize;
341  int totalBytesCompleted;
342  int totalTransferSize;
343  int numTransferBlocks;
344  int lastTransmittedBlock;
345 
346  // Reliability related variables
347  std::deque<MissingReceiveSegment> missingReceiveSegments[MAX_DATA_BLOCKS];
348  std::deque<std::pair<int, int> > missingTransferSegments;
349  bool waitingForMissingSegments;
350  int totalReceiveSize;
351 
352  unsigned char controlMessageBuffer[1024 * 16];
353 
354  // Connection related variables
355  bool connectionConfirmed;
356  bool confirmationMessagePending;
357  bool eofMessagePending;
358  bool clientConnectionPending;
359  bool resendMessagePending;
360  std::chrono::steady_clock::time_point lastRemoteHostActivity;
361  std::chrono::steady_clock::time_point lastSentHeartbeat;
362  std::chrono::steady_clock::time_point lastReceivedHeartbeat;
363 
364  // Reception related variables
365  std::vector<unsigned char, AlignedAllocator<unsigned char> > receiveBuffer;
366  std::vector<unsigned char, AlignedAllocator<unsigned char> > blockReceiveBuffers[MAX_DATA_BLOCKS];
367  int blockReceiveOffsets[MAX_DATA_BLOCKS];
368  int blockReceiveSize[MAX_DATA_BLOCKS];
369  int blockValidSize[MAX_DATA_BLOCKS];
370  std::vector<unsigned char> receivedHeader;
371  bool finishedReception;
372  int droppedReceptions;
373  int completedReceptions;
374  double lostSegmentRate;
375  int lostSegmentBytes;
376  unsigned char unprocessedMsgPart[MAX_OUTSTANDING_BYTES];
377  int unprocessedMsgLength;
378  bool headerReceived;
379  bool legacyTransfer;
380  int numReceptionBlocks;
381  int receiveOffset;
382 
383  const unsigned char* extractPayload(const unsigned char* data, int& length, bool& error);
384  bool processControlMessage(int length);
385  void restoreTransferBuffer();
386  bool generateResendRequest(int& length);
387  void getNextTransferSegment(int& block, int& offset, int& length);
388  void parseResendMessage(int length);
389  void parseEofMessage(int length);
390  void integrateMissingUdpSegments(int block, int lastSegmentOffset, int lastSegmentSize);
391  void processReceivedUdpMessage(int length, bool& transferComplete);
392  void processReceivedTcpMessage(int length, bool& transferComplete);
393  void resizeReceiveBuffer();
394  int parseReceivedHeader(int length, int offset);
395  void zeroStructures();
396  void splitRawOffset(int rawSegmentOffset, int& dataBlockID, int& segmentOffset);
397  int mergeRawOffset(int dataBlockID, int segmentOffset, int reserved=0);
398 
399 };
400 
401 }} // namespace
402 
403 #endif
visiontransfer::internal::DataBlockProtocol::setTransferData
void setTransferData(int block, unsigned char *data, int validBytes=0x7FFFFFFF)
Sets the payload data for the next transfer.
Definition: datablockprotocol.cpp:161
visiontransfer::internal::DataBlockProtocol::getMaxReceptionSize
int getMaxReceptionSize() const
Returns the maximum payload size that can be received.
Definition: datablockprotocol.cpp:333
visiontransfer::internal::DataBlockProtocol::transferComplete
bool transferComplete()
Returns true if the current transfer has been completed.
Definition: datablockprotocol.cpp:326
visiontransfer::internal::DataBlockProtocol::resetReception
void resetReception(bool dropped)
Resets the message reception.
Definition: datablockprotocol.cpp:638
visiontransfer::internal::DataBlockProtocol::getNextReceiveBuffer
unsigned char * getNextReceiveBuffer(int maxLength)
Gets a buffer for receiving the next network message.
Definition: datablockprotocol.cpp:341
visiontransfer::internal::DataBlockProtocol::getTransferMessage
const unsigned char * getTransferMessage(int &length)
Gets the next network message for the current transfer.
Definition: datablockprotocol.cpp:195
visiontransfer::internal::DataBlockProtocol::isConnected
bool isConnected() const
Returns true if a remote connection is established.
Definition: datablockprotocol.cpp:729
visiontransfer::internal::DataBlockProtocol::getProtocolOverhead
int getProtocolOverhead() const
Returns the size of the overhead data that is required for transferring a single network message.
Definition: datablockprotocol.h:128
visiontransfer::internal::DataBlockProtocol::getReceivedData
unsigned char * getReceivedData(int &length)
Returns the data that has been received for the current transfer.
Definition: datablockprotocol.cpp:658
visiontransfer::internal::DataBlockProtocol::getDroppedReceptions
int getDroppedReceptions() const
Returns the internal counter of dropped transfers during reception.
Definition: datablockprotocol.h:268
visiontransfer::internal::DataBlockProtocol::setTransferBytes
void setTransferBytes(int block, long bytes)
Sets the per-block transfer size.
Definition: datablockprotocol.cpp:111
visiontransfer::internal::DataBlockProtocol::getReceivedHeader
unsigned char * getReceivedHeader(int &length)
Returns the header data that has been received for the current transfer.
Definition: datablockprotocol.cpp:663
visiontransfer::internal::DataBlockProtocol::newClientConnected
bool newClientConnected()
Returns true if the last network message has established a new connection from a client.
Definition: datablockprotocol.cpp:799
visiontransfer::internal::DataBlockProtocol::processReceivedMessage
void processReceivedMessage(int length, bool &transferComplete)
Handles a received network message.
Definition: datablockprotocol.cpp:348
visiontransfer::ProtocolException
Exception class that is used for all protocol exceptions.
Definition: exceptions.h:37
visiontransfer::internal::DataBlockProtocol::DataBlockProtocol
DataBlockProtocol(bool server, ProtocolType protType, int maxUdpPacketSize)
Creates a new instance.
Definition: datablockprotocol.cpp:44
visiontransfer::internal::DataBlockProtocol::resetTransfer
void resetTransfer()
Resets all transfer related internal variables.
Definition: datablockprotocol.cpp:101
visiontransfer::internal::DataBlockProtocol::setTransferValidBytes
void setTransferValidBytes(int block, int validBytes)
Updates the number of valid bytes in a partial transfer.
Definition: datablockprotocol.cpp:175
visiontransfer::internal::DataBlockProtocol::setTransferHeader
void setTransferHeader(unsigned char *data, int headerSize, int blocks)
Sets a user-defined header that shall be transmitted with the next transfer.
Definition: datablockprotocol.cpp:122
visiontransfer::internal::DataBlockProtocol::getNextControlMessage
const unsigned char * getNextControlMessage(int &length)
If a control message is pending to be transmitted, then the message data will be returned by this met...
Definition: datablockprotocol.cpp:740
Allied Vision