automotive-message-broker  0.14.803
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Groups Pages
databasesink.h
1 /*
2  Copyright (C) 2012 Intel Corporation
3 
4  This library is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Lesser General Public
6  License as published by the Free Software Foundation; either
7  version 2.1 of the License, or (at your option) any later version.
8 
9  This library is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  Lesser General Public License for more details.
13 
14  You should have received a copy of the GNU Lesser General Public
15  License along with this library; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 
20 #ifndef DATABASESINK_H
21 #define DATABASESINK_H
22 
23 #include "abstractsink.h"
24 #include "abstractsource.h"
25 #include "basedb.hpp"
26 #include <asyncqueue.hpp>
27 #include "listplusplus.h"
28 #include "ambpluginimpl.h"
29 
30 #include <glib.h>
31 
32 #include <functional>
33 #include <thread>
34 #include <mutex>
35 #include <condition_variable>
36 #include <unordered_map>
37 
38 const std::string DatabaseLogging = "DatabaseLogging";
39 const std::string DatabasePlayback = "DatabasePlayback";
40 const std::string DatabaseFile = "DatabaseFile";
41 
42 class DBObject {
43 public:
44  DBObject(): zone(0), time(0), sequence(0), quit(false) {}
45  std::string key;
46  std::string value;
47  std::string source;
48  int32_t zone;
49  double time;
50  int32_t sequence;
51  std::string tripId;
52 
53  bool quit;
54 
55  bool operator == (const DBObject & other) const
56  {
57  return (key == other.key && source == other.source && zone == other.zone &&
58  value == other.value && sequence == other.sequence && time == other.time);
59  }
60 
61  bool operator != (const DBObject & other)
62  {
63  return (*this == other) == false;
64  }
65 };
66 
67 namespace amb
68 {
69 
71 {
72  bool operator()(DBObject const & lhs, DBObject & rhs) const
73  {
74  if (lhs == rhs)
75  {
76  return true;
77  }
78 
79  return false;
80  }
81 
82 };
83 
84 }
85 
86 namespace std {
87  template <> struct hash<DBObject>
88  {
89  size_t operator()(const DBObject & x) const
90  {
91  return x.key.length() * x.value.length() + x.time;
92  }
93  };
94 }
95 
96 class Shared
97 {
98 public:
99  Shared()
100  :queue(true, true)
101  {
102  db = new BaseDB;
103  }
104  ~Shared()
105  {
106  delete db;
107  }
108 
109  BaseDB * db;
110  amb::Queue<DBObject, amb::DBObjectCompare> queue;
111  std::string tripId;
112 };
113 
115 {
116 public:
117  PlaybackShared(AbstractRoutingEngine* re, std::string u, uint playbackMult)
118  :routingEngine(re),uuid(u),playBackMultiplier(playbackMult),stop(false) {}
119  ~PlaybackShared()
120  {
121  for(auto itr = playbackQueue.begin(); itr != playbackQueue.end(); itr++)
122  {
123  DBObject obj = *itr;
124  }
125 
126  playbackQueue.clear();
127  }
128 
129  AbstractRoutingEngine* routingEngine;
130  std::list<DBObject> playbackQueue;
131  uint playBackMultiplier;
132  std::string uuid;
133  bool stop;
134 };
135 
136 PROPERTYTYPEBASIC(DatabaseLogging, bool)
137 PROPERTYTYPEBASIC(DatabasePlayback, bool)
138 PROPERTYTYPE(DatabaseFile, DatabaseFileType, StringPropertyType, std::string)
139 
141 {
142 
143 public:
144  DatabaseSink(AbstractRoutingEngine* engine, map<string, string> config, AbstractSource &parent);
145  ~DatabaseSink();
146  virtual void supportedChanged(const PropertyList & supportedProperties);
147  virtual void propertyChanged(AbstractPropertyType *value);
148  const std::string uuid() const;
149 
150  void init();
151 
153  virtual void getRangePropertyAsync(AsyncRangePropertyReply *reply);
154  virtual AsyncPropertyReply * setProperty(const AsyncSetPropertyRequest & request);
155  virtual void subscribeToPropertyChanges(VehicleProperty::Property property);
156  virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property);
157  int supportedOperations() const { return AbstractSource::GetRanged | AbstractSource::Get | AbstractSource::Set;}
158 
159 private: //methods:
160 
161  void parseConfig();
162  void stopDb();
163  void startDb();
164  void startPlayback();
165  void initDb();
166  void updateForNewDbFilename();
167 
168 private:
169  PropertyList mSubscriptions;
170  Shared *shared;
171  std::unique_ptr<std::thread> thread;
172  std::string tablename;
173  std::string tablecreate;
174  PropertyList propertiesToSubscribeTo;
175  PlaybackShared* playbackShared;
176  uint playbackMultiplier;
177  std::shared_ptr<AbstractPropertyType> playback;
178  std::shared_ptr<AbstractPropertyType> databaseName;
179  std::shared_ptr<AbstractPropertyType> databaseLogging;
180 };
181 
182 #endif // DATABASESINK_H