libsdr  0.1.0
A simple SDR library
combine.hh
1 #ifndef __SDR_COMBINE_HH__
2 #define __SDR_COMBINE_HH__
3 
4 #include "config.hh"
5 #include "traits.hh"
6 #include "node.hh"
7 #include "buffer.hh"
8 #include "logger.hh"
9 #include <limits>
10 
11 namespace sdr {
12 
13 template <class Scalar> class Combine;
14 
16 template <class Scalar>
17 class CombineSink: public Sink<Scalar>
18 {
19 public:
21  CombineSink(Combine<Scalar> *combine, size_t index, RingBuffer<Scalar> &buffer)
22  : Sink<Scalar>(), _index(index), _parent(combine), _buffer(buffer)
23  {
24  // Pass...
25  }
27  virtual ~CombineSink() {
28  // pass...
29  }
30 
32  virtual void config(const Config &src_cfg) {
33  // Requires sample rate and type
34  if (!src_cfg.hasType() || !src_cfg.hasSampleRate()) { return; }
35  // Check type
36  if (Config::typeId<Scalar>() != src_cfg.type()) {
37  ConfigError err;
38  err << "Can not configure CombinSink: Invalid source type " << src_cfg.type()
39  << ", expected " << Config::typeId<Scalar>();
40  throw err;
41  }
42  // Notify parent
43  _parent->notifyConfig(_index, src_cfg);
44  }
45 
47  virtual void process(const Buffer<Scalar> &buffer, bool allow_overwrite) {
48  // Copy data into ring buffer and notify parent
49  _buffer.put(buffer);
50  _parent->notifyData(_index);
51  }
52 
53 protected:
55  size_t _index;
60 };
61 
62 
65 template <class Scalar>
66 class Combine
67 {
68 public:
70  Combine(size_t N) {
71  _buffers.reserve(N); _sinks.reserve(N);
72  for (size_t i=0; i<N; i++) {
73  _buffers.push_back(RingBuffer<Scalar>());
74  _sinks.push_back(new CombineSink<Scalar>(this, i, _buffers.back()));
75  }
76  }
77 
79  virtual ~Combine() {
80  // Unref all buffers and free sinks
81  for (size_t i=0; i<_sinks.size(); i++) {
82  delete _sinks[i];
83  _buffers[i].unref();
84  }
85  _buffers.clear();
86  _sinks.clear();
87  }
88 
90  virtual void config(const Config &cfg) = 0;
92  virtual void process(std::vector< RingBuffer<Scalar> > &buffers, size_t N) = 0;
93 
94 
95 protected:
97  void notifyConfig(size_t idx, const Config &cfg)
98  {
99  // Requires type, sampleRate and buffer size
100  if (!cfg.hasType() || !cfg.hasSampleRate() || !cfg.hasBufferSize()) { return; }
101  // check or unify type
102  if (!_config.hasType()) { _config.setType(cfg.type()); }
103  else if (_config.type() != cfg.type()) {
104  ConfigError err;
105  err << "Can not configure Combine node: Invalid type of sink #" << idx
106  << " " << cfg.type() << ", expected " << _config.type();
107  throw err;
108  }
109  // Check sample rate
111  else if (_config.sampleRate() != cfg.sampleRate()) {
112  ConfigError err;
113  err << "Can ont configure Combine node: Invalid sample rate of sink #" << idx
114  << " " << cfg.sampleRate() << ", expected " << _config.sampleRate();
115  throw err;
116  }
117  // Determine max buffer size
119  else {
120  // Take maximum:
121  _config.setBufferSize(std::max(_config.bufferSize(), cfg.bufferSize()));
122  }
123  // Reallocate buffers
124  for (size_t i=0; i<_sinks.size(); i++) {
126  }
127  // Propergate config
128  this->config(_config);
129  }
130 
132  void notifyData(size_t idx) {
133  // Determine minimum size of available data
134  size_t N = std::numeric_limits<size_t>::max();
135  for (size_t i=0; i<_sinks.size(); i++) {
136  N = std::min(N, _buffers[i].stored());
137  }
138  if (N > 0) { this->process(_buffers, N); }
139  }
140 
141 protected:
143  std::vector< RingBuffer<Scalar> > _buffers;
145  std::vector< CombineSink<Scalar> *> _sinks;
148 
149  friend class CombineSink<Scalar>;
150 };
151 
152 
155 template <class Scalar>
156 class Interleave : public Combine<Scalar>, public Source
157 {
158 public:
160  Interleave(size_t N)
161  : Combine<Scalar>(N), Source(), _N(N), _buffer()
162  {
163  // pass...
164  }
165 
167  Sink<Scalar> *sink(size_t i) {
168  if (i >= _N) {
169  RuntimeError err;
170  err << "Interleave: Sink index " << i << " out of range [0," << _N << ")";
171  throw err;
172  }
173  return Combine<Scalar>::_sinks[i];
174  }
175 
177  virtual void config(const Config &cfg) {
178  //Requres type & buffer size
179  if (!cfg.hasType() || !cfg.hasBufferSize()) { return; }
180  // Check type
181  if (Config::typeId<Scalar>() != cfg.type()) {
182  ConfigError err;
183  err << "Can not configure Interleave node: Invalid source type " << cfg.type()
184  << ", expected " << Config::typeId<Scalar>();
185  throw err;
186  }
187  // Allocate buffer:
189  // Propergate config
190  this->setConfig(Config(Config::typeId<Scalar>(), cfg.sampleRate(), _buffer.size(), 1));
191  }
192 
194  virtual void process(std::vector<RingBuffer<Scalar> > &buffers, size_t N) {
195  if (0 == N) { return; }
196  if (! _buffer.isUnused()) {
197 #ifdef SDR_DEBUG
198  LogMessage msg(LOG_WARNING);
199  msg << "Interleave: Output buffer in use: Drop " << _N << "x" << N
200  << " input values";
201  Logger::get().log(msg);
202 #endif
203  for (size_t i=0; i<buffers.size(); i++) { buffers[i].drop(N); }
204  return;
205  }
206  size_t num = std::min(_buffer.size()/_N,N);
207  // Interleave data
208  size_t idx = 0;
209  for (size_t i=0; i<num; i++) {
210  for (size_t j=0; j<_N; j++, idx++) {
211  _buffer[idx] = buffers[j][i];
212  }
213  }
214  // Drop num elements from all ring buffers
215  for (size_t i=0; i<_N; i++) {
216  buffers[i].drop(num);
217  }
218  // Send buffer
219  this->send(_buffer.head(num*_N));
220  }
221 
222 protected:
224  size_t _N;
227 };
228 
229 }
230 #endif // __SDR_COMBINE_HH__
A collection of configuration information that is send by a source to all connected sinks to properga...
Definition: node.hh:35
virtual void config(const Config &cfg)
Configures the interleave node.
Definition: combine.hh:177
Interleaves several input streams.
Definition: combine.hh:156
size_t _index
The index of the sink within the combine node.
Definition: combine.hh:55
void notifyData(size_t idx)
Determines the minimum amount of data that is available on all ring buffers.
Definition: combine.hh:132
A simple typed ring-buffer.
Definition: buffer.hh:473
virtual void send(const RawBuffer &buffer, bool allow_overwrite=false)
Sends the given buffer to all connected sinks.
Definition: node.cc:67
Buffer< Scalar > _buffer
The putput buffer.
Definition: combine.hh:226
Typed sink.
Definition: node.hh:192
virtual void process(const Buffer< Scalar > &buffer, bool allow_overwrite)
Handles the given buffer.
Definition: combine.hh:47
Definition: autocast.hh:8
virtual void config(const Config &cfg)=0
Needs to be overridden.
bool hasSampleRate() const
If true, the configuration has a sample rate.
Definition: node.hh:75
Generic source class.
Definition: node.hh:213
RingBuffer< Scalar > & _buffer
The input ring-buffer.
Definition: combine.hh:59
The runtime error class.
Definition: exception.hh:36
size_t _N
The number of sinks.
Definition: combine.hh:224
size_t size() const
Returns the number of elements of type T in this buffer.
Definition: buffer.hh:166
bool hasType() const
If true, the configuration has a type.
Definition: node.hh:69
virtual void process(std::vector< RingBuffer< Scalar > > &buffers, size_t N)=0
Needs to be overridden.
A combine node.
Definition: combine.hh:13
virtual void setConfig(const Config &config)
Stores the configuration and propergates it if the configuration has been changed.
Definition: node.cc:98
Buffer< T > head(size_t n) const
Returns a new view on this buffer.
Definition: buffer.hh:237
void log(const LogMessage &message)
Logs a message.
Definition: logger.cc:100
std::vector< RingBuffer< Scalar > > _buffers
The ring buffers of all combine sinks.
Definition: combine.hh:143
Type type() const
Returns the type.
Definition: node.hh:71
void setType(Type type)
Sets the type.
Definition: node.hh:73
std::vector< CombineSink< Scalar > * > _sinks
The combine sinks.
Definition: combine.hh:145
virtual void config(const Config &src_cfg)
Configures the sink.
Definition: combine.hh:32
static Logger & get()
Returns the singleton instance of the logger.
Definition: logger.cc:89
Combine< Scalar > * _parent
A reference to the combine node.
Definition: combine.hh:57
The configuration error class.
Definition: exception.hh:24
virtual void process(std::vector< RingBuffer< Scalar > > &buffers, size_t N)
Processes the data from all sinks.
Definition: combine.hh:194
A log message.
Definition: logger.hh:22
size_t bufferSize() const
Returns the max.
Definition: node.hh:83
Combine(size_t N)
Constructor, N specifies the number of sinks.
Definition: combine.hh:70
Interleave(size_t N)
Constructor.
Definition: combine.hh:160
virtual ~CombineSink()
Destructor.
Definition: combine.hh:27
void setSampleRate(double rate)
Sets the sample rate.
Definition: node.hh:79
Sink< Scalar > * sink(size_t i)
Retunrs the i-th sink.
Definition: combine.hh:167
Config _config
The output configuration.
Definition: combine.hh:147
bool isUnused() const
We assume here that buffers are owned by one object: A buffer is therefore "unused" if the owner hold...
Definition: buffer.hh:87
void notifyConfig(size_t idx, const Config &cfg)
Unifies the configuration of all sinks.
Definition: combine.hh:97
CombineSink(Combine< Scalar > *combine, size_t index, RingBuffer< Scalar > &buffer)
Constructor.
Definition: combine.hh:21
A single sink of a Combine node.
Definition: combine.hh:17
virtual ~Combine()
Destructor.
Definition: combine.hh:79
void setBufferSize(size_t size)
Sets the max.
Definition: node.hh:85
bool hasBufferSize() const
If true, the configuration has a buffer size.
Definition: node.hh:81
double sampleRate() const
Returns the sample rate.
Definition: node.hh:77