15 #ifndef VISIONTRANSFER_DATABLOCKPROTOCOL_H
16 #define VISIONTRANSFER_DATABLOCKPROTOCOL_H
24 #include "visiontransfer/alignedallocator.h"
25 #include "visiontransfer/exceptions.h"
27 namespace visiontransfer {
46 class DataBlockProtocol {
54 static const int MAX_DATA_BLOCKS = 8;
57 static const int MAX_TCP_BYTES_TRANSFER = 0xFFFF;
58 static const int MAX_UDP_RECEPTION = 0x4000;
59 static const int MAX_OUTSTANDING_BYTES = 2*MAX_TCP_BYTES_TRANSFER;
64 struct HeaderPreamble {
65 uint16_t netHeaderSize;
66 int32_t netTransferSizeDummy;
67 uint32_t netTransferSizes[MAX_DATA_BLOCKS];
69 struct SegmentHeaderUDP {
70 uint32_t segmentOffset;
72 struct SegmentHeaderTCP {
73 uint32_t fragmentSize;
74 uint32_t segmentOffset;
93 return protType == PROTOCOL_UDP ?
sizeof(int) : 0;
146 void setTransferData(
int block,
unsigned char* data,
int validBytes = 0x7FFFFFFF);
233 return droppedReceptions;
265 unsigned char* getBlockReceiveBuffer(
int block) {
266 if (block >= numReceptionBlocks) {
267 throw ProtocolException(
"Tried to get receive buffer beyond initialized block range");
269 return &blockReceiveBuffers[block][0];
271 int getBlockValidSize(
int block) {
272 if (block >= numReceptionBlocks) {
273 throw ProtocolException(
"Tried to get valid buffer index beyond initialized block range");
275 return blockValidSize[block];
277 bool isBlockDone(
int block) {
278 if (block >= numReceptionBlocks) {
279 throw ProtocolException(
"Tried to get completion status of uninitialized block");
281 return blockValidSize[block] >= blockReceiveSize[block];
283 bool allBlocksDone() {
284 for (
int i=0; i<numReceptionBlocks; ++i) {
285 if (!isBlockDone(i))
return false;
289 bool anyPayloadReceived() {
290 for (
int i=0; i<numReceptionBlocks; ++i) {
291 if (blockReceiveOffsets[i] > 0)
return true;
296 std::string statusReport();
298 bool wasHeaderReceived()
const {
299 return headerReceived;
306 struct MissingReceiveSegment {
310 unsigned char subsequentData[4];
313 static constexpr
int HEARTBEAT_INTERVAL_MS = 1000;
314 static constexpr
int RECONNECT_TIMEOUT_MS = 2000;
316 static constexpr
unsigned char CONNECTION_MESSAGE = 0x01;
317 static constexpr
unsigned char CONFIRM_MESSAGE = 0x02;
318 static constexpr
unsigned char HEADER_MESSAGE = 0x03;
319 static constexpr
unsigned char RESEND_MESSAGE = 0x04;
320 static constexpr
unsigned char EOF_MESSAGE = 0x05;
321 static constexpr
unsigned char HEARTBEAT_MESSAGE = 0x06;
324 ProtocolType protType;
330 unsigned char* rawDataArr[MAX_DATA_BLOCKS];
331 int rawDataArrStrideHackOrig[MAX_DATA_BLOCKS];
332 int rawDataArrStrideHackRepl[MAX_DATA_BLOCKS];
333 int rawValidBytes[MAX_DATA_BLOCKS];
334 int transferOffset[MAX_DATA_BLOCKS];
335 int transferSize[MAX_DATA_BLOCKS];
336 char overwrittenTransferData[
sizeof(SegmentHeaderTCP)];
337 int overwrittenTransferIndex;
338 int overwrittenTransferBlock;
339 unsigned char* transferHeaderData;
340 int transferHeaderSize;
341 int totalBytesCompleted;
342 int totalTransferSize;
343 int numTransferBlocks;
344 int lastTransmittedBlock;
347 std::deque<MissingReceiveSegment> missingReceiveSegments[MAX_DATA_BLOCKS];
348 std::deque<std::pair<int, int> > missingTransferSegments;
349 bool waitingForMissingSegments;
350 int totalReceiveSize;
352 unsigned char controlMessageBuffer[1024 * 16];
355 bool connectionConfirmed;
356 bool confirmationMessagePending;
357 bool eofMessagePending;
358 bool clientConnectionPending;
359 bool resendMessagePending;
360 std::chrono::steady_clock::time_point lastRemoteHostActivity;
361 std::chrono::steady_clock::time_point lastSentHeartbeat;
362 std::chrono::steady_clock::time_point lastReceivedHeartbeat;
365 std::vector<unsigned char, AlignedAllocator<unsigned char> > receiveBuffer;
366 std::vector<unsigned char, AlignedAllocator<unsigned char> > blockReceiveBuffers[MAX_DATA_BLOCKS];
367 int blockReceiveOffsets[MAX_DATA_BLOCKS];
368 int blockReceiveSize[MAX_DATA_BLOCKS];
369 int blockValidSize[MAX_DATA_BLOCKS];
370 std::vector<unsigned char> receivedHeader;
371 bool finishedReception;
372 int droppedReceptions;
373 int completedReceptions;
374 double lostSegmentRate;
375 int lostSegmentBytes;
376 unsigned char unprocessedMsgPart[MAX_OUTSTANDING_BYTES];
377 int unprocessedMsgLength;
380 int numReceptionBlocks;
383 const unsigned char* extractPayload(
const unsigned char* data,
int& length,
bool& error);
384 bool processControlMessage(
int length);
385 void restoreTransferBuffer();
386 bool generateResendRequest(
int& length);
387 void getNextTransferSegment(
int& block,
int& offset,
int& length);
388 void parseResendMessage(
int length);
389 void parseEofMessage(
int length);
390 void integrateMissingUdpSegments(
int block,
int lastSegmentOffset,
int lastSegmentSize);
393 void resizeReceiveBuffer();
394 int parseReceivedHeader(
int length,
int offset);
395 void zeroStructures();
396 void splitRawOffset(
int rawSegmentOffset,
int& dataBlockID,
int& segmentOffset);
397 int mergeRawOffset(
int dataBlockID,
int segmentOffset,
int reserved=0);