iop_simplenet.cpp
Go to the documentation of this file.
1 /** @file iop_simplenet.cpp Simple networked events via tcp/ip */
2 
3 /*
4  FAU Discrete Event Systems Library (libfaudes)
5 
6  Copyright (C) 2008, Thomas Moor
7  Exclusive copyright is granted to Klaus Schmidt
8 
9 */
10 
11 
12 
13 #include "iop_simplenet.h"
14 
15 
16 namespace faudes {
17 
18 
19 /*
20  **********************************************
21  **********************************************
22  **********************************************
23 
24  implementation: SimplenetAddress
25 
26  **********************************************
27  **********************************************
28  **********************************************
29  */
30 
31 // std faudes
32 FAUDES_TYPE_IMPLEMENTATION(SimplenetDevice,nDevice,vDevice)
33 
34 // construct
36  mIp="";
37  mPort=-1;
38 }
39 
40 // copy construct
42  mIp=rOther.mIp;
43  mPort=rOther.mPort;
44 }
45 
46 // construct from string
47 SimplenetAddress::SimplenetAddress(const std::string& rString) {
48  IpColonPort(rString);
49 }
50 
51 // validity
52 bool SimplenetAddress::Valid(void) const {
53  if(mPort<=0) return false;
54  if(mIp=="") return false;
55  if(mIp.find(':',0)!=std::string::npos) return false;
56  return true;
57 }
58 
59 // get colon seperated string
60 std::string SimplenetAddress::IpColonPort(void) const {
61  std::string res;
62  if(!Valid()) return res;
63  res=mIp+":"+ToStringInteger(mPort);
64  return res;
65 }
66 
67 // Set from colon seperated string
68 void SimplenetAddress::IpColonPort(std::string ipcolonport) {
69  FD_DHV("SimplenetAddress::IpColonPort(): " << ipcolonport << " --> ?");
70  // invalid
71  mIp="";
72  mPort=-1;
73  // find colon
74  std::size_t cpos = ipcolonport.find(':',0);
75  if(cpos==std::string::npos) return;
76  if(cpos==0) return;
77  if(cpos+1>= ipcolonport.length()) return;
78  // extract ip
79  mIp=ipcolonport.substr(0,cpos);
80  // extract port
81  mPort=ToIdx(ipcolonport.substr(cpos+1));
82  // report
83  FD_DHV("SimplenetAddress::IpColonPort(): " << ipcolonport << " --> " << IpColonPort());
84  // test for errors (might be too strict)
85  if(IpColonPort()!=ipcolonport) {
86  mIp="";
87  mPort=-1;
88  }
89 }
90 
91 // sorting
93  if(this->mIp < rOther.mIp) return true;
94  if(this->mIp > rOther.mIp) return false;
95  if(this->mPort < rOther.mPort) return true;
96  return false;
97 }
98 
99 
100 // only compile for simplenet support
101 #ifdef FAUDES_IODEVICE_SIMPLENET
102 
103 /*
104  **********************************************
105  **********************************************
106  **********************************************
107 
108  implementation: AttributeSimplenetOutput
109 
110  **********************************************
111  **********************************************
112  **********************************************
113  */
114 
115 // std faudes type
117 
118 
119 //DoWrite(rTw);
120 void AttributeSimplenetOutput::DoWrite(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
121  (void) rLabel; (void) pContext;
122 }
123 
124 
125 //DoRead(rTr)
126 void AttributeSimplenetOutput::DoRead(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
127  (void) rLabel; (void) pContext;
128  // report
129  FD_DHV("AttributeSimplenetOutput(" << this << ")::DoRead(tr)");
130 
131  // sense and digest pre 2.16 format
132  Token token;
133  rTr.Peek(token);
134  if(token.Type()==Token::Begin)
135  if(token.StringValue()=="Output") {
136  rTr.ReadBegin("Output");
137  rTr.ReadEnd("Output");
138  }
139 }
140 
141 
142 /*
143  **********************************************
144  **********************************************
145  **********************************************
146 
147  implementation: AttributeSimplenetInput
148 
149  **********************************************
150  **********************************************
151  **********************************************
152  */
153 
154 
155 // std faudes type
157 
158 
159 //DoWrite(rTw);
160 void AttributeSimplenetInput::DoWrite(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
161  (void) rLabel; (void) pContext;
162 }
163 
164 
165 //DoRead(rTr)
166 void AttributeSimplenetInput::DoRead(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
167  (void) rLabel; (void) pContext;
168  // report
169  FD_DHV("AttributeSimplenetInput(" << this << ")::DoRead(tr)");
170 
171  // sense and digest pre 2.16 format
172  Token token;
173  rTr.Peek(token);
174  if(token.Type()==Token::Begin)
175  if(token.StringValue()=="Input") {
176  rTr.ReadBegin("Input");
177  rTr.ReadEnd("Input");
178  }
179 }
180 
181 
182 
183 
184 /*
185  **********************************************
186  **********************************************
187  **********************************************
188 
189  implementation: AttributeSimplenetEvent
190 
191  **********************************************
192  **********************************************
193  **********************************************
194  */
195 
196 
197 
198 // std faudes type
200 
201 // Default constructor, set my prototypes
203  FD_DHV("AttributeSimplenetEvent::AttributeSimplenetEvent(" << this << ")");;
204  pOutputPrototype=OutputPrototypep();
205  pInputPrototype=InputPrototypep();
206 }
207 
208 // Copy constructor
211 {
212  FD_DHV("AttributeSimplenetEvent(" << this << "): form other attr " << &rOtherAttr);
215  DoAssign(rOtherAttr);
216 }
217 
218 
219 // pseudo statics
222  return attrp;
223 }
224 
225 // pseudo statics
228  return attrp;
229 }
230 
231 
232 /*
233  **********************************************
234  **********************************************
235  **********************************************
236 
237  implementation: nDevice
238 
239  **********************************************
240  **********************************************
241  **********************************************
242  */
243 
244 // autoregister
246 
247 // helper: send entire buffer
248 int syncSend(int dest, const char* data, int len, int flag) {
249  int from=0;
250  int left=len;
251  while(left>0) {
252  int rc=send(dest, data+from, left, 0);
253  if(rc<0) {
254  std::stringstream errstr;
255  errstr << "Simplenet fatal network error (cannot send message)";
256  throw Exception("nDevice::syncSend", errstr.str(), 553, true); // mute console out
257  }
258  left-=rc;
259  }
260  return len;
261 }
262 
263 
264 // constructor
266  FD_DHV("nDevice(" << this << ")::nDevice()");
269  // default token section
270  mDefaultLabel="SimplenetDevice";
271  // default network data
272  mName="SimplenetNode";
273  mNetwork="Simplenet";
274  mListenAddress.IpColonPort("localhost:40000");
275  mBroadcastAddress.IpColonPort("255.255.255.255:40000");
276  // clear/prepare state
277  faudes_mutex_init(&mMutex);
278  mListenSocket=-1;
279  mBroadcastSocket=-1;
280 }
281 
282 // destructor
284  FD_DHV("nDevice(" << this << ")::~nDevice()");
285  // stop
286  Stop();
287  // clean up my data
288  faudes_mutex_destroy(&mMutex);
289 }
290 
291 // clear all configuration
292 void nDevice::Clear(void) {
293  FD_DHV("nDevice(" << this << ")::Clear()");
294  // call base, incl stop
295  vDevice::Clear();
296  // clear compiled data
297  mInputSubscriptions.clear();
298 }
299 
300 
301 // programmatic config: server address
302 void nDevice::ServerAddress(const std::string& rAddr) {
303  if(mState!=Down) return;
305 }
306 
307 // programmatic config: broadcast address
308 void nDevice::BroadcastAddress(const std::string& rAddr) {
309  if(mState!=Down) return;
311 }
312 
313 
314 // programmatic config: network name
315 void nDevice::NetworkName(const std::string& rNetwork) {
316  if(mState!=Down) return;
317  mNetwork=rNetwork;
318 }
319 
320 // programmatic config: insert node name
321 void nDevice::InsNode(const std::string& rNodeName) {
322  if(mState!=Down) return;
323  mNetworkNodes[rNodeName]="unknown:0";
324 }
325 
326 // programmatic config: insert node address
327 void nDevice::InsNodeAddress(const std::string& rNodeName, const std::string& rNodeAddress) {
328  if(mState!=Down) return;
329  mNetworkNodes[rNodeName]=rNodeAddress;
330 }
331 
332 // programmatic config: clear known nodes
334  if(mState!=Down) return;
335  mNetworkNodes.clear();
336 }
337 
338 // programmatic config: insert input event
339 void nDevice::InsInputEvent(const std::string& event) {
340  if(mState!=Down) return;
342  inp.DefaultInput();
343  Idx ev=pConfiguration->Insert(event);
344  pConfiguration->Attribute(ev, inp);
345 }
346 
347 // programmatic config: insert output event
348 void nDevice::InsOutputEvent(const std::string& event) {
349  if(mState!=Down) return;
351  outp.DefaultOutput();
352  Idx ev=pConfiguration->Insert(event);
353  pConfiguration->Attribute(ev, outp);
354 }
355 
356 
357 //Compile(void)
358 void nDevice::Compile(void){
359  //setup up internal data structure
360  FD_DHV("nDevice(" << this << ")::Compile()");
361  // call base
363 }
364 
365 
366 //DoWritePreface(rTr,rLabel)
367 void nDevice::DoWritePreface(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
368  FD_DHV("nDevice::DoWrite()");
369  //call base
370  vDevice::DoWritePreface(rTw,rLabel,pContext);
371  // write my data: my server role ip address
372  Token vtag;
373  vtag.SetEmpty("ServerAddress");
375  rTw<<vtag;
376  // write my data: my server broadcast address
377  Token btag;
378  btag.SetEmpty("BroadcastAddress");
380  rTw<<btag;
381  // write my data network topology
382  Token ntag;
383  ntag.SetBegin("Network");
384  ntag.InsAttributeString("name",mNetwork);
385  rTw<<ntag;
386  std::map<std::string,std::string>::const_iterator nit;
387  for(nit=mNetworkNodes.begin();nit!=mNetworkNodes.end();nit++) {
388  vtag.SetEmpty("Node");
389  vtag.InsAttributeString("name",nit->first);
390  SimplenetAddress defaddress(nit->second);
391  if(defaddress.Valid())
392  vtag.InsAttributeString("address",nit->second);
393  rTw<<vtag;
394  }
395  rTw.WriteEnd("Network");
396 }
397 
398 //DoRead(rTr,rLabel)
399 void nDevice::DoReadPreface(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
400  FD_DHV("nDevice::DoReadPreface()");
401  // call base (reads name and timescale)
402  vDevice::DoReadPreface(rTr,rLabel,pContext);
403 
404  // sense and digest pre 2.16 format
405  Token token;
406  rTr.Peek(token);
407  if(token.IsString()) {
409  if(!mListenAddress.Valid()) {
410  std::stringstream errstr;
411  errstr << "Simplenet address expected at " << rTr.FileLine();
412  throw Exception("nDevice::DoRead", errstr.str(), 50);
413  }
414  mNetwork=rTr.ReadString();
415  mNetworkNodes.clear();
416  rTr.ReadBegin("Network");
417  while(!rTr.Eos("Network")) {
418  mNetworkNodes[rTr.ReadString()]="unknown:0";
419  }
420  rTr.ReadEnd("Network");
421  return;
422  }
423 
424  // read my data: server address
425  Token atag;
426  rTr.ReadBegin("ServerAddress",atag);
428  if(!mListenAddress.Valid()) {
429  std::stringstream errstr;
430  errstr << "Simplenet address expected at " << rTr.FileLine();
431  throw Exception("nDevice::DoRead", errstr.str(), 50);
432  }
433  rTr.ReadEnd("ServerAddress");
434  // read my data: broadcast address (optional)
435  mBroadcastAddress.IpColonPort("255.255.255.255:40000");
436  rTr.Peek(token);
437  if(token.IsBegin("BroadcastAddress")) {
438  rTr.ReadBegin("BroadcastAddress",atag);
440  if(!mBroadcastAddress.Valid()) {
441  std::stringstream errstr;
442  errstr << "Simplenet address expected at " << rTr.FileLine();
443  throw Exception("nDevice::DoRead", errstr.str(), 50);
444  }
445  rTr.ReadEnd("BroadcastAddress");
446  }
447  // read my data: network
448  Token ntag;
449  rTr.ReadBegin("Network",ntag);
450  mNetwork=ntag.AttributeStringValue("name");
451  // loop network nodes
452  while(!rTr.Eos("Network")) {
453  rTr.ReadBegin("Node",ntag);
454  std::string node=ntag.AttributeStringValue("name");
455  InsNode(node);
456  // undocumented feature: explicit server addresses in dev file; tmoor 20121113
457  if(ntag.ExistsAttributeString("address")) {
458  SimplenetAddress defaddress;
459  defaddress.IpColonPort(ntag.AttributeStringValue("address"));
460  if(!defaddress.Valid()) {
461  std::stringstream errstr;
462  errstr << "Simplenet address expected at " << rTr.FileLine();
463  throw Exception("nDevice::DoRead", errstr.str(), 50);
464  }
465  mNetworkNodes[node]=defaddress.IpColonPort();
466  }
467  rTr.ReadEnd("Node");
468  }
469  rTr.ReadEnd("Network");
470 }
471 
472 
473 // lock - unlock shortcuts;
474 #define LOCK_E {int rc = faudes_mutex_lock(&mMutex); \
475  if(rc) {FD_ERR("nDevice::LOCK_E: lock mutex error\n"); exit(1); }}
476 #define UNLOCK_E {int rc = faudes_mutex_unlock(&mMutex); \
477  if(rc) {FD_ERR("nDevice::LOCK_E: unlock mutex error\n"); exit(1); }}
478 #define TLOCK_E {int rc = faudes_mutex_lock(&ndevice->mMutex); \
479  if(rc) {FD_ERR("nDevice::TLOCK_E: lock mutex error\n"); exit(1); }}
480 #define TUNLOCK_E {int rc = faudes_mutex_unlock(&ndevice->mMutex); \
481  if(rc) {FD_ERR("nDevice::TLOCK_E: unlock mutex error\n"); exit(1); }}
482 
483 
484 // Write Output
485 void nDevice::WriteOutput(Idx output) {
486 
487  FD_DHV("nDevice::WriteOutput(" << mOutputs.SymbolicName(output) << ")");
488 
489  // bail out (do notify clients even when servers incomplete)
490  if(mState!=Up && mState!=StartUp) return;
491 
492  // test event
493  if(!mOutputs.Exists(output)) {
494  std::stringstream errstr;
495  errstr << "Unknown output event " << output;
496  throw Exception("nDevice::WriteOutput", errstr.str(), 65);
497  }
498 
499  // find properties
500  const AttributeSimplenetOutput* aattr = pConfiguration->Attribute(output).Outputp();
501  if(!aattr) {
502  std::stringstream errstr;
503  errstr << "Invalid output attribute " << output;
504  throw Exception("nDevice::WriteOutput", errstr.str(), 65);
505  }
506 
507  // report
508  std::string message= "<Notify> " + mOutputs.SymbolicName(output) + " </Notify>\n";
509  FD_DHV("nDevice::WriteOutput(): message: " << message.substr(0,message.length()-1));
510 
511  // send event to those clients that did subscribe
512  LOCK_E;
513  int clientsock=-1;
514  try {
515  std::map<int,ClientState>::iterator sit=mOutputClientStates.begin();
516  for(;sit!=mOutputClientStates.end();sit++) {
517  if(!sit->second.mEvents.Empty())
518  if(!sit->second.mEvents.Exists(output))
519  continue;
520  clientsock=sit->second.mClientSocket;
521  if(clientsock>0) {
522  FD_DHV("nDevice::WriteOutput(): to socket " << clientsock);
523  syncSend(clientsock, message.c_str(), message.length(), 0);
524  }
525  }
526  } catch (faudes::Exception&) {
527  FD_DH("nDevice::WriteOutput(): failed to notify client on socket " << clientsock);
528  }
529  UNLOCK_E;
530  FD_DHV("nDevice::WriteOutput(): done");
531 }
532 
533 
534 
535 // Start(void)
536 void nDevice::Start(void) {
537  if(mState!=Down) return;
538  FD_DH("nDevice(" << mName <<")::Start()");
539  // call base
540  vDevice::Start();
541  mState=StartUp;
542  // clear event server states
543  mInputServerStates.clear();
544  std::map<std::string,std::string>::iterator nit;
545  for(nit=mNetworkNodes.begin(); nit!=mNetworkNodes.end();nit++) {
546  if(nit->first == mName) continue;
547  mInputServerStates[nit->first].mAddress= SimplenetAddress(nit->second);
548  mInputServerStates[nit->first].mEvents= EventSet();
549  mInputServerStates[nit->first].mServerSocket=-1;
550  mInputServerStates[nit->first].mLineBuffer="";
551  }
552  // clear client states
553  mOutputClientStates.clear();
554  // set my effective address
555  char hostname[1024];
556  int hostname_len =1023;
557  if(gethostname(hostname,hostname_len)!=0) {
558  std::stringstream errstr;
559  errstr << "Simplenet fatal network error (cannot get hostname)";
560  throw Exception("nDevice::Start", errstr.str(), 553);
561  }
562  hostname[hostname_len]=0;
564  mEffectiveListenAddress.Ip(hostname);
565  FD_DH("nDevice::Start(): server adress " << mEffectiveListenAddress.IpColonPort());
566  // open a tcp port to listen: create socket
567  mListenSocket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
568  if(mListenSocket<=0) {
569  std::stringstream errstr;
570  errstr << "Simplenet fatal network error (cannot open server socket)";
571  throw Exception("nDevice::Start", errstr.str(), 553);
572  }
573  int reuse=1;
574  faudes_setsockopt(mListenSocket,SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
575  // open a tcp port to listen: set up address
576  struct sockaddr_in serveraddr;
577  memset(&serveraddr, 0, sizeof(serveraddr));
578  serveraddr.sin_family = AF_INET;
579  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
580  serveraddr.sin_port = htons(mListenAddress.Port());
581  // open a tcp port to listen: bind socket to address
582  if(bind(mListenSocket, (struct sockaddr *) &serveraddr,sizeof(serveraddr)) <0) {
583  std::stringstream errstr;
584  errstr << "Simplenet fatal network error (cannot bind socket)";
585  throw Exception("nDevice::Start", errstr.str(), 553);
586  }
587  // open a tcp port to listen: start to listen
588  if(listen(mListenSocket, 77) < 0) { // todo: max pending connections
589  std::stringstream errstr;
590  errstr << "Simplenet fatal network error (cannot listen from socket)";
591  throw Exception("nDevice::Start", errstr.str(), 553);
592  }
593  // open a udp port to listen: create socket
594  mBroadcastSocket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
595  if(mBroadcastSocket<=0) {
596  std::stringstream errstr;
597  errstr << "Simplenet fatal network error (cannot open broadcast socket)";
598  throw Exception("nDevice::Start", errstr.str(), 553);
599  }
600  //int reuse=1;
601  faudes_setsockopt(mBroadcastSocket,SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
602  faudes_setsockopt(mBroadcastSocket,SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
603  // open a udp port: enable broadcast
604  // int reuse
605  if(faudes_setsockopt(mBroadcastSocket, SOL_SOCKET, SO_BROADCAST, &reuse, sizeof(reuse)) ) {
606  std::stringstream errstr;
607  errstr << "Simplenet fatal network error (cannot setopt broadcast socket)";
608  throw Exception("nDevice::Start", errstr.str(), 553);
609  }
610  // open a udp port to listen: set up address
611  struct sockaddr_in broadcastaddr;
612  memset(&broadcastaddr, 0, sizeof(broadcastaddr));
613  broadcastaddr.sin_family = AF_INET;
614  broadcastaddr.sin_addr.s_addr = htonl(INADDR_ANY);
615  broadcastaddr.sin_port = htons(mBroadcastAddress.Port());
616  // open a udp port to listen: bind socket to address
617  if(bind(mBroadcastSocket, (struct sockaddr *) &broadcastaddr,sizeof(broadcastaddr)) <0) {
618  std::stringstream errstr;
619  errstr << "Simplenet fatal network error (cannot bind broadcast socket)";
620  throw Exception("nDevice::Start", errstr.str(), 553);
621  }
622  // start background thread to listen: create & run thread
623  mStopListen=false;
624  int rc = faudes_thread_create(&mThreadListen, NDeviceListen, this);
625  // thread error
626  if(rc) {
627  std::stringstream errstr;
628  errstr << "Simplenet fatal thread error (cannot create thread)";
629  throw Exception("nDevice::Start", errstr.str(), 554);
630  }
631 }
632 
633 // Stop(void)
634 void nDevice::Stop(void) {
635  // bail out
636  if(mState!=Up && mState!=StartUp) return;
637  FD_DH("nDevice::Stop()");
638  LOCK_E;
639  // stop background threads
640  mStopListen=true;
641  UNLOCK_E;
642  // signal update to my listen thread: via udp message ... release select
643  std::string message= "<Stop> " + mNetwork + " " + mName + " </Stop>\n";
644  struct sockaddr_in broadcastaddr;
645  memset(&broadcastaddr, '\0', sizeof(broadcastaddr));
646  broadcastaddr.sin_family=AF_INET;
647  broadcastaddr.sin_port=htons(mBroadcastAddress.Port());
648  broadcastaddr.sin_addr.s_addr=inet_addr(mBroadcastAddress.Ip().c_str());
649  LOCK_E;
650  sendto(mBroadcastSocket,message.c_str(),message.length(),0,
651  (struct sockaddr *) & broadcastaddr, sizeof(broadcastaddr));
652  UNLOCK_E;
653  // wait until listen thread finished
654  FD_DH("nDevice::Stop(): waiting for listen thread");
655  faudes_thread_join(mThreadListen, NULL);
656  FD_DH("nDevice::Stop(): listen thread finished");
657  // close broadcast socket
658  shutdown(mBroadcastSocket,2);
659  faudes_closesocket(mBroadcastSocket);
660  mBroadcastSocket=-1;
661  // close server socket
662  shutdown(mListenSocket,2);
663  faudes_closesocket(mListenSocket);
664  mListenSocket=-1;
665  // call base (implies reset)
666  vDevice::Stop();
667 }
668 
669 
670 
671 // background thread,
672 // - receiving requests on broadcast port
673 // - sending requests broadcats
674 // - accept connections on listen port for output clients
675 // - notify connected output clients about events
676 // - connecting to input servers to receiving notifications
677 void* NDeviceListen(void* arg){
678  bool term;
679  std::map<std::string,nDevice::ServerState>::iterator sit;
680  std::map<int,nDevice::ClientState>::iterator cit;
681  // cast this object
682  nDevice* ndevice= static_cast<nDevice*>(arg);
683  // say hello
684  FD_DH("nDevice::Listen(" << ndevice << ")");
685  // clear broadcast time stamp
686  faudes_systime_t lastbroadcast;
687  lastbroadcast.tv_sec=0;
688  lastbroadcast.tv_nsec=0;
689 #ifdef FAUDES_DEBUG_IODEVICE
690  // clear debugging time stamp
691  int debuglisten=0;
692 #endif
693 
694  // infinite loop
695  while(true){
696 
697  // detect missing servers
698  int servermis=0;
699  int serverunknown=0;
700  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
701  // have no address?
702  if(!sit->second.mAddress.Valid()) {
703  FD_DH("nDevice::Listen(): missing server address for node: " << sit->first);
704  serverunknown++;
705  servermis++;
706  continue;
707  }
708  // have no connection
709  if(sit->second.mServerSocket<=0) {
710  FD_DH("nDevice::Listen(): missing server connection for node: " << sit->first);
711  servermis++;
712  continue;
713  }
714  }
715 
716  // detect missing clients (extension 2.22i, trust by number, should ask nodename on subscription)
717  int clientmis= ndevice->mNetworkNodes.size()-1;
718  for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
719  if(cit->second.mClientSocket<0) continue;
720  if(cit->second.mConnected) clientmis--;
721  }
722 #ifdef FAUDES_DEBUG_IODEVICE
723  if(clientmis!=servermis)
724  FD_DH("nDevice::Listen(): missing clients to subscribe: #"<< clientmis);
725 #endif
726 
727  // update state
728  if((servermis>0 || clientmis>0) && ndevice->mState==vDevice::Up) {
729  TLOCK_E;
730  ndevice->mState=vDevice::StartUp;
731  TUNLOCK_E;
732  }
733  if(servermis==0 && clientmis==0 && ndevice->mState==vDevice::StartUp) {
734  TLOCK_E;
735  ndevice->mState=vDevice::Up;
736  TUNLOCK_E;
737  }
738 
739 
740  // try to find input servers
741  if(serverunknown>0 && ndevice->mState==vDevice::StartUp) {
742  // is a broadcast due? (period 5sec)
743  faudes_systime_t now;
744  faudes_gettimeofday(&now);
745  faudes_mstime_t diffms;
746  faudes_diffsystime(now,lastbroadcast,&diffms);
747  if(diffms>5000) {
748  // udp message
749  std::string message= "<Request> "
750  + ndevice->mNetwork + " " + ndevice->mName + " </Request>\n";
751  // udp broadcast
752  struct sockaddr_in broadcastaddr;
753  memset(&broadcastaddr, '\0', sizeof(broadcastaddr));
754  broadcastaddr.sin_family=AF_INET;
755  broadcastaddr.sin_port=htons(ndevice->mBroadcastAddress.Port());
756  broadcastaddr.sin_addr.s_addr=inet_addr(ndevice->mBroadcastAddress.Ip().c_str());
757  TLOCK_E;
758  int rc=sendto(ndevice->mBroadcastSocket,message.c_str(),message.length(),
759  0,(struct sockaddr *) & broadcastaddr, sizeof(broadcastaddr));
760  (void) rc;
761  FD_DH("nDevice::Listen(): broadcast request: " << message.substr(0,message.length()-1) << " #" << rc);
762  TUNLOCK_E;
763  faudes_gettimeofday(&lastbroadcast);
764  }
765  }
766 
767  // subscribe to missing servers
768  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
769  // have active connection?
770  if(sit->second.mServerSocket>0) continue;
771  // have no address?
772  if(!sit->second.mAddress.Valid()) continue;
773  // try to connect
774  FD_DH("nDevice::Listen(): subscribing to " << sit->first <<
775  " at " << sit->second.mAddress.IpColonPort());
776  // open a tcp port: create socket
777  int serversock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
778  if(serversock<=0) {
779  FD_DH("nDevice::Listen(): subscription failed: no socket");
780  continue;
781  }
782  // open a tcp port: set up internet address
783  unsigned long int serverinaddr = INADDR_NONE;
784  if(serverinaddr==INADDR_NONE) {
785  FD_DH("nDevice::Listen(): using address as advertised");
786  serverinaddr = inet_addr(sit->second.mAddress.Ip().c_str());
787  }
788  if(serverinaddr==INADDR_NONE) {
789  struct hostent *host;
790  host = gethostbyname(sit->second.mAddress.Ip().c_str());
791  if(host!=0) {
792  FD_DH("nDevice::Listen(): using address by name lookup");
793  serverinaddr = *(unsigned long int*) host->h_addr;
794  }
795  }
796  if(serverinaddr==INADDR_NONE) {
797  FD_DH("nDevice::Listen(): subscription failed: invalid address " << sit->second.mAddress.Ip());
798  faudes_closesocket(serversock);
799  continue;
800  }
801  // open a tcp port: set up socket address
802  struct sockaddr_in serveraddress;
803  memset(&serveraddress, 0, sizeof(serveraddress));
804  serveraddress.sin_family = AF_INET;
805  serveraddress.sin_addr.s_addr=serverinaddr;
806  serveraddress.sin_port = htons(sit->second.mAddress.Port());
807  // open a tcp port: connect
808  if(connect(serversock, (struct sockaddr*) &serveraddress, sizeof(serveraddress))<0) {
809  FD_DH("nDevice::Listen(): subscription failed: connect");
810  faudes_closesocket(serversock);
811  continue;
812  }
813  // say hello to remote input server
814  try {
815  std::string hello;
816  hello="% Simplenet universal event subscription: "+ndevice->mName+" subscribing from "+sit->first+"\n";
817  syncSend(serversock, hello.c_str(), hello.length(), 0);
818  hello="% Expecting notifications in format '<Notify> event_name </Notify>'\n";
819  syncSend(serversock, hello.c_str(), hello.length(), 0);
820  hello="% Trying to subscribe to all required events\n";
821  syncSend(serversock, hello.c_str(), hello.length(), 0);
822  } catch (faudes::Exception&) {
823  faudes_closesocket(serversock);
824  serversock=-1;
825  }
826  if(serversock<0) {
827  FD_DH("nDevice::Listen(): subscription failed: cannot write");
828  faudes_closesocket(serversock);
829  continue;
830  }
831  // record success
832  FD_DH("nDevice::Listen(): subscribing to " << sit->first << " via socket " << serversock);
833  sit->second.mServerSocket=serversock;
834  // subscribe to all input events
835  EventSet sevents=ndevice->Inputs();
836  sevents.Name("Subscribe");
837  std::string message=sevents.ToString() + "\n";
838  syncSend(serversock,message.c_str(), message.length(),0);
839  // used to get info in pre 2.22h
840  /*
841  hello="% Going to Sending info command, explicit subscription may follow\n";
842  hello="<Cmd> Info </Cmd>\n";
843  syncSend(serversock, hello.c_str(), hello.length(), 0);
844  */
845  FD_DH("nDevice::Listen(): subscribing to " << sit->first << " via socket " << serversock << ": ok");
846  }
847 
848 
849  // prepare relevant wait on sources ...
850  fd_set mysocks;
851  int mysocks_max=0;
852  FD_ZERO(&mysocks);
853  // ... my server listen socket, expecting other nodes to connect and subscribe
854  if(mysocks_max<ndevice->mListenSocket) mysocks_max=ndevice->mListenSocket;
855  if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
856  FD_SET(ndevice->mListenSocket, &mysocks);
857  // ... udp port, expecting requests and adverts
858  if(mysocks_max< ndevice->mBroadcastSocket) mysocks_max=ndevice->mBroadcastSocket;
859  if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
860  FD_SET(ndevice->mBroadcastSocket, &mysocks);
861  // ... input server connections, expecting notifications
862  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
863  int serversock=sit->second.mServerSocket;
864  if(serversock<0) continue;
865  if(mysocks_max< serversock) mysocks_max=serversock;
866  if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
867  FD_SET(serversock, &mysocks);
868  }
869  // ... output client connections, expecting commands
870  for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
871  int clientsock=cit->second.mClientSocket;
872  if(clientsock<0) continue;
873  if(mysocks_max< clientsock) mysocks_max=clientsock;
874  if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
875  FD_SET(clientsock, &mysocks);
876  }
877 
878  // wait for traffic with moderate timeout
879  struct timeval tv;
880  tv.tv_sec = 1;
881  tv.tv_usec = 0;
882  int avail=select(mysocks_max+1, &mysocks, NULL, NULL, &tv);
883 
884  // terminate thread on request (before accepting incomming connections)
885  TLOCK_E;
886  term= ndevice->mStopListen;
887  TUNLOCK_E;
888  if(term) break;
889 
890  // reduce debugging output
891  #ifdef FAUDES_DEBUG_IODEVICE
892  debuglisten++;
893  if((debuglisten>10) || (avail>0)) {
894  FD_DH("nDevice::Listen(): listen as node \"" << ndevice->mName << "\" on network \"" << ndevice->mNetwork << "\"" << " #" << avail);
895  debuglisten=0;
896  }
897  #endif
898 
899  // handle incomming connection requests
900  if(avail>0)
901  if(FD_ISSET(ndevice->mListenSocket,&mysocks)) {
902  avail--;
903  int clientsock=-1;
904  struct sockaddr_in clientaddr;
905  socklen_t clientaddr_len = sizeof(clientaddr);
906  clientsock=accept(ndevice->mListenSocket, (struct sockaddr *) &clientaddr, &clientaddr_len );
907  if(clientsock<0) {
908  FD_DH("nDevice::Listen(): failed to accept incomming connection");
909  break;
910  }
911  FD_DH("nDevice::Listen(): accepted connection from client " << inet_ntoa(clientaddr.sin_addr) <<
912  " on socket " << clientsock);
913  // say hello
914  try {
915  std::string hello;
916  hello="% Simplenet Event Server: "+ndevice->mName+" providing events\n";
917  syncSend(clientsock, hello.c_str(), hello.length(), 0);
918  hello="% Notifications will have format '<Notify> event_name </Notify>'\n";
919  syncSend(clientsock, hello.c_str(), hello.length(), 0);
920  hello="% Commands are accepted in format '<Cmd> cmd_name </Cmd>'\n";
921  syncSend(clientsock, hello.c_str(), hello.length(), 0);
922  hello="% Supported commands are Subscribe, Info, Status, and ResetRequest\n";
923  syncSend(clientsock, hello.c_str(), hello.length(), 0);
924  } catch (faudes::Exception&) {
925  faudes_closesocket(clientsock);
926  clientsock=-1;
927  }
928  if(clientsock<0) {
929  FD_DH("nDevice::Listen(): connection test failed: cannot write");
930  faudes_closesocket(clientsock);
931  break;
932  }
933  // record client by socket
934  TLOCK_E;
935  nDevice::ClientState* cstate = &ndevice->mOutputClientStates[clientsock];
936  cstate->mClientSocket=clientsock;
937  cstate->mEvents.Clear();
938  cstate->mConnected=false;
939  cstate->mLineBuffer="";
940  TUNLOCK_E;
941  }
942 
943  // handle incomming broadcast
944  if(avail>0)
945  if(FD_ISSET(ndevice->mBroadcastSocket,&mysocks)) {
946  avail--;
947  // get message
948  char data[1024];
949  int data_len=1023;
950  struct sockaddr_in fromaddr;
951  socklen_t fromaddr_len = sizeof(fromaddr);
952  data_len=recvfrom(ndevice->mBroadcastSocket,data,data_len,0, (struct sockaddr*) &fromaddr,&fromaddr_len);
953  if(data_len<0) data_len=0; // todo: eof
954  data[data_len]=0;
955  if(data_len>=1) if(data[data_len-1]=='\n') data[data_len-1]=0;
956  FD_DH("nDevice::Listen(): received udp datagram " << data <<
957  " from " << inet_ntoa(fromaddr.sin_addr));
958  // interpret message
959  TokenReader tr(TokenReader::String,std::string(data));
960  try {
961  Token token;
962  tr.Peek(token);
963  // interpret udp request ...
964  if(token.IsBegin("Request")) {
965  tr.ReadBegin("Request");
966  if(tr.ReadString()==ndevice->mNetwork) {
967  // extension 2.22i: identify sender (optional for compatibility)
968  std::string snode;
969  Token stoken;
970  tr.Peek(stoken);
971  if(stoken.IsString()) snode=stoken.StringValue();
972  // extension 2.22i: if this is a missing server, reset my request timer
973  sit=ndevice->mInputServerStates.find(snode);
974  if(sit!=ndevice->mInputServerStates.end()) {
975  if(sit->second.mServerSocket==-1) {
976  lastbroadcast.tv_sec=0;
977  lastbroadcast.tv_nsec=0;
978  }
979  }
980  // extension 2.22i: ignore my own requests
981  if(snode!=ndevice->mName) {
982  // set up advert
983  std::string message= "<Advert> "
984  + ndevice->mNetwork + " "
985  + ndevice->mName + " " +
986  ndevice->mEffectiveListenAddress.IpColonPort()+ " </Advert>\n";
987  // udp reply
988  struct sockaddr_in replyaddr;
989  memset(&replyaddr, '\0', sizeof(replyaddr));
990  replyaddr.sin_family=AF_INET;
991  replyaddr.sin_port=htons(ndevice->mBroadcastAddress.Port());
992  //replyaddr.sin_addr.s_addr=fromaddr.sin_addr.s_addr;
993  replyaddr.sin_addr.s_addr=inet_addr(ndevice->mBroadcastAddress.Ip().c_str());
994  //replyaddr.sin_addr.s_addr=htonl(INADDR_BROADCAST);
995  TLOCK_E;
996  int rc = sendto(ndevice->mBroadcastSocket,message.c_str(),message.length(),0,(struct sockaddr *) & replyaddr, sizeof(replyaddr));
997  TUNLOCK_E
998  FD_DH("nDevice::Listen(): reply advert: " << message.substr(0,message.length()-1) << " #" << rc);
999  } else {
1000  FD_DH("nDevice::Listen(): ingoring request from myself");
1001  }
1002  } else {
1003  FD_DH("nDevice::Listen(): ingoring request from other network");
1004  }
1005  }
1006  // interpret udp advert ...
1007  if(token.IsBegin("Advert")) {
1008  tr.ReadBegin("Advert");
1009  if(tr.ReadString()==ndevice->mNetwork) {
1010  std::string node = tr.ReadString();
1011  std::string saddr = tr.ReadString();
1012  SimplenetAddress addr(saddr);
1013  addr.Ip(inet_ntoa(fromaddr.sin_addr)); // overwrite with actual ip
1014  FD_DHV("nDevice::Listen(): figure actual ip address " << addr.Ip());
1015  if(!addr.Valid()) {
1016  addr=saddr;
1017  FD_DH("nDevice::Listen(): fallback to explicit ip address " << addr.Ip());
1018  }
1019  std::map<std::string,nDevice::ServerState>::iterator sit;
1020  sit=ndevice->mInputServerStates.find(node);
1021  if(sit==ndevice->mInputServerStates.end()) {
1022  FD_DH("nDevice::Listen(): ignoring irrelevant advert from " << node);
1023  } else if(sit->second.mAddress.Valid()) {
1024  FD_DH("nDevice::Listen(): ignoring address overwrite (hardwired?) " << node);
1025  }
1026  if(sit!=ndevice->mInputServerStates.end())
1027  if(!sit->second.mAddress.Valid()) {
1028  FD_DH("nDevice::Listen(): accept advert " << node);
1029  sit->second.mAddress=addr;
1030  if(sit->second.mServerSocket>=0) faudes_closesocket(sit->second.mServerSocket);
1031  sit->second.mServerSocket=-1;
1032  }
1033  } else {
1034  FD_DH("nDevice::Listen(): ingoring advert from other network");
1035  }
1036  }
1037  } catch (faudes::Exception&) {
1038  FD_DH("nDevice::Listen(): ignore invalid udp message");
1039  }
1040  }
1041 
1042 
1043  // handle input servers: receive event notification
1044  int revcount=0;
1045  if(avail>0)
1046  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
1047  int serversock=sit->second.mServerSocket;
1048  if(serversock<0) continue;
1049  if(FD_ISSET(serversock, &mysocks)) {
1050  avail--;
1051  FD_DH("nDevice::Listen(): reading sock " << serversock);
1052  // buffer data in line buffer
1053  char buffer[1025];
1054  int count = recv(serversock, buffer, 1024, 0);
1055  if(count<=0) { // todo: test eof
1056  FD_DH("nDevice::Listen(): reading server sock " << serversock << " : eof");
1057  faudes_closesocket(serversock);
1058  sit->second.mServerSocket=-1;
1059  continue;
1060  }
1061  FD_DH("nDevice::Listen(): reading server sock " << serversock << ": #" << count);
1062  buffer[count]=0;
1063  sit->second.mLineBuffer +=std::string(buffer);
1064  // interpret line(s)
1065  if(count>0)
1066  if(buffer[count-1]=='\n')
1067  if(sit->second.mLineBuffer.length()>0)
1068  {
1069  const std::string& linebuffer = sit->second.mLineBuffer;
1070 #ifdef FAUDES_DEBUG_IODEVICE
1071  if(linebuffer.length()>0)
1072  if(linebuffer[0]!='%')
1073  FD_DH("nDevice::Listen(): reading server sock " << serversock << ": line: " << linebuffer);
1074 #endif
1075  // tokenise notification
1076  TokenReader tr(TokenReader::String,linebuffer);
1077  try {
1078  Token token;
1079  while(tr.Peek(token)) {
1080  // its an event notify
1081  if(token.Type()==Token::Begin && token.StringValue()=="Notify") {
1082  tr.ReadBegin("Notify");
1083  std::string event = tr.ReadString();
1084  tr.ReadEnd("Notify");
1085  faudes_mutex_lock(ndevice->pBufferMutex);
1086  FD_DH("nDevice::Listen(): found event " << event);
1087  Idx sev=ndevice->mInputs.Index(event);
1088  if(ndevice->mInputs.Exists(sev)) ndevice->pInputBuffer->push_back(sev);
1089  faudes_mutex_unlock(ndevice->pBufferMutex);
1090  revcount++;
1091  continue;
1092  }
1093  // its an info reply (ignored as of 2.22i)
1094  if(token.Type()==Token::Begin && token.StringValue()=="SimplenetDevice") {
1095  FD_DH("nDevice::Listen(): found device info");
1096  nDevice remote;
1097  remote.Read(tr);
1098  FD_DH("nDevice::Listen(): found device with outputs " << remote.Outputs().ToString());
1099  // used to subscribe on relevant events in pre 2.22h
1100  /*
1101  EventSet sevents=ndevice->Inputs();
1102  sevents.SetIntersection(remote.Outputs());
1103  sevents.Name("Subscribe");
1104  std::string message=sevents.ToString();
1105  FD_DH("nDevice::Listen(): subscribing events " << message);
1106  message += "\n";
1107  syncSend(serversock,message.c_str(), message.length(),0);
1108  */
1109  continue;
1110  }
1111  // skip other sections
1112  if(token.Type()==Token::Begin) {
1113  FD_DH("nDevice::Listen(): ignore section " << token.StringValue());
1114  std::string section=token.StringValue();
1115  tr.ReadBegin(section);
1116  while(!tr.Eos(section)) tr.Get(token);
1117  tr.ReadEnd(section);
1118  continue;
1119  }
1120  // ignore token
1121  FD_DH("nDevice::Listen(): error: ignore token");
1122  tr.Get(token);
1123  }
1124  } catch (faudes::Exception&) {
1125  FD_DH("nDevice::Listen(): " << serversock << ": invalid notification");
1126  }
1127  sit->second.mLineBuffer.clear();
1128  }
1129  }
1130  }
1131 
1132  // handle output clients: reply to commands
1133  if(avail>0)
1134  for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
1135  int clientsock=cit->second.mClientSocket;
1136  if(clientsock<0) continue;
1137  if(FD_ISSET(clientsock, &mysocks)) {
1138  avail--;
1139  FD_DH("nDevice::Listen(): reading client sock " << clientsock);
1140  // buffer data in line buffer
1141  char buffer[1025];
1142  int count = recv(clientsock, buffer, 1024, 0);
1143  if(count<=0) { // todo: test eof
1144  FD_DH("nDevice::Listen(): reading client sock " << clientsock << " : eof");
1145  TLOCK_E;
1146  faudes_closesocket(clientsock);
1147  cit->second.mClientSocket=-1;
1148  cit->second.mConnected=false;
1149  TUNLOCK_E;
1150  continue;
1151  }
1152  FD_DH("nDevice::Listen(): reading client sock " << clientsock << ": #" << count);
1153  buffer[count]=0;
1154  cit->second.mLineBuffer +=std::string(buffer);
1155  // interpret line(s)
1156  if(count>0)
1157  if(buffer[count-1]=='\n')
1158  if(cit->second.mLineBuffer.length()>0)
1159  {
1160  const std::string& linebuffer = cit->second.mLineBuffer;
1161 #ifdef FAUDES_DEBUG_IODEVICE
1162  if(linebuffer.length()>0)
1163  if(linebuffer[0]!='%')
1164  FD_DH("nDevice::Listen(): reading client sock " << clientsock << ": line: " << linebuffer);
1165 #endif
1166  // tokenise command
1167  TokenReader tr(TokenReader::String,linebuffer);
1168  try {
1169  Token token;
1170  while(tr.Peek(token)) {
1171  // its a command
1172  if(token.IsBegin("Cmd")) {
1173  tr.ReadBegin("Cmd");
1174  std::string cmd = tr.ReadString();
1175  tr.ReadEnd("Cmd");
1176  std::string response="<NAck> </NAck>\n";
1177  FD_DH("nDevice::Reply(" << clientsock << "): received cmd " << cmd);
1178  // command: info
1179  if(cmd=="Info") {
1180  TLOCK_E;
1181  response=ndevice->ToString() + "\n";
1182  TUNLOCK_E;
1183  }
1184  // command: status
1185  if(cmd=="Status") {
1186  TLOCK_E;
1187  if(ndevice->mState==vDevice::Up) response="<Ack> Up </Ack>\n";
1188  if(ndevice->mState==vDevice::StartUp) response="<Ack> StartUp </Ack>\n";
1189  if(ndevice->mState==vDevice::ShutDown) response="<Ack> ShutDown </Ack>\n";
1190  TUNLOCK_E;
1191  }
1192  // its a reset request
1193  if(cmd=="ResetRequest") {
1194  FD_DH("nDevice::Reply(" << clientsock << "): reset request");
1195  faudes_mutex_lock(ndevice->pBufferMutex);
1196  if(!ndevice->mResetRequest) revcount++;
1197  ndevice->mResetRequest=true;
1198  faudes_mutex_unlock(ndevice->pBufferMutex);
1199  response="";
1200  }
1201  // send reply
1202  syncSend(clientsock, response.c_str(), response.length(), 0);
1203  }
1204  // its a event subscription
1205  if(token.IsBegin("Subscribe")) {
1206  EventSet sevents;
1207  sevents.Read(tr,"Subscribe");
1208  sevents.RestrictSet(ndevice->Outputs());
1209  sevents.Name("Subscribed");
1210  FD_DH("nDevice::Reply(" << clientsock << "): providing events " << sevents.ToString());
1211  TLOCK_E;
1212  cit->second.mEvents.Clear();
1213  cit->second.mEvents.InsertSet(sevents);
1214  cit->second.mConnected=true;
1215  std::string response=sevents.ToString()+"\n";
1216  TUNLOCK_E;
1217  // send reply
1218  syncSend(clientsock, response.c_str(), response.length(), 0);
1219  }
1220  }
1221  } catch (faudes::Exception&) {
1222  FD_DH("nDevice::Reply(" << clientsock << "): invalid cmd");
1223  }
1224  cit->second.mLineBuffer.clear();
1225  }
1226  }
1227  }
1228 
1229  // signal condition for received events / reset requests
1230  if(revcount>0) {
1231  FD_DH("nDevice::Listen(): broadcast condition");
1232  faudes_mutex_lock(ndevice->pWaitMutex);
1233  faudes_cond_broadcast(ndevice->pWaitCondition);
1234  faudes_mutex_unlock(ndevice->pWaitMutex);
1235  revcount=0;
1236  }
1237 
1238  // should remove unconnected clients ?
1239 
1240 
1241  // some error
1242  if(avail<0) {
1243  FD_DH("nDevice::Listen(): select error");
1244  }
1245 
1246  }
1247 
1248  // close clientsockets
1249  FD_DH("nDevice::Listen(): close client sockets");
1250  TLOCK_E;
1251  for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
1252  int clientsock= cit->second.mClientSocket;
1253  if(clientsock>0) faudes_closesocket(clientsock);
1254  cit->second.mClientSocket=-1;
1255  cit->second.mConnected=false;
1256  }
1257  ndevice->mOutputClientStates.clear();
1258  TUNLOCK_E;
1259  // close serversockets
1260  FD_DH("nDevice::Listen(): close server sockets");
1261  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
1262  int serversock=sit->second.mServerSocket;
1263  if(serversock>0) faudes_closesocket(serversock);
1264  sit->second.mServerSocket=-1;
1265  }
1266  FD_DH("nDevice::Listen(): terminating listen thread");
1267  faudes_thread_exit(NULL);
1268  return NULL;
1269 }
1270 
1271 
1272 
1273 
1274 // reset dynamic faudes state (buffered events, current time)
1275 void nDevice::Reset(void) {
1276  // call base for time and input buffer
1277  vDevice::Reset();
1278  // bail out (do notify clients even when servers incomplete)
1279  if(mState!=Up && mState!=StartUp) return;
1280  // have message
1281  std::string message= "<Cmd> ResetRequest </Cmd>\n";
1282  // send cmd to all my servers
1283  LOCK_E;
1284  std::map<std::string,ServerState>::iterator sit=mInputServerStates.begin();
1285  for(; sit!=mInputServerStates.end(); sit++) {
1286  int serversock=sit->second.mServerSocket;
1287  if(serversock<0) continue;
1288  FD_DH("nDevice::Reset(): sending reset request to socket " << serversock);
1289  syncSend(serversock, message.c_str(), message.length(), 0);
1290  }
1291  UNLOCK_E;
1292 }
1293 
1294 
1295 #endif // configured simplenet
1296 
1297 
1298 } // name space
1299 
1300 
1301 
#define FD_ERR(message)
Debug: report more errors with file/line info.
void faudes_diffsystime(const faudes_systime_t &end, const faudes_systime_t &begin, faudes_systime_t *res)
#define FAUDES_TYPE_IMPLEMENTATION(ftype, ctype, cbase)
faudes type implementation macros, overall
Definition: cfl_types.h:946
Attribute for the configuration of a input or output mapping.
Definition: iop_vdevice.h:68
const AttributeVoid * pOutputPrototype
Output Prototype (set to nontrivial attribute in derived classes)
Definition: iop_vdevice.h:148
void DefaultOutput(void)
Set to default output attribute.
Definition: iop_vdevice.h:97
const AttributeVoid * pInputPrototype
Input Prototype (set to nontrivial attribute in derived classes)
Definition: iop_vdevice.h:151
void DefaultInput(void)
Set to default input attribute.
Definition: iop_vdevice.h:103
Configuration of a networked input or output.
void DoAssign(const AttributeSimplenetEvent &rSrc)
DoAssign.
AttributeSimplenetEvent(void)
Default constructor (no mapping at all)
static const AttributeSimplenetInput * InputPrototypep(void)
Prototype, input (construct on first use static)
static const AttributeSimplenetOutput * OutputPrototypep(void)
Prototype, output (construct on first use static)
Configuration of a network input mapping.
virtual void DoRead(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
Reads the attribute from TokenReader, see AttributeVoid for public wrappers.
Configuration of a network output mapping.
Definition: iop_simplenet.h:96
virtual void DoRead(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
Reads the attribute from TokenReader, see AttributeVoid for public wrappers.
Minimal Attribute.
Auto register faudes-type with specified type name.
Definition: cfl_registry.h:458
Faudes exception class.
Set of indices with symbolic names.
Definition: cfl_nameset.h:69
bool Exists(const Idx &rIndex) const
Test existence of index.
void SymbolicName(Idx index, const std::string &rName)
Set new name for existing index.
Idx Index(const std::string &rName) const
Index lookup.
void RestrictSet(const NameSet &rOtherSet)
Restrict to elements specified by rOtherSet.
Simplenet node address.
Definition: iop_simplenet.h:31
int Port(void) const
Get TCP port.
Definition: iop_simplenet.h:51
bool operator<(const SimplenetAddress &rOther) const
Order for sorting containers of addresses.
std::string IpColonPort(void) const
Get as colon seperated string.
bool Valid(void) const
Return true if valid.
SimplenetAddress(void)
Default constructor.
std::string mIp
Ip address.
Definition: iop_simplenet.h:71
std::string Ip(void) const
Get IP address.
Definition: iop_simplenet.h:48
Set of indices with symbolic names and attributes.
Definition: cfl_nameset.h:564
A TokenReader reads sequential tokens from a file or string.
std::string FileLine(void) const
Return "filename:line".
bool Eos(const std::string &rLabel)
Peek a token and check whether it ends the specified section.
void ReadEnd(const std::string &rLabel)
Close the current section by matching the previous ReadBegin().
std::string ReadString(void)
Read string token.
void ReadBegin(const std::string &rLabel)
Open a section by specified label.
bool Get(Token &token)
Get next token.
bool Peek(Token &token)
Peek next token.
A TokenWriter writes sequential tokens to a file, a string or stdout.
void WriteEnd(const std::string &rLabel)
Write end label.
Tokens model atomic data for stream IO.
Definition: cfl_token.h:53
const std::string & StringValue(void) const
Get string value of a name token.
Definition: cfl_token.cpp:177
@ Begin
<label> (begin of section)
Definition: cfl_token.h:83
bool IsString(void) const
Test token Type.
Definition: cfl_token.cpp:243
bool ExistsAttributeString(const std::string &name)
Test attibute existence.
Definition: cfl_token.cpp:355
bool IsBegin(void) const
Test token Type.
Definition: cfl_token.cpp:258
void SetEmpty(const std::string &rName)
Initialize as empty-tag token.
Definition: cfl_token.cpp:105
void SetBegin(const std::string &rName)
Initialize as Begin token.
Definition: cfl_token.cpp:91
void InsAttributeString(const std::string &name, const std::string &value)
Insert named attribute with string value.
Definition: cfl_token.cpp:309
const std::string & AttributeStringValue(const std::string &name)
Access attribute value.
Definition: cfl_token.cpp:385
TokenType Type(void) const
Get token Type.
Definition: cfl_token.cpp:198
Base class of all libFAUDES objects that participate in the run-time interface.
Definition: cfl_types.h:239
void Read(const std::string &rFileName, const std::string &rLabel="", const Type *pContext=0)
Read configuration data from file with label specified.
Definition: cfl_types.cpp:261
std::string ToString(const std::string &rLabel="", const Type *pContext=0) const
Write configuration data to a string.
Definition: cfl_types.cpp:169
An nDevice implements networked IO via a simple TCP/IP protocol.
void BroadcastAddress(const std::string &rAddr)
Set broadcast address for address resolution Note: you can only set the broadcast address while the d...
void InsNode(const std::string &rNodeName)
Add a node to the network configuration.
void ServerAddress(const std::string &rAddr)
Set server address of this node.
virtual void Compile(void)
Set up internal data structures.
virtual void DoReadPreface(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
Actual method to read device configuration from tokenreader.
virtual void Clear(void)
Clear all configuration.
void InsNodeAddress(const std::string &rNode, const std::string &rAddress)
Add entry to node name resolution.
SimplenetAddress mListenAddress
Simplenet: address of my server incl port (localhost:40000)
TaNameSet< AttributeSimplenetEvent > * pConfiguration
Overall configuration (with actual type)
std::string mNetwork
Simplenet: network id.
std::map< std::string, EventSet > mInputSubscriptions
Compiled data: map subscriptions.
void NetworkName(const std::string &rNetwork)
Set network name to participate.
faudes_thread_t mThreadListen
Background: thread handle (global)
virtual ~nDevice(void)
Explicit destructor.
std::map< std::string, ServerState > mInputServerStates
Background: connection states to event servers (by node name)
void InsOutputEvent(const std::string &event)
Insert event as output event.
int mBroadcastSocket
Background: udp broadcast socket (background only)
void ClearNodes(void)
Add a node to the network configuration.
virtual void DoWritePreface(TokenWriter &rTw, const std::string &rLabel="", const Type *pContext=0) const
Actual method to write the device configuration to a TokenWriter.
std::map< int, ClientState > mOutputClientStates
Background: map sockets to connection states (shared)
faudes_mutex_t mMutex
Background: mutex for below shared variables.
friend void * NDeviceListen(void *)
nDevice(void)
Default constructor.
SimplenetAddress mEffectiveListenAddress
Simplenet: effective address of my server port.
virtual void Stop(void)
Deactivate the device.
virtual void WriteOutput(Idx output)
Run output command.
SimplenetAddress mBroadcastAddress
Simplenet: address for udp broadcast (255.255.255.255:40000.
bool mStopListen
Background: request to join via flag (mutexed)
virtual void Reset(void)
Reset device.
std::map< std::string, std::string > mNetworkNodes
Simplenet: list of nodes in this network incl default addresses.
int mListenSocket
Background: server socket to listen (background only)
virtual void Start(void)
Activate the device.
void InsInputEvent(const std::string &event)
Insert event as input event.
Virtual base class to define the interface for event io.
Definition: iop_vdevice.h:261
bool mResetRequest
Reset request marker (mutexed)
Definition: iop_vdevice.h:806
faudes_mutex_t * pBufferMutex
Actual mutex for input buffer (mutexted)
Definition: iop_vdevice.h:803
virtual void Clear(void)
Clear all configuration.
virtual const EventSet & Inputs(void) const
Get inputs as plain set.
virtual void Compile(void)
Compile inner data-structures.
std::string mDefaultLabel
Default label for token io.
Definition: iop_vdevice.h:721
faudes_cond_t * pWaitCondition
Actual Wait Condition.
Definition: iop_vdevice.h:782
virtual void DoReadPreface(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
Reads non-event-configuration data from TokenReader.
virtual void Stop(void)
Deactivate the device.
virtual void Start(void)
Activate the device.
EventSet * mpConfiguration
Overall event configuration (uses cast for type)
Definition: iop_vdevice.h:761
EventSet mInputs
All inputs.
Definition: iop_vdevice.h:764
EventSet mOutputs
All outputs.
Definition: iop_vdevice.h:767
virtual void Reset(void)
Reset device.
virtual void DoWritePreface(TokenWriter &rTw, const std::string &rLabel="", const Type *pContext=0) const
Writes non-event-configuration data from TokenWriter.
faudes_mutex_t * pWaitMutex
Actual Wait Condition Mutex.
Definition: iop_vdevice.h:779
std::string mName
Name.
Definition: iop_vdevice.h:758
std::deque< Idx > * pInputBuffer
Actual Fifo buffer for input readings.
Definition: iop_vdevice.h:797
DeviceState mState
Status: running, starting etc.
Definition: iop_vdevice.h:770
virtual const EventSet & Outputs(void) const
Get outputs as plain set.
virtual void Clear(void)
Clear all set.
Definition: cfl_baseset.h:1902
NameSet EventSet
Convenience typedef for plain event sets.
Definition: cfl_nameset.h:531
const std::string & Name(void) const
Return name of TBaseSet.
Definition: cfl_baseset.h:1755
#define TUNLOCK_E
#define UNLOCK_E
#define TLOCK_E
#define LOCK_E
Simple networked events via TCP/IP.
#define FD_DHV(message)
Definition: iop_vdevice.h:37
#define FD_DH(message)
Definition: iop_vdevice.h:27
libFAUDES resides within the namespace faudes.
uint32_t Idx
Type definition for index type (allways 32bit)
void * NDeviceListen(void *arg)
Idx ToIdx(const std::string &rString)
Convert a string to Idx.
Definition: cfl_helper.cpp:100
std::string ToStringInteger(Int number)
integer to string
Definition: cfl_helper.cpp:43
int syncSend(int dest, const char *data, int len, int flag)
AutoRegisterType< nDevice > gRtiRegisterSimplenetDevice("SimplenetDevice")
Background: state of a connection to a client (shared)

libFAUDES 2.32b --- 2024.03.01 --- c++ api documentaion by doxygen