
    qi`V                        d Z ddlZddlZddlZddlZddlZddlmZmZmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZmZmZmZmZmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lm Z  ddl!m"Z"m#Z# 	 ddl$m%Z% ddl&m'Z'  G d de#      Z, G d de      Z- G d d      Z. G d d      Z/ G d de      Z0 G d de       Z1 G d de"      Z2y# e($ r7Z) e
jT                  de)         e
jT                  d        e+de)       dZ)[)ww xY w)a  FastAPI WebSocket transport implementation for Pipecat.

This module provides WebSocket-based transport for real-time audio/video streaming
using FastAPI and WebSocket connections. Supports binary and text serialization
with configurable session timeouts and WAV header generation.
    N)	AwaitableCallableOptional)logger)	BaseModel)CancelFrameClientConnectedFrameEndFrameFrameInputAudioRawFrameInputTransportMessageFrameInterruptionFrameOutputAudioRawFrameOutputTransportMessageFrame!OutputTransportMessageUrgentFrame
StartFrame)FrameDirection)FrameSerializer)BaseInputTransport)BaseOutputTransport)BaseTransportTransportParams)	WebSocket)WebSocketStatezException: zTIn order to use FastAPI websockets, you need to `pip install pipecat-ai[websocket]`.zMissing module: c                   \    e Zd ZU dZdZeed<   dZee	   ed<   dZ
ee   ed<   dZee   ed<   y)FastAPIWebsocketParamsa  Configuration parameters for FastAPI WebSocket transport.

    Parameters:
        add_wav_header: Whether to add WAV headers to audio frames.
        serializer: Frame serializer for encoding/decoding messages.
        session_timeout: Session timeout in seconds, None for no timeout.
        fixed_audio_packet_size: Optional fixed-size packetization for raw PCM audio payloads.
            Useful when the remote WebSocket media endpoint requires strict audio framing.
    Fadd_wav_headerN
serializersession_timeoutfixed_audio_packet_size)__name__
__module____qualname____doc__r   bool__annotations__r   r   r   r   intr         V/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/transports/websocket/fastapi.pyr   r   6   s?     !ND ,0J)0%)OXc])-1Xc]1r)   r   c                   f    e Zd ZU dZeeged   f   ed<   eeged   f   ed<   eeged   f   ed<   y)FastAPIWebsocketCallbacksa"  Callback functions for WebSocket events.

    Parameters:
        on_client_connected: Called when a client connects to the WebSocket.
        on_client_disconnected: Called when a client disconnects from the WebSocket.
        on_session_timeout: Called when a session timeout occurs.
    Non_client_connectedon_client_disconnectedon_session_timeout)r!   r"   r#   r$   r   r   r   r&   r(   r)   r*   r,   r,   G   sK     "9+y">??$i[)D/%ABB )io!=>>r)   r,   c                   4    e Zd ZdZdefdZd Zdeez  fdZ	y)_WebSocketMessageIteratorzGAsync iterator for WebSocket messages that yields both binary and text.	websocketc                     || _         y N)
_websocketselfr2   s     r*   __init__z"_WebSocketMessageIterator.__init__X   s	    #r)   c                     | S r4   r(   r7   s    r*   	__aiter__z#_WebSocketMessageIterator.__aiter__[   s    r)   returnc                    K   | j                   j                          d {   }|d   dk(  rt        d|v r
|d   |d   S d|v r
|d   |d   S t        7 4w)Ntypezwebsocket.disconnectbytestext)r5   receiveStopAsyncIteration)r7   messages     r*   	__anext__z#_WebSocketMessageIterator.__anext__^   sq     //116?44$$g''"2">7##W!<6?"   2s   AA5AN)
r!   r"   r#   r$   r   r8   r;   r?   strrD   r(   r)   r*   r1   r1   U   s&    Q$) $! !r)   r1   c                       e Zd ZdZdedefdZdefdZde	j                  eez     fdZd	eez  fd
Zd Zd Zd Zd Zd Zedefd       Zedefd       Zy)FastAPIWebsocketClientzWebSocket client wrapper for handling connections and message passing.

    Manages WebSocket state, message sending/receiving, and connection lifecycle
    with support for both binary and text message types.
    r2   	callbacksc                 <    || _         d| _        || _        d| _        y)zInitialize the WebSocket client.

        Args:
            websocket: The FastAPI WebSocket connection.
            callbacks: Event callback functions.
        Fr   N)r5   _closing
_callbacks_leave_counter)r7   r2   rH   s      r*   r8   zFastAPIWebsocketClient.__init__p   s!     $#r)   _c                 6   K   | xj                   dz  c_         yw)z]Set up the WebSocket client.

        Args:
            _: The start frame (unused).
           N)rL   )r7   rM   s     r*   setupzFastAPIWebsocketClient.setup|   s      	q s   r<   c                 ,    t        | j                        S )zGet an async iterator for receiving WebSocket messages.

        Returns:
            An async iterator yielding bytes or strings.
        )r1   r5   r:   s    r*   rA   zFastAPIWebsocketClient.receive   s     )99r)   datac           
      T  K   	 | j                         rXt        |t              r$| j                  j	                  |       d{    y| j                  j                  |       d{    yy7 *7 # t        $ r}t        j                  |  d|j                  j                   d| d| j                  j                          | j                  j                  t        j                  k(  r2| j                  s!t        j                  d       d| _        Y d}~yY d}~yY d}~yd}~ww xY ww)zySend data through the WebSocket connection.

        Args:
            data: The data to send (string or bytes).
        N exception sending data:  (z), application_state: z'Closing already disconnected websocket!T)	_can_send
isinstancer?   r5   
send_bytes	send_text	Exceptionr   error	__class__r!   application_stater   DISCONNECTED
is_closingwarningrJ   )r7   rR   es      r*   sendzFastAPIWebsocketClient.send   s     	%~~dE*//44T::://33D999	  :9 	%LL&1!++2F2F1Gr!Lbcgcrcr  dE  dE  cF  G 11^5P5PPHI $ ( Q	%s_   D(>A1 A-A1 D(A1 &A/'A1 +D(-A1 /A1 1	D%:BD D( D%%D(c                 B  K   | xj                   dz  c_         | j                   dkD  ry| j                  r8| j                  s+d| _        	 | j                  j                          d{    yyy7 # t        $ r$}t        j                  |  d|        Y d}~yd}~ww xY ww)z Disconnect the WebSocket client.rO   r   NTz( exception while closing the websocket: )	rL   is_connectedr_   rJ   r5   closerZ   r   r[   )r7   ra   s     r*   
disconnectz!FastAPIWebsocketClient.disconnect   s     q "T__ DMSoo++--- &5 . Sv%MaSQRRSsB   ABA/ %A-&A/ *B-A/ /	B8BBBBc                 j   K   | j                   j                  | j                         d{    y7 w)z)Trigger the client disconnected callback.N)rK   r.   r5   r:   s    r*   trigger_client_disconnectedz2FastAPIWebsocketClient.trigger_client_disconnected   s"     oo44T__EEE   )313c                 j   K   | j                   j                  | j                         d{    y7 w)z&Trigger the client connected callback.N)rK   r-   r5   r:   s    r*   trigger_client_connectedz/FastAPIWebsocketClient.trigger_client_connected   s"     oo11$//BBBri   c                 j   K   | j                   j                  | j                         d{    y7 w)z$Trigger the client timeout callback.N)rK   r/   r5   r:   s    r*   trigger_client_timeoutz-FastAPIWebsocketClient.trigger_client_timeout   s"     oo00AAAri   c                 8    | j                   xr | j                   S )z0Check if data can be sent through the WebSocket.)rd   r_   r:   s    r*   rV   z FastAPIWebsocketClient._can_send   s      8%88r)   c                 P    | j                   j                  t        j                  k(  S )zCheck if the WebSocket is currently connected.

        Returns:
            True if the WebSocket is in connected state.
        )r5   client_stater   	CONNECTEDr:   s    r*   rd   z#FastAPIWebsocketClient.is_connected   s     ++~/G/GGGr)   c                     | j                   S )zCheck if the WebSocket is currently closing.

        Returns:
            True if the WebSocket is in the process of closing.
        )rJ   r:   s    r*   r_   z!FastAPIWebsocketClient.is_closing   s     }}r)   N)r!   r"   r#   r$   r   r,   r8   r   rP   typingAsyncIteratorr?   rE   rA   rb   rf   rh   rk   rm   rV   propertyr%   rd   r_   r(   r)   r*   rG   rG   i   s    
 ) 
 8Q 
 !Z !:--eck: :%sU{ %2SFCB9 Hd H H D  r)   rG   c                   |     e Zd ZdZdededef fdZdef fdZ	d Z
def fd	Zdef fd
Z fdZd Zd Z xZS )FastAPIWebsocketInputTransportzInput transport for FastAPI WebSocket connections.

    Handles incoming WebSocket messages, deserializes frames, and manages
    connection monitoring with optional session timeouts.
    	transportclientparamsc                 z    t        |   |fi | || _        || _        || _        d| _        d| _        d| _        y)a$  Initialize the WebSocket input transport.

        Args:
            transport: The parent transport instance.
            client: The WebSocket client wrapper.
            params: Transport configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        NF)superr8   
_transport_client_params_receive_task_monitor_websocket_task_initializedr7   rx   ry   rz   kwargsr\   s        r*   r8   z'FastAPIWebsocketInputTransport.__init__   sF     	*6*#!'+$ "r)   framec                   K   t         |   |       d{    | j                  ryd| _        | j                  j	                  |       d{    | j
                  j                  r-| j
                  j                  j	                  |       d{    | j                  s:| j
                  j                  r$| j                  | j                               | _        | j                  j                          d{    | j                  t                      d{    | j                  s$| j                  | j                               | _        | j!                  |       d{    y7 R7 7 7 w7 X7 w)zStart the input transport and begin message processing.

        Args:
            frame: The start frame containing initialization parameters.
        NT)r|   startr   r~   rP   r   r   r   r   create_task_monitor_websocketrk   
push_framer	   r   _receive_messagesset_transport_readyr7   r   r\   s     r*   r   z$FastAPIWebsocketInputTransport.start   s'     gmE""" ll  '''<<"",,))//666++0L0L+/+;+;D<S<S<U+VD(ll33555oo24555!!!%!1!1$2H2H2J!KD&&u---! 	# 	(6 	65 	.sl   E6E(6E6E+AE6E.A'E68E09 E6E2AE6"E4#E6+E6.E60E62E64E6c                    K   | j                   r*| j                  | j                          d{    d| _         | j                  r+| j                  | j                         d{    d| _        yy7 C7 w)zStop all running tasks.N)r   cancel_taskr   r:   s    r*   _stop_tasksz*FastAPIWebsocketInputTransport._stop_tasks  sm     ''""4#?#?@@@+/D(""4#5#5666!%D  A 7s!   +A5A15A5#A3$A53A5c                    K   t         |   |       d{    | j                          d{    | j                  j	                          d{    y7 ?7 )7 	w)zStop the input transport and cleanup resources.

        Args:
            frame: The end frame signaling transport shutdown.
        N)r|   stopr   r~   rf   r   s     r*   r   z#FastAPIWebsocketInputTransport.stop  sP      gl5!!!   ll%%''' 	" '1   AAAA!AAAAAc                    K   t         |   |       d{    | j                          d{    | j                  j	                          d{    y7 ?7 )7 	w)zCancel the input transport and stop all processing.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)r|   cancelr   r~   rf   r   s     r*   r   z%FastAPIWebsocketInputTransport.cancel   sP      gnU###   ll%%''' 	$ 'r   c                    K   t         |           d{    | j                  j                          d{    y7 '7 wzClean up transport resources.Nr|   cleanupr}   r7   r\   s    r*   r   z&FastAPIWebsocketInputTransport.cleanup*  6     gooo%%''' 	 '   A=!A?AAc           	        K   	 | j                   j                         2 3 d{   }| j                  j                  s | j                  j                  j	                  |       d{   }|sPt        |t              r| j                  |       d{    zt        |t              r*| j                  t        |j                         d{    | j                  |       d{    7 7 7 ]7 %7 6 nH# t        $ r<}t        j                  |  d|j                  j                    d| d       Y d}~nd}~ww xY w| j                   j"                  s$| j                   j%                          d{  7   yyw)z3Main message receiving loop for WebSocket messages.N)rC   z exception receiving data: rU   ))r~   rA   r   r   deserializerW   r   push_audio_framer   broadcast_framerC   r   rZ   r   r[   r\   r!   r_   rh   )r7   rC   r   ra   s       r*   r   z0FastAPIWebsocketInputTransport._receive_messages/  sH    	[!%!5!5!7 1 1g||.."ll55AA'JJe%78//666'AB../ISXS`S`.aaa//%0001 K 7a0 "8  	[LLD6!<Q[[=Q=Q<RRTUVTWWXYZZ	[
 ||&&,,::<<< 's   E;C8 C6C,C6A C8 &C.'+C8 C09C8 C2C8 &C4'C8 ,C6.C8 0C8 2C8 4C8 6C8 7E;8	D=2D83E;8D==6E;3E64E;c                    K   t        j                  | j                  j                         d{    | j                  j                          d{    y7 '7 w)zeWait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event.N)asynciosleepr   r   r~   rm   r:   s    r*   r   z1FastAPIWebsocketInputTransport._monitor_websocketI  sA     mmDLL88999ll11333 	:3s!   -AA!AAAA)r!   r"   r#   r$   r   rG   r   r8   r   r   r   r
   r   r   r   r   r   r   __classcell__r\   s   @r*   rw   rw      sa    " " '" '	"2. .0&( ((+ ((
=44r)   rw   c                        e Zd ZdZdededef fdZdef fdZ	de
f fdZdef fd	Z fd
Zdedef fdZdeez  fdZdedefdZdefdZd Z xZS )FastAPIWebsocketOutputTransportzOutput transport for FastAPI WebSocket connections.

    Handles outgoing frame serialization, audio streaming with timing simulation,
    and WebSocket message transmission with optional WAV header generation.
    rx   ry   rz   c                     t        |   |fi | || _        || _        || _        d| _        d| _        t               | _        d| _	        y)a%  Initialize the WebSocket output transport.

        Args:
            transport: The parent transport instance.
            client: The WebSocket client wrapper.
            params: Transport configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        r   FN)
r|   r8   r}   r~   r   _send_interval_next_send_time	bytearray_audio_send_bufferr   r   s        r*   r8   z(FastAPIWebsocketOutputTransport.__init__V  sT     	*6*#    #,+ "r)   r   c                   K   t         |   |       d{    | j                  ryd| _        | j                  j	                  |       d{    | j
                  j                  r-| j
                  j                  j	                  |       d{    | j                  | j                  z  dz  | _	        | j                  |       d{    y7 7 7 C7 w)zStart the output transport and initialize timing.

        Args:
            frame: The start frame containing initialization parameters.
        NT   )r|   r   r   r~   rP   r   r   audio_chunk_sizesample_rater   r   r   s     r*   r   z%FastAPIWebsocketOutputTransport.start~  s      gmE""" ll  '''<<"",,))//666#44t7G7GG1L&&u--- 	# 	(6-sF   CC6CCACC9C
CCCCCc                    K   t         |   |       d{    | j                  |       d{    | j                  j	                          d{    y7 @7 )7 	w)zStop the output transport and cleanup resources.

        Args:
            frame: The end frame signaling transport shutdown.
        N)r|   r   _write_framer~   rf   r   s     r*   r   z$FastAPIWebsocketOutputTransport.stop  sR      gl5!!!&&&ll%%''' 	"&'1   AAAA!AAAAAc                    K   t         |   |       d{    | j                  |       d{    | j                  j	                          d{    y7 @7 )7 	w)zCancel the output transport and stop all processing.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)r|   r   r   r~   rf   r   s     r*   r   z&FastAPIWebsocketOutputTransport.cancel  sR      gnU###&&&ll%%''' 	$&'r   c                    K   t         |           d{    | j                  j                          d{    y7 '7 wr   r   r   s    r*   r   z'FastAPIWebsocketOutputTransport.cleanup  r   r   	directionc                   K   t         |   ||       d{    t        |t              rQ| j                  j
                  r| j                  j                          | j                  |       d{    d| _	        yy7 f7 w)zProcess outgoing frames with special handling for interruptions.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        Nr   )
r|   process_framerW   r   r   r    r   clearr   r   )r7   r   r   r\   s      r*   r   z-FastAPIWebsocketOutputTransport.process_frame  st      g#E9555e./||33''--/##E***#$D  0 	6 +s"   BA>AB0B 1B Bc                 B   K   | j                  |       d{    y7 w)zoSend a transport message frame.

        Args:
            frame: The transport message frame to send.
        N)r   )r7   r   s     r*   send_messagez,FastAPIWebsocketOutputTransport.send_message  s      &&&s   r<   c                 8  K   | j                   j                  s| j                   j                  syt        |j                  | j
                  | j                  j                        }| j                  j                  rt        j                         5 }t        j                  |d      5 }|j                  d       |j                  |j                         |j!                  |j
                         |j#                  |j                         ddd       t        |j%                         |j
                  |j                        }|}ddd       | j'                  |       d{    | j)                          d{    y# 1 sw Y   uxY w# 1 sw Y   GxY w7 57 w)zWrite an audio frame to the WebSocket with timing simulation.

        Args:
            frame: The output audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        F)audior   num_channelswbr   N)r   r   T)r~   r_   rd   r   r   r   r   audio_out_channelsr   ioBytesIOwaveopensetsampwidthsetnchannelsr   setframeratewriteframesgetvaluer   _write_audio_sleep)r7   r   bufferwf	wav_frames        r*   write_audio_framez1FastAPIWebsocketOutputTransport.write_audio_frame  sF     <<""$,,*C*C#++((88
 <<&& "YYvt, 0OOA&OOE$6$67OOE$5$56NN5;;/	0
 0OO% % 1 1!&!3!3	
 "" &&& %%'''#0 0" " 	' 	(sU   BFF
'A#E>
:F
F F!F8F9F>F	F

FFFc           	      t  K   | j                   j                  s| j                   j                  sy| j                  j                  sy	 | j                  j                  j                  |       d{   }|r| j                  j                  }|rt        |t        t        f      r| j                  j                  t        |             t        | j                        |k\  rct        | j                  d|       }| j                  d|= | j                   j                  |       d{    t        | j                        |k\  rcy| j                   j                  |       d{    yy7 7 E7 
# t        $ r<}t        j                   |  d|j"                  j$                   d| d       Y d}~yd}~ww xY ww)z1Serialize and send a frame through the WebSocket.NrT   rU   r   )r~   r_   rd   r   r   	serializer    rW   r?   r   r   extendlenrb   rZ   r   r[   r\   r!   )r7   r   payloadpacket_byteschunkra   s         r*   r   z,FastAPIWebsocketOutputTransport._write_frame  st    <<""$,,*C*C||&&	Y LL33==eDDG  $||CCJw	8J$K++225>B d556,F %d&=&=m|&L M 33M\MB"ll//666 d556,F ll''000!  E 7 1 	YLLD6!:1;;;O;O:PPRSTRUUVWXX	Yss   AF8(E0 0E*1B5E0 &E,'E0 F8E0 #E.$E0 (F8*E0 ,E0 .E0 0	F592F0+F80F55F8c                 :  K   t        j                         }t        d| j                  |z
        }t	        j
                  |       d{    |dk(  r't        j                         | j                  z   | _        y| xj                  | j                  z  c_        y7 Pw)z7Simulate audio playback timing with appropriate delays.r   N)time	monotonicmaxr   r   r   r   )r7   current_timesleep_durations      r*   r   z2FastAPIWebsocketOutputTransport._write_audio_sleep
  s{      ~~'Q 4 4| CDmmN+++Q#'>>#3d6I6I#ID   D$7$77 	 	,s   ABB	AB)r!   r"   r#   r$   r   rG   r   r8   r   r   r
   r   r   r   r   r   r   r   r   r   r   r   r%   r   r   r   r   r   s   @r*   r   r   O  s    &" &" '&" '	&"P. .&( ((+ ((
% %> %"'03TT'%-@ %T %NY Y<	8r)   r   c            
       p     e Zd ZdZ	 	 ddededee   dee   f fdZde	fdZ
defd	Zd
 Zd Zd Z xZS )FastAPIWebsocketTransportax  FastAPI WebSocket transport for real-time audio/video streaming.

    Provides bidirectional WebSocket communication with frame serialization,
    session management, and event handling for client connections and timeouts.

    Event handlers available:

    - on_client_connected(transport, websocket): Client WebSocket connected
    - on_client_disconnected(transport, websocket): Client WebSocket disconnected
    - on_session_timeout(transport, websocket): Session timed out

    Example::

        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, websocket):
            ...
    r2   rz   
input_nameoutput_namec                    t         |   ||       || _        t        | j                  | j
                  | j                        | _        t        || j                        | _	        t        | | j                  | j                  | j                        | _        t        | | j                  | j                  | j                        | _        | j!                  d       | j!                  d       | j!                  d       y)a4  Initialize the FastAPI WebSocket transport.

        Args:
            websocket: The FastAPI WebSocket connection.
            params: Transport configuration parameters.
            input_name: Optional name for the input processor.
            output_name: Optional name for the output processor.
        )r   r   )r-   r.   r/   )namer-   r.   r/   N)r|   r8   r   r,   _on_client_connected_on_client_disconnected_on_session_timeoutrK   rG   r~   rw   _input_name_inputr   _output_name_output_register_event_handler)r7   r2   rz   r   r   r\   s        r*   r8   z"FastAPIWebsocketTransport.__init__)  s     	JKH3 $ 9 9#'#?#?#77
 .iI4$,,43C3C
 7$,,43D3D
 	$$%:;$$%=>$$%9:r)   r<   c                     | j                   S )zqGet the input transport processor.

        Returns:
            The WebSocket input transport instance.
        )r   r:   s    r*   inputzFastAPIWebsocketTransport.inputQ  s     {{r)   c                     | j                   S )zsGet the output transport processor.

        Returns:
            The WebSocket output transport instance.
        )r   r:   s    r*   outputz FastAPIWebsocketTransport.outputY  s     ||r)   c                 D   K   | j                  d|       d{    y7 w)zHandle client connected event.r-   N_call_event_handlerr6   s     r*   r   z.FastAPIWebsocketTransport._on_client_connecteda  s     &&'<iHHH     c                 D   K   | j                  d|       d{    y7 w)z!Handle client disconnected event.r.   Nr   r6   s     r*   r   z1FastAPIWebsocketTransport._on_client_disconnectede  s     &&'?KKKr   c                 D   K   | j                  d|       d{    y7 w)zHandle session timeout event.r/   Nr   r6   s     r*   r   z-FastAPIWebsocketTransport._on_session_timeouti  s     &&';YGGGr   )NN)r!   r"   r#   r$   r   r   r   rE   r8   rw   r   r   r   r   r   r   r   r   s   @r*   r   r     sp    , %)%)&;&; '&; SM	&;
 c]&;P5 7 ILHr)   r   )3r$   r   r   r   rs   r   r   r   r   logurur   pydanticr   pipecat.frames.framesr   r	   r
   r   r   r   r   r   r   r   r   "pipecat.processors.frame_processorr   #pipecat.serializers.base_serializerr   pipecat.transports.base_inputr   pipecat.transports.base_outputr   !pipecat.transports.base_transportr   r   fastapir   starlette.websocketsr   ModuleNotFoundErrorra   r[   rZ   r   r,   r1   rG   rw   r   r   r(   r)   r*   <module>r      s    	    0 0      > ? < > L,!32_ 2"?	 ?! !(i iXw4%7 w4tD8&9 D8NUH UHQ  ,FLL;qc"#FLL^ &qc*
++,s   (C   C<2C77C<