Skip to content

Commit c88fbdf

Browse files
authored
fix(pubsub): fix bug with AckWithResult with exactly once disabled (#7319)
* fix(pubsub): fix bug with AckWithResult with exactly once disabled * switch wait argument to time.Duration
1 parent e364f7a commit c88fbdf

File tree

5 files changed

+23
-16
lines changed

5 files changed

+23
-16
lines changed

pubsub/integration_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,13 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou
333333
timeout := 3 * time.Minute
334334
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
335335
defer cancel()
336-
gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
336+
gotMsgs, err := pullN(timeoutCtx, sub, len(want), 2*time.Second, func(ctx context.Context, m *Message) {
337+
if exactlyOnceDelivery {
338+
if _, err := m.AckWithResult().Get(ctx); err != nil {
339+
t.Fatalf("failed to ack message with exactly once delivery: %v", err)
340+
}
341+
return
342+
}
337343
m.Ack()
338344
})
339345
if err != nil {
@@ -2003,16 +2009,13 @@ func TestIntegration_TopicRetention(t *testing.T) {
20032009
}
20042010
}
20052011

2006-
func TestExactlyOnceDelivery_PublishReceive(t *testing.T) {
2012+
func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {
20072013
ctx := context.Background()
20082014
client := integrationTestClient(ctx, t)
20092015

20102016
for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
20112017
testPublishAndReceive(t, client, maxMsgs, false, true, 10, 0)
20122018
}
2013-
2014-
// Tests for large messages (larger than the 4MB gRPC limit).
2015-
testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024)
20162019
}
20172020

20182021
func TestIntegration_TopicUpdateSchema(t *testing.T) {

pubsub/message.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,20 +157,20 @@ func (ah *psAckHandler) OnNack() {
157157
}
158158

159159
func (ah *psAckHandler) OnAckWithResult() *AckResult {
160+
// call done with true to indicate ack.
161+
ah.done(true)
160162
if !ah.exactlyOnceDelivery {
161163
return newSuccessAckResult()
162164
}
163-
// call done with true to indicate ack.
164-
ah.done(true)
165165
return ah.ackResult
166166
}
167167

168168
func (ah *psAckHandler) OnNackWithResult() *AckResult {
169+
// call done with false to indicate nack.
170+
ah.done(false)
169171
if !ah.exactlyOnceDelivery {
170172
return newSuccessAckResult()
171173
}
172-
// call done with false to indicate nack.
173-
ah.done(false)
174174
return ah.ackResult
175175
}
176176

pubsub/streaming_pull_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) {
6666

6767
func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer, msgs []*pb.ReceivedMessage) {
6868
sub := client.Subscription("S")
69-
gotMsgs, err := pullN(context.Background(), sub, len(msgs), func(_ context.Context, m *Message) {
69+
gotMsgs, err := pullN(context.Background(), sub, len(msgs), 0, func(_ context.Context, m *Message) {
7070
id, err := strconv.Atoi(msgAckID(m))
7171
if err != nil {
7272
t.Fatalf("pullN err: %v", err)
@@ -196,7 +196,7 @@ func TestStreamingPullRetry(t *testing.T) {
196196

197197
sub := client.Subscription("S")
198198
sub.ReceiveSettings.NumGoroutines = 1
199-
gotMsgs, err := pullN(context.Background(), sub, len(testMessages), func(_ context.Context, m *Message) {
199+
gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 0, func(_ context.Context, m *Message) {
200200
id, err := strconv.Atoi(msgAckID(m))
201201
if err != nil {
202202
t.Fatalf("pullN err: %v", err)
@@ -297,7 +297,7 @@ func TestStreamingPullConcurrent(t *testing.T) {
297297
sub := client.Subscription("S")
298298
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
299299
defer cancel()
300-
gotMsgs, err := pullN(ctx, sub, nMessages, func(ctx context.Context, m *Message) {
300+
gotMsgs, err := pullN(ctx, sub, nMessages, 0, func(ctx context.Context, m *Message) {
301301
m.Ack()
302302
})
303303
if c := status.Convert(err); err != nil && c.Code() != codes.Canceled {
@@ -513,7 +513,8 @@ func newMock(t *testing.T) (*Client, *mockServer) {
513513
}
514514

515515
// pullN calls sub.Receive until at least n messages are received.
516-
func pullN(ctx context.Context, sub *Subscription, n int, f func(context.Context, *Message)) ([]*Message, error) {
516+
// Wait a provided duration before cancelling.
517+
func pullN(ctx context.Context, sub *Subscription, n int, wait time.Duration, f func(context.Context, *Message)) ([]*Message, error) {
517518
var (
518519
mu sync.Mutex
519520
msgs []*Message
@@ -526,6 +527,9 @@ func pullN(ctx context.Context, sub *Subscription, n int, f func(context.Context
526527
mu.Unlock()
527528
f(ctx, m)
528529
if nSeen >= n {
530+
// Wait a specified amount of time so that for exactly once delivery,
531+
// Acks aren't cancelled immediately.
532+
time.Sleep(wait)
529533
cancel()
530534
}
531535
})

pubsub/subscription.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,8 +397,8 @@ type SubscriptionConfig struct {
397397
// by Pub/Sub and have distinct MessageID values.
398398
//
399399
// Lastly, to guarantee messages have been acked or nacked properly, you must
400-
// call Message.AckWithResponse() or Message.NackWithResponse(). These return an
401-
// AckResponse which will be ready if the message has been acked (or failed to be acked).
400+
// call Message.AckWithResult() or Message.NackWithResult(). These return an
401+
// AckResult which will be ready if the message has been acked (or failed to be acked).
402402
EnableExactlyOnceDelivery bool
403403

404404
// State indicates whether or not the subscription can receive messages.

pubsub/subscription_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ func testReceive(t *testing.T, synchronous, exactlyOnceDelivery bool) {
296296
srv.Publish(topic.name, []byte{byte(i)}, nil)
297297
}
298298
sub.ReceiveSettings.Synchronous = synchronous
299-
msgs, err := pullN(ctx, sub, 256, func(_ context.Context, m *Message) {
299+
msgs, err := pullN(ctx, sub, 256, 0, func(_ context.Context, m *Message) {
300300
if exactlyOnceDelivery {
301301
ar := m.AckWithResult()
302302
// Don't use the above ctx here since that will get cancelled.

0 commit comments

Comments
 (0)