17 #include "visiontransfer/parametertransfer.h"
18 #include "visiontransfer/exceptions.h"
19 #include "visiontransfer/internalinformation.h"
20 #include "visiontransfer/parametertransferdata.h"
21 #include "visiontransfer/parameterserialization.h"
30 using namespace visiontransfer;
31 using namespace visiontransfer::internal;
32 using namespace visiontransfer::param;
34 namespace visiontransfer {
37 constexpr
int ParameterTransfer::SOCKET_TIMEOUT_MS;
39 thread_local
bool ParameterTransfer::transactionInProgress =
false;
40 thread_local std::vector<std::pair<std::string, std::string> > ParameterTransfer::transactionQueuedWrites = {};
42 ParameterTransfer::ParameterTransfer(
const char* address,
const char* service)
43 : socket(INVALID_SOCKET), address(address), service(service), networkReady(false), featureDisabledTransactions(false) {
45 tabTokenizer.collapse(
false).separators({
"\t"});
47 Networking::initNetworking();
51 ParameterTransfer::~ParameterTransfer() {
52 threadRunning =
false;
53 if (receiverThread->joinable()) {
54 receiverThread->join();
57 if(socket != INVALID_SOCKET) {
58 Networking::closeSocket(socket);
63 void ParameterTransfer::attemptConnection() {
64 std::unique_lock<std::mutex> localLock(socketModificationMutex);
66 addrinfo* addressInfo = Networking::resolveAddress(address.c_str(), service.c_str());
68 socket = Networking::connectTcpSocket(addressInfo);
69 Networking::setSocketTimeout(socket, SOCKET_TIMEOUT_MS);
71 if (!receiverThread) {
72 receiverThread = std::make_shared<std::thread>(std::bind(&ParameterTransfer::receiverRoutine,
this));
78 size_t written = send(socket,
"A\n", 2, 0);
80 Networking::closeSocket(socket);
81 socket = INVALID_SOCKET;
83 TransferException ex(
"Error sending GetAllParameter request: " + Networking::getLastErrorString());
87 freeaddrinfo(addressInfo);
90 void ParameterTransfer::waitNetworkReady()
const {
93 std::unique_lock<std::mutex> readyLock(readyMutex);
94 auto status = readyCond.wait_for(readyLock, std::chrono::milliseconds(2000));
95 if (status == std::cv_status::timeout) {
101 void ParameterTransfer::readParameter(
unsigned char messageType,
const char*
id,
unsigned char* dest,
int length) {
105 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
108 for (
int i=0; i<length; ++i) { dest[i] =
'\0'; }
111 void ParameterTransfer::sendNetworkCommand(
const std::string& cmdline) {
112 std::unique_lock<std::mutex> localLock(socketModificationMutex);
113 if (socket == INVALID_SOCKET) {
114 throw TransferException(
"Connection has been closed and not reconnected so far");
116 size_t written = send(socket, cmdline.c_str(), (
int) cmdline.size(), 0);
117 if(written != cmdline.size()) {
118 throw TransferException(
"Error sending parameter set request: " + Networking::getLastErrorString());
127 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
129 if (!paramSet.count(
id)) {
135 std::stringstream ss;
136 ss <<
"S" <<
"\t" << (synchronous ? getThreadId() : -1) <<
"\t" <<
id <<
"\t" << value <<
"\n";
139 blockingCallThisThread([
this, &
id, &value, &ss](){
140 sendNetworkCommand(ss.str());
142 auto result = lastSetRequestResult[getThreadId()];
143 if (result.first ==
false) {
150 paramSet[id].setCurrent<T>(value);
154 sendNetworkCommand(ss.str());
164 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
166 if (!paramSet.count(
id)) {
172 std::stringstream ss;
173 ss <<
"S" <<
"\t" << (synchronous ? getThreadId() : -1) <<
"\t" << id <<
"\t" << value <<
"\n";
176 blockingCallThisThread([
this, &
id, &value, &ss](){
177 sendNetworkCommand(ss.str());
179 auto result = lastSetRequestResult[getThreadId()];
180 if (result.first ==
false) {
187 paramSet[id].setCurrent<std::string>(value);
191 sendNetworkCommand(ss.str());
196 void ParameterTransfer::writeParameterTransactionGuardedImpl(
const char*
id,
const T& value) {
197 if (transactionInProgress) {
198 if (!paramSet.count(
id)) {
202 transactionQueuedWrites.push_back({std::string(
id), std::to_string(value)});
210 void ParameterTransfer::writeParameterTransactionUnguardedImpl(
const char*
id,
const T& value) {
218 if (transactionInProgress) {
219 if (!paramSet.count(
id)) {
223 transactionQueuedWrites.push_back({std::string(
id), value});
232 writeParameterTransactionGuardedImpl<double>(
id, value);
237 writeParameterTransactionGuardedImpl<int>(
id, value);
242 writeParameterTransactionGuardedImpl<bool>(
id, value);
247 writeParameterTransactionUnguardedImpl<bool>(
id, value);
254 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
256 if (!paramSet.count(
id)) {
266 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
268 if (!paramSet.count(
id)) {
278 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
280 if (!paramSet.count(
id)) {
302 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
304 std::map<std::string, ParameterInfo> compatMap;
306 std::unique_lock<std::mutex> globalLock(mapMutex);
307 for (
auto kv: paramSet) {
308 auto& name = kv.first;
309 auto& param = kv.second;
310 bool writeable = param.getAccessForApi() == param::Parameter::ACCESS_READWRITE;
311 switch(param.getType()) {
312 case param::ParameterValue::TYPE_INT: {
313 int min = -1, max = -1, increment = -1;
314 if (param.hasRange()) {
315 min = param.getMin<
int>();
316 max = param.getMax<
int>();
318 if (param.hasIncrement()) {
319 increment = param.getIncrement<
int>();
321 compatMap[name] = ParameterInfo::fromInt(name, writeable, param.getCurrent<
int>(), min, max, increment);
324 case param::ParameterValue::TYPE_DOUBLE: {
325 double min = -1, max = -1, increment = -1;
326 if (param.hasRange()) {
327 min = param.getMin<
double>();
328 max = param.getMax<
double>();
330 if (param.hasIncrement()) {
331 increment = param.getIncrement<
double>();
333 compatMap[name] = ParameterInfo::fromDouble(name, writeable, param.getCurrent<
double>(), min, max, increment);
336 case param::ParameterValue::TYPE_BOOL: {
337 compatMap[name] = ParameterInfo::fromBool(name, writeable, param.getCurrent<
bool>());
349 void ParameterTransfer::receiverRoutine() {
350 auto refTime = std::chrono::steady_clock::now();
352 threadRunning =
true;
353 [[maybe_unused]]
int internalThreadId = getThreadId();
354 while (threadRunning) {
355 if (socket == INVALID_SOCKET) {
360 std::cerr <<
"Failed to reconnect to parameter server." << std::endl;
362 std::this_thread::sleep_for(std::chrono::milliseconds(SOCKET_RECONNECT_INTERVAL_MS));
366 int bytesReceived = recv(socket, recvBuf+recvBufBytes, (RECV_BUF_SIZE - recvBufBytes), 0);
367 if (bytesReceived < 0) {
368 auto err = Networking::getErrno();
369 if(err == EAGAIN || err == EWOULDBLOCK || err == ETIMEDOUT) {
374 std::cerr <<
"Network error (will periodically attempt reconnection)." << std::endl;
375 std::unique_lock<std::mutex> localLock(socketModificationMutex);
376 Networking::closeSocket(socket);
377 socket = INVALID_SOCKET;
378 refTime = std::chrono::steady_clock::now();
380 networkErrorString = std::string(
"Error receiving network packet: ") + Networking::getLastErrorString();
383 }
else if (bytesReceived == 0) {
384 std::cerr <<
"Connection closed by remote side (will periodically attempt reconnection)." << std::endl;
385 std::unique_lock<std::mutex> localLock(socketModificationMutex);
386 Networking::closeSocket(socket);
387 socket = INVALID_SOCKET;
388 refTime = std::chrono::steady_clock::now();
390 networkErrorString =
"Connection closed";
393 recvBufBytes += bytesReceived;
395 unsigned int start=0;
396 for (
unsigned int i=0; i<recvBufBytes; ++i) {
397 unsigned char c = recvBuf[i];
399 std::string currentLine((
const char*) recvBuf+start, i-start);
400 auto toks = tabTokenizer.tokenize(currentLine);
402 const std::string& cmd = toks[0];
406 long reportedVersionMajor = atol(toks[1].c_str());
407 if(reportedVersionMajor !=
static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR)) {
410 networkErrorString = std::string(
"Protocol major version mismatch, expected ") + std::to_string(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR) +
" but got " + toks[1];
411 threadRunning =
false;
413 std::lock_guard<std::mutex> readyLock(readyMutex);
414 readyCond.notify_all();
417 long reportedVersionMinor = -1;
420 reportedVersionMinor = atol(toks[2].c_str());
421 if (reportedVersionMinor >
static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR)) {
422 std::cerr <<
"Caution: remote parameter protocol version " << reportedVersionMajor <<
"." << reportedVersionMinor
423 <<
" is newer than our version " <<
static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR)
424 <<
"." <<
static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR) << std::endl;
425 std::cerr <<
"Consider a library upgrade for maximum compatibility." << std::endl;
429 if (reportedVersionMinor == -1) {
431 std::cerr <<
"Warning: remote firmware is out of date - parameter batch transaction support disabled." << std::endl;
432 featureDisabledTransactions =
true;
435 std::stringstream ss;
436 ss <<
"P" <<
"\t" << (
unsigned int) visiontransfer::internal::InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR
437 <<
"\t" << (
unsigned int) visiontransfer::internal::InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR <<
"\n";
439 std::unique_lock<std::mutex> localLock(socketModificationMutex);
440 if (socket == INVALID_SOCKET) {
441 throw TransferException(
"Connection has been closed and not reconnected so far");
443 size_t written = send(socket, ss.str().c_str(), (
int) ss.str().size(), 0);
444 if(written != ss.str().size()) {
445 throw TransferException(
"Error sending protocol version handshake reply: " + Networking::getLastErrorString());
451 networkErrorString =
"Incomplete transfer of protocol version";
452 threadRunning =
false;
455 }
else if (cmd==
"I") {
457 Parameter param = ParameterSerialization::deserializeParameterFullUpdate(toks);
458 auto uid = param.
getUid();
459 bool alreadyPresent = paramSet.count(uid);
460 paramSet[uid] = param;
461 if (alreadyPresent && parameterUpdateCallback) {
463 parameterUpdateCallback(uid);
465 }
else if (cmd==
"M") {
467 if (paramSet.count(toks[1])) {
468 Parameter param = ParameterSerialization::deserializeParameterFullUpdate(toks,
"M");
469 auto uid = param.
getUid();
471 paramSet[uid] = param;
472 if (parameterUpdateCallback) {
473 parameterUpdateCallback(uid);
476 std::cerr <<
"Parameter not received yet - not updating metadata of: " << toks[1] << std::endl;;
478 }
else if (cmd==
"V") {
480 if (toks.size() < 3) {
483 if (paramSet.count(toks[1])) {
485 ParameterSerialization::deserializeParameterValueChange(toks, paramSet[toks[1]]);
486 if (parameterUpdateCallback) {
487 parameterUpdateCallback(toks[1]);
490 std::cerr <<
"Parameter not received yet - not updating value of: " << toks[1] << std::endl;;
492 }
else if (cmd==
"R") {
494 if (toks.size() < 4) {
497 std::unique_lock<std::mutex> globalLock(mapMutex);
498 int replyThreadId = atol(toks[1].c_str());
499 if (waitConds.count(replyThreadId)) {
501 std::lock_guard<std::mutex> localLock(waitCondMutexes[replyThreadId]);
502 lastSetRequestResult[replyThreadId] = {toks[2] ==
"1", toks[3]};
503 waitConds[replyThreadId].notify_all();
505 if (replyThreadId != -1) {
506 std::cerr <<
"Ignoring unexpected request result for thread " << replyThreadId << std::endl;
509 }
else if (cmd==
"E") {
513 std::lock_guard<std::mutex> readyLock(readyMutex);
514 readyCond.notify_all();
515 }
else if (cmd==
"HB") {
517 }
else if (cmd==
"X") {
521 networkErrorString = std::string(
"Unknown update command received: ") + cmd;
522 threadRunning =
false;
530 if (start>=recvBufBytes) {
533 std::memmove(recvBuf, recvBuf+start, recvBufBytes-start);
534 recvBufBytes = recvBufBytes-start;
540 int ParameterTransfer::getThreadId() {
542 static std::atomic_int threadCount{0};
543 thread_local
int threadId = threadCount.fetch_add(1);
547 void ParameterTransfer::blockingCallThisThread(std::function<
void()> fn,
int waitMaxMilliseconds) {
548 bool timeout =
false;
549 auto tid = getThreadId();
551 std::unique_lock<std::mutex> globalLock(mapMutex);
553 auto& localWaitCond = waitConds[tid];
554 auto& localWaitCondMutex = waitCondMutexes[tid];
555 std::unique_lock<std::mutex> localLock(localWaitCondMutex);
562 auto status = localWaitCond.wait_for(localLock, std::chrono::milliseconds(waitMaxMilliseconds));
563 timeout = (status == std::cv_status::timeout);
567 std::unique_lock<std::mutex> globalLock(mapMutex);
568 waitConds.erase(tid);
569 waitCondMutexes.erase(tid);
582 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
591 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
596 void ParameterTransfer::setParameterUpdateCallback(std::function<
void(
const std::string& uid)> callback) {
597 parameterUpdateCallback = callback;
602 if (featureDisabledTransactions) {
606 if (transactionInProgress)
throw TransferException(
"Simultaneous and/or nested parameter transactions are not supported");
607 transactionInProgress =
true;
612 if (featureDisabledTransactions) {
620 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
624 if (transactionQueuedWrites.size() > 0) {
626 std::set<std::string> affectedUids;
627 for (
auto& kv: transactionQueuedWrites) {
628 affectedUids.insert(kv.first);
631 std::string transactionId = std::to_string(getThreadId());
634 std::stringstream ss;
635 ss <<
"TS" <<
"\t" << transactionId <<
"\t";
637 for (
auto& uid: affectedUids) {
638 if (first) first=
false;
else ss <<
",";
643 std::unique_lock<std::mutex> localLock(socketModificationMutex);
644 if (socket == INVALID_SOCKET) {
645 throw TransferException(
"Connection has been closed and not reconnected so far");
647 size_t written = send(socket, ss.str().c_str(), (
int) ss.str().size(), 0);
648 if(written != ss.str().size()) {
649 throw TransferException(
"Error sending transaction start request: " + Networking::getLastErrorString());
655 for (
auto& kv: transactionQueuedWrites) {
656 auto& uid = kv.first;
657 auto& value = kv.second;
663 std::stringstream ss;
664 ss <<
"TE" <<
"\t" << transactionId <<
"\n";
666 std::unique_lock<std::mutex> localLock(socketModificationMutex);
667 if (socket == INVALID_SOCKET) {
668 throw TransferException(
"Connection has been closed and not reconnected so far");
670 size_t written = send(socket, ss.str().c_str(), (
int) ss.str().size(), 0);
671 if(written != ss.str().size()) {
672 throw TransferException(
"Error sending transaction end request: " + Networking::getLastErrorString());
679 transactionQueuedWrites.clear();
682 transactionInProgress =
false;