Orocos Real-Time Toolkit  2.8.3
TaskContext.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Tue Dec 21 22:43:08 CET 2004 TaskContext.cxx
3 
4  TaskContext.cxx - description
5  -------------------
6  begin : Tue December 21 2004
7  copyright : (C) 2004 Peter Soetens
8  email : peter.soetens@mech.kuleuven.ac.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 
40 #include "TaskContext.hpp"
41 #include "base/ActionInterface.hpp"
42 #include "plugin/PluginLoader.hpp"
43 
44 #include <string>
45 #include <algorithm>
46 #include <functional>
47 #include <boost/bind.hpp>
48 #include <boost/mem_fn.hpp>
49 
50 #include "internal/DataSource.hpp"
51 #include "internal/mystd.hpp"
52 #include "internal/MWSRQueue.hpp"
53 #include "OperationCaller.hpp"
54 
55 #include "rtt-config.h"
56 
57 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
59 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
60 #include "Activity.hpp"
61 #endif
62 
63 namespace RTT
64 {
65 
66  using namespace boost;
67  using namespace std;
68  using namespace detail;
69 
70  TaskContext::TaskContext(const std::string& name, TaskState initial_state /*= Stopped*/)
71  : TaskCore( initial_state)
72  ,portqueue( new MWSRQueue<PortInterface*>(64) )
73  ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
74 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
75  ,our_act( new SequentialActivity( this->engine() ) )
76 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
77  ,our_act( new Activity( this->engine(), name ) )
78 #endif
79  {
80  this->setup();
81  }
82 
83  TaskContext::TaskContext(const std::string& name, ExecutionEngine* parent, TaskState initial_state /*= Stopped*/ )
84  : TaskCore(parent, initial_state)
85  ,portqueue( new MWSRQueue<PortInterface*>(64) )
86  ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
87 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
88  ,our_act( parent ? 0 : new SequentialActivity( this->engine() ) )
89 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
90  ,our_act( parent ? 0 : new Activity( this->engine(), name ) )
91 #endif
92  {
93  this->setup();
94  }
95 
96  void TaskContext::setup()
97  {
98  tcservice->setOwner(this);
99  // from Service
100  provides()->doc("The interface of this TaskContext.");
101 
102  this->addOperation("configure", &TaskContext::configure, this, ClientThread).doc("Configure this TaskContext (= configureHook() ).");
103  this->addOperation("isConfigured", &TaskContext::isConfigured, this, ClientThread).doc("Is this TaskContext configured ?");
104  this->addOperation("start", &TaskContext::start, this, ClientThread).doc("Start this TaskContext (= startHook() + updateHook() ).");
105  this->addOperation("activate", &TaskContext::activate, this, ClientThread).doc("Activate the Execution Engine of this TaskContext.");
106  this->addOperation("stop", &TaskContext::stop, this, ClientThread).doc("Stop this TaskContext (= stopHook() ).");
107  this->addOperation("isRunning", &TaskContext::isRunning, this, ClientThread).doc("Is this TaskContext started ?");
108  this->addOperation("getPeriod", &TaskContext::getPeriod, this, ClientThread).doc("Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period.");
109  this->addOperation("setPeriod", &TaskContext::setPeriod, this, ClientThread).doc("Set the execution period in seconds.").arg("s", "Period in seconds.");
110  this->addOperation("getCpuAffinity", &TaskContext::getCpuAffinity, this, ClientThread).doc("Get the configured cpu affinity.");
111  this->addOperation("setCpuAffinity", &TaskContext::setCpuAffinity, this, ClientThread).doc("Set the cpu affinity.").arg("cpu", "Cpu mask.");
112  this->addOperation("isActive", &TaskContext::isActive, this, ClientThread).doc("Is the Execution Engine of this TaskContext active ?");
113  this->addOperation("inFatalError", &TaskContext::inFatalError, this, ClientThread).doc("Check if this TaskContext is in the FatalError state.");
114  this->addOperation("error", &TaskContext::error, this, ClientThread).doc("Enter the RunTimeError state (= errorHook() ).");
115  this->addOperation("inRunTimeError", &TaskContext::inRunTimeError, this, ClientThread).doc("Check if this TaskContext is in the RunTimeError state.");
116  this->addOperation("inException", &TaskContext::inException, this, ClientThread).doc("Check if this TaskContext is in the Exception state.");
117  this->addOperation("cleanup", &TaskContext::cleanup, this, ClientThread).doc("Reset this TaskContext to the PreOperational state ( =cleanupHook() ).");
118  this->addOperation("update", &TaskContext::update, this, ClientThread).doc("Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
119 
120  this->addOperation("trigger", &TaskContext::trigger, this, ClientThread).doc("Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
121  this->addOperation("loadService", &TaskContext::loadService, this, ClientThread).doc("Loads a service known to RTT into this component.").arg("service_name","The name with which the service is registered by in the PluginLoader.");
122  // activity runs from the start.
123  if (our_act)
124  our_act->start();
125  }
126 
128  {
129  if (our_act)
130  our_act->stop();
131  // We don't call stop() or cleanup() here since this is
132  // the responsibility of the subclass. Calling these functions
133  // here would only lead to calling invalid virtual functions.
134  // [Rule no 1: Don't call virtual functions in a destructor.]
135  // [Rule no 2: Don't call virtual functions in a constructor.]
136  tcservice->clear();
137 
138  tcrequests->clear();
139 
140  // remove from all users.
141  while( !musers.empty() ) {
142  musers.front()->removePeer(this);
143  }
144  // since we are destroyed, be sure that the peer no longer
145  // has a 'user' pointer to us.
146  while ( !_task_map.empty() ) {
147  _task_map.begin()->second->removeUser(this);
148  _task_map.erase( _task_map.begin() );
149  }
150  // Do not call this->disconnect() !!!
151  // Ports are probably already destructed by user code.
152  delete portqueue;
153  }
154 
156  {
157  bool failure = false;
158  const std::string& location = this->getName();
159  Logger::In in( location.c_str() );
160 
161  DataFlowInterface::Ports myports = this->ports()->getPorts();
162  for (DataFlowInterface::Ports::iterator it = myports.begin();
163  it != myports.end();
164  ++it) {
165 
166  // Then try to get the peer port's connection
167  PortInterface* peerport = peer->ports()->getPort( (*it)->getName() );
168  if ( !peerport ) {
169  log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog();
170  continue;
171  }
172 
173  // Skip if they have the same type
174  if((dynamic_cast<OutputPortInterface*>(*it) && dynamic_cast<OutputPortInterface*>(peerport)) ||
175  (dynamic_cast<InputPortInterface*>(*it) && dynamic_cast<InputPortInterface*>(peerport)))
176  {
177  log(Debug)<< (*it)->getName() << " and " << peerport->getName() << " have the same type" << endlog();
178  continue;
179  }
180 
181  // Try to find a way to connect them
182  if ( !(*it)->connectTo( peerport ) ) {
183  log(Debug)<< "Data flow incompatible between ports "
184  << getName() << "." << (*it)->getName() << " and "
185  << peer->getName() << "." << (*it)->getName() << endlog();
186  failure = true;
187  }
188  }
189  return !failure;
190  }
191 
193  {
194  bool success = true;
195  const std::string& location = this->getName();
196  Logger::In in( location.c_str() );
197 
198  vector<string> myreqs = this->requires()->getRequesterNames();
199  vector<string> peerreqs = peer->requires()->getRequesterNames();
200 
201  this->requires()->connectTo( peer->provides() );
202  for (vector<string>::iterator it = myreqs.begin();
203  it != myreqs.end();
204  ++it) {
205  ServiceRequester::shared_ptr sr = this->requires(*it);
206  if ( !sr->ready() ) {
207  if (peer->provides()->hasService( *it ))
208  success = sr->connectTo( peer->provides(*it) ) && success;
209  else {
210  log(Debug)<< "Peer Task "<<peer->getName() <<" provides no Service " << *it << endlog();
211  }
212  }
213  }
214 
215  peer->requires()->connectTo( this->provides() );
216  for (vector<string>::iterator it = peerreqs.begin();
217  it != peerreqs.end();
218  ++it) {
219  ServiceRequester::shared_ptr sr = peer->requires(*it);
220  if ( !sr->ready() ) {
221  if (this->provides()->hasService(*it))
222  success = sr->connectTo( this->provides(*it) ) && success;
223  else
224  log(Debug)<< "This Task provides no Service " << *it << " for peer Task "<<peer->getName() <<"."<< endlog();
225  }
226  }
227  return success;
228  }
229 
230  bool TaskContext::prepareProvide(const std::string& name) {
231  return tcservice->hasService(name) || plugin::PluginLoader::Instance()->loadService(name, this);
232  }
233 
234  bool TaskContext::loadService(const std::string& service_name) {
235  if ( provides()->hasService(service_name))
236  return true;
237  return PluginLoader::Instance()->loadService(service_name, this);
238  }
239 
240  void TaskContext::addUser( TaskContext* peer )
241  {
242  if (peer)
243  musers.push_back(peer);
244  }
245 
246  void TaskContext::removeUser( TaskContext* peer )
247  {
248  Users::iterator it = find(musers.begin(), musers.end(), peer);
249  if ( it != musers.end() )
250  musers.erase(it);
251  }
252 
253  bool TaskContext::addPeer( TaskContext* peer, std::string alias )
254  {
255  if ( alias.empty() )
256  alias = peer->getName();
257  if ( !peer || _task_map.count( alias ) != 0 )
258  return false;
259  _task_map[ alias ] = peer;
260  peer->addUser( this );
261  return true;
262  }
263 
264  void TaskContext::removePeer( const std::string& name )
265  {
266  PeerMap::iterator it = _task_map.find( name );
267  if ( _task_map.end() != it ) {
268  it->second->removeUser( this );
269  _task_map.erase( _task_map.find( name ) );
270  }
271  }
272 
274  {
275  for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
276  if ( it->second == peer ) {
277  peer->removeUser( this );
278  _task_map.erase( it );
279  return;
280  }
281  }
282 
284  {
285  if ( _task_map.count( peer->getName() ) != 0
286  || peer->hasPeer( this->getName() ) )
287  return false;
288  this->addPeer ( peer );
289  peer->addPeer ( this );
290  return true;
291  }
292 
294  Logger::In in( this->getName().c_str() );
295  // disconnect all our ports
296  DataFlowInterface::Ports myports = this->ports()->getPorts();
297  for (DataFlowInterface::Ports::iterator it = myports.begin();
298  it != myports.end();
299  ++it) {
300  (*it)->disconnect();
301  }
302 
303  // remove from all users.
304  while( !musers.empty() ) {
305  musers.front()->removePeer(this);
306  }
307 
308  while ( !_task_map.empty() ) {
309  _task_map.begin()->second->removeUser(this);
310  _task_map.erase( _task_map.begin() );
311  }
312  }
313 
314  void TaskContext::disconnectPeers( const std::string& name )
315  {
316  if ( _task_map.end() != _task_map.find( name ) ) {
317  TaskContext* peer = _task_map.find(name)->second;
318  this->removePeer(peer);
319  peer->removePeer(this);
320  }
321  }
322 
323  std::vector<std::string> TaskContext::getPeerList() const
324  {
325  std::vector<std::string> res;
326  std::transform(_task_map.begin(), _task_map.end(),
327  std::back_inserter( res ),
329  return res;
330  }
331 
332  bool TaskContext::hasPeer( const std::string& peer_name ) const
333  {
334  return _task_map.count( peer_name ) == 1;
335  }
336 
337  TaskContext* TaskContext::getPeer(const std::string& peer_name ) const
338  {
339  if (this->hasPeer( peer_name ) )
340  return _task_map.find(peer_name)->second;
341  return 0;
342  }
343 
345  {
346  if (this->isRunning())
347  return false;
348  if ( new_act == 0) {
349 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
350  new_act = new SequentialActivity();
351 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
352  new_act = new Activity();
353 #endif
354  }
355  new_act->stop();
356  if(our_act){
357  our_act->stop();
358  }
359  new_act->run( this->engine() );
360  our_act = ActivityInterface::shared_ptr( new_act );
361  our_act->start();
362  return true;
363  }
364 
366  {
367  if (!new_act)
368  return;
369  new_act->stop();
370  if(our_act){
371  our_act->stop();
372  }
373  our_act.reset( new_act );
374  our_act->run( this->engine() );
375  our_act->start();
376  }
377 
379  {
380  if (this->engine()->getActivity() != our_act.get() )
381  return this->engine()->getActivity();
382  return our_act.get();
383  }
384 
386  {
387  tcservice->clear();
388  tcrequests->clear();
389  }
390 
392  {
393  return true;
394  }
395 
397  return A->connectPorts(B);
398  }
399 
401  return A->connectPeers(B);
402  }
403 
405  {
406  if ( this->isRunning() )
407  return false;
408 #ifdef ORO_SIGNALLING_PORTS
409  ports()->setupHandles();
410 #endif
411  return TaskCore::start(); // calls startHook()
412  }
413 
415  {
416  if ( !this->isRunning() )
417  return false;
418  if (TaskCore::stop()) { // calls stopHook()
419 #ifdef ORO_SIGNALLING_PORTS
420  ports()->cleanupHandles();
421 #endif
422  return true;
423  }
424  return false;
425  }
426 
427  void TaskContext::dataOnPort(PortInterface* port)
428  {
429  if ( this->dataOnPortHook(port) ) {
430  portqueue->enqueue( port );
431  this->getActivity()->trigger();
432  }
433  }
434 
436  return this->isRunning();
437  }
438 
439  void TaskContext::dataOnPortCallback(InputPortInterface* port, TaskContext::SlotFunction callback) {
440  // user_callbacks will only be emitted from updateHook().
441  MutexLock lock(mportlock);
442  user_callbacks[port] = callback;
443  }
444 
445  void TaskContext::dataOnPortRemoved(PortInterface* port) {
446  MutexLock lock(mportlock);
447  UserCallbacks::iterator it = user_callbacks.find(port);
448  if (it != user_callbacks.end() ) {
449  user_callbacks.erase(it);
450  }
451  }
452 
453  void TaskContext::prepareUpdateHook()
454  {
455  MutexLock lock(mportlock);
456  PortInterface* port = 0;
457  while ( portqueue->dequeue( port ) == true ) {
458  UserCallbacks::iterator it = user_callbacks.find(port);
459  if (it != user_callbacks.end() )
460  it->second(port); // fire the user callback
461  }
462  }
463 }
464 
TaskContext(const std::string &name, TaskState initial_state=Stopped)
Create a TaskContext.
Definition: TaskContext.cpp:70
virtual ~TaskContext()
ActivityInterface * getActivity() const
Query for the task this interface is run in.
The base class of the InputPort.
virtual bool trigger()=0
Trigger that work has to be done.
The minimal Orocos task.
Definition: TaskCore.hpp:54
Service::shared_ptr provides()
Returns this Service.
virtual bool trigger()
Invoke this method to trigger the thread of this TaskContext to execute its ExecutionEngine and the u...
Definition: TaskCore.cpp:97
The default, thread-less activity for any newly created TaskContext.
virtual bool stop()
This method stops the execution of updateHook() of this component.
bool loadService(const std::string &service_name)
Use this method to load a service known to RTT into this component.
STL namespace.
void forceActivity(base::ActivityInterface *new_act)
Forces the current activity to become new_act, even if this TaskContext is still running.
boost::shared_ptr< ServiceRequester > shared_ptr
const std::string & getName() const
Get the name of this Port.
virtual bool connectPeers(TaskContext *peer)
Add a two-way connection from this task to a peer task.
virtual void removePeer(const std::string &name)
Remove a one-way connection from this task to a peer task.
virtual bool isConfigured() const
Inspect if the component is configured, i.e.
Definition: TaskCore.cpp:270
virtual bool activate()
This method starts the ExecutionEngine of this component in case it was not running.
Definition: TaskCore.cpp:258
virtual bool setCpuAffinity(unsigned cpu)
Sets the cpu affinity of this component.
Definition: TaskCore.cpp:306
virtual bool isRunning() const
Inspect if the component is in the Running or RunTimeError state.
Definition: TaskCore.cpp:266
virtual bool configure()
This method instructs the component to (re-)read configuration data and try to enter the Stopped stat...
Definition: TaskCore.cpp:104
bool setActivity(base::ActivityInterface *new_act)
Sets the activity of this TaskContext.
An execution engine serialises (executes one after the other) the execution of all commands...
virtual TaskContext * getPeer(const std::string &peer_name) const
Get a pointer to a peer of this task.
virtual void error()
Call this method in a Running state to indicate a run-time error condition.
Definition: TaskCore.cpp:162
virtual Seconds getPeriod() const
Get the configured execution period of this component.
Definition: TaskCore.cpp:291
virtual bool run(RunnableInterface *r)
Run exclusively this RunnableInterface.
virtual bool inException() const
Inspect if the component is in the Exception state.
Definition: TaskCore.cpp:278
This class allows storage and retrieval of operations, ports, attributes and properties provided by a...
Definition: Service.hpp:93
virtual bool hasPeer(const std::string &peer_name) const
Return true if it knows a peer by that name.
ServiceRequester::shared_ptr requires()
Returns the object that manages which methods this Task requires to be implemented by another task...
boost::function< void(base::PortInterface *)> SlotFunction
Name and add a Port to the interface of this task and add a Service with the same name of the port...
virtual void disconnect()
Disconnect this TaskContext from it&#39;s peers and ports.
DataFlowInterface * ports()
Get the Data flow ports of this task.
virtual bool inFatalError() const
Inspect if the component is in the FatalError state.
Definition: TaskCore.cpp:274
base::PortInterface * getPort(const std::string &name) const
Get an added port.
virtual PeerList getPeerList() const
Return a standard container which contains all the Peer names of this TaskContext.
boost::shared_ptr< ActivityInterface > shared_ptr
virtual bool update()
Invoke this method to execute the ExecutionEngine and the update() method.
Definition: TaskCore.cpp:90
Interface to start/stop and query a Activity.
virtual void disconnectPeers(const std::string &name)
Remove a two-way connection from this task to a peer task.
TaskState
Describes the different states a component can have.
Definition: TaskCore.hpp:99
std::vector< base::PortInterface * > Ports
A sequence of pointers to ports.
An Activity is an object that represents a thread.
Definition: Activity.hpp:63
virtual bool isActive() const
Inspect if the component&#39;s ExecutionEngine is processing requests.
Definition: TaskCore.cpp:286
This object represents the default Multi-Writer, Single-Reader queue implementation used by Orocos ob...
Definition: MWSRQueue.hpp:66
virtual bool inRunTimeError() const
Inspect if the component is in the RunTimeError state.
Definition: TaskCore.cpp:282
Notify the Logger in which &#39;module&#39; the message occured.
Definition: Logger.hpp:159
The TaskContext is the C++ representation of an Orocos component.
Definition: TaskContext.hpp:93
virtual unsigned getCpuAffinity() const
Get the configured cpu affinity of this component.
Definition: TaskCore.cpp:301
An object that expresses you wish to use a service.
virtual bool connectPorts(TaskContext *peer)
Add a data flow connection from this task&#39;s ports to a peer&#39;s ports.
static boost::shared_ptr< PluginLoader > Instance()
Create the instance of the PluginLoader.
Ports getPorts() const
Get all ports of this interface.
virtual bool connectServices(TaskContext *peer)
Connects all requires/provides services of this component to these of a peer.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
virtual bool stop()
This method stops the execution of updateHook() of this component.
Definition: TaskCore.cpp:233
The base class of every data flow port.
Operation< Signature > & addOperation(Operation< Signature > &op)
Add an operation object to the interface.
virtual bool setPeriod(Seconds s)
Sets the period of this component.
Definition: TaskCore.cpp:296
const ExecutionEngine * engine() const
Get a const pointer to the ExecutionEngine of this Task.
Definition: TaskCore.hpp:327
virtual bool start()
This method starts the execution of the updateHook() with each trigger or period. ...
virtual bool addPeer(TaskContext *peer, std::string alias="")
Add a one-way connection from this task to a peer task.
base::ActivityInterface * getActivity()
Get a pointer to the activity running this component.
virtual bool cleanup()
This method instructs a stopped component to enter the pre-operational state again.
Definition: TaskCore.cpp:136
virtual const std::string & getName() const
Returns the name of this TaskContext.
virtual void clear()
Clear the complete interface of this Component.
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
#define ORO_ACT_DEFAULT_ACTIVITY
virtual bool start()
This method starts the execution of the updateHook() with each trigger or period. ...
Definition: TaskCore.cpp:202
virtual bool stop()=0
Stop the activity This will stop the activity by removing it from the &#39;run-queue&#39; of a thread or call...
virtual bool ready()
Checks the validity of this TaskContext.
virtual bool dataOnPortHook(base::PortInterface *port)
Reimplement this method to influence how writing to event ports is handled by the component...