automotive-message-broker  0.13.1
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Groups Pages
asyncqueue.hpp
1 /*
2  Copyright (C) 2014 Intel Corporation
3 
4  This library is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Lesser General Public
6  License as published by the Free Software Foundation; either
7  version 2.1 of the License, or (at your option) any later version.
8 
9  This library is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  Lesser General Public License for more details.
13 
14  You should have received a copy of the GNU Lesser General Public
15  License along with this library; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <glib.h>
20 
21 #include <abstractpropertytype.h>
22 #include "listplusplus.h"
23 
24 #include <mutex>
25 #include <condition_variable>
26 #include <unordered_set>
27 #include <vector>
28 
29 namespace amb
30 {
31 
/**
 * @brief Mutex-protected FIFO container of T.
 *
 * Every public operation takes the internal mutex for its duration.
 *
 * @tparam T    element type; must be copyable and comparable with operator==.
 * @tparam Pred equality predicate type. NOTE(review): it is part of the type
 *              but is not referenced by any method of this class; lookups in
 *              append() use operator== via std::find.
 */
template <typename T, class Pred = std::equal_to<T> >
class Queue
{
public:
    /**
     * @param unique   stored in mUnique. NOTE(review): no method of this class
     *                 reads it; append() de-duplicates regardless of its value.
     * @param blocking when true, pop() waits for an item instead of throwing
     *                 on an empty queue, and append() wakes waiting callers.
     */
    Queue(bool unique = false, bool blocking = false)
        // Initializers listed in member declaration order.
        :mBlocking(blocking), mUnique(unique)
    {

    }

    virtual ~Queue()
    {

    }

    /// @return the number of items currently queued.
    int count()
    {
        std::lock_guard<std::mutex> lock(mutex);

        return static_cast<int>(mQueue.size());
    }

    /**
     * @brief Removes and returns the item at the front of the queue.
     *
     * In blocking mode the call waits until the queue is non-empty.
     *
     * @return a copy of the oldest queued item.
     * @throws std::runtime_error ("nothing in queue") when the queue is empty
     *         and the queue is not in blocking mode.
     */
    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex);

        if(mBlocking)
        {
            // Predicate form: a spurious wakeup (or a wakeup that lost the
            // race to another consumer) goes back to waiting instead of
            // falling through to the empty-queue throw below.
            cond.wait(lock, [this]() { return !mQueue.empty(); });
        }

        if(mQueue.empty())
        {
            throw std::runtime_error("nothing in queue");
        }

        T item = mQueue.front();

        mQueue.erase(mQueue.begin());

        return item;
    }

    /**
     * @brief Adds @p item at the back of the queue.
     *
     * If an element equal to @p item is already queued, that element is
     * removed first, so the item ends up at the back exactly once.
     * In blocking mode all callers waiting in pop() are notified.
     */
    virtual void append(T item)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);

            // One scan both detects and locates an existing equal element.
            auto existing = std::find(mQueue.begin(), mQueue.end(), item);
            if(existing != mQueue.end())
            {
                mQueue.erase(existing);
            }
            mQueue.push_back(item);
        }

        // Notify after the lock is released so woken threads can take it.
        if(mBlocking)
        {
            cond.notify_all();
        }
    }

    /// Removes @p item from the queue via the removeOne() helper.
    void remove(T item)
    {
        std::lock_guard<std::mutex> lock(mutex);
        removeOne(&mQueue, item);
    }

private:
    bool mBlocking;
    bool mUnique;
    std::mutex mutex;
    std::condition_variable cond;
    std::vector<T> mQueue;
};
110 
111 template <typename T, class Pred = std::equal_to<T> >
113  GSource source;
114  Queue<T, Pred>* queue;
115  int minQueueSize;
116 };
117 
118 template <typename T, class Pred = std::equal_to<T> >
120 {
121 public:
122  typedef function<void (Queue<T, Pred> *)> AsyncQueueWatcherCallback;
123  AsyncQueueWatcher(Queue<T, Pred> * queue, AsyncQueueWatcherCallback cb, int queueSize = 0, AbstractPropertyType::Priority priority = AbstractPropertyType::Normal)
124  : callback(cb), mMaxQueueSize(queueSize)
125  {
126 
127  static GSourceFuncs funcs = {prepare, check, dispatch, finalize};
128  GSource* source = (GSource *) g_source_new(&funcs, sizeof(AsyncQueueSource<T, Pred>));
129 
131  watch->queue = queue;
132  watch->minQueueSize = queueSize;
133 
134  gint p = G_PRIORITY_DEFAULT;
135 
136  if(priority == AbstractPropertyType::Normal)
137  p = G_PRIORITY_DEFAULT;
138  else if(priority == AbstractPropertyType::High)
139  p = G_PRIORITY_HIGH;
140  else if(priority == AbstractPropertyType::Low)
141  p = G_PRIORITY_LOW;
142 
143  g_source_set_priority(source, p);
144  g_source_set_callback(source, nullptr, this, nullptr);
145 
146  g_source_attach(source, nullptr);
147  g_source_unref(source);
148  }
149 
150  AsyncQueueWatcherCallback callback;
151 
152 
153 protected:
154  AsyncQueueWatcher(){}
155 
156  int mMaxQueueSize;
157 
158 private:
159 
160  static gboolean prepare(GSource *source, gint *timeout)
161  {
163  *timeout = -1;
164 
165  if (!s)
166  return false;
167 
168  return s->queue->count() > s->minQueueSize;
169  }
170 
171  static gboolean check(GSource *source)
172  {
174 
175  if (!s)
176  return false;
177 
178  return s->queue->count() > s->minQueueSize;
179  }
180 
181  static gboolean dispatch(GSource *source, GSourceFunc callback, gpointer userData)
182  {
184 
185  if (!s)
186  return false;
187 
188  AsyncQueueWatcher<T, Pred>* watcher = static_cast<AsyncQueueWatcher<T, Pred>*>(userData);
189 
190  watcher->callback(s->queue);
191  return true;
192  }
193 
194  static void finalize(GSource* source)
195  {
196 
197  }
198 };
199 } // namespace amb