Orocos Real-Time Toolkit  2.8.3
AtomicQueue.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: The SourceWorks Tue Sep 7 00:55:18 CEST 2010 AtomicQueue.hpp
3 
4  AtomicQueue.hpp - description
5  -------------------
6  begin : Tue September 07 2010
7  copyright : (C) 2010 The SourceWorks
8  email : peter@thesourceworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef ORO_CORELIB_ATOMIC_QUEUE_HPP
40 #define ORO_CORELIB_ATOMIC_QUEUE_HPP
41 
42 #include "../os/CAS.hpp"
43 #include <utility>
44 
45 namespace RTT
46 {
47  namespace internal {
69  template<class T>
71  {
72  const int _size;
73  typedef T C;
74  typedef volatile C* CachePtrType;
75  typedef C* volatile CacheObjType;
76  typedef C ValueType;
77  typedef C* PtrType;
78 
79  union SIndexes
80  {
81  unsigned long _value;
82  unsigned short _index[2];
83  };
84 
89  CachePtrType _buf;
90 
95  volatile SIndexes _indxes;
96 
104  CachePtrType recover_r() const
105  {
106  // The implementation starts from the read pointer,
107  // and wraps around until all fields were scanned.
108  // As such, the out-of-order elements will at least
109  // be returned in their relative order.
110  SIndexes start;
111  start._value = _indxes._value;
112  unsigned short r = start._index[1];
113  while( r != _size) {
114  if (_buf[r])
115  return &_buf[r];
116  ++r;
117  }
118  r = 0;
119  while( r != start._index[1]) {
120  if (_buf[r])
121  return &_buf[r];
122  ++r;
123  }
124  return 0;
125  }
126 
131  CachePtrType propose_w()
132  {
133  SIndexes oldval, newval;
134  do {
135  oldval._value = _indxes._value; /*Points to a free writable pointer.*/
136  newval._value = oldval._value; /*Points to the next writable pointer.*/
137  // check for full on a *Copy* of oldval:
138  if ( (newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1) )
139  {
140  // note: in case of high contention, there might be existing empty fields
141  // in _buf that aren't used.
142  return 0;
143  }
144  ++newval._index[0];
145  if ( newval._index[0] == _size )
146  newval._index[0] = 0;
147  // if ptr is unchanged, replace it with newval.
148  } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) );
149 
150  // the returned field may contain data, in that case, the caller needs to retry.
151  return &_buf[ oldval._index[0] ];
152  }
157  CachePtrType propose_r()
158  {
159  SIndexes oldval, newval;
160  do {
161  oldval._value = _indxes._value;
162  newval._value = oldval._value;
163  // check for empty on a *Copy* of oldval:
164  if ( newval._index[0] == newval._index[1] )
165  {
166  // seldom: R and W are indicating empty, but 'lost' fields
167  // are to be picked up. Return these
168  // that would have been read eventually after some writes.
169  return recover_r();
170  }
171  ++newval._index[1];
172  if ( newval._index[1] == _size )
173  newval._index[1] = 0;
174 
175  } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) );
176  // the returned field may contain *no* data, in that case, the caller needs to retry.
177  // as such r will advance until it hits a data sample or write pointer.
178  return &_buf[oldval._index[1] ];
179  }
180 
181  // non-copyable !
182  AtomicQueue( const AtomicQueue<T>& );
183  public:
184  typedef unsigned int size_type;
185 
190  AtomicQueue( unsigned int size )
191  : _size(size+1)
192  {
193  _buf= new C[_size];
194  this->clear();
195  }
196 
198  {
199  delete[] _buf;
200  }
201 
206  bool isFull() const
207  {
208  // two cases where the queue is full :
209  // if wptr is one behind rptr or if wptr is at end
210  // and rptr at beginning.
211  SIndexes val;
212  val._value = _indxes._value;
213  return val._index[0] == val._index[1] - 1 || val._index[0] == val._index[1] + _size - 1;
214  }
215 
220  bool isEmpty() const
221  {
222  // empty if nothing to read.
223  SIndexes val;
224  val._value = _indxes._value;
225  return val._index[0] == val._index[1] && recover_r() == 0;
226  }
227 
231  size_type capacity() const
232  {
233  return _size -1;
234  }
235 
241  size_type size() const
242  {
243  int c = 0, ret = 0;
244  while (c != _size ) {
245  if (_buf[c++] )
246  ++ret;
247  }
248  return ret;
249  //int c = (_indxes._index[0] - _indxes._index[1]);
250  //return c >= 0 ? c : c + _size;
251  }
252 
258  bool enqueue(const T& value)
259  {
260  if ( value == 0 )
261  return false;
262  CachePtrType loc;
263  C null = 0;
264  do {
265  loc = propose_w();
266  if ( loc == 0 )
267  return false; //full
268  // if loc contains a zero, write it, otherwise, re-try.
269  } while( !os::CAS(loc, null, value));
270  return true;
271  }
272 
278  bool dequeue( T& result )
279  {
280  CachePtrType loc;
281  C null = 0;
282  do {
283  loc = propose_r();
284  if ( loc == 0 )
285  return false; // empty
286  result = *loc;
287  // if loc still contains result, clear it, otherwise, re-try.
288  } while( result == 0 || !os::CAS(loc, result, null) );
289  assert(result);
290  return true;
291  }
292 
296  const T front() const
297  {
298  return _buf[_indxes._index[1] ];
299  }
300 
304  void clear()
305  {
306  for(int i = 0 ; i != _size; ++i) {
307  _buf[i] = 0;
308  }
309  _indxes._value = 0;
310  }
311 
312  };
313 
314 }}
315 
316 #endif
size_type size() const
Return the exact number of elements in the queue.
void clear()
Clear all contents of the Queue and thus make it empty.
Create an atomic, non-blocking single ended queue (FIFO) for storing a pointer to T...
Definition: AtomicQueue.hpp:70
bool CAS(volatile T *addr, const V &expected, const W &value)
Compare And Swap.
Definition: CAS.hpp:54
bool enqueue(const T &value)
Enqueue an item.
bool isEmpty() const
Inspect if the Queue is empty.
bool dequeue(T &result)
Dequeue an item.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
const T front() const
Return the next to be read value.
size_type capacity() const
Return the maximum number of items this queue can contain.
AtomicQueue(unsigned int size)
Create an AtomicQueue with queue size size.
bool isFull() const
Inspect if the Queue is full.