libvisiontransfer  10.6.0
parametertransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2023 Allied Vision Technologies GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #include <iostream>
16 
17 #include "visiontransfer/parametertransfer.h"
18 #include "visiontransfer/exceptions.h"
19 #include "visiontransfer/internalinformation.h"
20 #include "visiontransfer/parametertransferdata.h"
21 #include "visiontransfer/parameterserialization.h"
22 
23 #include <cstring>
24 #include <string>
25 #include <functional>
26 #include <atomic>
27 #include <chrono>
28 
29 using namespace std;
30 using namespace visiontransfer;
31 using namespace visiontransfer::internal;
32 using namespace visiontransfer::param;
33 
34 namespace visiontransfer {
35 namespace internal {
36 
37 constexpr int ParameterTransfer::SOCKET_TIMEOUT_MS;
38 
39 thread_local bool ParameterTransfer::transactionInProgress = false;
40 thread_local std::vector<std::pair<std::string, std::string> > ParameterTransfer::transactionQueuedWrites = {};
41 
42 ParameterTransfer::ParameterTransfer(const char* address, const char* service)
43  : socket(INVALID_SOCKET), address(address), service(service), networkReady(false), featureDisabledTransactions(false) {
44 
45  tabTokenizer.collapse(false).separators({"\t"});
46 
47  Networking::initNetworking();
48  attemptConnection();
49 }
50 
51 ParameterTransfer::~ParameterTransfer() {
52  threadRunning = false;
53  if (receiverThread->joinable()) {
54  receiverThread->join();
55  }
56 
57  if(socket != INVALID_SOCKET) {
58  Networking::closeSocket(socket);
59  }
60 }
61 
62 // Will attempt initial connection or reconnection after error
63 void ParameterTransfer::attemptConnection() {
64  std::unique_lock<std::mutex> localLock(socketModificationMutex);
65 
66  addrinfo* addressInfo = Networking::resolveAddress(address.c_str(), service.c_str());
67 
68  socket = Networking::connectTcpSocket(addressInfo);
69  Networking::setSocketTimeout(socket, SOCKET_TIMEOUT_MS);
70 
71  if (!receiverThread) {
72  receiverThread = std::make_shared<std::thread>(std::bind(&ParameterTransfer::receiverRoutine, this));
73  }
74  networkError = false;
75  pollDelay = 1000;
76 
77  // Initial 'GetAll' command
78  size_t written = send(socket, "A\n", 2, 0);
79  if(written != 2) {
80  Networking::closeSocket(socket);
81  socket = INVALID_SOCKET;
82  networkError = true;
83  TransferException ex("Error sending GetAllParameter request: " + Networking::getLastErrorString());
84  throw ex;
85  }
86 
87  freeaddrinfo(addressInfo);
88 }
89 
90 void ParameterTransfer::waitNetworkReady() const {
91  if (!networkReady) {
92  // Block for network to become ready
93  std::unique_lock<std::mutex> readyLock(readyMutex);
94  auto status = readyCond.wait_for(readyLock, std::chrono::milliseconds(2000));
95  if (status == std::cv_status::timeout) {
96  throw TransferException("Timeout waiting for parameter server ready state");
97  }
98  }
99 }
100 
101 void ParameterTransfer::readParameter(unsigned char messageType, const char* id, unsigned char* dest, int length) {
102  waitNetworkReady();
103  if (networkError) {
104  // collecting deferred error from background thread
105  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
106  }
107 
108  for (int i=0; i<length; ++i) { dest[i] = '\0'; } // PLACEHOLDER
109 }
110 
111 void ParameterTransfer::sendNetworkCommand(const std::string& cmdline) {
112  std::unique_lock<std::mutex> localLock(socketModificationMutex);
113  if (socket == INVALID_SOCKET) {
114  throw TransferException("Connection has been closed and not reconnected so far");
115  }
116  size_t written = send(socket, cmdline.c_str(), (int) cmdline.size(), 0);
117  if(written != cmdline.size()) {
118  throw TransferException("Error sending parameter set request: " + Networking::getLastErrorString());
119  }
120 }
121 
122 template<typename T>
123 void ParameterTransfer::writeParameter(const char* id, const T& value, bool synchronous) {
124  waitNetworkReady();
125  if (networkError) {
126  // collecting deferred error from background thread
127  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
128  }
129  if (!paramSet.count(id)) {
130  throw ParameterException("Invalid parameter: " + std::string(id));
131  }
132 
133  // Assemble a set request with our thread id - the receiver thread can unblock us based on that ID
134  // For 'fire-and-forget' commands without reply, we use -1
135  std::stringstream ss;
136  ss << "S" << "\t" << (synchronous ? getThreadId() : -1) << "\t" << id << "\t" << value << "\n";
137 
138  if (synchronous) {
139  blockingCallThisThread([this, &id, &value, &ss](){
140  sendNetworkCommand(ss.str());
141  });
142  auto result = lastSetRequestResult[getThreadId()];
143  if (result.first == false) {
144  // There was a remote error, append its info to the exception
145  throw ParameterException("Remote parameter error: " + result.second);
146  } else {
147  // Local preliminary value update - the (successful!) async remote update may need additional time.
148  // The actual value MIGHT have been revised by the server, but in the vast majority of cases this allows
149  // reading back the successfully written parameter. The safest way is via setParameterUpdateCallback.
150  paramSet[id].setCurrent<T>(value);
151  }
152  } else {
153  // 'Fire and forget' immediate-return mode, e.g. for sending a trigger
154  sendNetworkCommand(ss.str());
155  }
156 }
157 
158 // Explicit instantiation for std::string
159 template<>
160 void ParameterTransfer::writeParameter(const char* id, const std::string& value, bool synchronous) {
161  waitNetworkReady();
162  if (networkError) {
163  // collecting deferred error from background thread
164  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
165  }
166  if (!paramSet.count(id)) {
167  throw ParameterException("Invalid parameter: " + std::string(id));
168  }
169 
170  // Assemble a set request with our thread id - the receiver thread can unblock us based on that ID
171  // For 'fire-and-forget' commands without reply, we use -1
172  std::stringstream ss;
173  ss << "S" << "\t" << (synchronous ? getThreadId() : -1) << "\t" << id << "\t" << value << "\n";
174 
175  if (synchronous) {
176  blockingCallThisThread([this, &id, &value, &ss](){
177  sendNetworkCommand(ss.str());
178  });
179  auto result = lastSetRequestResult[getThreadId()];
180  if (result.first == false) {
181  // There was a remote error, append its info to the exception
182  throw ParameterException("Remote parameter error: " + result.second);
183  } else {
184  // Local preliminary value update - the (successful!) async remote update may need additional time.
185  // The actual value MIGHT have been revised by the server, but in the vast majority of cases this allows
186  // reading back the successfully written parameter. The safest way is via setParameterUpdateCallback.
187  paramSet[id].setCurrent<std::string>(value);
188  }
189  } else {
190  // 'Fire and forget' immediate-return mode, e.g. for sending a trigger
191  sendNetworkCommand(ss.str());
192  }
193 }
194 
195 template<typename T>
196 void ParameterTransfer::writeParameterTransactionGuardedImpl(const char* id, const T& value) {
197  if (transactionInProgress) {
198  if (!paramSet.count(id)) {
199  throw ParameterException("Invalid parameter: " + std::string(id));
200  }
201  // Queue is thread_local
202  transactionQueuedWrites.push_back({std::string(id), std::to_string(value)});
203  } else {
204  // No transaction, immediate dispatch
205  writeParameter(id, value);
206  }
207 }
208 
209 template<typename T>
210 void ParameterTransfer::writeParameterTransactionUnguardedImpl(const char* id, const T& value) {
211  // No transaction, immediate dispatch
212  writeParameter(id, value, false);
213 }
214 
215 // Explicit instantiation for std::string
216 template<>
217 void ParameterTransfer::writeParameterTransactionGuarded(const char* id, const std::string& value) {
218  if (transactionInProgress) {
219  if (!paramSet.count(id)) {
220  throw ParameterException("Invalid parameter: " + std::string(id));
221  }
222  // Queue is thread_local
223  transactionQueuedWrites.push_back({std::string(id), value});
224  } else {
225  // No transaction, immediate dispatch
226  writeParameter(id, value);
227  }
228 }
229 
230 template<>
231 void ParameterTransfer::writeParameterTransactionGuarded(const char* id, const double& value) {
232  writeParameterTransactionGuardedImpl<double>(id, value);
233 }
234 
235 template<>
236 void ParameterTransfer::writeParameterTransactionGuarded(const char* id, const int& value) {
237  writeParameterTransactionGuardedImpl<int>(id, value);
238 }
239 
240 template<>
241 void ParameterTransfer::writeParameterTransactionGuarded(const char* id, const bool& value) {
242  writeParameterTransactionGuardedImpl<bool>(id, value);
243 }
244 
245 template<>
246 void ParameterTransfer::writeParameterTransactionUnguarded(const char* id, const bool& value) {
247  writeParameterTransactionUnguardedImpl<bool>(id, value);
248 }
249 
251  waitNetworkReady();
252  if (networkError) {
253  // collecting deferred error from background thread
254  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
255  }
256  if (!paramSet.count(id)) {
257  throw ParameterException("Invalid parameter: " + std::string(id));
258  }
259  return paramSet[id].getCurrent<int>();
260 }
261 
263  waitNetworkReady();
264  if (networkError) {
265  // collecting deferred error from background thread
266  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
267  }
268  if (!paramSet.count(id)) {
269  throw ParameterException("Invalid parameter: " + std::string(id));
270  }
271  return paramSet[id].getCurrent<double>();
272 }
273 
275  waitNetworkReady();
276  if (networkError) {
277  // collecting deferred error from background thread
278  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
279  }
280  if (!paramSet.count(id)) {
281  throw ParameterException("Invalid parameter: " + std::string(id));
282  }
283  return paramSet[id].getCurrent<bool>();
284 }
285 
286 void ParameterTransfer::writeIntParameter(const char* id, int value) {
287  writeParameter(id, value);
288 }
289 
290 void ParameterTransfer::writeDoubleParameter(const char* id, double value) {
291  writeParameter(id, value);
292 }
293 
294 void ParameterTransfer::writeBoolParameter(const char* id, bool value) {
295  writeParameter(id, value);
296 }
297 
298 std::map<std::string, ParameterInfo> ParameterTransfer::getAllParameters() {
299  waitNetworkReady();
300  if (networkError) {
301  // collecting deferred error from background thread
302  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
303  }
304  std::map<std::string, ParameterInfo> compatMap;
305  {
306  std::unique_lock<std::mutex> globalLock(mapMutex);
307  for (auto kv: paramSet) {
308  auto& name = kv.first;
309  auto& param = kv.second;
310  bool writeable = param.getAccessForApi() == param::Parameter::ACCESS_READWRITE;
311  switch(param.getType()) {
312  case param::ParameterValue::TYPE_INT: {
313  int min = -1, max = -1, increment = -1;
314  if (param.hasRange()) {
315  min = param.getMin<int>();
316  max = param.getMax<int>();
317  }
318  if (param.hasIncrement()) {
319  increment = param.getIncrement<int>();
320  }
321  compatMap[name] = ParameterInfo::fromInt(name, writeable, param.getCurrent<int>(), min, max, increment);
322  break;
323  }
324  case param::ParameterValue::TYPE_DOUBLE: {
325  double min = -1, max = -1, increment = -1;
326  if (param.hasRange()) {
327  min = param.getMin<double>();
328  max = param.getMax<double>();
329  }
330  if (param.hasIncrement()) {
331  increment = param.getIncrement<double>();
332  }
333  compatMap[name] = ParameterInfo::fromDouble(name, writeable, param.getCurrent<double>(), min, max, increment);
334  break;
335  }
336  case param::ParameterValue::TYPE_BOOL: {
337  compatMap[name] = ParameterInfo::fromBool(name, writeable, param.getCurrent<bool>());
338  break;
339  }
340  default:
341  // Omit parameters with other types from legacy compatibility API
342  break;
343  }
344  }
345  }
346  return compatMap;
347 }
348 
349 void ParameterTransfer::receiverRoutine() {
350  auto refTime = std::chrono::steady_clock::now();
351  recvBufBytes = 0;
352  threadRunning = true;
353  [[maybe_unused]] int internalThreadId = getThreadId(); // we just reserve ID 0 for the receiver
354  while (threadRunning) {
355  if (socket == INVALID_SOCKET) {
356  // Error that is recoverable by reconnection (otherwise this thread would have been terminated)
357  try {
358  attemptConnection();
359  } catch(...) {
360  std::cerr << "Failed to reconnect to parameter server." << std::endl;
361  // Sleep receiver thread and retry reconnection in next iteration
362  std::this_thread::sleep_for(std::chrono::milliseconds(SOCKET_RECONNECT_INTERVAL_MS));
363  }
364  } else {
365  // Regular connection state - handle incoming events and replies
366  int bytesReceived = recv(socket, recvBuf+recvBufBytes, (RECV_BUF_SIZE - recvBufBytes), 0);
367  if (bytesReceived < 0) {
368  auto err = Networking::getErrno();
369  if(err == EAGAIN || err == EWOULDBLOCK || err == ETIMEDOUT) {
370  // No event or reply - no problem
371  //std::this_thread::sleep_for(std::chrono::milliseconds(1));
372  continue;
373  } else {
374  std::cerr << "Network error (will periodically attempt reconnection)." << std::endl;
375  std::unique_lock<std::mutex> localLock(socketModificationMutex);
376  Networking::closeSocket(socket);
377  socket = INVALID_SOCKET;
378  refTime = std::chrono::steady_clock::now();
379  networkError = true;
380  networkErrorString = std::string("Error receiving network packet: ") + Networking::getLastErrorString();
381  continue;
382  }
383  } else if (bytesReceived == 0) {
384  std::cerr << "Connection closed by remote side (will periodically attempt reconnection)." << std::endl;
385  std::unique_lock<std::mutex> localLock(socketModificationMutex);
386  Networking::closeSocket(socket);
387  socket = INVALID_SOCKET;
388  refTime = std::chrono::steady_clock::now();
389  networkError = true;
390  networkErrorString = "Connection closed";
391  continue;
392  } else {
393  recvBufBytes += bytesReceived;
394  }
395  unsigned int start=0;
396  for (unsigned int i=0; i<recvBufBytes; ++i) {
397  unsigned char c = recvBuf[i];
398  if (c=='\n') {
399  std::string currentLine((const char*) recvBuf+start, i-start);
400  auto toks = tabTokenizer.tokenize(currentLine);
401  if (toks.size()>0) {
402  const std::string& cmd = toks[0];
403  if (cmd=="P") {
404  if (toks.size()>1) {
405  // Check of protocol version - old (non-nvparam) firmwares do not send a newline-terminated version, which will just time out waitNetworkReady()
406  long reportedVersionMajor = atol(toks[1].c_str());
407  if(reportedVersionMajor != static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR)) {
408  // Unhandled / incompatible version
409  networkError = true;
410  networkErrorString = std::string("Protocol major version mismatch, expected ") + std::to_string(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR) + " but got " + toks[1];
411  threadRunning = false;
412  // Wake up the network wait (to propagate error quickly)
413  std::lock_guard<std::mutex> readyLock(readyMutex);
414  readyCond.notify_all();
415  break;
416  }
417  long reportedVersionMinor = -1; // = unreported, legacy version
418  if (toks.size()>2) {
419  // Minor version reported
420  reportedVersionMinor = atol(toks[2].c_str());
421  if (reportedVersionMinor > static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR)) {
422  std::cerr << "Caution: remote parameter protocol version " << reportedVersionMajor << "." << reportedVersionMinor
423  << " is newer than our version " <<static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR)
424  << "." << static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR) << std::endl;
425  std::cerr << "Consider a library upgrade for maximum compatibility." << std::endl;
426  }
427  // Further toks fields are reserved for future extensions
428  }
429  if (reportedVersionMinor == -1) {
430  // Device is protocol 7.0, batch transactions added in 7.1 --> fallback: write parameters one-by-one
431  std::cerr << "Warning: remote firmware is out of date - parameter batch transaction support disabled." << std::endl;
432  featureDisabledTransactions = true;
433  } else {
434  // Device accepts full version description handshake, report ours
435  std::stringstream ss;
436  ss << "P" << "\t" << (unsigned int) visiontransfer::internal::InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR
437  << "\t" << (unsigned int) visiontransfer::internal::InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR << "\n";
438  {
439  std::unique_lock<std::mutex> localLock(socketModificationMutex);
440  if (socket == INVALID_SOCKET) {
441  throw TransferException("Connection has been closed and not reconnected so far");
442  }
443  size_t written = send(socket, ss.str().c_str(), (int) ss.str().size(), 0);
444  if(written != ss.str().size()) {
445  throw TransferException("Error sending protocol version handshake reply: " + Networking::getLastErrorString());
446  }
447  }
448  }
449  } else {
450  networkError = true;
451  networkErrorString = "Incomplete transfer of protocol version";
452  threadRunning = false;
453  break;
454  }
455  } else if (cmd=="I") {
456  // Full parameter info (value and metadata): add or overwrite local parameter
457  Parameter param = ParameterSerialization::deserializeParameterFullUpdate(toks);
458  auto uid = param.getUid();
459  bool alreadyPresent = paramSet.count(uid);
460  paramSet[uid] = param;
461  if (alreadyPresent && parameterUpdateCallback) {
462  // Only call the user callback for metadata updates, but not for the initial enumeration
463  parameterUpdateCallback(uid);
464  }
465  } else if (cmd=="M") {
466  // Metadata-only update: overwrite an existing local parameter, but preserve its previous value.
467  if (paramSet.count(toks[1])) {
468  Parameter param = ParameterSerialization::deserializeParameterFullUpdate(toks, "M");
469  auto uid = param.getUid();
470  param.setCurrentFrom(paramSet[uid]);
471  paramSet[uid] = param;
472  if (parameterUpdateCallback) {
473  parameterUpdateCallback(uid);
474  }
475  } else {
476  std::cerr << "Parameter not received yet - not updating metadata of: " << toks[1] << std::endl;;
477  }
478  } else if (cmd=="V") {
479  // Current-value-only update
480  if (toks.size() < 3) {
481  throw TransferException("Received malformed parameter value update");
482  }
483  if (paramSet.count(toks[1])) {
484  // In-place update
485  ParameterSerialization::deserializeParameterValueChange(toks, paramSet[toks[1]]);
486  if (parameterUpdateCallback) {
487  parameterUpdateCallback(toks[1]);
488  }
489  } else {
490  std::cerr << "Parameter not received yet - not updating value of: " << toks[1] << std::endl;;
491  }
492  } else if (cmd=="R") {
493  // Reply with asynchronous result of a request to set parameter[s]
494  if (toks.size() < 4) {
495  throw TransferException("Received malformed reply for parameter set request");
496  }
497  std::unique_lock<std::mutex> globalLock(mapMutex);
498  int replyThreadId = atol(toks[1].c_str());
499  if (waitConds.count(replyThreadId)) {
500  // Reanimating the waiting thread - it will clean up after itself
501  std::lock_guard<std::mutex> localLock(waitCondMutexes[replyThreadId]);
502  lastSetRequestResult[replyThreadId] = {toks[2] == "1", toks[3]};
503  waitConds[replyThreadId].notify_all();
504  } else {
505  if (replyThreadId != -1) { // dummy ID -1 for fire-and-forget command (no reply expected)
506  std::cerr << "Ignoring unexpected request result for thread " << replyThreadId << std::endl;
507  }
508  }
509  } else if (cmd=="E") {
510  // 'End of Transmission' - at least one full enumeration has arrived - we are ready
511  networkReady = true;
512  // Wake any sleeping threads that were blocked until network became ready
513  std::lock_guard<std::mutex> readyLock(readyMutex);
514  readyCond.notify_all();
515  } else if (cmd=="HB") {
516  // Heartbeat
517  } else if (cmd=="X") {
518  // Reserved extension
519  } else {
520  networkError = true;
521  networkErrorString = std::string("Unknown update command received: ") + cmd;
522  threadRunning = false;
523  break;
524  }
525  }
526  start = i+1;
527  }
528  }
529  // Move any incomplete line to front of recv buffer
530  if (start>=recvBufBytes) {
531  recvBufBytes = 0;
532  } else {
533  std::memmove(recvBuf, recvBuf+start, recvBufBytes-start);
534  recvBufBytes = recvBufBytes-start;
535  }
536  }
537  }
538 }
539 
540 int ParameterTransfer::getThreadId() {
541  // Always returns an int type (which may not be the case for std::thread::id)
542  static std::atomic_int threadCount{0};
543  thread_local int threadId = threadCount.fetch_add(1);
544  return threadId;
545 }
546 
547 void ParameterTransfer::blockingCallThisThread(std::function<void()> fn, int waitMaxMilliseconds) {
548  bool timeout = false;
549  auto tid = getThreadId();
550  {
551  std::unique_lock<std::mutex> globalLock(mapMutex);
552  // Populate maps
553  auto& localWaitCond = waitConds[tid];
554  auto& localWaitCondMutex = waitCondMutexes[tid];
555  std::unique_lock<std::mutex> localLock(localWaitCondMutex);
556  // First do the actual handshake setup, like emitting the network message
557  // (The current thread is protected against a reply race at this point)
558  fn();
559  // Allow receiver thread to access its checks (it is still blocked by our specific localLock)
560  globalLock.unlock();
561  // Wait for receiver thread to notify us with the reply
562  auto status = localWaitCond.wait_for(localLock, std::chrono::milliseconds(waitMaxMilliseconds));
563  timeout = (status == std::cv_status::timeout);
564  }
565  {
566  // Cleanup, so that any spurious network replies can get detected and discarded
567  std::unique_lock<std::mutex> globalLock(mapMutex);
568  waitConds.erase(tid);
569  waitCondMutexes.erase(tid);
570  }
571  // Outcome
572  if (timeout) {
573  TransferException ex("Timeout waiting for request reply from parameter server");
574  throw ex;
575  }
576 }
577 
579  waitNetworkReady();
580  if (networkError) {
581  // collecting deferred error from background thread
582  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
583  }
584  return paramSet;
585 }
586 
588  waitNetworkReady();
589  if (networkError) {
590  // collecting deferred error from background thread
591  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
592  }
593  return paramSet;
594 }
595 
596 void ParameterTransfer::setParameterUpdateCallback(std::function<void(const std::string& uid)> callback) {
597  parameterUpdateCallback = callback;
598 }
599 
601  // N.B. the flag is thread_local static
602  if (featureDisabledTransactions) {
603  // Fallback mode for outdated firmware versions -> ignore transaction batching
604  return;
605  }
606  if (transactionInProgress) throw TransferException("Simultaneous and/or nested parameter transactions are not supported");
607  transactionInProgress = true;
608  // We are now in batch-write mode
609 }
610 
612  if (featureDisabledTransactions) {
613  // Fallback mode for outdated firmware versions -> ignore transaction batching
614  return;
615  }
616  // Send queued parameter transactions
617  waitNetworkReady();
618  if (networkError) {
619  // collecting deferred error from background thread
620  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
621  }
622 
623  // If there are no actual changes, do not send anything
624  if (transactionQueuedWrites.size() > 0) {
625  // Collect affected UIDs for transaction
626  std::set<std::string> affectedUids;
627  for (auto& kv: transactionQueuedWrites) {
628  affectedUids.insert(kv.first);
629  }
630  // Start transaction on server, incorporating all affected UIDs
631  std::string transactionId = std::to_string(getThreadId());
632  // (Note: could use a one-time UUID instead of getThreadId(), but it is amended on the remote side)
633  {
634  std::stringstream ss;
635  ss << "TS" << "\t" << transactionId << "\t";
636  bool first = true;
637  for (auto& uid: affectedUids) {
638  if (first) first=false; else ss << ",";
639  ss << uid;
640  }
641  ss << "\n";
642  {
643  std::unique_lock<std::mutex> localLock(socketModificationMutex);
644  if (socket == INVALID_SOCKET) {
645  throw TransferException("Connection has been closed and not reconnected so far");
646  }
647  size_t written = send(socket, ss.str().c_str(), (int) ss.str().size(), 0);
648  if(written != ss.str().size()) {
649  throw TransferException("Error sending transaction start request: " + Networking::getLastErrorString());
650  }
651  }
652  }
653 
654  // Play back queued writes
655  for (auto& kv: transactionQueuedWrites) {
656  auto& uid = kv.first;
657  auto& value = kv.second;
658  writeParameter(uid.c_str(), value);
659  }
660 
661  // Finish transaction on server - automatic updates are then applied (and resumed)
662  {
663  std::stringstream ss;
664  ss << "TE" << "\t" << transactionId << "\n";
665  {
666  std::unique_lock<std::mutex> localLock(socketModificationMutex);
667  if (socket == INVALID_SOCKET) {
668  throw TransferException("Connection has been closed and not reconnected so far");
669  }
670  size_t written = send(socket, ss.str().c_str(), (int) ss.str().size(), 0);
671  if(written != ss.str().size()) {
672  throw TransferException("Error sending transaction end request: " + Networking::getLastErrorString());
673  }
674  // The transaction will be finalized anyway on the server, but only after its timeout
675  }
676  }
677 
678  // Cleanup
679  transactionQueuedWrites.clear();
680  }
681 
682  transactionInProgress = false;
683 }
684 
685 }} // namespace
686 
visiontransfer::internal::ParameterTransfer::writeDoubleParameter
void writeDoubleParameter(const char *id, double value)
Writes a double precision floating point value to a parameter of the parameter server.
Definition: parametertransfer.cpp:290
visiontransfer::internal::ParameterTransfer::transactionCommitQueue
void transactionCommitQueue()
Complete the started parameter transaction.
Definition: parametertransfer.cpp:611
visiontransfer::internal::ParameterTransfer::writeIntParameter
void writeIntParameter(const char *id, int value)
Writes an integer value to a parameter of the parameter server.
Definition: parametertransfer.cpp:286
visiontransfer::param::ParameterSet::getCurrent
T getCurrent(const std::string &key)
Convenience function for safe bulk parameter access (throws for invalid UIDs). Will return any defaul...
Definition: parameterset.h:88
visiontransfer::internal::ParameterTransfer::writeParameter
void writeParameter(const char *id, const T &value, bool synchronous=true)
Writes a scalar value to a parameter of the parameter server.
Definition: parametertransfer.cpp:123
visiontransfer::internal::ParameterTransfer::getParameterSet
param::ParameterSet & getParameterSet()
Returns a reference to the internal parameter set (once the network handshake is complete)
Definition: parametertransfer.cpp:578
visiontransfer::param::Parameter::setCurrentFrom
Parameter & setCurrentFrom(const Parameter &from)
Definition: parameter.cpp:135
visiontransfer::param::ParameterSet
Definition: parameterset.h:59
visiontransfer::TransferException
Exception class that is used for all transfer exceptions.
Definition: exceptions.h:45
visiontransfer::param::Parameter
Definition: parameter.h:71
visiontransfer::internal::ParameterTransfer::writeParameterTransactionUnguarded
void writeParameterTransactionUnguarded(const char *id, const T &value)
Writes a scalar value to a parameter of the parameter server, using 'fire-and-forget' for real-time c...
visiontransfer::internal::ParameterTransfer::getAllParameters
std::map< std::string, ParameterInfo > getAllParameters()
Enumerates all parameters as reported by the device.
Definition: parametertransfer.cpp:298
visiontransfer::ParameterException
Exception class that is used for all parameter-related exceptions.
Definition: exceptions.h:53
visiontransfer::internal::ParameterTransfer::readBoolParameter
bool readBoolParameter(const char *id)
Reads a boolean value from the parameter server.
Definition: parametertransfer.cpp:274
visiontransfer::internal::ParameterTransfer::transactionStartQueue
void transactionStartQueue()
Start batch parameter transaction.
Definition: parametertransfer.cpp:600
visiontransfer::internal::ParameterTransfer::writeParameterTransactionGuarded
void writeParameterTransactionGuarded(const char *id, const T &value)
Writes a scalar value to a parameter of the parameter server, transparently deferring for a batch upd...
visiontransfer::param::Parameter::getUid
std::string getUid() const
Definition: parameter.h:108
visiontransfer::internal::ParameterTransfer::readDoubleParameter
double readDoubleParameter(const char *id)
Reads a double precision floating point value from the parameter server.
Definition: parametertransfer.cpp:262
visiontransfer::internal::ParameterTransfer::writeBoolParameter
void writeBoolParameter(const char *id, bool value)
Writes a boolean value to a parameter of the parameter server.
Definition: parametertransfer.cpp:294
visiontransfer::internal::ParameterTransfer::readIntParameter
int readIntParameter(const char *id)
Reads an integer value from the parameter server.
Definition: parametertransfer.cpp:250
Allied Vision