Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * Copyright (c) 2023 Samsung Electronics Co., Ltd. All Rights Reserved.
4 : *
5 : * @file ml-api-service-extension.c
6 : * @date 1 September 2023
7 : * @brief ML service extension C-API.
8 : * @see https://github.com/nnstreamer/api
9 : * @author Jaeyun Jung <jy1210.jung@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : */
12 :
13 : #include "ml-api-service-extension.h"
14 :
15 : /**
16 : * @brief The time to wait for new input data in message thread, in millisecond.
17 : */
18 : #define DEFAULT_TIMEOUT 200
19 :
20 : /**
21 : * @brief The max number of input data in message queue (0 for no limit).
22 : */
23 : #define DEFAULT_MAX_INPUT 5
24 :
25 : /**
26 : * @brief Internal enumeration for ml-service extension types.
27 : */
28 : typedef enum
29 : {
30 : ML_EXTENSION_TYPE_UNKNOWN = 0,
31 : ML_EXTENSION_TYPE_SINGLE = 1,
32 : ML_EXTENSION_TYPE_PIPELINE = 2,
33 :
34 : ML_EXTENSION_TYPE_MAX
35 : } ml_extension_type_e;
36 :
37 : /**
38 : * @brief Internal structure of the message in ml-service extension handle.
39 : */
40 : typedef struct
41 : {
42 : gchar *name;
43 : ml_tensors_data_h input;
44 : ml_tensors_data_h output;
45 : } ml_extension_msg_s;
46 :
47 : /**
48 : * @brief Internal structure for ml-service extension handle.
49 : */
50 : typedef struct
51 : {
52 : ml_extension_type_e type;
53 : gboolean running;
54 : guint timeout; /**< The time to wait for new input data in message thread, in millisecond (see DEFAULT_TIMEOUT). */
55 : guint max_input; /**< The max number of input data in message queue (see DEFAULT_MAX_INPUT). */
56 : GThread *msg_thread;
57 : GAsyncQueue *msg_queue;
58 :
59 : /**
60 : * Handles for each ml-service extension type.
61 : * - single : Default. Open model file and prepare invoke. The configuration should include model information.
62 : * - pipeline : Construct a pipeline from configuration. The configuration should include pipeline description.
63 : */
64 : ml_single_h single;
65 :
66 : ml_pipeline_h pipeline;
67 : GHashTable *node_table;
68 : } ml_extension_s;
69 :
70 : /**
71 : * @brief Internal function to handle the asynchronous invoke.
72 : */
73 : static int
74 0 : _ml_extension_async_cb (const ml_tensors_data_h data, void *user_data)
75 : {
76 0 : ml_service_s *mls = (ml_service_s *) user_data;
77 :
78 0 : return _ml_service_invoke_event_new_data (mls, NULL, data);
79 : }
80 :
81 : /**
82 : * @brief Internal function to create node info in pipeline.
83 : */
84 : static ml_service_node_info_s *
85 0 : _ml_extension_node_info_new (ml_service_s * mls, const gchar * name,
86 : ml_service_node_type_e type)
87 : {
88 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
89 : ml_service_node_info_s *node_info;
90 :
91 0 : if (!STR_IS_VALID (name)) {
92 0 : _ml_error_report_return (NULL,
93 : "Cannot add new node info, invalid node name '%s'.", name);
94 : }
95 :
96 0 : if (g_hash_table_lookup (ext->node_table, name)) {
97 0 : _ml_error_report_return (NULL,
98 : "Cannot add duplicated node '%s' in ml-service pipeline.", name);
99 : }
100 :
101 0 : node_info = g_try_new0 (ml_service_node_info_s, 1);
102 0 : if (!node_info) {
103 0 : _ml_error_report_return (NULL,
104 : "Failed to allocate new memory for node info in ml-service pipeline. Out of memory?");
105 : }
106 :
107 0 : node_info->name = g_strdup (name);
108 0 : node_info->type = type;
109 0 : node_info->mls = mls;
110 :
111 0 : g_hash_table_insert (ext->node_table, g_strdup (name), node_info);
112 :
113 0 : return node_info;
114 : }
115 :
116 : /**
117 : * @brief Internal function to release pipeline node info.
118 : */
119 : static void
120 0 : _ml_extension_node_info_free (gpointer data)
121 : {
122 0 : ml_service_node_info_s *node_info = (ml_service_node_info_s *) data;
123 :
124 0 : if (!node_info)
125 0 : return;
126 :
127 0 : if (node_info->info)
128 0 : ml_tensors_info_destroy (node_info->info);
129 :
130 0 : g_clear_pointer (&node_info->name, g_free);
131 0 : g_free (node_info);
132 : }
133 :
134 : /**
135 : * @brief Internal function to get the node info in ml-service extension.
136 : */
137 : static ml_service_node_info_s *
138 0 : _ml_extension_node_info_get (ml_extension_s * ext, const gchar * name)
139 : {
140 0 : if (!STR_IS_VALID (name))
141 0 : return NULL;
142 :
143 0 : return g_hash_table_lookup (ext->node_table, name);
144 : }
145 :
146 : /**
147 : * @brief Internal function to release ml-service extension message.
148 : */
149 : static void
150 0 : _ml_extension_msg_free (gpointer data)
151 : {
152 0 : ml_extension_msg_s *msg = (ml_extension_msg_s *) data;
153 :
154 0 : if (!msg)
155 0 : return;
156 :
157 0 : if (msg->input)
158 0 : ml_tensors_data_destroy (msg->input);
159 0 : if (msg->output)
160 0 : ml_tensors_data_destroy (msg->output);
161 0 : g_clear_pointer (&msg->name, g_free);
162 :
163 0 : g_free (msg);
164 : }
165 :
166 : /**
167 : * @brief Internal function to process ml-service extension message.
168 : */
169 : static gpointer
170 0 : _ml_extension_msg_thread (gpointer data)
171 : {
172 0 : ml_service_s *mls = (ml_service_s *) data;
173 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
174 : int status;
175 :
176 0 : g_mutex_lock (&mls->lock);
177 0 : ext->running = TRUE;
178 0 : g_cond_signal (&mls->cond);
179 0 : g_mutex_unlock (&mls->lock);
180 :
181 0 : while (ext->running) {
182 : ml_extension_msg_s *msg;
183 :
184 0 : msg = g_async_queue_timeout_pop (ext->msg_queue,
185 0 : ext->timeout * G_TIME_SPAN_MILLISECOND);
186 :
187 0 : if (msg) {
188 0 : switch (ext->type) {
189 0 : case ML_EXTENSION_TYPE_SINGLE:
190 : {
191 0 : status = ml_single_invoke (ext->single, msg->input, &msg->output);
192 :
193 0 : if (status == ML_ERROR_NONE) {
194 0 : _ml_service_invoke_event_new_data (mls, NULL, msg->output);
195 : } else {
196 0 : _ml_error_report
197 : ("Failed to invoke the model in ml-service extension thread.");
198 : }
199 0 : break;
200 : }
201 0 : case ML_EXTENSION_TYPE_PIPELINE:
202 : {
203 : ml_service_node_info_s *node_info;
204 :
205 0 : node_info = _ml_extension_node_info_get (ext, msg->name);
206 :
207 0 : if (node_info && node_info->type == ML_SERVICE_NODE_TYPE_INPUT) {
208 : /* The input data will be released in the pipeline. */
209 0 : status = ml_pipeline_src_input_data (node_info->handle, msg->input,
210 : ML_PIPELINE_BUF_POLICY_AUTO_FREE);
211 0 : msg->input = NULL;
212 :
213 0 : if (status != ML_ERROR_NONE) {
214 0 : _ml_error_report
215 : ("Failed to push input data into the pipeline in ml-service extension thread.");
216 : }
217 : } else {
218 0 : _ml_error_report
219 : ("Failed to push input data into the pipeline, cannot find input node '%s'.",
220 : msg->name);
221 : }
222 0 : break;
223 : }
224 0 : default:
225 : /* Unknown ml-service extension type, skip this. */
226 0 : break;
227 : }
228 :
229 0 : _ml_extension_msg_free (msg);
230 : }
231 : }
232 :
233 0 : return NULL;
234 : }
235 :
236 : /**
237 : * @brief Wrapper to release tensors-info handle.
238 : */
239 : static void
240 0 : _ml_extension_destroy_tensors_info (void *data)
241 : {
242 0 : ml_tensors_info_h info = (ml_tensors_info_h) data;
243 :
244 0 : if (info)
245 0 : ml_tensors_info_destroy (info);
246 0 : }
247 :
248 : /**
249 : * @brief Internal function to parse common option from json.
250 : */
251 : static void
252 0 : _ml_extension_conf_parse_common (ml_service_s * mls, JsonObject * object)
253 : {
254 0 : const gchar *value = NULL;
255 :
256 0 : g_return_if_fail (object != NULL);
257 :
258 0 : if (json_object_has_member (object, "input_queue_size")) {
259 0 : value = json_object_get_string_member (object, "input_queue_size");
260 :
261 0 : if (STR_IS_VALID (value))
262 0 : _ml_service_extension_set_information (mls, "input_queue_size", value);
263 : }
264 :
265 0 : if (json_object_has_member (object, "max_input")) {
266 0 : value = json_object_get_string_member (object, "max_input");
267 :
268 0 : if (STR_IS_VALID (value))
269 0 : _ml_service_extension_set_information (mls, "max_input", value);
270 : }
271 :
272 0 : if (json_object_has_member (object, "timeout")) {
273 0 : value = json_object_get_string_member (object, "timeout");
274 :
275 0 : if (STR_IS_VALID (value))
276 0 : _ml_service_extension_set_information (mls, "timeout", value);
277 : }
278 : }
279 :
280 : /**
281 : * @brief Internal function to parse single-shot info from json.
282 : */
283 : static int
284 0 : _ml_extension_conf_parse_single (ml_service_s * mls, JsonObject * single)
285 : {
286 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
287 : ml_option_h option;
288 : int status;
289 :
290 0 : status = ml_option_create (&option);
291 0 : if (status != ML_ERROR_NONE) {
292 0 : _ml_error_report_return (status,
293 : "Failed to parse configuration file, cannot create ml-option handle.");
294 : }
295 :
296 : /**
297 : * 1. "key" : load model info from ml-service agent.
298 : * 2. "model" : configuration file includes model path.
299 : */
300 0 : if (json_object_has_member (single, "key")) {
301 0 : const gchar *key = json_object_get_string_member (single, "key");
302 :
303 0 : if (STR_IS_VALID (key)) {
304 : ml_information_h model_info;
305 :
306 0 : status = ml_service_model_get_activated (key, &model_info);
307 0 : if (status == ML_ERROR_NONE) {
308 0 : gchar *paths = NULL;
309 :
310 : /** @todo parse desc and other information if necessary. */
311 0 : ml_information_get (model_info, "path", (void **) (&paths));
312 0 : ml_option_set (option, "models", g_strdup (paths), g_free);
313 :
314 0 : ml_information_destroy (model_info);
315 : } else {
316 0 : _ml_error_report
317 : ("Failed to parse configuration file, cannot get the model of '%s'.",
318 : key);
319 0 : goto error;
320 : }
321 : }
322 0 : } else if (json_object_has_member (single, "model")) {
323 0 : JsonNode *file_node = json_object_get_member (single, "model");
324 0 : gchar *paths = NULL;
325 :
326 0 : status = _ml_service_conf_parse_string (file_node, ",", &paths);
327 0 : if (status != ML_ERROR_NONE) {
328 0 : _ml_error_report
329 : ("Failed to parse configuration file, it should have valid model path.");
330 0 : goto error;
331 : }
332 :
333 0 : ml_option_set (option, "models", paths, g_free);
334 : } else {
335 0 : status = ML_ERROR_INVALID_PARAMETER;
336 0 : _ml_error_report
337 : ("Failed to parse configuration file, cannot get the model path.");
338 0 : goto error;
339 : }
340 :
341 0 : if (json_object_has_member (single, "framework")) {
342 0 : const gchar *fw = json_object_get_string_member (single, "framework");
343 :
344 0 : if (STR_IS_VALID (fw))
345 0 : ml_option_set (option, "framework_name", g_strdup (fw), g_free);
346 : }
347 :
348 0 : if (json_object_has_member (single, "input_info")) {
349 0 : JsonNode *info_node = json_object_get_member (single, "input_info");
350 : ml_tensors_info_h in_info;
351 :
352 0 : status = _ml_service_conf_parse_tensors_info (info_node, &in_info);
353 0 : if (status != ML_ERROR_NONE) {
354 0 : _ml_error_report
355 : ("Failed to parse configuration file, cannot parse input information.");
356 0 : goto error;
357 : }
358 :
359 0 : ml_option_set (option, "input_info", in_info,
360 : _ml_extension_destroy_tensors_info);
361 : }
362 :
363 0 : if (json_object_has_member (single, "output_info")) {
364 0 : JsonNode *info_node = json_object_get_member (single, "output_info");
365 : ml_tensors_info_h out_info;
366 :
367 0 : status = _ml_service_conf_parse_tensors_info (info_node, &out_info);
368 0 : if (status != ML_ERROR_NONE) {
369 0 : _ml_error_report
370 : ("Failed to parse configuration file, cannot parse output information.");
371 0 : goto error;
372 : }
373 :
374 0 : ml_option_set (option, "output_info", out_info,
375 : _ml_extension_destroy_tensors_info);
376 : }
377 :
378 : /* parse latency profiling option - "profile": "true" or "1" */
379 0 : if (json_object_has_member (single, "profile")) {
380 0 : const gchar *profile = json_object_get_string_member (single, "profile");
381 :
382 0 : if (STR_IS_VALID (profile))
383 0 : ml_option_set (option, "profile", g_strdup (profile), g_free);
384 : }
385 :
386 : /* parse latency profiling option - "latency": "true" or "1" */
387 0 : if (json_object_has_member (single, "latency")) {
388 0 : const gchar *latency = json_object_get_string_member (single, "latency");
389 :
390 0 : if (STR_IS_VALID (latency))
391 0 : ml_option_set (option, "profile", g_strdup (latency), g_free);
392 : }
393 :
394 0 : if (json_object_has_member (single, "custom")) {
395 0 : const gchar *custom = json_object_get_string_member (single, "custom");
396 :
397 0 : if (STR_IS_VALID (custom))
398 0 : ml_option_set (option, "custom", g_strdup (custom), g_free);
399 : }
400 :
401 0 : if (json_object_has_member (single, "invoke_dynamic")) {
402 : const gchar *invoke_dynamic =
403 0 : json_object_get_string_member (single, "invoke_dynamic");
404 :
405 0 : if (STR_IS_VALID (invoke_dynamic)) {
406 0 : ml_option_set (option, "invoke_dynamic", g_strdup (invoke_dynamic),
407 : g_free);
408 : }
409 : }
410 :
411 0 : if (json_object_has_member (single, "invoke_async")) {
412 : const gchar *invoke_async =
413 0 : json_object_get_string_member (single, "invoke_async");
414 :
415 0 : if (STR_IS_VALID (invoke_async)) {
416 0 : ml_option_set (option, "invoke_async", g_strdup (invoke_async), g_free);
417 :
418 0 : if (g_ascii_strcasecmp (invoke_async, "true") == 0) {
419 0 : ml_option_set (option, "async_callback", _ml_extension_async_cb, NULL);
420 0 : ml_option_set (option, "async_data", mls, NULL);
421 : }
422 : }
423 : }
424 :
425 0 : error:
426 0 : if (status == ML_ERROR_NONE)
427 0 : status = ml_single_open_with_option (&ext->single, option);
428 :
429 0 : ml_option_destroy (option);
430 0 : return status;
431 : }
432 :
433 : /**
434 : * @brief Internal function to parse the node info in pipeline.
435 : */
436 : static int
437 0 : _ml_extension_conf_parse_pipeline_node (ml_service_s * mls, JsonNode * node,
438 : ml_service_node_type_e type)
439 : {
440 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
441 0 : JsonArray *array = NULL;
442 : JsonObject *object;
443 : guint i, n;
444 : int status;
445 :
446 0 : n = 1;
447 0 : if (JSON_NODE_HOLDS_ARRAY (node)) {
448 0 : array = json_node_get_array (node);
449 0 : n = json_array_get_length (array);
450 : }
451 :
452 0 : for (i = 0; i < n; i++) {
453 0 : const gchar *name = NULL;
454 : ml_service_node_info_s *node_info;
455 :
456 0 : if (array)
457 0 : object = json_array_get_object_element (array, i);
458 : else
459 0 : object = json_node_get_object (node);
460 :
461 0 : name = _ml_service_get_json_string_member (object, "name");
462 :
463 0 : node_info = _ml_extension_node_info_new (mls, name, type);
464 0 : if (!node_info) {
465 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
466 : "Failed to parse configuration file, cannot add new node information.");
467 : }
468 :
469 0 : if (json_object_has_member (object, "info")) {
470 0 : JsonNode *info_node = json_object_get_member (object, "info");
471 :
472 0 : status = _ml_service_conf_parse_tensors_info (info_node,
473 : &node_info->info);
474 0 : if (status != ML_ERROR_NONE) {
475 0 : _ml_error_report_return (status,
476 : "Failed to parse configuration file, cannot parse the information.");
477 : }
478 : } else {
479 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
480 : "Failed to parse configuration file, cannot find node information.");
481 : }
482 :
483 0 : switch (type) {
484 0 : case ML_SERVICE_NODE_TYPE_INPUT:
485 0 : status = ml_pipeline_src_get_handle (ext->pipeline, name,
486 0 : &node_info->handle);
487 0 : break;
488 0 : case ML_SERVICE_NODE_TYPE_OUTPUT:
489 0 : status = ml_pipeline_sink_register (ext->pipeline, name,
490 0 : _ml_service_pipeline_sink_cb, node_info, &node_info->handle);
491 0 : break;
492 0 : default:
493 0 : status = ML_ERROR_INVALID_PARAMETER;
494 0 : break;
495 : }
496 :
497 0 : if (status != ML_ERROR_NONE) {
498 0 : _ml_error_report_return (status,
499 : "Failed to parse configuration file, cannot get the handle for pipeline node.");
500 : }
501 : }
502 :
503 0 : return ML_ERROR_NONE;
504 : }
505 :
506 : /**
507 : * @brief Internal function to parse pipeline info from json.
508 : */
509 : static int
510 0 : _ml_extension_conf_parse_pipeline (ml_service_s * mls, JsonObject * pipe)
511 : {
512 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
513 0 : g_autofree gchar *desc = NULL;
514 : int status;
515 :
516 : /**
517 : * 1. "key" : load pipeline from ml-service agent.
518 : * 2. "description" : configuration file includes pipeline description.
519 : */
520 0 : if (json_object_has_member (pipe, "key")) {
521 0 : const gchar *key = json_object_get_string_member (pipe, "key");
522 :
523 0 : if (STR_IS_VALID (key)) {
524 0 : status = ml_service_pipeline_get (key, &desc);
525 0 : if (status != ML_ERROR_NONE) {
526 0 : _ml_error_report_return (status,
527 : "Failed to parse configuration file, cannot get the pipeline of '%s'.",
528 : key);
529 : }
530 : }
531 0 : } else if (json_object_has_member (pipe, "description")) {
532 0 : desc = g_strdup (json_object_get_string_member (pipe, "description"));
533 : } else {
534 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
535 : "Failed to parse configuration file, cannot get the pipeline description.");
536 : }
537 :
538 0 : status = ml_pipeline_construct (desc, NULL, NULL, &ext->pipeline);
539 0 : if (status != ML_ERROR_NONE) {
540 0 : _ml_error_report_return (status,
541 : "Failed to parse configuration file, cannot construct the pipeline.");
542 : }
543 :
544 0 : if (json_object_has_member (pipe, "input_node")) {
545 0 : JsonNode *node = json_object_get_member (pipe, "input_node");
546 :
547 0 : status = _ml_extension_conf_parse_pipeline_node (mls, node,
548 : ML_SERVICE_NODE_TYPE_INPUT);
549 0 : if (status != ML_ERROR_NONE) {
550 0 : _ml_error_report_return (status,
551 : "Failed to parse configuration file, cannot get the input node.");
552 : }
553 : } else {
554 0 : _ml_logw
555 : ("No input node is defined in the pipeline. Might Non-appsrc be used?");
556 : }
557 :
558 0 : if (json_object_has_member (pipe, "output_node")) {
559 0 : JsonNode *node = json_object_get_member (pipe, "output_node");
560 :
561 0 : status = _ml_extension_conf_parse_pipeline_node (mls, node,
562 : ML_SERVICE_NODE_TYPE_OUTPUT);
563 0 : if (status != ML_ERROR_NONE) {
564 0 : _ml_error_report_return (status,
565 : "Failed to parse configuration file, cannot get the output node.");
566 : }
567 : } else {
568 0 : _ml_logw ("No output node is defined in the pipeline.");
569 : }
570 :
571 : /* Start pipeline when creating ml-service handle to check pipeline description. */
572 0 : status = ml_pipeline_start (ext->pipeline);
573 0 : if (status != ML_ERROR_NONE) {
574 0 : _ml_error_report_return (status,
575 : "Failed to parse configuration file, cannot start the pipeline.");
576 : }
577 :
578 0 : return ML_ERROR_NONE;
579 : }
580 :
581 : /**
582 : * @brief Internal function to parse configuration file.
583 : */
584 : static int
585 0 : _ml_extension_conf_parse_json (ml_service_s * mls, JsonObject * object)
586 : {
587 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
588 0 : JsonObject *sub = NULL;
589 : int status;
590 :
591 0 : if (json_object_has_member (object, "single")) {
592 0 : sub = json_object_get_object_member (object, "single");
593 :
594 0 : status = _ml_extension_conf_parse_single (mls, sub);
595 0 : if (status != ML_ERROR_NONE)
596 0 : return status;
597 :
598 0 : ext->type = ML_EXTENSION_TYPE_SINGLE;
599 0 : } else if (json_object_has_member (object, "pipeline")) {
600 0 : sub = json_object_get_object_member (object, "pipeline");
601 :
602 0 : status = _ml_extension_conf_parse_pipeline (mls, sub);
603 0 : if (status != ML_ERROR_NONE)
604 0 : return status;
605 :
606 0 : ext->type = ML_EXTENSION_TYPE_PIPELINE;
607 : } else {
608 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
609 : "Failed to parse configuration file, cannot get the valid type from configuration.");
610 : }
611 :
612 0 : _ml_extension_conf_parse_common (mls, sub);
613 0 : return ML_ERROR_NONE;
614 : }
615 :
616 : /**
617 : * @brief Internal function to create ml-service extension.
618 : */
619 : int
620 0 : _ml_service_extension_create (ml_service_s * mls, JsonObject * object)
621 : {
622 : ml_extension_s *ext;
623 0 : g_autofree gchar *thread_name = g_strdup_printf ("ml-ext-msg-%d", getpid ());
624 : int status;
625 :
626 0 : mls->priv = ext = g_try_new0 (ml_extension_s, 1);
627 0 : if (ext == NULL) {
628 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
629 : "Failed to allocate memory for ml-service extension. Out of memory?");
630 : }
631 :
632 0 : ext->type = ML_EXTENSION_TYPE_UNKNOWN;
633 0 : ext->running = FALSE;
634 0 : ext->timeout = DEFAULT_TIMEOUT;
635 0 : ext->max_input = DEFAULT_MAX_INPUT;
636 0 : ext->node_table = g_hash_table_new_full (g_str_hash, g_str_equal, g_free,
637 : _ml_extension_node_info_free);
638 :
639 0 : status = _ml_extension_conf_parse_json (mls, object);
640 0 : if (status != ML_ERROR_NONE) {
641 0 : _ml_error_report_return (status,
642 : "Failed to parse the ml-service extension configuration.");
643 : }
644 :
645 0 : g_mutex_lock (&mls->lock);
646 :
647 0 : ext->msg_queue = g_async_queue_new_full (_ml_extension_msg_free);
648 0 : ext->msg_thread = g_thread_new (thread_name, _ml_extension_msg_thread, mls);
649 :
650 : /* Wait until the message thread has been initialized. */
651 0 : g_cond_wait (&mls->cond, &mls->lock);
652 0 : g_mutex_unlock (&mls->lock);
653 :
654 0 : return ML_ERROR_NONE;
655 : }
656 :
657 : /**
658 : * @brief Internal function to release ml-service extension.
659 : */
660 : int
661 0 : _ml_service_extension_destroy (ml_service_s * mls)
662 : {
663 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
664 :
665 : /* Supposed internal function call to release handle. */
666 0 : if (!ext)
667 0 : return ML_ERROR_NONE;
668 :
669 : /**
670 : * Close message thread.
671 : * If model inference is running, it may wait for the result in message thread.
672 : * This takes time, so do not call join with extension lock.
673 : */
674 0 : ext->running = FALSE;
675 0 : if (ext->msg_thread) {
676 0 : g_thread_join (ext->msg_thread);
677 0 : ext->msg_thread = NULL;
678 : }
679 :
680 0 : if (ext->msg_queue) {
681 0 : g_async_queue_unref (ext->msg_queue);
682 0 : ext->msg_queue = NULL;
683 : }
684 :
685 0 : if (ext->single) {
686 0 : ml_single_close (ext->single);
687 0 : ext->single = NULL;
688 : }
689 :
690 0 : if (ext->pipeline) {
691 0 : ml_pipeline_stop (ext->pipeline);
692 0 : ml_pipeline_destroy (ext->pipeline);
693 0 : ext->pipeline = NULL;
694 : }
695 :
696 0 : if (ext->node_table) {
697 0 : g_hash_table_destroy (ext->node_table);
698 0 : ext->node_table = NULL;
699 : }
700 :
701 0 : g_free (ext);
702 0 : mls->priv = NULL;
703 :
704 0 : return ML_ERROR_NONE;
705 : }
706 :
707 : /**
708 : * @brief Internal function to start ml-service extension.
709 : */
710 : int
711 0 : _ml_service_extension_start (ml_service_s * mls)
712 : {
713 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
714 0 : int status = ML_ERROR_NONE;
715 :
716 0 : switch (ext->type) {
717 0 : case ML_EXTENSION_TYPE_PIPELINE:
718 0 : status = ml_pipeline_start (ext->pipeline);
719 0 : break;
720 0 : case ML_EXTENSION_TYPE_SINGLE:
721 : /* Do nothing. */
722 0 : break;
723 0 : default:
724 0 : status = ML_ERROR_NOT_SUPPORTED;
725 0 : break;
726 : }
727 :
728 0 : return status;
729 : }
730 :
731 : /**
732 : * @brief Internal function to stop ml-service extension.
733 : */
734 : int
735 0 : _ml_service_extension_stop (ml_service_s * mls)
736 : {
737 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
738 0 : int status = ML_ERROR_NONE;
739 :
740 0 : switch (ext->type) {
741 0 : case ML_EXTENSION_TYPE_PIPELINE:
742 0 : status = ml_pipeline_stop (ext->pipeline);
743 0 : break;
744 0 : case ML_EXTENSION_TYPE_SINGLE:
745 : /* Do nothing. */
746 0 : break;
747 0 : default:
748 0 : status = ML_ERROR_NOT_SUPPORTED;
749 0 : break;
750 : }
751 :
752 0 : return status;
753 : }
754 :
755 : /**
756 : * @brief Internal function to get the information of required input data.
757 : */
758 : int
759 0 : _ml_service_extension_get_input_information (ml_service_s * mls,
760 : const char *name, ml_tensors_info_h * info)
761 : {
762 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
763 : int status;
764 :
765 0 : switch (ext->type) {
766 0 : case ML_EXTENSION_TYPE_SINGLE:
767 0 : status = ml_single_get_input_info (ext->single, info);
768 0 : break;
769 0 : case ML_EXTENSION_TYPE_PIPELINE:
770 : {
771 : ml_service_node_info_s *node_info;
772 :
773 0 : node_info = _ml_extension_node_info_get (ext, name);
774 :
775 0 : if (node_info && node_info->type == ML_SERVICE_NODE_TYPE_INPUT) {
776 0 : status = _ml_tensors_info_create_from (node_info->info, info);
777 : } else {
778 0 : status = ML_ERROR_INVALID_PARAMETER;
779 : }
780 0 : break;
781 : }
782 0 : default:
783 0 : status = ML_ERROR_NOT_SUPPORTED;
784 0 : break;
785 : }
786 :
787 0 : return status;
788 : }
789 :
790 : /**
791 : * @brief Internal function to get the information of output data.
792 : */
793 : int
794 0 : _ml_service_extension_get_output_information (ml_service_s * mls,
795 : const char *name, ml_tensors_info_h * info)
796 : {
797 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
798 : int status;
799 :
800 0 : switch (ext->type) {
801 0 : case ML_EXTENSION_TYPE_SINGLE:
802 0 : status = ml_single_get_output_info (ext->single, info);
803 0 : break;
804 0 : case ML_EXTENSION_TYPE_PIPELINE:
805 : {
806 : ml_service_node_info_s *node_info;
807 :
808 0 : node_info = _ml_extension_node_info_get (ext, name);
809 :
810 0 : if (node_info && node_info->type == ML_SERVICE_NODE_TYPE_OUTPUT) {
811 0 : status = _ml_tensors_info_create_from (node_info->info, info);
812 : } else {
813 0 : status = ML_ERROR_INVALID_PARAMETER;
814 : }
815 0 : break;
816 : }
817 0 : default:
818 0 : status = ML_ERROR_NOT_SUPPORTED;
819 0 : break;
820 : }
821 :
822 0 : if (status != ML_ERROR_NONE) {
823 0 : if (*info) {
824 0 : ml_tensors_info_destroy (*info);
825 0 : *info = NULL;
826 : }
827 : }
828 :
829 0 : return status;
830 : }
831 :
832 : /**
833 : * @brief Internal function to set the information for ml-service extension.
834 : */
835 : int
836 0 : _ml_service_extension_set_information (ml_service_s * mls, const char *name,
837 : const char *value)
838 : {
839 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
840 :
841 : /**
842 : * Check limitation of message queue and other options.
843 : * When adding new value, you should fix _ml_extension_conf_parse_common() also.
844 : */
845 0 : if (g_ascii_strcasecmp (name, "input_queue_size") == 0 ||
846 0 : g_ascii_strcasecmp (name, "max_input") == 0) {
847 0 : ext->max_input = (guint) g_ascii_strtoull (value, NULL, 10);
848 0 : } else if (g_ascii_strcasecmp (name, "timeout") == 0) {
849 0 : ext->timeout = (guint) g_ascii_strtoull (value, NULL, 10);
850 : }
851 :
852 0 : return ML_ERROR_NONE;
853 : }
854 :
855 : /**
856 : * @brief Internal function to add an input data to process the model in ml-service extension handle.
857 : */
858 : int
859 0 : _ml_service_extension_request (ml_service_s * mls, const char *name,
860 : const ml_tensors_data_h data)
861 : {
862 0 : ml_extension_s *ext = (ml_extension_s *) mls->priv;
863 : ml_extension_msg_s *msg;
864 : int status, len;
865 :
866 0 : if (ext->type == ML_EXTENSION_TYPE_PIPELINE) {
867 : ml_service_node_info_s *node_info;
868 :
869 0 : if (!STR_IS_VALID (name)) {
870 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
871 : "The parameter, name '%s', is invalid.", name);
872 : }
873 :
874 0 : node_info = _ml_extension_node_info_get (ext, name);
875 :
876 0 : if (!node_info || node_info->type != ML_SERVICE_NODE_TYPE_INPUT) {
877 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
878 : "The parameter, name '%s', is invalid, cannot find the input node from pipeline.",
879 : name);
880 : }
881 : }
882 :
883 0 : len = g_async_queue_length (ext->msg_queue);
884 :
885 0 : if (ext->max_input > 0 && len > 0 && ext->max_input <= len) {
886 0 : _ml_error_report_return (ML_ERROR_STREAMS_PIPE,
887 : "Failed to push input data into the queue, the max number of input is %u.",
888 : ext->max_input);
889 : }
890 :
891 0 : msg = g_try_new0 (ml_extension_msg_s, 1);
892 0 : if (!msg) {
893 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
894 : "Failed to allocate the ml-service extension message. Out of memory?");
895 : }
896 :
897 0 : msg->name = g_strdup (name);
898 0 : status = ml_tensors_data_clone (data, &msg->input);
899 :
900 0 : if (status != ML_ERROR_NONE) {
901 0 : _ml_extension_msg_free (msg);
902 0 : _ml_error_report_return (status, "Failed to clone input data.");
903 : }
904 :
905 0 : g_async_queue_push (ext->msg_queue, msg);
906 :
907 0 : return ML_ERROR_NONE;
908 : }
|