@lucasbru lucasbru commented Sep 2, 2025

Streams groups are sometimes described as NOT_READY when they are
actually STABLE. That is, the group is configured and all topics exist,
but LIST_GROUP and STREAMS_GROUP_DESCRIBE report the group as not ready.

The root cause seems to be that #19802 moved the creation of the
soft-state configured topology from the replay path to the heartbeat. As
a result, LIST_GROUP and STREAMS_GROUP_DESCRIBE, which show the snapshot
at the last committed offset, may not show the configured topology,
because the configured topology that is created in the heartbeat is
"thrown away", and the new group is recreated on the replay-path.

We can work around this issue by immediately storing a reference to the
streams group object created in the heartbeat in the
GroupMetadataManager. The collection it is stored in is snapshottable,
so if we fail to write the corresponding records to the consumer offset
topic, this addition of the streams group will be reverted.

@mjsax mjsax left a comment

I guess I need some more background information, as I am not familiar with this part of the code.

From the PR description:

this addition of the streams group will be reverted.

How does this revert happen?

@@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {

def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
configsToRemove: List[String] = List(),
- inputTopic: String,
+ inputTopics: Set[String],
+ changelogTopics: Set[String] = Set(),

While this is only test code, I am not sure if I understand why we add this parameter now? Why is this required?

util.Set.of(),
util.Map.of(),
- util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())),
+ changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), util.Map.of()))).toMap.asJava,

Why do we change number of partition from 1 to "empty"?

@@ -17194,8 +17288,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(-1),

Not sure why we need this change and the changes below?

TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
.build();

// Commit the offset and test again

Not sure if I understand this comment? What "offset" do we commit? -- Also "test again"? Above we use context.groupMetadataManager.streamsGroup(groupId) to see if the group exists, but below we use context.groupMetadataManager.streamsGroupDescribe(...) -- so we do different things.

context.commit();

List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()

Suggested change
- StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
+ StreamsGroupDescribeResponseData.DescribedGroup expectedGroupDescription = new StreamsGroupDescribeResponseData.DescribedGroup()

if (group == null) {
- return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+ StreamsGroup newGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+ groups.put(groupId, newGroup);

This is the fix, right? We add the new group directly into the soft state? (same below)

expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
))
.setGroupState(StreamsGroupState.STABLE.toString())

So this status would be NOT_READY w/o this fix? Because the HB itself did not update the soft state, which "describe" code path queries?

removeGroup(groupId);
// In case of streams groups, which get inserted into memory immediately to store soft state,
// It may happen that the groups map contains the new streams groups already, and the classic group
// was removed already. In this case, we can ignore the tombstone.

Not sure if I fully understand? Also for my own education: when is replay actually executed? Only during GC load? During this phase, do we already accept HBs? (If not, soft state could not have been updated while we are loading.)

It's not fully clear to me, what this means (from the PR description):

because the configured topology that is created in the heartbeat is "thrown
away", and the new group is recreated on the replay-path.

@@ -4441,6 +4442,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}

@Test
def testDescribeStreamsGroupsForStatelessTopology(): Unit = {

Why is it relevant if the group is stateless (vs stateful)?
