@@ -94,6 +94,86 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) ->
94
94
# [END pubsub_create_pull_subscription]
95
95
96
96
97
+ def optimistic_subscribe (
98
+ project_id : str ,
99
+ topic_id : str ,
100
+ subscription_id : str ,
101
+ timeout : Optional [float ] = None ,
102
+ ) -> None :
103
+ """Optimistically subscribe to messages instead of making calls to verify existence
104
+ of a subscription first and then subscribing to messages from it. This avoids admin
105
+ operation calls to verify the existence of a subscription and reduces the probability
106
+ of running out of quota for admin operations."""
107
+ # [START pubsub_optimistic_subscribe]
108
+ from google .api_core .exceptions import NotFound
109
+ from google .cloud import pubsub_v1
110
+ from concurrent .futures import TimeoutError
111
+
112
+ # TODO(developer)
113
+ # project_id = "your-project-id"
114
+ # subscription_id = "your-subscription-id"
115
+ # Number of seconds the subscriber should listen for messages
116
+ # timeout = 5.0
117
+ # topic_id = "your-topic-id"
118
+
119
+ # Create a subscriber client.
120
+ subscriber = pubsub_v1 .SubscriberClient ()
121
+
122
+ # The `subscription_path` method creates a fully qualified identifier
123
+ # in the form `projects/{project_id}/subscriptions/{subscription_id}`
124
+ subscription_path = subscriber .subscription_path (project_id , subscription_id )
125
+
126
+ # Define callback to be called when a message is received.
127
+ def callback (message : pubsub_v1 .subscriber .message .Message ) -> None :
128
+ # Ack message after processing it.
129
+ message .ack ()
130
+
131
+ # Wrap subscriber in a 'with' block to automatically call close() when done.
132
+ with subscriber :
133
+ try :
134
+ # Optimistically subscribe to messages on the subscription.
135
+ streaming_pull_future = subscriber .subscribe (
136
+ subscription_path , callback = callback
137
+ )
138
+ streaming_pull_future .result (timeout = timeout )
139
+ except TimeoutError :
140
+ print ("Successfully subscribed until the timeout passed." )
141
+ streaming_pull_future .cancel () # Trigger the shutdown.
142
+ streaming_pull_future .result () # Block until the shutdown is complete.
143
+ except NotFound :
144
+ print (f"Subscription { subscription_path } not found, creating it." )
145
+
146
+ try :
147
+ # If the subscription does not exist, then create it.
148
+ publisher = pubsub_v1 .PublisherClient ()
149
+ topic_path = publisher .topic_path (project_id , topic_id )
150
+ subscription = subscriber .create_subscription (
151
+ request = {"name" : subscription_path , "topic" : topic_path }
152
+ )
153
+
154
+ if subscription :
155
+ print (f"Subscription { subscription .name } created" )
156
+ else :
157
+ raise ValueError ("Subscription creation failed." )
158
+
159
+ # Subscribe on the created subscription.
160
+ try :
161
+ streaming_pull_future = subscriber .subscribe (
162
+ subscription .name , callback = callback
163
+ )
164
+ streaming_pull_future .result (timeout = timeout )
165
+ except TimeoutError :
166
+ streaming_pull_future .cancel () # Trigger the shutdown.
167
+ streaming_pull_future .result () # Block until the shutdown is complete.
168
+ except Exception as e :
169
+ print (
170
+ f"Exception occurred when creating subscription and subscribing to it: { e } "
171
+ )
172
+ except Exception as e :
173
+ print (f"Exception occurred when attempting optimistic subscribe: { e } " )
174
+ # [END pubsub_optimistic_subscribe]
175
+
176
+
97
177
def create_subscription_with_dead_letter_topic (
98
178
project_id : str ,
99
179
topic_id : str ,
@@ -1161,6 +1241,15 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
1161
1241
remove_dead_letter_policy_parser .add_argument ("topic_id" )
1162
1242
remove_dead_letter_policy_parser .add_argument ("subscription_id" )
1163
1243
1244
+ optimistic_subscribe_parser = subparsers .add_parser (
1245
+ "optimistic-subscribe" , help = optimistic_subscribe .__doc__
1246
+ )
1247
+ optimistic_subscribe_parser .add_argument ("topic_id" )
1248
+ optimistic_subscribe_parser .add_argument ("subscription_id" )
1249
+ optimistic_subscribe_parser .add_argument (
1250
+ "timeout" , default = None , type = float , nargs = "?"
1251
+ )
1252
+
1164
1253
receive_parser = subparsers .add_parser ("receive" , help = receive_messages .__doc__ )
1165
1254
receive_parser .add_argument ("subscription_id" )
1166
1255
receive_parser .add_argument ("timeout" , default = None , type = float , nargs = "?" )
@@ -1303,6 +1392,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
1303
1392
)
1304
1393
elif args .command == "remove-dead-letter-policy" :
1305
1394
remove_dead_letter_policy (args .project_id , args .topic_id , args .subscription_id )
1395
+ elif args .command == "optimistic-subscribe" :
1396
+ optimistic_subscribe (
1397
+ args .project_id , args .topic_id , args .subscription_id , args .timeout
1398
+ )
1306
1399
elif args .command == "receive" :
1307
1400
receive_messages (args .project_id , args .subscription_id , args .timeout )
1308
1401
elif args .command == "receive-custom-attributes" :
0 commit comments