libvisiontransfer  10.6.0
datablockprotocol.cpp
1 /*******************************************************************************
2  * Copyright (c) 2023 Allied Vision Technologies GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #include <algorithm>
16 #include <iostream>
17 #include <cstring>
18 
19 #include <iomanip>
20 #include <sstream>
21 
22 #include "visiontransfer/datablockprotocol.h"
23 #include "visiontransfer/exceptions.h"
24 
25 // Network headers
26 #ifdef _WIN32
27 #include <winsock2.h>
28 #undef min
29 #undef max
30 #else
31 #include <arpa/inet.h>
32 #endif
33 
34 #define LOG_DEBUG_DBP(expr)
35 //#define LOG_DEBUG_DBP(expr) std::cerr << "DataBlockProtocol: " << expr << std::endl
36 
37 using namespace std;
38 using namespace visiontransfer;
39 using namespace visiontransfer::internal;
40 
41 namespace visiontransfer {
42 namespace internal {
43 
44 DataBlockProtocol::DataBlockProtocol(bool server, ProtocolType protType, int maxUdpPacketSize)
45  : isServer(server), protType(protType),
46  transferDone(true),
47  overwrittenTransferData{0},
48  overwrittenTransferIndex{-1},
49  overwrittenTransferBlock{-1},
50  transferHeaderData{nullptr},
51  transferHeaderSize{0},
52  totalBytesCompleted{0}, totalTransferSize{0},
53  waitingForMissingSegments(false),
54  totalReceiveSize(0), connectionConfirmed(false),
55  confirmationMessagePending(false), eofMessagePending(false),
56  clientConnectionPending(false), resendMessagePending(false),
57  lastRemoteHostActivity(), lastSentHeartbeat(),
58  lastReceivedHeartbeat(std::chrono::steady_clock::now()),
59  finishedReception(false), droppedReceptions(0),
60  completedReceptions(0), lostSegmentRate(0.0), lostSegmentBytes(0),
61  unprocessedMsgLength(0), headerReceived(false) {
62  // Determine the maximum allowed payload size
63  if(protType == PROTOCOL_TCP) {
64  maxPayloadSize = MAX_TCP_BYTES_TRANSFER - sizeof(SegmentHeaderTCP);
65  minPayloadSize = 0;
66  } else {
67  maxPayloadSize = maxUdpPacketSize - sizeof(SegmentHeaderUDP);
68  minPayloadSize = maxPayloadSize;
69  }
70  zeroStructures();
71  resizeReceiveBuffer();
72  resetReception(false);
73 }
74 void DataBlockProtocol::splitRawOffset(int rawSegmentOffset, int& dataBlockID, int& segmentOffset) {
75  int selector = (rawSegmentOffset >> 28) & 0xf;
76  dataBlockID = selector & 0x7; // Note: 0x8 bit is reserved for now
77  segmentOffset = rawSegmentOffset & 0x0FFFffff;
78 }
79 
80 int DataBlockProtocol::mergeRawOffset(int dataBlockID, int segmentOffset, int reserved_defaults0) {
81  return ((reserved_defaults0 & 1) << 31) | ((dataBlockID & 0x07) << 28) | (segmentOffset & 0x0FFFffff);
82 }
83 
84 void DataBlockProtocol::zeroStructures() {
85  for (int i=0; i<MAX_DATA_BLOCKS; ++i) {
86  rawDataArr[i] = nullptr;
87  rawDataArrStrideHackOrig[i] = 0;
88  rawDataArrStrideHackRepl[i] = 0;
89  rawValidBytes[i] = 0;
90  transferOffset[i] = 0;
91  transferSize[i] = 0;
92  }
93  std::memset(overwrittenTransferData, 0, sizeof(overwrittenTransferData));
94  overwrittenTransferIndex = -1;
95  overwrittenTransferBlock = -1;
96  lastTransmittedBlock = -1;
97  receiveOffset = 0;
98  numReceptionBlocks = 0;
99 }
100 
102  transferDone = true;
103  overwrittenTransferIndex = -1;
104  overwrittenTransferBlock = -1;
105  totalBytesCompleted = 0;
106  totalTransferSize = 0;
107  numTransferBlocks = 0;
108  missingTransferSegments.clear();
109 }
110 
111 void DataBlockProtocol::setTransferBytes(int block, long bytes) {
112  if (transferHeaderData == nullptr) {
113  throw ProtocolException("Tried to set data block size before initializing header!");
114  } else if (block >= numTransferBlocks) {
115  throw ProtocolException("Request to set data block size - block index too high!");
116  }
117  transferSize[block] = bytes;
118  HeaderPreamble* hp = reinterpret_cast<HeaderPreamble*>(transferHeaderData);
119  hp->netTransferSizes[block] = htonl(bytes);
120 }
121 
122 void DataBlockProtocol::setTransferHeader(unsigned char* data, int headerSize, int blocks) {
123  if(!transferDone && numTransferBlocks > 0) {
124  throw ProtocolException("Header data set while transfer is active!");
125  } else if(headerSize + 9 > static_cast<int>(sizeof(controlMessageBuffer))) {
126  throw ProtocolException("Transfer header is too large!");
127  } else if(blocks == 0) {
128  throw ProtocolException("Requested transfer of 0 blocks!");
129  }
130 
131  numTransferBlocks = blocks;
132 
133  transferDone = false;
134  for (int i=0; i<MAX_DATA_BLOCKS; ++i) {
135  this->transferSize[i] = 0; // must be set via setRawTransferBytes()
136  }
137 
138  int headerBaseOffset = sizeof(HeaderPreamble);
139 
140  transferHeaderData = &data[-headerBaseOffset];
141  HeaderPreamble* ourHeader = reinterpret_cast<HeaderPreamble*>(transferHeaderData);
142 
143  unsigned short netHeaderSize = htons(static_cast<unsigned short>(headerSize));
144  ourHeader->netHeaderSize = netHeaderSize;
145  ourHeader->netTransferSizeDummy = htonl(-1); // clashes on purpose with old recipients
146 
147  headerSize += headerBaseOffset;
148 
149  if(protType == PROTOCOL_UDP) {
150  // In UDP mode we still need to make this a control message
151  transferHeaderData[headerSize++] = HEADER_MESSAGE;
152  transferHeaderData[headerSize++] = 0xFF;
153  transferHeaderData[headerSize++] = 0xFF;
154  transferHeaderData[headerSize++] = 0xFF;
155  transferHeaderData[headerSize++] = 0xFF;
156  }
157 
158  transferHeaderSize = headerSize;
159 }
160 
161 void DataBlockProtocol::setTransferData(int block, unsigned char* data, int validBytes) {
162  if(transferHeaderSize == 0 || transferHeaderData == nullptr) {
163  throw ProtocolException("The transfer header has not yet been set!");
164  }
165 
166  transferDone = false;
167  rawDataArr[block] = data;
168  transferOffset[block] = 0;
169  overwrittenTransferIndex = -1;
170  overwrittenTransferBlock = -1;
171  rawValidBytes[block] = min(transferSize[block], validBytes);
172  totalBytesCompleted = 0;
173 }
174 
175 void DataBlockProtocol::setTransferValidBytes(int block, int validBytes) {
176  if(validBytes >= transferSize[block]) {
177  rawValidBytes[block] = transferSize[block];
178  } else if(validBytes < static_cast<int>(sizeof(int))) {
179  rawValidBytes[block] = 0;
180  } else {
181  rawValidBytes[block] = validBytes;
182  }
183 }
184 
185 std::string DataBlockProtocol::statusReport() {
186  std::stringstream ss;
187  ss << "DataBlockProtocol, blocks=" << numTransferBlocks << ": ";
188  for (int i=0; i<numTransferBlocks; ++i) {
189  ss << i << ":(len " << transferSize[i] << " ofs " << transferOffset[i] << " rawvalid " << rawValidBytes[i] << ") ";
190  }
191  ss << " total done: " << totalBytesCompleted << "/" << totalTransferSize;
192  return ss.str();
193 }
194 
195 const unsigned char* DataBlockProtocol::getTransferMessage(int& length) {
196  if(transferDone || rawValidBytes == 0) {
197  // No more data to be transferred
198  length = 0;
199  return nullptr;
200  }
201 
202  // For TCP we always send the header first
203  if(protType == PROTOCOL_TCP && transferHeaderData != nullptr) {
204  length = transferHeaderSize;
205  const unsigned char* ret = transferHeaderData;
206  transferHeaderData = nullptr;
207  return ret;
208  }
209 
210  // The transfer buffer might have been altered by the previous transfer
211  // and first needs to be restored
212  restoreTransferBuffer();
213 
214  // Determine which data segment to transfer next
215  int block = -1, offset = -1;
216  getNextTransferSegment(block, offset, length);
217  if(length == 0) {
218  return nullptr;
219  }
220 
221  if(protType == PROTOCOL_UDP) {
222  // For udp, we always append a segment offset
223  overwrittenTransferBlock = block;
224  overwrittenTransferIndex = offset + length;
225  SegmentHeaderUDP* segmentHeader = reinterpret_cast<SegmentHeaderUDP*>(&rawDataArr[block][offset + length]);
226  std::memcpy(overwrittenTransferData, segmentHeader, sizeof(SegmentHeaderUDP));
227  segmentHeader->segmentOffset = static_cast<int>(htonl(mergeRawOffset(block, offset)));
228  length += sizeof(SegmentHeaderUDP);
229  lastTransmittedBlock = block;
230  return &rawDataArr[block][offset];
231  } else {
232  // For tcp, we *PRE*pend the segment header consisting of segment offset plus the packet payload size
233  int headerOffset = offset - sizeof(SegmentHeaderTCP);
234 
235  SegmentHeaderTCP* segmentHeader = nullptr;
236  unsigned char* dataPointer = nullptr;
237 
238  if(headerOffset < 0) {
239  // For the first TCP transfer we need to copy the data as we cannot
240  // prepend before the data start
241  static unsigned char tcpBuffer[MAX_TCP_BYTES_TRANSFER];
242  dataPointer = tcpBuffer;
243  segmentHeader = reinterpret_cast<SegmentHeaderTCP*>(tcpBuffer);
244  std::memcpy(&tcpBuffer[sizeof(segmentHeader)], &rawDataArr[block][offset], length);
245  } else {
246  // For subsequent calls we will overwrite the segment header data and
247  // restore it
248  dataPointer = &rawDataArr[block][headerOffset];
249  segmentHeader = reinterpret_cast<SegmentHeaderTCP*>(&rawDataArr[block][headerOffset]);
250  overwrittenTransferBlock = block;
251  overwrittenTransferIndex = headerOffset;
252  std::memcpy(overwrittenTransferData, segmentHeader, sizeof(SegmentHeaderTCP));
253  }
254 
255  segmentHeader->fragmentSize = htonl(length);
256  segmentHeader->segmentOffset = static_cast<int>(htonl(mergeRawOffset(block, offset)));
257  length += sizeof(SegmentHeaderTCP);
258  lastTransmittedBlock = block;
259  return dataPointer;
260  }
261 }
262 
263 void DataBlockProtocol::getNextTransferSegment(int& block, int& offset, int& length) {
264  if(missingTransferSegments.size() == 0) {
265  // Select from block with the most unsent data
266  int sendBlock = 0, amount = 0;
267  for (int i=0; i<numTransferBlocks; ++i) {
268  int avail = std::min(transferSize[i], rawValidBytes[i]);
269  avail -= transferOffset[i];
270  if (avail > amount) {
271  amount = avail;
272  sendBlock = i;
273  }
274  }
275  length = std::min(maxPayloadSize, amount);
276  if(length == 0 || (length < minPayloadSize && rawValidBytes[sendBlock] != transferSize[sendBlock])) {
277  length = 0;
278  return;
279  }
280 
281  block = sendBlock;
282  offset = transferOffset[sendBlock];
283  transferOffset[sendBlock] += length; // for next transfer
284  if (protType == PROTOCOL_UDP) {
285  bool complete = true;
286  for (int i=0; i<numTransferBlocks; ++i) {
287  if (transferOffset[i] < transferSize[i]) {
288  complete = false;
289  break;
290  }
291  }
292  if (complete) {
293  eofMessagePending = true;
294  }
295  }
296  } else {
297  // This is a segment that is re-transmitted due to packet loss
298  splitRawOffset(missingTransferSegments.front().first, block, offset);
299  length = std::min(maxPayloadSize, missingTransferSegments.front().second);
300  LOG_DEBUG_DBP("Re-transmitting: " << offset << " - " << (offset + length));
301 
302  int remaining = missingTransferSegments[0].second - length;
303  if(remaining == 0) {
304  // The segment is competed
305  missingTransferSegments.pop_front();
306  } else {
307  // The segment is only partially complete
308  missingTransferSegments.front().first += length;
309  missingTransferSegments.front().second = remaining;
310  }
311  }
312 }
313 
314 void DataBlockProtocol::restoreTransferBuffer() {
315  if(overwrittenTransferBlock >= 0) {
316  if(protType == PROTOCOL_UDP) {
317  std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData, sizeof(SegmentHeaderUDP));
318  } else {
319  std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData, sizeof(SegmentHeaderTCP));
320  }
321  }
322  overwrittenTransferIndex = -1;
323  overwrittenTransferBlock = -1;
324 }
325 
327  for (int i=0; i<numTransferBlocks; ++i) {
328  if (transferOffset[i] < transferSize[i]) return false;
329  }
330  return !eofMessagePending;
331 }
332 
334  if(protType == PROTOCOL_TCP) {
335  return MAX_TCP_BYTES_TRANSFER;
336  } else {
337  return MAX_UDP_RECEPTION;
338  }
339 }
340 
341 unsigned char* DataBlockProtocol::getNextReceiveBuffer(int maxLength) {
342  if(receiveOffset + maxLength > (int)receiveBuffer.size()) {
343  receiveBuffer.resize(receiveOffset + maxLength);
344  }
345  return &receiveBuffer[receiveOffset];
346 }
347 
348 void DataBlockProtocol::processReceivedMessage(int length, bool& transferComplete) {
349  transferComplete = false;
350  if(length <= 0) {
351  return; // Nothing received
352  }
353 
354  if(finishedReception) {
355  // First reset for next frame
356  resetReception(false);
357  }
358 
359  if(protType == PROTOCOL_UDP) {
360  processReceivedUdpMessage(length, transferComplete);
361  } else {
362  processReceivedTcpMessage(length, transferComplete);
363  }
364 
365  transferComplete = finishedReception;
366 }
367 
368 void DataBlockProtocol::processReceivedUdpMessage(int length, bool& transferComplete) {
369  if(length < static_cast<int>(sizeof(int)) ||
370  0 + length > static_cast<int>(receiveBuffer.size())) {
371  throw ProtocolException("Received message size is invalid!");
372  }
373 
374  // Extract the sequence number
375  int rawSegmentOffset = ntohl(*reinterpret_cast<int*>(
376  &receiveBuffer[0 + length - sizeof(int)]));
377  // for holding the offset with blanked-out channel index
378  int dataBlockID, segmentOffset;
379  splitRawOffset(rawSegmentOffset, dataBlockID, segmentOffset);
380 
381  if(rawSegmentOffset == static_cast<int>(0xFFFFFFFF)) {
382  // This is a control packet
383  processControlMessage(length);
384  } else if(headerReceived) {
385  // Correct the length by subtracting the size of the segment offset
386  int realPayloadOffset = 0;
387  int payloadLength = length - sizeof(int);
388 
389  if(segmentOffset != blockReceiveOffsets[dataBlockID]) {
390  // The segment offset doesn't match what we expected. Probably
391  // a packet was dropped
392  if(!waitingForMissingSegments && //receiveOffset > 0 &&
393  segmentOffset > blockReceiveOffsets[dataBlockID]
394  && segmentOffset + payloadLength <= (int)blockReceiveBuffers[dataBlockID].size()) {
395  // We can just ask for a retransmission of this packet
396  LOG_DEBUG_DBP("Missing segment: " << dataBlockID << " size " << payloadLength << " ofs " << segmentOffset
397  << " but blkRecvOfs " << blockReceiveOffsets[dataBlockID]
398  << " (# " << missingReceiveSegments[dataBlockID].size() << ")");
399 
400  MissingReceiveSegment missingSeg;
401  missingSeg.offset = mergeRawOffset(dataBlockID, blockReceiveOffsets[dataBlockID]);
402  missingSeg.length = segmentOffset - blockReceiveOffsets[dataBlockID];
403  missingSeg.isEof = false;
404  lostSegmentBytes += missingSeg.length;
405  missingReceiveSegments[dataBlockID].push_back(missingSeg);
406 
407  // Move the received data to the right place in the buffer
408  memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength);
409  // Advance block receive offset
410  blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength;
411  } else {
412  // In this case we cannot recover from the packet loss or
413  // we just didn't get the EOF packet and everything is
414  // actually fine
415  resetReception(blockReceiveOffsets[0] > 0);
416  if(segmentOffset > 0 ) {
417  if(blockReceiveOffsets[dataBlockID] > 0) {
418  LOG_DEBUG_DBP("Resend failed!");
419  }
420  return;
421  } else {
422  LOG_DEBUG_DBP("Missed EOF message!");
423  }
424  }
425  } else {
426  if ((realPayloadOffset+payloadLength) > (int)receiveBuffer.size()) {
427  throw ProtocolException("Received out-of-bound data.");
428  }
429 
430  // append to correct block buffer
431  memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength);
432  // advance the expected next data offset for this block
433  blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength;
434  if (waitingForMissingSegments) {
435  // segment extends the currently valid region (suspended once we missed out first segment)
436  if ((missingReceiveSegments[dataBlockID].size() == 1) && (missingReceiveSegments[dataBlockID].front().length <= payloadLength)) {
437  // last gap closed by this segment
438  blockValidSize[dataBlockID] = blockReceiveSize[dataBlockID];
439  } else {
440  blockValidSize[dataBlockID] = segmentOffset + payloadLength;
441  }
442  } else if (missingReceiveSegments[dataBlockID].size() == 0) {
443  blockValidSize[dataBlockID] = segmentOffset + payloadLength;
444  }
445  }
446 
447  if(segmentOffset == 0 && dataBlockID == 0) {
448  // This is the beginning of a new frame
449  lastRemoteHostActivity = std::chrono::steady_clock::now();
450  }
451 
452  // Try to fill missing regions
453  integrateMissingUdpSegments(dataBlockID, segmentOffset, payloadLength);
454  }
455 }
456 
457 void DataBlockProtocol::integrateMissingUdpSegments(int block, int lastSegmentOffset, int lastSegmentSize) {
458  if(waitingForMissingSegments && missingReceiveSegments[block].size() > 0) {
459  // Things get more complicated when re-transmitting dropped packets
460  int checkBlock, checkOffset;
461  MissingReceiveSegment& firstSeg = missingReceiveSegments[block].front();
462  splitRawOffset(firstSeg.offset, checkBlock, checkOffset);
463  if((lastSegmentOffset != checkOffset) || (block != checkBlock)) {
464  LOG_DEBUG_DBP("Received invalid resend: " << block << " " << lastSegmentOffset);
465  resetReception(true);
466  } else {
467  firstSeg.offset += lastSegmentSize;
468  firstSeg.length -= lastSegmentSize;
469  if(firstSeg.length == 0) {
470  missingReceiveSegments[block].pop_front();
471  }
472 
473  // Check if ALL missing blocks are now handled
474  bool done = true;
475  for (int blk=0; blk<numReceptionBlocks; ++blk) {
476  if(missingReceiveSegments[blk].size() > 0) {
477  done = false;
478  break;
479  }
480  }
481  if (done) {
482  waitingForMissingSegments = false;
483  finishedReception = true;
484  } else if (missingReceiveSegments[block].size() > 0) {
485  // Another lost segment
486  int newBlock, newOffset;
487  MissingReceiveSegment& firstSeg = missingReceiveSegments[block].front();
488  splitRawOffset(firstSeg.offset, newBlock, newOffset);
489  blockReceiveOffsets[block] = newOffset;
490  }
491  }
492  }
493 }
494 
495 void DataBlockProtocol::processReceivedTcpMessage(int length, bool& transferComplete) {
496  // In TCP mode the header must be the first data item to be transmitted
497  if(!headerReceived) {
498  int totalHeaderSize = parseReceivedHeader(length, 0);
499  if(totalHeaderSize == 0) {
500  // Not yet enough data. Keep on buffering.
501  receiveOffset += length; // append in next recv
502  return;
503  } else {
504  // Header successfully parsed
505  // Move the remaining data to the beginning of the buffer
506  length -= totalHeaderSize;
507  // The rest is the first [part of] buffer segment data
508 
509  if(length == 0) {
510  return; // No more data remaining
511  }
512 
513  int movelength = receiveOffset + length; // also move the old stuff
514  ::memmove(&receiveBuffer[0], &receiveBuffer[totalHeaderSize], movelength);
515  receiveOffset = movelength; // append in next recv
516  }
517  } else {
518  receiveOffset += length; // modified below if complete chunks are present
519  }
520 
521  if (legacyTransfer) {
522  // Legacy TCP transfer: no segment headers, just raw data for block 0, up to the expected size
523  int remainingSize = blockReceiveSize[0] - blockValidSize[0];
524  int availableSize = std::min(receiveOffset, remainingSize);
525  // Update actual target buffer
526  std::memcpy(&blockReceiveBuffers[0][blockReceiveOffsets[0]], &receiveBuffer[0], availableSize);
527  blockReceiveOffsets[0] += availableSize;
528  blockValidSize[0] = blockReceiveOffsets[0];
529  // Extra data, store at buffer start for next reception to append to
530  if (receiveOffset <= remainingSize) {
531  // Start next reception at recv buffer start
532  receiveOffset = 0;
533  } else {
534  // Mark next reception to append to unhandled data remainder
535  std::memmove(&receiveBuffer[0], &receiveBuffer[remainingSize], availableSize - remainingSize);
536  receiveOffset = availableSize - remainingSize;
537  }
538  } else {
539  // Parse the SegmentHeaderTCP (if present) to see if a full fragment is present
540  int ofs = 0;
541  while ((receiveOffset - ofs) >= (int) sizeof(SegmentHeaderTCP)) {
542  SegmentHeaderTCP* header = reinterpret_cast<SegmentHeaderTCP*>(&receiveBuffer[ofs]);
543  int fragsize = ntohl(header->fragmentSize);
544  int rawSegmentOffset = ntohl(header->segmentOffset);
545  int block, offset;
546  splitRawOffset(rawSegmentOffset, block, offset);
547  if (block == 7) { // Block 7 is reserved; control message (the next header), stop moving image data
548  break;
549  }
550  if ((receiveOffset - ofs) >= (fragsize + (int) sizeof(SegmentHeaderTCP))) {
551  // Incorporate fragment
552  // assert here that offset==blockReceiveOffsets[block]
553  if (offset != blockReceiveOffsets[block]) {
554  throw ProtocolException("Received invalid header!");
555  }
556  std::memcpy(&blockReceiveBuffers[block][blockReceiveOffsets[block]], &receiveBuffer[ofs+sizeof(SegmentHeaderTCP)], fragsize);
557  blockReceiveOffsets[block] += fragsize;
558  blockValidSize[block] = blockReceiveOffsets[block];
559  // Advance to next potential chunk
560  ofs += fragsize + sizeof(SegmentHeaderTCP);
561  } else {
562  // Fragment incomplete, will be appended to in next recv (offset increased above)
563  break;
564  }
565  }
566  if (ofs > 0) {
567  // Move start of next unaccounted-for fragment to start of buffer
568  std::memmove(&receiveBuffer[0], &receiveBuffer[ofs], receiveOffset - ofs);
569  receiveOffset -= ofs; // and shift append position accordingly
570  }
571  }
572 
573  // Determine whether all buffers are filled now
574  bool complete = true;
575  for (int i=0; i<numReceptionBlocks; ++i) {
576  if (blockReceiveOffsets[i] < blockReceiveSize[i]) {
577  complete = false;
578  break;
579  }
580  }
581  finishedReception = complete;
582 
583 }
584 
585 int DataBlockProtocol::parseReceivedHeader(int length, int offset) {
586  int headerExtraBytes = 6; // see below
587 
588  if(length < headerExtraBytes) {
589  return 0;
590  }
591 
592  unsigned short headerSize = ntohs(*reinterpret_cast<unsigned short*>(&receiveBuffer[offset]));
593  if (length < (headerExtraBytes + headerSize)) {
594  return 0;
595  }
596  totalReceiveSize = static_cast<int>(ntohl(*reinterpret_cast<unsigned int*>(&receiveBuffer[offset + 2])));
597 
598  if (totalReceiveSize >= 0) { // old-style single block transfer
599  legacyTransfer = true;
600  headerExtraBytes = 6;
601  numReceptionBlocks = 1; // ONE interleaved buffer
602  blockReceiveSize[0] = totalReceiveSize;
603  } else { // marked -1 for new-style multi block transfer
604  legacyTransfer = false;
605  headerExtraBytes = static_cast<int>(sizeof(HeaderPreamble));
606  HeaderPreamble* header = reinterpret_cast<HeaderPreamble*>(&receiveBuffer[offset]);
607  numReceptionBlocks = 0;
608  totalReceiveSize = 0;
609  for (int i=0; i<MAX_DATA_BLOCKS; ++i) {
610  int s = ntohl(header->netTransferSizes[i]);
611  if (s > 0) {
612  blockReceiveSize[i] = s;
613  numReceptionBlocks++;
614  totalReceiveSize += s;
615  } else {
616  // first non-positive payload size signals end of blocks
617  //break;
618  }
619  }
620  }
621 
622  if (numReceptionBlocks==0) throw std::runtime_error("Received a transfer with zero blocks");
623  if (numReceptionBlocks > MAX_DATA_BLOCKS) throw std::runtime_error("Received a transfer with too many blocks");
624 
625  if(headerSize + headerExtraBytes > static_cast<int>(receiveBuffer.size())
626  || totalReceiveSize < 0 || headerSize + headerExtraBytes > length ) {
627  throw ProtocolException("Received invalid header!");
628  }
629 
630  headerReceived = true;
631  receivedHeader.assign(receiveBuffer.begin() + offset + headerExtraBytes,
632  receiveBuffer.begin() + offset + headerSize + headerExtraBytes);
633  resizeReceiveBuffer();
634 
635  return headerSize + headerExtraBytes;
636 }
637 
639  numReceptionBlocks = 0;
640  headerReceived = false;
641  for (int blk = 0; blk<MAX_DATA_BLOCKS; ++blk) {
642  missingReceiveSegments[blk].clear();
643  }
644  receivedHeader.clear();
645  waitingForMissingSegments = false;
646  totalReceiveSize = 0;
647  finishedReception = false;
648  lostSegmentBytes = 0;
649  for (int i=0; i<MAX_DATA_BLOCKS; ++i) {
650  blockReceiveOffsets[i] = 0;
651  blockValidSize[i] = 0;
652  }
653  if(dropped) {
654  droppedReceptions++;
655  }
656 }
657 
658 unsigned char* DataBlockProtocol::getReceivedData(int& length) {
659  length = 0;
660  return &receiveBuffer[0];
661 }
662 
663 unsigned char* DataBlockProtocol::getReceivedHeader(int& length) {
664  if(receivedHeader.size() > 0) {
665  length = static_cast<int>(receivedHeader.size());
666  return &receivedHeader[0];
667  } else {
668  return nullptr;
669  }
670 }
671 
672 bool DataBlockProtocol::processControlMessage(int length) {
673  if(length < static_cast<int>(sizeof(int) + 1)) {
674  return false;
675  }
676 
677  int payloadLength = length - sizeof(int) - 1;
678  switch(receiveBuffer[0 + payloadLength]) {
679  case CONFIRM_MESSAGE:
680  // Our connection request has been accepted
681  connectionConfirmed = true;
682  break;
683  case CONNECTION_MESSAGE:
684  // We establish a new connection
685  connectionConfirmed = true;
686  confirmationMessagePending = true;
687  clientConnectionPending = true;
688 
689  // A connection request is just as good as a heartbeat
690  lastReceivedHeartbeat = std::chrono::steady_clock::now();
691  break;
692  case HEADER_MESSAGE: {
693  if (anyPayloadReceived()) {
694  if (allBlocksDone()) {
695  LOG_DEBUG_DBP("No EOF message received!");
696  } else {
697  LOG_DEBUG_DBP("Received header too late/early!");
698  }
699  resetReception(true);
700  }
701  if(parseReceivedHeader(payloadLength, 0) == 0) {
702  throw ProtocolException("Received header is too short!");
703  }
704  }
705  break;
706  case EOF_MESSAGE:
707  // This is the end of the frame
708  if(anyPayloadReceived()) {
709  parseEofMessage(length);
710  }
711  break;
712  case RESEND_MESSAGE: {
713  // The client requested retransmission of missing packets
714  parseResendMessage(payloadLength);
715  break;
716  }
717  case HEARTBEAT_MESSAGE:
718  // A cyclic heartbeat message
719  lastReceivedHeartbeat = std::chrono::steady_clock::now();
720  break;
721  default:
722  throw ProtocolException("Received invalid control message!");
723  break;
724  }
725 
726  return true;
727 }
728 
730  if(protType == PROTOCOL_TCP) {
731  // Connection is handled by TCP and not by us
732  return true;
733  } else if(connectionConfirmed) {
734  return !isServer || std::chrono::duration_cast<std::chrono::milliseconds>(
735  std::chrono::steady_clock::now() - lastReceivedHeartbeat).count()
736  < 2*HEARTBEAT_INTERVAL_MS;
737  } else return false;
738 }
739 
740 const unsigned char* DataBlockProtocol::getNextControlMessage(int& length) {
741  length = 0;
742 
743  if(protType == PROTOCOL_TCP) {
744  // There are no control messages for TCP
745  return nullptr;
746  }
747 
748  if(confirmationMessagePending) {
749  // Send confirmation message
750  confirmationMessagePending = false;
751  controlMessageBuffer[0] = CONFIRM_MESSAGE;
752  length = 1;
753  } else if(!isServer && std::chrono::duration_cast<std::chrono::milliseconds>(
754  std::chrono::steady_clock::now() - lastRemoteHostActivity).count() > RECONNECT_TIMEOUT_MS) {
755  // Send a new connection request
756  controlMessageBuffer[0] = CONNECTION_MESSAGE;
757  length = 1;
758 
759  // Also update time stamps
760  lastRemoteHostActivity = lastSentHeartbeat = std::chrono::steady_clock::now();
761  } else if(transferHeaderData != nullptr && isConnected()) {
762  // We need to send a new protocol header
763  length = transferHeaderSize;
764  const unsigned char* ret = transferHeaderData;
765  transferHeaderData = nullptr;
766  return ret;
767  } else if(eofMessagePending) {
768  // Send end of frame message
769  eofMessagePending = false;
770  unsigned int networkOffset = htonl(mergeRawOffset(lastTransmittedBlock, transferSize[lastTransmittedBlock]));
771  memcpy(&controlMessageBuffer[0], &networkOffset, sizeof(int));
772  controlMessageBuffer[sizeof(int)] = EOF_MESSAGE;
773  length = 5;
774  } else if(resendMessagePending) {
775  // Send a re-send request for missing messages
776  resendMessagePending = false;
777  if(!generateResendRequest(length)) {
778  length = 0;
779  return nullptr;
780  }
781  } else if(!isServer && std::chrono::duration_cast<std::chrono::milliseconds>(
782  std::chrono::steady_clock::now() - lastSentHeartbeat).count() > HEARTBEAT_INTERVAL_MS) {
783  // Send a heartbeat message
784  controlMessageBuffer[0] = HEARTBEAT_MESSAGE;
785  length = 1;
786  lastSentHeartbeat = std::chrono::steady_clock::now();
787  } else {
788  return nullptr;
789  }
790 
791  // Mark this message as a control message
792  controlMessageBuffer[length++] = 0xff;
793  controlMessageBuffer[length++] = 0xff;
794  controlMessageBuffer[length++] = 0xff;
795  controlMessageBuffer[length++] = 0xff;
796  return controlMessageBuffer;
797 }
798 
800  if(clientConnectionPending) {
801  clientConnectionPending = false;
802  return true;
803  } else {
804  return false;
805  }
806 }
807 
808 bool DataBlockProtocol::generateResendRequest(int& length) {
809  length = 0;
810  for (int blk = 0; blk < numReceptionBlocks; ++blk) {
811  for(MissingReceiveSegment segment: missingReceiveSegments[blk]) {
812  unsigned int segOffset = htonl(static_cast<unsigned int>(segment.offset));
813  unsigned int segLen = htonl(static_cast<unsigned int>(segment.length));
814 
815  if (sizeof(controlMessageBuffer) < length + 2*sizeof(unsigned int) + 4) {
816  // Too many UDP resend segments for control buffer, dropping the frame!
817  resetReception(true);
818  break;
819  }
820 
821  memcpy(&controlMessageBuffer[length], &segOffset, sizeof(segOffset));
822  length += sizeof(unsigned int);
823  memcpy(&controlMessageBuffer[length], &segLen, sizeof(segLen));
824  length += sizeof(unsigned int);
825 
826  int dbgBlk, dbgOfs;
827  splitRawOffset(segment.offset, dbgBlk, dbgOfs);
828  LOG_DEBUG_DBP("Req missing " << dbgBlk << " " << dbgOfs << " " << segment.length);
829  }
830  }
831 
832  if(length + sizeof(int) + 1 > sizeof(controlMessageBuffer)) {
833  return false;
834  }
835 
836  controlMessageBuffer[length++] = RESEND_MESSAGE;
837 
838  return true;
839 }
840 
841 void DataBlockProtocol::parseResendMessage(int length) {
842  missingTransferSegments.clear();
843 
844  int num = length / (sizeof(unsigned int) + sizeof(unsigned int));
845  int bufferOffset = 0;
846 
847  for(int i=0; i<num; i++) {
848  unsigned int segOffsetNet = *reinterpret_cast<unsigned int*>(&receiveBuffer[bufferOffset]);
849  bufferOffset += sizeof(unsigned int);
850  unsigned int segLenNet = *reinterpret_cast<unsigned int*>(&receiveBuffer[bufferOffset]);
851  bufferOffset += sizeof(unsigned int);
852 
853  int segmentOffsetRaw = static_cast<int>(ntohl(segOffsetNet)); // with block ID
854  int segmentLength = static_cast<int>(ntohl(segLenNet));
855  int dataBlockID, segmentOffset;
856  splitRawOffset(segmentOffsetRaw, dataBlockID, segmentOffset);
857 
858  if(segmentOffset >= 0 && segmentLength > 0 && (segmentOffset + segmentLength) <= rawValidBytes[dataBlockID]) {
859  missingTransferSegments.push_back(std::pair<int, int>(
860  segmentOffsetRaw, segmentLength));
861  }
862 
863  }
864 }
865 
866 void DataBlockProtocol::parseEofMessage(int length) {
867 
868  completedReceptions++;
869  lostSegmentRate = (lostSegmentRate * (completedReceptions-1) + ((double) lostSegmentBytes) / totalReceiveSize) / completedReceptions;
870  LOG_DEBUG_DBP("Lost segment rate: " << lostSegmentRate);
871  if(length >= 4) {
872  // Find all missing segments at the end of blocks
873  for (int i=0; i<numReceptionBlocks; ++i) {
874  if (blockReceiveOffsets[i] < blockReceiveSize[i]) {
875  MissingReceiveSegment missingSeg;
876  missingSeg.offset = mergeRawOffset(i, blockReceiveOffsets[i]);
877  missingSeg.length = blockReceiveSize[i] - blockReceiveOffsets[i];
878  missingSeg.isEof = true;
879  missingReceiveSegments[i].push_back(missingSeg);
880  lostSegmentBytes += missingSeg.length;
881  }
882  }
883  for (int blk=0; blk<numReceptionBlocks; ++blk) {
884  if(missingReceiveSegments[blk].size() > 0) {
885  waitingForMissingSegments = true;
886  resendMessagePending = true;
887  // Initialize all missing block start indices with earliest missing address
888  int mblock, moffset;
889  for (int i=0; i<static_cast<int>(missingReceiveSegments[blk].size()); ++i) {
890  splitRawOffset(missingReceiveSegments[blk][i].offset, mblock, moffset);
891  if (moffset < blockReceiveOffsets[mblock]) {
892  blockReceiveOffsets[mblock] = moffset;
893  }
894  }
895  }
896  }
897  if (!resendMessagePending) {
898  finishedReception = true;
899  }
900  } else {
901  LOG_DEBUG_DBP("EOF message too short, length " << length);
902  }
903 }
904 
905 void DataBlockProtocol::resizeReceiveBuffer() {
906  if(totalReceiveSize < 0) {
907  throw ProtocolException("Received invalid transfer size!");
908  }
909 
910  // We increase the requested size to allow for one
911  // additional network message and the protocol overhead
912  int bufferSize = 2*getMaxReceptionSize()
913  + MAX_OUTSTANDING_BYTES + sizeof(int);
914 
915  // Resize the buffer
916  if(static_cast<int>(receiveBuffer.size()) < bufferSize) {
917  receiveBuffer.resize(bufferSize);
918  }
919 
920  for (int i=0; i<numReceptionBlocks; ++i) {
921  if (static_cast<int>(blockReceiveBuffers[i].size()) < blockReceiveSize[i]) {
922  blockReceiveBuffers[i].resize(blockReceiveSize[i]);
923  }
924  }
925 }
926 
927 }} // namespace
928 
visiontransfer::internal::DataBlockProtocol::setTransferData
void setTransferData(int block, unsigned char *data, int validBytes=0x7FFFFFFF)
Sets the payload data for the next transfer.
Definition: datablockprotocol.cpp:161
visiontransfer::internal::DataBlockProtocol::getMaxReceptionSize
int getMaxReceptionSize() const
Returns the maximum payload size that can be received.
Definition: datablockprotocol.cpp:333
visiontransfer::internal::DataBlockProtocol::transferComplete
bool transferComplete()
Returns true if the current transfer has been completed.
Definition: datablockprotocol.cpp:326
visiontransfer::internal::DataBlockProtocol::resetReception
void resetReception(bool dropped)
Resets the message reception.
Definition: datablockprotocol.cpp:638
visiontransfer::internal::DataBlockProtocol::HeaderPreamble
Definition: datablockprotocol.h:100
visiontransfer::internal::DataBlockProtocol::getNextReceiveBuffer
unsigned char * getNextReceiveBuffer(int maxLength)
Gets a buffer for receiving the next network message.
Definition: datablockprotocol.cpp:341
visiontransfer::internal::DataBlockProtocol::getTransferMessage
const unsigned char * getTransferMessage(int &length)
Gets the next network message for the current transfer.
Definition: datablockprotocol.cpp:195
visiontransfer::internal::DataBlockProtocol::SegmentHeaderTCP
Definition: datablockprotocol.h:108
visiontransfer::internal::DataBlockProtocol::isConnected
bool isConnected() const
Returns true if a remote connection is established.
Definition: datablockprotocol.cpp:729
visiontransfer::internal::DataBlockProtocol::getReceivedData
unsigned char * getReceivedData(int &length)
Returns the data that has been received for the current transfer.
Definition: datablockprotocol.cpp:658
visiontransfer::internal::DataBlockProtocol::setTransferBytes
void setTransferBytes(int block, long bytes)
Sets the per-block transfer size.
Definition: datablockprotocol.cpp:111
visiontransfer::internal::DataBlockProtocol::getReceivedHeader
unsigned char * getReceivedHeader(int &length)
Returns the header data that has been received for the current transfer.
Definition: datablockprotocol.cpp:663
visiontransfer::internal::DataBlockProtocol::newClientConnected
bool newClientConnected()
Returns true if the last network message has established a new connection from a client.
Definition: datablockprotocol.cpp:799
visiontransfer::internal::DataBlockProtocol::processReceivedMessage
void processReceivedMessage(int length, bool &transferComplete)
Handles a received network message.
Definition: datablockprotocol.cpp:348
visiontransfer::ProtocolException
Exception class that is used for all protocol exceptions.
Definition: exceptions.h:37
visiontransfer::internal::DataBlockProtocol::resetTransfer
void resetTransfer()
Resets all transfer related internal variables.
Definition: datablockprotocol.cpp:101
visiontransfer::internal::DataBlockProtocol::SegmentHeaderUDP
Definition: datablockprotocol.h:105
visiontransfer::internal::DataBlockProtocol::setTransferValidBytes
void setTransferValidBytes(int block, int validBytes)
Updates the number of valid bytes in a partial transfer.
Definition: datablockprotocol.cpp:175
visiontransfer::internal::DataBlockProtocol::setTransferHeader
void setTransferHeader(unsigned char *data, int headerSize, int blocks)
Sets a user-defined header that shall be transmitted with the next transfer.
Definition: datablockprotocol.cpp:122
visiontransfer::internal::DataBlockProtocol::getNextControlMessage
const unsigned char * getNextControlMessage(int &length)
If a control message is pending to be transmitted, then the message data will be returned by this method.
Definition: datablockprotocol.cpp:740
Allied Vision