
    qiJ                        d Z ddlZddlZddlZddlZddlmZmZmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZmZmZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lm Z  ddl!m"Z"m#Z# 	 ddl$Z$ddl%m&Z' ddl(m)Z)  G d de#      Z. G d de      Z/ G d de      Z0 G d de       Z1 G d de"      Z2y# e*$ r7Z+ e
jX                  de+         e
jX                  d        e-de+       dZ+[+ww xY w)zWebSocket server transport implementation for Pipecat.

This module provides WebSocket server transport functionality for real-time
audio and data streaming, including client connection management, session
handling, and frame serialization.
    N)	AwaitableCallableOptional)logger)	BaseModel)CancelFrameClientConnectedFrameEndFrameFrameInputAudioRawFrameInputTransportMessageFrameInterruptionFrameOutputAudioRawFrameOutputTransportMessageFrame!OutputTransportMessageUrgentFrame
StartFrame)FrameDirection)FrameSerializer)BaseInputTransport)BaseOutputTransport)BaseTransportTransportParams)serve)StatezException: zLIn order to use websockets, you need to `pip install pipecat-ai[websocket]`.zMissing module: c                   H    e Zd ZU dZdZeed<   dZee	   ed<   dZ
ee   ed<   y)WebsocketServerParamsa  Configuration parameters for WebSocket server transport.

    Parameters:
        add_wav_header: Whether to add WAV headers to audio frames.
        serializer: Frame serializer for message encoding/decoding.
        session_timeout: Timeout in seconds for client sessions.
    Fadd_wav_headerN
serializersession_timeout)__name__
__module____qualname____doc__r   bool__annotations__r   r   r   r   int     U/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/transports/websocket/server.pyr   r   4   s0     !ND ,0J)0%)OXc])r(   r   c                       e Zd ZU dZeej                  ged   f   ed<   eej                  ged   f   ed<   eej                  ged   f   ed<   eg ed   f   ed<   y)WebsocketServerCallbacksa  Callback functions for WebSocket server events.

    Parameters:
        on_client_connected: Called when a client connects to the server.
        on_client_disconnected: Called when a client disconnects from the server.
        on_session_timeout: Called when a client session times out.
        on_websocket_ready: Called when the WebSocket server is ready to accept connections.
    Non_client_connectedon_client_disconnectedon_session_timeouton_websocket_ready)	r    r!   r"   r#   r   
websocketsWebSocketServerProtocolr   r%   r'   r(   r)   r+   r+   B   sw     ":#E#E"F	RV"WXX$j&H&H%I9UY?%Z[[ *"D"D!EyQU!VWW Yt_!455r(   r+   c            
            e Zd ZdZdededededef
 fdZ	de
f fd	Zdef fd
Zdef fdZ fdZd Zdej&                  fdZdej&                  defdZ xZS )WebsocketServerInputTransportzWebSocket server input transport for receiving client data.

    Handles incoming WebSocket connections, message processing, and client
    session management including timeout monitoring and connection lifecycle.
    	transporthostportparams	callbacksc                     t        |   |fi | || _        || _        || _        || _        || _        d| _        d| _        d| _	        t        j                         | _        d| _        y)a  Initialize the WebSocket server input transport.

        Args:
            transport: The parent transport instance.
            host: Host address to bind the WebSocket server to.
            port: Port number to bind the WebSocket server to.
            params: WebSocket server configuration parameters.
            callbacks: Callback functions for WebSocket events.
            **kwargs: Additional arguments passed to parent class.
        NF)super__init__
_transport_host_port_params
_callbacks
_websocket_server_task_monitor_taskasyncioEvent_stop_server_event_initialized)selfr4   r5   r6   r7   r8   kwargs	__class__s          r)   r;   z&WebsocketServerInputTransport.__init__Y   sk    & 	*6*#

#HL  "")--/ "r(   framec                   K   t         |   |       d{    | j                  ryd| _        | j                  j                  r-| j                  j                  j                  |       d{    | j                  s$| j                  | j                               | _        | j                  |       d{    y7 7 P7 	w)zStart the WebSocket server and initialize components.

        Args:
            frame: The start frame containing initialization parameters.
        NT)
r:   startrG   r?   r   setuprB   create_task_server_task_handlerset_transport_readyrH   rK   rJ   s     r)   rM   z#WebsocketServerInputTransport.start   s      gmE""" <<"",,))//666   $ 0 01J1J1L MD&&u--- 	# 7 	.s5   CB<AC-B>.AC6C 7C>C Cc                 8  K   t         |   |       d{    | j                  j                          | j                  r*| j                  | j                         d{    d| _        | j                  r| j                   d{    d| _        yy7 }7 67 w)zStop the WebSocket server and cleanup resources.

        Args:
            frame: The end frame signaling transport shutdown.
        N)r:   stoprF   setrC   cancel_taskrB   rR   s     r)   rT   z"WebsocketServerInputTransport.stop   s      gl5!!!##%""4#5#5666!%D#### $D  	" 7 $s4   BBABB &BBBBBc                 "  K   t         |   |       d{    | j                  r*| j                  | j                         d{    d| _        | j                  r+| j                  | j                         d{    d| _        yy7 r7 E7 w)zCancel the WebSocket server and stop all processing.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)r:   cancelrC   rV   rB   rR   s     r)   rX   z$WebsocketServerInputTransport.cancel   s      gnU###""4#5#5666!%D""4#4#4555 $D 	 	$6 6s3   BB	.BB5B;B<BBBc                    K   t         |           d{    | j                  j                          d{    y7 '7 wz'Cleanup resources and parent transport.Nr:   cleanupr<   rH   rJ   s    r)   r\   z%WebsocketServerInputTransport.cleanup   6     gooo%%''' 	 '   A=!A?AAc                   K   t        j                  d| j                   d| j                          t	        | j
                  | j                  | j                        4 d{   }| j                  j                          d{    | j                  j                          d{    ddd      d{    y7 Y7 97 7 # 1 d{  7  sw Y   yxY ww)z7Handle WebSocket server startup and client connections.zStarting websocket server on :N)
r   infor=   r>   websocket_serve_client_handlerr@   r/   rF   wait)rH   servers     r)   rP   z2WebsocketServerInputTransport._server_task_handler   s     3DJJ<qMN"4#7#7TZZP 	1 	1TZ//44666))..000	1 	1 	160	1 	1 	1 	1sl   AC!B;"C%CB=!C%B?&C*C5C6C=C?CCC	C
CC	websocketc           	        K   t        j                  d|j                          | j                  r7| j                  j	                          d{    t        j
                  d       || _        | j                  j                  |       d{    | j                  sP| j                  j                  r:| j                  | j                  || j                  j                              | _        	 |2 3 d{   }| j                  j                  s | j                  j                  j                  |       d{   }|sPt        |t               r| j#                  |       d{    zt        |t$              r*| j'                  t$        |j(                         d{    | j+                  |       d{    7 q7 57 7 7 c7 +7 6 nH# t,        $ r<}t        j.                  |  d|j0                  j2                   d| d       Y d}~nd}~ww xY w| j                  j5                  |       d{  7   | j                  j	                          d{  7   d| _        t        j                  d|j                   d	       yw)
z<Handle individual client connections and message processing.zNew client connection from Nz/Only one client connected, using new connection)messagez exception receiving data:  ()zClient z disconnected)r   rb   remote_addressrA   closewarningr@   r,   rC   r?   r   rO   _monitor_websocketr   deserialize
isinstancer   push_audio_framer   broadcast_frameri   
push_frame	ExceptionerrorrJ   r    r-   )rH   rg   ri   rK   es        r)   rd   z-WebsocketServerInputTransport._client_handler   s    1)2J2J1KLM??//'')))NNLM# oo11)<<< !!dll&B&B!%!1!1''	4<<3O3OP"D
	[!* 1 1g||.."ll55AA'JJe%78//666'AB../ISXS`S`.aaa//%000= * 	=1 K 7a0 "+  	[LLD6!<Q[[=Q=Q<RRTUVTWWXYZZ	[ oo44Y???oo##%%%gi667}EFs   AJF?>JGA J/G 1G5G6G9A G 9G:+G %G	&9G G G 9G:G ?JJGG 	G G G G J	H2HJH!J7H:8"JI0Jr   c                 D  K   	 t        j                  |       d{    |j                  t        j                  ur$| j
                  j                  |       d{    yy7 E7 # t         j                  $ r$ t        j                  d|j                           w xY ww)z1Monitor WebSocket connection for session timeout.NzMonitoring task cancelled for: )rD   sleepstater   CLOSEDr@   r.   CancelledErrorr   rb   rl   )rH   rg   r   s      r)   ro   z0WebsocketServerInputTransport._monitor_websocket   s     	--000ell2oo88CCC 3 1C%% 	KK9):R:R9STU	s>   B A& A">A& A$A&  B "A& $A& &7BB )r    r!   r"   r#   r   strr&   r   r+   r;   r   rM   r
   rT   r   rX   r\   rP   r0   r1   rd   ro   __classcell__rJ   s   @r)   r3   r3   R   s    %" %" %" 	%"
 &%" ,%"N. .&% %%+ %(
1,Gz/Q/Q ,G\
#;;
NQ
r(   r3   c                        e Zd ZdZdedef fdZdeej                     fdZ
def fdZdef fd	Zdef fd
Z fdZdedef fdZdeez  fdZdedefdZdefdZd Z xZS )WebsocketServerOutputTransportzWebSocket server output transport for sending data to clients.

    Handles outgoing frame serialization, audio streaming with timing control,
    and client connection management for WebSocket communication.
    r4   r7   c                 z    t        |   |fi | || _        || _        d| _        d| _        d| _        d| _        y)a  Initialize the WebSocket server output transport.

        Args:
            transport: The parent transport instance.
            params: WebSocket server configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        Nr   F)r:   r;   r<   r?   rA   _send_interval_next_send_timerG   )rH   r4   r7   rI   rJ   s       r)   r;   z'WebsocketServerOutputTransport.__init__   sH     	*6*#HL    "r(   rg   c                    K   | j                   r7| j                   j                          d{    t        j                  d       || _         y7 !w)zSet the active client WebSocket connection.

        Args:
            websocket: The WebSocket connection to set as active, or None to clear.
        Nz-Only one client allowed, using new connection)rA   rm   r   rn   rH   rg   s     r)   set_client_connectionz4WebsocketServerOutputTransport.set_client_connection  s>      ??//'')))NNJK# *s   *AA"ArK   c                 j  K   t         |   |       d{    | j                  ryd| _        | j                  j                  r-| j                  j                  j                  |       d{    | j                  | j                  z  dz  | _        | j                  |       d{    y7 7 A7 	w)zStart the output transport and initialize components.

        Args:
            frame: The start frame containing initialization parameters.
        NT   )
r:   rM   rG   r?   r   rN   audio_chunk_sizesample_rater   rQ   rR   s     r)   rM   z$WebsocketServerOutputTransport.start#  s      gmE""" <<"",,))//666#44t7G7GG1L&&u--- 	# 7-s4   B3B-AB3-B/.9B3'B1(B3/B31B3c                 v   K   t         |   |       d{    | j                  |       d{    y7 7 w)zStop the output transport and send final frame.

        Args:
            frame: The end frame signaling transport shutdown.
        N)r:   rT   _write_framerR   s     r)   rT   z#WebsocketServerOutputTransport.stop5  s8      gl5!!!&&& 	"&   959799c                 v   K   t         |   |       d{    | j                  |       d{    y7 7 w)zCancel the output transport and send cancellation frame.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)r:   rX   r   rR   s     r)   rX   z%WebsocketServerOutputTransport.cancel>  s8      gnU###&&& 	$&r   c                    K   t         |           d{    | j                  j                          d{    y7 '7 wrZ   r[   r]   s    r)   r\   z&WebsocketServerOutputTransport.cleanupG  r^   r_   	directionc                    K   t         |   ||       d{    t        |t              r!| j	                  |       d{    d| _        yy7 67 w)zProcess frames and handle interruption timing.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        Nr   )r:   process_framerq   r   r   r   )rH   rK   r   rJ   s      r)   r   z,WebsocketServerOutputTransport.process_frameL  sU      g#E9555e./##E***#$D  0 	6 +s!   AA(A AAAc                 B   K   | j                  |       d{    y7 w)z}Send a transport message frame to the client.

        Args:
            frame: The transport message frame to send.
        N)r   )rH   rK   s     r)   send_messagez+WebsocketServerOutputTransport.send_messageY  s      &&&   returnc                   K   | j                   syt        |j                  | j                  | j                  j
                        }| j                  j                  rt        j                         5 }t        j                  |d      5 }|j                  d       |j                  |j                         |j                  |j                         |j                  |j                         ddd       t        |j!                         |j                  |j                        }|}ddd       | j#                  |       d{    | j%                          d{    y# 1 sw Y   uxY w# 1 sw Y   GxY w7 57 w)zWrite an audio frame to the WebSocket client with timing control.

        Args:
            frame: The output audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        F)audior   num_channelswbr   N)r   r   T)rA   r   r   r   r?   audio_out_channelsr   ioBytesIOwaveopensetsampwidthsetnchannelsr   setframeratewriteframesgetvaluer   _write_audio_sleep)rH   rK   bufferwf	wav_frames        r)   write_audio_framez0WebsocketServerOutputTransport.write_audio_framec  s4     #++((88
 <<&& "YYvt, 0OOA&OOE$6$67OOE$5$56NN5;;/	0
 0OO% % 1 1!&!3!3	
 "" &&& %%'''#0 0" " 	' 	(sU   A.E:0E*A#E*:E*$E: E6E:E8E:E'	#E**E3/E:8E:c           	        K   | j                   j                  sy	 | j                   j                  j                  |       d{   }|r1| j                  r$| j                  j	                  |       d{    yyy7 87 	# t
        $ r<}t        j                  |  d|j                  j                   d| d       Y d}~yd}~ww xY ww)z3Serialize and send a frame to the WebSocket client.Nz exception sending data: rj   rk   )
r?   r   	serializerA   sendru   r   rv   rJ   r    )rH   rK   payloadrw   s       r)   r   z+WebsocketServerOutputTransport._write_frame  s     ||&&	Y LL33==eDDG4??oo**7333 +w E3 	YLLD6!:1;;;O;O:PPRSTRUUVWXX	YsR   C(B  A<0B  4A>5B  9C<B  >B   	C	2C ;C CCc                 :  K   t        j                         }t        d| j                  |z
        }t	        j
                  |       d{    |dk(  r't        j                         | j                  z   | _        y| xj                  | j                  z  c_        y7 Pw)z>Simulate audio device timing by sleeping between audio chunks.r   N)time	monotonicmaxr   rD   ry   r   )rH   current_timesleep_durations      r)   r   z1WebsocketServerOutputTransport._write_audio_sleep  s{      ~~'Q 4 4| CDmmN+++Q#'>>#3d6I6I#ID   D$7$77 	 	,s   ABB	AB)r    r!   r"   r#   r   r   r;   r   r0   r1   r   r   rM   r
   rT   r   rX   r\   r   r   r   r   r   r   r   r$   r   r   r   r~   r   s   @r)   r   r      s    "- "9N "4	$Xj>`>`5a 	$. .$' ''+ '(
% %> %'03TT'%-@ %T %N
Y 
Y	8r(   r   c                   ~     e Zd ZdZ	 	 	 	 ddedededee   dee   f
 fdZde	fd	Z
defd
Zd Zd Zd Zd Z xZS )WebsocketServerTransporta  WebSocket server transport for bidirectional real-time communication.

    Provides a complete WebSocket server implementation with separate input and
    output transports, client connection management, and event handling for
    real-time audio and data streaming applications.

    Event handlers available:

    - on_client_connected(transport, websocket): Client WebSocket connected
    - on_client_disconnected(transport, websocket): Client WebSocket disconnected
    - on_session_timeout(transport, websocket): Session timed out
    - on_websocket_ready(transport): WebSocket server is ready to accept connections

    Example::

        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, websocket):
            ...
    r7   r5   r6   
input_nameoutput_namec                 |   t         |   ||       || _        || _        || _        t        | j                  | j                  | j                  | j                        | _
        d| _        d| _        d| _        | j                  d       | j                  d       | j                  d       | j                  d       y)a  Initialize the WebSocket server transport.

        Args:
            params: WebSocket server configuration parameters.
            host: Host address to bind the server to. Defaults to "localhost".
            port: Port number to bind the server to. Defaults to 8765.
            input_name: Optional name for the input processor.
            output_name: Optional name for the output processor.
        )r   r   )r,   r-   r.   r/   Nr,   r-   r.   r/   )r:   r;   r=   r>   r?   r+   _on_client_connected_on_client_disconnected_on_session_timeout_on_websocket_readyr@   _input_outputrA   _register_event_handler)rH   r7   r5   r6   r   r   rJ   s         r)   r;   z!WebsocketServerTransport.__init__  s    " 	JKH

2 $ 9 9#'#?#?#77#77	
 @DAEHL 	$$%:;$$%=>$$%9:$$%9:r(   r   c                     | j                   sHt        | | j                  | j                  | j                  | j
                  | j                        | _         | j                   S )zGet the input transport for receiving client data.

        Returns:
            The WebSocket server input transport instance.
        name)r   r3   r=   r>   r?   r@   _input_namerH   s    r)   inputzWebsocketServerTransport.input  sG     {{7djj$**dllDOORVRbRbDK {{r(   c                     | j                   s't        | | j                  | j                        | _         | j                   S )zGet the output transport for sending data to clients.

        Returns:
            The WebSocket server output transport instance.
        r   )r   r   r?   _output_namer   s    r)   outputzWebsocketServerTransport.output  s4     ||9dll):):DL ||r(   c                 F  K   | j                   rv| j                   j                  |       d{    | j                  d|       d{    | j                  r,| j                  j	                  t                      d{    yyt        j                  d       y7 m7 U7  w)z Handle client connection events.Nr,   <A WebsocketServerTransport output is missing in the pipeline)r   r   _call_event_handlerr   rt   r	   r   rv   r   s     r)   r   z-WebsocketServerTransport._on_client_connected  s     <<,,44Y???**+@)LLL{{kk,,-A-CDDD  LLWX @LDs3   +B!BB!B6B!>B?B!B!B!c                    K   | j                   r>| j                   j                  d       d{    | j                  d|       d{    yt        j                  d       y7 57 w)z#Handle client disconnection events.Nr-   r   )r   r   r   r   rv   r   s     r)   r   z0WebsocketServerTransport._on_client_disconnected   sR     <<,,44T:::**+CYOOOLLWX ;Os!   +A'A#A'A%A'%A'c                 D   K   | j                  d|       d{    y7 w)z%Handle client session timeout events.r.   Nr   r   s     r)   r   z,WebsocketServerTransport._on_session_timeout  s     &&';YGGGs     c                 B   K   | j                  d       d{    y7 w)z%Handle WebSocket server ready events.r/   Nr   r   s    r)   r   z,WebsocketServerTransport._on_websocket_ready  s     &&';<<<r   )	localhosti="  NN)r    r!   r"   r#   r   r}   r&   r   r;   r3   r   r   r   r   r   r   r   r~   r   s   @r)   r   r     s    .  $(%)%;%%; %; 	%;
 SM%; c]%;N
4 

6 
YYH=r(   r   )3r#   rD   r   r   r   typingr   r   r   logurur   pydanticr   pipecat.frames.framesr   r	   r
   r   r   r   r   r   r   r   r   "pipecat.processors.frame_processorr   #pipecat.serializers.base_serializerr   pipecat.transports.base_inputr   pipecat.transports.base_outputr   !pipecat.transports.base_transportr   r   r0   websockets.asyncio.serverr   rc   websockets.protocolr   ModuleNotFoundErrorrw   rv   ru   r   r+   r3   r   r   r'   r(   r)   <module>r      s     	   0 0      > ? < > L,B)*O *6y 6 b$6 bJh8%8 h8Vl=} l=i  ,FLL;qc"#FLL_`
&qc*
++,s   $B, ,C(12C##C(