Skip to content

Commit 8437d42

Browse files
authored
Merge pull request #3755 from albinsuresh/feat/3430/inventory-json-support-for-child-devices
feat: inventory.json support for child devices
2 parents 2a493a2 + 6b57b14 commit 8437d42

File tree

17 files changed

+470
-254
lines changed

17 files changed

+470
-254
lines changed

.taplo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
include = ["**/Cargo.toml"]
2+
exclude = ["**/.venv/**"]
23

34
[formatting]
45
indent_string = " "

crates/core/tedge_agent/src/agent.rs

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use crate::software_manager::config::SoftwareManagerConfig;
1515
use crate::state_repository::state::agent_default_state_dir;
1616
use crate::state_repository::state::agent_state_dir;
1717
use crate::tedge_to_te_converter::converter::TedgetoTeConverter;
18+
use crate::twin_manager::builder::TwinManagerActorBuilder;
19+
use crate::twin_manager::builder::TwinManagerConfig;
1820
use crate::AgentOpt;
1921
use crate::Capabilities;
2022
use anyhow::Context;
@@ -28,7 +30,6 @@ use reqwest::Identity;
2830
use std::fmt::Debug;
2931
use std::net::SocketAddr;
3032
use std::sync::Arc;
31-
use std::time::Duration;
3233
use tedge_actors::Concurrent;
3334
use tedge_actors::ConvertingActor;
3435
use tedge_actors::ConvertingActorBuilder;
@@ -65,7 +66,6 @@ use tedge_script_ext::ScriptActor;
6566
use tedge_signal_ext::SignalActor;
6667
use tedge_uploader_ext::UploaderActor;
6768
use tedge_utils::file::create_directory_with_defaults;
68-
use tracing::error;
6969
use tracing::info;
7070
use tracing::instrument;
7171
use tracing::warn;
@@ -242,6 +242,8 @@ impl Agent {
242242
// `config_dir` by default is `/etc/tedge` (or whatever the user sets with --config-dir)
243243
create_directory_with_defaults(agent_default_state_dir(self.config.config_dir.clone()))
244244
.await?;
245+
// Create directory for device inventory.json
246+
create_directory_with_defaults(self.config.config_dir.join("device")).await?;
245247
create_directory_with_defaults(&self.config.agent_log_dir).await?;
246248
create_directory_with_defaults(&self.config.data_dir).await?;
247249
create_directory_with_defaults(&self.config.http_config.file_transfer_dir).await?;
@@ -298,10 +300,20 @@ impl Agent {
298300
let service_topic_id = device_topic_id.to_default_service_topic_id("tedge-agent")
299301
.with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", device_topic_id))?;
300302
let service = Service {
301-
service_topic_id,
303+
service_topic_id: service_topic_id.clone(),
302304
device_topic_id: DeviceTopicId::new(device_topic_id.clone()),
303305
};
304306
let mqtt_schema = MqttSchema::with_root(self.config.mqtt_topic_root.to_string());
307+
308+
let twin_manager_config = TwinManagerConfig::new(
309+
self.config.config_dir.clone(),
310+
mqtt_schema.clone(),
311+
device_topic_id.clone(),
312+
service_topic_id.into(),
313+
);
314+
let twin_manager_builder =
315+
TwinManagerActorBuilder::new(twin_manager_config, &mut mqtt_actor_builder);
316+
305317
let health_actor = HealthMonitorBuilder::from_service_topic_id(
306318
service,
307319
&mut mqtt_actor_builder,
@@ -387,11 +399,8 @@ impl Agent {
387399
state_dir,
388400
clean_start,
389401
)?;
390-
let entity_store_server_config = EntityStoreServerConfig::new(
391-
self.config.config_dir.into_std_path_buf(),
392-
mqtt_schema.clone(),
393-
self.config.entity_auto_register,
394-
);
402+
let entity_store_server_config =
403+
EntityStoreServerConfig::new(mqtt_schema.clone(), self.config.entity_auto_register);
395404
let entity_store_server = EntityStoreServer::new(
396405
entity_store_server_config,
397406
entity_store,
@@ -409,24 +418,6 @@ impl Agent {
409418
})
410419
},
411420
);
412-
let mut init_sender = entity_store_actor_builder.get_sender();
413-
tokio::spawn(async move {
414-
// Allow the entity store to complete its initialization
415-
// after giving it some time to process all the retained metadata messages on startup
416-
tokio::time::sleep(Duration::from_secs(1)).await;
417-
if let Err(err) = init_sender
418-
.send(RequestEnvelope {
419-
request: EntityStoreRequest::InitComplete,
420-
reply_to: Box::new(NullSender),
421-
})
422-
.await
423-
{
424-
error!(
425-
"Failed to send init complete message to entity store: {}",
426-
err
427-
);
428-
}
429-
});
430421

431422
let file_transfer_server_builder = HttpServerBuilder::try_bind(
432423
self.config.http_config,
@@ -453,6 +444,7 @@ impl Agent {
453444
runtime.spawn(signal_actor_builder).await?;
454445
runtime.spawn(mqtt_actor_builder).await?;
455446
runtime.spawn(fs_watch_actor_builder).await?;
447+
runtime.spawn(twin_manager_builder).await?;
456448
runtime.spawn(downloader_actor_builder).await?;
457449
runtime.spawn(uploader_actor_builder).await?;
458450
if let Some(config_actor_builder) = config_actor_builder {

crates/core/tedge_agent/src/entity_manager/server.rs

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ use futures::channel::mpsc;
33
use futures::StreamExt as _;
44
use serde_json::Map;
55
use serde_json::Value;
6-
use std::fs::File;
7-
use std::path::PathBuf;
86
use tedge_actors::LoggingSender;
97
use tedge_actors::MappingSender;
108
use tedge_actors::MessageSink;
@@ -28,8 +26,6 @@ use tedge_mqtt_ext::MqttRequest;
2826
use tedge_mqtt_ext::TopicFilter;
2927
use tracing::error;
3028

31-
const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json";
32-
3329
#[derive(Debug)]
3430
pub enum EntityStoreRequest {
3531
Get(EntityTopicId),
@@ -38,7 +34,6 @@ pub enum EntityStoreRequest {
3834
Delete(EntityTopicId),
3935
List(ListFilters),
4036
MqttMessage(MqttMessage),
41-
InitComplete,
4237
GetTwinFragment(EntityTopicId, String),
4338
SetTwinFragment(EntityTwinMessage),
4439
GetTwinFragments(EntityTopicId),
@@ -67,15 +62,13 @@ pub struct EntityStoreServer {
6762
}
6863

6964
pub struct EntityStoreServerConfig {
70-
pub config_dir: PathBuf,
7165
pub mqtt_schema: MqttSchema,
7266
pub entity_auto_register: bool,
7367
}
7468

7569
impl EntityStoreServerConfig {
76-
pub fn new(config_dir: PathBuf, mqtt_schema: MqttSchema, entity_auto_register: bool) -> Self {
70+
pub fn new(mqtt_schema: MqttSchema, entity_auto_register: bool) -> Self {
7771
Self {
78-
config_dir,
7972
mqtt_schema,
8073
entity_auto_register,
8174
}
@@ -173,46 +166,11 @@ impl Server for EntityStoreServer {
173166
self.process_mqtt_message(mqtt_message).await;
174167
EntityStoreResponse::Ok
175168
}
176-
EntityStoreRequest::InitComplete => {
177-
if let Err(err) = self.init_complete().await {
178-
error!("Failed to process inventory.json file: {err}");
179-
}
180-
EntityStoreResponse::Ok
181-
}
182169
}
183170
}
184171
}
185172

186173
impl EntityStoreServer {
187-
async fn init_complete(&mut self) -> Result<(), entity_store::Error> {
188-
let inventory_file_path = self
189-
.config
190-
.config_dir
191-
.join(INVENTORY_FRAGMENTS_FILE_LOCATION);
192-
let file = File::open(inventory_file_path)?;
193-
let inventory_json: Value = serde_json::from_reader(file)?;
194-
let main_device = self.entity_store.main_device().clone();
195-
if let Value::Object(map) = inventory_json {
196-
for (key, value) in map {
197-
if self
198-
.entity_store
199-
.get_twin_fragment(&main_device, &key)
200-
.is_none()
201-
{
202-
self.publish_twin_data(&main_device, key.clone(), value.clone())
203-
.await;
204-
}
205-
}
206-
} else {
207-
error!(
208-
"Invalid inventory.json format: expected a JSON object, found {:?}",
209-
inventory_json
210-
);
211-
}
212-
213-
Ok(())
214-
}
215-
216174
pub(crate) async fn process_mqtt_message(&mut self, message: MqttMessage) {
217175
if let Ok((topic_id, channel)) = self.config.mqtt_schema.entity_channel_of(&message.topic) {
218176
if let Channel::EntityMetadata = channel {

crates/core/tedge_agent/src/entity_manager/tests.rs

Lines changed: 1 addition & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use tedge_api::entity::EntityType;
1717
use tedge_api::mqtt_topics::EntityTopicId;
1818
use tedge_mqtt_ext::test_helpers::assert_received_contains_str;
1919
use tedge_mqtt_ext::MqttMessage;
20-
use tedge_test_utils::fs::TempTedgeDir;
2120

2221
#[tokio::test]
2322
async fn new_entity_store() {
@@ -30,69 +29,6 @@ async fn new_entity_store() {
3029
)
3130
}
3231

33-
#[tokio::test]
34-
async fn process_inventory_json_content_on_init() {
35-
let handle = entity::server("device-under-test");
36-
let (mut entity_store, mut mqtt_box, tmp_dir) =
37-
(handle.entity_store, handle.mqtt_output, handle.tmp_dir);
38-
39-
let inventory_json = json!({
40-
"boolean_key": true,
41-
"numeric_key": 10,
42-
"string_key": "value"
43-
});
44-
create_inventory_json_file_with_content(&tmp_dir, &inventory_json.to_string());
45-
46-
entity::init_complete_signal(&mut entity_store)
47-
.await
48-
.unwrap();
49-
mqtt_box
50-
.assert_received([
51-
MqttMessage::from(("te/device/main///twin/boolean_key", "true")).with_retain(),
52-
MqttMessage::from(("te/device/main///twin/numeric_key", "10")).with_retain(),
53-
MqttMessage::from(("te/device/main///twin/string_key", "\"value\"")).with_retain(),
54-
])
55-
.await;
56-
}
57-
58-
#[tokio::test]
59-
async fn inventory_json_value_ignored_if_twin_data_present() {
60-
let handle = entity::server("device-under-test");
61-
let (mut entity_store, mut mqtt_box, tmp_dir) =
62-
(handle.entity_store, handle.mqtt_output, handle.tmp_dir);
63-
64-
let inventory_json = json!({
65-
"x": 1,
66-
"y": 2,
67-
"z": 3,
68-
});
69-
create_inventory_json_file_with_content(&tmp_dir, &inventory_json.to_string());
70-
71-
entity::set_twin_fragments(
72-
&mut entity_store,
73-
"device/main//",
74-
json!({"y": 20}).as_object().unwrap().clone(),
75-
)
76-
.await
77-
.unwrap();
78-
mqtt_box.skip(1).await; // Skip the above twin update
79-
80-
entity::init_complete_signal(&mut entity_store)
81-
.await
82-
.unwrap();
83-
mqtt_box
84-
.assert_received([
85-
MqttMessage::from(("te/device/main///twin/x", "1")).with_retain(),
86-
MqttMessage::from(("te/device/main///twin/z", "3")).with_retain(),
87-
])
88-
.await;
89-
}
90-
91-
fn create_inventory_json_file_with_content(ttd: &TempTedgeDir, content: &str) {
92-
let file = ttd.dir("device").file("inventory.json");
93-
file.with_raw_content(content);
94-
}
95-
9632
#[tokio::test]
9733
async fn removing_an_unknown_child_using_mqtt() {
9834
let registrations = vec![
@@ -486,16 +422,6 @@ mod entity {
486422
anyhow::bail!("Unexpected response");
487423
}
488424

489-
pub async fn init_complete_signal(
490-
entity_store: &mut EntityStoreServer,
491-
) -> Result<(), anyhow::Error> {
492-
if let EntityStoreResponse::Ok = entity_store.handle(EntityStoreRequest::InitComplete).await
493-
{
494-
return Ok(());
495-
};
496-
anyhow::bail!("Unexpected response");
497-
}
498-
499425
pub fn server(device_id: &str) -> TestHandle {
500426
let mqtt_schema = MqttSchema::default();
501427
let main_device = EntityRegistrationMessage::main_device(Some(device_id.to_string()));
@@ -513,11 +439,7 @@ mod entity {
513439
)
514440
.unwrap();
515441

516-
let config = EntityStoreServerConfig::new(
517-
tmp_dir.path().to_path_buf(),
518-
mqtt_schema.clone(),
519-
entity_auto_register,
520-
);
442+
let config = EntityStoreServerConfig::new(mqtt_schema.clone(), entity_auto_register);
521443

522444
let mqtt_actor = SimpleMessageBoxBuilder::new("MQTT", 64);
523445
let mut actor_builder = TestMqttActorBuilder {
@@ -530,15 +452,13 @@ mod entity {
530452
let mqtt_output = actor_builder.messages.build();
531453

532454
TestHandle {
533-
tmp_dir,
534455
entity_store: server,
535456
mqtt_input,
536457
mqtt_output,
537458
}
538459
}
539460

540461
pub(crate) struct TestHandle {
541-
pub tmp_dir: TempTedgeDir,
542462
pub entity_store: EntityStoreServer,
543463
pub mqtt_input: DynSender<MqttRequest>,
544464
pub mqtt_output: SimpleMessageBox<MqttRequest, MqttMessage>,

crates/core/tedge_agent/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ mod restart_manager;
2828
mod software_manager;
2929
mod state_repository;
3030
mod tedge_to_te_converter;
31+
mod twin_manager;
3132

3233
#[derive(Debug, Clone, clap::Parser)]
3334
#[clap(

0 commit comments

Comments
 (0)