
    qi*H                     n   d Z ddlZddlZddlZddlZddlmZmZmZ ddl	Z	ddl
mZ ddlmZ ddlmZ ddlmZmZmZmZmZmZmZmZmZ ddlmZ dd	lmZ dd
lm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z&m'Z' ddl(m)Z)  G d de'      Z* G d de      Z+ G d d      Z, G d de"      Z- G d de$      Z. G d de&      Z/y)a  WebSocket client transport implementation for Pipecat.

This module provides a WebSocket client transport that enables bidirectional
communication over WebSocket connections, with support for audio streaming,
frame serialization, and connection management.
    N)	AwaitableCallableOptional)logger)	BaseModel)connect)	CancelFrameEndFrameFrameInputAudioRawFrameInputTransportMessageFrameOutputAudioRawFrameOutputTransportMessageFrame!OutputTransportMessageUrgentFrame
StartFrame)FrameProcessorSetup)FrameSerializer)ProtobufFrameSerializer)BaseInputTransport)BaseOutputTransport)BaseTransportTransportParams)BaseTaskManagerc                   R    e Zd ZU dZdZeed<   dZee	e
e
f      ed<   dZee   ed<   y)WebsocketClientParamszConfiguration parameters for WebSocket client transport.

    Parameters:
        add_wav_header: Whether to add WAV headers to audio frames.
        serializer: Frame serializer for encoding/decoding messages.
    Tadd_wav_headerNadditional_headers
serializer)__name__
__module____qualname____doc__r   bool__annotations__r   r   dictstrr   r        U/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/transports/websocket/client.pyr   r   -   s:      ND37c3h07,0J)0r(   r   c                       e Zd ZU dZeej                  ged   f   ed<   eej                  ged   f   ed<   eej                  ej                  ged   f   ed<   y)WebsocketClientCallbacksa  Callback functions for WebSocket client events.

    Parameters:
        on_connected: Called when WebSocket connection is established.
        on_disconnected: Called when WebSocket connection is closed.
        on_message: Called when a message is received from the WebSocket.
    Non_connectedon_disconnected
on_message)
r   r    r!   r"   r   
websocketsWebSocketClientProtocolr   r$   Datar'   r(   r)   r+   r+   :   si     J>>?4PQQzAABIdOSTT*<<jooNPYZ^P__``r(   r+   c                       e Zd ZdZdedededefdZede	fd       Z
d	e	fd
Zd Zd Zdej                  defdZedefd       Zedefd       Zd Zd Zy)WebsocketClientSessionzManages a WebSocket client connection session.

    Handles connection lifecycle, message sending/receiving, and provides
    callback mechanisms for connection events.
    uriparams	callbackstransport_namec                 f    || _         || _        || _        || _        d| _        d| _        d| _        y)a4  Initialize the WebSocket client session.

        Args:
            uri: The WebSocket URI to connect to.
            params: Configuration parameters for the session.
            callbacks: Callback functions for session events.
            transport_name: Name of the parent transport for logging.
        r   N)_uri_params
_callbacks_transport_name_leave_counter_task_manager
_websocket)selfr4   r5   r6   r7   s        r)   __init__zWebsocketClientSession.__init__O   s8     	#-8<HLr(   returnc                 b    | j                   st        | j                   d      | j                   S )zGet the task manager for this session.

        Returns:
            The task manager instance.

        Raises:
            Exception: If task manager is not initialized.
        zM::WebsocketClientSession: TaskManager not initialized (pipeline not started?))r>   	Exceptionr<   r@   s    r)   task_managerz#WebsocketClientSession.task_managerg   s:     !!''((uv  !!!r(   rF   c                 ^   K   | xj                   dz  c_         | j                  s|| _        yyw)zSet up the session with a task manager.

        Args:
            task_manager: The task manager to use for session tasks.
           N)r=   r>   )r@   rF   s     r)   setupzWebsocketClientSession.setupw   s/      	q !!!-D "s   +-c                   K   | j                   ry	 t        | j                  d| j                  j                         d{   | _         | j
                  j                  | j                         | j                   d      | _	        | j                  j                  | j                          d{    y7 s7 # t        $ r% t        j                  d| j                          Y yw xY ww)z Connect to the WebSocket server.N
   )r4   open_timeoutr   z.::WebsocketClientSession::_client_task_handlerzTimeout connecting to )r?   websocket_connectr9   r:   r   rF   create_task_client_task_handlerr<   _client_taskr;   r,   TimeoutErrorr   errorrE   s    r)   r   zWebsocketClientSession.connect   s     ??	?$5II#'<<#B#B% DO
 !% 1 1 = =))+''((VW!D //..t??? @ 	?LL1$))=>	?sM   C)/B8  B4A-B8 .B6/B8 3C)4B8 6B8 8+C&#C)%C&&C)c                 "  K   | xj                   dz  c_         | j                  r| j                   dkD  ry| j                  j                  | j                         d{    | j                  j                          d{    d| _        y7 .7 w)z%Disconnect from the WebSocket server.rH   r   N)r=   r?   rF   cancel_taskrP   closerE   s    r)   
disconnectz!WebsocketClientSession.disconnect   st     q $"5"5"9++D,=,=>>>oo##%%% 	?%s$   ABB!B>B?BBmessagec           	         K   d}	 | j                   r%| j                   j                  |       d{    d}|S 7 # t        $ r=}t        j                  |  d|j
                  j                   d| d       Y d}~|S d}~ww xY w# |c cY S xY ww)zwSend a message through the WebSocket connection.

        Args:
            message: The message data to send.
        FNTz exception sending data:  ())r?   sendrD   r   rR   	__class__r   )r@   rW   resultes       r)   r[   zWebsocketClientSession.send   s      	oo**7333 M 4 	YLLD6!:1;;;O;O:PPRSTRUUVWXXM	Y MsI   B*; 9; B; 	B2A<6B :B<BB B	Bc                     | j                   r1| j                   j                  t        j                  j                  k(  S dS )zCheck if the WebSocket is currently connected.

        Returns:
            True if the WebSocket is in connected state.
        F)r?   stater/   StateOPENrE   s    r)   is_connectedz#WebsocketClientSession.is_connected   s3     BFt$$
(8(8(=(==[V[[r(   c                     | j                   r1| j                   j                  t        j                  j                  k(  S dS )zCheck if the WebSocket is currently closing.

        Returns:
            True if the WebSocket is in the process of closing.
        F)r?   r`   r/   ra   CLOSINGrE   s    r)   
is_closingz!WebsocketClientSession.is_closing   s3     EIOOt$$
(8(8(@(@@^Y^^r(   c           	        K   	 | j                   2 3 d{   }| j                  j                  | j                   |       d{    87 37 6 nH# t        $ r<}t	        j
                  |  d|j                  j                   d| d       Y d}~nd}~ww xY w| j                  j                  | j                          d{  7   yw)z7Handle incoming messages from the WebSocket connection.Nz exception receiving data: rY   rZ   )	r?   r;   r.   rD   r   rR   r\   r   r-   )r@   rW   r^   s      r)   rO   z+WebsocketClientSession._client_task_handler   s     	[!% K Kgoo00'JJJKJ "1 	[LLD6!<Q[[=Q=Q<RRTUVTWWXYZZ	[ oo--doo>>>sl   CA AAA*A A
A A
A A C	B2B	CB+C>C?Cc                      | j                    dS )z6String representation of the WebSocket client session.z::WebsocketClientSession)r<   rE   s    r)   __str__zWebsocketClientSession.__str__   s    &&''?@@r(   N)r   r    r!   r"   r&   r   r+   rA   propertyr   rF   rI   r   rV   r/   r1   r#   r[   rc   rf   rO   ri   r'   r(   r)   r3   r3   H   s    MM &M ,	M
 M0 "o " ". .?&	*// d   \d \ \ _D _ _	?Ar(   r3   c                        e Zd ZdZdededef fdZdef fdZ	de
f fd	Zdef fd
Zdef fdZ fdZd Z xZS )WebsocketClientInputTransportzWebSocket client input transport for receiving frames.

    Handles incoming WebSocket messages, deserializes them to frames,
    and pushes them downstream in the processing pipeline.
    	transportsessionr5   c                 \    t         |   |       || _        || _        || _        d| _        y)a  Initialize the WebSocket client input transport.

        Args:
            transport: The parent transport instance.
            session: The WebSocket session to use for communication.
            params: Configuration parameters for the transport.
        FN)superrA   
_transport_sessionr:   _initializedr@   rm   rn   r5   r\   s       r)   rA   z&WebsocketClientInputTransport.__init__   s1     	 # "r(   rI   c                    K   t         |   |       d{    | j                  j                  |j                         d{    y7 27 w)zSet up the input transport with the frame processor setup.

        Args:
            setup: The frame processor setup configuration.
        Nrp   rI   rr   rF   r@   rI   r\   s     r)   rI   z#WebsocketClientInputTransport.setup   B      gmE"""mm!!%"4"4555 	#5!   AA	,AAAAframec                 p  K   t         |   |       d{    | j                  ryd| _        | j                  j                  r-| j                  j                  j                  |       d{    | j                  j                          d{    | j                  |       d{    y7 7 B7 "7 w)zStart the input transport and initialize the WebSocket connection.

        Args:
            frame: The start frame containing initialization parameters.
        NT)	rp   startrs   r:   r   rI   rr   r   set_transport_readyr@   rz   r\   s     r)   r|   z#WebsocketClientInputTransport.start   s      gmE""" <<"",,))//666mm##%%%&&u--- 	# 7%-sF   B6B.AB6-B0.!B6B2B6(B4)B60B62B64B6c                    K   t         |   |       d{    | j                  j                          d{    y7 '7 w)zStop the input transport and disconnect from WebSocket.

        Args:
            frame: The end frame signaling transport shutdown.
        Nrp   stoprr   rV   r~   s     r)   r   z"WebsocketClientInputTransport.stop	  :      gl5!!!mm&&((( 	"(   A>!AA A Ac                    K   t         |   |       d{    | j                  j                          d{    y7 '7 w)zCancel the input transport and disconnect from WebSocket.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        Nrp   cancelrr   rV   r~   s     r)   r   z$WebsocketClientInputTransport.cancel  :      gnU###mm&&((( 	$(r   c                    K   t         |           d{    | j                  j                          d{    y7 '7 w)z'Clean up the input transport resources.Nrp   cleanuprq   r@   r\   s    r)   r   z%WebsocketClientInputTransport.cleanup  6     gooo%%''' 	 '   A=!A?AAc                   K   | j                   j                  sy| j                   j                  j                  |       d{   }|syt        |t              r0| j                   j
                  r| j                  |       d{    yt        |t              r*| j                  t        |j                         d{    y| j                  |       d{    y7 7 [7 #7 w)zHandle incoming WebSocket messages.

        Args:
            websocket: The WebSocket connection that received the message.
            message: The received message data.
        N)rW   )r:   r   deserialize
isinstancer   audio_in_enabledpush_audio_framer   broadcast_framerW   
push_frame)r@   	websocketrW   rz   s       r)   r.   z(WebsocketClientInputTransport.on_message   s      ||&&ll--99'BBe/0T\\5R5R''...9:&&'A5==&YYY//%((( C /Y(sI   A C&CAC&C 9C&>C"?C&C$C& C&"C&$C&)r   r    r!   r"   r   r3   r   rA   r   rI   r   r|   r
   r   r	   r   r   r.   __classcell__r\   s   @r)   rl   rl      sd    " " (" &	",6!4 6. .$) ))+ )(
)r(   rl   c                        e Zd ZdZdededef fdZdef fdZ	de
f fd	Zdef fd
Zdef fdZ fdZdeez  fdZdedefdZdefdZd Z xZS )WebsocketClientOutputTransportzWebSocket client output transport for sending frames.

    Handles outgoing frames, serializes them for WebSocket transmission,
    and manages audio streaming with proper timing simulation.
    rm   rn   r5   c                 x    t         |   |       || _        || _        || _        d| _        d| _        d| _        y)a  Initialize the WebSocket client output transport.

        Args:
            transport: The parent transport instance.
            session: The WebSocket session to use for communication.
            params: Configuration parameters for the transport.
        r   FN)rp   rA   rq   rr   r:   _send_interval_next_send_timers   rt   s       r)   rA   z'WebsocketClientOutputTransport.__init__;  sC     	 #    "r(   rI   c                    K   t         |   |       d{    | j                  j                  |j                         d{    y7 27 w)zSet up the output transport with the frame processor setup.

        Args:
            setup: The frame processor setup configuration.
        Nrv   rw   s     r)   rI   z$WebsocketClientOutputTransport.setupY  rx   ry   rz   c                   K   t         |   |       d{    | j                  ryd| _        | j                  | j                  z  dz  | _        | j                  j                  r-| j                  j                  j                  |       d{    | j                  j                          d{    | j                  |       d{    y7 7 B7 "7 w)zStart the output transport and initialize the WebSocket connection.

        Args:
            frame: The start frame containing initialization parameters.
        NT   )rp   r|   rs   audio_chunk_sizesample_rater   r:   r   rI   rr   r   r}   r~   s     r)   r|   z$WebsocketClientOutputTransport.startb  s      gmE""" #44t7G7GG1L<<"",,))//666mm##%%%&&u--- 	# 7%-sF   CCA7CC!C0C1C	C
CCCCc                    K   t         |   |       d{    | j                  j                          d{    y7 '7 w)zStop the output transport and disconnect from WebSocket.

        Args:
            frame: The end frame signaling transport shutdown.
        Nr   r~   s     r)   r   z#WebsocketClientOutputTransport.stopu  r   r   c                    K   t         |   |       d{    | j                  j                          d{    y7 '7 w)zCancel the output transport and disconnect from WebSocket.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        Nr   r~   s     r)   r   z%WebsocketClientOutputTransport.cancel~  r   r   c                    K   t         |           d{    | j                  j                          d{    y7 '7 w)z(Clean up the output transport resources.Nr   r   s    r)   r   z&WebsocketClientOutputTransport.cleanup  r   r   c                 B   K   | j                  |       d{    y7 w)zSend a transport message through the WebSocket.

        Args:
            frame: The transport message frame to send.
        N)_write_frame)r@   rz   s     r)   send_messagez+WebsocketClientOutputTransport.send_message  s      &&&s   rB   c                 8  K   | j                   j                  s| j                   j                  syt        |j                  | j
                  | j                  j                        }| j                  j                  rt        j                         5 }t        j                  |d      5 }|j                  d       |j                  |j                         |j!                  |j
                         |j#                  |j                         ddd       t        |j%                         |j
                  |j                        }|}ddd       | j'                  |       d{    | j)                          d{    y# 1 sw Y   uxY w# 1 sw Y   GxY w7 57 w)zWrite an audio frame to the WebSocket with optional WAV header.

        Args:
            frame: The output audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        F)audior   num_channelswbr   N)r   r   T)rr   rf   rc   r   r   r   r:   audio_out_channelsr   ioBytesIOwaveopensetsampwidthsetnchannelsr   setframeratewriteframesgetvaluer   _write_audio_sleep)r@   rz   bufferwf	wav_frames        r)   write_audio_framez0WebsocketClientOutputTransport.write_audio_frame  sF     ==##4==+E+E#++((88
 <<&& "YYvt, 0OOA&OOE$6$67OOE$5$56NN5;;/	0
 0OO% % 1 1!&!3!3	
 "" &&& %%'''#0 0" " 	' 	(sU   BFF
'A#E>
:F
F F!F8F9F>F	F

FFFc                 B  K   | j                   j                  s| j                   j                  sy| j                  j                  sy| j                  j                  j                  |       d{   }|r$| j                   j                  |       d{    yy7 +7 w)z3Write a frame to the WebSocket after serialization.N)rr   rf   rc   r:   r   	serializer[   )r@   rz   payloads      r)   r   z+WebsocketClientOutputTransport._write_frame  sy     ==##4==+E+E||&&//99%@@--$$W---  A-s$   A-B/B0$BBBBc                 :  K   t        j                         }t        d| j                  |z
        }t	        j
                  |       d{    |dk(  r't        j                         | j                  z   | _        y| xj                  | j                  z  c_        y7 Pw)z1Simulate audio playback timing with sleep delays.r   N)time	monotonicmaxr   asynciosleepr   )r@   current_timesleep_durations      r)   r   z1WebsocketClientOutputTransport._write_audio_sleep  s{      ~~'Q 4 4| CDmmN+++Q#'>>#3d6I6I#ID   D$7$77 	 	,s   ABB	AB)r   r    r!   r"   r   r3   r   rA   r   rI   r   r|   r
   r   r	   r   r   r   r   r   r   r#   r   r   r   r   r   r   s   @r)   r   r   4  s    " " (" &	"<6!4 6. .&) ))+ )(
'03TT'%-@ %T %N
. 
.	8r(   r   c                   `     e Zd ZdZ	 ddedee   f fdZdefdZ	de
fdZd Zd	 Zd
 Z xZS )WebsocketClientTransporta  WebSocket client transport for bidirectional communication.

    Provides a complete WebSocket client transport implementation with
    input and output capabilities, connection management, and event handling.

    Event handlers available:

    - on_connected(transport): Connected to WebSocket server
    - on_disconnected(transport): Disconnected from WebSocket server

    Example::

        @transport.event_handler("on_connected")
        async def on_connected(transport):
            ...
    r4   r5   c                    t         |           |xs
 t               | _        | j                  j                  xs
 t               | j                  _        t        | j                  | j                  | j                        }t        || j                  || j                        | _        d| _        d| _        | j                  d       | j                  d       y)zInitialize the WebSocket client transport.

        Args:
            uri: The WebSocket URI to connect to.
            params: Optional configuration parameters for the transport.
        )r,   r-   r.   Nr,   r-   )rp   rA   r   r:   r   r   r+   _on_connected_on_disconnected_on_messager3   namerr   _input_output_register_event_handler)r@   r4   r5   r6   r\   s       r)   rA   z!WebsocketClientTransport.__init__  s     	8!6!8"&,,"9"9"V=T=V,++ 11''
	 /sDLL)TYYW?CAE 	$$^4$$%67r(   rB   c                 ~    | j                   s&t        | | j                  | j                        | _         | j                   S )zGet the input transport for receiving frames.

        Returns:
            The WebSocket client input transport instance.
        )r   rl   rr   r:   rE   s    r)   inputzWebsocketClientTransport.input  s-     {{7dmmT\\ZDK{{r(   c                 ~    | j                   s&t        | | j                  | j                        | _         | j                   S )zGet the output transport for sending frames.

        Returns:
            The WebSocket client output transport instance.
        )r   r   rr   r:   rE   s    r)   outputzWebsocketClientTransport.output  s-     ||9$t||\DL||r(   c                 D   K   | j                  d|       d{    y7 w)z.Handle WebSocket connection established event.r,   N_call_event_handlerr@   r   s     r)   r   z&WebsocketClientTransport._on_connected  s     &&~yAAA     c                 D   K   | j                  d|       d{    y7 w)z)Handle WebSocket connection closed event.r-   Nr   r   s     r)   r   z)WebsocketClientTransport._on_disconnected  s     &&'8)DDDr   c                 r   K   | j                   r%| j                   j                  ||       d{    yy7 w)z"Handle incoming WebSocket message.N)r   r.   )r@   r   rW   s      r)   r   z$WebsocketClientTransport._on_message"  s/     ;;++((G<<< <s   ,757)N)r   r    r!   r"   r&   r   r   rA   rl   r   r   r   r   r   r   r   r   s   @r)   r   r     sU    ( 3788 ./8>4 6 BE=r(   r   )0r"   r   r   r   r   typingr   r   r   r/   logurur   pydantic.mainr   websockets.asyncio.clientr   rM   pipecat.frames.framesr	   r
   r   r   r   r   r   r   r   "pipecat.processors.frame_processorr   #pipecat.serializers.base_serializerr   pipecat.serializers.protobufr   pipecat.transports.base_inputr   pipecat.transports.base_outputr   !pipecat.transports.base_transportr   r   "pipecat.utils.asyncio.task_managerr   r   r+   r3   rl   r   r   r'   r(   r)   <module>r      s     	   0 0   # B
 
 
 C ? @ < > L >
1O 
1ay aFA FAR`)$6 `)F^8%8 ^8BP=} P=r(   