22 #include "visiontransfer/datablockprotocol.h"
23 #include "visiontransfer/exceptions.h"
31 #include <arpa/inet.h>
34 #define LOG_DEBUG_DBP(expr)
38 using namespace visiontransfer;
39 using namespace visiontransfer::internal;
41 namespace visiontransfer {
44 DataBlockProtocol::DataBlockProtocol(
bool server, ProtocolType protType,
int maxUdpPacketSize)
45 : isServer(server), protType(protType),
47 overwrittenTransferData{0},
48 overwrittenTransferIndex{-1},
49 overwrittenTransferBlock{-1},
50 transferHeaderData{
nullptr},
51 transferHeaderSize{0},
52 totalBytesCompleted{0}, totalTransferSize{0},
53 waitingForMissingSegments(
false),
54 totalReceiveSize(0), connectionConfirmed(
false),
55 confirmationMessagePending(
false), eofMessagePending(
false),
56 clientConnectionPending(
false), resendMessagePending(
false),
57 lastRemoteHostActivity(), lastSentHeartbeat(),
58 lastReceivedHeartbeat(std::chrono::steady_clock::now()),
59 finishedReception(
false), droppedReceptions(0),
60 completedReceptions(0), lostSegmentRate(0.0), lostSegmentBytes(0),
61 unprocessedMsgLength(0), headerReceived(
false) {
63 if(protType == PROTOCOL_TCP) {
64 maxPayloadSize = MAX_TCP_BYTES_TRANSFER -
sizeof(SegmentHeaderTCP);
67 maxPayloadSize = maxUdpPacketSize -
sizeof(SegmentHeaderUDP);
68 minPayloadSize = maxPayloadSize;
71 resizeReceiveBuffer();
72 resetReception(
false);
74 void DataBlockProtocol::splitRawOffset(
int rawSegmentOffset,
int& dataBlockID,
int& segmentOffset) {
75 int selector = (rawSegmentOffset >> 28) & 0xf;
76 dataBlockID = selector & 0x7;
77 segmentOffset = rawSegmentOffset & 0x0FFFffff;
80 int DataBlockProtocol::mergeRawOffset(
int dataBlockID,
int segmentOffset,
int reserved_defaults0) {
81 return ((reserved_defaults0 & 1) << 31) | ((dataBlockID & 0x07) << 28) | (segmentOffset & 0x0FFFffff);
84 void DataBlockProtocol::zeroStructures() {
85 for (
int i=0; i<MAX_DATA_BLOCKS; ++i) {
86 rawDataArr[i] =
nullptr;
87 rawDataArrStrideHackOrig[i] = 0;
88 rawDataArrStrideHackRepl[i] = 0;
90 transferOffset[i] = 0;
93 std::memset(overwrittenTransferData, 0,
sizeof(overwrittenTransferData));
94 overwrittenTransferIndex = -1;
95 overwrittenTransferBlock = -1;
96 lastTransmittedBlock = -1;
98 numReceptionBlocks = 0;
103 overwrittenTransferIndex = -1;
104 overwrittenTransferBlock = -1;
105 totalBytesCompleted = 0;
106 totalTransferSize = 0;
107 numTransferBlocks = 0;
108 missingTransferSegments.clear();
112 if (transferHeaderData ==
nullptr) {
113 throw ProtocolException(
"Tried to set data block size before initializing header!");
114 }
else if (block >= numTransferBlocks) {
115 throw ProtocolException(
"Request to set data block size - block index too high!");
117 transferSize[block] = bytes;
119 hp->netTransferSizes[block] = htonl(bytes);
123 if(!transferDone && numTransferBlocks > 0) {
125 }
else if(headerSize + 9 >
static_cast<int>(
sizeof(controlMessageBuffer))) {
127 }
else if(blocks == 0) {
131 numTransferBlocks = blocks;
133 transferDone =
false;
134 for (
int i=0; i<MAX_DATA_BLOCKS; ++i) {
135 this->transferSize[i] = 0;
140 transferHeaderData = &data[-headerBaseOffset];
143 unsigned short netHeaderSize = htons(
static_cast<unsigned short>(headerSize));
144 ourHeader->netHeaderSize = netHeaderSize;
145 ourHeader->netTransferSizeDummy = htonl(-1);
147 headerSize += headerBaseOffset;
149 if(protType == PROTOCOL_UDP) {
151 transferHeaderData[headerSize++] = HEADER_MESSAGE;
152 transferHeaderData[headerSize++] = 0xFF;
153 transferHeaderData[headerSize++] = 0xFF;
154 transferHeaderData[headerSize++] = 0xFF;
155 transferHeaderData[headerSize++] = 0xFF;
158 transferHeaderSize = headerSize;
162 if(transferHeaderSize == 0 || transferHeaderData ==
nullptr) {
166 transferDone =
false;
167 rawDataArr[block] = data;
168 transferOffset[block] = 0;
169 overwrittenTransferIndex = -1;
170 overwrittenTransferBlock = -1;
171 rawValidBytes[block] = min(transferSize[block], validBytes);
172 totalBytesCompleted = 0;
176 if(validBytes >= transferSize[block]) {
177 rawValidBytes[block] = transferSize[block];
178 }
else if(validBytes <
static_cast<int>(
sizeof(
int))) {
179 rawValidBytes[block] = 0;
181 rawValidBytes[block] = validBytes;
185 std::string DataBlockProtocol::statusReport() {
186 std::stringstream ss;
187 ss <<
"DataBlockProtocol, blocks=" << numTransferBlocks <<
": ";
188 for (
int i=0; i<numTransferBlocks; ++i) {
189 ss << i <<
":(len " << transferSize[i] <<
" ofs " << transferOffset[i] <<
" rawvalid " << rawValidBytes[i] <<
") ";
191 ss <<
" total done: " << totalBytesCompleted <<
"/" << totalTransferSize;
196 if(transferDone || rawValidBytes == 0) {
203 if(protType == PROTOCOL_TCP && transferHeaderData !=
nullptr) {
204 length = transferHeaderSize;
205 const unsigned char* ret = transferHeaderData;
206 transferHeaderData =
nullptr;
212 restoreTransferBuffer();
215 int block = -1, offset = -1;
216 getNextTransferSegment(block, offset, length);
221 if(protType == PROTOCOL_UDP) {
223 overwrittenTransferBlock = block;
224 overwrittenTransferIndex = offset + length;
226 std::memcpy(overwrittenTransferData, segmentHeader,
sizeof(
SegmentHeaderUDP));
227 segmentHeader->segmentOffset =
static_cast<int>(htonl(mergeRawOffset(block, offset)));
229 lastTransmittedBlock = block;
230 return &rawDataArr[block][offset];
236 unsigned char* dataPointer =
nullptr;
238 if(headerOffset < 0) {
241 static unsigned char tcpBuffer[MAX_TCP_BYTES_TRANSFER];
242 dataPointer = tcpBuffer;
244 std::memcpy(&tcpBuffer[
sizeof(segmentHeader)], &rawDataArr[block][offset], length);
248 dataPointer = &rawDataArr[block][headerOffset];
249 segmentHeader =
reinterpret_cast<SegmentHeaderTCP*
>(&rawDataArr[block][headerOffset]);
250 overwrittenTransferBlock = block;
251 overwrittenTransferIndex = headerOffset;
252 std::memcpy(overwrittenTransferData, segmentHeader,
sizeof(
SegmentHeaderTCP));
255 segmentHeader->fragmentSize = htonl(length);
256 segmentHeader->segmentOffset =
static_cast<int>(htonl(mergeRawOffset(block, offset)));
258 lastTransmittedBlock = block;
263 void DataBlockProtocol::getNextTransferSegment(
int& block,
int& offset,
int& length) {
264 if(missingTransferSegments.size() == 0) {
266 int sendBlock = 0, amount = 0;
267 for (
int i=0; i<numTransferBlocks; ++i) {
268 int avail = std::min(transferSize[i], rawValidBytes[i]);
269 avail -= transferOffset[i];
270 if (avail > amount) {
275 length = std::min(maxPayloadSize, amount);
276 if(length == 0 || (length < minPayloadSize && rawValidBytes[sendBlock] != transferSize[sendBlock])) {
282 offset = transferOffset[sendBlock];
283 transferOffset[sendBlock] += length;
284 if (protType == PROTOCOL_UDP) {
285 bool complete =
true;
286 for (
int i=0; i<numTransferBlocks; ++i) {
287 if (transferOffset[i] < transferSize[i]) {
293 eofMessagePending =
true;
298 splitRawOffset(missingTransferSegments.front().first, block, offset);
299 length = std::min(maxPayloadSize, missingTransferSegments.front().second);
300 LOG_DEBUG_DBP(
"Re-transmitting: " << offset <<
" - " << (offset + length));
302 int remaining = missingTransferSegments[0].second - length;
305 missingTransferSegments.pop_front();
308 missingTransferSegments.front().first += length;
309 missingTransferSegments.front().second = remaining;
314 void DataBlockProtocol::restoreTransferBuffer() {
315 if(overwrittenTransferBlock >= 0) {
316 if(protType == PROTOCOL_UDP) {
317 std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData,
sizeof(SegmentHeaderUDP));
319 std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData,
sizeof(SegmentHeaderTCP));
322 overwrittenTransferIndex = -1;
323 overwrittenTransferBlock = -1;
327 for (
int i=0; i<numTransferBlocks; ++i) {
328 if (transferOffset[i] < transferSize[i])
return false;
330 return !eofMessagePending;
334 if(protType == PROTOCOL_TCP) {
335 return MAX_TCP_BYTES_TRANSFER;
337 return MAX_UDP_RECEPTION;
342 if(receiveOffset + maxLength > (
int)receiveBuffer.size()) {
343 receiveBuffer.resize(receiveOffset + maxLength);
345 return &receiveBuffer[receiveOffset];
354 if(finishedReception) {
359 if(protType == PROTOCOL_UDP) {
368 void DataBlockProtocol::processReceivedUdpMessage(
int length,
bool& transferComplete) {
369 if(length <
static_cast<int>(
sizeof(
int)) ||
370 0 + length >
static_cast<int>(receiveBuffer.size())) {
375 int rawSegmentOffset = ntohl(*
reinterpret_cast<int*
>(
376 &receiveBuffer[0 + length -
sizeof(
int)]));
378 int dataBlockID, segmentOffset;
379 splitRawOffset(rawSegmentOffset, dataBlockID, segmentOffset);
381 if(rawSegmentOffset ==
static_cast<int>(0xFFFFFFFF)) {
383 processControlMessage(length);
384 }
else if(headerReceived) {
386 int realPayloadOffset = 0;
387 int payloadLength = length -
sizeof(int);
389 if(segmentOffset != blockReceiveOffsets[dataBlockID]) {
392 if(!waitingForMissingSegments &&
393 segmentOffset > blockReceiveOffsets[dataBlockID]
394 && segmentOffset + payloadLength <= (
int)blockReceiveBuffers[dataBlockID].size()) {
396 LOG_DEBUG_DBP(
"Missing segment: " << dataBlockID <<
" size " << payloadLength <<
" ofs " << segmentOffset
397 <<
" but blkRecvOfs " << blockReceiveOffsets[dataBlockID]
398 <<
" (# " << missingReceiveSegments[dataBlockID].size() <<
")");
400 MissingReceiveSegment missingSeg;
401 missingSeg.offset = mergeRawOffset(dataBlockID, blockReceiveOffsets[dataBlockID]);
402 missingSeg.length = segmentOffset - blockReceiveOffsets[dataBlockID];
403 missingSeg.isEof =
false;
404 lostSegmentBytes += missingSeg.length;
405 missingReceiveSegments[dataBlockID].push_back(missingSeg);
408 memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength);
410 blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength;
416 if(segmentOffset > 0 ) {
417 if(blockReceiveOffsets[dataBlockID] > 0) {
418 LOG_DEBUG_DBP(
"Resend failed!");
422 LOG_DEBUG_DBP(
"Missed EOF message!");
426 if ((realPayloadOffset+payloadLength) > (
int)receiveBuffer.size()) {
431 memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength);
433 blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength;
434 if (waitingForMissingSegments) {
436 if ((missingReceiveSegments[dataBlockID].size() == 1) && (missingReceiveSegments[dataBlockID].front().length <= payloadLength)) {
438 blockValidSize[dataBlockID] = blockReceiveSize[dataBlockID];
440 blockValidSize[dataBlockID] = segmentOffset + payloadLength;
442 }
else if (missingReceiveSegments[dataBlockID].size() == 0) {
443 blockValidSize[dataBlockID] = segmentOffset + payloadLength;
447 if(segmentOffset == 0 && dataBlockID == 0) {
449 lastRemoteHostActivity = std::chrono::steady_clock::now();
453 integrateMissingUdpSegments(dataBlockID, segmentOffset, payloadLength);
457 void DataBlockProtocol::integrateMissingUdpSegments(
int block,
int lastSegmentOffset,
int lastSegmentSize) {
458 if(waitingForMissingSegments && missingReceiveSegments[block].size() > 0) {
460 int checkBlock, checkOffset;
461 MissingReceiveSegment& firstSeg = missingReceiveSegments[block].front();
462 splitRawOffset(firstSeg.offset, checkBlock, checkOffset);
463 if((lastSegmentOffset != checkOffset) || (block != checkBlock)) {
464 LOG_DEBUG_DBP(
"Received invalid resend: " << block <<
" " << lastSegmentOffset);
467 firstSeg.offset += lastSegmentSize;
468 firstSeg.length -= lastSegmentSize;
469 if(firstSeg.length == 0) {
470 missingReceiveSegments[block].pop_front();
475 for (
int blk=0; blk<numReceptionBlocks; ++blk) {
476 if(missingReceiveSegments[blk].size() > 0) {
482 waitingForMissingSegments =
false;
483 finishedReception =
true;
484 }
else if (missingReceiveSegments[block].size() > 0) {
486 int newBlock, newOffset;
487 MissingReceiveSegment& firstSeg = missingReceiveSegments[block].front();
488 splitRawOffset(firstSeg.offset, newBlock, newOffset);
489 blockReceiveOffsets[block] = newOffset;
495 void DataBlockProtocol::processReceivedTcpMessage(
int length,
bool& transferComplete) {
497 if(!headerReceived) {
498 int totalHeaderSize = parseReceivedHeader(length, 0);
499 if(totalHeaderSize == 0) {
501 receiveOffset += length;
506 length -= totalHeaderSize;
513 int movelength = receiveOffset + length;
514 ::memmove(&receiveBuffer[0], &receiveBuffer[totalHeaderSize], movelength);
515 receiveOffset = movelength;
518 receiveOffset += length;
521 if (legacyTransfer) {
523 int remainingSize = blockReceiveSize[0] - blockValidSize[0];
524 int availableSize = std::min(receiveOffset, remainingSize);
526 std::memcpy(&blockReceiveBuffers[0][blockReceiveOffsets[0]], &receiveBuffer[0], availableSize);
527 blockReceiveOffsets[0] += availableSize;
528 blockValidSize[0] = blockReceiveOffsets[0];
530 if (receiveOffset <= remainingSize) {
535 std::memmove(&receiveBuffer[0], &receiveBuffer[remainingSize], availableSize - remainingSize);
536 receiveOffset = availableSize - remainingSize;
541 while ((receiveOffset - ofs) >= (
int)
sizeof(SegmentHeaderTCP)) {
542 SegmentHeaderTCP* header =
reinterpret_cast<SegmentHeaderTCP*
>(&receiveBuffer[ofs]);
543 int fragsize = ntohl(header->fragmentSize);
544 int rawSegmentOffset = ntohl(header->segmentOffset);
546 splitRawOffset(rawSegmentOffset, block, offset);
550 if ((receiveOffset - ofs) >= (fragsize + (
int)
sizeof(SegmentHeaderTCP))) {
553 if (offset != blockReceiveOffsets[block]) {
556 std::memcpy(&blockReceiveBuffers[block][blockReceiveOffsets[block]], &receiveBuffer[ofs+
sizeof(SegmentHeaderTCP)], fragsize);
557 blockReceiveOffsets[block] += fragsize;
558 blockValidSize[block] = blockReceiveOffsets[block];
560 ofs += fragsize +
sizeof(SegmentHeaderTCP);
568 std::memmove(&receiveBuffer[0], &receiveBuffer[ofs], receiveOffset - ofs);
569 receiveOffset -= ofs;
574 bool complete =
true;
575 for (
int i=0; i<numReceptionBlocks; ++i) {
576 if (blockReceiveOffsets[i] < blockReceiveSize[i]) {
581 finishedReception = complete;
585 int DataBlockProtocol::parseReceivedHeader(
int length,
int offset) {
586 int headerExtraBytes = 6;
588 if(length < headerExtraBytes) {
592 unsigned short headerSize = ntohs(*
reinterpret_cast<unsigned short*
>(&receiveBuffer[offset]));
593 if (length < (headerExtraBytes + headerSize)) {
596 totalReceiveSize =
static_cast<int>(ntohl(*
reinterpret_cast<unsigned int*
>(&receiveBuffer[offset + 2])));
598 if (totalReceiveSize >= 0) {
599 legacyTransfer =
true;
600 headerExtraBytes = 6;
601 numReceptionBlocks = 1;
602 blockReceiveSize[0] = totalReceiveSize;
604 legacyTransfer =
false;
605 headerExtraBytes =
static_cast<int>(
sizeof(HeaderPreamble));
606 HeaderPreamble* header =
reinterpret_cast<HeaderPreamble*
>(&receiveBuffer[offset]);
607 numReceptionBlocks = 0;
608 totalReceiveSize = 0;
609 for (
int i=0; i<MAX_DATA_BLOCKS; ++i) {
610 int s = ntohl(header->netTransferSizes[i]);
612 blockReceiveSize[i] = s;
613 numReceptionBlocks++;
614 totalReceiveSize += s;
622 if (numReceptionBlocks==0)
throw std::runtime_error(
"Received a transfer with zero blocks");
623 if (numReceptionBlocks > MAX_DATA_BLOCKS)
throw std::runtime_error(
"Received a transfer with too many blocks");
625 if(headerSize + headerExtraBytes >
static_cast<int>(receiveBuffer.size())
626 || totalReceiveSize < 0 || headerSize + headerExtraBytes > length ) {
630 headerReceived =
true;
631 receivedHeader.assign(receiveBuffer.begin() + offset + headerExtraBytes,
632 receiveBuffer.begin() + offset + headerSize + headerExtraBytes);
633 resizeReceiveBuffer();
635 return headerSize + headerExtraBytes;
639 numReceptionBlocks = 0;
640 headerReceived =
false;
641 for (
int blk = 0; blk<MAX_DATA_BLOCKS; ++blk) {
642 missingReceiveSegments[blk].clear();
644 receivedHeader.clear();
645 waitingForMissingSegments =
false;
646 totalReceiveSize = 0;
647 finishedReception =
false;
648 lostSegmentBytes = 0;
649 for (
int i=0; i<MAX_DATA_BLOCKS; ++i) {
650 blockReceiveOffsets[i] = 0;
651 blockValidSize[i] = 0;
660 return &receiveBuffer[0];
664 if(receivedHeader.size() > 0) {
665 length =
static_cast<int>(receivedHeader.size());
666 return &receivedHeader[0];
672 bool DataBlockProtocol::processControlMessage(
int length) {
673 if(length <
static_cast<int>(
sizeof(
int) + 1)) {
677 int payloadLength = length -
sizeof(int) - 1;
678 switch(receiveBuffer[0 + payloadLength]) {
679 case CONFIRM_MESSAGE:
681 connectionConfirmed =
true;
683 case CONNECTION_MESSAGE:
685 connectionConfirmed =
true;
686 confirmationMessagePending =
true;
687 clientConnectionPending =
true;
690 lastReceivedHeartbeat = std::chrono::steady_clock::now();
692 case HEADER_MESSAGE: {
693 if (anyPayloadReceived()) {
694 if (allBlocksDone()) {
695 LOG_DEBUG_DBP(
"No EOF message received!");
697 LOG_DEBUG_DBP(
"Received header too late/early!");
701 if(parseReceivedHeader(payloadLength, 0) == 0) {
708 if(anyPayloadReceived()) {
709 parseEofMessage(length);
712 case RESEND_MESSAGE: {
714 parseResendMessage(payloadLength);
717 case HEARTBEAT_MESSAGE:
719 lastReceivedHeartbeat = std::chrono::steady_clock::now();
730 if(protType == PROTOCOL_TCP) {
733 }
else if(connectionConfirmed) {
734 return !isServer || std::chrono::duration_cast<std::chrono::milliseconds>(
735 std::chrono::steady_clock::now() - lastReceivedHeartbeat).count()
736 < 2*HEARTBEAT_INTERVAL_MS;
743 if(protType == PROTOCOL_TCP) {
748 if(confirmationMessagePending) {
750 confirmationMessagePending =
false;
751 controlMessageBuffer[0] = CONFIRM_MESSAGE;
753 }
else if(!isServer && std::chrono::duration_cast<std::chrono::milliseconds>(
754 std::chrono::steady_clock::now() - lastRemoteHostActivity).count() > RECONNECT_TIMEOUT_MS) {
756 controlMessageBuffer[0] = CONNECTION_MESSAGE;
760 lastRemoteHostActivity = lastSentHeartbeat = std::chrono::steady_clock::now();
761 }
else if(transferHeaderData !=
nullptr &&
isConnected()) {
763 length = transferHeaderSize;
764 const unsigned char* ret = transferHeaderData;
765 transferHeaderData =
nullptr;
767 }
else if(eofMessagePending) {
769 eofMessagePending =
false;
770 unsigned int networkOffset = htonl(mergeRawOffset(lastTransmittedBlock, transferSize[lastTransmittedBlock]));
771 memcpy(&controlMessageBuffer[0], &networkOffset,
sizeof(
int));
772 controlMessageBuffer[
sizeof(int)] = EOF_MESSAGE;
774 }
else if(resendMessagePending) {
776 resendMessagePending =
false;
777 if(!generateResendRequest(length)) {
781 }
else if(!isServer && std::chrono::duration_cast<std::chrono::milliseconds>(
782 std::chrono::steady_clock::now() - lastSentHeartbeat).count() > HEARTBEAT_INTERVAL_MS) {
784 controlMessageBuffer[0] = HEARTBEAT_MESSAGE;
786 lastSentHeartbeat = std::chrono::steady_clock::now();
792 controlMessageBuffer[length++] = 0xff;
793 controlMessageBuffer[length++] = 0xff;
794 controlMessageBuffer[length++] = 0xff;
795 controlMessageBuffer[length++] = 0xff;
796 return controlMessageBuffer;
800 if(clientConnectionPending) {
801 clientConnectionPending =
false;
808 bool DataBlockProtocol::generateResendRequest(
int& length) {
810 for (
int blk = 0; blk < numReceptionBlocks; ++blk) {
811 for(MissingReceiveSegment segment: missingReceiveSegments[blk]) {
812 unsigned int segOffset = htonl(
static_cast<unsigned int>(segment.offset));
813 unsigned int segLen = htonl(
static_cast<unsigned int>(segment.length));
815 if (
sizeof(controlMessageBuffer) < length + 2*
sizeof(
unsigned int) + 4) {
821 memcpy(&controlMessageBuffer[length], &segOffset,
sizeof(segOffset));
822 length +=
sizeof(
unsigned int);
823 memcpy(&controlMessageBuffer[length], &segLen,
sizeof(segLen));
824 length +=
sizeof(
unsigned int);
827 splitRawOffset(segment.offset, dbgBlk, dbgOfs);
828 LOG_DEBUG_DBP(
"Req missing " << dbgBlk <<
" " << dbgOfs <<
" " << segment.length);
832 if(length +
sizeof(
int) + 1 >
sizeof(controlMessageBuffer)) {
836 controlMessageBuffer[length++] = RESEND_MESSAGE;
841 void DataBlockProtocol::parseResendMessage(
int length) {
842 missingTransferSegments.clear();
844 int num = length / (
sizeof(
unsigned int) +
sizeof(
unsigned int));
845 int bufferOffset = 0;
847 for(
int i=0; i<num; i++) {
848 unsigned int segOffsetNet = *
reinterpret_cast<unsigned int*
>(&receiveBuffer[bufferOffset]);
849 bufferOffset +=
sizeof(
unsigned int);
850 unsigned int segLenNet = *
reinterpret_cast<unsigned int*
>(&receiveBuffer[bufferOffset]);
851 bufferOffset +=
sizeof(
unsigned int);
853 int segmentOffsetRaw =
static_cast<int>(ntohl(segOffsetNet));
854 int segmentLength =
static_cast<int>(ntohl(segLenNet));
855 int dataBlockID, segmentOffset;
856 splitRawOffset(segmentOffsetRaw, dataBlockID, segmentOffset);
858 if(segmentOffset >= 0 && segmentLength > 0 && (segmentOffset + segmentLength) <= rawValidBytes[dataBlockID]) {
859 missingTransferSegments.push_back(std::pair<int, int>(
860 segmentOffsetRaw, segmentLength));
866 void DataBlockProtocol::parseEofMessage(
int length) {
868 completedReceptions++;
869 lostSegmentRate = (lostSegmentRate * (completedReceptions-1) + ((
double) lostSegmentBytes) / totalReceiveSize) / completedReceptions;
870 LOG_DEBUG_DBP(
"Lost segment rate: " << lostSegmentRate);
873 for (
int i=0; i<numReceptionBlocks; ++i) {
874 if (blockReceiveOffsets[i] < blockReceiveSize[i]) {
875 MissingReceiveSegment missingSeg;
876 missingSeg.offset = mergeRawOffset(i, blockReceiveOffsets[i]);
877 missingSeg.length = blockReceiveSize[i] - blockReceiveOffsets[i];
878 missingSeg.isEof =
true;
879 missingReceiveSegments[i].push_back(missingSeg);
880 lostSegmentBytes += missingSeg.length;
883 for (
int blk=0; blk<numReceptionBlocks; ++blk) {
884 if(missingReceiveSegments[blk].size() > 0) {
885 waitingForMissingSegments =
true;
886 resendMessagePending =
true;
889 for (
int i=0; i<static_cast<int>(missingReceiveSegments[blk].size()); ++i) {
890 splitRawOffset(missingReceiveSegments[blk][i].offset, mblock, moffset);
891 if (moffset < blockReceiveOffsets[mblock]) {
892 blockReceiveOffsets[mblock] = moffset;
897 if (!resendMessagePending) {
898 finishedReception =
true;
901 LOG_DEBUG_DBP(
"EOF message too short, length " << length);
905 void DataBlockProtocol::resizeReceiveBuffer() {
906 if(totalReceiveSize < 0) {
913 + MAX_OUTSTANDING_BYTES +
sizeof(int);
916 if(
static_cast<int>(receiveBuffer.size()) < bufferSize) {
917 receiveBuffer.resize(bufferSize);
920 for (
int i=0; i<numReceptionBlocks; ++i) {
921 if (
static_cast<int>(blockReceiveBuffers[i].size()) < blockReceiveSize[i]) {
922 blockReceiveBuffers[i].resize(blockReceiveSize[i]);