Line data Source code
1 : /* SPDX-License-Identifier: Apache-2.0 */
2 : /**
3 : * Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved.
4 : *
5 : * @file ml-api-service-query.c
6 : * @date 30 Aug 2022
7 : * @brief Query client implementation of NNStreamer/Service C-API
8 : * @see https://github.com/nnstreamer/nnstreamer
9 : * @author Yongjoo Ahn <yongjoo1.ahn@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : */
12 :
13 : #include <glib.h>
14 : #include <gst/gst.h>
15 : #include <gst/gstbuffer.h>
16 : #include <gst/app/app.h>
17 : #include <string.h>
18 :
19 : #include "ml-api-internal.h"
20 : #include "ml-api-service-query.h"
21 :
/**
 * @brief Structure for ml_service_query
 */
typedef struct
{
  ml_pipeline_h pipe_h; /**< "appsrc ! query_client ! tensor_sink" pipeline handle */
  ml_pipeline_src_h src_h; /**< appsrc handle used to push request data */
  ml_pipeline_sink_h sink_h; /**< tensor_sink handle receiving query results */

  guint timeout; /**< in ms unit */
  GAsyncQueue *out_data_queue; /**< queue of cloned ml_tensors_data_h results, filled by the sink callback */
} _ml_service_query_s;
34 :
35 : /**
36 : * @brief Sink callback for query_client
37 : */
38 : static void
39 0 : _sink_callback_for_query_client (const ml_tensors_data_h data,
40 : const ml_tensors_info_h info, void *user_data)
41 : {
42 0 : _ml_service_query_s *query = (_ml_service_query_s *) user_data;
43 : ml_tensors_data_h copied;
44 : int status;
45 :
46 0 : status = ml_tensors_data_clone (data, &copied);
47 0 : if (ML_ERROR_NONE != status) {
48 0 : _ml_error_report_continue
49 : ("Failed to create a new tensors data for query_client.");
50 0 : return;
51 : }
52 :
53 0 : g_async_queue_push (query->out_data_queue, copied);
54 : }
55 :
56 : /**
57 : * @brief Internal function to release ml-service query data.
58 : */
59 : int
60 0 : _ml_service_query_release_internal (ml_service_s * mls)
61 : {
62 0 : _ml_service_query_s *query = (_ml_service_query_s *) mls->priv;
63 : ml_tensors_data_h data_h;
64 :
65 : /* Supposed internal function call to release handle. */
66 0 : if (!query)
67 0 : return ML_ERROR_NONE;
68 :
69 0 : if (query->pipe_h) {
70 0 : if (ml_pipeline_destroy (query->pipe_h))
71 0 : _ml_error_report ("Failed to destroy pipeline");
72 : }
73 :
74 0 : if (query->out_data_queue) {
75 0 : while ((data_h = g_async_queue_try_pop (query->out_data_queue))) {
76 0 : ml_tensors_data_destroy (data_h);
77 : }
78 :
79 0 : g_async_queue_unref (query->out_data_queue);
80 : }
81 :
82 0 : g_free (query);
83 0 : mls->priv = NULL;
84 :
85 0 : return ML_ERROR_NONE;
86 : }
87 :
88 : /**
89 : * @brief Internal function to create query client service handle with given ml-option handle.
90 : */
91 : int
92 0 : _ml_service_query_create (ml_service_s * mls, ml_option_h option)
93 : {
94 0 : int status = ML_ERROR_NONE;
95 :
96 0 : g_autofree gchar *description = NULL;
97 : void *value;
98 :
99 : GString *tensor_query_client_prop;
100 0 : g_autofree gchar *prop = NULL;
101 :
102 : _ml_service_query_s *query_s;
103 : ml_pipeline_h pipe_h;
104 : ml_pipeline_src_h src_h;
105 : ml_pipeline_sink_h sink_h;
106 0 : g_autofree gchar *caps = NULL;
107 0 : guint timeout = 1000U; /* default 1s timeout */
108 :
109 0 : g_return_val_if_fail (mls && option, ML_ERROR_INVALID_PARAMETER);
110 :
111 0 : mls->priv = query_s = g_try_new0 (_ml_service_query_s, 1);
112 0 : if (query_s == NULL) {
113 0 : _ml_error_report_return (ML_ERROR_OUT_OF_MEMORY,
114 : "Failed to allocate memory for the service handle's private data. Out of memory?");
115 : }
116 :
117 0 : tensor_query_client_prop = g_string_new (NULL);
118 :
119 0 : if (ML_ERROR_NONE == ml_option_get (option, "host", &value))
120 0 : g_string_append_printf (tensor_query_client_prop, " host=%s ",
121 : (gchar *) value);
122 :
123 0 : if (ML_ERROR_NONE == ml_option_get (option, "port", &value))
124 0 : g_string_append_printf (tensor_query_client_prop, " port=%u ",
125 0 : *((guint *) value));
126 :
127 0 : if (ML_ERROR_NONE == ml_option_get (option, "dest-host", &value))
128 0 : g_string_append_printf (tensor_query_client_prop, " dest-host=%s ",
129 : (gchar *) value);
130 :
131 0 : if (ML_ERROR_NONE == ml_option_get (option, "dest-port", &value))
132 0 : g_string_append_printf (tensor_query_client_prop, " dest-port=%u ",
133 0 : *((guint *) value));
134 :
135 0 : if (ML_ERROR_NONE == ml_option_get (option, "connect-type", &value))
136 0 : g_string_append_printf (tensor_query_client_prop, " connect-type=%s ",
137 : (gchar *) value);
138 :
139 0 : if (ML_ERROR_NONE == ml_option_get (option, "topic", &value))
140 0 : g_string_append_printf (tensor_query_client_prop, " topic=%s ",
141 : (gchar *) value);
142 :
143 0 : if (ML_ERROR_NONE == ml_option_get (option, "timeout", &value))
144 0 : g_string_append_printf (tensor_query_client_prop, " timeout=%u ",
145 0 : *((guint *) value));
146 :
147 0 : if (ML_ERROR_NONE != ml_option_get (option, "caps", &value)) {
148 0 : g_string_free (tensor_query_client_prop, TRUE);
149 0 : _ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
150 : "The option 'caps' must be set before call ml_service_query_create.");
151 : }
152 0 : caps = g_strdup ((gchar *) value);
153 :
154 0 : prop = g_string_free (tensor_query_client_prop, FALSE);
155 0 : description =
156 0 : g_strdup_printf
157 : ("appsrc name=srcx ! %s ! tensor_query_client %s name=qcx ! tensor_sink name=sinkx async=false sync=false",
158 : caps, prop);
159 :
160 0 : status = ml_pipeline_construct (description, NULL, NULL, &pipe_h);
161 0 : if (ML_ERROR_NONE != status) {
162 0 : _ml_error_report_return (status, "Failed to construct pipeline");
163 : }
164 :
165 0 : status = ml_pipeline_start (pipe_h);
166 0 : if (ML_ERROR_NONE != status) {
167 0 : ml_pipeline_destroy (pipe_h);
168 0 : _ml_error_report_return (status, "Failed to start pipeline");
169 : }
170 :
171 0 : status = ml_pipeline_src_get_handle (pipe_h, "srcx", &src_h);
172 0 : if (ML_ERROR_NONE != status) {
173 0 : ml_pipeline_destroy (pipe_h);
174 0 : _ml_error_report_return (status, "Failed to get src handle");
175 : }
176 :
177 0 : status = ml_pipeline_sink_register (pipe_h, "sinkx",
178 : _sink_callback_for_query_client, query_s, &sink_h);
179 0 : if (ML_ERROR_NONE != status) {
180 0 : ml_pipeline_destroy (pipe_h);
181 0 : _ml_error_report_return (status, "Failed to register sink handle");
182 : }
183 :
184 0 : query_s->timeout = timeout;
185 0 : query_s->pipe_h = pipe_h;
186 0 : query_s->src_h = src_h;
187 0 : query_s->sink_h = sink_h;
188 0 : query_s->out_data_queue = g_async_queue_new ();
189 :
190 0 : return ML_ERROR_NONE;
191 : }
192 :
193 : /**
194 : * @brief Internal function to request an output to query client service with given input data.
195 : */
196 : int
197 0 : _ml_service_query_request (ml_service_s * mls,
198 : const ml_tensors_data_h input, ml_tensors_data_h * output)
199 : {
200 0 : int status = ML_ERROR_NONE;
201 : _ml_service_query_s *query;
202 :
203 0 : g_return_val_if_fail (mls && input && output, ML_ERROR_INVALID_PARAMETER);
204 :
205 0 : query = (_ml_service_query_s *) mls->priv;
206 :
207 0 : status = ml_pipeline_src_input_data (query->src_h, input,
208 : ML_PIPELINE_BUF_POLICY_DO_NOT_FREE);
209 0 : if (ML_ERROR_NONE != status) {
210 0 : _ml_error_report_return (status, "Failed to input data");
211 : }
212 :
213 0 : *output = g_async_queue_timeout_pop (query->out_data_queue,
214 0 : query->timeout * G_TIME_SPAN_MILLISECOND);
215 0 : if (NULL == *output) {
216 0 : _ml_error_report_return (ML_ERROR_TIMED_OUT, "timeout!");
217 : }
218 :
219 0 : return ML_ERROR_NONE;
220 : }
|