Orocos Real-Time Toolkit  2.9.0
ConnFactory.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 ConnFactory.hpp
3 
4  ConnFactory.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef ORO_CONN_FACTORY_HPP
40 #define ORO_CONN_FACTORY_HPP
41 
42 #include <string>
43 #include "Channels.hpp"
44 #include "ConnInputEndPoint.hpp"
45 #include "ConnOutputEndPoint.hpp"
46 #include "SharedConnection.hpp"
47 #include "../base/PortInterface.hpp"
48 #include "../base/InputPortInterface.hpp"
49 #include "../base/OutputPortInterface.hpp"
50 #include "../DataFlowInterface.hpp"
51 
52 #include "../base/DataObject.hpp"
53 #include "../base/DataObjectUnSync.hpp"
54 #include "../base/Buffer.hpp"
55 #include "../base/BufferUnSync.hpp"
56 #include "../Logger.hpp"
57 
58 #include "../rtt-config.h"
59 
60 namespace RTT
61 { namespace internal {
62 
66  struct LocalConnID : public ConnID
67  {
70  : ptr(obj) {}
71  virtual ConnID* clone() const;
72  virtual bool isSameID(ConnID const& id) const;
73  };
74 
78  struct RTT_API StreamConnID : public ConnID
79  {
80  std::string name_id;
81  StreamConnID(const std::string& name)
82  : name_id(name) {}
83  virtual ConnID* clone() const;
84  virtual bool isSameID(ConnID const& id) const;
85  };
86 
87 
95  {
96  public:
97  virtual ~ConnFactory() {}
98 
103  virtual base::InputPortInterface* inputPort(std::string const& name) const = 0;
104 
109  virtual base::OutputPortInterface* outputPort(std::string const& name) const = 0;
110 
117  virtual base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const& policy) const = 0;
118 
126  virtual base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface& port, ConnPolicy const& policy) const = 0;
127 
135  virtual base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface& port, ConnPolicy const& policy) const = 0;
136 
147  virtual internal::SharedConnectionBase::shared_ptr buildSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const& policy) const = 0;
148 
156  template<typename T>
157  static base::ChannelElement<T>* buildDataStorage(ConnPolicy const& policy, const T& initial_value = T())
158  {
159  if (policy.type == ConnPolicy::DATA)
160  {
161  typename base::DataObjectInterface<T>::shared_ptr data_object;
162  switch (policy.lock_policy)
163  {
164 #ifndef OROBLD_OS_NO_ASM
166  data_object.reset( new base::DataObjectLockFree<T>(initial_value, policy) );
167  break;
168 #else
170  RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
171 #endif
172  case ConnPolicy::LOCKED:
173  data_object.reset( new base::DataObjectLocked<T>(initial_value) );
174  break;
175  case ConnPolicy::UNSYNC:
176  data_object.reset( new base::DataObjectUnSync<T>(initial_value) );
177  break;
178  }
179  return new ChannelDataElement<T>(data_object, policy);
180  }
181  else if (policy.type == ConnPolicy::BUFFER || policy.type == ConnPolicy::CIRCULAR_BUFFER)
182  {
183  typename base::BufferInterface<T>::shared_ptr buffer_object;
184  switch (policy.lock_policy)
185  {
186 #ifndef OROBLD_OS_NO_ASM
188  buffer_object.reset(new base::BufferLockFree<T>(policy.size, initial_value, policy));
189  break;
190 #else
192  RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
193 #endif
194  case ConnPolicy::LOCKED:
195  buffer_object.reset(new base::BufferLocked<T>(policy.size, initial_value, policy));
196  break;
197  case ConnPolicy::UNSYNC:
198  buffer_object.reset(new base::BufferUnSync<T>(policy.size, initial_value, policy));
199  break;
200  }
201  return new ChannelBufferElement<T>(buffer_object, policy);
202  }
203  return NULL;
204  }
205 
218  template<typename T>
219  static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort<T>& port, ConnPolicy const& policy, bool force_unbuffered = false)
220  {
221  typename internal::ConnInputEndpoint<T>::shared_ptr endpoint = port.getEndpoint();
222  typename base::ChannelElement<T>::shared_ptr buffer = port.getSharedBuffer();
223 
224  // Note: PerInputPort implies PUSH and PerOutputPort implies PULL
225  bool pull = policy.pull;
226  if (policy.buffer_policy == PerInputPort) pull = ConnPolicy::PUSH;
227  if (policy.buffer_policy == PerOutputPort) pull = ConnPolicy::PULL;
228 
229  if (pull == ConnPolicy::PULL && !force_unbuffered) {
230  if (!buffer) {
231  buffer = buildDataStorage<T>(policy, port.getLastWrittenValue());
232  if (!buffer) return typename internal::ConnOutputEndpoint<T>::shared_ptr();
233 
234  if (policy.buffer_policy == PerOutputPort) {
235  // For PerOutputPort connections, the buffer is installed BEFORE the input endpoint!
236  if (endpoint->connected()) {
237  log(Error) << "You tried to create a shared output buffer connection for output port " << port.getName() << ", "
238  << "but the port already has at least one incompatible outgoing connection." << endlog();
240  }
241  return buffer->connectTo(endpoint) ? endpoint : typename internal::ConnInputEndpoint<T>::shared_ptr();
242  } else {
243  return endpoint->connectTo(buffer, policy.mandatory) ? buffer : typename internal::ConnInputEndpoint<T>::shared_ptr();
244  }
245 
246  } else if (policy.buffer_policy == PerOutputPort) {
247  // check that the existing buffer type is compatible to the new ConnPolicy
248  assert(buffer->getConnPolicy());
249  ConnPolicy buffer_policy = *(buffer->getConnPolicy());
250 
251  if (
252  (buffer_policy.type != policy.type) ||
253  (buffer_policy.size != policy.size) ||
254  (buffer_policy.lock_policy != policy.lock_policy)
255  )
256  {
257  log(Error) << "You mixed incompatible connection policies for the shared output buffer of port " << port.getName() << ": "
258  << "The new connection requests a " << policy << " connection, "
259  << "but the port already has a " << buffer_policy << " buffer." << endlog();
261  }
262 
263  return endpoint;
264  }
265  }
266 
267  if (buffer) {
268  // The new connection requires an unbuffered channel input or a per-connection buffer, but this port already has a shared output buffer!
269  assert(buffer->getConnPolicy());
270  ConnPolicy buffer_policy = *(buffer->getConnPolicy());
271 
272  log(Error) << "You mixed incompatible connection policies for output port " << port.getName() << ": "
273  << "The new connection requests a " << policy << " connection, "
274  << "but the port already has a " << buffer_policy << " buffer." << endlog();
276  }
277 
278  return endpoint;
279  }
280 
296  template<typename T>
297  static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort<T>& port, ConnPolicy const& policy, T const& initial_value = T() )
298  {
299  typename internal::ConnOutputEndpoint<T>::shared_ptr endpoint = port.getEndpoint();
300  typename base::ChannelElement<T>::shared_ptr buffer = port.getSharedBuffer();
301 
302  // Note: PerInputPort implies PUSH and PerOutputPort implies PULL
303  bool pull = policy.pull;
304  if (policy.buffer_policy == PerInputPort) pull = ConnPolicy::PUSH;
305  if (policy.buffer_policy == PerOutputPort) pull = ConnPolicy::PULL;
306 
307  if (pull == ConnPolicy::PUSH) {
308  if (!buffer) {
309  buffer = buildDataStorage<T>(policy, initial_value);
310  if (!buffer) return typename internal::ConnOutputEndpoint<T>::shared_ptr();
311 
312  if (policy.buffer_policy == PerInputPort) {
313  // For PerInputPort connections, the buffer is installed AFTER the output endpoint!
314  if (endpoint->connected()) {
315  log(Error) << "You tried to create a shared input buffer connection for input port " << port.getName() << ", "
316  << "but the port already has at least one incompatible incoming connection." << endlog();
318  }
319  return endpoint->connectTo(buffer) ? endpoint : typename internal::ConnOutputEndpoint<T>::shared_ptr();
320  } else {
321  return buffer->connectTo(endpoint) ? buffer : typename internal::ConnOutputEndpoint<T>::shared_ptr();
322  }
323 
324  } else if (policy.buffer_policy == PerInputPort) {
325  // check that the existing buffer type is compatible to the new ConnPolicy
326  assert(buffer->getConnPolicy());
327  ConnPolicy buffer_policy = *(buffer->getConnPolicy());
328 
329  if (
330  (buffer_policy.type != policy.type) ||
331  (buffer_policy.size != policy.size) ||
332  (buffer_policy.lock_policy != policy.lock_policy)
333  )
334  {
335  log(Error) << "You mixed incompatible connection policies for the shared input buffer of port " << port.getName() << ": "
336  << "The new connection requests a " << policy << " connection, "
337  << "but the port already has a " << buffer_policy << " buffer." << endlog();
339  }
340 
341  return endpoint;
342  }
343  }
344 
345  if (buffer) {
346  // The new connection requires an unbuffered channel output or a per-connection buffer, but this port already has a shared input buffer!
347  assert(buffer->getConnPolicy());
348  ConnPolicy buffer_policy = *(buffer->getConnPolicy());
349 
350  log(Error) << "You mixed incompatible connection policies for input port " << port.getName() << ": "
351  << "The new connection requests a " << policy << " connection, "
352  << "but the port already has a " << buffer_policy << " buffer." << endlog();
354  }
355 
356  return endpoint;
357  }
358 
371  static bool findSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const& policy, SharedConnectionBase::shared_ptr &shared_connection);
372 
383  template <typename T>
385  {
386  // try to find an existing shared connection first
387  SharedConnectionBase::shared_ptr shared_connection;
388 
389  // abort if an incompatible connection has been found or one of the ports is already connected to another shared connection
390  if (findSharedConnection(output_port, input_port, policy, shared_connection) && !shared_connection) {
392  }
393 
394  // for remote input ports, and if we can derive the type from the output port, build the shared buffer at the remote side and only generate a proxy here:
395  if (input_port && !input_port->isLocal()) {
396  if (!output_port) {
397  log(Error) << "Cannot create a shared connection for a remote input port or a non-standard transport without knowing the local output port." << endlog();
399  }
400 
401  // If no existing shared connection has been found, ask the remote side to build a shared channel output and create a local proxy.
402  if (!shared_connection) {
403  base::ChannelElementBase::shared_ptr output_half = buildRemoteChannelOutput( *output_port, *input_port, policy);
404  if (!output_half) {
405  log(Error) << "Could not create a shared remote connection for input port '" << input_port->getName() << "'." << endlog();
407  }
408 
409  shared_connection = new SharedRemoteConnection<T>(policy);
410  shared_connection->connectTo(output_half, policy.mandatory);
411 
412  // ...or only ask the remote side to connect the additional input port to an existing remote shared connection
413  } else {
414 // typename SharedRemoteConnection<T>::shared_ptr shared_remote_connection = boost::dynamic_pointer_cast<SharedRemoteConnection<T> >(shared_connection);
415 
416 // if (!shared_remote_connection) {
417 // log(Error) << "Cannot create a shared connection for a remote input port because the local output port is already connected to a local shared connection." << endlog();
418 // return SharedConnectionBase::shared_ptr();
419 // }
420 
421 // if (!input_port->createConnection(shared_remote_connection, policy)) {
422  if (!input_port->createConnection(shared_connection, policy)) {
423  log(Error) << "The remote side refused to connect the input port '" << input_port->getName() << "' to the existing shared connection '" << shared_connection->getName() << "'." << endlog();
425  }
426  }
427  }
428 
429  // create a new shared connection instance
430  if (!shared_connection) {
431  RTT::OutputPort<T> *output_p = dynamic_cast<RTT::OutputPort<T> *>(output_port);
432  typename base::ChannelElement<T>::shared_ptr buffer = buildDataStorage<T>(policy, (output_p ? output_p->getLastWrittenValue() : T()));
433  if (!buffer) return SharedConnectionBase::shared_ptr();
434  shared_connection.reset(new SharedConnection<T>(buffer.get(), policy));
435  }
436 
437  return shared_connection;
438  }
439 
449  template<typename T>
450  static bool createConnection(OutputPort<T>& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy)
451  {
452  if ( !output_port.isLocal() ) {
453  log(Error) << "Need a local OutputPort to create connections." <<endlog();
454  return false;
455  }
456 
457  if (output_port.connectedTo(&input_port)) {
458  log(Info) << "OutputPort " << output_port.getName() << " is already connected to " << input_port.getName() << ", ignoring new connection." << endlog();
459  return true;
460  }
461 
462  InputPort<T>* input_p = dynamic_cast<InputPort<T>*>(&input_port);
463 
464  // Shared push connection? => forward to createAndCheckSharedConnection()
465  if (policy.buffer_policy == Shared) {
466  return createAndCheckSharedConnection(&output_port, &input_port, buildSharedConnection<T>(&output_port, &input_port, policy), policy);
467  }
468 
469  // This is the input and output channel element of the output half
471  if (input_port.isLocal() && policy.transport == 0)
472  {
473  // Local connection
474  if (!input_p)
475  {
476  log(Error) << "Port " << input_port.getName() << " is not compatible with " << output_port.getName() << endlog();
477  return false;
478  }
479 
480  // local ports, create buffer here.
481  output_half = buildChannelOutput<T>(*input_p, policy, output_port.getLastWrittenValue());
482  }
483  else
484  {
485  // if the input is not local, this is a pure remote connection,
486  // if the input *is* local, the user requested to use a different transport
487  // than plain memory, rare case, but we accept it. The unit tests use this for example
488  // to test the OOB transports.
489  if ( !input_port.isLocal() ) {
490  output_half = buildRemoteChannelOutput( output_port, input_port, policy);
491  } else if (input_p) {
492  return createOutOfBandConnection<T>( output_port, *input_p, policy);
493  } else {
494  log(Error) << "Port " << input_port.getName() << " is not compatible with " << output_port.getName() << endlog();
495  return false;
496  }
497  }
498 
499  if (!output_half)
500  return false;
501 
502  // Since output is local, buildChannelInput is local as well.
503  // This this the input channel element of the whole connection
505  channel_input = buildChannelInput<T>(output_port, policy);
506 
507  if (!channel_input) {
508  output_half->disconnect(true);
509  return false;
510  }
511 
512  // NOTE: channel_input and output_half are not yet connected!
513  return createAndCheckConnection(output_port, input_port, channel_input, output_half, policy);
514  }
515 
523  template<class T>
524  static bool createStream(OutputPort<T>& output_port, ConnPolicy const& policy)
525  {
526  StreamConnID *sid = new StreamConnID(policy.name_id);
527  // Stream channel inputs are always unbuffered (push). It's the transport that has to add a buffer element if required.
528  RTT::base::ChannelElementBase::shared_ptr chan = buildChannelInput( output_port, policy, /* force_unbuffered = */ true );
529  if (!chan) return false;
530  return bool(createAndCheckStream(output_port, policy, chan, sid));
531  }
532 
540  template<class T>
541  static bool createStream(InputPort<T>& input_port, ConnPolicy const& policy)
542  {
543  StreamConnID *sid = new StreamConnID(policy.name_id);
544  RTT::base::ChannelElementBase::shared_ptr outhalf = buildChannelOutput( input_port, policy );
545  if (!outhalf) return false;
546  return bool(createAndCheckStream(input_port, policy, outhalf, sid));
547  }
548 
549  static bool createAndCheckSharedConnection(base::OutputPortInterface* output_port, base::InputPortInterface* input_port, SharedConnectionBase::shared_ptr shared_connection, ConnPolicy const& policy);
550 
551  protected:
552  static bool createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, base::ChannelElementBase::shared_ptr channel_output, ConnPolicy const& policy);
553 
554  static base::ChannelElementBase::shared_ptr createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr channel_input, StreamConnID* conn_id);
555 
556  static base::ChannelElementBase::shared_ptr createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr channel_output, StreamConnID* conn_id);
557 
558  static base::ChannelElementBase::shared_ptr buildRemoteChannelOutput(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy);
559 
567  template<class T>
568  static bool createOutOfBandConnection(OutputPort<T>& output_port, InputPort<T>& input_port, ConnPolicy const& policy) {
569  // constructs an out-of-band channel:
570  // output_port -> channel_input -> stream_input -> (out-of-band transport of data) -> stream_output -> channel_output -> input_port
571  // |-- (direct connection for coordination) --^
572  //
573 
574  // Stream channel inputs are always unbuffered (push). It's the transport that has to add a buffer element if required.
575  RTT::base::ChannelElementBase::shared_ptr channel_input = buildChannelInput( output_port, policy, /* force_unbuffered = */ true );
576  if (!channel_input) return false;
577 
578  RTT::base::ChannelElementBase::shared_ptr stream_input = createAndCheckStream(output_port, policy, channel_input, new StreamConnID(policy.name_id));
579  if (!stream_input) return false;
580 
581  RTT::base::ChannelElementBase::shared_ptr channel_output = ConnFactory::buildChannelOutput<T>(input_port, policy, output_port.getLastWrittenValue());
582  if (!channel_output) return false;
583 
584  RTT::base::ChannelElementBase::shared_ptr stream_output = createAndCheckStream(input_port, policy, channel_output, new StreamConnID(policy.name_id));
585  if (!stream_output) return false;
586 
587  return stream_input->getOutputEndPoint()->connectTo(stream_output->getInputEndPoint(), policy.mandatory);
588  }
589 
590  };
591 
592  typedef boost::shared_ptr<ConnFactory> ConnFactoryPtr;
593 
594 }}
595 
596 #endif
597 
The base class of the InputPort.
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
virtual bool createConnection(internal::SharedConnectionBase::shared_ptr shared_connection, ConnPolicy const &policy=ConnPolicy())
Connects the port to an existing shared connection instance.
This class provides the basic tools to create channels that represent connections between two ports...
Definition: ConnFactory.hpp:94
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
base::PortInterface const * ptr
Definition: ConnFactory.hpp:68
A Lock-free buffer implementation to read and write data of type T in a FIFO way. ...
virtual bool isLocal() const
Returns true if this port is located on this process, and false otherwise.
virtual ConnID * clone() const
Definition: ConnFactory.cpp:57
static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort< T > &port, ConnPolicy const &policy, T const &initial_value=T())
During the process of building a connection between two ports, this method builds the output part of ...
boost::shared_ptr< DataObjectInterface< T > > shared_ptr
Used for shared_ptr management.
static const int CIRCULAR_BUFFER
Definition: ConnPolicy.hpp:113
boost::intrusive_ptr< ConnInputEndpoint< T > > shared_ptr
Represents a Stream connection created by the ConnFactory.
Definition: ConnFactory.hpp:78
virtual const ConnPolicy * getConnPolicy() const
Get a pointer to the connection policy used to build this channel element, if available.
static const bool PULL
Definition: ConnPolicy.hpp:120
virtual base::ChannelElement< T >::shared_ptr getSharedBuffer() const
Definition: InputPort.hpp:233
int lock_policy
This is the locking policy on the connection.
Definition: ConnPolicy.hpp:196
A connection element that can store a fixed number of data samples.
const std::string & getName() const
Get the name of this Port.
virtual bool isSameID(ConnID const &id) const
Definition: ConnFactory.cpp:49
A component&#39;s data input port.
Definition: InputPort.hpp:63
static const int LOCKED
Definition: ConnPolicy.hpp:116
#define RTT_API
Definition: rtt-config.h:97
int type
DATA, BUFFER or CIRCULAR_BUFFER.
Definition: ConnPolicy.hpp:190
virtual internal::ConnInputEndpoint< T > * getEndpoint() const
Returns the input or output endpoint of this port (if any).
Definition: OutputPort.hpp:333
static const bool PUSH
Definition: ConnPolicy.hpp:119
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:107
A class which provides unprotected (not thread-safe) access to one typed element of data...
static const int DATA
Definition: ConnPolicy.hpp:111
The base class of each OutputPort.
virtual internal::ConnOutputEndpoint< T > * getEndpoint() const
Returns the input or output endpoint of this port (if any).
Definition: InputPort.hpp:227
boost::shared_ptr< ConnFactory > ConnFactoryPtr
int size
If the connection is a buffered connection, the size of the buffer.
Definition: ConnPolicy.hpp:193
Implements a not threadsafe buffer.
Represents a local connection created by the ConnFactory.
Definition: ConnFactory.hpp:66
bool pull
If true, then the sink will have to pull data.
Definition: ConnPolicy.hpp:209
StreamConnID(const std::string &name)
Definition: ConnFactory.hpp:81
static bool createStream(InputPort< T > &input_port, ConnPolicy const &policy)
Creates, attaches and checks an inbound stream to an Input port.
bool mandatory
Whether the connection described by this connection policy is mandatory, which means that write opera...
Definition: ConnPolicy.hpp:232
static base::ChannelElement< T > * buildDataStorage(ConnPolicy const &policy, const T &initial_value=T())
This method creates the connection element that will store data inside the connection, based on the given policy.
virtual bool connectTo(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
Connects a new output to this element.
A typed version of ChannelElementBase.
virtual bool connectedTo(PortInterface *port)
Returns true if this port is connected to the given port.
This DataObject is a Lock-Free implementation, such that reads and writes can happen concurrently wit...
static bool createStream(OutputPort< T > &output_port, ConnPolicy const &policy)
Creates, attaches and checks an outbound stream to an Output port.
static bool createOutOfBandConnection(OutputPort< T > &output_port, InputPort< T > &input_port, ConnPolicy const &policy)
This code is for setting up an in-process out-of-band connection.
static const int LOCK_FREE
Definition: ConnPolicy.hpp:117
virtual bool connected()
Returns true, if this channel element has at least one input, independent of whether is has an output...
boost::intrusive_ptr< ChannelElementBase > shared_ptr
Implements a very simple blocking thread-safe buffer, using mutexes (locks).
A class which provides locked/protected access to one typed element of data.
virtual base::ChannelElement< T >::shared_ptr getSharedBuffer() const
Definition: OutputPort.hpp:339
A connection element that stores a single data sample.
int transport
The prefered transport used.
Definition: ConnPolicy.hpp:238
LocalConnID(base::PortInterface const *obj)
Definition: ConnFactory.hpp:69
A component&#39;s data output port.
Definition: OutputPort.hpp:70
static SharedConnectionBase::shared_ptr buildSharedConnection(OutputPort< T > *output_port, base::InputPortInterface *input_port, ConnPolicy const &policy)
Tries to find an existing or creates a new shared connection object for the given output port...
static const int UNSYNC
Definition: ConnPolicy.hpp:115
int buffer_policy
The policy on how buffer elements will be installed for this connection, which influences the behavio...
Definition: ConnPolicy.hpp:216
This class is used in places where a permanent representation of a reference to a connection is neede...
Definition: ConnID.hpp:58
boost::intrusive_ptr< ConnOutputEndpoint< T > > shared_ptr
static const int BUFFER
Definition: ConnPolicy.hpp:112
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort< T > &port, ConnPolicy const &policy, bool force_unbuffered=false)
During the process of building a connection between two ports, this method builds the input half (sta...
The base class of every data flow port.
std::string name_id
The name of this connection.
Definition: ConnPolicy.hpp:256
T getLastWrittenValue() const
Returns the last written value written to this port, in case it is kept by this port, otherwise, returns a default T().
Definition: OutputPort.hpp:173
static bool createConnection(OutputPort< T > &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)
Creates a connection from a local output_port to a local or remote input_port.
virtual bool connected()
Returns true, if this channel element has at least one output, independent of whether is has an input...
boost::shared_ptr< BufferInterface< T > > shared_ptr