libvisiontransfer  10.6.0
datachannelservicebase.cpp
1 #include <sys/types.h>
2 #include <cstring>
3 #include <stdexcept>
4 #include <fcntl.h>
5 #include <fstream>
6 
7 #include <visiontransfer/internalinformation.h>
8 #include <visiontransfer/networking.h>
9 #include <visiontransfer/datachannelservicebase.h>
10 
11 #include <iostream>
12 
13 using namespace visiontransfer;
14 using namespace visiontransfer::internal;
15 
16 namespace visiontransfer {
17 namespace internal {
18 
19 DataChannelServiceBase::DataChannelServiceBase() {
20  // Create socket
21  if((dataChannelSocket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
22  throw std::runtime_error("Error creating data channel service socket!");
23  }
24 
25  Networking::enableReuseAddress(dataChannelSocket, true);
26 
27  // Bind to port
28  sockaddr_in localAddr;
29  memset(&localAddr, 0, sizeof(localAddr));
30  localAddr.sin_family = AF_INET;
31  localAddr.sin_port = htons(InternalInformation::DATACHANNELSERVICE_PORT);
32  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
33  if(::bind(dataChannelSocket, (sockaddr *)&localAddr, sizeof(localAddr)) != 0) {
34  throw std::runtime_error("Error binding dataChannel socket!");
35  }
36 
37  Networking::setSocketBlocking(dataChannelSocket, false);
38 }
39 
40 DataChannelServiceBase::~DataChannelServiceBase() {
41  Networking::closeSocket(dataChannelSocket);
42 }
43 
44 void DataChannelServiceBase::process() {
45  static unsigned char buffer[100000];
46  static sockaddr_in senderAddress;
47  static socklen_t senderLength = (socklen_t) sizeof(senderAddress);
48 
49  int received;
50  while (true) {
51  // socket is non-blocking
52  received = recvfrom(dataChannelSocket, (char*) buffer, sizeof(buffer), 0, (sockaddr *)&senderAddress, &senderLength);
53  if ((received > 0) && ((unsigned)received >= sizeof(DataChannelMessageHeader))) {
54  DataChannelMessageHeader* raw = reinterpret_cast<DataChannelMessageHeader*>(buffer);
55  DataChannelMessage message;
56  message.header.channelID = (DataChannel::ID) raw->channelID;
57  message.header.channelType = (DataChannel::Type) raw->channelType;
58  message.header.payloadSize = ntohl(raw->payloadSize);
59  message.payload = buffer + sizeof(DataChannelMessageHeader);
60  if ((sizeof(DataChannelMessageHeader) + message.header.payloadSize) != (unsigned) received) {
61  std::cerr << "DataChannelServiceBase: Size mismatch in UDP message, type " << (int) message.header.channelType << " ID " << (int) message.header.channelID << " - discarded!" << std::endl;
62  } else {
63  if (!(message.header.channelType)) {
64  handleChannel0Message(message, &senderAddress);
65  } else {
66  // Try to find a matching registered channel to handle the message
67  auto it = channels.find(message.header.channelID);
68  if (it != channels.end()) {
69  it->second->handleMessage(message, &senderAddress);
70  }
71  }
72  }
73  } else {
74  break;
75  }
76 
77  // Call channel process() iterations
78  for (auto& kv: channels) {
79  kv.second->process();
80  }
81  }
82 }
83 
84 // Actually send data, buffer must be stable
85 int DataChannelServiceBase::sendDataInternal(unsigned char* compiledMessage, unsigned int messageSize, sockaddr_in* recipient) {
86  if (!recipient) throw std::runtime_error("Requested sendDataInternal without recipient address");
87  if (messageSize < sizeof(DataChannelMessageHeader)) throw std::runtime_error("Message header too short");
88  DataChannelMessageHeader* header = reinterpret_cast<DataChannelMessageHeader*>(compiledMessage);
89  unsigned int reportedSize = sizeof(DataChannelMessageHeader) + ntohl(header->payloadSize);
90  if (messageSize != reportedSize) throw std::runtime_error("Message size does not match");
91  int result = 0;
92  result = sendto(dataChannelSocket, (char*) compiledMessage, reportedSize, 0, (sockaddr*) recipient, sizeof(*recipient));
93  if (result != (int) reportedSize) {
94  std::cerr << "Error sending DataChannel message to " << inet_ntoa(recipient->sin_addr) << ": " << Networking::getLastErrorString() << std::endl;
95  throw std::runtime_error("Error during sendto");
96  }
97  return result;
98 }
99 
100 // Generate a new message and send it
101 int DataChannelServiceBase::sendDataIsolatedPacket(DataChannel::ID id, DataChannel::Type type, unsigned char* data, unsigned int dataSize, sockaddr_in* recipient) {
102  unsigned int msgSize = sizeof(DataChannelMessageHeader) + dataSize;
103  unsigned char* buf = new unsigned char[msgSize]();
104  DataChannelMessageHeader* header = reinterpret_cast<DataChannelMessageHeader*>(buf);
105  header->channelID = id;
106  header->channelType = type;
107  header->payloadSize = htonl(dataSize);
108  std::memcpy(buf + sizeof(DataChannelMessageHeader), data, dataSize);
109 
110  int result = sendDataInternal(buf, msgSize, recipient);
111  delete[] buf;
112  return result;
113 }
114 
115 DataChannel::ID DataChannelServiceBase::registerChannel(std::shared_ptr<DataChannel> channel) {
116  // Preliminary implementation: set id:=type (should allocate dynamic IDs later)
117  DataChannel::ID id = (DataChannel::ID) channel->getChannelType();
118  if (channels.count(id)) {
119  return 0; // already registered this ID
120  }
121  // Checking dynamic init, if this fails the service is not registered (and will be auto cleaned)
122  if (!channel->initialize()) return 0;
123  channel->setChannelID(id);
124  channels[id] = channel;
125  channel->setService(shared_from_this());
126  return id;
127 }
128 int DataChannel::sendData(unsigned char* data, unsigned int dataLen, sockaddr_in* recipient) {
129  if (auto srv = service.lock()) {
130  return srv->sendDataIsolatedPacket(channelID, getChannelType(), data, dataLen, recipient);
131  } else return 0;
132 }
133 
134 }} // namespaces
135 
visiontransfer::internal::DataChannelMessage
Definition: datachannelservicebase.h:71
visiontransfer::internal::DataChannelMessageHeader
Transport-level DataChannel header.
Definition: datachannelservicebase.h:66
Allied Vision