Skip to content

Commit 2bf6e14

Browse files
mikeklaashongalex
authored andcommitted
fix(pubsub/pstest): start DeliveryAttempt at 1
Per [the documentation](https://github.com/googleapis/google-cloud-go/blob/74da335fea6cd70b27808507f2e58ae53f5f4910/internal/pubsub/message.go#L63) `DeliveryAttempt` should start at 1 when a dead-letter topic is enabled, not 0 as pstest currently returns Add `DeliveryAttempt` to streaming pull responses.
1 parent e5c3f7a commit 2bf6e14

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

pubsub/pstest/fake.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1037,10 +1037,10 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage {
10371037
s.publishToDeadLetter(m)
10381038
continue
10391039
}
1040+
(*m.deliveries)++
10401041
if s.proto.DeadLetterPolicy != nil {
10411042
m.proto.DeliveryAttempt = int32(*m.deliveries)
10421043
}
1043-
(*m.deliveries)++
10441044
m.ackDeadline = now.Add(s.ackTimeout)
10451045
msgs = append(msgs, m.proto)
10461046
if len(msgs) >= max {
@@ -1122,6 +1122,13 @@ func (s *subscription) deliver() {
11221122
//
11231123
// Must be called with the lock held.
11241124
func (s *subscription) tryDeliverMessage(m *message, start int, now time.Time) (int, bool) {
1125+
// Optimistically increment DeliveryAttempt assuming we'll be able to deliver the message. This is
1126+
// safe since the lock is held for the duration of this function, and the channel receiver does not
1127+
// modify the message.
1128+
if s.proto.DeadLetterPolicy != nil {
1129+
m.proto.DeliveryAttempt = int32(*m.deliveries) + 1
1130+
}
1131+
11251132
for i := 0; i < len(s.streams); i++ {
11261133
idx := (i + start) % len(s.streams)
11271134

@@ -1139,6 +1146,10 @@ func (s *subscription) tryDeliverMessage(m *message, start int, now time.Time) (
11391146
default:
11401147
}
11411148
}
1149+
// Restore the correct value of DeliveryAttempt if we were not able to deliver the message.
1150+
if s.proto.DeadLetterPolicy != nil {
1151+
m.proto.DeliveryAttempt = int32(*m.deliveries)
1152+
}
11421153
return 0, false
11431154
}
11441155

pubsub/pstest/fake_test.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,8 @@ func TestSubscriptionDeadLetter(t *testing.T) {
346346

347347
}
348348
for _, m := range pull.ReceivedMessages {
349-
if int32(i) != m.DeliveryAttempt {
350-
t.Fatalf("message delivery attempt not the expected one. expected: %d, actual: %d", i, m.DeliveryAttempt)
349+
if int32(i+1) != m.DeliveryAttempt {
350+
t.Fatalf("message delivery attempt not the expected one. expected: %d, actual: %d", i+1, m.DeliveryAttempt)
351351
}
352352
_, err := server.GServer.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
353353
Subscription: sub.Name,
@@ -551,19 +551,33 @@ func TestStreamingPull(t *testing.T) {
551551
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
552552
defer cleanup()
553553

554+
deadLetterTopic := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{
555+
Name: "projects/P/topics/deadLetter",
556+
})
557+
554558
top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
555559
sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
556560
Name: "projects/P/subscriptions/S",
557561
Topic: top.Name,
558562
AckDeadlineSeconds: 10,
563+
DeadLetterPolicy: &pb.DeadLetterPolicy{
564+
DeadLetterTopic: deadLetterTopic.Name,
565+
MaxDeliveryAttempts: 3,
566+
},
559567
})
560568

561569
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
562570
{Data: []byte("d1")},
563571
{Data: []byte("d2")},
564572
{Data: []byte("d3")},
565573
})
566-
got := pubsubMessages(streamingPullN(context.TODO(), t, len(want), sclient, sub))
574+
received := streamingPullN(context.TODO(), t, len(want), sclient, sub)
575+
for _, m := range received {
576+
if m.DeliveryAttempt != 1 {
577+
t.Errorf("got DeliveryAttempt==%d, want 1", m.DeliveryAttempt)
578+
}
579+
}
580+
got := pubsubMessages(received)
567581
if diff := testutil.Diff(got, want); diff != "" {
568582
t.Error(diff)
569583
}

0 commit comments

Comments
 (0)