Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * NNStreamer API / Machine Learning Agent Daemon
4 : * Copyright (C) 2025 Samsung Electronics Co., Ltd. All Rights Reserved.
5 : */
6 :
7 : /**
8 : * @file mlops-agent-node.c
9 : * @date 20 january 2025
10 : * @brief Implementation of mlops node.
11 : * @see https://github.com/nnstreamer/deviceMLOps.MLAgent
12 : * @author Jaeyun Jung <jy1210.jung@samsung.com>
13 : * @bug No known bugs except for NYI items
14 : * @details This implements the node information to run a pipeline.
15 : */
16 :
17 : #include "log.h"
18 : #include "mlops-agent-node.h"
19 : #include "service-db-util.h"
20 :
21 : static GHashTable *g_mlops_node_table = NULL;
22 : G_LOCK_DEFINE_STATIC (mlops_node_table);
23 :
24 : /**
25 : * @brief Structure for mlops node.
26 : */
27 : typedef struct
28 : {
29 : mlops_node_type_e type;
30 : gint64 id;
31 : GMutex lock;
32 : GstElement *element;
33 : gchar *service_name;
34 : gchar *description;
35 : } mlops_node_s;
36 :
37 : /**
38 : * @brief Internal function to get node info.
39 : */
40 : static mlops_node_s *
41 10 : _mlops_node_get (const int64_t id)
42 : {
43 10 : mlops_node_s *node = NULL;
44 :
45 10 : G_LOCK (mlops_node_table);
46 10 : node = (mlops_node_s *) g_hash_table_lookup (g_mlops_node_table, &id);
47 10 : G_UNLOCK (mlops_node_table);
48 :
49 10 : if (!node) {
50 5 : ml_loge ("There is no pipeline matched with ID %" G_GINT64_FORMAT, id);
51 : }
52 :
53 10 : return node;
54 : }
55 :
56 : /**
57 : * @brief Internal function to change pipeline state.
58 : */
59 : static int
60 6 : _mlops_node_set_pipeline_state (mlops_node_s * node, GstState state)
61 : {
62 : GstStateChangeReturn ret;
63 : gint64 nid;
64 :
65 6 : g_return_val_if_fail (node != NULL, -EINVAL);
66 :
67 4 : g_mutex_lock (&node->lock);
68 4 : nid = node->id;
69 4 : ret = gst_element_set_state (node->element, state);
70 4 : g_mutex_unlock (&node->lock);
71 :
72 4 : if (ret == GST_STATE_CHANGE_FAILURE) {
73 0 : ml_loge ("Failed to set the state of the pipeline to %s with ID %"
74 : G_GINT64_FORMAT, gst_element_state_get_name (state), nid);
75 0 : return -ESTRPIPE;
76 : }
77 :
78 4 : return 0;
79 : }
80 :
81 : /**
82 : * @brief Internal function to release mlops node.
83 : */
84 : static void
85 2 : _mlops_node_free (gpointer data)
86 : {
87 2 : mlops_node_s *node = (mlops_node_s *) data;
88 :
89 2 : if (!node) {
90 0 : ml_logw ("The data pointer is null, internal error?");
91 0 : return;
92 : }
93 :
94 2 : _mlops_node_set_pipeline_state (node, GST_STATE_NULL);
95 :
96 2 : g_mutex_lock (&node->lock);
97 :
98 2 : node->type = MLOPS_NODE_TYPE_NONE;
99 2 : node->id = 0;
100 2 : if (node->element) {
101 2 : gst_object_unref (node->element);
102 2 : node->element = NULL;
103 : }
104 2 : g_free (node->service_name);
105 2 : node->service_name = NULL;
106 2 : g_free (node->description);
107 2 : node->description = NULL;
108 :
109 2 : g_mutex_unlock (&node->lock);
110 :
111 2 : g_mutex_clear (&node->lock);
112 2 : g_free (node);
113 : }
114 :
115 : /**
116 : * @brief Initialize mlops node info.
117 : */
118 : int
119 19 : mlops_node_initialize (void)
120 : {
121 19 : int ret = 0;
122 :
123 19 : G_LOCK (mlops_node_table);
124 19 : if (!g_mlops_node_table) {
125 19 : g_mlops_node_table = g_hash_table_new_full (g_int64_hash, g_int64_equal,
126 : g_free, _mlops_node_free);
127 : }
128 :
129 19 : if (g_mlops_node_table == NULL) {
130 0 : ml_logw ("Failed to initialize mlops-agent node table.");
131 0 : ret = -EIO;
132 : }
133 19 : G_UNLOCK (mlops_node_table);
134 :
135 19 : return ret;
136 : }
137 :
138 : /**
139 : * @brief Finalize mlops node info.
140 : */
141 : void
142 19 : mlops_node_finalize (void)
143 : {
144 19 : G_LOCK (mlops_node_table);
145 19 : g_assert (g_mlops_node_table != NULL);
146 19 : g_hash_table_destroy (g_mlops_node_table);
147 19 : g_mlops_node_table = NULL;
148 19 : G_UNLOCK (mlops_node_table);
149 19 : }
150 :
151 : /**
152 : * @brief Check service name and launch the pipeline.
153 : */
154 : int
155 3 : mlops_node_create (const gchar * name, const mlops_node_type_e type,
156 : int64_t * id)
157 : {
158 3 : mlops_node_s *node = NULL;
159 3 : gint result = -EIO;
160 3 : gint64 *node_id = NULL;
161 3 : gchar *desc = NULL;
162 3 : GstElement *pipeline = NULL;
163 3 : GError *err = NULL;
164 : GstStateChangeReturn ret;
165 :
166 6 : g_return_val_if_fail (id != NULL, -EINVAL);
167 :
168 3 : switch (type) {
169 3 : case MLOPS_NODE_TYPE_PIPELINE:
170 : {
171 3 : result = svcdb_pipeline_get (name, &desc);
172 3 : if (result != 0) {
173 1 : ml_loge ("Failed to launch pipeline of '%s'.", name);
174 1 : goto error;
175 : }
176 2 : break;
177 : }
178 0 : default:
179 0 : return -EINVAL;
180 : }
181 :
182 2 : pipeline = gst_parse_launch (desc, &err);
183 2 : if (!pipeline || err) {
184 0 : ml_loge ("Failed to launch pipeline '%s' (error msg: %s).",
185 : desc, (err) ? err->message : "unknown reason");
186 0 : g_clear_error (&err);
187 :
188 0 : result = -ESTRPIPE;
189 0 : goto error;
190 : }
191 :
192 : /* Set pipeline as paused state. */
193 2 : ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
194 2 : if (ret == GST_STATE_CHANGE_FAILURE) {
195 0 : ml_loge
196 : ("Failed to set the state of the pipeline to PAUSED. For the detail, please check the GStreamer log message.");
197 :
198 0 : result = -ESTRPIPE;
199 0 : goto error;
200 : }
201 :
202 : /* Final step, add node info into hash table. */
203 2 : node = g_new0 (mlops_node_s, 1);
204 2 : node->type = type;
205 2 : node->id = g_get_monotonic_time ();
206 2 : node->element = pipeline;
207 2 : node->service_name = g_strdup (name);
208 2 : node->description = g_strdup (desc);
209 2 : g_mutex_init (&node->lock);
210 :
211 2 : node_id = g_new (gint64, 1);
212 2 : *node_id = node->id;
213 :
214 2 : G_LOCK (mlops_node_table);
215 2 : g_hash_table_insert (g_mlops_node_table, node_id, node);
216 2 : G_UNLOCK (mlops_node_table);
217 :
218 2 : *id = node->id;
219 :
220 3 : error:
221 3 : if (result != 0) {
222 1 : if (pipeline)
223 0 : gst_object_unref (pipeline);
224 : }
225 :
226 3 : g_free (desc);
227 3 : return result;
228 : }
229 :
230 : /**
231 : * @brief Start the pipeline with given id.
232 : */
233 : int
234 2 : mlops_node_start (const int64_t id)
235 : {
236 2 : mlops_node_s *node = NULL;
237 :
238 2 : node = _mlops_node_get (id);
239 2 : return _mlops_node_set_pipeline_state (node, GST_STATE_PLAYING);
240 : }
241 :
242 : /**
243 : * @brief Stop the pipeline with given id.
244 : */
245 : int
246 2 : mlops_node_stop (const int64_t id)
247 : {
248 2 : mlops_node_s *node = NULL;
249 :
250 2 : node = _mlops_node_get (id);
251 2 : return _mlops_node_set_pipeline_state (node, GST_STATE_PAUSED);
252 : }
253 :
254 : /**
255 : * @brief Destroy the pipeline with given id.
256 : */
257 : int
258 3 : mlops_node_destroy (const int64_t id)
259 : {
260 3 : mlops_node_s *node = NULL;
261 :
262 3 : node = _mlops_node_get (id);
263 3 : if (node) {
264 2 : G_LOCK (mlops_node_table);
265 2 : g_hash_table_remove (g_mlops_node_table, &id);
266 2 : G_UNLOCK (mlops_node_table);
267 : }
268 :
269 3 : return node ? 0 : -EINVAL;
270 : }
271 :
272 : /**
273 : * @brief Get the state of pipeline with given id.
274 : */
275 : int
276 3 : mlops_node_get_state (const int64_t id, GstState * state)
277 : {
278 3 : mlops_node_s *node = NULL;
279 : GstStateChangeReturn ret;
280 :
281 3 : node = _mlops_node_get (id);
282 3 : g_return_val_if_fail (node != NULL, -EINVAL);
283 :
284 1 : g_mutex_lock (&node->lock);
285 1 : ret = gst_element_get_state (node->element, state, NULL, GST_MSECOND);
286 1 : g_mutex_unlock (&node->lock);
287 :
288 1 : if (ret == GST_STATE_CHANGE_FAILURE) {
289 0 : ml_loge ("Failed to get the state of the pipeline with ID %"
290 : G_GINT64_FORMAT, id);
291 0 : return -ESTRPIPE;
292 : }
293 :
294 1 : return 0;
295 : }
|