Skip to content

Commit 67b3f4f

Browse files
authored
Merge pull request #3086 from Bravo555/improve/config-manager-simplify-control-flow
refactor: Simplify config manager control flow
2 parents c875a89 + a4f7a49 commit 67b3f4f

File tree

4 files changed

+138
-149
lines changed

4 files changed

+138
-149
lines changed

crates/extensions/tedge_config_manager/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ tedge_uploader_ext = { workspace = true }
2828
tedge_utils = { workspace = true }
2929
tempfile = { workspace = true }
3030
thiserror = { workspace = true }
31+
tokio = { workspace = true }
3132
toml = { workspace = true }
3233
uzers = { workspace = true }
3334

crates/extensions/tedge_config_manager/src/actor.rs

Lines changed: 72 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,18 @@ use anyhow::Context;
22
use async_trait::async_trait;
33
use camino::Utf8Path;
44
use camino::Utf8PathBuf;
5-
use log::debug;
65
use log::error;
76
use log::info;
87
use serde_json::json;
9-
use std::collections::HashMap;
108
use std::io::ErrorKind;
119
use std::os::unix::fs::fchown;
1210
use std::os::unix::fs::MetadataExt;
1311
use std::os::unix::fs::PermissionsExt;
12+
use std::sync::Arc;
1413
use tedge_actors::fan_in_message_type;
1514
use tedge_actors::Actor;
1615
use tedge_actors::ChannelError;
17-
use tedge_actors::DynSender;
16+
use tedge_actors::ClientMessageBox;
1817
use tedge_actors::LoggingReceiver;
1918
use tedge_actors::LoggingSender;
2019
use tedge_actors::MessageReceiver;
@@ -53,16 +52,15 @@ pub type ConfigDownloadResult = (MqttTopic, DownloadResult);
5352
pub type ConfigUploadRequest = (MqttTopic, UploadRequest);
5453
pub type ConfigUploadResult = (MqttTopic, UploadResult);
5554

56-
fan_in_message_type!(ConfigInput[ConfigOperation, FsWatchEvent, ConfigDownloadResult, ConfigUploadResult] : Debug);
55+
fan_in_message_type!(ConfigInput[ConfigOperation, FsWatchEvent] : Debug);
5756

5857
pub struct ConfigManagerActor {
5958
config: ConfigManagerConfig,
6059
plugin_config: PluginConfig,
61-
pending_operations: HashMap<String, ConfigOperation>,
6260
input_receiver: LoggingReceiver<ConfigInput>,
6361
output_sender: LoggingSender<ConfigOperationData>,
64-
download_sender: DynSender<ConfigDownloadRequest>,
65-
upload_sender: DynSender<ConfigUploadRequest>,
62+
downloader: ClientMessageBox<ConfigDownloadRequest, ConfigDownloadResult>,
63+
uploader: ClientMessageBox<ConfigUploadRequest, ConfigUploadResult>,
6664
}
6765

6866
#[async_trait]
@@ -72,20 +70,24 @@ impl Actor for ConfigManagerActor {
7270
}
7371

7472
async fn run(mut self) -> Result<(), RuntimeError> {
75-
self.reload_supported_config_types().await?;
73+
let mut worker = ConfigManagerWorker {
74+
config: Arc::from(self.config),
75+
plugin_config: self.plugin_config,
76+
output_sender: self.output_sender,
77+
downloader: self.downloader,
78+
uploader: self.uploader,
79+
};
80+
81+
worker.reload_supported_config_types().await?;
7682

7783
while let Some(event) = self.input_receiver.recv().await {
7884
let result = match event {
7985
ConfigInput::ConfigOperation(request) => {
80-
self.process_operation_request(request).await
81-
}
82-
ConfigInput::FsWatchEvent(event) => self.process_file_watch_events(event).await,
83-
ConfigInput::ConfigDownloadResult((topic, result)) => {
84-
Ok(self.process_downloaded_config(&topic, result).await?)
85-
}
86-
ConfigInput::ConfigUploadResult((topic, result)) => {
87-
self.process_uploaded_config(&topic, result).await
86+
let mut worker = worker.clone();
87+
tokio::spawn(async move { worker.process_operation_request(request).await });
88+
Ok(())
8889
}
90+
ConfigInput::FsWatchEvent(event) => worker.process_file_watch_events(event).await,
8991
};
9092

9193
if let Err(err) = result {
@@ -103,20 +105,30 @@ impl ConfigManagerActor {
103105
plugin_config: PluginConfig,
104106
input_receiver: LoggingReceiver<ConfigInput>,
105107
output_sender: LoggingSender<ConfigOperationData>,
106-
download_sender: DynSender<ConfigDownloadRequest>,
107-
upload_sender: DynSender<ConfigUploadRequest>,
108+
downloader: ClientMessageBox<ConfigDownloadRequest, ConfigDownloadResult>,
109+
uploader: ClientMessageBox<ConfigUploadRequest, ConfigUploadResult>,
108110
) -> Self {
109111
ConfigManagerActor {
110112
config,
111113
plugin_config,
112-
pending_operations: HashMap::new(),
113114
input_receiver,
114115
output_sender,
115-
download_sender,
116-
upload_sender,
116+
downloader,
117+
uploader,
117118
}
118119
}
120+
}
119121

122+
#[derive(Clone)]
123+
struct ConfigManagerWorker {
124+
config: Arc<ConfigManagerConfig>,
125+
plugin_config: PluginConfig,
126+
output_sender: LoggingSender<ConfigOperationData>,
127+
downloader: ClientMessageBox<ConfigDownloadRequest, ConfigDownloadResult>,
128+
uploader: ClientMessageBox<ConfigUploadRequest, ConfigUploadResult>,
129+
}
130+
131+
impl ConfigManagerWorker {
120132
async fn process_operation_request(
121133
&mut self,
122134
request: ConfigOperation,
@@ -192,18 +204,18 @@ impl ConfigManagerActor {
192204
.execute_config_snapshot_request(&topic, &mut request)
193205
.await
194206
{
195-
Ok(_) => {
196-
self.pending_operations.insert(
197-
topic.name.clone(),
198-
ConfigOperation::Snapshot(topic, request),
207+
Ok(file_path) => {
208+
request.successful(file_path.as_str());
209+
info!(
210+
"Config Snapshot request processed for config type: {}.",
211+
request.config_type
199212
);
213+
self.publish_command_status(ConfigOperation::Snapshot(topic, request))
214+
.await?;
200215
}
201216
Err(error) => {
202-
let error_message = format!(
203-
"Failed to initiate configuration snapshot upload to file-transfer service: {error}",
204-
);
205-
request.failed(&error_message);
206-
error!("{}", error_message);
217+
request.failed(error.to_string());
218+
error!("config-manager failed to process config snapshot: {error}");
207219
self.publish_command_status(ConfigOperation::Snapshot(topic, request))
208220
.await?;
209221
}
@@ -215,7 +227,7 @@ impl ConfigManagerActor {
215227
&mut self,
216228
topic: &Topic,
217229
request: &mut ConfigSnapshotCmdPayload,
218-
) -> Result<(), ConfigManagementError> {
230+
) -> Result<Utf8PathBuf, ConfigManagementError> {
219231
let file_entry = self
220232
.plugin_config
221233
.get_file_entry_from_type(&request.config_type)?;
@@ -238,11 +250,15 @@ impl ConfigManagerActor {
238250
request.config_type, tedge_url
239251
);
240252

241-
self.upload_sender
242-
.send((topic.name.clone(), upload_request))
253+
let (_, upload_result) = self
254+
.uploader
255+
.await_response((topic.name.clone(), upload_request))
243256
.await?;
244257

245-
Ok(())
258+
let upload_response =
259+
upload_result.context("config-manager failed uploading configuration snapshot")?;
260+
261+
Ok(upload_response.file_path)
246262
}
247263

248264
fn create_tedge_url_for_config_operation(
@@ -280,57 +296,24 @@ impl ConfigManagerActor {
280296
))
281297
}
282298

283-
async fn process_uploaded_config(
284-
&mut self,
285-
topic: &str,
286-
result: UploadResult,
287-
) -> Result<(), ChannelError> {
288-
if let Some(ConfigOperation::Snapshot(topic, mut request)) =
289-
self.pending_operations.remove(topic)
290-
{
291-
match result {
292-
Ok(response) => {
293-
request.successful(response.file_path.as_str());
294-
info!(
295-
"Config Snapshot request processed for config type: {}.",
296-
request.config_type
297-
);
298-
self.publish_command_status(ConfigOperation::Snapshot(topic, request))
299-
.await?;
300-
}
301-
Err(err) => {
302-
let error_message = format!(
303-
"config-manager failed uploading configuration snapshot: {}",
304-
err
305-
);
306-
request.failed(&error_message);
307-
error!("{}", error_message);
308-
self.publish_command_status(ConfigOperation::Snapshot(topic, request))
309-
.await?;
310-
}
311-
}
312-
}
313-
314-
Ok(())
315-
}
316-
317299
async fn handle_config_update_request(
318300
&mut self,
319301
topic: Topic,
320302
mut request: ConfigUpdateCmdPayload,
321303
) -> Result<(), ChannelError> {
322304
match self.execute_config_update_request(&topic, &request).await {
323-
Ok(_) => {
324-
self.pending_operations
325-
.insert(topic.name.clone(), ConfigOperation::Update(topic, request));
305+
Ok(deployed_to_path) => {
306+
request.successful(deployed_to_path);
307+
info!(
308+
"Config Update request processed for config type: {}.",
309+
request.config_type
310+
);
311+
self.publish_command_status(ConfigOperation::Update(topic, request))
312+
.await?;
326313
}
327314
Err(error) => {
328-
let error_message = format!(
329-
"config-manager failed to start downloading configuration: {}",
330-
error
331-
);
332-
request.failed(&error_message);
333-
error!("{}", error_message);
315+
request.failed(error.to_string());
316+
error!("config-manager failed to process config update: {error}");
334317
self.publish_command_status(ConfigOperation::Update(topic, request))
335318
.await?;
336319
}
@@ -342,7 +325,7 @@ impl ConfigManagerActor {
342325
&mut self,
343326
topic: &Topic,
344327
request: &ConfigUpdateCmdPayload,
345-
) -> Result<(), ConfigManagementError> {
328+
) -> Result<Utf8PathBuf, ConfigManagementError> {
346329
let file_entry = self
347330
.plugin_config
348331
.get_file_entry_from_type(&request.config_type)?;
@@ -352,8 +335,7 @@ impl ConfigManagerActor {
352335
let temp_path = &self.config.tmp_path.join(&file_entry.config_type);
353336

354337
let Some(tedge_url) = &request.tedge_url else {
355-
debug!("tedge_url not present in config update payload, ignoring");
356-
return Ok(());
338+
return Err(anyhow::anyhow!("tedge_url not present in config update payload").into());
357339
};
358340

359341
let download_request = DownloadRequest::new(tedge_url, temp_path.as_std_path())
@@ -364,71 +346,23 @@ impl ConfigManagerActor {
364346
request.config_type, tedge_url
365347
);
366348

367-
self.download_sender
368-
.send((topic.name.clone(), download_request))
349+
let (_, download_result) = self
350+
.downloader
351+
.await_response((topic.name.clone(), download_request))
369352
.await?;
370353

371-
Ok(())
372-
}
373-
374-
async fn process_downloaded_config(
375-
&mut self,
376-
topic: &str,
377-
result: DownloadResult,
378-
) -> Result<(), ConfigManagementError> {
379-
let Some(ConfigOperation::Update(topic, mut request)) =
380-
self.pending_operations.remove(topic)
381-
else {
382-
return Ok(());
383-
};
384-
385-
let response = match result {
386-
Ok(response) => response,
387-
Err(err) => {
388-
let err =
389-
anyhow::Error::from(err).context("config-manager failed downloading a file");
390-
let error_message = format!("{err:#}");
391-
request.failed(&error_message);
392-
error!("{}", error_message);
393-
self.publish_command_status(ConfigOperation::Update(topic, request))
394-
.await?;
395-
return Ok(());
396-
}
397-
};
398-
399-
// new config was downloaded into tmpdir, we need to write it into destination using tedge-write
400-
let from = Utf8Path::from_path(response.file_path.as_path()).unwrap();
401-
402-
let deployed_to_path = match self.deploy_config_file(from, &request.config_type) {
403-
Ok(path) => path,
404-
Err(err) => {
405-
let error_message =
406-
format!("config-manager failed writing updated configuration file: {err}",);
407-
408-
request.failed(&error_message);
409-
error!("{}", error_message);
410-
self.publish_command_status(ConfigOperation::Update(topic, request))
411-
.await?;
412-
413-
// TODO: source temporary file should be cleaned up automatically
414-
let _ = std::fs::remove_file(from);
354+
let download_response =
355+
download_result.context("config-manager failed downloading a file")?;
415356

416-
return Ok(());
417-
}
418-
};
419-
420-
request.successful(deployed_to_path);
421-
info!(
422-
"Config Update request processed for config type: {}.",
423-
request.config_type
424-
);
425-
self.publish_command_status(ConfigOperation::Update(topic, request))
426-
.await?;
357+
let from = Utf8Path::from_path(download_response.file_path.as_path()).unwrap();
358+
let deployed_to_path = self
359+
.deploy_config_file(from, &request.config_type)
360+
.context("failed to deploy configuration file")?;
427361

428362
// TODO: source temporary file should be cleaned up automatically
429363
let _ = std::fs::remove_file(from);
430364

431-
Ok(())
365+
Ok(deployed_to_path)
432366
}
433367

434368
/// Deploys the new version of the configuration file and returns the path under which it was

crates/extensions/tedge_config_manager/src/lib.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use log::error;
1111
use serde_json::json;
1212
use std::path::PathBuf;
1313
use tedge_actors::Builder;
14-
use tedge_actors::CloneSender;
14+
use tedge_actors::ClientMessageBox;
1515
use tedge_actors::DynSender;
1616
use tedge_actors::LinkError;
1717
use tedge_actors::MappingSender;
@@ -48,8 +48,8 @@ pub struct ConfigManagerBuilder {
4848
config: ConfigManagerConfig,
4949
plugin_config: PluginConfig,
5050
box_builder: SimpleMessageBoxBuilder<ConfigInput, ConfigOperationData>,
51-
download_sender: DynSender<ConfigDownloadRequest>,
52-
upload_sender: DynSender<ConfigUploadRequest>,
51+
downloader: ClientMessageBox<ConfigDownloadRequest, ConfigDownloadResult>,
52+
uploader: ClientMessageBox<ConfigUploadRequest, ConfigUploadResult>,
5353
}
5454

5555
impl ConfigManagerBuilder {
@@ -64,10 +64,9 @@ impl ConfigManagerBuilder {
6464
let plugin_config = PluginConfig::new(config.plugin_config_path.as_path());
6565
let box_builder = SimpleMessageBoxBuilder::new("Tedge-Config-Manager", 16);
6666

67-
let download_sender =
68-
downloader_actor.connect_client(box_builder.get_sender().sender_clone());
67+
let downloader = ClientMessageBox::new(downloader_actor);
6968

70-
let upload_sender = uploader_actor.connect_client(box_builder.get_sender().sender_clone());
69+
let uploader = ClientMessageBox::new(uploader_actor);
7170

7271
fs_notify.connect_sink(
7372
ConfigManagerBuilder::watched_directory(&config),
@@ -78,8 +77,8 @@ impl ConfigManagerBuilder {
7877
config,
7978
plugin_config,
8079
box_builder,
81-
download_sender,
82-
upload_sender,
80+
downloader,
81+
uploader,
8382
})
8483
}
8584

@@ -189,8 +188,8 @@ impl Builder<ConfigManagerActor> for ConfigManagerBuilder {
189188
self.plugin_config,
190189
input_receiver,
191190
output_sender,
192-
self.download_sender,
193-
self.upload_sender,
191+
self.downloader,
192+
self.uploader,
194193
))
195194
}
196195
}

0 commit comments

Comments
 (0)