@@ -183,6 +183,7 @@ def __init__(
183
183
self ._current_handlers = {}
184
184
self ._results : Dict [str , asyncio .Future ] = {}
185
185
self ._workers_tasks = None
186
+ self .__rabbitmq_iterator_task = None
186
187
self .__authorization_state = None
187
188
self .__cache = {"is_coro_filter" : {}}
188
189
self .__local_handlers = {
@@ -248,12 +249,18 @@ async def start(self) -> None:
248
249
249
250
if isinstance (self .workers , int ):
250
251
self ._workers_tasks = [
251
- self .loop .create_task (self ._queue_update_worker ())
252
+ self .loop .create_task (
253
+ self ._queue_update_worker ()
254
+ if not self .is_rabbitmq
255
+ else self .__rabbitmq_worker ()
256
+ )
252
257
for _ in range (self .workers )
253
258
]
254
259
self .__is_queue_worker = True
255
260
256
261
self .logger .info (f"Started with { self .workers } workers" )
262
+ elif self .is_rabbitmq :
263
+ raise ValueError ("workers must be an int when using TDLib Server" )
257
264
else :
258
265
self .__is_queue_worker = False
259
266
self .logger .info ("Started with unlimited updates processes" )
@@ -694,7 +701,7 @@ async def process_update(self, update):
694
701
if update_handler :
695
702
self .loop .create_task (update_handler (update ))
696
703
697
- if self .__is_queue_worker :
704
+ if not self . is_rabbitmq and self .__is_queue_worker :
698
705
self .queue .put_nowait (update )
699
706
else :
700
707
await self ._handle_update (update )
@@ -882,7 +889,9 @@ async def __handle_authorization_state(
882
889
f"{ str (self .me .id ) if not self .me .usernames else '@' + self .me .usernames .editable_username } "
883
890
)
884
891
885
- if (
892
+ if self .authorization_state == "authorizationStateClosing" :
893
+ self .__is_closing = True
894
+ elif (
886
895
self .authorization_state == "authorizationStateClosed"
887
896
and self .__is_closing is False
888
897
):
@@ -968,7 +977,7 @@ async def __start_rabbitmq(self):
968
977
969
978
self .__rqueues = {
970
979
"updates" : updates_queue ,
971
- "requests" : await self .__rchannel .get_queue (self .my_id + " _requests" ),
980
+ "requests" : await self .__rchannel .get_queue (f" { self .my_id } _requests" ),
972
981
"notify" : notify_queue ,
973
982
"responses" : responses_queue ,
974
983
}
@@ -986,10 +995,35 @@ async def __start_rabbitmq(self):
986
995
await self .process_update (obj_to_dict (update ))
987
996
988
997
if not self .no_updates :
989
- await self .__rqueues ["updates" ].consume (self .__on_update , no_ack = True )
998
+ self .__rabbitmq_iterator_task = self .loop .create_task (
999
+ self .__rabbitmq_iterator ()
1000
+ )
990
1001
991
1002
await self .__rqueues ["notify" ].consume (self .__on_update , no_ack = True )
992
1003
1004
+ async def __rabbitmq_iterator (self ):
1005
+ async with self .__rqueues ["updates" ].iterator () as iterator :
1006
+ async for message in iterator :
1007
+ await self .queue .put (message )
1008
+
1009
+ async def __rabbitmq_worker (self ):
1010
+ while self .is_running :
1011
+ message : aio_pika .IncomingMessage = await self .queue .get ()
1012
+
1013
+ try :
1014
+ update = json_loads (message .body )
1015
+ if self .__is_closing and not isinstance (
1016
+ update , types .UpdateAuthorizationState
1017
+ ):
1018
+ await message .nack (requeue = True )
1019
+ continue
1020
+
1021
+ await self .process_update (update )
1022
+ except Exception :
1023
+ self .logger .exception ("Error processing message" )
1024
+
1025
+ await message .ack () # ack after processing
1026
+
993
1027
async def __handle_rabbitmq_message (self , message : aio_pika .IncomingMessage ):
994
1028
await self .process_update (json_loads (message .body ))
995
1029
@@ -1026,6 +1060,9 @@ def __stop_client(self) -> None:
1026
1060
self .is_authenticated = False
1027
1061
self .is_running = False
1028
1062
1063
+ if self .__rabbitmq_iterator_task :
1064
+ self .__rabbitmq_iterator_task .cancel ()
1065
+
1029
1066
if self .__is_queue_worker :
1030
1067
for worker_task in self ._workers_tasks :
1031
1068
worker_task .cancel ()
0 commit comments