22 #include "visiontransfer/imagetransfer.h"
23 #include "visiontransfer/exceptions.h"
24 #include "visiontransfer/datablockprotocol.h"
25 #include "visiontransfer/networking.h"
28 using namespace visiontransfer;
29 using namespace visiontransfer::internal;
31 namespace visiontransfer {
// Private implementation of ImageTransfer. The public ImageTransfer methods
// in this file forward to an instance of this class through `pimpl`.
// NOTE(review): this listing appears to be missing lines (the constructor
// name and its leading parameters, several data members and the closing
// brace are not visible here) — confirm against the complete file.
35 class ImageTransfer::Pimpl {
// Tail of the constructor's parameter list.
38 bool server,
int bufferSize,
int maxUdpPacketSize);
// Passes metadata, raw data pointers and the three tile widths to the
// protocol object and clears the pending send message.
42 void setRawTransferData(
const ImageSet& metaData,
const std::vector<unsigned char*>& rawData,
43 int firstTileWidth = 0,
int middleTileWidth = 0,
int lastTileWidth = 0);
// Forwards the valid-byte counts to the protocol object.
44 void setRawValidBytes(
const std::vector<int>& validBytes);
// Passes a complete image set to the protocol object for sending.
45 void setTransferImageSet(
const ImageSet& imageSet);
// Sends pending message data; returns a TransferStatus value.
46 TransferStatus transferData();
// Receives an image set by repeatedly calling receivePartialImageSet().
47 bool receiveImageSet(
ImageSet& imageSet);
// Receives network data and returns whatever the protocol reports as
// (partially) received through the output parameters.
48 bool receivePartialImageSet(
ImageSet& imageSet,
int& validRows,
bool& complete);
// Returns the dropped-frame count reported by the protocol object.
49 int getNumDroppedFrames()
const;
// True if a remote address is known and the protocol reports a connection.
50 bool isConnected()
const;
// Returns the remote address formatted as text.
52 std::string getRemoteAddress()
const;
// Returns the protocol object's status report.
55 std::string statusReport();
// Mutexes locked by the receive-side and send-side methods. Both are
// recursive, so a thread that already holds one may lock it again.
64 std::recursive_mutex receiveMutex;
65 std::recursive_mutex sendMutex;
// Listening socket created in initTcpServer().
69 SOCKET tcpServerSocket;
// Address of the peer. Zeroed in the constructor and in disconnect();
// sin_family == AF_INET is used throughout as the "peer known" check.
70 sockaddr_in remoteAddress;
// Protocol object that all message handling is delegated to.
73 std::unique_ptr<ImageProtocol> protocol;
// Message currently being sent; reset to nullptr whenever new transfer
// data is set or a new connection is accepted.
78 const unsigned char* currentMsg;
// Applies buffer size, timeout and blocking mode to clientSocket.
81 void setSocketOptions();
// Socket initialisation for the three supported modes.
84 void initTcpServer(
const addrinfo* addressInfo);
85 void initTcpClient(
const addrinfo* addressInfo);
86 void initUdp(
const addrinfo* addressInfo);
// Reads one chunk of network data into the protocol's receive buffer.
89 bool receiveNetworkData(
bool block);
// Sends `length` bytes starting at `msg` to the current peer.
92 bool sendNetworkMessage(
const unsigned char* msg,
int length);
// Sends the next control message queued by the protocol, if any.
93 void sendPendingControlMessages();
// Waits on the socket for readability (read == true) or writability.
95 bool selectSocket(
bool read,
bool wait);
// Constructor taking an explicit address and service. All arguments are
// handed straight to a newly allocated Pimpl.
// NOTE(review): part of the parameter list is not visible in this listing.
100 ImageTransfer::ImageTransfer(
const char* address,
const char* service,
102 pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize)) {
// Second constructor (signature not visible): builds the Pimpl from a
// device object, using the device's IP address, the fixed service string
// "7681", the device's network protocol cast to ImageProtocol::ProtocolType,
// and `false` for the server argument.
107 pimpl(new Pimpl(device.getIpAddress().c_str(),
"7681", static_cast<
ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
108 false, bufferSize, maxUdpPacketSize)) {
// Destructor of the public class (body not visible in this listing).
112 ImageTransfer::~ImageTransfer() {
// The remaining statements are the bodies of public ImageTransfer methods
// whose signatures are mostly not visible here. Each one delegates to the
// Pimpl method named in the call and, where present, returns its result.
// Tail of the setRawTransferData() parameter list and its forwarding call.
117 int firstTileWidth,
int middleTileWidth,
int lastTileWidth) {
118 pimpl->setRawTransferData(metaData, rawData, firstTileWidth, middleTileWidth, lastTileWidth);
122 pimpl->setRawValidBytes(validBytes);
126 pimpl->setTransferImageSet(imageSet);
130 return pimpl->transferData();
134 return pimpl->receiveImageSet(imageSet);
138 return pimpl->receivePartialImageSet(imageSet, validRows, complete);
142 return pimpl->getNumDroppedFrames();
146 return pimpl->isConnected();
154 return pimpl->getRemoteAddress();
158 return pimpl->tryAccept();
// Pimpl constructor: stores the configuration, initialises networking,
// resolves the address and sets up the socket for the selected mode.
// NOTE(review): several lines (part of the parameter list, the branch
// conditions and any try/catch) are not visible in this listing.
162 ImageTransfer::Pimpl::Pimpl(
const char* address,
const char* service,
164 bufferSize,
int maxUdpPacketSize)
// Both sockets start out invalid and no message is pending.
165 : protType(protType), isServer(server), bufferSize(bufferSize),
166 maxUdpPacketSize(maxUdpPacketSize),
167 clientSocket(INVALID_SOCKET), tcpServerSocket(INVALID_SOCKET),
168 currentMsgLen(0), currentMsgOffset(0), currentMsg(nullptr) {
170 Networking::initNetworking();
// Ignore SIGPIPE process-wide; sendNetworkMessage() checks for EPIPE
// on the error code instead.
173 signal(SIGPIPE, SIG_IGN);
// A zeroed remoteAddress (sin_family != AF_INET) means "no peer known".
176 memset(&remoteAddress, 0,
sizeof(remoteAddress));
// Special handling for a null or empty address (the body of this branch
// is not visible here).
179 if(address ==
nullptr ||
string(address) ==
"") {
183 addrinfo* addressInfo = Networking::resolveAddress(address, service);
// Exactly which initialiser runs depends on conditions not visible here;
// presumably selected by protType and isServer — confirm.
187 initUdp(addressInfo);
189 initTcpServer(addressInfo);
191 initTcpClient(addressInfo);
// addressInfo is released here and again, guarded by a null check, below.
// NOTE(review): the second release presumably belongs to an error path
// that is not visible — confirm it cannot run after the first one.
194 freeaddrinfo(addressInfo);
198 if(addressInfo !=
nullptr) {
199 freeaddrinfo(addressInfo);
// Destructor: closes the client socket and the TCP server socket, each
// only if it is currently valid.
203 ImageTransfer::Pimpl::~Pimpl() {
204 if(clientSocket != INVALID_SOCKET) {
205 Networking::closeSocket(clientSocket);
207 if(tcpServerSocket != INVALID_SOCKET) {
208 Networking::closeSocket(tcpServerSocket);
212 void ImageTransfer::Pimpl::initTcpClient(
const addrinfo* addressInfo) {
214 clientSocket = Networking::connectTcpSocket(addressInfo);
215 memcpy(&remoteAddress, addressInfo->ai_addr,
sizeof(remoteAddress));
221 void ImageTransfer::Pimpl::initTcpServer(
const addrinfo* addressInfo) {
225 tcpServerSocket = ::socket(addressInfo->ai_family, addressInfo->ai_socktype,
226 addressInfo->ai_protocol);
227 if (tcpServerSocket == INVALID_SOCKET) {
228 TransferException ex(
"Error opening socket: " + Networking::getLastErrorString());
233 Networking::enableReuseAddress(tcpServerSocket,
true);
236 Networking::bindSocket(tcpServerSocket, addressInfo);
237 clientSocket = INVALID_SOCKET;
240 Networking::setSocketBlocking(tcpServerSocket,
false);
243 listen(tcpServerSocket, 1);
246 void ImageTransfer::Pimpl::initUdp(
const addrinfo* addressInfo) {
249 clientSocket = socket(AF_INET, SOCK_DGRAM, 0);
250 if(clientSocket == INVALID_SOCKET) {
251 TransferException ex(
"Error creating receive socket: " + Networking::getLastErrorString());
256 Networking::enableReuseAddress(clientSocket,
true);
259 if(isServer && addressInfo !=
nullptr) {
260 Networking::bindSocket(clientSocket, addressInfo);
264 memcpy(&remoteAddress, addressInfo->ai_addr,
sizeof(remoteAddress));
// Tries to accept a connection on the TCP server socket. On success the
// new socket replaces any existing client connection and the protocol
// state is reset.
// NOTE(review): the return statements are not visible in this listing.
271 bool ImageTransfer::Pimpl::tryAccept() {
// acceptConnection() is given remoteAddress, presumably to fill in the
// peer's address — confirm in Networking.
277 SOCKET newSocket = Networking::acceptConnection(tcpServerSocket, remoteAddress);
278 if(newSocket == INVALID_SOCKET) {
// Hold both mutexes while the socket and protocol state are swapped.
284 unique_lock<recursive_mutex> recvLock(receiveMutex);
285 unique_lock<recursive_mutex> sendLock(sendMutex);
// Close the previous client connection, if any, before adopting the new one.
287 if(clientSocket != INVALID_SOCKET) {
288 Networking::closeSocket(clientSocket);
290 clientSocket = newSocket;
// Reset the protocol's transfer and reception state and drop any pending
// outgoing message.
296 protocol->resetTransfer();
297 protocol->resetReception();
298 currentMsg =
nullptr;
// Returns the remote address as "<dotted-quad>:<port>".
// NOTE(review): the value returned when no peer is known, and the
// declaration of strPort, are not visible in this listing.
303 std::string ImageTransfer::Pimpl::getRemoteAddress()
const {
// const_cast because the mutex is a non-mutable member and this method
// is const.
304 unique_lock<recursive_mutex> lock(
const_cast<recursive_mutex&
>(sendMutex));
// No peer known.
306 if(remoteAddress.sin_family != AF_INET) {
// NOTE(review): sin_port is stored in network byte order and is printed
// here without ntohs(), so on little-endian hosts the printed number is
// byte-swapped — confirm whether that is intended.
311 snprintf(strPort,
sizeof(strPort),
":%d", remoteAddress.sin_port);
313 return string(inet_ntoa(remoteAddress.sin_addr)) + strPort;
// Configures clientSocket: receive and send buffer sizes, a timeout, and
// blocking mode.
316 void ImageTransfer::Pimpl::setSocketOptions() {
// Request bufferSize for both kernel buffers. The setsockopt() results
// are not checked, so a refused size goes unnoticed.
319 setsockopt(clientSocket, SOL_SOCKET, SO_RCVBUF,
reinterpret_cast<char*
>(&bufferSize),
sizeof(bufferSize));
320 setsockopt(clientSocket, SOL_SOCKET, SO_SNDBUF,
reinterpret_cast<char*
>(&bufferSize),
sizeof(bufferSize));
// Timeout of 500 (presumably milliseconds — confirm in Networking).
323 Networking::setSocketTimeout(clientSocket, 500);
324 Networking::setSocketBlocking(clientSocket,
true);
// Passes metadata, raw data pointers and the three tile widths to the
// protocol object while holding sendMutex.
327 void ImageTransfer::Pimpl::setRawTransferData(
const ImageSet& metaData,
328 const std::vector<unsigned char*>& rawDataVec,
int firstTileWidth,
int middleTileWidth,
int lastTileWidth) {
329 unique_lock<recursive_mutex> sendLock(sendMutex);
330 protocol->setRawTransferData(metaData, rawDataVec, firstTileWidth, middleTileWidth, lastTileWidth);
// Drop any partially sent message so transferData() fetches a fresh one.
331 currentMsg =
nullptr;
// Forwards the valid-byte counts to the protocol object while holding
// sendMutex.
334 void ImageTransfer::Pimpl::setRawValidBytes(
const std::vector<int>& validBytes) {
335 unique_lock<recursive_mutex> sendLock(sendMutex);
336 protocol->setRawValidBytes(validBytes);
// Passes a complete image set to the protocol object while holding
// sendMutex.
339 void ImageTransfer::Pimpl::setTransferImageSet(
const ImageSet& imageSet) {
340 unique_lock<recursive_mutex> sendLock(sendMutex);
341 protocol->setTransferImageSet(imageSet);
// Drop any partially sent message so transferData() fetches a fresh one.
342 currentMsg =
nullptr;
// Body of transferData() (the signature line is not visible in this
// listing). Sends as much of the pending transfer as possible and reports
// the result as a TransferStatus value.
// NOTE(review): several lines (declaration of `flag`, loop exits, the
// conditions around the socket-option calls) are missing here.
346 unique_lock<recursive_mutex> lock(sendMutex);
// Non-blocking poll for incoming data before sending.
350 receiveNetworkData(
false);
// Nothing can be sent without a known peer and a protocol-level connection.
353 if(remoteAddress.sin_family != AF_INET || !protocol->isConnected()) {
354 return NOT_CONNECTED;
// Sets the TCP_CORK option from `flag` (its value is not visible here).
361 setsockopt(clientSocket, IPPROTO_TCP, TCP_CORK, (
char *) &flag,
sizeof(
int));
// No message in progress: fetch the next one from the protocol.
366 if(currentMsg ==
nullptr) {
367 currentMsgOffset = 0;
368 currentMsg = protocol->getTransferMessage(currentMsgLen);
// Still nothing to send: either everything has been transferred or no
// valid data has been set.
370 if(currentMsg ==
nullptr) {
371 if(protocol->transferComplete()) {
372 return ALL_TRANSFERRED;
374 return NO_VALID_DATA;
380 bool wouldBlock =
false;
381 bool dataTransferred = (currentMsg !=
nullptr);
// Send loop: write the unsent remainder of the current message and, when
// it has been sent, move on to the next message from the protocol.
382 while(currentMsg !=
nullptr) {
383 int writing = (int)(currentMsgLen - currentMsgOffset);
// NOTE(review): "¤tMsg" looks like an entity-decoding corruption of
// "&currentMsg" — confirm against the original file.
385 if(sendNetworkMessage(¤tMsg[currentMsgOffset], writing)) {
387 currentMsgOffset = 0;
388 currentMsg = protocol->getTransferMessage(currentMsgLen);
// Socket options are set again after the loop; the values of `flag` and
// the surrounding conditions are not visible here.
400 setsockopt(clientSocket, IPPROTO_TCP, TCP_CORK, (
char *) &flag,
sizeof(
int));
404 setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, (
char *) &flag,
sizeof(
int));
406 setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, (
char *) &flag,
sizeof(
int));
// Non-blocking poll for incoming data after sending.
412 receiveNetworkData(
false);
// Final status: everything sent, or the send would have blocked (the
// return for that branch is not visible), or a partial transfer.
415 if(protocol->transferComplete()) {
416 return ALL_TRANSFERRED;
417 }
else if(wouldBlock) {
420 return PARTIAL_TRANSFER;
// Receives an image set by calling receivePartialImageSet() and tracking
// how long the attempt has been running.
// NOTE(review): the loop construct, the declaration of validRows and the
// return statements are not visible in this listing.
424 bool ImageTransfer::Pimpl::receiveImageSet(
ImageSet& imageSet) {
426 bool complete =
false;
// Monotonic start time, used for the elapsed-time check below.
428 std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
430 if(!receivePartialImageSet(imageSet, validRows, complete)) {
// Elapsed time in milliseconds since startTime.
434 unsigned int time =
static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::milliseconds>(
435 std::chrono::steady_clock::now() - startTime).count());
// More than 100 ms elapsed without a complete image set (the action
// taken in this case is not visible here).
436 if(time > 100 && !complete) {
// Reads network data until the protocol reports received images or no
// more data arrives, then returns the protocol's partially received image
// set through the output parameters.
// NOTE(review): the declaration of `block` and the loop body are not
// visible in this listing.
444 bool ImageTransfer::Pimpl::receivePartialImageSet(
ImageSet& imageSet,
445 int& validRows,
bool& complete) {
446 unique_lock<recursive_mutex> lock(receiveMutex);
// Stops when images have been received or a read returns no data.
450 while(!protocol->imagesReceived() && receiveNetworkData(block)) {
455 return protocol->getPartiallyReceivedImageSet(imageSet, validRows, complete);
// Reads one chunk of data from clientSocket into the protocol's receive
// buffer and lets the protocol process it. Returns true if any bytes were
// received.
// NOTE(review): the early returns, the declaration of maxLength and the
// first branch of the error chain are not visible in this listing.
458 bool ImageTransfer::Pimpl::receiveNetworkData(
bool block) {
// Blocking callers wait for receiveMutex; non-blocking callers only try
// to take it.
459 unique_lock<recursive_mutex> lock = block ?
460 unique_lock<recursive_mutex>(receiveMutex) : unique_lock<recursive_mutex>(receiveMutex, std::try_to_lock);
462 if(clientSocket == INVALID_SOCKET) {
// Control messages are sent before the lock-ownership check below, so
// they go out even when the receive lock could not be acquired.
467 sendPendingControlMessages();
469 if(!lock.owns_lock()) {
// Non-blocking mode: check for readable data without waiting.
475 if(!block && !selectSocket(
true,
false)) {
// The protocol supplies the destination buffer.
480 char* buffer =
reinterpret_cast<char*
>(protocol->getNextReceiveBuffer(maxLength));
483 sockaddr_in fromAddress;
484 socklen_t fromSize =
sizeof(fromAddress);
486 int bytesReceived = recvfrom(clientSocket, buffer, maxLength,
487 0,
reinterpret_cast<sockaddr*
>(&fromAddress), &fromSize);
// Read the error code right after recvfrom(), before other calls are made.
489 auto err = Networking::getErrno();
493 }
// Would-block, interrupt, timeout, pending-I/O and connection-reset codes
// are excluded from the error case; for any other code an exception
// object is built (the throw itself is not visible here).
else if(bytesReceived < 0 && err != EWOULDBLOCK && err != EINTR &&
494 err != ETIMEDOUT && err != WSA_IO_PENDING && err != WSAECONNRESET) {
495 TransferException ex(
"Error reading from socket: " + Networking::getErrorString(err));
497 }
else if(bytesReceived > 0) {
498 protocol->processReceivedMessage(bytesReceived);
// When the protocol reports a new client, the sender of this datagram
// becomes the remote address.
499 if(protocol->newClientConnected()) {
501 memcpy(&remoteAddress, &fromAddress,
sizeof(remoteAddress));
505 return bytesReceived > 0;
// Closes the client socket and forgets the remote address, holding both
// mutexes while doing so.
508 void ImageTransfer::Pimpl::disconnect() {
511 unique_lock<recursive_mutex> recvLock(receiveMutex);
512 unique_lock<recursive_mutex> sendLock(sendMutex);
// NOTE(review): no visible line resets clientSocket to INVALID_SOCKET or
// checks it before closing — confirm against the complete file.
515 Networking::closeSocket(clientSocket);
// A zeroed remoteAddress makes isConnected() return false.
517 memset(&remoteAddress, 0,
sizeof(remoteAddress));
// Returns true when a remote address is known (sin_family == AF_INET) and
// the protocol object reports a connection.
520 bool ImageTransfer::Pimpl::isConnected()
const {
// const_cast because the mutex is a non-mutable member and this method
// is const.
521 unique_lock<recursive_mutex> lock(
const_cast<recursive_mutex&
>(sendMutex));
523 return remoteAddress.sin_family == AF_INET && protocol->isConnected();
// Sends `length` bytes starting at `msg` to the current peer, using
// sendto() with an explicit destination on one path and send() on the
// other (presumably UDP vs. TCP — the selecting condition is not visible).
// NOTE(review): the declarations of `written` and `destSocket`, the
// return statements and the bodies of the error branches are missing from
// this listing.
526 bool ImageTransfer::Pimpl::sendNetworkMessage(
const unsigned char* msg,
int length) {
529 sockaddr_in destAddr;
// Snapshot the address and socket under sendMutex.
532 unique_lock<recursive_mutex> lock(sendMutex);
533 destAddr = remoteAddress;
534 destSocket = clientSocket;
// No peer known.
537 if(destAddr.sin_family != AF_INET) {
541 written = sendto(destSocket,
reinterpret_cast<const char*
>(msg), length, 0,
542 reinterpret_cast<sockaddr*
>(&destAddr),
sizeof(destAddr));
// Connected-socket path: only the socket handle is snapshotted.
546 unique_lock<recursive_mutex> lock(sendMutex);
547 destSocket = clientSocket;
549 written = send(destSocket,
reinterpret_cast<const char*
>(msg), length, 0);
// Read the error code right after the send call.
552 auto sendError = Networking::getErrno();
// Error classification: would-block/timeout codes, a broken pipe, and
// everything else (for which an exception object is built).
555 if(sendError == EAGAIN || sendError == EWOULDBLOCK || sendError == ETIMEDOUT) {
558 }
else if(sendError == EPIPE) {
563 TransferException ex(
"Error sending network packet: " + Networking::getErrorString(sendError));
566 }
// Short write: advance the offset by the number of bytes written.
else if(written != length) {
572 currentMsgOffset += written;
// Fetches the next control message from the protocol and sends it if
// there is one.
// NOTE(review): loop structure and early returns are not visible in this
// listing.
580 void ImageTransfer::Pimpl::sendPendingControlMessages() {
581 const unsigned char* controlMsgData =
nullptr;
582 int controlMsgLen = 0;
585 unique_lock<recursive_mutex> lock(sendMutex);
// No peer known.
586 if(remoteAddress.sin_family != AF_INET) {
// The protocol reports the message length through controlMsgLen.
590 controlMsgData = protocol->getNextControlMessage(controlMsgLen);
592 if(controlMsgData !=
nullptr) {
593 sendNetworkMessage(controlMsgData, controlMsgLen);
// Returns the dropped-frame count reported by the protocol object. No
// mutex is taken on the visible lines.
600 int ImageTransfer::Pimpl::getNumDroppedFrames()
const {
601 return protocol->getNumDroppedFrames();
// Checks whether the socket is ready for reading (read == true) or
// writing (read == false), optionally waiting for it.
// NOTE(review): this listing shows both a select()-based and a
// poll()-based check; presumably they are alternative platform branches.
// The set-up of `sock`, `fds`, `tv` and `pfd` and the return statements
// are not visible.
604 bool ImageTransfer::Pimpl::selectSocket(
bool read,
bool wait) {
607 unique_lock<recursive_mutex> lock(sendMutex);
// select() variant: the descriptor set is passed as the read set or the
// write set depending on `read`. A result <= 0 is timeout or error.
622 if(select(sock+1, (read ? &fds :
nullptr), (!read ? &fds :
nullptr),
nullptr, &tv) <= 0) {
// poll() variant: waits up to 100 ms when `wait` is set, otherwise
// returns immediately. A result <= 0 is timeout or error.
628 constexpr
int timeoutMillisec = 100;
632 if (poll(&pfd, 1, wait ? timeoutMillisec: 0) <= 0) {
// Public entry point: returns the status report produced by the Pimpl.
641 std::string ImageTransfer::statusReport() {
642 return pimpl->statusReport();
// Returns the protocol object's status report unchanged. No mutex is
// taken on the visible lines.
644 std::string ImageTransfer::Pimpl::statusReport() {
645 return protocol->statusReport();