
    qi'                         d Z ddlZddlmZmZ ddlmZmZmZ ddl	Z	ddl
mZ ddlmZmZ ddlmZ ddlmZ dd	lmZ  G d
 de      Zy)zFBase websocket service with automatic reconnection and error handling.    N)ABCabstractmethod)	AwaitableCallableOptional)logger)ConnectionClosedErrorConnectionClosedOK)State)
ErrorFrame)exponential_backoff_timec            	          e Zd ZdZdddefdZdefdZdedefd	Z	 	 ddede	e
eged
   f      defdZde
eged
   f   fdZ	 ddede
eged
   f   de	e   defdZde
eged
   f   fdZd Zd Zed        Zed        Zed        Zy
)WebsocketServicea+  Base class for websocket-based services with automatic reconnection.

    Provides websocket connection management, automatic reconnection with
    exponential backoff, connection verification, and error handling.
    Subclasses implement service-specific connection and message handling logic.
    T)reconnect_on_errorr   c                <    d| _         || _        d| _        d| _        y)zInitialize the websocket service.

        Args:
            reconnect_on_error: Whether to automatically reconnect on connection errors.
            **kwargs: Additional arguments (unused, for compatibility).
        NF)
_websocket_reconnect_on_error_reconnect_in_progress_disconnecting)selfr   kwargss      T/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/services/websocket_service.py__init__zWebsocketService.__init__   s%     IM#5 ,1#$)    returnc                   K   	 | j                   r&| j                   j                  t        j                  u ry| j                   j	                          d{    y7 # t
        $ r$}t        j                  |  d|        Y d}~yd}~ww xY ww)zVerify the websocket connection is active and responsive.

        Returns:
            True if connection is verified working, False otherwise.
        FNTz! connection verification failed: )r   stater   CLOSEDping	Exceptionr   error)r   es     r   _verify_connectionz#WebsocketService._verify_connection*   sr     	??doo&;&;u||&K//&&((( ) 	LLD6!B1#FG	sJ   B2A BA AA BA 	B	%B?BB		Battempt_numberc                    K   t        j                  |  d| d       | j                          d{    | j                          d{    | j	                          d{   S 7 47 7 w)zReconnect the websocket with the current attempt number.

        Args:
            attempt_number: Current retry attempt number for logging.

        Returns:
            True if reconnection and verification successful, False otherwise.
        z reconnecting (attempt: )N)r   warning_disconnect_websocket_connect_websocketr#   )r   r$   s     r   _reconnect_websocketz%WebsocketService._reconnect_websocket9   sf      	$77GqIJ((***%%''',,... 	+'.s3   /A,A&A,	A(
A,!A*"A,(A,*A,Nmax_retriesreport_errorc                    K   | j                   rt        j                  |  d       yd| _         d }	 t        d|dz         D ]  }	 t        j                  |  d|        | j	                  |       d {   r#t        j
                  |  d|         d| _         y	 t        |      }t        j                  |       d {     |  d	| d
}|r|d| z  }t        j                  |       |r |t        |d             d {    	 d| _         y7 # t        $ rM}|}t        j                  |  d| d|        |r" |t        |  d| d|              d {  7   Y d }~d }~ww xY w7 7 j# d| _         w xY ww)Nz/ reconnect attempt aborted: already in progressFT   z reconnecting, attempt z% reconnected successfully on attempt z reconnection attempt z	 failed: z failed to reconnect after z	 attemptsz: )fatal)r   r   r'   ranger*   infor    r!   r   r   asynciosleep)r   r+   r,   last_exceptionattemptr"   	wait_time	fatal_msgs           r   _try_reconnectzWebsocketService._try_reconnectG   s     &&NNdV#RST&*#.2	0 K!O4 /NNdV+B7)#LM!66w???tf,QRYQZ$[\#$ +0D') @ 5W=	mmI.../  & ;K=	RIr.!122	LL#":it#DEEE*/D') @ ! %&NLLD6)?y	RSQT!UV#*&$/EgYiXYWZ'[\  	 / F +0D's   /E>E2 .D3D4DE2 E>$E2 ?E. AE2 E0E2 E>D	E+=E&EE&!E2 &E++E2 0E2 2	E;;E>c                   K   	 | j                   j                  |       d{    y7 # t        $ r}t        j                  |  d| d       | j                  |       d{  7  }|r=t        j                  |  d       | j                   j                  |       d{  7   nt        j                  |  d       Y d}~yY d}~yd}~ww xY ww)zAAttempt to send a message, retrying after reconnect if necessary.Nz send failed: z, will try to reconnectr,   z6 reconnected successfully, will retry send the messagez! send failed; unable to reconnect)r   sendr    r   r!   r8   r1   )r   messager,   r"   successs        r   send_with_retryz WebsocketService.send_with_retrym   s     	I//&&w/// 		ILLD6s2IJK //\/JJJGtf$Z[\oo**7333v%FGHH 4		IsN   C* (* C* 	C0C
#A&$=C
!B$"C
 
C
CCerror_messager!   c                 D  K   | j                   r6|rt        j                  |  d|        yt        j                  |  d       yt        j                  |       | j                  r| j                  |       d{   }|S  |t        |             d{    y7  7 w)a  Check if reconnection should be attempted and try if appropriate.

        Args:
            error_message: Human-readable error message for logging.
            report_error: Callback function to report connection errors.
            error: The exception that occurred (optional, may be None for graceful closes).

        Returns:
            True if should continue the receive loop, False if should break.
        z error during disconnect: z% receive loop ended during disconnectFr:   N)r   r   r'   debugr   r8   r   )r   r?   r,   r!   r=   s        r   _maybe_try_reconnectz%WebsocketService._maybe_try_reconnect|   s     " $'A%IJ  v%JKL 	}% ## //\/JJGN z-8999 K :s$   A9B ;B<B BB B c                   K   	 	 | j                          d{    |  d}| j                  ||       d{   }|sy	 =7 (7 # t        $ r$}t        j                  |  d|        Y d}~yd}~wt
        $ r4}|  d| }| j                  |||       d{  7  }|sY d}~yY d}~hd}~wt        $ r4}|  d| }| j                  |||       d{  7  }|sY d}~yY d}~d}~ww xY ww)al  Handle websocket message receiving with automatic retry logic.

        Continuously receives messages with automatic reconnection on errors.
        Uses exponential backoff between retry attempts and reports fatal errors
        after maximum retries are exhausted.

        Args:
            report_error: Callback function to report connection errors.
        Nz connection closed by serverz connection closed normally: z' connection closed, but with an error: z error receiving messages: )_receive_messagesrB   r
   r   rA   r	   r    )r   r,   r<   should_continuer"   s        r   _receive_task_handlerz&WebsocketService._receive_task_handler   s     ,,...
 "F">?(,(A(A'<(X"X& ' . #Y & v%B1#FG(  "F"I!M(,(A(A'<YZ(["["[& ' !F"=aSA(,(A(A'<YZ(["["[& '	s   C+A AA AA C+A A 	C(A-(C+-C(9B(BB(
C+(C(4C#CC#
C+#C((C+c                    K   d| _         yw)aP  Connect to the service and reset disconnecting flag.

        Manages the disconnecting flag to enable reconnection. Subclasses should
        call super()._connect() first, then implement their specific connection
        logic including websocket connection via _connect_websocket() and any
        additional setup required.
        FNr   r   s    r   _connectzWebsocketService._connect   s      $   	c                    K   d| _         yw)au  Disconnect from the service and set disconnecting flag.

        Manages the disconnecting flag to prevent reconnection during intentional
        disconnect. Subclasses should call super()._disconnect() first, then
        implement their specific disconnection logic including websocket
        disconnection via _disconnect_websocket() and any cleanup required.
        TNrH   rI   s    r   _disconnectzWebsocketService._disconnect   s      #rK   c                    K   yw)zEstablish the websocket connection.

        Implement the low-level websocket connection logic specific to the service.
        Should only handle websocket connection, not additional service setup.
        N rI   s    r   r)   z#WebsocketService._connect_websocket         	   c                    K   yw)zClose the websocket connection.

        Implement the low-level websocket disconnection logic specific to the service.
        Should only handle websocket disconnection, not additional service cleanup.
        NrO   rI   s    r   r(   z&WebsocketService._disconnect_websocket   rP   rQ   c                    K   yw)zReceive and process websocket messages.

        Implement service-specific logic for receiving and handling messages
        from the websocket connection. Called continuously by the receive task handler.
        NrO   rI   s    r   rD   z"WebsocketService._receive_messages   rP   rQ   )   N)N)__name__
__module____qualname____doc__boolr   r#   intr*   r   r   r   r   r8   r>   strr    rB   rF   rJ   rM   r   r)   r(   rD   rO   r   r   r   r      sA    6: 
*d 
*$ / / /  JN$0$0 xio(EFG$0 
	$0LI8ZLR[\`RaDa;b I& &*	"" 
|Yt_<=" 	"	"
 
"H%*yY]A^8_ %N$#      r   r   )rX   r2   abcr   r   typingr   r   r   
websocketslogurur   websockets.exceptionsr	   r
   websockets.protocolr   pipecat.frames.framesr   pipecat.utils.networkr   r   rO   r   r   <module>rd      s7    M  # 0 0   K % , :^s ^r   