@@ -2,19 +2,18 @@ use anyhow::Context;
2
2
use async_trait:: async_trait;
3
3
use camino:: Utf8Path ;
4
4
use camino:: Utf8PathBuf ;
5
- use log:: debug;
6
5
use log:: error;
7
6
use log:: info;
8
7
use serde_json:: json;
9
- use std:: collections:: HashMap ;
10
8
use std:: io:: ErrorKind ;
11
9
use std:: os:: unix:: fs:: fchown;
12
10
use std:: os:: unix:: fs:: MetadataExt ;
13
11
use std:: os:: unix:: fs:: PermissionsExt ;
12
+ use std:: sync:: Arc ;
14
13
use tedge_actors:: fan_in_message_type;
15
14
use tedge_actors:: Actor ;
16
15
use tedge_actors:: ChannelError ;
17
- use tedge_actors:: DynSender ;
16
+ use tedge_actors:: ClientMessageBox ;
18
17
use tedge_actors:: LoggingReceiver ;
19
18
use tedge_actors:: LoggingSender ;
20
19
use tedge_actors:: MessageReceiver ;
@@ -53,16 +52,15 @@ pub type ConfigDownloadResult = (MqttTopic, DownloadResult);
53
52
pub type ConfigUploadRequest = ( MqttTopic , UploadRequest ) ;
54
53
pub type ConfigUploadResult = ( MqttTopic , UploadResult ) ;
55
54
56
- fan_in_message_type ! ( ConfigInput [ ConfigOperation , FsWatchEvent , ConfigDownloadResult , ConfigUploadResult ] : Debug ) ;
55
+ fan_in_message_type ! ( ConfigInput [ ConfigOperation , FsWatchEvent ] : Debug ) ;
57
56
58
57
pub struct ConfigManagerActor {
59
58
config : ConfigManagerConfig ,
60
59
plugin_config : PluginConfig ,
61
- pending_operations : HashMap < String , ConfigOperation > ,
62
60
input_receiver : LoggingReceiver < ConfigInput > ,
63
61
output_sender : LoggingSender < ConfigOperationData > ,
64
- download_sender : DynSender < ConfigDownloadRequest > ,
65
- upload_sender : DynSender < ConfigUploadRequest > ,
62
+ downloader : ClientMessageBox < ConfigDownloadRequest , ConfigDownloadResult > ,
63
+ uploader : ClientMessageBox < ConfigUploadRequest , ConfigUploadResult > ,
66
64
}
67
65
68
66
#[ async_trait]
@@ -72,20 +70,24 @@ impl Actor for ConfigManagerActor {
72
70
}
73
71
74
72
async fn run ( mut self ) -> Result < ( ) , RuntimeError > {
75
- self . reload_supported_config_types ( ) . await ?;
73
+ let mut worker = ConfigManagerWorker {
74
+ config : Arc :: from ( self . config ) ,
75
+ plugin_config : self . plugin_config ,
76
+ output_sender : self . output_sender ,
77
+ downloader : self . downloader ,
78
+ uploader : self . uploader ,
79
+ } ;
80
+
81
+ worker. reload_supported_config_types ( ) . await ?;
76
82
77
83
while let Some ( event) = self . input_receiver . recv ( ) . await {
78
84
let result = match event {
79
85
ConfigInput :: ConfigOperation ( request) => {
80
- self . process_operation_request ( request) . await
81
- }
82
- ConfigInput :: FsWatchEvent ( event) => self . process_file_watch_events ( event) . await ,
83
- ConfigInput :: ConfigDownloadResult ( ( topic, result) ) => {
84
- Ok ( self . process_downloaded_config ( & topic, result) . await ?)
85
- }
86
- ConfigInput :: ConfigUploadResult ( ( topic, result) ) => {
87
- self . process_uploaded_config ( & topic, result) . await
86
+ let mut worker = worker. clone ( ) ;
87
+ tokio:: spawn ( async move { worker. process_operation_request ( request) . await } ) ;
88
+ Ok ( ( ) )
88
89
}
90
+ ConfigInput :: FsWatchEvent ( event) => worker. process_file_watch_events ( event) . await ,
89
91
} ;
90
92
91
93
if let Err ( err) = result {
@@ -103,20 +105,30 @@ impl ConfigManagerActor {
103
105
plugin_config : PluginConfig ,
104
106
input_receiver : LoggingReceiver < ConfigInput > ,
105
107
output_sender : LoggingSender < ConfigOperationData > ,
106
- download_sender : DynSender < ConfigDownloadRequest > ,
107
- upload_sender : DynSender < ConfigUploadRequest > ,
108
+ downloader : ClientMessageBox < ConfigDownloadRequest , ConfigDownloadResult > ,
109
+ uploader : ClientMessageBox < ConfigUploadRequest , ConfigUploadResult > ,
108
110
) -> Self {
109
111
ConfigManagerActor {
110
112
config,
111
113
plugin_config,
112
- pending_operations : HashMap :: new ( ) ,
113
114
input_receiver,
114
115
output_sender,
115
- download_sender ,
116
- upload_sender ,
116
+ downloader ,
117
+ uploader ,
117
118
}
118
119
}
120
+ }
119
121
122
+ #[ derive( Clone ) ]
123
+ struct ConfigManagerWorker {
124
+ config : Arc < ConfigManagerConfig > ,
125
+ plugin_config : PluginConfig ,
126
+ output_sender : LoggingSender < ConfigOperationData > ,
127
+ downloader : ClientMessageBox < ConfigDownloadRequest , ConfigDownloadResult > ,
128
+ uploader : ClientMessageBox < ConfigUploadRequest , ConfigUploadResult > ,
129
+ }
130
+
131
+ impl ConfigManagerWorker {
120
132
async fn process_operation_request (
121
133
& mut self ,
122
134
request : ConfigOperation ,
@@ -192,18 +204,18 @@ impl ConfigManagerActor {
192
204
. execute_config_snapshot_request ( & topic, & mut request)
193
205
. await
194
206
{
195
- Ok ( _) => {
196
- self . pending_operations . insert (
197
- topic. name . clone ( ) ,
198
- ConfigOperation :: Snapshot ( topic, request) ,
207
+ Ok ( file_path) => {
208
+ request. successful ( file_path. as_str ( ) ) ;
209
+ info ! (
210
+ "Config Snapshot request processed for config type: {}." ,
211
+ request. config_type
199
212
) ;
213
+ self . publish_command_status ( ConfigOperation :: Snapshot ( topic, request) )
214
+ . await ?;
200
215
}
201
216
Err ( error) => {
202
- let error_message = format ! (
203
- "Failed to initiate configuration snapshot upload to file-transfer service: {error}" ,
204
- ) ;
205
- request. failed ( & error_message) ;
206
- error ! ( "{}" , error_message) ;
217
+ request. failed ( error. to_string ( ) ) ;
218
+ error ! ( "config-manager failed to process config snapshot: {error}" ) ;
207
219
self . publish_command_status ( ConfigOperation :: Snapshot ( topic, request) )
208
220
. await ?;
209
221
}
@@ -215,7 +227,7 @@ impl ConfigManagerActor {
215
227
& mut self ,
216
228
topic : & Topic ,
217
229
request : & mut ConfigSnapshotCmdPayload ,
218
- ) -> Result < ( ) , ConfigManagementError > {
230
+ ) -> Result < Utf8PathBuf , ConfigManagementError > {
219
231
let file_entry = self
220
232
. plugin_config
221
233
. get_file_entry_from_type ( & request. config_type ) ?;
@@ -238,11 +250,15 @@ impl ConfigManagerActor {
238
250
request. config_type, tedge_url
239
251
) ;
240
252
241
- self . upload_sender
242
- . send ( ( topic. name . clone ( ) , upload_request) )
253
+ let ( _, upload_result) = self
254
+ . uploader
255
+ . await_response ( ( topic. name . clone ( ) , upload_request) )
243
256
. await ?;
244
257
245
- Ok ( ( ) )
258
+ let upload_response =
259
+ upload_result. context ( "config-manager failed uploading configuration snapshot" ) ?;
260
+
261
+ Ok ( upload_response. file_path )
246
262
}
247
263
248
264
fn create_tedge_url_for_config_operation (
@@ -280,57 +296,24 @@ impl ConfigManagerActor {
280
296
) )
281
297
}
282
298
283
- async fn process_uploaded_config (
284
- & mut self ,
285
- topic : & str ,
286
- result : UploadResult ,
287
- ) -> Result < ( ) , ChannelError > {
288
- if let Some ( ConfigOperation :: Snapshot ( topic, mut request) ) =
289
- self . pending_operations . remove ( topic)
290
- {
291
- match result {
292
- Ok ( response) => {
293
- request. successful ( response. file_path . as_str ( ) ) ;
294
- info ! (
295
- "Config Snapshot request processed for config type: {}." ,
296
- request. config_type
297
- ) ;
298
- self . publish_command_status ( ConfigOperation :: Snapshot ( topic, request) )
299
- . await ?;
300
- }
301
- Err ( err) => {
302
- let error_message = format ! (
303
- "config-manager failed uploading configuration snapshot: {}" ,
304
- err
305
- ) ;
306
- request. failed ( & error_message) ;
307
- error ! ( "{}" , error_message) ;
308
- self . publish_command_status ( ConfigOperation :: Snapshot ( topic, request) )
309
- . await ?;
310
- }
311
- }
312
- }
313
-
314
- Ok ( ( ) )
315
- }
316
-
317
299
async fn handle_config_update_request (
318
300
& mut self ,
319
301
topic : Topic ,
320
302
mut request : ConfigUpdateCmdPayload ,
321
303
) -> Result < ( ) , ChannelError > {
322
304
match self . execute_config_update_request ( & topic, & request) . await {
323
- Ok ( _) => {
324
- self . pending_operations
325
- . insert ( topic. name . clone ( ) , ConfigOperation :: Update ( topic, request) ) ;
305
+ Ok ( deployed_to_path) => {
306
+ request. successful ( deployed_to_path) ;
307
+ info ! (
308
+ "Config Update request processed for config type: {}." ,
309
+ request. config_type
310
+ ) ;
311
+ self . publish_command_status ( ConfigOperation :: Update ( topic, request) )
312
+ . await ?;
326
313
}
327
314
Err ( error) => {
328
- let error_message = format ! (
329
- "config-manager failed to start downloading configuration: {}" ,
330
- error
331
- ) ;
332
- request. failed ( & error_message) ;
333
- error ! ( "{}" , error_message) ;
315
+ request. failed ( error. to_string ( ) ) ;
316
+ error ! ( "config-manager failed to process config update: {error}" ) ;
334
317
self . publish_command_status ( ConfigOperation :: Update ( topic, request) )
335
318
. await ?;
336
319
}
@@ -342,7 +325,7 @@ impl ConfigManagerActor {
342
325
& mut self ,
343
326
topic : & Topic ,
344
327
request : & ConfigUpdateCmdPayload ,
345
- ) -> Result < ( ) , ConfigManagementError > {
328
+ ) -> Result < Utf8PathBuf , ConfigManagementError > {
346
329
let file_entry = self
347
330
. plugin_config
348
331
. get_file_entry_from_type ( & request. config_type ) ?;
@@ -352,8 +335,7 @@ impl ConfigManagerActor {
352
335
let temp_path = & self . config . tmp_path . join ( & file_entry. config_type ) ;
353
336
354
337
let Some ( tedge_url) = & request. tedge_url else {
355
- debug ! ( "tedge_url not present in config update payload, ignoring" ) ;
356
- return Ok ( ( ) ) ;
338
+ return Err ( anyhow:: anyhow!( "tedge_url not present in config update payload" ) . into ( ) ) ;
357
339
} ;
358
340
359
341
let download_request = DownloadRequest :: new ( tedge_url, temp_path. as_std_path ( ) )
@@ -364,71 +346,23 @@ impl ConfigManagerActor {
364
346
request. config_type, tedge_url
365
347
) ;
366
348
367
- self . download_sender
368
- . send ( ( topic. name . clone ( ) , download_request) )
349
+ let ( _, download_result) = self
350
+ . downloader
351
+ . await_response ( ( topic. name . clone ( ) , download_request) )
369
352
. await ?;
370
353
371
- Ok ( ( ) )
372
- }
373
-
374
- async fn process_downloaded_config (
375
- & mut self ,
376
- topic : & str ,
377
- result : DownloadResult ,
378
- ) -> Result < ( ) , ConfigManagementError > {
379
- let Some ( ConfigOperation :: Update ( topic, mut request) ) =
380
- self . pending_operations . remove ( topic)
381
- else {
382
- return Ok ( ( ) ) ;
383
- } ;
384
-
385
- let response = match result {
386
- Ok ( response) => response,
387
- Err ( err) => {
388
- let err =
389
- anyhow:: Error :: from ( err) . context ( "config-manager failed downloading a file" ) ;
390
- let error_message = format ! ( "{err:#}" ) ;
391
- request. failed ( & error_message) ;
392
- error ! ( "{}" , error_message) ;
393
- self . publish_command_status ( ConfigOperation :: Update ( topic, request) )
394
- . await ?;
395
- return Ok ( ( ) ) ;
396
- }
397
- } ;
398
-
399
- // new config was downloaded into tmpdir, we need to write it into destination using tedge-write
400
- let from = Utf8Path :: from_path ( response. file_path . as_path ( ) ) . unwrap ( ) ;
401
-
402
- let deployed_to_path = match self . deploy_config_file ( from, & request. config_type ) {
403
- Ok ( path) => path,
404
- Err ( err) => {
405
- let error_message =
406
- format ! ( "config-manager failed writing updated configuration file: {err}" , ) ;
407
-
408
- request. failed ( & error_message) ;
409
- error ! ( "{}" , error_message) ;
410
- self . publish_command_status ( ConfigOperation :: Update ( topic, request) )
411
- . await ?;
412
-
413
- // TODO: source temporary file should be cleaned up automatically
414
- let _ = std:: fs:: remove_file ( from) ;
354
+ let download_response =
355
+ download_result. context ( "config-manager failed downloading a file" ) ?;
415
356
416
- return Ok ( ( ) ) ;
417
- }
418
- } ;
419
-
420
- request. successful ( deployed_to_path) ;
421
- info ! (
422
- "Config Update request processed for config type: {}." ,
423
- request. config_type
424
- ) ;
425
- self . publish_command_status ( ConfigOperation :: Update ( topic, request) )
426
- . await ?;
357
+ let from = Utf8Path :: from_path ( download_response. file_path . as_path ( ) ) . unwrap ( ) ;
358
+ let deployed_to_path = self
359
+ . deploy_config_file ( from, & request. config_type )
360
+ . context ( "failed to deploy configuration file" ) ?;
427
361
428
362
// TODO: source temporary file should be cleaned up automatically
429
363
let _ = std:: fs:: remove_file ( from) ;
430
364
431
- Ok ( ( ) )
365
+ Ok ( deployed_to_path )
432
366
}
433
367
434
368
/// Deploys the new version of the configuration file and returns the path under which it was
0 commit comments