Orocos Real-Time Toolkit  2.9.0
Dispatcher.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 Dispatcher.hpp
3 
4  Dispatcher.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 
40 #include "../../os/MutexLock.hpp"
41 #include "../../Activity.hpp"
42 #include "../../base/ChannelElementBase.hpp"
43 #include "../../Logger.hpp"
44 #include <map>
45 #include <sys/select.h>
46 #include <mqueue.h>
47 
48 namespace RTT { namespace mqueue { class Dispatcher; } }
49 
50 namespace RTT {
51  namespace mqueue {
54 
62  class Dispatcher : public Activity
63  {
64  friend void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher* p );
65  friend void intrusive_ptr_release(const RTT::mqueue::Dispatcher* p );
66  mutable os::AtomicInt refcount;
67  static Dispatcher* DispatchI;
68 
69  typedef std::map<mqd_t,base::ChannelElementBase*> MQMap;
70  MQMap mqmap;
71 
72  fd_set socks; /* Socket file descriptors we want to wake up for, using select() */
73 
74  int highsock; /* Highest #'d file descriptor, needed for select() */
75 
76  bool do_exit;
77 
78  os::Mutex maplock;
79 
80  Dispatcher( const std::string& name)
81  : Activity(ORO_SCHED_RT, os::HighestPriority, 0.0, 0, name),
82  highsock(0), do_exit(false)
83  {}
84 
85  ~Dispatcher() {
86  Logger::In in("Dispatcher");
87  log(Info) << "Dispacher cleans up: no more work."<<endlog();
88  stop();
89  DispatchI = 0;
90  }
91 
92  void build_select_list() {
93 
94  /* First put together fd_set for select(), which will
95  consist of the sock veriable in case a new connection
96  is coming in, plus all the sockets we have already
97  accepted. */
98 
99 
100  /* FD_ZERO() clears out the fd_set called socks, so that
101  it doesn't contain any file descriptors. */
102 
103  FD_ZERO(&socks);
104  highsock = 0;
105 
106  /* Loops through all the possible connections and adds
107  those sockets to the fd_set */
108  os::MutexLock lock(maplock);
109  for (MQMap::const_iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
110  FD_SET( it->first, &socks);
111  if ( int(it->first) > highsock)
112  highsock = int(it->first);
113  }
114  }
115 
116  void read_socks() {
117  /* OK, now socks will be set with whatever socket(s)
118  are ready for reading.*/
119 
120  /* Run through our sockets and check to see if anything
121  happened with them, if so 'service' them. */
122  os::MutexLock lock(maplock);
123  for (MQMap::iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
124  if ( FD_ISSET( it->first, &socks) ) {
125  //log(Debug) << "New data on " << it->first <<endlog();
126  it->second->signal();
127  }
128  }
129  }
130 
131  public:
132  typedef boost::intrusive_ptr<Dispatcher> shared_ptr;
133 
135  if ( DispatchI == 0) {
136  DispatchI = new Dispatcher("MQueueDispatch");
137  DispatchI->start();
138  }
139  return DispatchI;
140  }
141 
142  void addQueue( mqd_t mqdes, base::ChannelElementBase* chan ) {
143  Logger::In in("Dispatcher");
144  if (mqdes < 0) {
145  log(Error) <<"Invalid mqd_t given to MQueue Dispatcher." <<endlog();
146  return;
147  }
148  log(Debug) <<"Dispatcher is monitoring mqdes "<< mqdes <<endlog();
149  os::MutexLock lock(maplock);
150  // we add a refcount per channel we monitor.
151  if (mqmap.count(mqdes) == 0)
152  refcount.inc();
153  mqmap[mqdes] = chan;
154  }
155 
156  void removeQueue(mqd_t mqdes) {
157  Logger::In in("Dispatcher");
158  log(Debug) <<"Dispatcher drops mqdes "<< mqdes <<endlog();
159  os::MutexLock lock(maplock);
160  if (mqmap.count(mqdes)) {
161  mqmap.erase( mqmap.find(mqdes) );
162  refcount.dec();
163  }
164  }
165 
166  bool initialize() {
167  do_exit = false;
168  return true;
169  }
170 
171  void loop() {
172  struct timeval timeout; /* Timeout for select */
173  int readsocks; /* Number of sockets ready for reading */
174  while (1) { /* select loop */
175  build_select_list();
176  timeout.tv_sec = 0;
177  timeout.tv_usec = 50000;
178 
179  /* The first argument to select is the highest file
180  descriptor value plus 1.*/
181 
182  readsocks = select(highsock+1, &socks, (fd_set *) 0,
183  (fd_set *) 0, &timeout);
184 
185  /* select() returns the number of sockets that had
186  things going on with them -- i.e. they're readable. */
187 
188  /* Once select() returns, the original fd_set has been
189  modified so it now reflects the state of why select()
190  woke up. i.e. If file descriptor 4 was originally in
191  the fd_set, and then it became readable, the fd_set
192  contains file descriptor 4 in it. */
193 
194  if (readsocks < 0) {
195  if (errno != EINTR)
196  {
197  log(Error) <<"Dispatcher failed to select on message queues. Stopped thread. error: "<<strerror(errno)<<endlog();
198  return;
199  }
200  }
201  else if (readsocks == 0) {
202  // nop
203  } else // readsocks > 0
204  read_socks();
205 
206  if ( do_exit )
207  return;
208  } /* while(1) */
209  }
210 
211  bool breakLoop() {
212  do_exit = true;
213  return true;
214  }
215  };
216  }
217 }
218 
C++ abstraction of atomic integer operations.
Definition: Atomic.hpp:49
void intrusive_ptr_release(const RTT::mqueue::Dispatcher *p)
Definition: Dispatcher.cpp:48
boost::intrusive_ptr< Dispatcher > shared_ptr
Definition: Dispatcher.hpp:132
#define RTT_API
Definition: rtt-config.h:97
void removeQueue(mqd_t mqdes)
Definition: Dispatcher.hpp:156
const int HighestPriority
An integer denoting the highest priority of the selected OS.
Definition: ecosthreads.cpp:45
void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher *p)
Definition: Dispatcher.cpp:45
This object waits on a set of open message queue file descriptors and signals the channel that has re...
Definition: Dispatcher.hpp:62
An Activity executes a RunnableInterface object in a (periodic) thread.
Definition: Activity.hpp:70
void addQueue(mqd_t mqdes, base::ChannelElementBase *chan)
Definition: Dispatcher.hpp:142
Notify the Logger in which 'module' the message occurred.
Definition: Logger.hpp:159
#define ORO_SCHED_RT
Definition: fosi.h:49
An object oriented wrapper around a non recursive mutex.
Definition: Mutex.hpp:92
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
virtual bool start()
Start the activity.
Definition: Activity.cpp:273
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
static Dispatcher::shared_ptr Instance()
Definition: Dispatcher.hpp:134