Skip to content

Commit 3f4ca07

Browse files
committed
Improve TDLib Server itreator.
1 parent 337fea2 commit 3f4ca07

File tree

1 file changed

+42
-5
lines changed

1 file changed

+42
-5
lines changed

pytdbot/client.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ def __init__(
183183
self._current_handlers = {}
184184
self._results: Dict[str, asyncio.Future] = {}
185185
self._workers_tasks = None
186+
self.__rabbitmq_iterator_task = None
186187
self.__authorization_state = None
187188
self.__cache = {"is_coro_filter": {}}
188189
self.__local_handlers = {
@@ -248,12 +249,18 @@ async def start(self) -> None:
248249

249250
if isinstance(self.workers, int):
250251
self._workers_tasks = [
251-
self.loop.create_task(self._queue_update_worker())
252+
self.loop.create_task(
253+
self._queue_update_worker()
254+
if not self.is_rabbitmq
255+
else self.__rabbitmq_worker()
256+
)
252257
for _ in range(self.workers)
253258
]
254259
self.__is_queue_worker = True
255260

256261
self.logger.info(f"Started with {self.workers} workers")
262+
elif self.is_rabbitmq:
263+
raise ValueError("workers must be an int when using TDLib Server")
257264
else:
258265
self.__is_queue_worker = False
259266
self.logger.info("Started with unlimited updates processes")
@@ -694,7 +701,7 @@ async def process_update(self, update):
694701
if update_handler:
695702
self.loop.create_task(update_handler(update))
696703

697-
if self.__is_queue_worker:
704+
if not self.is_rabbitmq and self.__is_queue_worker:
698705
self.queue.put_nowait(update)
699706
else:
700707
await self._handle_update(update)
@@ -882,7 +889,9 @@ async def __handle_authorization_state(
882889
f"{str(self.me.id) if not self.me.usernames else '@' + self.me.usernames.editable_username}"
883890
)
884891

885-
if (
892+
if self.authorization_state == "authorizationStateClosing":
893+
self.__is_closing = True
894+
elif (
886895
self.authorization_state == "authorizationStateClosed"
887896
and self.__is_closing is False
888897
):
@@ -968,7 +977,7 @@ async def __start_rabbitmq(self):
968977

969978
self.__rqueues = {
970979
"updates": updates_queue,
971-
"requests": await self.__rchannel.get_queue(self.my_id + "_requests"),
980+
"requests": await self.__rchannel.get_queue(f"{self.my_id}_requests"),
972981
"notify": notify_queue,
973982
"responses": responses_queue,
974983
}
@@ -986,10 +995,35 @@ async def __start_rabbitmq(self):
986995
await self.process_update(obj_to_dict(update))
987996

988997
if not self.no_updates:
989-
await self.__rqueues["updates"].consume(self.__on_update, no_ack=True)
998+
self.__rabbitmq_iterator_task = self.loop.create_task(
999+
self.__rabbitmq_iterator()
1000+
)
9901001

9911002
await self.__rqueues["notify"].consume(self.__on_update, no_ack=True)
9921003

1004+
async def __rabbitmq_iterator(self):
1005+
async with self.__rqueues["updates"].iterator() as iterator:
1006+
async for message in iterator:
1007+
await self.queue.put(message)
1008+
1009+
async def __rabbitmq_worker(self):
1010+
while self.is_running:
1011+
message: aio_pika.IncomingMessage = await self.queue.get()
1012+
1013+
try:
1014+
update = json_loads(message.body)
1015+
if self.__is_closing and not isinstance(
1016+
update, types.UpdateAuthorizationState
1017+
):
1018+
await message.nack(requeue=True)
1019+
continue
1020+
1021+
await self.process_update(update)
1022+
except Exception:
1023+
self.logger.exception("Error processing message")
1024+
1025+
await message.ack() # ack after processing
1026+
9931027
async def __handle_rabbitmq_message(self, message: aio_pika.IncomingMessage):
9941028
await self.process_update(json_loads(message.body))
9951029

@@ -1026,6 +1060,9 @@ def __stop_client(self) -> None:
10261060
self.is_authenticated = False
10271061
self.is_running = False
10281062

1063+
if self.__rabbitmq_iterator_task:
1064+
self.__rabbitmq_iterator_task.cancel()
1065+
10291066
if self.__is_queue_worker:
10301067
for worker_task in self._workers_tasks:
10311068
worker_task.cancel()

0 commit comments

Comments
 (0)