Orocos Real-Time Toolkit  2.8.3
MQChannelElement.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 MQChannelElement.hpp
3 
4  MQChannelElement.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef MQ_CHANNEL_ELEMENT_H
40 #define MQ_CHANNEL_ELEMENT_H
41 
42 #include "MQSendRecv.hpp"
43 #include "../../Logger.hpp"
44 #include "../../base/ChannelElement.hpp"
45 #include "../../internal/DataSource.hpp"
46 #include "../../internal/DataSources.hpp"
47 #include <stdexcept>
48 
49 namespace RTT
50 {
51  namespace mqueue
52  {
61  template<typename T>
63  {
65  typename internal::ValueDataSource<T>::shared_ptr read_sample;
68 
69  public:
75  const ConnPolicy& policy, bool is_sender)
76  : MQSendRecv(transport)
77  , read_sample(new internal::ValueDataSource<T>)
78  , write_sample(new internal::LateConstReferenceDataSource<T>)
79 
80  {
81  Logger::In in("MQChannelElement");
82  setupStream(read_sample, port, policy, is_sender);
83  }
84 
86  cleanupStream();
87  }
88 
89  virtual bool inputReady() {
90  if ( mqReady(read_sample, this) ) {
91  typename base::ChannelElement<T>::shared_ptr output =
92  this->getOutput();
93  assert(output);
94  output->data_sample(read_sample->rvalue());
95  return true;
96  }
97  return false;
98  }
99 
100  virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
101  {
102  // send initial data sample to the other side using a plain write.
103  if (mis_sender) {
104  typename base::ChannelElement<T>::shared_ptr output =
105  this->getOutput();
106 
107  write_sample->setPointer(&sample);
108  // update MQSendRecv buffer:
109  mqNewSample(write_sample);
110  return mqWrite(write_sample);
111  }
112  return false;
113  }
114 
132  bool signal()
133  {
134  // copy messages into channel
135  if (mis_sender) {
136  // this read should always succeed since signal() means
137  // 'data available in a data element'.
138  typename base::ChannelElement<T>::shared_ptr input =
139  this->getInput();
140  if( input && input->read(read_sample->set(), false) == NewData )
141  return this->write(read_sample->rvalue());
142  } else {
143  typename base::ChannelElement<T>::shared_ptr output =
144  this->getOutput();
145  if (output && mqRead(read_sample))
146  return output->write(read_sample->rvalue());
147  }
148  return false;
149  }
150 
156  FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
157  {
158  throw std::runtime_error("not implemented");
159  }
160 
167  {
168  write_sample->setPointer(&sample);
169  return mqWrite(write_sample);
170  }
171 
172  virtual bool isRemoteElement() const
173  {
174  return true;
175  }
176 
177  virtual std::string getRemoteURI() const
178  {
179  //check for output element case
181  if(base->getOutput())
183 
184  return mqname;
185  }
186 
187  virtual std::string getLocalURI() const
188  {
189  //check for input element case
191  if(base->getInput())
193 
194  return mqname;
195  }
196 
197  virtual std::string getElementName() const
198  {
199  return "MQChannelElement";
200  }
201  };
202  }
203 }
204 
205 #endif
206 
Implements the sending/receiving of mqueue messages.
Definition: MQSendRecv.hpp:54
virtual bool inputReady()
This is called by an input port when it is ready to receive data.
boost::call_traits< T >::param_type param_t
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
virtual void mqNewSample(base::DataSourceBase::shared_ptr ds)
Adapts the mq send/receive buffer size according to the data in mqdata_source, or the value set in md...
Definition: MQSendRecv.cpp:188
bool mqRead(base::DataSourceBase::shared_ptr ds)
Read from the message queue.
Definition: MQSendRecv.cpp:246
bool mis_sender
True if this object is a sender.
Definition: MQSendRecv.hpp:83
boost::intrusive_ptr< LateConstReferenceDataSource< T > > shared_ptr
void setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface *port, ConnPolicy const &policy, bool is_sender)
Definition: MQSendRecv.cpp:69
FlowStatus
Returns the status of a data flow read.
Definition: FlowStatus.hpp:54
std::string mqname
The name of the queue, as specified in the ConnPolicy when creating the stream, or self-calculated wh...
Definition: MQSendRecv.hpp:96
Implements a ChannelElement using message queues.
virtual std::string getLocalURI() const
This function returns the URI of this element.
FlowStatus read(typename base::ChannelElement< T >::reference_t sample, bool copy_old_data)
Read from the message queue.
virtual FlowStatus read(reference_t sample, bool copy_old_data)
Reads a sample from the connection.
virtual std::string getElementName() const
Returns the class name of this element.
virtual bool write(param_t sample)
Writes a new sample on this connection.
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:92
bool mqWrite(base::DataSourceBase::shared_ptr ds)
Write to the message queue.
Definition: MQSendRecv.cpp:261
void set(typename AssignableDataSource< T >::param_t t)
Definition: DataSources.inl:32
virtual bool mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase *chan)
Works only in receive mode, waits for a new sample and adapts the receive buffer to match its size...
Definition: MQSendRecv.cpp:198
A DataSource which is used to manipulate a const reference to an external value, by means of a pointe...
virtual bool data_sample(typename base::ChannelElement< T >::param_t sample)
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
ChannelElementBase::shared_ptr getInput()
Returns the current input channel element.
Objects implementing this interface have the capability to convert data sources to and from a binary ...
A typed version of ChannelElementBase.
virtual std::string getLocalURI() const
This function returns the URI of this element.
boost::call_traits< T >::reference reference_t
bool write(typename base::ChannelElement< T >::param_t sample)
Write to the message queue.
Notify the Logger in which 'module' the message occurred.
Definition: Logger.hpp:159
void setPointer(const typename AssignableDataSource< T >::value_t *ptr)
bool signal()
Signal will cause a read-write cycle to transfer the data from the data/buffer element to the message...
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
AssignableDataSource< T >::const_reference_t rvalue() const
Get a const reference to the value of this DataSource.
Definition: DataSources.hpp:95
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
MQChannelElement(base::PortInterface *port, types::TypeMarshaller const &transport, const ConnPolicy &policy, bool is_sender)
Create a channel element for remote data exchange.
virtual bool isRemoteElement() const
This function may be used to identify whether the current element uses a network transport to send the data to the next element in the logical chain.
A simple, yet very useful DataSource, which keeps a value, and returns it in its get() method...
Definition: DataSources.hpp:60
ChannelElementBase::shared_ptr getOutput()
Returns the next channel element in the channel's propagation direction.
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
virtual bool data_sample(param_t sample)
Provides a data sample to initialize this connection.
boost::intrusive_ptr< ValueDataSource< T > > shared_ptr
Definition: DataSources.hpp:72