Orocos Real-Time Toolkit  2.9.0
MQChannelElement.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 MQChannelElement.hpp
3 
4  MQChannelElement.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef MQ_CHANNEL_ELEMENT_H
40 #define MQ_CHANNEL_ELEMENT_H
41 
42 #include "MQSendRecv.hpp"
43 #include "../../Logger.hpp"
44 #include "../../base/ChannelElement.hpp"
45 #include "../../internal/DataSource.hpp"
46 #include "../../internal/DataSources.hpp"
47 #include <stdexcept>
48 
49 namespace RTT
50 {
51  namespace mqueue
52  {
61  template<typename T>
63  {
65  typename internal::ValueDataSource<T>::shared_ptr read_sample;
68 
69  public:
75  const ConnPolicy& policy, bool is_sender)
76  : MQSendRecv(transport)
77  , read_sample(new internal::ValueDataSource<T>)
78  , write_sample(new internal::LateConstReferenceDataSource<T>)
79 
80  {
81  Logger::In in("MQChannelElement");
82  setupStream(read_sample, port, policy, is_sender);
83  }
84 
86  cleanupStream();
87  }
88 
/**
 * Called by an input port when it is ready to receive data.
 *
 * When mqReady() succeeds for read_sample, the caller is narrowed to a
 * typed ChannelElement<T> and the current value of read_sample is
 * handed to it through data_sample().
 *
 * @param caller the channel element reporting readiness; it is expected
 *        to narrow to a ChannelElement<T> (checked by assert() only).
 * @return true when mqReady() succeeded (the result of the caller's
 *         data_sample() is not inspected), false otherwise.
 */
89 virtual bool inputReady(base::ChannelElementBase::shared_ptr const& caller) {
90 if ( mqReady(read_sample, this) ) {
91 typename base::ChannelElement<T>::shared_ptr output = caller->narrow<T>();
// NOTE(review): assert() compiles away under NDEBUG, in which case a
// failed narrow would be dereferenced on the next line.
92 assert(output);
93 output->data_sample(read_sample->rvalue());
94 return true;
95 }
96 return false;
97 }
98 
/**
 * Provides a data sample to initialize this connection.
 *
 * On the sender side the sample is pushed to the other side with a
 * plain mqWrite(), after mqNewSample() has been given the chance to
 * adapt the send/receive buffer to it. This happens when `reset` is
 * true or when write_sample->getRawDataConst() evaluates to false
 * (presumably: no sample has been attached yet - confirm).
 *
 * @param sample the sample to transmit.
 * @param reset when true, the sample is sent even if write_sample
 *        already refers to one.
 * @return WriteSuccess or WriteFailure according to mqWrite() when the
 *         sample was sent; NotConnected when this element is not a
 *         sender or the condition above did not hold.
 */
99 virtual WriteStatus data_sample(typename base::ChannelElement<T>::param_t sample, bool reset = true)
100 {
101 // send initial data sample to the other side using a plain write.
102 if (mis_sender && (!write_sample->getRawDataConst() || reset)) {
// write_sample is pointed at the caller's argument rather than being
// assigned a value here.
103 write_sample->setPointer(&sample);
104 // update MQSendRecv buffer:
105 mqNewSample(write_sample);
106 return mqWrite(write_sample) ? WriteSuccess : WriteFailure;
107 }
108 return NotConnected;
109 }
110 
/**
 * Performs one read-write cycle across this element.
 *
 * Sender side (mis_sender): reads from the input element into
 * read_sample with copy_old_data set to false and, if that yields
 * NewData, forwards the value with this->write().
 * Receiver side: reads a message into read_sample with mqRead() and
 * forwards the value with output->write().
 *
 * @return true only when the forwarding write reported WriteSuccess;
 *         false when the input/output element is missing, nothing new
 *         was read, or the write did not succeed.
 */
128 bool signal()
129 {
130 // copy messages into channel
131 if (mis_sender) {
132 // this read should always succeed since signal() means
133 // 'data available in a data element'.
// NOTE(review): `input` is used below but its declaration is not
// visible in this listing (embedded numbering skips 134); presumably it
// receives the result of this getInput() call - confirm against the header.
135 this->getInput();
136 if( input && input->read(read_sample->set(), false) == NewData )
137 return ( this->write(read_sample->rvalue()) == WriteSuccess );
138 } else {
// NOTE(review): likewise `output` has no visible declaration (embedded
// numbering skips 139); presumably the result of this getOutput() call.
140 this->getOutput();
141 if (output && mqRead(read_sample))
142 return ( output->write(read_sample->rvalue()) == WriteSuccess );
143 }
144 return false;
145 }
146 
/**
 * Reading through this element is not implemented.
 *
 * Both parameters are ignored; the body unconditionally throws.
 *
 * @param sample unused.
 * @param copy_old_data unused.
 * @return never returns normally.
 * @throws std::runtime_error always, with the message "not implemented".
 */
152 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
153 {
154 throw std::runtime_error("not implemented");
155 }
156 
163  {
164  write_sample->setPointer(&sample);
165  if (!mqWrite(write_sample)) {
166  return WriteFailure;
167  }
168  return WriteSuccess;
169  }
170 
/**
 * Identifies this element as a remote element.
 *
 * @return always true.
 */
171 virtual bool isRemoteElement() const
172 {
173 return true;
174 }
175 
/**
 * Returns the URI of the next channel element in the logical chain.
 *
 * NOTE(review): this listing shows neither the declaration of `base`
 * nor the statement guarded by the `if` (embedded numbering skips 179
 * and 181). The only result visible here is mqname; confirm the
 * output-element branch against the header.
 *
 * @return mqname, unless the (not shown) output-element branch applies.
 */
176 virtual std::string getRemoteURI() const
177 {
178 //check for output element case
180 if(base->getOutput())
182 
183 return mqname;
184 }
185 
/**
 * Returns the URI of this element.
 *
 * NOTE(review): this listing shows neither the declaration of `base`
 * nor the statement guarded by the `if` (embedded numbering skips 189
 * and 191). The only result visible here is mqname; confirm the
 * input-element branch against the header.
 *
 * @return mqname, unless the (not shown) input-element branch applies.
 */
186 virtual std::string getLocalURI() const
187 {
188 //check for input element case
190 if(base->getInput())
192 
193 return mqname;
194 }
195 
/**
 * Returns the class name of this element.
 *
 * @return the fixed string "MQChannelElement".
 */
196 virtual std::string getElementName() const
197 {
198 return "MQChannelElement";
199 }
200  };
201  }
202 }
203 
204 #endif
205 
Implements the sending/receiving of mqueue messages.
Definition: MQSendRecv.hpp:54
WriteStatus write(typename base::ChannelElement< T >::param_t sample)
Write to the message queue.
boost::call_traits< T >::param_type param_t
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
virtual void mqNewSample(base::DataSourceBase::shared_ptr ds)
Adapts the mq send/receive buffer size according to the data in mqdata_source, or the value set in md...
Definition: MQSendRecv.cpp:188
bool mqRead(base::DataSourceBase::shared_ptr ds)
Read from the message queue.
Definition: MQSendRecv.cpp:246
bool mis_sender
True if this object is a sender.
Definition: MQSendRecv.hpp:83
boost::intrusive_ptr< LateConstReferenceDataSource< T > > shared_ptr
void setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface *port, ConnPolicy const &policy, bool is_sender)
Definition: MQSendRecv.cpp:69
FlowStatus
Returns the status of a data flow read operation.
Definition: FlowStatus.hpp:56
std::string mqname
The name of the queue, as specified in the ConnPolicy when creating the stream, or self-calculated wh...
Definition: MQSendRecv.hpp:96
Implements a ChannelElement using message queues.
virtual std::string getLocalURI() const
This function returns the URI of this element.
FlowStatus read(typename base::ChannelElement< T >::reference_t sample, bool copy_old_data)
Read from the message queue.
virtual bool inputReady(base::ChannelElementBase::shared_ptr const &caller)
This is called by an input port when it is ready to receive data.
virtual std::string getElementName() const
Returns the class name of this element.
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:107
bool mqWrite(base::DataSourceBase::shared_ptr ds)
Write to the message queue.
Definition: MQSendRecv.cpp:271
void set(typename AssignableDataSource< T >::param_t t)
Definition: DataSources.inl:32
virtual bool mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase *chan)
Works only in receive mode, waits for a new sample and adapts the receive buffer to match its size...
Definition: MQSendRecv.cpp:198
A DataSource which is used to manipulate a const reference to an external value, by means of a pointe...
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
shared_ptr getInput()
Returns the current input channel element.
Objects implementing this interface have the capability to convert data sources to and from a binary ...
A typed version of ChannelElementBase.
virtual WriteStatus data_sample(typename base::ChannelElement< T >::param_t sample, bool reset=true)
virtual std::string getLocalURI() const
This function returns the URI of this element.
virtual FlowStatus read(reference_t sample, bool copy_old_data=true)
Reads a sample from the connection.
boost::call_traits< T >::reference reference_t
boost::intrusive_ptr< ChannelElementBase > shared_ptr
Notify the Logger in which 'module' the message occurred.
Definition: Logger.hpp:159
void setPointer(const typename AssignableDataSource< T >::value_t *ptr)
virtual WriteStatus data_sample(param_t sample, bool reset=true)
Provides a data sample to initialize this connection.
bool signal()
Signal will cause a read-write cycle to transfer the data from the data/buffer element to the message...
virtual WriteStatus write(param_t sample)
Writes a new sample on this connection.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
AssignableDataSource< T >::const_reference_t rvalue() const
Get a const reference to the value of this DataSource.
Definition: DataSources.hpp:95
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
MQChannelElement(base::PortInterface *port, types::TypeMarshaller const &transport, const ConnPolicy &policy, bool is_sender)
Create a channel element for remote data exchange.
virtual bool isRemoteElement() const
This function may be used to determine whether the current element uses a network transport to send the data to the next element in the logical chain.
A simple, yet very useful DataSource, which keeps a value, and returns it in its get() method...
Definition: DataSources.hpp:60
shared_ptr getOutput()
Returns the next channel element in the channel's propagation direction.
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
WriteStatus
Returns the status of a data flow write operation.
Definition: FlowStatus.hpp:66
boost::intrusive_ptr< ValueDataSource< T > > shared_ptr
Definition: DataSources.hpp:72