iop_simplenet.cpp
Go to the documentation of this file.
1/** @file iop_simplenet.cpp Simple networked events via tcp/ip */
2
3/*
4 FAU Discrete Event Systems Library (libfaudes)
5
6 Copyright (C) 2008, 2024 Thomas Moor
7 Exclusive copyright is granted to Klaus Schmidt
8
9*/
10
11
12
13#include "iop_simplenet.h"
14
15
16namespace faudes {
17
18
19/*
20 **********************************************
21 **********************************************
22 **********************************************
23
24 implementation: SimplenetAddress
25
26 **********************************************
27 **********************************************
28 **********************************************
29 */
30
31// std faudes
32FAUDES_TYPE_IMPLEMENTATION(SimplenetDevice,nDevice,vDevice)
33
34// construct
36 mIp="";
37 mPort=-1;
38}
39
40// copy construct
42 mIp=rOther.mIp;
43 mPort=rOther.mPort;
44}
45
46// construct from string
47SimplenetAddress::SimplenetAddress(const std::string& rString) {
48 IpColonPort(rString);
49}
50
51// validity
52bool SimplenetAddress::Valid(void) const {
53 if(mPort<=0) return false;
54 if(mIp=="") return false;
55 if(mIp.find(':',0)!=std::string::npos) return false;
56 return true;
57}
58
59// get colon seperated string
60std::string SimplenetAddress::IpColonPort(void) const {
61 std::string res;
62 if(!Valid()) return res;
63 res=mIp+":"+ToStringInteger(mPort);
64 return res;
65}
66
67// Set from colon seperated string
68void SimplenetAddress::IpColonPort(std::string ipcolonport) {
69 FD_DHV("SimplenetAddress::IpColonPort(): " << ipcolonport << " --> ?");
70 // invalid
71 mIp="";
72 mPort=-1;
73 // find colon
74 std::size_t cpos = ipcolonport.find(':',0);
75 if(cpos==std::string::npos) return;
76 if(cpos==0) return;
77 if(cpos+1>= ipcolonport.length()) return;
78 // extract ip
79 mIp=ipcolonport.substr(0,cpos);
80 // extract port
81 mPort=ToIdx(ipcolonport.substr(cpos+1));
82 // report
83 FD_DHV("SimplenetAddress::IpColonPort(): " << ipcolonport << " --> " << IpColonPort());
84 // test for errors (might be too strict)
85 if(IpColonPort()!=ipcolonport) {
86 mIp="";
87 mPort=-1;
88 }
89}
90
91// sorting
93 if(this->mIp < rOther.mIp) return true;
94 if(this->mIp > rOther.mIp) return false;
95 if(this->mPort < rOther.mPort) return true;
96 return false;
97}
98
99
100// only compile for simplenet support
101#ifdef FAUDES_IODEVICE_SIMPLENET
102
103/*
104 **********************************************
105 **********************************************
106 **********************************************
107
108 implementation: AttributeSimplenetOutput
109
110 **********************************************
111 **********************************************
112 **********************************************
113 */
114
115// std faudes type
117
118
119//DoWrite(rTw);
120void AttributeSimplenetOutput::DoWrite(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
121 (void) rLabel; (void) pContext;
122}
123
124
125//DoRead(rTr)
126void AttributeSimplenetOutput::DoRead(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
127 (void) rLabel; (void) pContext;
128 // report
129 FD_DHV("AttributeSimplenetOutput(" << this << ")::DoRead(tr)");
130
131 // sense and digest pre 2.16 format
132 Token token;
133 rTr.Peek(token);
134 if(token.Type()==Token::Begin)
135 if(token.StringValue()=="Output") {
136 rTr.ReadBegin("Output");
137 rTr.ReadEnd("Output");
138 }
139}
140
141
142/*
143 **********************************************
144 **********************************************
145 **********************************************
146
147 implementation: AttributeSimplenetInput
148
149 **********************************************
150 **********************************************
151 **********************************************
152 */
153
154
155// std faudes type
157
158
159//DoWrite(rTw);
160void AttributeSimplenetInput::DoWrite(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
161 (void) rLabel; (void) pContext;
162}
163
164
165//DoRead(rTr)
166void AttributeSimplenetInput::DoRead(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
167 (void) rLabel; (void) pContext;
168 // report
169 FD_DHV("AttributeSimplenetInput(" << this << ")::DoRead(tr)");
170
171 // sense and digest pre 2.16 format
172 Token token;
173 rTr.Peek(token);
174 if(token.Type()==Token::Begin)
175 if(token.StringValue()=="Input") {
176 rTr.ReadBegin("Input");
177 rTr.ReadEnd("Input");
178 }
179}
180
181
182
183
184/*
185 **********************************************
186 **********************************************
187 **********************************************
188
189 implementation: AttributeSimplenetEvent
190
191 **********************************************
192 **********************************************
193 **********************************************
194 */
195
196
197
198// std faudes type
200
201// Default constructor, set my prototypes
203 FD_DHV("AttributeSimplenetEvent::AttributeSimplenetEvent(" << this << ")");;
204 pOutputPrototype=OutputPrototypep();
205 pInputPrototype=InputPrototypep();
206}
207
208// Copy constructor
211{
212 FD_DHV("AttributeSimplenetEvent(" << this << "): form other attr " << &rOtherAttr);
215 DoCopy(rOtherAttr);
216}
217
218
219// pseudo statics
224
225// pseudo statics
230
231
232/*
233 **********************************************
234 **********************************************
235 **********************************************
236
237 implementation: nDevice
238
239 **********************************************
240 **********************************************
241 **********************************************
242 */
243
244// autoregister
246
247// helper: send entire buffer
248int syncSend(int dest, const char* data, int len, int flag) {
249 int from=0;
250 int left=len;
251 while(left>0) {
252 int rc=send(dest, data+from, left, 0);
253 if(rc<0) {
254 std::stringstream errstr;
255 errstr << "Simplenet fatal network error (cannot send message)";
256 throw Exception("nDevice::syncSend", errstr.str(), 553, true); // mute console out
257 }
258 left-=rc;
259 }
260 return len;
261}
262
263
264// constructor
266 FD_DHV("nDevice(" << this << ")::nDevice()");
269 // default token section
270 mDefaultLabel="SimplenetDevice";
271 // default network data
272 mName="SimplenetNode";
273 mNetwork="Simplenet";
274 mListenAddress.IpColonPort("localhost:40000");
275 mBroadcastAddress.IpColonPort("255.255.255.255:40000");
276 // clear/prepare state
277 faudes_mutex_init(&mMutex);
278 mListenSocket=-1;
280}
281
282// copy construct
283nDevice::nDevice(const nDevice& rOther) : nDevice() {
284
285}
286
287// destructor
289 FD_DHV("nDevice(" << this << ")::~nDevice()");
290 // stop
291 Stop();
292 // clean up my data
293 faudes_mutex_destroy(&mMutex);
294}
295
296// assign configuration
297void nDevice::DoCopy(const nDevice& rOther) {
298 FD_DHV("nDevice(" << this << ")::DoCopy()");
299 // call base, incl stop
301 // copy my config data members
302 *pConfiguration= *(rOther.pConfiguration);
303 mNetwork = rOther.mNetwork;
307 // compiled data
308 Compile();
309}
310
311// assign configuration
313 DoCopy(rOther);
314}
315
316// clear all configuration
317void nDevice::Clear(void) {
318 FD_DHV("nDevice(" << this << ")::Clear()");
319 // call base, incl virtial Stop and Reset
321 // clear compiled data
322 mInputSubscriptions.clear();
323}
324
325
326// programmatic config: server address
327void nDevice::ServerAddress(const std::string& rAddr) {
328 if(mState!=Down) return;
330}
331
332// programmatic config: broadcast address
333void nDevice::BroadcastAddress(const std::string& rAddr) {
334 if(mState!=Down) return;
336}
337
338
339// programmatic config: network name
340void nDevice::NetworkName(const std::string& rNetwork) {
341 if(mState!=Down) return;
342 mNetwork=rNetwork;
343}
344
345// programmatic config: insert node name
346void nDevice::InsNode(const std::string& rNodeName) {
347 if(mState!=Down) return;
348 mNetworkNodes[rNodeName]="unknown:0";
349}
350
351// programmatic config: insert node address
352void nDevice::InsNodeAddress(const std::string& rNodeName, const std::string& rNodeAddress) {
353 if(mState!=Down) return;
354 mNetworkNodes[rNodeName]=rNodeAddress;
355}
356
357// programmatic config: clear known nodes
359 if(mState!=Down) return;
360 mNetworkNodes.clear();
361}
362
363// programmatic config: insert input event
364void nDevice::InsInputEvent(const std::string& event) {
365 if(mState!=Down) return;
367 inp.DefaultInput();
368 Idx ev=pConfiguration->Insert(event);
369 pConfiguration->Attribute(ev, inp);
370}
371
372// programmatic config: insert output event
373void nDevice::InsOutputEvent(const std::string& event) {
374 if(mState!=Down) return;
376 outp.DefaultOutput();
377 Idx ev=pConfiguration->Insert(event);
378 pConfiguration->Attribute(ev, outp);
379}
380
381
382//Compile(void)
384 //setup up internal data structure
385 FD_DHV("nDevice(" << this << ")::Compile()");
386 // call base
388}
389
390
391//DoWritePreface(rTr,rLabel)
392void nDevice::DoWritePreface(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
393 FD_DHV("nDevice::DoWrite()");
394 //call base
395 vDevice::DoWritePreface(rTw,rLabel,pContext);
396 // write my data: my server role ip address
397 Token vtag;
398 vtag.SetEmpty("ServerAddress");
400 rTw<<vtag;
401 // write my data: my server broadcast address
402 Token btag;
403 btag.SetEmpty("BroadcastAddress");
405 rTw<<btag;
406 // write my data network topology
407 Token ntag;
408 ntag.SetBegin("Network");
409 ntag.InsAttributeString("name",mNetwork);
410 rTw<<ntag;
411 std::map<std::string,std::string>::const_iterator nit;
412 for(nit=mNetworkNodes.begin();nit!=mNetworkNodes.end();nit++) {
413 vtag.SetEmpty("Node");
414 vtag.InsAttributeString("name",nit->first);
415 SimplenetAddress defaddress(nit->second);
416 if(defaddress.Valid())
417 vtag.InsAttributeString("address",nit->second);
418 rTw<<vtag;
419 }
420 rTw.WriteEnd("Network");
421}
422
423//DoRead(rTr,rLabel)
424void nDevice::DoReadPreface(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
425 FD_DHV("nDevice::DoReadPreface()");
426 // call base (reads name and timescale)
427 vDevice::DoReadPreface(rTr,rLabel,pContext);
428
429 // sense and digest pre 2.16 format
430 Token token;
431 rTr.Peek(token);
432 if(token.IsString()) {
434 if(!mListenAddress.Valid()) {
435 std::stringstream errstr;
436 errstr << "Simplenet address expected at " << rTr.FileLine();
437 throw Exception("nDevice::DoRead", errstr.str(), 50);
438 }
439 mNetwork=rTr.ReadString();
440 mNetworkNodes.clear();
441 rTr.ReadBegin("Network");
442 while(!rTr.Eos("Network")) {
443 mNetworkNodes[rTr.ReadString()]="unknown:0";
444 }
445 rTr.ReadEnd("Network");
446 return;
447 }
448
449 // read my data: server address
450 Token atag;
451 rTr.ReadBegin("ServerAddress",atag);
453 if(!mListenAddress.Valid()) {
454 std::stringstream errstr;
455 errstr << "Simplenet address expected at " << rTr.FileLine();
456 throw Exception("nDevice::DoRead", errstr.str(), 50);
457 }
458 rTr.ReadEnd("ServerAddress");
459 // read my data: broadcast address (optional)
460 mBroadcastAddress.IpColonPort("255.255.255.255:40000");
461 rTr.Peek(token);
462 if(token.IsBegin("BroadcastAddress")) {
463 rTr.ReadBegin("BroadcastAddress",atag);
465 if(!mBroadcastAddress.Valid()) {
466 std::stringstream errstr;
467 errstr << "Simplenet address expected at " << rTr.FileLine();
468 throw Exception("nDevice::DoRead", errstr.str(), 50);
469 }
470 rTr.ReadEnd("BroadcastAddress");
471 }
472 // read my data: network
473 Token ntag;
474 rTr.ReadBegin("Network",ntag);
475 mNetwork=ntag.AttributeStringValue("name");
476 // loop network nodes
477 while(!rTr.Eos("Network")) {
478 rTr.ReadBegin("Node",ntag);
479 if(!ntag.ExistsAttributeString("name")) {
480 std::stringstream errstr;
481 errstr << "Simplenet node name expected at " << rTr.FileLine();
482 throw Exception("nDevice::DoRead", errstr.str(), 50);
483 }
484 std::string node=ntag.AttributeStringValue("name");
485 InsNode(node);
486 // undocumented feature: explicit server addresses in dev file; tmoor 20121113
487 if(ntag.ExistsAttributeString("address")) {
488 SimplenetAddress defaddress;
489 defaddress.IpColonPort(ntag.AttributeStringValue("address"));
490 if(!defaddress.Valid()) {
491 std::stringstream errstr;
492 errstr << "Simplenet node address expected at " << rTr.FileLine();
493 throw Exception("nDevice::DoRead", errstr.str(), 50);
494 }
495 mNetworkNodes[node]=defaddress.IpColonPort();
496 }
497 rTr.ReadEnd("Node");
498 }
499 rTr.ReadEnd("Network");
500}
501
502
503// lock - unlock shortcuts;
504#define LOCK_E {int rc = faudes_mutex_lock(&mMutex); \
505 if(rc) {FD_ERR("nDevice::LOCK_E: lock mutex error\n"); exit(1); }}
506#define UNLOCK_E {int rc = faudes_mutex_unlock(&mMutex); \
507 if(rc) {FD_ERR("nDevice::LOCK_E: unlock mutex error\n"); exit(1); }}
508#define TLOCK_E {int rc = faudes_mutex_lock(&ndevice->mMutex); \
509 if(rc) {FD_ERR("nDevice::TLOCK_E: lock mutex error\n"); exit(1); }}
510#define TUNLOCK_E {int rc = faudes_mutex_unlock(&ndevice->mMutex); \
511 if(rc) {FD_ERR("nDevice::TLOCK_E: unlock mutex error\n"); exit(1); }}
512
513
514// Write Output
516
517 FD_DHV("nDevice::WriteOutput(" << mOutputs.SymbolicName(output) << ")");
518
519 // bail out (do notify clients even when servers incomplete)
520 if(mState!=Up && mState!=StartUp) return;
521
522 // test event
523 if(!mOutputs.Exists(output)) {
524 std::stringstream errstr;
525 errstr << "Unknown output event " << output;
526 throw Exception("nDevice::WriteOutput", errstr.str(), 65);
527 }
528
529 // find properties
530 const AttributeSimplenetOutput* aattr = pConfiguration->Attribute(output).Outputp();
531 if(!aattr) {
532 std::stringstream errstr;
533 errstr << "Invalid output attribute " << output;
534 throw Exception("nDevice::WriteOutput", errstr.str(), 65);
535 }
536
537 // report
538 std::string message= "<Notify> " + mOutputs.SymbolicName(output) + " </Notify>\n";
539 FD_DHV("nDevice::WriteOutput(): message: " << message.substr(0,message.length()-1));
540
541 // send event to those clients that did subscribe
542 LOCK_E;
543 int clientsock=-1;
544 try {
545 std::map<int,ClientState>::iterator sit=mOutputClientStates.begin();
546 for(;sit!=mOutputClientStates.end();sit++) {
547 if(!sit->second.mEvents.Empty())
548 if(!sit->second.mEvents.Exists(output))
549 continue;
550 clientsock=sit->second.mClientSocket;
551 if(clientsock>0) {
552 FD_DHV("nDevice::WriteOutput(): to socket " << clientsock);
553 syncSend(clientsock, message.c_str(), message.length(), 0);
554 }
555 }
556 } catch (faudes::Exception&) {
557 FD_DH("nDevice::WriteOutput(): failed to notify client on socket " << clientsock);
558 }
559 UNLOCK_E;
560 FD_DHV("nDevice::WriteOutput(): done");
561}
562
563
564
565// Start(void)
566void nDevice::Start(void) {
567 if(mState!=Down) return;
568 FD_DH("nDevice(" << mName <<")::Start()");
569 // call base
572 // clear event server states
573 mInputServerStates.clear();
574 std::map<std::string,std::string>::iterator nit;
575 for(nit=mNetworkNodes.begin(); nit!=mNetworkNodes.end();nit++) {
576 if(nit->first == mName) continue;
577 mInputServerStates[nit->first].mAddress= SimplenetAddress(nit->second);
578 mInputServerStates[nit->first].mEvents= EventSet();
579 mInputServerStates[nit->first].mServerSocket=-1;
580 mInputServerStates[nit->first].mLineBuffer="";
581 }
582 // clear client states
583 mOutputClientStates.clear();
584 // set my effective address
585 char hostname[1024];
586 int hostname_len =1023;
587 if(gethostname(hostname,hostname_len)!=0) {
588 std::stringstream errstr;
589 errstr << "Simplenet fatal network error (cannot get hostname)";
590 throw Exception("nDevice::Start", errstr.str(), 553);
591 }
592 hostname[hostname_len]=0;
594 mEffectiveListenAddress.Ip(hostname);
595 FD_DH("nDevice::Start(): server adress " << mEffectiveListenAddress.IpColonPort());
596 // open a tcp port to listen: create socket
597 mListenSocket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
598 if(mListenSocket<=0) {
599 std::stringstream errstr;
600 errstr << "Simplenet fatal network error (cannot open server socket)";
601 throw Exception("nDevice::Start", errstr.str(), 553);
602 }
603 int reuse=1;
604 faudes_setsockopt(mListenSocket,SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
605 // open a tcp port to listen: set up address
606 struct sockaddr_in serveraddr;
607 memset(&serveraddr, 0, sizeof(serveraddr));
608 serveraddr.sin_family = AF_INET;
609 serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
610 serveraddr.sin_port = htons(mListenAddress.Port());
611 // open a tcp port to listen: bind socket to address
612 if(bind(mListenSocket, (struct sockaddr *) &serveraddr,sizeof(serveraddr)) <0) {
613 std::stringstream errstr;
614 errstr << "Simplenet fatal network error (cannot bind socket)";
615 throw Exception("nDevice::Start", errstr.str(), 553);
616 }
617 // open a tcp port to listen: start to listen
618 if(listen(mListenSocket, 77) < 0) { // todo: max pending connections
619 std::stringstream errstr;
620 errstr << "Simplenet fatal network error (cannot listen from socket)";
621 throw Exception("nDevice::Start", errstr.str(), 553);
622 }
623 // open a udp port to listen: create socket
624 mBroadcastSocket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
625 if(mBroadcastSocket<=0) {
626 std::stringstream errstr;
627 errstr << "Simplenet fatal network error (cannot open broadcast socket)";
628 throw Exception("nDevice::Start", errstr.str(), 553);
629 }
630 //int reuse=1;
631 faudes_setsockopt(mBroadcastSocket,SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
632 faudes_setsockopt(mBroadcastSocket,SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
633 // open a udp port: enable broadcast
634 // int reuse
635 if(faudes_setsockopt(mBroadcastSocket, SOL_SOCKET, SO_BROADCAST, &reuse, sizeof(reuse)) ) {
636 std::stringstream errstr;
637 errstr << "Simplenet fatal network error (cannot setopt broadcast socket)";
638 throw Exception("nDevice::Start", errstr.str(), 553);
639 }
640 // open a udp port to listen: set up address
641 struct sockaddr_in broadcastaddr;
642 memset(&broadcastaddr, 0, sizeof(broadcastaddr));
643 broadcastaddr.sin_family = AF_INET;
644 broadcastaddr.sin_addr.s_addr = htonl(INADDR_ANY);
645 broadcastaddr.sin_port = htons(mBroadcastAddress.Port());
646 // open a udp port to listen: bind socket to address
647 if(bind(mBroadcastSocket, (struct sockaddr *) &broadcastaddr,sizeof(broadcastaddr)) <0) {
648 std::stringstream errstr;
649 errstr << "Simplenet fatal network error (cannot bind broadcast socket)";
650 throw Exception("nDevice::Start", errstr.str(), 553);
651 }
652 // start background thread to listen: create & run thread
653 mStopListen=false;
654 int rc = faudes_thread_create(&mThreadListen, NDeviceListen, this);
655 // thread error
656 if(rc) {
657 std::stringstream errstr;
658 errstr << "Simplenet fatal thread error (cannot create thread)";
659 throw Exception("nDevice::Start", errstr.str(), 554);
660 }
661}
662
663// Stop(void)
664void nDevice::Stop(void) {
665 // bail out
666 if(mState!=Up && mState!=StartUp) return;
667 FD_DH("nDevice::Stop()");
668 LOCK_E;
669 // stop background threads
670 mStopListen=true;
671 UNLOCK_E;
672 // signal update to my listen thread: via udp message ... release select
673 std::string message= "<Stop> " + mNetwork + " " + mName + " </Stop>\n";
674 struct sockaddr_in broadcastaddr;
675 memset(&broadcastaddr, '\0', sizeof(broadcastaddr));
676 broadcastaddr.sin_family=AF_INET;
677 broadcastaddr.sin_port=htons(mBroadcastAddress.Port());
678 broadcastaddr.sin_addr.s_addr=inet_addr(mBroadcastAddress.Ip().c_str());
679 LOCK_E;
680 sendto(mBroadcastSocket,message.c_str(),message.length(),0,
681 (struct sockaddr *) & broadcastaddr, sizeof(broadcastaddr));
682 UNLOCK_E;
683 // wait until listen thread finished
684 FD_DH("nDevice::Stop(): waiting for listen thread");
685 faudes_thread_join(mThreadListen, NULL);
686 FD_DH("nDevice::Stop(): listen thread finished");
687 // close broadcast socket
688 shutdown(mBroadcastSocket,2);
689 faudes_closesocket(mBroadcastSocket);
691 // close server socket
692 shutdown(mListenSocket,2);
693 faudes_closesocket(mListenSocket);
694 mListenSocket=-1;
695 // call base (implies reset)
697}
698
699
700
701// background thread,
702// - receiving requests on broadcast port
703// - sending requests broadcats
704// - accept connections on listen port for output clients
705// - notify connected output clients about events
706// - connecting to input servers to receiving notifications
707void* NDeviceListen(void* arg){
708 bool term;
709 std::map<std::string,nDevice::ServerState>::iterator sit;
710 std::map<int,nDevice::ClientState>::iterator cit;
711 // cast this object
712 nDevice* ndevice= static_cast<nDevice*>(arg);
713 // say hello
714 FD_DH("nDevice::Listen(" << ndevice << ")");
715 // clear broadcast time stamp
716 faudes_systime_t lastbroadcast;
717 lastbroadcast.tv_sec=0;
718 lastbroadcast.tv_nsec=0;
719#ifdef FAUDES_DEBUG_IODEVICE
720 // clear debugging time stamp
721 int debuglisten=0;
722#endif
723
724 // infinite loop
725 while(true){
726
727 // detect missing servers
728 int servermis=0;
729 int serverunknown=0;
730 for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
731 // have no address?
732 if(!sit->second.mAddress.Valid()) {
733 FD_DH("nDevice::Listen(): missing server address for node: " << sit->first);
734 serverunknown++;
735 servermis++;
736 continue;
737 }
738 // have no connection
739 if(sit->second.mServerSocket<=0) {
740 FD_DH("nDevice::Listen(): missing server connection for node: " << sit->first);
741 servermis++;
742 continue;
743 }
744 }
745
746 // detect missing clients (trust by number, should ask nodename on subscription)
747 int clientcnt=0;
748 for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
749 if(cit->second.mClientSocket<0) continue;
750 if(cit->second.mConnected) ++clientcnt;
751 }
752 int clientmis = ndevice->mNetworkNodes.size() - (clientcnt +1); // +1 is myself
753#ifdef FAUDES_DEBUG_IODEVICE
754 if( clientmis !=servermis)
755 FD_DH("nDevice::Listen(): subscribed clients #"<< clientcnt << "/" << ndevice->mNetworkNodes.size()-1);
756#endif
757
758 // update state
759 if((servermis>0 || clientmis>0) && ndevice->mState==vDevice::Up) {
760 TLOCK_E;
761 ndevice->mState=vDevice::StartUp;
762 TUNLOCK_E;
763 }
764 if(servermis==0 && clientmis==0 && ndevice->mState==vDevice::StartUp) {
765 TLOCK_E;
766 ndevice->mState=vDevice::Up;
767 TUNLOCK_E;
768 }
769
770
771 // try to find input servers
772 if(serverunknown>0 && ndevice->mState==vDevice::StartUp) {
773 // is a broadcast due? (period 5sec)
774 faudes_systime_t now;
775 faudes_gettimeofday(&now);
776 faudes_mstime_t diffms;
777 faudes_diffsystime(now,lastbroadcast,&diffms);
778 if(diffms>5000) {
779 // udp message
780 std::string message= "<Request> "
781 + ndevice->mNetwork + " " + ndevice->mName + " </Request>\n";
782 // udp broadcast
783 struct sockaddr_in broadcastaddr;
784 memset(&broadcastaddr, '\0', sizeof(broadcastaddr));
785 broadcastaddr.sin_family=AF_INET;
786 broadcastaddr.sin_port=htons(ndevice->mBroadcastAddress.Port());
787 broadcastaddr.sin_addr.s_addr=inet_addr(ndevice->mBroadcastAddress.Ip().c_str());
788 TLOCK_E;
789 int rc=sendto(ndevice->mBroadcastSocket,message.c_str(),message.length(),
790 0,(struct sockaddr *) & broadcastaddr, sizeof(broadcastaddr));
791 (void) rc;
792 FD_DH("nDevice::Listen(): broadcast request: " << message.substr(0,message.length()-1) << " #" << rc);
793 TUNLOCK_E;
794 faudes_gettimeofday(&lastbroadcast);
795 }
796 }
797
798 // subscribe to missing servers
799 for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
800 // have active connection?
801 if(sit->second.mServerSocket>0) continue;
802 // have no address?
803 if(!sit->second.mAddress.Valid()) continue;
804 // try to connect
805 FD_DH("nDevice::Listen(): subscribing to " << sit->first <<
806 " at " << sit->second.mAddress.IpColonPort());
807 // open a tcp port: create socket
808 int serversock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
809 if(serversock<=0) {
810 FD_DH("nDevice::Listen(): subscription failed: no socket");
811 continue;
812 }
813 // open a tcp port: set up internet address
814 unsigned long int serverinaddr = INADDR_NONE;
815 if(serverinaddr==INADDR_NONE) {
816 FD_DH("nDevice::Listen(): using address as advertised");
817 serverinaddr = inet_addr(sit->second.mAddress.Ip().c_str());
818 }
819 if(serverinaddr==INADDR_NONE) {
820 struct hostent *host;
821 host = gethostbyname(sit->second.mAddress.Ip().c_str());
822 if(host!=0) {
823 FD_DH("nDevice::Listen(): using address by name lookup");
824 serverinaddr = *(unsigned long int*) host->h_addr;
825 }
826 }
827 if(serverinaddr==INADDR_NONE) {
828 FD_DH("nDevice::Listen(): subscription failed: invalid address " << sit->second.mAddress.Ip());
829 faudes_closesocket(serversock);
830 continue;
831 }
832 // open a tcp port: set up socket address
833 struct sockaddr_in serveraddress;
834 memset(&serveraddress, 0, sizeof(serveraddress));
835 serveraddress.sin_family = AF_INET;
836 serveraddress.sin_addr.s_addr=serverinaddr;
837 serveraddress.sin_port = htons(sit->second.mAddress.Port());
838 // open a tcp port: connect
839 if(connect(serversock, (struct sockaddr*) &serveraddress, sizeof(serveraddress))<0) {
840 FD_DH("nDevice::Listen(): subscription failed: connect");
841 faudes_closesocket(serversock);
842 continue;
843 }
844 // say hello to remote input server
845 try {
846 std::string hello;
847 hello="% Simplenet universal event subscription: "+ndevice->mName+" subscribing from "+sit->first+"\n";
848 syncSend(serversock, hello.c_str(), hello.length(), 0);
849 hello="% Expecting notifications in format '<Notify> event_name </Notify>'\n";
850 syncSend(serversock, hello.c_str(), hello.length(), 0);
851 hello="% Trying to subscribe to all required events\n";
852 syncSend(serversock, hello.c_str(), hello.length(), 0);
853 } catch (faudes::Exception&) {
854 faudes_closesocket(serversock);
855 serversock=-1;
856 }
857 if(serversock<0) {
858 FD_DH("nDevice::Listen(): subscription failed: cannot write");
859 faudes_closesocket(serversock);
860 continue;
861 }
862 // record success
863 FD_DH("nDevice::Listen(): subscribing to " << sit->first << " via socket " << serversock);
864 sit->second.mServerSocket=serversock;
865 // subscribe to all input events
866 EventSet sevents=ndevice->Inputs();
867 sevents.Name("Subscribe");
868 std::string message=sevents.ToString() + "\n";
869 syncSend(serversock,message.c_str(), message.length(),0);
870 // used to get info in pre 2.22h
871 /*
872 hello="% Going to Sending info command, explicit subscription may follow\n";
873 hello="<Cmd> Info </Cmd>\n";
874 syncSend(serversock, hello.c_str(), hello.length(), 0);
875 */
876 FD_DH("nDevice::Listen(): subscribing to " << sit->first << " via socket " << serversock << ": subscription requested sent");
877 }
878
879
880 // prepare relevant wait on sources ...
881 fd_set mysocks;
882 int mysocks_max=0;
883 FD_ZERO(&mysocks);
884 // ... my server listen socket, expecting other nodes to connect and subscribe
885 if(mysocks_max<ndevice->mListenSocket) mysocks_max=ndevice->mListenSocket;
886 if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
887 FD_SET(ndevice->mListenSocket, &mysocks);
888 // ... udp port, expecting requests and adverts
889 if(mysocks_max< ndevice->mBroadcastSocket) mysocks_max=ndevice->mBroadcastSocket;
890 if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
891 FD_SET(ndevice->mBroadcastSocket, &mysocks);
892 // ... input server connections, expecting notifications
893 for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
894 int serversock=sit->second.mServerSocket;
895 if(serversock<0) continue;
896 if(mysocks_max< serversock) mysocks_max=serversock;
897 if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
898 FD_SET(serversock, &mysocks);
899 }
900 // ... output client connections, expecting commands
901 for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
902 int clientsock=cit->second.mClientSocket;
903 if(clientsock<0) continue;
904 if(mysocks_max< clientsock) mysocks_max=clientsock;
905 if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
906 FD_SET(clientsock, &mysocks);
907 }
908
909 // wait for traffic with moderate timeout
910 struct timeval tv;
911 tv.tv_sec = 1;
912 tv.tv_usec = 0;
913 int avail=select(mysocks_max+1, &mysocks, NULL, NULL, &tv);
914
915 // terminate thread on request (before accepting incomming connections)
916 TLOCK_E;
917 term= ndevice->mStopListen;
918 TUNLOCK_E;
919 if(term) break;
920
921 // reduce debugging output
922 #ifdef FAUDES_DEBUG_IODEVICE
923 debuglisten++;
924 if((debuglisten>10) || (avail>0)) {
925 FD_DH("nDevice::Listen(): listen as node \"" << ndevice->mName << "\" on network \"" << ndevice->mNetwork << "\"" << " #" << avail);
926 debuglisten=0;
927 }
928 #endif
929
930 // handle incomming connection requests
931 if(avail>0)
932 if(FD_ISSET(ndevice->mListenSocket,&mysocks)) {
933 avail--;
934 int clientsock=-1;
935 struct sockaddr_in clientaddr;
936 socklen_t clientaddr_len = sizeof(clientaddr);
937 clientsock=accept(ndevice->mListenSocket, (struct sockaddr *) &clientaddr, &clientaddr_len );
938 if(clientsock<0) {
939 FD_DH("nDevice::Listen(): failed to accept incomming connection");
940 break;
941 }
942 FD_DH("nDevice::Listen(): accepted connection from client " << inet_ntoa(clientaddr.sin_addr) <<
943 " on socket " << clientsock);
944 // say hello
945 try {
946 std::string hello;
947 hello="% Simplenet Event Server: "+ndevice->mName+" providing events\n";
948 syncSend(clientsock, hello.c_str(), hello.length(), 0);
949 hello="% Notifications will have format '<Notify> event_name </Notify>'\n";
950 syncSend(clientsock, hello.c_str(), hello.length(), 0);
951 hello="% Commands are accepted in format '<Cmd> cmd_name </Cmd>'\n";
952 syncSend(clientsock, hello.c_str(), hello.length(), 0);
953 hello="% Supported commands are Subscribe, Info, Status, and ResetRequest\n";
954 syncSend(clientsock, hello.c_str(), hello.length(), 0);
955 } catch (faudes::Exception&) {
956 faudes_closesocket(clientsock);
957 clientsock=-1;
958 }
959 if(clientsock<0) {
960 FD_DH("nDevice::Listen(): connection test failed: cannot write");
961 faudes_closesocket(clientsock);
962 break;
963 }
964 // record client by socket
965 TLOCK_E;
966 nDevice::ClientState* cstate = &ndevice->mOutputClientStates[clientsock];
967 cstate->mClientSocket=clientsock;
968 cstate->mEvents.Clear();
969 cstate->mConnected=false;
970 cstate->mLineBuffer="";
971 TUNLOCK_E;
972 }
973
974 // handle incomming broadcast
975 if(avail>0)
976 if(FD_ISSET(ndevice->mBroadcastSocket,&mysocks)) {
977 avail--;
978 // get message
979 char data[1024];
980 int data_len=1023;
981 struct sockaddr_in fromaddr;
982 socklen_t fromaddr_len = sizeof(fromaddr);
983 data_len=recvfrom(ndevice->mBroadcastSocket,data,data_len,0, (struct sockaddr*) &fromaddr,&fromaddr_len);
984 if(data_len<0) data_len=0; // todo: eof
985 data[data_len]=0;
986 if(data_len>=1) if(data[data_len-1]=='\n') data[data_len-1]=0;
987 FD_DH("nDevice::Listen(): received udp datagram " << data <<
988 " from " << inet_ntoa(fromaddr.sin_addr));
989 // interpret message
990 TokenReader tr(TokenReader::String,std::string(data));
991 try {
992 Token token;
993 tr.Peek(token);
994 // interpret udp request ...
995 if(token.IsBegin("Request")) {
996 tr.ReadBegin("Request");
997 if(tr.ReadString()==ndevice->mNetwork) {
998 // extension 2.22i: identify sender (optional for compatibility)
999 std::string snode;
1000 Token stoken;
1001 tr.Peek(stoken);
1002 if(stoken.IsString()) snode=stoken.StringValue();
1003 // extension 2.22i: if this is a missing server, reset my request timer
1004 sit=ndevice->mInputServerStates.find(snode);
1005 if(sit!=ndevice->mInputServerStates.end()) {
1006 if(sit->second.mServerSocket==-1) {
1007 lastbroadcast.tv_sec=0;
1008 lastbroadcast.tv_nsec=0;
1009 }
1010 }
1011 // extension 2.22i: ignore my own requests
1012 if(snode!=ndevice->mName) {
1013 // set up advert
1014 std::string message= "<Advert> "
1015 + ndevice->mNetwork + " "
1016 + ndevice->mName + " " +
1017 ndevice->mEffectiveListenAddress.IpColonPort()+ " </Advert>\n";
1018 // udp reply
1019 struct sockaddr_in replyaddr;
1020 memset(&replyaddr, '\0', sizeof(replyaddr));
1021 replyaddr.sin_family=AF_INET;
1022 replyaddr.sin_port=htons(ndevice->mBroadcastAddress.Port());
1023 //replyaddr.sin_addr.s_addr=fromaddr.sin_addr.s_addr;
1024 replyaddr.sin_addr.s_addr=inet_addr(ndevice->mBroadcastAddress.Ip().c_str());
1025 //replyaddr.sin_addr.s_addr=htonl(INADDR_BROADCAST);
1026 TLOCK_E;
1027 int rc = sendto(ndevice->mBroadcastSocket,message.c_str(),message.length(),0,(struct sockaddr *) & replyaddr, sizeof(replyaddr));
1028 TUNLOCK_E
1029 FD_DH("nDevice::Listen(): reply advert: " << message.substr(0,message.length()-1) << " #" << rc);
1030 } else {
1031 FD_DH("nDevice::Listen(): ingoring request from myself");
1032 }
1033 } else {
1034 FD_DH("nDevice::Listen(): ingoring request from other network");
1035 }
1036 }
1037 // interpret udp advert ...
1038 if(token.IsBegin("Advert")) {
1039 tr.ReadBegin("Advert");
1040 if(tr.ReadString()==ndevice->mNetwork) {
1041 std::string node = tr.ReadString();
1042 std::string saddr = tr.ReadString();
1043 SimplenetAddress addr(saddr);
1044 addr.Ip(inet_ntoa(fromaddr.sin_addr)); // overwrite with actual ip
1045 FD_DHV("nDevice::Listen(): figure actual ip address " << addr.Ip());
1046 if(!addr.Valid()) {
1047 addr=saddr;
1048 FD_DH("nDevice::Listen(): fallback to explicit ip address " << addr.Ip());
1049 }
1050 std::map<std::string,nDevice::ServerState>::iterator sit;
1051 sit=ndevice->mInputServerStates.find(node);
1052 if(sit==ndevice->mInputServerStates.end()) {
1053 FD_DH("nDevice::Listen(): ignoring irrelevant advert from " << node);
1054 } else if(sit->second.mAddress.Valid()) {
1055 FD_DH("nDevice::Listen(): ignoring address overwrite (hardwired?) " << node);
1056 }
1057 if(sit!=ndevice->mInputServerStates.end())
1058 if(!sit->second.mAddress.Valid()) {
1059 FD_DH("nDevice::Listen(): accept advert " << node);
1060 sit->second.mAddress=addr;
1061 if(sit->second.mServerSocket>=0) faudes_closesocket(sit->second.mServerSocket);
1062 sit->second.mServerSocket=-1;
1063 }
1064 } else {
1065 FD_DH("nDevice::Listen(): ingoring advert from other network");
1066 }
1067 }
1068 } catch (faudes::Exception&) {
1069 FD_DH("nDevice::Listen(): ignore invalid udp message");
1070 }
1071 }
1072
1073
1074 // handle input servers: receive event notification
1075 int revcount=0;
1076 if(avail>0)
1077 for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
1078 int serversock=sit->second.mServerSocket;
1079 if(serversock<0) continue;
1080 if(FD_ISSET(serversock, &mysocks)) {
1081 avail--;
1082 FD_DH("nDevice::Listen(): reading sock " << serversock);
1083 // buffer data in line buffer
1084 char buffer[1025];
1085 int count = recv(serversock, buffer, 1024, 0);
1086 if(count<=0) { // todo: test eof
1087 FD_DH("nDevice::Listen(): reading server sock " << serversock << " : eof");
1088 faudes_closesocket(serversock);
1089 sit->second.mServerSocket=-1;
1090 continue;
1091 }
1092 FD_DH("nDevice::Listen(): reading server sock " << serversock << ": #" << count);
1093 buffer[count]=0;
1094 sit->second.mLineBuffer +=std::string(buffer);
1095 // interpret line(s)
1096 if(count>0)
1097 if(buffer[count-1]=='\n')
1098 if(sit->second.mLineBuffer.length()>0)
1099 {
1100 const std::string& linebuffer = sit->second.mLineBuffer;
1101#ifdef FAUDES_DEBUG_IODEVICE
1102 if(linebuffer.length()>0)
1103 if(linebuffer[0]!='%')
1104 FD_DH("nDevice::Listen(): reading server sock " << serversock << ": line: " << linebuffer);
1105#endif
1106 // tokenise notification
1107 TokenReader tr(TokenReader::String,linebuffer);
1108 try {
1109 Token token;
1110 while(tr.Peek(token)) {
1111 // its an event notify
1112 if(token.Type()==Token::Begin && token.StringValue()=="Notify") {
1113 tr.ReadBegin("Notify");
1114 std::string event = tr.ReadString();
1115 tr.ReadEnd("Notify");
1116 faudes_mutex_lock(ndevice->pBufferMutex);
1117 FD_DH("nDevice::Listen(): found event " << event);
1118 Idx sev=ndevice->mInputs.Index(event);
1119 if(ndevice->mInputs.Exists(sev)) ndevice->pInputBuffer->push_back(sev);
1120 faudes_mutex_unlock(ndevice->pBufferMutex);
1121 revcount++;
1122 continue;
1123 }
1124 // its an info reply (ignored as of 2.22i)
1125 if(token.Type()==Token::Begin && token.StringValue()=="SimplenetDevice") {
1126 FD_DH("nDevice::Listen(): found device info");
1127 nDevice remote;
1128 remote.Read(tr);
1129 FD_DH("nDevice::Listen(): found device with outputs " << remote.Outputs().ToString());
1130 // used to subscribe on relevant events in pre 2.22h
1131 /*
1132 EventSet sevents=ndevice->Inputs();
1133 sevents.SetIntersection(remote.Outputs());
1134 sevents.Name("Subscribe");
1135 std::string message=sevents.ToString();
1136 FD_DH("nDevice::Listen(): subscribing events " << message);
1137 message += "\n";
1138 syncSend(serversock,message.c_str(), message.length(),0);
1139 */
1140 continue;
1141 }
1142 // its a subscription acknowledgement
1143 if(token.Type()==Token::Begin && token.StringValue()=="Subscribed") {
1144 EventSet sevents;
1145 sevents.Read(tr,"Subscribed");
1146 FD_DH("nDevice::Listen(): subscribed to " << sevents.ToString());
1147 continue;
1148 }
1149 // skip other sections
1150 if(token.Type()==Token::Begin) {
1151 FD_DH("nDevice::Listen(): ignore section <" << token.StringValue() <<">");
1152 std::string section=token.StringValue();
1153 tr.ReadBegin(section);
1154 tr.ReadEnd(section);
1155 continue;
1156 }
1157 // ignore token
1158 FD_DH("nDevice::Listen(): error: ignore token");
1159 tr.Get(token);
1160 }
1161 } catch (faudes::Exception&) {
1162 FD_DH("nDevice::Listen(): " << serversock << ": invalid notification");
1163 }
1164 sit->second.mLineBuffer.clear();
1165 }
1166 }
1167 }
1168
1169 // handle output clients: reply to commands
1170 if(avail>0)
1171 for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
1172 int clientsock=cit->second.mClientSocket;
1173 if(clientsock<0) continue;
1174 if(FD_ISSET(clientsock, &mysocks)) {
1175 avail--;
1176 FD_DH("nDevice::Listen(): reading client sock " << clientsock);
1177 // buffer data in line buffer
1178 char buffer[1025];
1179 int count = recv(clientsock, buffer, 1024, 0);
1180 if(count<=0) { // todo: test eof
1181 FD_DH("nDevice::Listen(): reading client sock " << clientsock << " : eof");
1182 TLOCK_E;
1183 faudes_closesocket(clientsock);
1184 cit->second.mClientSocket=-1;
1185 cit->second.mConnected=false;
1186 TUNLOCK_E;
1187 continue;
1188 }
1189 FD_DH("nDevice::Listen(): reading client sock " << clientsock << ": #" << count);
1190 buffer[count]=0;
1191 cit->second.mLineBuffer +=std::string(buffer);
1192 // interpret line(s)
1193 if(count>0)
1194 if(buffer[count-1]=='\n')
1195 if(cit->second.mLineBuffer.length()>0)
1196 {
1197 const std::string& linebuffer = cit->second.mLineBuffer;
1198#ifdef FAUDES_DEBUG_IODEVICE
1199 if(linebuffer.length()>0)
1200 if(linebuffer[0]!='%')
1201 FD_DH("nDevice::Listen(): reading client sock " << clientsock << ": line: " << linebuffer);
1202#endif
1203 // tokenise command
1204 TokenReader tr(TokenReader::String,linebuffer);
1205 try {
1206 Token token;
1207 while(tr.Peek(token)) {
1208 bool xmlok=false;
1209 // its a command
1210 if(token.IsBegin("Cmd")) {
1211 tr.ReadBegin("Cmd");
1212 std::string cmd = tr.ReadString();
1213 tr.ReadEnd("Cmd");
1214 xmlok=true;
1215 std::string response="<NAck> </NAck>\n";
1216 FD_DH("nDevice::Reply(" << clientsock << "): received cmd " << cmd);
1217 // command: info
1218 if(cmd=="Info") {
1219 TLOCK_E;
1220 response=ndevice->ToString() + "\n";
1221 TUNLOCK_E;
1222 }
1223 // command: status
1224 if(cmd=="Status") {
1225 TLOCK_E;
1226 if(ndevice->mState==vDevice::Up) response="<Ack> Up </Ack>\n";
1227 if(ndevice->mState==vDevice::StartUp) response="<Ack> StartUp </Ack>\n";
1228 if(ndevice->mState==vDevice::ShutDown) response="<Ack> ShutDown </Ack>\n";
1229 TUNLOCK_E;
1230 }
1231 // its a reset request
1232 if(cmd=="ResetRequest") {
1233 FD_DH("nDevice::Reply(" << clientsock << "): reset request");
1234 faudes_mutex_lock(ndevice->pBufferMutex);
1235 if(!ndevice->mResetRequest) revcount++;
1236 ndevice->mResetRequest=true;
1237 faudes_mutex_unlock(ndevice->pBufferMutex);
1238 response="";
1239 }
1240 // send reply
1241 FD_DHV("nDevice(" << ndevice << ")::Reply: reading client sock: send response");
1242 syncSend(clientsock, response.c_str(), response.length(), 0);
1243 }
1244 // its a event subscription
1245 if(token.IsBegin("Subscribe")) {
1246 EventSet sevents;
1247 sevents.Read(tr,"Subscribe");
1248 sevents.RestrictSet(ndevice->Outputs());
1249 sevents.Name("Subscribed");
1250 xmlok=true;
1251 FD_DH("nDevice::Reply(" << clientsock << "): providing events " << sevents.ToString());
1252 TLOCK_E;
1253 cit->second.mEvents.Clear();
1254 cit->second.mEvents.InsertSet(sevents);
1255 cit->second.mConnected=true;
1256 std::string response=sevents.ToString()+"\n";
1257 TUNLOCK_E;
1258 // send reply
1259 FD_DHV("nDevice(" << ndevice << ")::Reply(): reading client sock: send response");
1260 syncSend(clientsock, response.c_str(), response.length(), 0);
1261 }
1262 // cancle on xml hickup
1263 if(!xmlok) {
1264 FD_DH("nDevice::Reply(" << clientsock << "): invalid xml B");
1265 break;
1266 }
1267 }
1268 } catch (faudes::Exception&) {
1269 FD_DH("nDevice::Reply(" << clientsock << "): invalid xml A");
1270 }
1271 FD_DHV("nDevice(" << ndevice << ")::Reply(): reading client sock: done");
1272 cit->second.mLineBuffer.clear();
1273 }
1274 }
1275 }
1276
1277 // signal condition for received events / reset requests
1278 if(revcount>0) {
1279 FD_DH("nDevice::Listen(): broadcast condition");
1280 faudes_mutex_lock(ndevice->pWaitMutex);
1281 faudes_cond_broadcast(ndevice->pWaitCondition);
1282 faudes_mutex_unlock(ndevice->pWaitMutex);
1283 revcount=0;
1284 }
1285
1286 // should remove unconnected clients ?
1287
1288
1289 // some error
1290 if(avail<0) {
1291 FD_DH("nDevice::Listen(): select error");
1292 }
1293
1294 }
1295
1296 // close clientsockets
1297 FD_DH("nDevice::Listen(): close client sockets");
1298 TLOCK_E;
1299 for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
1300 int clientsock= cit->second.mClientSocket;
1301 if(clientsock>0) faudes_closesocket(clientsock);
1302 cit->second.mClientSocket=-1;
1303 cit->second.mConnected=false;
1304 }
1305 ndevice->mOutputClientStates.clear();
1306 TUNLOCK_E;
1307 // close serversockets
1308 FD_DH("nDevice::Listen(): close server sockets");
1309 for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
1310 int serversock=sit->second.mServerSocket;
1311 if(serversock>0) faudes_closesocket(serversock);
1312 sit->second.mServerSocket=-1;
1313 }
1314 FD_DH("nDevice::Listen(): terminating listen thread");
1315 faudes_thread_exit(NULL);
1316 return NULL;
1317}
1318
1319
1320
1321
1322// reset dynamic faudes state (buffered events, current time)
1323void nDevice::Reset(void) {
1324 // call base for time and input buffer
1326 // bail out (do notify clients even when servers incomplete)
1327 if(mState!=Up && mState!=StartUp) return;
1328 // have message
1329 std::string message= "<Cmd> ResetRequest </Cmd>\n";
1330 // send cmd to all my servers
1331 LOCK_E;
1332 std::map<std::string,ServerState>::iterator sit=mInputServerStates.begin();
1333 for(; sit!=mInputServerStates.end(); sit++) {
1334 int serversock=sit->second.mServerSocket;
1335 if(serversock<0) continue;
1336 FD_DH("nDevice::Reset(): sending reset request to socket " << serversock);
1337 syncSend(serversock, message.c_str(), message.length(), 0);
1338 }
1339 UNLOCK_E;
1340}
1341
1342
1343#endif // configured simplenet
1344
1345
1346} // name space
1347
1348
1349
#define FD_ERR(message)
void faudes_diffsystime(const faudes_systime_t &end, const faudes_systime_t &begin, faudes_systime_t *res)
#define FAUDES_TYPE_IMPLEMENTATION(ftype, ctype, cbase)
Definition cfl_types.h:1017
const AttributeVoid * pOutputPrototype
const AttributeVoid * pInputPrototype
void DoCopy(const AttributeSimplenetEvent &rSrc)
static const AttributeSimplenetInput * InputPrototypep(void)
static const AttributeSimplenetOutput * OutputPrototypep(void)
virtual void DoRead(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
virtual void DoRead(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
const std::string & Name(void) const
bool Exists(const Idx &rIndex) const
void SymbolicName(Idx index, const std::string &rName)
Idx Index(const std::string &rName) const
void RestrictSet(const NameSet &rOtherSet)
bool operator<(const SimplenetAddress &rOther) const
std::string IpColonPort(void) const
std::string Ip(void) const
std::string FileLine(void) const
bool Eos(const std::string &rLabel)
void ReadEnd(const std::string &rLabel)
std::string ReadString(void)
void ReadBegin(const std::string &rLabel)
bool Get(Token &token)
bool Peek(Token &token)
void WriteEnd(const std::string &rLabel)
const std::string & StringValue(void) const
@ Begin
<label> (begin of section)
Definition cfl_token.h:84
bool IsString(void) const
bool ExistsAttributeString(const std::string &name)
bool IsBegin(void) const
void SetEmpty(const std::string &rName)
void SetBegin(const std::string &rName)
Definition cfl_token.cpp:92
void InsAttributeString(const std::string &name, const std::string &value)
const std::string & AttributeStringValue(const std::string &name)
TokenType Type(void) const
void Read(const std::string &rFileName, const std::string &rLabel="", const Type *pContext=0)
std::string ToString(const std::string &rLabel="", const Type *pContext=0) const
void BroadcastAddress(const std::string &rAddr)
void InsNode(const std::string &rNodeName)
void DoMove(nDevice &rSrcAttr)
void ServerAddress(const std::string &rAddr)
virtual void Compile(void)
virtual void DoReadPreface(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
virtual void Clear(void)
void InsNodeAddress(const std::string &rNode, const std::string &rAddress)
void DoCopy(const nDevice &rSrc)
SimplenetAddress mListenAddress
TaNameSet< AttributeSimplenetEvent > * pConfiguration
std::string mNetwork
std::map< std::string, EventSet > mInputSubscriptions
void NetworkName(const std::string &rNetwork)
faudes_thread_t mThreadListen
virtual ~nDevice(void)
std::map< std::string, ServerState > mInputServerStates
void InsOutputEvent(const std::string &event)
void ClearNodes(void)
virtual void DoWritePreface(TokenWriter &rTw, const std::string &rLabel="", const Type *pContext=0) const
std::map< int, ClientState > mOutputClientStates
faudes_mutex_t mMutex
friend void * NDeviceListen(void *)
SimplenetAddress mEffectiveListenAddress
virtual void Stop(void)
virtual void WriteOutput(Idx output)
SimplenetAddress mBroadcastAddress
virtual void Reset(void)
std::map< std::string, std::string > mNetworkNodes
virtual void Start(void)
void InsInputEvent(const std::string &event)
faudes_mutex_t * pBufferMutex
virtual void Clear(void)
virtual const EventSet & Inputs(void) const
virtual void Compile(void)
std::string mDefaultLabel
faudes_cond_t * pWaitCondition
virtual void DoReadPreface(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
virtual void Stop(void)
virtual void Start(void)
EventSet * mpConfiguration
EventSet mOutputs
virtual void Reset(void)
virtual void DoWritePreface(TokenWriter &rTw, const std::string &rLabel="", const Type *pContext=0) const
faudes_mutex_t * pWaitMutex
std::string mName
std::deque< Idx > * pInputBuffer
DeviceState mState
virtual const EventSet & Outputs(void) const
virtual void Clear(void)
NameSet EventSet
#define TUNLOCK_E
#define UNLOCK_E
#define TLOCK_E
#define LOCK_E
#define FD_DHV(message)
Definition iop_vdevice.h:37
#define FD_DH(message)
Definition iop_vdevice.h:27
uint32_t Idx
void * NDeviceListen(void *arg)
Idx ToIdx(const std::string &rString)
AutoRegisterType< nDevice > gRtiRegisterSimplenetDevice("SimplenetDevice")
std::string ToStringInteger(Int number)
Definition cfl_utils.cpp:43
int syncSend(int dest, const char *data, int len, int flag)

libFAUDES 2.34e --- 2026.03.16 --- c++ api documentaion by doxygen