Skip to content

Conversation

didier-wenzek
Copy link
Contributor

Proposed changes

Ensure the MQTT connection is properly closed

  • by tedge mqtt pub <topic> <message>
  • by tedge mqtt sub <topic-filter> when interrupted.

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

#2922

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s)
  • I ran cargo fmt as mentioned in CODING_GUIDELINES
  • I used cargo clippy as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

Copy link

codecov bot commented Jun 12, 2024

Codecov Report

Attention: Patch coverage is 61.90476% with 8 lines in your changes missing coverage. Please review.

Project coverage is 78.2%. Comparing base (e6d5178) to head (23c2714).
Report is 36 commits behind head on main.

Additional details and impacted files
Files Coverage Δ
...common/tedge_config_macros/impl/src/input/parse.rs 60.5% <ø> (+0.7%) ⬆️
crates/core/tedge/src/cli/disconnect/cli.rs 42.1% <ø> (+3.6%) ⬆️
...core/tedge/src/cli/disconnect/disconnect_bridge.rs 32.8% <ø> (ø)
crates/core/tedge/src/cli/mqtt/cli.rs 63.7% <ø> (-0.6%) ⬇️
crates/core/tedge/src/cli/reconnect/cli.rs 0.0% <ø> (ø)
crates/core/tedge/src/cli/reconnect/command.rs 0.0% <ø> (ø)
crates/core/tedge/src/cli/mqtt/publish.rs 43.3% <66.6%> (+1.8%) ⬆️
crates/core/tedge/src/cli/mqtt/subscribe.rs 0.0% <0.0%> (ø)
crates/core/tedge/src/cli/mqtt/mod.rs 69.2% <69.2%> (ø)

... and 21 files with indirect coverage changes

Copy link
Contributor

github-actions bot commented Jun 12, 2024

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
453 0 3 453 100 57m43.569133s

Copy link
Contributor

@albinsuresh albinsuresh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Happy to approve once the query about not having a signal handler for the publisher is answered.

@@ -135,6 +135,13 @@ fn publish(cmd: &MqttPublishCommand) -> Result<(), MqttError> {
}

client.disconnect()?;
for event in connection.iter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I found the placement of this loop here, after the disconnect call, very confusing. But, I'm assuming that the disconnection request is also not processed until we explicitly poll the event loop as per the rumqtt design. Never expected this for a disconnect call as I thought that the Outgoing::Disconnect event is received by the event loop after the disconnect message is already sent to the broker, just as an ack to the client which doesn't need to be actively polled. But I guess they went for consistency in the processing model for disconnect as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the root cause of the issue. Triggering a disconnect() is not enough.

However, the point of polling the rumqtt event loop for an Outgoing::Disconnect event is:

  • not to perform the disconnection (this is done by rumqtt in a background thread)
  • but to be sure the disconnection actually happened before the tedge mqtt pub command dies.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but to be sure the disconnection actually happened before the tedge mqtt pub command dies.

In that case, should we wait for the Event::Incoming(Incoming::Disconnect(_)) ack from the broker as well? To make sure that the broker received it as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, should we wait for the Event::Incoming(Incoming::Disconnect(_)) ack from the broker as well? To make sure that the broker received it as well?

No, this event is sent when the broker itself triggers the disconnection.

@@ -108,6 +110,10 @@ fn subscribe(cmd: &MqttSubscribeCommand) -> Result<(), MqttError> {
eprintln!("INFO: Disconnected");
break;
}
Ok(Event::Outgoing(Outgoing::Disconnect)) => {
eprintln!("INFO: Interrupted");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
eprintln!("INFO: Interrupted");
eprintln!("ERROR: Interrupted");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't class this log message as an ERROR as it is normal behaviour for the user to want to stop the subscriber and the expected way is to use Ctrl-c

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I even wonder to get rid of this INFO message.

I tried to be consistent with the messages already emitted by this command (Connected, Disconnect, and now Interrupted). But are these really useful?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I even wonder to get rid of this INFO message.

I vote +1 for it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's get rid of the log message. I just checked what mosquitto_sub does, and it does not print out a log message about being killed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed by 53d6536

@@ -82,6 +83,7 @@ fn subscribe(cmd: &MqttSubscribeCommand) -> Result<(), MqttError> {
}

let (mut client, mut connection) = Client::new(options, DEFAULT_QUEUE_CAPACITY);
disconnect_if_interrupted(client.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do this for the publisher as well, to handle cases like interrupting the publisher that's stuck in the connect phase itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not. A publisher will unlikely be stuck (tedge mqtt pub disconnects on error and awaits nothing more than an acknowledgement) but the connection could be not responsive.

=> I will add it for consistency

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added by 531411e

Copy link
Contributor

@albinsuresh albinsuresh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Copy link
Contributor

@reubenmiller reubenmiller left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functions nicely now, and clean disconnects for both tedge mqtt pub and tedge mqtt sub commands.

@reubenmiller reubenmiller added theme:mqtt Theme: mqtt and mosquitto related topics theme:cli Theme: cli related topics labels Jun 13, 2024
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
@didier-wenzek didier-wenzek temporarily deployed to Test Pull Request June 14, 2024 09:27 — with GitHub Actions Inactive
@didier-wenzek didier-wenzek added this pull request to the merge queue Jun 14, 2024
Merged via the queue into thin-edge:main with commit 82732a9 Jun 14, 2024
@didier-wenzek didier-wenzek deleted the fix/mqtt-disconnect branch June 14, 2024 10:36
gligorisaev added a commit to gligorisaev/thin-edge.io that referenced this pull request Jun 17, 2024
…ly closed

Signed-off-by: gligorisaev <gligorisaev@gmail.com>
gligorisaev added a commit to gligorisaev/thin-edge.io that referenced this pull request Jun 21, 2024
…ly closed

Signed-off-by: gligorisaev <gligorisaev@gmail.com>
gligorisaev added a commit to gligorisaev/thin-edge.io that referenced this pull request Jul 2, 2024
…ly closed

Signed-off-by: gligorisaev <gligorisaev@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
theme:cli Theme: cli related topics theme:mqtt Theme: mqtt and mosquitto related topics
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants