Skip to content

Commit 2b6b0da

Browse files
authored
fix(pubsub): move flow control release to callback completion (#9311)
* fix(pubsub): move flow control release to end of callback rather done function * make test failure faster, add exactly once logic back
1 parent 97d62c7 commit 2b6b0da

File tree

2 files changed

+54
-10
lines changed

2 files changed

+54
-10
lines changed

pubsub/subscription.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525

2626
"cloud.google.com/go/iam"
2727
"cloud.google.com/go/internal/optional"
28-
ipubsub "cloud.google.com/go/internal/pubsub"
2928
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
3029
"cloud.google.com/go/pubsub/internal/scheduler"
3130
gax "github.com/googleapis/gax-go/v2"
@@ -1389,24 +1388,19 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
13891388
return nil
13901389
}
13911390
iter.eoMu.RLock()
1392-
ackh, _ := msgAckHandler(msg, iter.enableExactlyOnceDelivery)
1391+
msgAckHandler(msg, iter.enableExactlyOnceDelivery)
13931392
iter.eoMu.RUnlock()
1394-
old := ackh.doneFunc
1395-
msgLen := len(msg.Data)
1396-
ackh.doneFunc = func(ackID string, ack bool, r *ipubsub.AckResult, receiveTime time.Time) {
1397-
defer fc.release(ctx, msgLen)
1398-
old(ackID, ack, r, receiveTime)
1399-
}
1393+
14001394
wg.Add(1)
14011395
// Make sure the subscription has ordering enabled before adding to scheduler.
14021396
var key string
14031397
if s.enableOrdering {
14041398
key = msg.OrderingKey
14051399
}
1406-
// TODO(deklerk): Can we have a generic handler at the
1407-
// constructor level?
1400+
msgLen := len(msg.Data)
14081401
if err := sched.Add(key, msg, func(msg interface{}) {
14091402
defer wg.Done()
1403+
defer fc.release(ctx, msgLen)
14101404
f(ctx2, msg.(*Message))
14111405
}); err != nil {
14121406
wg.Done()

pubsub/subscription_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,3 +787,53 @@ func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) {
787787
t.Fatal("expected message to not have been delivered when exactly once enabled")
788788
})
789789
}
790+
791+
func TestSubscribeMessageExpirationFlowControl(t *testing.T) {
792+
t.Parallel()
793+
ctx, cancel := context.WithCancel(context.Background())
794+
defer cancel()
795+
client, srv := newFake(t)
796+
defer client.Close()
797+
defer srv.Close()
798+
799+
topic := mustCreateTopic(t, client, "t")
800+
subConfig := SubscriptionConfig{
801+
Topic: topic,
802+
}
803+
s, err := client.CreateSubscription(ctx, "s", subConfig)
804+
if err != nil {
805+
t.Fatalf("create sub err: %v", err)
806+
}
807+
808+
s.ReceiveSettings.NumGoroutines = 1
809+
s.ReceiveSettings.MaxOutstandingMessages = 1
810+
s.ReceiveSettings.MaxExtension = 10 * time.Second
811+
s.ReceiveSettings.MaxExtensionPeriod = 10 * time.Second
812+
r := topic.Publish(ctx, &Message{
813+
Data: []byte("redelivered-message"),
814+
})
815+
if _, err := r.Get(ctx); err != nil {
816+
t.Fatalf("failed to publish message: %v", err)
817+
}
818+
819+
deliveryCount := 0
820+
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
821+
defer cancel()
822+
err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
823+
// Only acknowledge the message on the 2nd invocation of the callback (2nd delivery).
824+
if deliveryCount == 1 {
825+
msg.Ack()
826+
}
827+
// Otherwise, do nothing and let the message expire.
828+
deliveryCount++
829+
if deliveryCount == 2 {
830+
cancel()
831+
}
832+
})
833+
if deliveryCount != 2 {
834+
t.Fatalf("expected 2 iterations of the callback, got %d", deliveryCount)
835+
}
836+
if err != nil {
837+
t.Fatalf("s.Receive err: %v", err)
838+
}
839+
}

0 commit comments

Comments
 (0)