42 #include <sys/types.h>    48 #include <boost/algorithm/string.hpp>    51 #include "../../types/TypeTransporter.hpp"    52 #include "../../types/TypeMarshaller.hpp"    53 #include "../../Logger.hpp"    55 #include "../../base/PortInterface.hpp"    56 #include "../../DataFlowInterface.hpp"    57 #include "../../TaskContext.hpp"    65     mtransport(transport), marshaller_cookie(0), buf(0), mis_sender(false), minit_done(false), max_size(0), mdata_size(0)
    82             throw std::runtime_error(
"MQ name_id not set, and the port is either not attached to a task, or said task has no name. Cannot create a reasonably unique MQ name automatically");
    84         std::stringstream name_stream;
    86         std::string name = name_stream.str();
    87         boost::algorithm::replace_all(name, 
"/", 
"_");
    92     mattr.mq_maxmsg = policy.
size ? policy.
size : 10;
    96         throw std::runtime_error(
"Could not open message queue with wrong name. Names must start with '/' and contain no more '/' after the first one.");
    98         throw std::runtime_error(
"Could not open message queue with zero message size.");
   101         oflag |= O_WRONLY | O_NONBLOCK;
   104     mqdes = mq_open(policy.
name_id.c_str(), oflag, S_IREAD | S_IWRITE, &mattr);
   108         int the_error = errno;
   109         log(
Error) << 
"FAILED opening '" << policy.
name_id << 
"' with message size " << mattr.mq_msgsize << 
", buffer size " << mattr.mq_maxmsg << 
" for "   110                 << (is_sender ? 
"writing :" : 
"reading :") << endlog();
   115             log(
Error) << 
"The queue exists, but the caller does not have permission to open it in the specified mode." << endlog();
   119             log(
Error) << 
"Wrong mqueue name given OR, In a process  that  is  unprivileged  (does  not  have  the "   120                     << 
"CAP_SYS_RESOURCE  capability),  attr->mq_maxmsg  must  be  less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit.  In addition, even in a privileged process, "   121                     << 
"attr->mq_maxmsg cannot exceed the HARD_MAX limit.  (See mq_overview(7) for details of these limits.)" << endlog();
   124             log(
Error) << 
"The process already has the maximum number of files and message queues open." << endlog();
   127             log(
Error) << 
"Name was too long." << endlog();
   130             log(
Error) << 
"The system limit on the total number of open files and message queues has been reached." << endlog();
   134                     << 
"Insufficient space for the creation of a new message queue.  This probably occurred because the queues_max limit was encountered; see mq_overview(7)."   138             log(
Error) << 
"Insufficient memory." << endlog();
   141             log(
Error) << 
"Submit a bug report. An unexpected mq error occured with errno=" << errno << 
": " << strerror(errno) << endlog();
   143         throw std::runtime_error(
"Could not open message queue: mq_open returned -1.");
   146     log(
Debug) << 
"Opened '" << policy.
name_id << 
"' with mqdes='" << 
mqdes << 
"', msg size='"<<mattr.mq_msgsize<<
"' an queue length='"<<mattr.mq_maxmsg<<
"' for " << (is_sender ? 
"writing." : 
"reading.") << endlog();
   172         mq_unlink(
mqname.c_str());
   209         struct timespec abs_timeout;
   212         abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
   213         abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
   227                 log(
Error) << 
"Failed to initialize MQ Channel Element with initial data sample." << endlog();
   233             log(
Error) << 
"Failed to receive initial data sample for MQ Channel Element: " << strerror(errno) << endlog();
   249     struct timespec abs_timeout;
   252     abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
   253     abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
   261         log(
Error) << 
"Failed to read from MQ Channel Element: no data received within 500ms!" <<endlog();
   276         log(
Error) << 
"MQChannel: failed to marshal sample" << endlog();
   280     char* lbuf = (
char*) blob.first;
   281     if (mq_send(
mqdes, lbuf, blob.second, 0) == -1)
   286         log(
Error) << 
"MQChannel "<< 
mqdes << 
" became invalid (mq length="<<
max_size<<
", msg length="<<blob.second<<
"): " << strerror(errno) << endlog();
 virtual bool updateFromBlob(const void *blob, int size, base::DataSourceBase::shared_ptr target, void *cookie=0) const =0
Update target with the contents of blob which is an object of a protocol. 
types::TypeMarshaller const & mtransport
Transport marshaller used for size calculations and data updates. 
virtual void * createCookie() const 
Overload in subclasses for marshallers that need to allocate some internal data. 
virtual void mqNewSample(base::DataSourceBase::shared_ptr ds)
Adapts the mq send/receive buffer size according to the data in mqdata_source, or the value set in md...
bool mqRead(base::DataSourceBase::shared_ptr ds)
Read from the message queue. 
void * marshaller_cookie
A private blob that is returned by mtransport.createCookie(). 
bool mis_sender
True if this object is a sender. 
int data_size
Suggest the payload size of the data sent over this channel. 
void setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface *port, ConnPolicy const &policy, bool is_sender)
std::string mqname
The name of the queue, as specified in the ConnPolicy when creating the stream, or self-calculated wh...
int max_size
The size of buf. 
const std::string & getName() const 
Get the name of this Port. 
A connection policy object describes how a given connection should behave. 
bool mqWrite(base::DataSourceBase::shared_ptr ds)
Write to the message queue. 
virtual bool mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase *chan)
Works only in receive mode, waits for a new sample and adapts the receive buffer to match its size...
int size
If the connection is a buffered connection, the size of the buffer. 
DataFlowInterface * getInterface() const 
Returns the DataFlowInterface this port belongs to or null if it was not added to such an interface...
Convenient short notation for every sub-namespace of RTT. 
Objects implementing this interface have the capability to convert data sources to and from a binary ...
int mdata_size
The size of the data, as specified in the ConnPolicy when creating the stream, or calculated using th...
char * buf
Send/Receive buffer. 
virtual void deleteCookie(void *cookie) const 
Called to delete a cookie created with createCookie. 
virtual std::pair< void const *, int > fillBlob(base::DataSourceBase::shared_ptr source, void *blob, int size, void *cookie=0) const =0
Create a transportable object for a protocol which contains the value of source. ...
bool minit_done
True if setupStream() was called, false after cleanupStream(). 
MQSendRecv(types::TypeMarshaller const &transport)
Create a channel element for remote data exchange. 
Notify the Logger in which 'module' the message occurred. 
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
Returns the size in bytes of a marshalled data element. 
nsecs Seconds_to_nsecs(const Seconds s)
boost::intrusive_ptr< DataSourceBase > shared_ptr
Use this type to store a pointer to a DataSourceBase. 
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute. 
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port. 
mqd_t mqdes
MQueue file descriptor. 
std::string name_id
The name of this connection. 
virtual const std::string & getName() const 
Returns the name of this TaskContext. 
TaskContext * getOwner() const 
Returns the component this interface belongs to. 
static Dispatcher::shared_ptr Instance()