
    qi                     2   d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	m
Z
mZmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZmZmZmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. ddl/m0Z0m1Z1 ddl2m3Z3 ddl4m5Z5 dZ6dZ7 G d de1      Z8y)zBase output transport implementation for Pipecat.

This module provides the BaseOutputTransport class which handles audio and video
output processing, including frame buffering, mixing, timing, and media streaming.
    N)ThreadPoolExecutor)AnyAsyncGeneratorDictListMappingOptional)logger)Image)load_dtmf_audio)BaseAudioMixer)create_stream_resampler
is_silence)AssistantImageRawFrameBotSpeakingFrameBotStartedSpeakingFrameBotStoppedSpeakingFrameCancelFrameEndFrameFrameInterruptionFrameMixerControlFrameOutputAudioRawFrameOutputDTMFFrameOutputDTMFUrgentFrameOutputImageRawFrameOutputTransportMessageFrame!OutputTransportMessageUrgentFrameOutputTransportReadyFrameSpeechOutputAudioRawFrameSpriteFrame
StartFrameSystemFrameTTSAudioRawFrameTTSStoppedFrame)FrameDirectionFrameProcessor)TransportParams)nanoseconds_to_secondsgffffff?   c                   n    e Zd ZdZdef fdZedefd       Zedefd       Z	de
fdZdefd	Zdefd
Zde
fdZdeez  fdZdefdZdefdZdedefdZdedefdZdeez  fdZdefdZ defdZ!deez  fdZ"deez  fdZ#defdZ$dee%z  fdZ&dede'f fdZ(defdZ) G d d      Z* xZ+S )BaseOutputTransporta%  Base class for output transport implementations.

    Handles audio and video output processing including frame buffering, audio mixing,
    timing coordination, and media streaming. Supports multiple output destinations
    and provides interruption handling for real-time communication.
    paramsc                 \    t        |   di | || _        d| _        d| _        i | _        y)zInitialize the base output transport.

        Args:
            params: Transport configuration parameters.
            **kwargs: Additional arguments passed to parent class.
        r   N )super__init___params_sample_rate_audio_chunk_size_media_senders)selfr-   kwargs	__class__s      P/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/transports/base_output.pyr1   zBaseOutputTransport.__init__B   s<     	"6"  "#
 MO    returnc                     | j                   S )z`Get the current audio sample rate.

        Returns:
            The sample rate in Hz.
        r3   r6   s    r9   sample_ratezBaseOutputTransport.sample_rate[   s        r:   c                     | j                   S )zyGet the audio chunk size for output processing.

        Returns:
            The size of audio chunks in bytes.
        r4   r>   s    r9   audio_chunk_sizez$BaseOutputTransport.audio_chunk_sized   s     %%%r:   framec                    K   | j                   j                  xs |j                  | _        t        | j                  dz        | j                   j                  z  dz  }|| j                   j
                  z  | _        yw)zStart the output transport and initialize components.

        Args:
            frame: The start frame containing initialization parameters.
        d      N)r2   audio_out_sample_rater3   intaudio_out_channelsaudio_out_10ms_chunksr4   )r6   rC   audio_bytes_10mss      r9   startzBaseOutputTransport.startm   si      !LL>>]%B]B]
 t00367$,,:Y:YY\]]!1DLL4V4V!Vs   A;A=c                    K   | j                   j                         D ]  \  }}|j                  |       d{      y7 w)zStop the output transport and cleanup resources.

        Args:
            frame: The end frame signaling transport shutdown.
        N)r5   itemsstopr6   rC   _senders       r9   rO   zBaseOutputTransport.stop{   s?      ,,224 	%IAv++e$$$	%$   5A?Ac                    K   | j                   j                         D ]  \  }}|j                  |       d{      y7 w)zCancel the output transport and stop all processing.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        N)r5   rN   cancelrP   s       r9   rU   zBaseOutputTransport.cancel   s?      ,,224 	'IAv--&&&	'&rS   c                   K   | j                   j                  D ]  }| j                  |       d{     | j                   j                  D ]  }| j	                  |       d{     t
        j                  | d| j                  | j                  | j                         | j                  d<   | j                  d   j                  |       d{    t        t        | j                   j                  | j                   j                  z               }|D ]m  }t
        j                  | || j                  | j                  | j                         | j                  |<   | j                  |   j                  |       d{    o | j                  t               t        j                          d{    y7 7 W7 7 ?7 w)zCalled when the transport is ready to stream.

        Args:
            frame: The start frame containing initialization parameters.
        N)destinationr?   rB   r-   )r2   audio_out_destinationsregister_audio_destinationvideo_out_destinationsregister_video_destinationr,   MediaSenderr?   rB   r5   rL   listset
push_framer   r&   UPSTREAM)r6   rC   rW   destinationss       r9   set_transport_readyz'BaseOutputTransport.set_transport_ready   s      <<>> 	?K11+>>>	?  <<>> 	?K11+>>>	? %8$C$C((!22<< %D %
D! !!$'--e444 33dll6Y6YYZ

 ( 	@K/B/N/N' ,,!%!6!6|| 0O 0D, %%k288???	@ oo79>;R;RSSSC ? ? 	5" @ 	TsY   .GF83G$F;%A,GF>B.G G 1G2G3G;G>G GGc                    K   yw)ziSend a transport message.

        Args:
            frame: The transport message frame to send.
        Nr/   r6   rC   s     r9   send_messagez BaseOutputTransport.send_message   s      	   rW   c                    K   yw)z}Register a video output destination.

        Args:
            destination: The destination identifier to register.
        Nr/   r6   rW   s     r9   r[   z.BaseOutputTransport.register_video_destination         	rf   c                    K   yw)z~Register an audio output destination.

        Args:
            destination: The destination identifier to register.
        Nr/   rh   s     r9   rY   z.BaseOutputTransport.register_audio_destination   ri   rf   c                    K   yw)zWrite a video frame to the transport.

        Args:
            frame: The output video frame to write.

        Returns:
            True if the video frame was written successfully, False otherwise.
        Fr/   rd   s     r9   write_video_framez%BaseOutputTransport.write_video_frame         rf   c                    K   yw)zWrite an audio frame to the transport.

        Args:
            frame: The output audio frame to write.

        Returns:
            True if the audio frame was written successfully, False otherwise.
        Fr/   rd   s     r9   write_audio_framez%BaseOutputTransport.write_audio_frame   rm   rf   c                    K   | j                         r| j                  |       d{    y| j                  |       d{    y7 7 w)z}Write a DTMF tone using the transport's preferred method.

        Args:
            frame: The DTMF frame to write.
        N)_supports_native_dtmf_write_dtmf_native_write_dtmf_audiord   s     r9   
write_dtmfzBaseOutputTransport.write_dtmf   sE      %%'))%000((/// 1/s!   %AAAA	A	Ac                    K   yw)aM  Handle a queued frame after preceding audio has been sent.

        Override in transport subclasses to handle custom frame types that
        flow through the audio queue. Called by the media sender after the
        frame has waited for any preceding audio to finish.

        Args:
            frame: The frame to handle.
        Nr/   rd   s     r9   write_transport_framez)BaseOutputTransport.write_transport_frame   s      	rf   c                      y)zOverride in transport implementations that support native DTMF.

        Returns:
            True if the transport supports native DTMF, False otherwise.
        Fr/   r>   s    r9   rq   z)BaseOutputTransport._supports_native_dtmf   s     r:   c                     K   t        d      w)zzOverride in transport implementations for native DTMF.

        Args:
            frame: The DTMF frame to write.
        z=Transport claims native DTMF support but doesn't implement it)NotImplementedErrorrd   s     r9   rr   z&BaseOutputTransport._write_dtmf_native  s      ""abbs   c                    K   t        |j                  | j                         d{   }t        || j                  d      }| j	                  |       d{    y7 67 w)zkGenerate and send audio tones for DTMF.

        Args:
            frame: The DTMF frame to write.
        )r?   N   audior?   num_channels)r   buttonr3   r   ro   )r6   rC   
dtmf_audiodtmf_audio_frames       r9   rs   z%BaseOutputTransport._write_dtmf_audio  s\      +5<<TEVEVWW
.$*;*;!
 $$%5666	 X 	7s!   %A"A0A"A A" A"c                 `   K   | j                  |t        j                         d{    y7 w)zcSend an audio frame downstream.

        Args:
            frame: The audio frame to send.
        Nqueue_framer&   
DOWNSTREAMrd   s     r9   
send_audiozBaseOutputTransport.send_audio  $      un&?&?@@@   $.,.c                 `   K   | j                  |t        j                         d{    y7 w)zcSend an image frame downstream.

        Args:
            frame: The image frame to send.
        Nr   rd   s     r9   
send_imagezBaseOutputTransport.send_image#  r   r   	directionc                 0  K   t         |   ||       d{    t        |t              r4| j	                  ||       d{    | j                  |       d{    yt        |t              r4| j                  |       d{    | j	                  ||       d{    yt        |t              r4| j                  |       d{    | j	                  ||       d{    yt        |t              r4| j	                  ||       d{    | j                  |       d{    yt        |t              r| j                  |       d{    yt        |t              r| j                  |       d{    yt        |t               r| j	                  ||       d{    y|t"        j$                  k(  r| j	                  ||       d{    y| j                  |       d{    y7 7 7 7 x7 a7 :7 #7 7 7 7 7 j7 >7 &w)zProcess incoming frames and handle transport-specific logic.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        N)r0   process_frame
isinstancer"   r_   rL   r   rO   r   rU   r   _handle_framer   re   r   rt   r#   r&   r`   )r6   rC   r   r8   s      r9   r   z!BaseOutputTransport.process_frame/  s     g#E9555eZ( //%333**U###x())E"""//%333{+++e$$$//%33301//%333$$U+++@A##E***45//%((({+//%333.111//%333$$U+++5 	6
 4#"3$33+*(33+s   HG3)HG6HG9)HG<HG?)HH	H"H#*HHH&H
')HH)H:H;*H%H&-HHH-H.H6H9H<H?HHHH
HHHHHHc                 b  K   |j                   | j                  vr(t        j                  |  d|j                    d|        y| j                  |j                      }t	        |t
              r|j                  |       d{    yt	        |t              r|j                  |       d{    yt	        |t        t        f      rD|j                  |       d{    t	        |t              r|j                  |       d{    yyt	        |t              r|j                  |       d{    yt	        |t               r|j                  |       d{    y|j"                  r|j%                  |       d{    y|j                  |       d{    y7 7 7 7 7 x7 P7 ,7 w)z;Handle frames by routing them to appropriate media senders.z destination [z] not registered for frame N)transport_destinationr5   r
   warningr   r   handle_interruptionsr   handle_audio_framer   r!   handle_image_framer   handle_sync_framer   handle_mixer_control_framer%   ptshandle_timed_frame)r6   rC   rR   s      r9   r   z!BaseOutputTransport._handle_frameR  sx    &&d.A.AANN&u'B'B&CC^_d^ef $$U%@%@Ae./--e44423++E222 3[AB++E222%!78 ..u555 9 0133E:::/**5111YY++E222**5111! 522 6:121s   A>F/ F)F/*F!+/F/F#(F/F%*F/.F'/)F/F)%F/>F+?F/F-F/!F/#F/%F/'F/)F/+F/-F/c            
          e Zd ZdZdddee   dededef
dZe	d	efd
       Z
e	d	efd       ZdefdZdefdZdefdZdefdZdefdZdeez  fdZdefdZdefdZdefdZd Zd Zd Z d Z!d Z"de#fdZ$defdZ%defdZ&d	e'edf   fd Z(d!efd"Z)d# Z*d$ Z+d% Z,d&efd'Z-d(e.e   fd)Z/d* Z0d+ Z1defd,Z2d- Z3d. Z4d/ Z5y)0BaseOutputTransport.MediaSenderzHandles media streaming for a specific destination.

        Manages audio and video output processing including buffering, timing,
        mixing, and frame delivery for a single output destination.
        	transportr,   rW   r?   rB   r-   c                4   || _         || _        || _        || _        || _        t        d      | _        t               | _        t               | _
        d| _        d| _        d| _        d| _        d| _        d| _        d| _        d| _        d| _        d| _        y)az  Initialize the media sender.

            Args:
                transport: The parent transport instance.
                destination: The destination identifier for this sender.
                sample_rate: The audio sample rate in Hz.
                audio_chunk_size: The size of audio chunks in bytes.
                params: Transport configuration parameters.
            r{   )max_workersNFr   g?)
_transport_destinationr3   r4   r2   r   	_executor	bytearray_audio_bufferr   
_resampler_mixer_video_images_bot_speaking_tts_audio_received_bot_speaking_frame_time_bot_speaking_frame_period_bot_speech_last_time_audio_task_video_task_clock_task)r6   r   rW   r?   rB   r-   s         r9   r1   z(BaseOutputTransport.MediaSender.__init__z  s    $ (DO +D +D%5D"!DL 0A>DN "+D 67DO 59DK "&D "'D',D$,-D) /2D+)*D&7;D7;D7;Dr:   r;   c                     | j                   S )zdGet the audio sample rate.

            Returns:
                The sample rate in Hz.
            r=   r>   s    r9   r?   z+BaseOutputTransport.MediaSender.sample_rate  s     $$$r:   c                     | j                   S )zoGet the audio chunk size.

            Returns:
                The size of audio chunks in bytes.
            rA   r>   s    r9   rB   z0BaseOutputTransport.MediaSender.audio_chunk_size  s     )))r:   rC   c                 0  K   t               | _        | j                          | j                          | j	                          | j
                  j                  rt        | j
                  j                  t              r6| j
                  j                  j                  | j                  d      | _        n'| j                  s| j
                  j                  | _        | j                  r.| j                  j                  | j                         d{    yy7 w)zStart the media sender and initialize components.

            Args:
                frame: The start frame containing initialization parameters.
            N)r   r   _create_video_task_create_clock_task_create_audio_taskr2   audio_out_mixerr   r   getr   r   rL   r3   rd   s     r9   rL   z%BaseOutputTransport.MediaSender.start  s      "+D ##%##%##% ||++dll::GD"&,,">">"B"B4CTCTVZ"[DK**"&,,">">DK {{kk''(9(9::: :s   DDDDc                   K   | j                   j                  t        d      |j                  |f       d{    | j                  j                  |       d{    | j
                  r| j
                   d{    | j                  r| j                   d{    | j                  r"| j                  j                          d{    | j                          d{    y7 7 7 o7 Q7 %7 w)zStop the media sender and cleanup resources.

            Args:
                frame: The end frame signaling sender shutdown.
            infN)
_clock_queueputfloatid_audio_queuer   r   r   rO   _cancel_video_taskrd   s     r9   rO   z$BaseOutputTransport.MediaSender.stop  s      ##''uuxx(GHHH##''... &&&&&&&& {{kk&&((( ))+++# I. '& ) ,si   5C2C&"C2C(C2:C*;C2C,-C2C.	C2 C0!C2(C2*C2,C2.C20C2c                    K   | j                          d{    | j                          d{    | j                          d{    y7 57 7 	w)zCancel the media sender and stop all processing.

            Args:
                frame: The cancel frame signaling immediate cancellation.
            N)_cancel_audio_task_cancel_clock_taskr   rd   s     r9   rU   z&BaseOutputTransport.MediaSender.cancel  sK      ))+++))+++))+++ ,++s1   AAAAAAAAArQ   c                 j  K   | j                   j                  sy| j                          d{    | j                          d{    | j	                          d{    | j                          | j                          | j                          | j                          d{    y7 }7 g7 Q7 w)zHandle interruption events by restarting tasks and clearing buffers.

            Args:
                _: The start interruption frame (unused).
            N)	r   _allow_interruptionsr   r   r   r   r   r   _bot_stopped_speaking)r6   rQ   s     r9   r   z4BaseOutputTransport.MediaSender.handle_interruptions  s      ??77 ))+++))+++))+++ ##%##%##% ,,... ,++ /sF   +B3B+B3B-B3B/AB3%B1&B3-B3/B31B3c                   K   | j                   j                  sy| j                  j                  |j                  |j
                  | j                         d{   }t        |      }| j                  j                  |       t        | j                        | j                  k\  r |t        | j                  d| j                         | j                  |j                        }| j                  |_        | j                   j#                  |       d{    | j                  | j                  d | _        t        | j                        | j                  k\  ryy7 7 Jw)zHandle incoming audio frames by buffering and chunking.

            Args:
                frame: The output audio frame to handle.
            N)r?   r~   )r2   audio_out_enabledr   resampler}   r?   r3   typer   extendlenr4   bytesr~   r   r   r   r   )r6   rC   	resampledclschunks        r9   r   z2BaseOutputTransport.MediaSender.handle_audio_frame  s&     <<11 #oo66U..0A0A I u+C%%i0d(()T-C-CC$,,-Et/E/EFG $ 1 1!&!3!3
 /3.?.?+''++E222%)%7%78N8N8P%Q" d(()T-C-CC 3s,   AE EB:E EAE E E c                 t  K   | j                   j                  sy| j                   j                  r4t        |t              r$| j
                  j                  |       d{    yt        |t              r| j                  |       d{    y| j                  |j                         d{    y7 S7 +7 	w)zHandle incoming image frames for video output.

            Args:
                frame: The output image or sprite frame to handle.
            N)
r2   video_out_enabledvideo_out_is_liver   r   _video_queuer   _set_video_image_set_video_imagesimagesrd   s     r9   r   z2BaseOutputTransport.MediaSender.handle_image_frame3  s      <<11||--*UDW2X''++E222E#67++E222,,U\\:::	 32:s6   AB8B2)B8B4	#B8,B6-B84B86B8c                    K   | j                   j                  |j                  |j                  |f       d{    y7 w)zHandle frames with presentation timestamps.

            Args:
                frame: The frame with timing information to handle.
            N)r   r   r   r   rd   s     r9   r   z2BaseOutputTransport.MediaSender.handle_timed_frameC  s1      ##''EHHe(DEEEs   6A >A c                 V   K   | j                   j                  |       d{    y7 w)zHandle frames that need synchronized processing.

            Args:
                frame: The frame to handle synchronously.
            N)r   r   rd   s     r9   r   z1BaseOutputTransport.MediaSender.handle_sync_frameK  s"      ##''...s   )')c                 p   K   | j                   r$| j                   j                  |       d{    yy7 w)z|Handle audio mixer control frames.

            Args:
                frame: The mixer control frame to handle.
            N)r   r   rd   s     r9   r   z:BaseOutputTransport.MediaSender.handle_mixer_control_frameS  s/      {{kk//666 6s   +646c                     | j                   sHt        j                         | _        | j                  j                  | j                               | _         yy)z!Create the audio processing task.N)r   asyncioQueuer   r   create_task_audio_task_handlerr>   s    r9   r   z2BaseOutputTransport.MediaSender._create_audio_task`  s>    ##$+MMO!#'??#>#>t?W?W?Y#Z  $r:   c                    K   | j                   r5| j                  j                  | j                          d{    d| _         yy7 w)z-Cancel and cleanup the audio processing task.N)r   r   cancel_taskr>   s    r9   r   z2BaseOutputTransport.MediaSender._cancel_audio_taskf  =     oo11$2B2BCCC#'   C   5AAAc                    K   | j                   ryd| _         t        j                  d| j                  rd| j                   dnd d       t	               }| j                  |_        t	               }| j                  |_        |j                  |_        |j                  |_        | j                  j                  |       d{    | j                  j                  |t        j                         d{    y7 77 w)z"Handle bot started speaking event.NTBot [] z started speaking)r   r
   debugr   r   r   r   broadcast_sibling_idr   r_   r&   r`   r6   downstream_frameupstream_frames      r9   _bot_started_speakingz5BaseOutputTransport.MediaSender._bot_started_speakingl  s     !!!%DLL43D3D4,,-Q/"MM^_  78595F5F246N373D3DN0 3C2E2EN/4B4E4E1//,,-=>>>//,,^^=T=TUUU ?Us$   C C>C:1C>4C<5C><C>c                 ,  K   | j                   syd| _         d| _        t               | _        t	        j
                  d| j                  rd| j                   dnd d       t               }| j                  |_        t               }| j                  |_        |j                  |_
        |j                  |_
        | j                  j                  |       d{    | j                  j                  |t        j                         d{    y7 77 w)z"Handle bot stopped speaking event.NFr   r   r   r   z stopped speaking)r   r   r   r   r
   r   r   r   r   r   r   r   r_   r&   r`   r   s      r9   r   z5BaseOutputTransport.MediaSender._bot_stopped_speaking  s     %%!&D',D$ "+DLL43D3D4,,-Q/"MM^_  78595F5F246N373D3DN0 3C2E2EN/4B4E4E1//,,-=>>>//,,^^=T=TUUU ?Us$   CDD1D
DDDc                 V  K   | j                          d{    t        j                         | j                  z
  }|| j                  k\  r@| j                  j                  t               d{    t        j                         | _        t        j                         | _        y7 7 9w)zHandle bot speaking event.N)r   timer   r   r   broadcast_framer   r   )r6   	diff_times     r9   _bot_currently_speakingz7BaseOutputTransport.MediaSender._bot_currently_speaking  s}     ,,...		d&C&CCID;;;oo556FGGG04		-)-D& / Hs"   B)B%AB)-B'.8B)'B)c                    K   t        |j                        s| j                          d {    y t        j                         | j                  z
  }|t
        kD  r| j                          d {    y y 7 I7 wN)r   r}   r   r   r   BOT_VAD_STOP_SECSr   )r6   rC   silence_durations      r9   _maybe_bot_currently_speakingz=BaseOutputTransport.MediaSender._maybe_bot_currently_speaking  sf     ekk*22444#'99;1K1K#K #&7744666 8 5 7s"   )A9A5AA9.A7/A97A9c                    K   t        |t              r d| _        | j                          d {    y t        |t              r| j                  |       d {    y y 7 07 w)NT)r   r$   r   r   r    r   rd   s     r9   _handle_bot_speechz2BaseOutputTransport.MediaSender._handle_bot_speech  s]     %!12 ,0(22444E#<=88??? > 5 @s!   +A"A)A"A A" A"c                   K   t        |t              r| j                  |       d{    yt        |t              r| j	                  |       d{    yt        |t
              r$| j                  |j                         d{    yt        |t              r$| j                  j                  |       d{    yt        |t              r$| j                  j                  |       d{    yt        |t              r;| j                  r.t        j                   d       | j#                          d{    yy| j                  j%                  |       d{    y7 ;7 7 7 7 ~7 67 w)zHandle various frame types with appropriate processing.

            Args:
                frame: The frame to handle.
            Nz-Bot stopped speaking based on TTSStoppedFrame)r   r   r   r   r   r!   r   r   r   r   re   r   rt   r%   r   r
   r   r   rv   rd   s     r9   r   z-BaseOutputTransport.MediaSender._handle_frame  s     %!45--e444E#67++E222E;/,,U\\:::E#>?oo225999E?3oo00777E?3 ++LL!PQ44666 , oo;;EBBB! 52:97 7Bs|   %E2E")E2E%3E2E(3E29E*:3E2-E,.A	E27E.8$E2E0E2%E2(E2*E2,E2.E20E2Nc                      dt         dt        t        df   f fd}dt         dt        t        df   f fd} j                  r |t              S  |t              S )zGenerate the next frame for audio processing.

            Returns:
                An async generator yielding frames for processing.
            vad_stop_secsr;   Nc                &  K   	 	 t        j                  j                  j                         |        d {   }| j                  j	                          X7 $# t         j
                  $ r j                          d {  7   Y 1w xY ww)N)timeout)r   wait_forr   r   	task_doneTimeoutErrorr   )r   rC   r6   s     r9   without_mixerzBBaseOutputTransport.MediaSender._next_frame.<locals>.without_mixer  s     ;&-&6&6 --113]' ! $))335 !
 #// ;"88:::;sD   B2A A#A BA &BBBBBBc                  K   d}dj                   z  }	 	 j                  j                         }t        |t              rFj
                  j                  |j                         d {   |_        t        j                         }| j                  j                          7 =# t        j                  $ r t        j                         |z
  }|| kD  rj                          d {  7   t	        j
                  j                  |       d {  7  j                  j                  j                        }| t        j                   d       d {  7   Y w xY ww)Nr       r|   )r4   r   
get_nowaitr   r   r   mixr}   r   r   r   
QueueEmptyr   r3   r2   rI   sleep)r   last_frame_timesilencerC   r   r6   s        r9   
with_mixerz?BaseOutputTransport.MediaSender._next_frame.<locals>.with_mixer  s    "#!D$:$::/ $ 1 1 < < >%e-@A040L*LEK.2iikO#))335  +M #-- /$(IIK/$A	$}4"&"<"<">>> 3(,(@"@"@(,(9(9)-)H)H!
 $
 &mmA...!/sa   E(AB) )B'*<B) &E('B) )AE%+C.,(E%D
AE%EE%"E($E%%E()r   r   r   r   BOT_VAD_STOP_FALLBACK_SECS)r6   r  r  s   `  r9   _next_framez+BaseOutputTransport.MediaSender._next_frame  s^    
;5 
;^ESWK=X 
;/ /.PT:U /: {{!"<==$%?@@r:   secsc                    K   |dk  ry d}d| j                   z  |z  |z  }t        || j                   d      }| j                  j                  |       d {    y 7 w)Nr   rF   r  r{   r|   )r?   r   r   ro   )r6   r  sample_widthr
  silence_frames        r9   _send_silencez-BaseOutputTransport.MediaSender._send_silence  s`     qyL 0 00<?$FG/4+;+;!M //33MBBBs   AAAAc           	      (  K   | j                         2 3 d{   }t        |t              r/| j                  | j                  j
                         d{     y| j                  |       d{    d}	 t        |t              r#| j                  j                  |       d{   }|s| j                  j                  |       d{    7 7 ~7 e7 1# t        $ r)}t        j                  |  d| d|        d}Y d}~\d}~ww xY w7 B6 yw)z#Main audio processing task handler.NTz Error writing z to transport: F)r  r   r   r  r2   audio_out_end_silence_secsr   r   r   ro   	Exceptionr
   errorr_   )r6   rC   push_downstreames       r9   r   z3BaseOutputTransport.MediaSender._audio_task_handler  s    #//1 < <eeX.,,T\\-T-TUUU ((/// #',!%)<=040Q0QRW0X*X #//44U;;;3< V 0 +Y  ,LLD6qc!RS&+O, <3  2s   DDCD9DCD/C0D7.C%C&C*D-DDDDDDC	D"DDDDDc                     | j                   s_| j                  j                  rHt        j                         | _        | j                  j                  | j                               | _         yyy)z<Create the video processing task if video output is enabled.N)	r   r2   r   r   r   r   r   r   _video_task_handlerr>   s    r9   r   z2BaseOutputTransport.MediaSender._create_video_task7  sN    ##(F(F$+MMO!#'??#>#>t?W?W?Y#Z  )G#r:   c                    K   | j                   r5| j                  j                  | j                          d{    d| _         yy7 w)z-Cancel and cleanup the video processing task.N)r   r   r   r>   s    r9   r   z2BaseOutputTransport.MediaSender._cancel_video_task=  s?      oo11$2B2BCCC#'   Cr   imagec                 B   K   t        j                  |g      | _        yw)zSet a single video image for cycling output.

            Args:
                image: The image frame to cycle for video output.
            N	itertoolscycler   r6   r  s     r9   r   z0BaseOutputTransport.MediaSender._set_video_imageD  s      "+%!9Ds   r   c                 @   K   t        j                  |      | _        yw)zSet multiple video images for cycling output.

            Args:
                images: The list of image frames to cycle for video output.
            Nr  )r6   r   s     r9   r   z1BaseOutputTransport.MediaSender._set_video_imagesL  s      "+!8Ds   c                   K   d| _         d| _        d| j                  j                  z  | _        | j                  dz  | _        	 | j                  j                  r| j                          d{    n| j                  rVt        | j                        }| j                  |       d{    t        j                  | j                         d{    n't        j                  | j                         d{    7 7 V7 17 w)z#Main video processing task handler.Nr   r{      )_video_start_time_video_frame_indexr2   video_out_framerate_video_frame_duration_video_frame_resetr   _video_is_live_handlerr   next_draw_imager   r  r!  s     r9   r  z3BaseOutputTransport.MediaSender._video_task_handlerT  s     %)D"&'D#)*T\\-M-M)MD&&*&@&@1&DD#<<1155777'' !3!34E**5111!--(B(BCCC!--(B(BCCC 7 2CCsH   A+D-C=.:D(C?)&DD'D7D8D?DDDc                   K   | j                   j                          d{   }| j                  s t        j                         | _        d| _        t        j                         | j                  z
  }| j                  | j
                  z  }| j
                  |z   |z
  }t        |      | j                  kD  r!t        j                         | _        d| _        n7|dkD  r2t        j                  |       d{    | xj                  dz  c_        | j                  |       d{    | j                   j                          y7 !7 P7 $w)z.Handle live video streaming with frame timing.Nr   r{   )r   r   r%  r   r&  r(  absr)  r   r  r,  r   )r6   r  real_elapsed_timereal_render_time
delay_times        r9   r*  z6BaseOutputTransport.MediaSender._video_is_live_handlerd  s    ++//11E )))-&*+' !%		d.D.D D#669S9SS336FFIZZJ:!8!88)-&*+'ammJ///''1,' ""5)))'')- 2  0 *s4   EECE3E4-E!E" EEEc                     K   dt         dt         f fd} j                  j                         j                   j                  ||       d{   } j                  j                  |       d{    y7 (7 w)zDraw/render an image frame with resizing if needed.

            Args:
                frame: The image frame to draw.
            rC   r;   c                 h   j                   j                  j                   j                  f}| j                  |k7  rut	        j
                  | j                  | j                  | j                        }|j                  |      }t        |j                         |j                  |j                        } | S r   )r2   video_out_widthvideo_out_heightsizer   	frombytesformatr  resizer   tobytes)rC   desired_sizer  resized_imager6   s       r9   resize_framezABaseOutputTransport.MediaSender._draw_image.<locals>.resize_frame  s     $ < <dll>[>[\
 ::-!OOELL%**ekkRE$)LL$>M/%--/1C1C]EYEYE r:   N)r   r   get_event_looprun_in_executorr   rl   )r6   rC   r=  s   `  r9   r,  z+BaseOutputTransport.MediaSender._draw_image~  sp     $7 <O   //88:JJe E //33E::: ;s$   AA;A7"A;1A92A;9A;c                     | j                   sHt        j                         | _        | j                  j                  | j                               | _         yy)z(Create the clock/timing processing task.N)r   r   PriorityQueuer   r   r   _clock_task_handlerr>   s    r9   r   z2BaseOutputTransport.MediaSender._create_clock_task  sA    ##$+$9$9$;!#'??#>#>t?W?W?Y#Z  $r:   c                    K   | j                   r5| j                  j                  | j                          d{    d| _         yy7 w)z-Cancel and cleanup the clock processing task.N)r   r   r   r>   s    r9   r   z2BaseOutputTransport.MediaSender._cancel_clock_task  r   r   c                   K   d}|r| j                   j                          d{   \  }}}t        |t               }|r{| j                  j                         j                         }||kD  r+t        ||z
        }t        j                  |       d{    | j                  j                  |       d{    | j                   j                          |ryy7 7 H7 'w)z8Main clock/timing task handler for timed frame delivery.TN)r   r   r   r   r   	get_clockget_timer)   r   r  r_   r   )r6   running	timestamprQ   rC   current_time	wait_times          r9   rB  z3BaseOutputTransport.MediaSender._clock_task_handler  s     G,0,=,=,A,A,C&C#	1e )99
 #'??#<#<#>#G#G#IL </$:9|;S$T	%mmI666 //44U;;;!!++-% &C 7 <s:   "C CA.C C"C 6C7 C C C C )6__name__
__module____qualname____doc__r	   strrH   r(   r1   propertyr?   rB   r"   rL   r   rO   r   rU   r   r   r   r   r   r!   r   r   r   r   r   r   r   r   r   r   r   r    r   r   r   r   r  r  r   r   r   r   r   r   r  r*  r,  r   r   rB  r/   r:   r9   r\   r   s  s   	6	<,6	< "#	6	<
 6	< "6	< $6	<p 
	% 	% 
	% 
	*c 	* 
	*	;Z 	;2	,H 	,4		,k 		,	/0A 	/,	R2E 	R6	;2E2S 	; 	F% 	F	/ 	/	7:K 	7	[	(	V.	V8		5	7=V 	7		@% 		@	CU 	C23	At!< 3	Aj		CC 		C	<B	[	(	:0C 	:	9$7J2K 	9	D 	*4	;+> 	;@	[	(	.r:   r\   ),rK  rL  rM  rN  r(   r1   rP  rH   r?   rB   r"   rL   r   rO   r   rU   rb   r   r   re   rO  r[   rY   r   boolrl   r   ro   r   r   rt   r   rv   rq   rr   rs   r   r!   r   r&   r   r   r\   __classcell__)r8   s   @r9   r,   r,   :   st   O O2 !S ! ! &# & &W W% %'+ ')Tz )TV03TTC C 	-@ 	T 		-@ 	T 		0o8M&M 	0
 
t co@U.U c
7_?T-T 
7A&9 AA&9K&G A!, !,> !,F2 2BL	. L	.r:   r,   )9rN  r   r  r   concurrent.futuresr   typingr   r   r   r   r   r	   logurur
   PILr   pipecat.audio.dtmf.utilsr   %pipecat.audio.mixers.base_audio_mixerr   pipecat.audio.utilsr   r   pipecat.frames.framesr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   "pipecat.processors.frame_processorr&   r'   !pipecat.transports.base_transportr(   pipecat.utils.timer)   r   r  r,   r/   r:   r9   <module>r^     sq       1 E E   4 @ C     0 N = 5  E.. E.r:   