Eclipse SUMO - Simulation of Urban MObility
Connection.cpp
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2012-2022 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
21 // C++ TraCI client API implementation
22 /****************************************************************************/
23 #include <thread>
24 #include <chrono>
25 #include <array>
26 #include <libsumo/StorageHelper.h>
27 #include <libsumo/TraCIDefs.h>
28 #include "Connection.h"
29 
30 
31 namespace libtraci {
32 // ===========================================================================
33 // static member initializations
34 // ===========================================================================
35 Connection* Connection::myActive = nullptr;
36 std::map<const std::string, Connection*> Connection::myConnections;
37 
38 
39 // ===========================================================================
40 // member method definitions
41 // ===========================================================================
42 #ifdef _MSC_VER
43 /* Disable "decorated name length exceeded, name was truncated" warnings for the whole file. */
44 #pragma warning(disable: 4503)
45 #endif
46 Connection::Connection(const std::string& host, int port, int numRetries, const std::string& label, FILE* const pipe) :
47  myLabel(label), myProcessPipe(pipe), myProcessReader(nullptr), mySocket(host, port) {
48  if (pipe != nullptr) {
49  myProcessReader = new std::thread(&Connection::readOutput, this);
50  }
51  for (int i = 0; i <= numRetries; i++) {
52  try {
53  mySocket.connect();
54  break;
55  } catch (tcpip::SocketException& e) {
56  if (i == numRetries) {
57  close();
58  throw;
59  }
60  std::cout << "Could not connect to TraCI server at " << host << ":" << port << " " << e.what() << std::endl;
61  std::cout << " Retrying in 1 second" << std::endl;
62  std::this_thread::sleep_for(std::chrono::seconds(1));
63  }
64  }
65 }
66 
67 
68 void
70  std::array<char, 256> buffer;
71  bool errout = false;
72  while (fgets(buffer.data(), (int)buffer.size(), myProcessPipe) != nullptr) {
73  std::stringstream result;
74  result << buffer.data();
75  std::string line;
76  while (std::getline(result, line)) {
77  if ((errout && line[0] == ' ') || line.compare(0, 6, "Error:") == 0 || line.compare(0, 8, "Warning:") == 0) {
78  std::cerr << line << std::endl;
79  errout = true;
80  } else {
81  std::cout << line << std::endl;
82  errout = false;
83  }
84  }
85  }
86 }
87 
88 
89 void
92  tcpip::Storage outMsg;
93  // command length
94  outMsg.writeUnsignedByte(1 + 1);
95  // command id
97  mySocket.sendExact(outMsg);
98 
99  tcpip::Storage inMsg;
100  std::string acknowledgement;
101  check_resultState(inMsg, libsumo::CMD_CLOSE, false, &acknowledgement);
102  mySocket.close();
103  }
104  if (myProcessReader != nullptr) {
105  myProcessReader->join();
106  delete myProcessReader;
107  myProcessReader = nullptr;
108 #ifdef WIN32
109  _pclose(myProcessPipe);
110 #else
111  pclose(myProcessPipe);
112 #endif
113  }
114  myConnections.erase(myLabel);
115  delete myActive;
116  myActive = nullptr;
117 }
118 
119 
120 void
122  tcpip::Storage outMsg;
123  // command length
124  outMsg.writeUnsignedByte(1 + 1 + 8);
125  // command id
127  outMsg.writeDouble(time);
128  // send request message
129  mySocket.sendExact(outMsg);
130 
131  tcpip::Storage inMsg;
133  mySubscriptionResults.clear();
135  int numSubs = inMsg.readInt();
136  while (numSubs-- > 0) {
137  const int responseID = check_commandGetResult(inMsg, 0, -1, true);
140  readVariableSubscription(responseID, inMsg);
141  } else {
142  readContextSubscription(responseID, inMsg);
143  }
144  }
145 }
146 
147 
148 void
150  tcpip::Storage outMsg;
151  // command length
152  outMsg.writeUnsignedByte(1 + 1 + 4);
153  // command id
155  // client index
156  outMsg.writeInt(order);
157  mySocket.sendExact(outMsg);
158 
159  tcpip::Storage inMsg;
161 }
162 
163 
164 void
165 Connection::createCommand(int cmdID, int varID, const std::string* const objID, tcpip::Storage* add) const {
167  throw libsumo::FatalTraCIError("Not connected.");
168  }
169  myOutput.reset();
170  // command length
171  int length = 1 + 1;
172  if (varID >= 0) {
173  length += 1;
174  if (objID != nullptr) {
175  length += 4 + (int)objID->length();
176  }
177  }
178  if (add != nullptr) {
179  length += (int)add->size();
180  }
181  if (length <= 255) {
182  myOutput.writeUnsignedByte(length);
183  } else {
185  myOutput.writeInt(length + 4);
186  }
188  if (varID >= 0) {
190  if (objID != nullptr) {
191  myOutput.writeString(*objID);
192  }
193  }
194  // additional values
195  if (add != nullptr) {
196  myOutput.writeStorage(*add);
197  }
198 }
199 
200 
201 void
202 Connection::subscribe(int domID, const std::string& objID, double beginTime, double endTime,
203  int domain, double range, const std::vector<int>& vars, const libsumo::TraCIResults& params) {
205  throw tcpip::SocketException("Socket is not initialised");
206  }
207  const bool isContext = domain != -1;
208  tcpip::Storage outMsg;
209  outMsg.writeUnsignedByte(domID); // command id
210  outMsg.writeDouble(beginTime);
211  outMsg.writeDouble(endTime);
212  outMsg.writeString(objID);
213  if (isContext) {
214  outMsg.writeUnsignedByte(domain);
215  outMsg.writeDouble(range);
216  }
217  if (vars.size() == 1 && vars.front() == -1) {
218  if (domID == libsumo::CMD_SUBSCRIBE_VEHICLE_VARIABLE && !isContext) {
219  // default for vehicles is edge id and lane position
220  outMsg.writeUnsignedByte(2);
223  } else {
224  // default for detectors is vehicle number, for all others (and contexts) id list
225  outMsg.writeUnsignedByte(1);
226  const bool isDetector = domID == libsumo::CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
232  }
233  } else {
234  outMsg.writeUnsignedByte((int)vars.size());
235  for (const int v : vars) {
236  outMsg.writeUnsignedByte(v);
237  const auto& paramEntry = params.find(v);
238  if (paramEntry != params.end()) {
239  outMsg.writeStorage(*libsumo::StorageHelper::toStorage(*paramEntry->second));
240  }
241  }
242  }
243  tcpip::Storage complete;
244  complete.writeUnsignedByte(0);
245  complete.writeInt(5 + (int)outMsg.size());
246  complete.writeStorage(outMsg);
247  // send message
248  mySocket.sendExact(complete);
249 
250  tcpip::Storage inMsg;
251  check_resultState(inMsg, domID);
252  if (!vars.empty()) {
253  const int responseID = check_commandGetResult(inMsg, domID);
254  if (isContext) {
255  readContextSubscription(responseID, inMsg);
256  } else {
257  readVariableSubscription(responseID, inMsg);
258  }
259  }
260 }
261 
262 
263 void
264 Connection::check_resultState(tcpip::Storage& inMsg, int command, bool ignoreCommandId, std::string* acknowledgement) {
265  mySocket.receiveExact(inMsg);
266  int cmdLength;
267  int cmdId;
268  int resultType;
269  int cmdStart;
270  std::string msg;
271  try {
272  cmdStart = inMsg.position();
273  cmdLength = inMsg.readUnsignedByte();
274  cmdId = inMsg.readUnsignedByte();
275  if (command != cmdId && !ignoreCommandId) {
276  throw libsumo::TraCIException("#Error: received status response to command: " + toString(cmdId) + " but expected: " + toString(command));
277  }
278  resultType = inMsg.readUnsignedByte();
279  msg = inMsg.readString();
280  } catch (std::invalid_argument&) {
281  throw libsumo::TraCIException("#Error: an exception was thrown while reading result state message");
282  }
283  switch (resultType) {
284  case libsumo::RTYPE_ERR:
285  throw libsumo::TraCIException(msg);
287  throw libsumo::TraCIException(".. Sent command is not implemented (" + toString(command) + "), [description: " + msg + "]");
288  case libsumo::RTYPE_OK:
289  if (acknowledgement != nullptr) {
290  (*acknowledgement) = ".. Command acknowledged (" + toString(command) + "), [description: " + msg + "]";
291  }
292  break;
293  default:
294  throw libsumo::TraCIException(".. Answered with unknown result code(" + toString(resultType) + ") to command(" + toString(command) + "), [description: " + msg + "]");
295  }
296  if ((cmdStart + cmdLength) != (int) inMsg.position()) {
297  throw libsumo::TraCIException("#Error: command at position " + toString(cmdStart) + " has wrong length");
298  }
299 }
300 
301 
302 int
303 Connection::check_commandGetResult(tcpip::Storage& inMsg, int command, int expectedType, bool ignoreCommandId) const {
304  int length = inMsg.readUnsignedByte();
305  if (length == 0) {
306  length = inMsg.readInt();
307  }
308  int cmdId = inMsg.readUnsignedByte();
309  if (!ignoreCommandId && cmdId != (command + 0x10)) {
310  throw libsumo::TraCIException("#Error: received response with command id: " + toString(cmdId) + "but expected: " + toString(command + 0x10));
311  }
312  if (expectedType >= 0) {
313  // not called from the TraCITestClient but from within the Connection
314  inMsg.readUnsignedByte(); // variableID
315  inMsg.readString(); // objectID
316  int valueDataType = inMsg.readUnsignedByte();
317  if (valueDataType != expectedType) {
318  throw libsumo::TraCIException("Expected " + toString(expectedType) + " but got " + toString(valueDataType));
319  }
320  }
321  return cmdId;
322 }
323 
324 
326 Connection::doCommand(int command, int var, const std::string& id, tcpip::Storage* add) {
327  createCommand(command, var, &id, add);
329  myInput.reset();
330  check_resultState(myInput, command);
331  return myInput;
332 }
333 
334 
335 void
339  myInput.reset();
341 }
342 
343 
344 void
345 Connection::readVariables(tcpip::Storage& inMsg, const std::string& objectID, int variableCount, libsumo::SubscriptionResults& into) {
346  while (variableCount > 0) {
347 
348  const int variableID = inMsg.readUnsignedByte();
349  const int status = inMsg.readUnsignedByte();
350  const int type = inMsg.readUnsignedByte();
351 
352  if (status == libsumo::RTYPE_OK) {
353  switch (type) {
355  into[objectID][variableID] = std::make_shared<libsumo::TraCIDouble>(inMsg.readDouble());
356  break;
358  into[objectID][variableID] = std::make_shared<libsumo::TraCIString>(inMsg.readString());
359  break;
360  case libsumo::POSITION_2D: {
361  auto p = std::make_shared<libsumo::TraCIPosition>();
362  p->x = inMsg.readDouble();
363  p->y = inMsg.readDouble();
364  p->z = 0.;
365  into[objectID][variableID] = p;
366  break;
367  }
368  case libsumo::POSITION_3D: {
369  auto p = std::make_shared<libsumo::TraCIPosition>();
370  p->x = inMsg.readDouble();
371  p->y = inMsg.readDouble();
372  p->z = inMsg.readDouble();
373  into[objectID][variableID] = p;
374  break;
375  }
376  case libsumo::TYPE_COLOR: {
377  auto c = std::make_shared<libsumo::TraCIColor>();
378  c->r = (unsigned char)inMsg.readUnsignedByte();
379  c->g = (unsigned char)inMsg.readUnsignedByte();
380  c->b = (unsigned char)inMsg.readUnsignedByte();
381  c->a = (unsigned char)inMsg.readUnsignedByte();
382  into[objectID][variableID] = c;
383  break;
384  }
386  into[objectID][variableID] = std::make_shared<libsumo::TraCIInt>(inMsg.readInt());
387  break;
389  auto sl = std::make_shared<libsumo::TraCIStringList>();
390  int n = inMsg.readInt();
391  for (int i = 0; i < n; ++i) {
392  sl->value.push_back(inMsg.readString());
393  }
394  into[objectID][variableID] = sl;
395  }
396  break;
397  case libsumo::TYPE_COMPOUND: {
398  int n = inMsg.readInt();
399  if (n == 2) {
400  inMsg.readUnsignedByte();
401  const std::string s = inMsg.readString();
402  const int secondType = inMsg.readUnsignedByte();
403  if (secondType == libsumo::TYPE_DOUBLE) {
404  auto r = std::make_shared<libsumo::TraCIRoadPosition>();
405  r->edgeID = s;
406  r->pos = inMsg.readDouble();
407  into[objectID][variableID] = r;
408  } else if (secondType == libsumo::TYPE_STRING) {
409  auto sl = std::make_shared<libsumo::TraCIStringList>();
410  sl->value.push_back(s);
411  sl->value.push_back(inMsg.readString());
412  into[objectID][variableID] = sl;
413  }
414  }
415  }
416  break;
417 
418  // TODO Other data types
419 
420  default:
421  throw libsumo::TraCIException("Unimplemented subscription type: " + toString(type));
422  }
423  } else {
424  throw libsumo::TraCIException("Subscription response error: variableID=" + toString(variableID) + " status=" + toString(status));
425  }
426 
427  variableCount--;
428  }
429 }
430 
431 
432 void
434  const std::string objectID = inMsg.readString();
435  const int variableCount = inMsg.readUnsignedByte();
436  readVariables(inMsg, objectID, variableCount, mySubscriptionResults[responseID]);
437 }
438 
439 
440 void
442  const std::string contextID = inMsg.readString();
443  inMsg.readUnsignedByte(); // context domain
444  const int variableCount = inMsg.readUnsignedByte();
445  int numObjects = inMsg.readInt();
446  // the following also instantiates the empty map to get comparable results with libsumo
447  // see also https://github.com/eclipse/sumo/issues/7288
448  libsumo::SubscriptionResults& results = myContextSubscriptionResults[responseID][contextID];
449  while (numObjects-- > 0) {
450  std::string objectID = inMsg.readString();
451  readVariables(inMsg, objectID, variableCount, results);
452  }
453 }
454 
455 
456 }
457 
458 
459 /****************************************************************************/
An error which is not recoverable.
Definition: TraCIDefs.h:141
static std::shared_ptr< tcpip::Storage > toStorage(const TraCIResult &v)
Definition: StorageHelper.h:33
An error which allows to continue.
Definition: TraCIDefs.h:130
void simulationStep(double time)
Sends a SimulationStep command.
Definition: Connection.cpp:121
Connection(const std::string &host, int port, int numRetries, const std::string &label, FILE *const pipe)
Constructor, connects to the specified SUMO server.
Definition: Connection.cpp:46
void close()
ends the simulation and closes the connection
Definition: Connection.cpp:90
void createCommand(int cmdID, int varID, const std::string *const objID, tcpip::Storage *add=nullptr) const
Sends a GetVariable / SetVariable request if mySocket is connected. Otherwise writes to myOutput only...
Definition: Connection.cpp:165
int check_commandGetResult(tcpip::Storage &inMsg, int command, int expectedType=-1, bool ignoreCommandId=false) const
Validates the result state of a command.
Definition: Connection.cpp:303
void addFilter(int var, tcpip::Storage *add=nullptr)
Definition: Connection.cpp:336
void readVariableSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:433
tcpip::Socket mySocket
The socket.
Definition: Connection.h:167
std::map< int, libsumo::SubscriptionResults > mySubscriptionResults
Definition: Connection.h:173
void check_resultState(tcpip::Storage &inMsg, int command, bool ignoreCommandId=false, std::string *acknowledgement=0)
Validates the result state of a command.
Definition: Connection.cpp:264
tcpip::Storage myInput
The reusable input storage.
Definition: Connection.h:171
FILE *const myProcessPipe
Definition: Connection.h:164
void readVariables(tcpip::Storage &inMsg, const std::string &objectID, int variableCount, libsumo::SubscriptionResults &into)
Definition: Connection.cpp:345
std::map< int, libsumo::ContextSubscriptionResults > myContextSubscriptionResults
Definition: Connection.h:174
tcpip::Storage myOutput
The reusable output storage.
Definition: Connection.h:169
void setOrder(int order)
Sends a SetOrder command.
Definition: Connection.cpp:149
void subscribe(int domID, const std::string &objID, double beginTime, double endTime, int domain, double range, const std::vector< int > &vars, const libsumo::TraCIResults &params)
Sends a SubscribeContext or a SubscribeVariable request.
Definition: Connection.cpp:202
tcpip::Storage & doCommand(int command, int var, const std::string &id, tcpip::Storage *add=nullptr)
Definition: Connection.cpp:326
static std::map< const std::string, Connection * > myConnections
Definition: Connection.h:177
const std::string myLabel
Definition: Connection.h:163
void readContextSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:441
static Connection * myActive
Definition: Connection.h:176
static std::string toString(const T &t, std::streamsize accuracy=PRECISION)
Definition: Connection.h:145
std::thread * myProcessReader
Definition: Connection.h:165
bool receiveExact(Storage &)
Receive a complete TraCI message from Socket::socket_.
Definition: socket.cpp:528
void sendExact(const Storage &)
Definition: socket.cpp:431
bool has_client_connection() const
Definition: socket.cpp:560
void connect()
Connects to host_:port_.
Definition: socket.cpp:359
void close()
Definition: socket.cpp:383
virtual std::string readString()
Definition: storage.cpp:180
virtual void writeString(const std::string &s)
Definition: storage.cpp:197
virtual unsigned int position() const
Definition: storage.cpp:76
virtual void writeInt(int)
Definition: storage.cpp:321
virtual void writeDouble(double)
Definition: storage.cpp:354
virtual int readUnsignedByte()
Definition: storage.cpp:155
void reset()
Definition: storage.cpp:85
virtual void writeUnsignedByte(int)
Definition: storage.cpp:165
StorageType::size_type size() const
Definition: storage.h:119
virtual void writeStorage(tcpip::Storage &store)
Definition: storage.cpp:388
virtual double readDouble()
Definition: storage.cpp:362
virtual int readInt()
Definition: storage.cpp:311
TRACI_CONST int TYPE_COLOR
TRACI_CONST int LAST_STEP_VEHICLE_NUMBER
TRACI_CONST int POSITION_3D
TRACI_CONST int RTYPE_NOTIMPLEMENTED
TRACI_CONST int TRACI_ID_LIST
TRACI_CONST int VAR_ROAD_ID
TRACI_CONST int TYPE_COMPOUND
TRACI_CONST int RESPONSE_SUBSCRIBE_PARKINGAREA_VARIABLE
TRACI_CONST int RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int POSITION_2D
TRACI_CONST int RESPONSE_SUBSCRIBE_OVERHEADWIRE_VARIABLE
TRACI_CONST int CMD_CLOSE
TRACI_CONST int CMD_SETORDER
TRACI_CONST int TYPE_STRINGLIST
TRACI_CONST int TYPE_INTEGER
TRACI_CONST int RESPONSE_SUBSCRIBE_BUSSTOP_VARIABLE
TRACI_CONST int CMD_ADD_SUBSCRIPTION_FILTER
std::map< std::string, libsumo::TraCIResults > SubscriptionResults
{object->{variable->value}}
Definition: TraCIDefs.h:278
TRACI_CONST int VAR_LANEPOSITION
TRACI_CONST int CMD_SUBSCRIBE_VEHICLE_VARIABLE
TRACI_CONST int TYPE_DOUBLE
TRACI_CONST int CMD_SUBSCRIBE_LANEAREA_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE
TRACI_CONST int RTYPE_ERR
TRACI_CONST int CMD_SIMSTEP
TRACI_CONST int CMD_SUBSCRIBE_LANE_VARIABLE
TRACI_CONST int RTYPE_OK
std::map< int, std::shared_ptr< libsumo::TraCIResult > > TraCIResults
{variable->value}
Definition: TraCIDefs.h:276
TRACI_CONST int CMD_SUBSCRIBE_EDGE_VARIABLE
TRACI_CONST int TYPE_STRING