-
Notifications
You must be signed in to change notification settings - Fork 14.6k
KAFKA-19661: Streams groups sometimes describe as NOT_READY when STABLE #20457
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Streams groups sometimes describe as NOT_READY when STABLE. That is, the group is configured and all topics exist, but when you use LIST_GROUP and STREAMS_GROUP_DESCRIBE, the group will show up as not ready. The root cause seems to be that apache#19802 moved the creation of the soft state configured topology from the replay path to the heartbeat. This way, LIST_GROUP and STREAMS_GROUP_DESCRIBE, which show the snapshot of the last committed offset, may not show the configured topology, because the configured topology that is created in the heartbeat is "thrown away", and the new group is recreated on the replay-path. We can work around this issue by storing a reference to the streams group objected created in the heartbeat immediately in the GroupMetadataManager. This is a snapshottable collection, so if we fail to write the corresponding records to the consumer offset topic, this addition of the streams group will be reverted.
959a6d5
to
641d90c
Compare
641d90c
to
e84ba2f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I need some more background information, as I am not familiar with this part of the code.
From the PR description:
this addition of the streams group will be reverted.
How does this revert happen?
@@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { | |||
|
|||
def createStreamsGroup[K, V](configOverrides: Properties = new Properties, | |||
configsToRemove: List[String] = List(), | |||
inputTopic: String, | |||
inputTopics: Set[String], | |||
changelogTopics: Set[String] = Set(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While this is only test code, I am not sure if I understand why we add this parameter now? Why is this required?
util.Set.of(), | ||
util.Map.of(), | ||
util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())), | ||
changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), util.Map.of()))).toMap.asJava, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we change number of partition from 1
to "empty"?
@@ -17194,8 +17288,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { | |||
new StreamsGroupHeartbeatResponseData() | |||
.setMemberId(memberId) | |||
.setMemberEpoch(1) | |||
.setHeartbeatIntervalMs(5000) | |||
.setEndpointInformationEpoch(-1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why we need this change and the changes below?
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) | ||
.build(); | ||
|
||
// Commit the offset and test again |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I understand this comment? What "offset" do we commit? -- Also "test again"? Above we use context.groupMetadataManager.streamsGroup(groupId)
to see if the group exists, but below we us ontext.groupMetadataManager.streamsGroupDescribe(...)
-- so we do different things.
context.commit(); | ||
|
||
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset); | ||
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() | |
StreamsGroupDescribeResponseData.DescribedGroup excpetedGroupDescription = new StreamsGroupDescribeResponseData.DescribedGroup() |
if (group == null) { | ||
return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); | ||
StreamsGroup newGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); | ||
groups.put(groupId, newGroup); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the fix, right? We add the new group directly into the soft state? (same below)
expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, | ||
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) | ||
)) | ||
.setGroupState(StreamsGroupState.STABLE.toString()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this status would be NOT_READY
w/o this fix? Because the HB itself did not update the soft state, which "describe" code path queries?
removeGroup(groupId); | ||
// In case of streams groups, which get inserted into memory immediately to store soft state, | ||
// It may happen that the groups map contains the new streams groups already, and the classic group | ||
// was removed already. In this case, we can ignore the tombstone. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I fully understand? Also for my own education: when is replay
actually executed? Only during GC load? During this phase, do we accept HB already (if not, softstate could not have been updated while we are loading).
It's not fully clear to me, what this means (from the PR description):
because the configured topology that is created in the heartbeat is "thrown
away", and the new group is recreated on the replay-path.
@@ -4441,6 +4442,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { | |||
} | |||
} | |||
|
|||
@Test | |||
def testDescribeStreamsGroupsForStatelessTopology(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it relevant if the group is stateless (vs stateful)?
Streams groups sometimes describe as NOT_READY when STABLE. That is, the
group is configured and all topics exist, but when you use LIST_GROUP
and STREAMS_GROUP_DESCRIBE, the group will show up as not ready.
The root cause seems to be that #19802 moved the creation of the soft
state configured topology from the replay path to the heartbeat. This
way, LIST_GROUP and STREAMS_GROUP_DESCRIBE, which show the snapshot of
the last committed offset, may not show the configured topology, because
the configured topology that is created in the heartbeat is "thrown
away", and the new group is recreated on the replay-path.
We can work around this issue by storing a reference to the streams
group objected created in the heartbeat immediately in the
GroupMetadataManager. This is a snapshottable collection, so if we fail
to write the corresponding records to the consumer offset topic, this
addition of the streams group will be reverted.