Skip to content

Commit d8e8aa5

Browse files
docs(samples): Add code sample for optimistic subscribe (#1182)
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 5094605 commit d8e8aa5

File tree

2 files changed

+143
-0
lines changed

2 files changed

+143
-0
lines changed

samples/snippets/subscriber.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,86 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) ->
9494
# [END pubsub_create_pull_subscription]
9595

9696

97+
def optimistic_subscribe(
98+
project_id: str,
99+
topic_id: str,
100+
subscription_id: str,
101+
timeout: Optional[float] = None,
102+
) -> None:
103+
"""Optimistically subscribe to messages instead of making calls to verify existence
104+
of a subscription first and then subscribing to messages from it. This avoids admin
105+
operation calls to verify the existence of a subscription and reduces the probability
106+
of running out of quota for admin operations."""
107+
# [START pubsub_optimistic_subscribe]
108+
from google.api_core.exceptions import NotFound
109+
from google.cloud import pubsub_v1
110+
from concurrent.futures import TimeoutError
111+
112+
# TODO(developer)
113+
# project_id = "your-project-id"
114+
# subscription_id = "your-subscription-id"
115+
# Number of seconds the subscriber should listen for messages
116+
# timeout = 5.0
117+
# topic_id = "your-topic-id"
118+
119+
# Create a subscriber client.
120+
subscriber = pubsub_v1.SubscriberClient()
121+
122+
# The `subscription_path` method creates a fully qualified identifier
123+
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
124+
subscription_path = subscriber.subscription_path(project_id, subscription_id)
125+
126+
# Define callback to be called when a message is received.
127+
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
128+
# Ack message after processing it.
129+
message.ack()
130+
131+
# Wrap subscriber in a 'with' block to automatically call close() when done.
132+
with subscriber:
133+
try:
134+
# Optimistically subscribe to messages on the subscription.
135+
streaming_pull_future = subscriber.subscribe(
136+
subscription_path, callback=callback
137+
)
138+
streaming_pull_future.result(timeout=timeout)
139+
except TimeoutError:
140+
print("Successfully subscribed until the timeout passed.")
141+
streaming_pull_future.cancel() # Trigger the shutdown.
142+
streaming_pull_future.result() # Block until the shutdown is complete.
143+
except NotFound:
144+
print(f"Subscription {subscription_path} not found, creating it.")
145+
146+
try:
147+
# If the subscription does not exist, then create it.
148+
publisher = pubsub_v1.PublisherClient()
149+
topic_path = publisher.topic_path(project_id, topic_id)
150+
subscription = subscriber.create_subscription(
151+
request={"name": subscription_path, "topic": topic_path}
152+
)
153+
154+
if subscription:
155+
print(f"Subscription {subscription.name} created")
156+
else:
157+
raise ValueError("Subscription creation failed.")
158+
159+
# Subscribe on the created subscription.
160+
try:
161+
streaming_pull_future = subscriber.subscribe(
162+
subscription.name, callback=callback
163+
)
164+
streaming_pull_future.result(timeout=timeout)
165+
except TimeoutError:
166+
streaming_pull_future.cancel() # Trigger the shutdown.
167+
streaming_pull_future.result() # Block until the shutdown is complete.
168+
except Exception as e:
169+
print(
170+
f"Exception occurred when creating subscription and subscribing to it: {e}"
171+
)
172+
except Exception as e:
173+
print(f"Exception occurred when attempting optimistic subscribe: {e}")
174+
# [END pubsub_optimistic_subscribe]
175+
176+
97177
def create_subscription_with_dead_letter_topic(
98178
project_id: str,
99179
topic_id: str,
@@ -1161,6 +1241,15 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
11611241
remove_dead_letter_policy_parser.add_argument("topic_id")
11621242
remove_dead_letter_policy_parser.add_argument("subscription_id")
11631243

1244+
optimistic_subscribe_parser = subparsers.add_parser(
1245+
"optimistic-subscribe", help=optimistic_subscribe.__doc__
1246+
)
1247+
optimistic_subscribe_parser.add_argument("topic_id")
1248+
optimistic_subscribe_parser.add_argument("subscription_id")
1249+
optimistic_subscribe_parser.add_argument(
1250+
"timeout", default=None, type=float, nargs="?"
1251+
)
1252+
11641253
receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__)
11651254
receive_parser.add_argument("subscription_id")
11661255
receive_parser.add_argument("timeout", default=None, type=float, nargs="?")
@@ -1303,6 +1392,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
13031392
)
13041393
elif args.command == "remove-dead-letter-policy":
13051394
remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id)
1395+
elif args.command == "optimistic-subscribe":
1396+
optimistic_subscribe(
1397+
args.project_id, args.topic_id, args.subscription_id, args.timeout
1398+
)
13061399
elif args.command == "receive":
13071400
receive_messages(args.project_id, args.subscription_id, args.timeout)
13081401
elif args.command == "receive-custom-attributes":

samples/snippets/subscriber_test.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,56 @@ def test_create_subscription(
234234
subscriber_client.delete_subscription(request={"subscription": subscription_path})
235235

236236

237+
def test_optimistic_subscribe(
238+
subscriber_client: pubsub_v1.SubscriberClient,
239+
topic: str,
240+
publisher_client: pubsub_v1.PublisherClient,
241+
capsys: CaptureFixture[str],
242+
) -> None:
243+
subscription_id = f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}"
244+
subscription_path = subscriber_client.subscription_path(PROJECT_ID, subscription_id)
245+
# Ensure there is no pre-existing subscription.
246+
# So that we can test the case where optimistic subscribe fails.
247+
try:
248+
subscriber_client.delete_subscription(
249+
request={"subscription": subscription_path}
250+
)
251+
except NotFound:
252+
pass
253+
254+
# Invoke optimistic_subscribe when the subscription is not present.
255+
# This tests scenario where optimistic subscribe fails.
256+
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)
257+
out, _ = capsys.readouterr()
258+
# Verify optimistic subscription failed.
259+
assert f"Subscription {subscription_path} not found, creating it." in out
260+
# Verify that subscription created due to optimistic subscribe failure.
261+
assert f"Subscription {subscription_path} created" in out
262+
# Verify that subscription didn't already exist.
263+
assert "Successfully subscribed until the timeout passed." not in out
264+
265+
# Invoke optimistic_subscribe when the subscription is present.
266+
# This tests scenario where optimistic subscribe succeeds.
267+
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)
268+
269+
out, _ = capsys.readouterr()
270+
# Verify optimistic subscription succeeded.
271+
assert f"Subscription {subscription_path} not found, creating it." not in out
272+
# Verify that subscription was not created due to optimistic subscribe failure.
273+
assert f"Subscription {subscription_path} created" not in out
274+
# Verify that subscription already existed.
275+
assert "Successfully subscribed until the timeout passed." in out
276+
277+
# Test case where optimistic subscribe throws an exception other than NotFound
278+
# or TimeoutError.
279+
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, "123", 5)
280+
out, _ = capsys.readouterr()
281+
assert "Exception occurred when attempting optimistic subscribe:" in out
282+
283+
# Clean up resources created during test.
284+
subscriber_client.delete_subscription(request={"subscription": subscription_path})
285+
286+
237287
def test_create_subscription_with_dead_letter_policy(
238288
subscriber_client: pubsub_v1.SubscriberClient,
239289
dead_letter_topic: str,

0 commit comments

Comments
 (0)