-
Notifications
You must be signed in to change notification settings - Fork 66
fix: Ensure the MQTT connection is properly closed #2936
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Ensure the MQTT connection is properly closed #2936
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files
|
Robot Results
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Happy to approve once the query about not having a signal handler for the publisher is answered.
@@ -135,6 +135,13 @@ fn publish(cmd: &MqttPublishCommand) -> Result<(), MqttError> { | |||
} | |||
|
|||
client.disconnect()?; | |||
for event in connection.iter() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially I found the placement of this loop here, after the disconnect call, very confusing. But, I'm assuming that the disconnection request is also not processed until we explicitly poll the event loop as per the rumqtt design. Never expected this for a disconnect call as I thought that the Outgoing::Disconnect
event is received by the event loop after the disconnect message is already sent to the broker, just as an ack to the client which doesn't need to be actively polled. But I guess they went for consistency in the processing model for disconnect as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the root cause of the issue. Triggering a disconnect()
is not enough.
However, the point of polling the rumqtt event loop for an Outgoing::Disconnect
event is:
- not to perform the disconnection (this is done by rumqtt in a background thread)
- but to be sure the disconnection actually happened before the
tedge mqtt pub
command dies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but to be sure the disconnection actually happened before the tedge mqtt pub command dies.
In that case, should we wait for the Event::Incoming(Incoming::Disconnect(_))
ack from the broker as well? To make sure that the broker received it as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, should we wait for the
Event::Incoming(Incoming::Disconnect(_))
ack from the broker as well? To make sure that the broker received it as well?
No, this event is sent when the broker itself triggers the disconnection.
@@ -108,6 +110,10 @@ fn subscribe(cmd: &MqttSubscribeCommand) -> Result<(), MqttError> { | |||
eprintln!("INFO: Disconnected"); | |||
break; | |||
} | |||
Ok(Event::Outgoing(Outgoing::Disconnect)) => { | |||
eprintln!("INFO: Interrupted"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eprintln!("INFO: Interrupted"); | |
eprintln!("ERROR: Interrupted"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't class this log message as an ERROR as it is normal behaviour for the user to want to stop the subscriber and the expected way is to use Ctrl-c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I even wonder to get rid of this INFO message.
I tried to be consistent with the messages already emitted by this command (Connected
, Disconnect
, and now Interrupted
). But are these really useful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I even wonder to get rid of this INFO message.
I vote +1 for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, let's get rid of the log message. I just checked what mosquitto_sub
does, and it does not print out a log message about being killed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed by 53d6536
@@ -82,6 +83,7 @@ fn subscribe(cmd: &MqttSubscribeCommand) -> Result<(), MqttError> { | |||
} | |||
|
|||
let (mut client, mut connection) = Client::new(options, DEFAULT_QUEUE_CAPACITY); | |||
disconnect_if_interrupted(client.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not do this for the publisher as well, to handle cases like interrupting the publisher that's stuck in the connect phase itself?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not. A publisher will unlikely be stuck (tedge mqtt pub
disconnects on error and awaits nothing more than an acknowledgement) but the connection could be not responsive.
=> I will add it for consistency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added by 531411e
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Functions nicely now, and clean disconnects for both tedge mqtt pub
and tedge mqtt sub
commands.
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
53d6536
to
2ad3d06
Compare
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
af7c83c
to
23c2714
Compare
…ly closed Signed-off-by: gligorisaev <gligorisaev@gmail.com>
…ly closed Signed-off-by: gligorisaev <gligorisaev@gmail.com>
…ly closed Signed-off-by: gligorisaev <gligorisaev@gmail.com>
Proposed changes
Ensure the MQTT connection is properly closed
tedge mqtt pub <topic> <message>
tedge mqtt sub <topic-filter>
when interrupted.Types of changes
Paste Link to the issue
#2922
Checklist
cargo fmt
as mentioned in CODING_GUIDELINEScargo clippy
as mentioned in CODING_GUIDELINESFurther comments