iop_d3ripURT.cpp
Go to the documentation of this file.
1 /** @file iop_d3ripURT.cpp d3ripURT protocol and friends */
2 
3 /*
4  FAU Discrete Event Systems Library (libfaudes)
5 
6  Copyright (C) 2011, Ece Schmidt, Klaus Schmidt, Ulas Turan
7 
8 */
9 
10 
11 #include "iop_d3ripURT.h"
12 
13 
14 //only compile with D3RIP-URT protocol operation
15 #ifdef FAUDES_IODEVICE_D3RIP_URT
16 
17 
18 #include <sys/mman.h>
19 #include <errno.h>
20 #include <signal.h>
21 namespace faudes {
22 
23 
24 /*
25  **********************************************
26  **********************************************
27  **********************************************
28 
29  implementation: AttributeD3ripURTOutput
30 
31  **********************************************
32  **********************************************
33  **********************************************
34  */
35 
36 // faudes type std
37 FAUDES_TYPE_IMPLEMENTATION(Void,AttributeD3ripURTOutput,AttributeVoid)
38 
39 //DoAssign(Src)
40 void AttributeD3ripURTOutput::DoAssign(const AttributeD3ripURTOutput& rSrcAttr) {
41  FD_DHV("AttributeD3ripURTOutput(" << this << "):DoAssign(): assignment from " << &rSrcAttr);
42  mChannelToTransmit=rSrcAttr.mChannelToTransmit;
43  mParameterRecords=rSrcAttr.mParameterRecords;
44  mEventId=rSrcAttr.mEventId;
45 }
46 
47 //DoWrite(rTw);
48 void AttributeD3ripURTOutput::DoWrite(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
49  (void) rLabel; (void) pContext;
50 
51  FD_DHV("AttributeD3ripURTOutput::DoWrite()" );
52  Token ptoken;
53  ptoken.SetEmpty("ChannelToTransmit");
54  ptoken.InsAttributeInteger("value",mChannelToTransmit);
55  rTw << ptoken;
56 }
57 
58 
59 //DoRead(rTr)
60 void AttributeD3ripURTOutput::DoRead(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
61  mParameterRecords.reserve(10);
62  (void) rLabel; (void) pContext;
63  Token token;
64  FD_DHV("AttributeD3ripURTOutput::DoRead()");
65  // loop all D3ripURT parameters
66  while(1) {
67  rTr.Peek(token);
68  if(!token.IsBegin()) return;
69  //ChannelToTransmit
70  if(token.IsBegin("ChannelToTransmit")) {
71  rTr.Get(token);
72  if(token.ExistsAttributeInteger("value"))
73  mChannelToTransmit=token.AttributeIntegerValue("value");
74  rTr.ReadEnd("ChannelToTransmit");
75  continue;
76  }
77  if(token.IsBegin("EventId")) {
78  rTr.Get(token);
79  if(token.ExistsAttributeInteger("value"))
80  mEventId=token.AttributeIntegerValue("value");
81  rTr.ReadEnd("EventId");
82  continue;
83  }
84 
85  //ParameterRecord
86  if(token.IsBegin("ParameterRecord")) {
87  // mParameterRecords is a std::vector<ParameterRecord>
88  mParameterRecords.resize(mParameterRecords.size()+1); //grow by one, get reference to last
89  ParameterRecord& record = mParameterRecords.back();
90  rTr.ReadBegin("ParameterRecord",token);
91  //if(token.ExistsAttributeInteger("name"))
92  while(!rTr.Eos("ParameterRecord")) {
93  rTr.Peek(token);
94  //DestinationNode
95  if(token.IsBegin("DestinationNode")) {
96  rTr.Get(token);
97  if(token.ExistsAttributeInteger("value"))
98  record.destinationNode=token.AttributeIntegerValue("value");
99  rTr.ReadEnd("DestinationNode");
100  continue;
101  }
102  //DestinationChannel
103  if(token.IsBegin("DestinationChannel")) {
104  rTr.Get(token);
105  if(token.ExistsAttributeInteger("value"))
106  record.destinationChannel=token.AttributeIntegerValue("value");
107  rTr.ReadEnd("DestinationChannel");
108  continue;
109  }
110  //EligibilityTime
111  if(token.IsBegin("EligibilityTime")) {
112  rTr.Get(token);
113  if(token.ExistsAttributeInteger("value"))
114  record.eligibilityTime=token.AttributeIntegerValue("value");
115  rTr.ReadEnd("EligibilityTime");
116  continue;
117  }
118  //DeadlineTime
119  if(token.IsBegin("DeadlineTime")) {
120  rTr.Get(token);
121  if(token.ExistsAttributeInteger("value"))
122  record.deadlineTime=token.AttributeIntegerValue("value");
123  rTr.ReadEnd("DeadlineTime");
124  continue;
125  }
126  rTr.Get(token);
127  rTr.ReadEnd(token.StringValue());
128  }
129  rTr.ReadEnd("ParameterRecord");
130  continue;
131  }
132  // unknown
133  break;
134  }
135 }
136 
137 
138 
139 
140 /*
141  **********************************************
142  **********************************************
143  **********************************************
144 
145  implementation: AttributeD3ripURTInput
146 
147  **********************************************
148  **********************************************
149  **********************************************
150  */
151 
152 
153 // faudes type std
154 FAUDES_TYPE_IMPLEMENTATION(Void,AttributeD3ripURTInput,AttributeVoid)
155 
156 
157 //DoAssign(Src)
158 void AttributeD3ripURTInput::DoAssign(const AttributeD3ripURTInput& rSrcAttr) {
159  FD_DHV("AttributeD3ripURTInput(" << this << "):DoAssign(): assignment from " << &rSrcAttr);
160  mEventId=rSrcAttr.mEventId;
161 }
162 
163 
164 //DoWrite(rTw,rLabel,pContext)
165 void AttributeD3ripURTInput::DoWrite(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
166  (void) rLabel; (void) pContext;
167 }//DoWrite(rTw,rLabel,pContext)
168 
169 
170 //DoRead(rTr,rLabel,pContext)
171 void AttributeD3ripURTInput::DoRead(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
172  (void) rLabel; (void) pContext;
173  Token token;
174  FD_DHV("AttributeD3ripURTOutput::DoRead()");
175  // loop all D3ripURT parameters
176  while(1) {
177  rTr.Peek(token);
178  if(!token.IsBegin()) return;
179  if(token.IsBegin("EventId")) {
180  rTr.Get(token);
181  if(token.ExistsAttributeInteger("value"))
182  mEventId=token.AttributeIntegerValue("value");
183  rTr.ReadEnd("EventId");
184  continue;
185  }
186 
187  // unknown
188  break;
189  }
190 }
191 
192 
193 
194 /*
195  **********************************************
196  **********************************************
197  **********************************************
198 
199  implementation: AttributeD3ripURTEvent
200 
201  **********************************************
202  **********************************************
203  **********************************************
204  */
205 
206 
207 
208 // std faudes type
209 FAUDES_TYPE_IMPLEMENTATION(Void,AttributeD3ripURTEvent,AttributeDeviceEvent)
210 
211 // Default constructor, set my prototypes
212 AttributeD3ripURTEvent::AttributeD3ripURTEvent(void) : AttributeDeviceEvent() {
213  FD_DHV("AttributeD3ripURTEvent::AttributeD3ripURTEvent(" << this << ")");
214  pOutputPrototype=OutputPrototypep();
215  pInputPrototype=InputPrototypep();
216 }
217 
218 // Copy constructor
219 AttributeD3ripURTEvent::AttributeD3ripURTEvent(const AttributeD3ripURTEvent& rOtherAttr) :
220  AttributeDeviceEvent()
221 {
222  FD_DHV("AttributeD3ripURTEvent(" << this << "): form other attr " << &rOtherAttr);
223  pOutputPrototype=OutputPrototypep();
224  pInputPrototype=InputPrototypep();
225  DoAssign(rOtherAttr);
226 }
227 
228 
229 // autoregister event configuration (required for XML token format)
230 AutoRegisterType< TaNameSet<AttributeD3ripURTEvent> >
231  gRti1RegisterD3ripURTDeviceEventSet("D3ripURTDeviceEventSet");
232 AutoRegisterXElementTag< TaNameSet<AttributeD3ripURTEvent> >
233  gRti1XElementTagD3ripURTDeviceEventSet("D3ripURTDeviceEventSet", "Event");
234 
235 // pseudo statics
236 const AttributeD3ripURTOutput* AttributeD3ripURTEvent::OutputPrototypep(void){
237  AttributeD3ripURTOutput* attrp= new AttributeD3ripURTOutput();
238  return attrp;
239 }
240 
241 // pseudo statics
242 const AttributeD3ripURTInput* AttributeD3ripURTEvent::InputPrototypep(void) {
243  AttributeD3ripURTInput* attrp= new AttributeD3ripURTInput();
244  return attrp;
245 }
246 
247 /*
248  **********************************************
249  **********************************************
250  **********************************************
251 
252  implementation: d3ripURTDevice
253 
254  **********************************************
255  **********************************************
256  **********************************************
257  */
258 
259 
260 // autoregister
261 AutoRegisterType<d3ripURTDevice> gRtiRegisterD3ripURTDevice("D3ripURTDevice");
262 
263 // faudes type std
264 FAUDES_TYPE_IMPLEMENTATION(D3ripURTDevice,d3ripURTDevice,vDevice)
265 void* DoListenModule(void* threadArg);
266 
267 ////////////////////////////////////////////////
268 // construction and destruction
269 
270 // constructor
271 d3ripURTDevice::d3ripURTDevice(void) : vDevice() {
272  FD_DHV("d3ripURTDevice(" << mName << ")::d3ripURTDevice()");
273  // have event set with appropriate attributes
274  mpConfiguration = new TaNameSet<AttributeD3ripURTEvent>;
275  pConfiguration = dynamic_cast< TaNameSet<AttributeD3ripURTEvent>* >(mpConfiguration);
276  // have appropriate default label for token io
277  mDefaultLabel ="D3ripURTDevice";
278  // configuration defaults
279  mName="D3ripURTDevice";
280 }
281 
282 
283 //~d3ripURTDevice()
284 d3ripURTDevice::~d3ripURTDevice(void) {
285  // destructor
286  FD_DHV("d3ripURTDevice(" << mName << ")::~d3ripURTDevice()");
287 }
288 
289 
290 // Clear()
291 void d3ripURTDevice::Clear(void) {
292  //delete static data
293  FD_DHV("d3ripURTDevice(" << mName << ")::Clear()");
294  // call base; note: clear implies stop (incl. all eventg attributes)
295  vDevice::Clear();
296  // clear compiled data
297  // xxxx
298 }
299 
300 
301 // Start(void)
302 void d3ripURTDevice::Start(void) {
303  //set device state and create thread
304 
305  //return if device was allready started
306  if( (mState == Up)||(mState == StartUp) ) return;
307  FD_DHV("d3ripURTDevice("<<mName<<")::Start()");
308  //call base incl virtual reset
309  vDevice::Start();
310  mState=StartUp;
311  //create background for listening module
312  if (mlockall(MCL_CURRENT | MCL_FUTURE)) {/*Lock all process memory pages in RAM(disable paging)*/
313  perror("mlockall failed:");
314  }
315  struct sched_param scheduling_parameters;
316  scheduling_parameters.sched_priority = sched_get_priority_max(SCHED_FIFO);
317  sched_setscheduler(0, SCHED_FIFO, &scheduling_parameters);
318 
319  NameSet::Iterator eit;
320  for(eit=pConfiguration->Begin(); eit != pConfiguration->End(); eit++) {
321 
322  AttributeD3ripURTEvent attr;
323  attr = pConfiguration->Attribute(*eit);
324  const AttributeD3ripURTOutput* oattr = attr.Outputp();
325  if(!oattr) continue;
326 
327  FD_DHV("d3ripURTDevice("<<mName<<") demo: output idx " << *eit);
328  FD_DHV("d3ripURTDevice("<<mName<<") demo: output name " << pConfiguration->SymbolicName(*eit));
329  FD_DHV("d3ripURTDevice("<<mName<<") demo: Channel To Transmit: " << oattr->mChannelToTransmit);
330  }
331 
332  pthread_create(&mThreadListenModule, NULL, &DoListenCLModule, this);
333  mState=Up;
334 
335 }
336 
337 // Stop()
338 void d3ripURTDevice::Stop(void) {
339  //return if device allready got the command to shut down
340  if(mState != Up && mState !=StartUp) return;
341  FD_DHV("d3ripURTDevice("<<mName<<")::Stop()");
342  //set device state, kill thread and close message queues
343  mContinueListening=0;
344  mq_close(mMQueueToSend);
345  mq_close(mMQueueToReceive);
346  pthread_kill(mThreadListenModule, SIGTERM);
347  // call base
348  vDevice::Stop();
349 }//end Stop()
350 
351 
352 //Reset()
353 void d3ripURTDevice::Reset(void){
354  //delete dynamic data
355  FD_DHV("d3ripURTDevice("<<mName<<")::Reset()");
356  // call base (resets time)
357  vDevice::Reset();
358  // xxxxx
359 }
360 
361 //Compile(void)
362 void d3ripURTDevice::Compile(void){
363  //setup up internal data structure
364  mContinueListening=1;
365  struct mq_attr wanted_attrs;
366  wanted_attrs.mq_flags=0;
367  wanted_attrs.mq_maxmsg=10;
368  wanted_attrs.mq_msgsize=1500;
369  wanted_attrs.mq_curmsgs=0;
370 
371  mMQueueToSend = mq_open("/MQUEUE_AP2CL_ID", O_RDWR | O_CREAT, S_IRWXU | S_IRWXG , &wanted_attrs);
372 
373  if(mMQueueToSend==-1 || mMQueueToReceive==-1) {
374  FD_DHV("d3ripURTDevice("<<mName<<"): Cannot Open Message Queue");
375  }
376 
377  int i=0;
378  int j=0;
379  NameSet::Iterator eit;
380  for(eit=pConfiguration->Begin(); eit != pConfiguration->End(); eit++) {
381 
382  AttributeD3ripURTEvent attr;
383  attr = pConfiguration->Attribute(*eit);
384  const AttributeD3ripURTOutput* oattr = attr.Outputp();
385  const AttributeD3ripURTInput* iattr = attr.Inputp();
386 
387  if(oattr) {
388  //maps a global EventId for all output event names for the consistency between controllers
389  mEventIdMap[mOutputs.SymbolicName(*eit)] = oattr->mEventId;
390  //header part
391  mEventParameters[i][D3RIP_URT_EVENT_ID]=*eit;
392  mEventParameters[i][D3RIP_URT_CHANNEL_TO_TRANSMIT]=oattr->mChannelToTransmit;
393 
394  //and the parameters
395  for(j=0;j<oattr->mParameterRecords.size();j++) {
396  mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DESTIONATION_NODE]=oattr->mParameterRecords[j].destinationNode;
397  mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DESTIONATION_CHANNEL]=oattr->mParameterRecords[j].destinationChannel;
398  mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_ELIGIBILITY_TIME]=oattr->mParameterRecords[j].eligibilityTime;
399  mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DEADLINE_TIME]=oattr->mParameterRecords[j].deadlineTime;
400  }
401  //set the parameter count field in the end
402  mEventParameters[i][D3RIP_URT_PARAMETER_COUNT]=j;
403  i++;
404  }
405  else if(iattr) {
406  mEventIdMap[mInputs.SymbolicName(*eit)] = iattr->mEventId;
407  }
408  else
409  continue;
410  }
411  mEventCount=i;
412  FD_DHV("d3ripURTDevice(" << mName << ")::Compile()");
413  // call base
414  vDevice::Compile();
415 }
416 
417 // send/execute output event
418 void d3ripURTDevice::WriteOutput(Idx output) {
419  int i;
420  int j;
421  int messageLength=0;
422  mEventsToSendCount=0;
423  for(i=0;i<mEventCount;i++) {
424  if(mEventParameters[i][D3RIP_URT_EVENT_ID]==output){
425  //populate the mCommunicationRequests array with Communication requests of the event(s)
426  for(j=0;j<mEventParameters[i][D3RIP_URT_PARAMETER_COUNT];j++){
427 
428  mCommunicationRequests[mCommunicationRequestCount].destinationNode=
429  mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DESTIONATION_NODE];
430 
431  mCommunicationRequests[mCommunicationRequestCount].destinationChannel=
432  mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DESTIONATION_CHANNEL];
433 
434  mCommunicationRequests[mCommunicationRequestCount].eligibilityTime=
435  mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_ELIGIBILITY_TIME];
436 
437  mCommunicationRequests[mCommunicationRequestCount].deadlineTime=
438  mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DEADLINE_TIME];
439 
440 
441  mCommunicationRequestCount++;
442  }
443  //store Ids for event(s) in unsigned char format
444  mEventIdsToSend[mEventsToSendCount]=output;
445  mEventsToSendCount++;
446  break;
447  }
448  }
449  FlushOutput(1);
450 }
451 
452 void d3ripURTDevice::FlushOutput(unsigned char channel) {
453  int i;
454  int msgLength;
455  char msgToSend[D3RIP_RT_MESSAGE_MAX_LENGTH];
456  msgToSend[0]=1;
457  msgToSend[1]=mCommunicationRequestCount;
458  for(i=0;i<mCommunicationRequestCount;i++){
459  msgToSend[2+D3RIP_URT_DESTIONATION_NODE+i*D3RIP_URT_PARAMETER_SIZE]=mCommunicationRequests[i].destinationNode;
460  msgToSend[2+D3RIP_URT_DESTIONATION_CHANNEL+i*D3RIP_URT_PARAMETER_SIZE]=mCommunicationRequests[i].destinationChannel;
461  msgToSend[2+D3RIP_URT_ELIGIBILITY_TIME+i*D3RIP_URT_PARAMETER_SIZE]=mCommunicationRequests[i].eligibilityTime;
462  msgToSend[2+D3RIP_URT_DEADLINE_TIME+i*D3RIP_URT_PARAMETER_SIZE]=mCommunicationRequests[i].deadlineTime;
463  }
464 
465  msgToSend[2+mCommunicationRequestCount*D3RIP_URT_PARAMETER_SIZE]=mEventsToSendCount;
466  for(i=0;i<mEventsToSendCount;i++) {
467  msgToSend[i+3+mCommunicationRequestCount*D3RIP_URT_PARAMETER_SIZE]=mEventIdMap[mOutputs.SymbolicName(mEventIdsToSend[i])];
468  //printf("Event Send! Idx: %d",mEventIdMap[mOutputs.SymbolicName(mEventIdsToSend[i])]);
469  }
470 
471  msgLength=i+3+mCommunicationRequestCount*D3RIP_URT_PARAMETER_SIZE;
472  if(msgLength>D3RIP_RT_MESSAGE_MAX_LENGTH){
473  FD_DHV("RT Message Length exceeds MAX limit!");
474  return;
475  }
476  mq_send(mMQueueToSend,(char*)&msgToSend[0],msgLength,0);
477 
478  //reset counters for events and CRs
479  mCommunicationRequestCount=0;
480  mEventsToSendCount=0;
481 }
482 
483 
484 //DoWritePreface(rTw,rLabel,pContext)
485 void d3ripURTDevice::DoWritePreface(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const{
486  FD_DHV("d3ripURTDevice("<<mName<<")::DoWritePreface()");
487  //call base
488  vDevice::DoWritePreface(rTw,rLabel,pContext);
489 }
490 
491 //DoReadPreface(rTr,rLabel,pContext)
492 void d3ripURTDevice::DoReadPreface(TokenReader& rTr,const std::string& rLabel, const Type* pContext){
493  FD_DHV("d3ripURTDevice("<<mName<<")::DoReadPreface()");
494  // call base
495  vDevice::DoReadPreface(rTr,rLabel,pContext);
496 }
497 
498 //DoListenCLModule(d3ripURTDevice)
499 void* DoListenCLModule(void* threadArg){
500  d3ripURTDevice* pD3ripURTDevice = (d3ripURTDevice*)threadArg;
501  int i;
502  int length;
503  int communicationRequestCount;
504  int eventIdCount;
505  unsigned char receivedEventId;
506  char msg[D3RIP_RT_MESSAGE_MAX_LENGTH];
507  struct timespec timeout;
508  mq_attr attr;
509  mq_getattr(pD3ripURTDevice->mMQueueToReceive,&attr);
510  std::map<std::string,int>::iterator ii;
511  int evIndexToProcess;
512  struct timespec ts,tr;
513  ts.tv_sec = 1;
514  ts.tv_nsec = 0;
515  nanosleep(&ts, &tr);
516  ts.tv_sec = 0;
517  ts.tv_nsec = 5000;
518 
519  fflush(stdout);
520  mqd_t mMQueueToReceive = mq_open("/MQUEUE_CL2AP_ID", O_RDWR | O_CREAT, S_IRWXU | S_IRWXG , 0);
521  fflush(stdout);
522  for(;;) {
523 
524  clock_gettime(CLOCK_REALTIME, &timeout);
525  timeout.tv_sec += 1;
526  length = mq_receive(mMQueueToReceive, msg, 1500, 0);
527  nanosleep(&ts, &tr);
528  if(length > 0)
529  {
530  FD_DHV("CL2AP received");
531  communicationRequestCount=msg[0];
532  if(communicationRequestCount>0) {
533  fflush(stdout);
534  }
535 
536  eventIdCount=msg[1+communicationRequestCount*D3RIP_URT_PARAMETER_SIZE];
537  for(i=0;i<eventIdCount;i++){
538  receivedEventId=msg[i+2+communicationRequestCount*D3RIP_URT_PARAMETER_SIZE];
539  fflush(stdout);
540  for( ii=pD3ripURTDevice->mEventIdMap.begin(); ii!=pD3ripURTDevice->mEventIdMap.end(); ++ii)
541  {
542  if((*ii).second==receivedEventId) {
543  evIndexToProcess=pD3ripURTDevice->mInputs.Index((*ii).first.c_str());
544  if(pD3ripURTDevice->mInputs.Exists(evIndexToProcess)) {
545  pthread_mutex_lock(pD3ripURTDevice->pBufferMutex);
546  pD3ripURTDevice->pInputBuffer->push_back(evIndexToProcess);
547  pthread_mutex_unlock(pD3ripURTDevice->pBufferMutex);
548  //printf("event received! %d->%d\n",receivedEventId,evIndexToProcess);
549  pthread_mutex_lock(pD3ripURTDevice->pWaitMutex);
550  pthread_cond_broadcast(pD3ripURTDevice->pWaitCondition);
551  pthread_mutex_unlock(pD3ripURTDevice->pWaitMutex);
552  //printf("event notification sent!\n");
553  fflush(stdout);
554  }
555  }
556  }
557  }
558  }
559  if( !pD3ripURTDevice->mContinueListening ) {
560  FD_DHV("Stopping the CL Listener");
561  break;
562  };
563  }
564  return NULL;
565 }
566 
567 }//end namespace faudes
568 
569 #endif // configure
570 
#define FAUDES_TYPE_IMPLEMENTATION(ftype, ctype, cbase)
faudes type implementation macros, overall
Definition: cfl_types.h:946
iodevice for d3ripURT protocol and friends
#define FD_DHV(message)
Definition: iop_vdevice.h:37
libFAUDES resides within the namespace faudes.
uint32_t Idx
Type definition for index type (allways 32bit)

libFAUDES 2.32b --- 2024.03.01 --- c++ api documentaion by doxygen