@@ -787,3 +787,53 @@ func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) {
787
787
t .Fatal ("expected message to not have been delivered when exactly once enabled" )
788
788
})
789
789
}
790
+
791
+ func TestSubscribeMessageExpirationFlowControl (t * testing.T ) {
792
+ t .Parallel ()
793
+ ctx , cancel := context .WithCancel (context .Background ())
794
+ defer cancel ()
795
+ client , srv := newFake (t )
796
+ defer client .Close ()
797
+ defer srv .Close ()
798
+
799
+ topic := mustCreateTopic (t , client , "t" )
800
+ subConfig := SubscriptionConfig {
801
+ Topic : topic ,
802
+ }
803
+ s , err := client .CreateSubscription (ctx , "s" , subConfig )
804
+ if err != nil {
805
+ t .Fatalf ("create sub err: %v" , err )
806
+ }
807
+
808
+ s .ReceiveSettings .NumGoroutines = 1
809
+ s .ReceiveSettings .MaxOutstandingMessages = 1
810
+ s .ReceiveSettings .MaxExtension = 10 * time .Second
811
+ s .ReceiveSettings .MaxExtensionPeriod = 10 * time .Second
812
+ r := topic .Publish (ctx , & Message {
813
+ Data : []byte ("redelivered-message" ),
814
+ })
815
+ if _ , err := r .Get (ctx ); err != nil {
816
+ t .Fatalf ("failed to publish message: %v" , err )
817
+ }
818
+
819
+ deliveryCount := 0
820
+ ctx , cancel = context .WithTimeout (context .Background (), 30 * time .Second )
821
+ defer cancel ()
822
+ err = s .Receive (ctx , func (ctx context.Context , msg * Message ) {
823
+ // Only acknowledge the message on the 2nd invocation of the callback (2nd delivery).
824
+ if deliveryCount == 1 {
825
+ msg .Ack ()
826
+ }
827
+ // Otherwise, do nothing and let the message expire.
828
+ deliveryCount ++
829
+ if deliveryCount == 2 {
830
+ cancel ()
831
+ }
832
+ })
833
+ if deliveryCount != 2 {
834
+ t .Fatalf ("expected 2 iterations of the callback, got %d" , deliveryCount )
835
+ }
836
+ if err != nil {
837
+ t .Fatalf ("s.Receive err: %v" , err )
838
+ }
839
+ }
0 commit comments