libsdr  0.1.0
A simple SDR library
queue.hh
1 #ifndef __SDR_QUEUE_HH__
2 #define __SDR_QUEUE_HH__
3 
4 #include <list>
5 #include <map>
6 #include "buffer.hh"
7 #include <pthread.h>
8 #include <iostream>
9 
10 
11 namespace sdr {
12 
13 // Forward decl.
14 class SinkBase;
15 
/** Interface of a delegate.
 *
 * A delegate is a type-erased callback: it can be invoked through
 * @c operator() and it reports the object it is bound to through
 * @c instance(). */
class DelegateInterface {
public:
  /** Virtual destructor, so that a concrete delegate can be destroyed safely
   * through a pointer to this interface. */
  virtual ~DelegateInterface() {}
  /** Call back interface. */
  virtual void operator() () = 0;
  /** Returns the instance of the delegate. */
  virtual void *instance() = 0;
};
24 
26 template <class T>
28 {
29 public:
31  Delegate(T *instance, void (T::*func)(void)) : _instance(instance), _function(func) { }
33  virtual ~Delegate() {}
35  virtual void operator() () { (_instance->*_function)(); }
37  virtual void *instance() { return _instance; }
38 
39 protected:
43  void (T::*_function)(void);
44 };
45 
46 
53 class Queue
54 {
55 public:
57  class Message {
58  public:
60  Message(const RawBuffer &buffer, SinkBase *sink, bool allow_overwrite)
61  : _buffer(buffer), _sink(sink), _allow_overwrite(allow_overwrite) { }
63  Message(const Message &other)
64  : _buffer(other._buffer), _sink(other._sink), _allow_overwrite(other._allow_overwrite) { }
66  const Message &operator= (const Message &other) {
67  _buffer = other._buffer;
68  _sink = other._sink;
70  return *this;
71  }
73  inline const RawBuffer &buffer() const { return _buffer; }
75  inline RawBuffer &buffer() { return _buffer; }
77  inline SinkBase *sink() const { return _sink; }
79  inline bool allowOverwrite() const { return _allow_overwrite; }
80 
81  protected:
88  };
89 
90 protected:
92  Queue();
93 
94 public:
96  virtual ~Queue();
97 
102  static Queue &get();
103 
106  void send(const RawBuffer &buffer, SinkBase *sink, bool allow_overwrite=false);
107 
111  void start();
112 
114  void stop();
116  void wait();
117 
119  bool isStopped() const;
121  bool isRunning() const;
122 
126  template <class T>
127  void addIdle(T *instance, void (T::*function)(void)) {
128  _idle.push_back(new Delegate<T>(instance, function));
129  }
130 
132  template <class T>
133  void remIdle(T *instance) {
134  std::list<DelegateInterface *>::iterator item = _idle.begin();
135  while (item != _idle.end()) {
136  if ( (*item)->instance() == ((void *)instance)) {
137  item = _idle.erase(item);
138  } else {
139  item++;
140  }
141  }
142  }
143 
145  template <class T>
146  void addStart(T *instance, void (T::*function)(void)) {
147  _onStart.push_back(new Delegate<T>(instance, function));
148  }
149 
151  template <class T>
152  void remStart(T *instance) {
153  std::list<DelegateInterface *>::iterator item = _onStart.begin();
154  while (item != _onStart.end()) {
155  if ( (*item)->instance() == ((void *)instance)) {
156  item = _onStart.erase(item);
157  } else {
158  item++;
159  }
160  }
161  }
162 
164  template <class T>
165  void addStop(T *instance, void (T::*function)(void)) {
166  _onStop.push_back(new Delegate<T>(instance, function));
167  }
168 
170  template <class T>
171  void remStop(T *instance) {
172  std::list<DelegateInterface *>::iterator item = _onStop.begin();
173  while (item != _onStop.end()) {
174  if ( (*item)->instance() == ((void *)instance)) {
175  item = _onStop.erase(item);
176  } else {
177  item++;
178  }
179  }
180  }
181 
182 protected:
184  void _main();
186  void _signalIdle();
188  void _signalStart();
190  void _signalStop();
191 
192 protected:
194  bool _running;
196  pthread_t _thread;
198  pthread_mutex_t _queue_lock;
200  pthread_cond_t _queue_cond;
201 
203  std::list<Message> _queue;
205  std::list<DelegateInterface *> _idle;
207  std::list<DelegateInterface *> _onStart;
209  std::list<DelegateInterface *> _onStop;
210 
211 private:
213  static Queue *_instance;
215  static void *__thread_start(void *ptr);
216 };
217 
218 
219 }
220 #endif // __SDR_QUEUE_HH__
std::list< DelegateInterface * > _onStop
Stop event callbacks.
Definition: queue.hh:209
SinkBase * _sink
The destination.
Definition: queue.hh:85
std::list< Message > _queue
The message queue.
Definition: queue.hh:203
Base class of all buffers, represents an untyped array of bytes.
Definition: buffer.hh:32
void send(const RawBuffer &buffer, SinkBase *sink, bool allow_overwrite=false)
Adds a buffer and its receiver to the queue.
Definition: queue.cc:36
The internally used message type.
Definition: queue.hh:57
void wait()
Wait for the queue to exit the queue loop.
Definition: queue.cc:70
virtual void * instance()=0
Returns the instance of the delegate.
pthread_mutex_t _queue_lock
The queue mutex.
Definition: queue.hh:198
Specific delegate to a method of an object.
Definition: queue.hh:27
bool isStopped() const
Returns true if the queue loop is stopped.
Definition: queue.cc:46
bool isRunning() const
Returns true if the queue loop is running.
Definition: queue.cc:51
Delegate(T *instance, void(T::*func)(void))
Constructs a delegate to the method func of the instance instance.
Definition: queue.hh:31
void _signalStop()
Emits the stop signal.
Definition: queue.cc:144
std::list< DelegateInterface * > _idle
Idle event callbacks.
Definition: queue.hh:205
Definition: autocast.hh:8
Central message queue (singleton).
Definition: queue.hh:53
T * _instance
The instance.
Definition: queue.hh:41
void stop()
Signals the queue to stop processing.
Definition: queue.cc:64
virtual void * instance()
Returns the instance of the delegate.
Definition: queue.hh:37
Message(const Message &other)
Copy constructor.
Definition: queue.hh:63
void addIdle(T *instance, void(T::*function)(void))
Adds a callback to the idle event.
Definition: queue.hh:127
const RawBuffer & buffer() const
Returns the buffer of the message.
Definition: queue.hh:73
bool allowOverwrite() const
If true, the sender allows the content of the buffer to be overwritten.
Definition: queue.hh:79
pthread_t _thread
If _parallel is true, the thread of the queue loop.
Definition: queue.hh:196
void addStop(T *instance, void(T::*function)(void))
Adds a callback to the stop event.
Definition: queue.hh:165
const Message & operator=(const Message &other)
Assignment operator.
Definition: queue.hh:66
void(T::* _function)(void)
The method.
Definition: queue.hh:43
virtual ~Queue()
Destructor.
Definition: queue.cc:30
Interface of a delegate.
Definition: queue.hh:17
SinkBase * sink() const
Returns the destination of the message.
Definition: queue.hh:77
void start()
Enters the queue loop, if parallel=true was passed to get, exec will execute the queue loop in a sepa...
Definition: queue.cc:57
void remStop(T *instance)
Removes all callbacks of the given instance from the stop signal.
Definition: queue.hh:171
virtual ~Delegate()
Destructor.
Definition: queue.hh:33
RawBuffer & buffer()
Returns the buffer of the message.
Definition: queue.hh:75
Basic interface of all Sinks.
Definition: node.hh:174
void remIdle(T *instance)
Removes all callbacks of the given instance from the idle signal.
Definition: queue.hh:133
std::list< DelegateInterface * > _onStart
Start event callbacks.
Definition: queue.hh:207
virtual void operator()()=0
Call back interface.
void _main()
The actual queue loop.
Definition: queue.cc:84
Message(const RawBuffer &buffer, SinkBase *sink, bool allow_overwrite)
Constructor.
Definition: queue.hh:60
bool _running
While this is true, the queue loop is executed.
Definition: queue.hh:194
bool _allow_overwrite
If true, the sender allows the buffer to be overwritten.
Definition: queue.hh:87
virtual void operator()()
Callback, simply calls the method of the instance given to the constructor.
Definition: queue.hh:35
void _signalIdle()
Emits the idle signal.
Definition: queue.cc:128
pthread_cond_t _queue_cond
The queue condition.
Definition: queue.hh:200
void _signalStart()
Emits the start signal.
Definition: queue.cc:136
void remStart(T *instance)
Removes all callbacks of the given instance from the start signal.
Definition: queue.hh:152
Queue()
Hidden constructor, use get to get the singleton instance.
Definition: queue.cc:22
void addStart(T *instance, void(T::*function)(void))
Adds a callback to the start event.
Definition: queue.hh:146
RawBuffer _buffer
The buffer being sent.
Definition: queue.hh:83