
    qi                        d Z ddlZddlZddlZddlmZ ddlmZ ddlmZm	Z	m
Z
mZmZmZmZmZmZ ddlmZ ddlmZ ddlmZ dd	lmZmZmZmZmZmZmZmZm Z m!Z!m"Z" dd
l#m$Z$m%Z% ddl&m'Z'm(Z(m)Z) ddl*m+Z+ ddl,m-Z- ddl.m/Z/  G d de      Z0e
dee0ge	d   f   Z1e G d d             Z2 G d dejf                        Z4dZ5 G d de/      Z6y)zFrame processing pipeline infrastructure for Pipecat.

This module provides the core frame processing system that enables building
audio/video processing pipelines. It includes frame processors, pipeline
management, and frame flow control mechanisms.
    N)	dataclass)Enum)	Any	AwaitableCallable	CoroutineListOptionalSequenceTupleType)logger)BaseInterruptionStrategy)	BaseClock)CancelFrame
ErrorFrameFrameFrameProcessorPauseFrameFrameProcessorPauseUrgentFrameFrameProcessorResumeFrameFrameProcessorResumeUrgentFrameInterruptionFrame
StartFrameSystemFrameUninterruptibleFrame)LLMTokenUsageMetricsData)BaseObserverFrameProcessedFramePushed)FrameProcessorMetrics)BaseTaskManager)
BaseObjectc                       e Zd ZdZdZdZy)FrameDirectionzDirection of frame flow in the processing pipeline.

    Parameters:
        DOWNSTREAM: Frames flowing from input to output.
        UPSTREAM: Frames flowing back from output to input.
          N)__name__
__module____qualname____doc__
DOWNSTREAMUPSTREAM     T/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/processors/frame_processor.pyr%   r%   7   s     JHr/   r%   FrameProcessorc                   :    e Zd ZU dZeed<   eed<   dZee	   ed<   y)FrameProcessorSetupa  Configuration parameters for frame processor initialization.

    Parameters:
        clock: The clock instance for timing operations.
        task_manager: The task manager for handling async operations.
        observer: Optional observer for monitoring frame processing events.
    clocktask_managerNobserver)
r(   r)   r*   r+   r   __annotations__r"   r6   r
   r   r.   r/   r0   r3   r3   F   s$     !!'+Hh|$+r/   r3   c                   V     e Zd ZdZdZdZ fdZdeee	e
f   f fdZdef fdZ xZS )	FrameProcessorQueuea   A priority queue for systems frames and other frames.

    This is a specialized queue for frame processors that separates and
    prioritizes system frames over other frames. It ensures that `SystemFrame`
    objects are processed before any other frames by using a priority queue.

    r&   r'   c                 >    t         |           d| _        d| _        y)z#Initialize the FrameProcessorQueue.r   N)super__init__"_FrameProcessorQueue__high_counter!_FrameProcessorQueue__low_counterself	__class__s    r0   r<   zFrameProcessorQueue.__init__a   s    r/   itemc                 P  K   |\  }}}t        |t              rD| xj                  dz  c_        t        |   | j
                  | j                  |f       d{    y| xj                  dz  c_        t        |   | j                  | j                  |f       d{    y7 I7 w)a8  Put an item into the priority queue.

        System frames (`SystemFrame`) have higher priority than any other
        frames. If a non-frame item (e.g. a watchdog cancellation sentinel) is
        provided it will have the highest priority.

        Args:
            item (Any): The item to enqueue.

        r&   N)
isinstancer   r=   r;   putHIGH_PRIORITYr>   LOW_PRIORITY)r@   rB   frame_rA   s       r0   rE   zFrameProcessorQueue.putg   s      q!e[)1$'+t1143F3FMNNN!#'+t00$2D2DdKLLL O Ms%   AB&B"AB&B$B&$B&returnc                 H   K   t         |           d{   \  }}}|S 7 
w)zRetrieve the next item from the queue.

        System frames are prioritized. If both queues are empty, this method
        waits until an item is available.

        Returns:
            Any: The next item from the system or main queue.

        N)r;   get)r@   rI   rB   rA   s      r0   rL   zFrameProcessorQueue.getz   s'      !7;=(
1d )s   " ")r(   r)   r*   r+   rF   rG   r<   r   r   r%   FrameCallbackrE   r   rL   __classcell__rA   s   @r0   r9   r9   U   sD     MLMeE>=$HI M&3  r/   r9      c                   \    e Zd ZdZdddddee   dedee   f fdZe	d	e
fd
       Ze	d	efd       Ze	d	ed    fd       Ze	d	ed    fd       Ze	d	ed    fd       Ze	d	ed    fd       Ze	d        Ze	d        Ze	d        Ze	d        Ze	d	ee   fd       Ze	d	efd       Zd Zd	efdZdefdZdddee   fdZ dddee   fdZ!dddee   fd Z"dddee   fd!Z#d"e$fd#Z%d$efd%Z&d& Z'd' Z(d( Z)d\d)e*dee   d	e+jX                  fd*Z-d]d+e+jX                  d,ee   fd-Z.d\d+e+jX                  d,ee   fd.Z/d/e0fd0Z1 fd1Z2d^d2Z3d	e4fd3Z5d	e+jl                  fd4Z7e8jr                  dfd5e:d6e8d7ee;   fd8Z<d9 Z=d: Z>d; Z?d< Z@d5e:d6e8fd=ZA	 	 d_d>ed?eeB   d@efdAZCdBeDfdCZEe8jr                  fd5e:d6e8fdDZFdE ZGdFdGd,efdHZHdIeIe:   fdJZJd5e:fdKZKd5eLfdLZMd5eNfdMZOd5ePeQz  fdNZRd5eSeTz  fdOZUdP ZVd5e:d6e8fdQZWd5e:fdRZXdS ZYdT ZZdU Z[dV Z\dW Z]dX Z^d5e:d6e8d7ee;   fdYZ_dZ Z`d[ Za xZbS )`r1   a6  Base class for all frame processors in the pipeline.

    Frame processors are the building blocks of Pipecat pipelines, they can be
    linked to form complex processing pipelines. They receive frames, process
    them, and pass them to the next or previous processor in the chain.  Each
    frame processor guarantees frame ordering and processes frames in its own
    task. System frames are also processed in a separate task which guarantees
    frame priority.

    Event handlers available:

    - on_before_process_frame: Called before a frame is processed
    - on_after_process_frame: Called after a frame is processed
    - on_before_push_frame: Called before a frame is pushed
    - on_after_push_frame: Called after a frame is pushed
    - on_error: Called when an error is raised in the frame processing.
    NF)nameenable_direct_modemetricsrR   rS   rT   c                   t        |   dd|i| d| _        d| _        || _        d| _        d| _        d| _        d| _        d| _	        d| _
        d| _        g | _        d| _        d| _        d| _        |xs
 t!               | _        | j"                  j%                  | j&                         d| _        t+               | _        d| _        d| _        d| _        t5        j6                         | _        d| _        d| _        d| _        | jA                  dd       | jA                  dd       | jA                  dd       | jA                  d	d       | jA                  d
d       y)aY  Initialize the frame processor.

        Args:
            name: Optional name for this processor instance.
            enable_direct_mode: Whether to process frames immediately or use internal queues.
            metrics: Optional metrics collector for this processor.
            **kwargs: Additional arguments passed to parent class.
        rR   NFon_before_process_frameT)syncon_after_process_frameon_before_push_frameon_after_push_frameon_errorr.   )!r;   r<   _prev_next_enable_direct_mode_clock_task_manager	_observer_enable_metrics_enable_usage_metrics_report_only_initial_ttfb_allow_interruptions_interruption_strategies_deprecated_openaillmcontext_FrameProcessor__started_cancellingr!   _metricsset_processor_namerR   +_FrameProcessor__should_block_system_framesr9   _FrameProcessor__input_queue_FrameProcessor__input_event!_FrameProcessor__input_frame_task$_FrameProcessor__should_block_framesasyncioQueue_FrameProcessor__process_queue_FrameProcessor__process_event#_FrameProcessor__process_frame_task&_FrameProcessor__process_current_frame_register_event_handler)r@   rR   rS   rT   kwargsrA   s        r0   r<   zFrameProcessor.__init__   su     	-d-f-15
15
 $6  ,0 9= 26  %%*").&$)!HJ%,1)  !  :#8#:((3 -2)026::> &+"&}}8<<@!8<$ 	$$%>T$J$$%=D$I$$%;$$G$$%:$F$$Zd$;r/   rJ   c                     | j                   S )z}Get the unique identifier for this processor.

        Returns:
            The unique integer ID of this processor.
        )_idr@   s    r0   idzFrameProcessor.id   s     xxr/   c                     | j                   S )zkGet the name of this processor.

        Returns:
            The name of this processor instance.
        )_namer{   s    r0   rR   zFrameProcessor.name       zzr/   c                     g S )a?  Return the list of sub-processors contained within this processor.

        Only compound processors (e.g. pipelines and parallel pipelines) have
        sub-processors. Non-compound processors will return an empty list.

        Returns:
            The list of sub-processors if this is a compound processor.
        r.   r{   s    r0   
processorszFrameProcessor.processors  s	     	r/   c                     g S )a  Return the list of entry processors for this processor.

        Entry processors are the first processors in a compound processor
        (e.g. pipelines, parallel pipelines). Note that pipelines can also be an
        entry processor as pipelines are processors themselves. Non-compound
        processors will simply return an empty list.

        Returns:
            The list of entry processors.
        r.   r{   s    r0   entry_processorszFrameProcessor.entry_processors  s	     	r/   c                     | j                   S )zxGet the next processor.

        Returns:
            The next processor, or None if there's no next processor.
        )r]   r{   s    r0   nextzFrameProcessor.next%  r   r/   c                     | j                   S )zGet the previous processor.

        Returns:
            The previous processor, or None if there's no previous processor.
        )r\   r{   s    r0   previouszFrameProcessor.previous.  r   r/   c                     ddl }|j                         5  |j                  d       |j                  dt        d       ddd       | j
                  S # 1 sw Y   | j
                  S xY w)zCheck if interruptions are allowed for this processor.

        .. deprecated:: 0.0.99
            Use  `LLMUserAggregator`'s new `user_mute_strategies` parameter instead.

        Returns:
            True if interruptions are allowed.
        r   Nalwaysz}`FrameProcessor.interruptions_allowed` is deprecated. Use `LLMUserAggregator`'s new `user_mute_strategies` parameter instead.r'   
stacklevel)warningscatch_warningssimplefilterwarnDeprecationWarningre   )r@   r   s     r0   interruptions_allowedz$FrameProcessor.interruptions_allowed7  sj     	$$& 	!!(+MMZ"	  	 (((	 (((s   *AA'c                     | j                   S )zuCheck if metrics collection is enabled.

        Returns:
            True if metrics collection is enabled.
        )rb   r{   s    r0   metrics_enabledzFrameProcessor.metrics_enabledN  s     ###r/   c                     | j                   S )zCheck if usage metrics collection is enabled.

        Returns:
            True if usage metrics collection is enabled.
        )rc   r{   s    r0   usage_metrics_enabledz$FrameProcessor.usage_metrics_enabledW  s     )))r/   c                     | j                   S )zCheck if only initial TTFB should be reported.

        Returns:
            True if only initial time-to-first-byte should be reported.
        )rd   r{   s    r0   report_only_initial_ttfbz'FrameProcessor.report_only_initial_ttfb`  s     ---r/   c                     | j                   S )a  Get the interruption strategies for this processor.

        .. deprecated:: 0.0.99
            This function is deprecated, use the new user and bot turn start
            strategies insted.

        Returns:
            Sequence of interruption strategies.
        )rf   r{   s    r0   interruption_strategiesz&FrameProcessor.interruption_strategiesi  s     ,,,r/   c                 N    | j                   st        |  d      | j                   S )zGet the task manager for this processor.

        Returns:
            The task manager instance.

        Raises:
            Exception: If the task manager is not initialized.
        z& TaskManager is still not initialized.)r`   	Exceptionr{   s    r0   r5   zFrameProcessor.task_managerv  s,     !!tf$JKLL!!!r/   c                     g S )a  Return processors that can generate metrics.

        Recursively collects all processors that support metrics generation,
        including those from nested processors.

        Returns:
            List of frame processors that can generate metrics.
        r.   r{   s    r0   processors_with_metricsz&FrameProcessor.processors_with_metrics  s	     	r/   c                      y)zCheck if this processor can generate metrics.

        Returns:
            True if this processor can generate metrics.
        Fr.   r{   s    r0   can_generate_metricsz#FrameProcessor.can_generate_metrics  s     r/   datac                 :    | j                   j                  |       y)zlSet core metrics data for this processor.

        Args:
            data: The metrics data to set.
        N)rj   set_core_metrics_data)r@   r   s     r0   r   z$FrameProcessor.set_core_metrics_data  s     	++D1r/   
start_timer   c                   K   | j                         r=| j                  r0| j                  j                  || j                         d{    yyy7 w)zStart time-to-first-byte metrics collection.

        Args:
            start_time: Optional timestamp to use as the start time. If None,
                uses the current time.
        )r   r   N)r   r   rj   start_ttfb_metricsrd   r@   r   s     r0   r   z!FrameProcessor.start_ttfb_metrics  sV      $$&4+?+?--22%@^@^ 3    ,@&s   AA	A
Aend_timer   c                   K   | j                         rN| j                  rA| j                  j                  |       d{   }|r| j	                  |       d{    yyyy7 #7 
w)zStop time-to-first-byte metrics collection and push results.

        Args:
            end_time: Optional timestamp to use as the end time. If None, uses
                the current time.
        r   N)r   r   rj   stop_ttfb_metrics
push_framer@   r   rH   s      r0   r   z FrameProcessor.stop_ttfb_metrics  s`      $$&4+?+?--9989LLEooe,,,  ,@&L,!   <A&A"A&A$	A&$A&c                   K   | j                         r2| j                  r%| j                  j                  |       d{    yyy7 w)zStart processing metrics collection.

        Args:
            start_time: Optional timestamp to use as the start time. If None,
                uses the current time.
        r   N)r   r   rj   start_processing_metricsr   s     r0   r   z'FrameProcessor.start_processing_metrics  sC      $$&4+?+?--88J8OOO ,@&Os   <AAAc                   K   | j                         rN| j                  rA| j                  j                  |       d{   }|r| j	                  |       d{    yyyy7 #7 
w)zStop processing metrics collection and push results.

        Args:
            end_time: Optional timestamp to use as the end time. If None, uses
                the current time.
        r   N)r   r   rj   stop_processing_metricsr   r   s      r0   r   z&FrameProcessor.stop_processing_metrics  s`      $$&4+?+?--???RREooe,,,  ,@&R,r   tokensc                    K   | j                         rM| j                  r@| j                  j                  |       d{   }|r| j	                  |       d{    yyyy7 #7 
w)ztStart LLM usage metrics collection.

        Args:
            tokens: Token usage information for the LLM.
        N)r   r   rj   start_llm_usage_metricsr   )r@   r   rH   s      r0   r   z&FrameProcessor.start_llm_usage_metrics  s]      $$&4+E+E--??GGEooe,,,  ,F&G,!   ;A%A!A%A#	A%#A%textc                    K   | j                         rM| j                  r@| j                  j                  |       d{   }|r| j	                  |       d{    yyyy7 #7 
w)znStart TTS usage metrics collection.

        Args:
            text: The text being processed by TTS.
        N)r   r   rj   start_tts_usage_metricsr   )r@   r   rH   s      r0   r   z&FrameProcessor.start_tts_usage_metrics  s]      $$&4+E+E--??EEEooe,,,  ,F&E,r   c                    K   | j                         r0| j                  r#| j                  j                          d{    yyy7 w)z/Start text aggregation time metrics collection.N)r   r   rj   start_text_aggregation_metricsr{   s    r0   r   z-FrameProcessor.start_text_aggregation_metrics  s<     $$&4+?+?-->>@@@ ,@&@s   :AAAc                    K   | j                         rL| j                  r?| j                  j                          d{   }|r| j	                  |       d{    yyyy7 #7 
w)z?Stop text aggregation time metrics collection and push results.N)r   r   rj   stop_text_aggregation_metricsr   r@   rH   s     r0   r   z,FrameProcessor.stop_text_aggregation_metrics  sY     $$&4+?+?--EEGGEooe,,,  ,@&G,s!   :A$A A$A"	A$"A$c                    K   | j                          d{    | j                          d{    | j                          d{    y7 57 7 	w)z#Stop all active metrics collection.N)r   r   r   r{   s    r0   stop_all_metricszFrameProcessor.stop_all_metrics  sI     $$&&&**,,,00222 	',2s1   AAAAAAAAA	coroutinec                     |r|  d| }n|  d|j                   j                   }| j                  j                  ||      S )zCreate a new task managed by this processor.

        Args:
            coroutine: The coroutine to run in the task.
            name: Optional name for the task.

        Returns:
            The created asyncio task.
        z::)cr_codeco_namer5   create_task)r@   r   rR   s      r0   r   zFrameProcessor.create_task  sM     V2dV$DV2i//7789D  ,,Y==r/   tasktimeoutc                 X   K   | j                   j                  ||       d{    y7 w)aI  Cancel a task managed by this processor.

        A default timeout if 1 second is used in order to avoid potential
        freezes caused by certain libraries that swallow
        `asyncio.CancelledError`.

        Args:
            task: The task to cancel.
            timeout: Optional timeout for task cancellation.
        N)r5   cancel_task)r@   r   r   s      r0   r   zFrameProcessor.cancel_task  s$      ++D':::s    *(*c                   K   ddl }|j                         5  |j                  d       |j                  dt        d       ddd       |rt        j                  ||       d{    y| d{    y# 1 sw Y   5xY w7 7 w)a)  Wait for a task to complete.

        .. deprecated:: 0.0.81
            This function is deprecated, use `await task` or
            `await asyncio.wait_for(task, timeout)` instead.

        Args:
            task: The task to wait for.
            timeout: Optional timeout for waiting.
        r   Nr   zt`FrameProcessor.wait_for_task()` is deprecated. Use `await task` or `await asyncio.wait_for(task, timeout)` instead.r'   r   )r   r   r   r   r   rq   wait_for)r@   r   r   r   s       r0   wait_for_taskzFrameProcessor.wait_for_task  s      	$$& 	!!(+MMW"	  	 ""4111JJ	 	 2s:   B*A5#B$B%
B/B0B5A>:BBsetupc                 
  K   |j                   | _        |j                  | _        |j                  | _        | j                          | j                  .| j                  j                  | j                         d{    yy7 w)zSet up the processor with required components.

        Args:
            setup: Configuration object containing setup parameters.
        N)	r4   r_   r5   r`   r6   ra   "_FrameProcessor__create_input_taskrj   r   )r@   r   s     r0   r   zFrameProcessor.setup-  sj      kk"// 	  "==$--%%d&8&8999 %9s   A8B:B;Bc                   K   t         |           d{    | j                          d{    | j                          d{    | j                  #| j                  j                          d{    yy7 d7 N7 87 w)zClean up processor resources.N)r;   cleanup"_FrameProcessor__cancel_input_task$_FrameProcessor__cancel_process_taskrj   r?   s    r0   r   zFrameProcessor.cleanup=  sq     go&&(((((***==$--''))) % 	 (*)sC   BA:BA<BA>-B3B 4B<B>B Bc                 j    || _         | |_        t        j                  d|  d| j                           y)zLink this processor to the next processor in the pipeline.

        Args:
            processor: The processor to link to.
        zLinking z -> N)r]   r\   r   debug)r@   	processors     r0   linkzFrameProcessor.linkE  s/     
	xvT$**67r/   c                 N    | j                   st        |  d      | j                   S )zGet the clock used by this processor.

        Returns:
            The clock instance.

        Raises:
            Exception: If the clock is not initialized.
        z  Clock is still not initialized.)r_   r   r{   s    r0   	get_clockzFrameProcessor.get_clockO  s(     {{tf$DEFF{{r/   c                 6    | j                   j                         S )ziGet the event loop used by this processor.

        Returns:
            The asyncio event loop.
        )r5   get_event_loopr{   s    r0   r   zFrameProcessor.get_event_loop\  s       //11r/   rH   	directioncallbackc                    K   | j                   ry| j                  r| j                  |||       d{    y| j                  j	                  |||f       d{    y7 ,7 w)zQueue a frame for processing.

        Args:
            frame: The frame to queue.
            direction: The direction of frame flow.
            callback: Optional callback to call after processing.
        N)ri   r^   _FrameProcessor__process_framerm   rE   r@   rH   r   r   s       r0   queue_framezFrameProcessor.queue_framed  s`      ##&&uiBBB$$((%H)EFFF CFs!   0A#A&A#A!A#!A#c                    K   t        j                  |  d       d| _        | j                  r| j                  j	                          yyw)z"Pause processing of queued frames.z: pausing frame processingTN)r   tracerp   rt   clearr{   s    r0   pause_processing_framesz&FrameProcessor.pause_processing_framesz  sB     v789%)"  &&(     AA
c                    K   t        j                  |  d       d| _        | j                  r| j                  j	                          yyw)z)Pause processing of queued system frames.z!: pausing system frame processingTN)r   r   rl   rn   r   r{   s    r0   pause_processing_system_framesz-FrameProcessor.pause_processing_system_frames  sB     v>?@,0)$$& r   c                    K   t        j                  |  d       | j                  r| j                  j                          yyw)z#Resume processing of queued frames.z: resuming frame processingN)r   r   rt   setr{   s    r0   resume_processing_framesz'FrameProcessor.resume_processing_frames  s:     v89:  $$&     AAc                    K   t        j                  |  d       | j                  r| j                  j                          yyw)z*Resume processing of queued system frames.z": resuming system frame processingN)r   r   rn   r   r{   s    r0   resume_processing_system_framesz.FrameProcessor.resume_processing_system_frames  s:     v?@A""$ r   c                   K   | j                   rZ| j                  r| j                  j                         nd}t        | |||      }| j                   j	                  |       d{    t        |t              r| j                  |       d{    yt        |t              r1| j                          d{    | j                          d{    yt        |t              r| j                  |       d{    yt        |t        t        f      r| j                  |       d{    yt        |t         t"        f      r| j%                  |       d{    yy7 7 7 7 7 n7 @7 w)zProcess a frame.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow.
        r   )r   rH   r   	timestampN)ra   r_   get_timer   on_process_framerD   r   _FrameProcessor__startr   _start_interruptionr   r   _FrameProcessor__cancelr   r   _FrameProcessor__pauser   r   _FrameProcessor__resume)r@   rH   r   r   r   s        r0   process_framezFrameProcessor.process_frame  s+     >>26++,,.1I!##	D ..11$777eZ(,,u%%%01**,,,''))){+--&&& 8:XYZ,,u%%% 9;Z[\--&&& ] 8 &,)&%&s~   A"E-$E%(E-E!(E-6E#7E-E%)E-8E'9/E-(E))/E-E+E-!E-#E-%E-'E-)E-+E-	error_msg	exceptionfatalc                 b   K   t        ||||       }| j                  |       d{    y7 w)a  Creates and pushes an ErrorFrame upstream.

        Creates and pushes an ErrorFrame upstream to notify other processors in the
        pipeline about an error condition. The error frame will include context about
        which processor generated the error.

        Args:
            error_msg: Descriptive message explaining the error condition.
            exception: Optional exception object that caused the error, if available.
                This provides additional context for debugging and error handling.
            fatal: Whether this error should be considered fatal to the pipeline.
                Fatal errors typically cause the entire pipeline to stop processing.
                Defaults to False for non-fatal errors.

        Example::

            ```python
            # Non-fatal error
            await self.push_error("Failed to process audio chunk, skipping")

            # Fatal error with exception context
            try:
                result = some_critical_operation()
            except Exception as e:
                await self.push_error("Critical operation failed", exception=e, fatal=True)
            ```
        )errorr   r   r   )r   N)r   push_error_frame)r@   r   r   r   error_frames        r0   
push_errorzFrameProcessor.push_error  s1     B !y^bc##+#666s   %/-/r   c                   K   |j                   s| |_         | j                  d|       d{    |j                  rdt        j                  |j                  j
                        }|d   }|j                    d|j                   d|j                   d|j                   }n|j                    d|j                   }t        j                  |       | j                  |t        j                         d{    y7 7 w)zaPush an error frame upstream.

        Args:
            error: The error frame to push.
        r[   Nz exception (:z): z error: )r   _call_event_handlerr   	traceback
extract_tb__traceback__filenamelinenor   r   r   r%   r-   )r@   r   tblasterror_messages        r0   r   zFrameProcessor.push_error_frame  s      "EO&&z5999??%%eoo&C&CDBb6D??#<a}CPUP[P[}]   %/x}EM]#ooe^%<%<=== 	: 	>s"   )C=C9CC=3C;4C=;C=c                    K   | j                  |      sy| j                  d|       d{    | j                  ||       d{    | j                  d|       d{    y7 97 !7 	w)zPush a frame to the next processor in the pipeline.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        NrY   rZ   )_check_startedr  $_FrameProcessor__internal_push_frame)r@   rH   r   s      r0   r   zFrameProcessor.push_frame  sk      ""5)&&'=uEEE((	:::&&'<eDDD	 	F:Ds3   (A*A$A*A&A*A(A*&A*(A*c                    K   t        j                  |  d       | j                          | j                          d{    | j	                  t
               d{    y7 "7 w)z>Broadcast an `InterruptionFrame` both upstream and downstream.z: broadcasting interruptionN)r   r   #_FrameProcessor__reset_process_taskr   broadcast_framer   r{   s    r0   broadcast_interruptionz%FrameProcessor.broadcast_interruption  sV     v89:!!###%%%""#4555 	&5s!   <A%A!A%A#A%#A%g      @)r   c                   K   ddl }|j                         5  |j                  d       |j                  dt        d       ddd       | j                          d{    y# 1 sw Y   "xY w7 w)a  Push an interruption task frame upstream and wait for the interruption.

        .. deprecated:: 0.0.104
            Use :meth:`broadcast_interruption` instead. This method now
            delegates to ``broadcast_interruption()`` and ignores *timeout*.
        r   Nr   z~`FrameProcessor.push_interruption_task_frame_and_wait()` is deprecated. Use `FrameProcessor.broadcast_interruption()` instead.r'   r   )r   r   r   r   r   r  )r@   r   r   s      r0   %push_interruption_task_frame_and_waitz4FrameProcessor.push_interruption_task_frame_and_wait  sp      	$$& 	!!(+MMI"	  	 ))+++	 	 	,s(   A0*A"A0A.A0"A+'A0	frame_clsc                    K    |di |} |di |}|j                   |_        |j                   |_        | j                  |       d{    | j                  |t        j                         d{    y7 -7 w)a  Broadcasts a frame of the specified class upstream and downstream.

        This method creates two instances of the given frame class using the
        provided keyword arguments (without deep-copying them) and pushes them
        upstream and downstream.

        Args:
            frame_cls: The class of the frame to be broadcasted.
            **kwargs: Keyword arguments to be passed to the frame's constructor.
        Nr.   )r|   broadcast_sibling_idr   r%   r-   )r@   r  rx   downstream_frameupstream_frames        r0   r  zFrameProcessor.broadcast_frame  sv      %.v.",V,0>0A0A-.>.A.A+oo.///oonn.E.EFFF 	0Fs$   AA;	A7
'A;1A92A;9A;c                 
  K   t        |      }t        j                  |      D ci c]0  }|j                  s|j                  t        ||j                        2 }}t        j                  |      D ci c]=  }|j                  s/|j                  dvr!|j                  t        ||j                        ? }} |di |}|j                         D ]  \  }}t        |||         |di |}	|j                         D ]  \  }}t        |	||        |	j                  |_	        |j                  |	_	        | j                  |       d{    | j                  |	t        j                         d{    yc c}w c c}w 7 77 w)a  Broadcasts a frame instance upstream and downstream.

        This method creates two new frame instances shallow-copying all fields
        from the original frame except `id` and `name`, which get fresh values.

        Args:
            frame: The frame instance to broadcast.

        Note:
            Prefer using `broadcast_frame()` when possible, as it is more
            efficient. This method should only be used when you are not the
            creator of the frame and need to broadcast an existing instance.
        )r|   rR   Nr.   )typedataclassesfieldsinitrR   getattritemssetattrr|   r  r   r%   r-   )
r@   rH   r  finit_fieldsextra_fieldsr  kvr  s
             r0   broadcast_frame_instancez'FrameProcessor.broadcast_frame_instance(  sm     K	?J?Q?QRW?Xc!\]\b\bqvvwuaff55cc !''.
66affN: FFGE166**
 
 %3{3 &&( 	,DAq$a+	, #1[1 &&( 	*DAqNAq)	* 1?0A0A-.>.A.A+oo.///oonn.E.EFFF% d
  	0FsA   #FE5#E5F3AE:5BFE?'F/F0FFc                 
  K   d| _         |j                  | _        |j                  | _        |j
                  | _        |j                  | _        |j                  | _
        d|j                  v | _        | j                          yw)zHandle the start frame to initialize processor state.

        Args:
            frame: The start frame containing initialization parameters.
        Tdeprecated_openaillmcontextN)rh   allow_interruptionsre   enable_metricsrb   enable_usage_metricsrc   r   rf   r   rd   metadatarg   $_FrameProcessor__create_process_taskr   s     r0   __startzFrameProcessor.__startK  su      $)$=$=!$33%*%?%?"(-(E(E%).)G)G& -JU^^,[)""$s   BBc                 N   K   d| _         | j                          d{    y7 w)zqHandle the cancel frame to stop processor operation.

        Args:
            frame: The cancel frame.
        TN)ri   r   r   s     r0   __cancelzFrameProcessor.__cancel]  s"       ((***s   %#%c                    K   |j                   j                  | j                  k(  r| j                          d{    yy7 w)zlHandle pause frame to pause processor operation.

        Args:
            frame: The pause frame.
        N)r   rR   r   r   s     r0   __pausezFrameProcessor.__pausef  s8      ??499,..000 -0   7AA Ac                    K   |j                   j                  | j                  k(  r| j                          d{    yy7 w)zoHandle resume frame to resume processor operation.

        Args:
            frame: The resume frame.
        N)r   rR   r   r   s     r0   __resumezFrameProcessor.__resumeo  s8      ??499,//111 -1r5  c                 "  K   	 t        | j                  t              r| j                          y| j	                          d{    | j                          y7 # t        $ r)}| j                  d| |       d{  7   Y d}~yd}~ww xY ww)z;Start handling an interruption by cancelling current tasks.Nz1Uncaught exception handling _start_interruption: r   r   )rD   rv   r   $_FrameProcessor__reset_process_queuer   r/  r   r   )r@   es     r0   r   z"FrameProcessor._start_interruption|  s     	$668LM **, 00222**, 3 	//MaSQ "   	sV   B*A BA AA BA 	B#B<A?=BBBBc                   K   	 | j                   r| j                   j                         nd}|t        j                  k(  r| j                  rt        j                  d| d|  d| j                          | j                  r=t        | | j                  |||      }| j                  j                  |       d{    | j                  j                  ||       d{    y|t        j                  k(  r| j                  rt        j                  d| d|  d| j                          | j                  r=t        | | j                  |||      }| j                  j                  |       d{    | j                  j                  ||       d{    yyy7 7 7 /7 # t        $ r)}| j                  d| |	       d{  7   Y d}~yd}~ww xY ww)
zInternal method to push frames to adjacent processors.

        Args:
            frame: The frame to push.
            direction: The direction to push the frame.
        r   zPushing z downstream from z to )sourcedestinationrH   r   r   Nz upstream from zUncaught exception: r9  )r_   r   r%   r,   r]   r   r   ra   r    on_push_framer   r-   r\   r   r   )r@   rH   r   r   r   r;  s         r0   __internal_push_framez$FrameProcessor.__internal_push_frame  s    	U26++,,.1IN555$**xw.?vT$**VW>>&#$(JJ#"+"+D ..66t<<<jj,,UI>>>n555$**xwodV4

|TU>>&#$(JJ#"+"+D ..66t<<<jj,,UI>>> ;E5 => => 	U//.B1#,FRS/TTT	Us   GB3F  7F8#F  FF   G!BF  ,F-#F  FF  GF  F  F  F   	G)GGGGGGc                 h    | j                   st        j                  |  d| d       | j                   S )zCheck if the processor has been started.

        Args:
            frame: The frame being processed.

        Returns:
            True if the processor has been started.
        z Trying to process z  but StartFrame not received yet)rh   r   r   r   s     r0   r  zFrameProcessor._check_started  s0     ~~LLD6!4UG;[\]~~r/   c                     | j                   ry| j                  s>t        j                         | _        | j                  | j                               | _        yy)z'Create the frame input processing task.N)r^   ro   rq   Eventrn   r   )_FrameProcessor__input_frame_task_handlerr{   s    r0   __create_input_taskz"FrameProcessor.__create_input_task  sF    ##&&!(D&*&6&6t7V7V7X&YD# 'r/   c                    K   | j                   r0| j                  | j                   t               d{    d| _         yy7 w)z'Cancel the frame input processing task.N)ro   r   INPUT_TASK_CANCEL_TIMEOUT_SECSr{   s    r0   __cancel_input_taskz"FrameProcessor.__cancel_input_task  sA     "" ""4#:#:<Z[[[&*D# # \s   0AA Ac                     | j                   ry| j                  s5| j                          | j                  | j	                               | _        yy)z,Create the non-system frame processing task.N)r^   ru   r  r   +_FrameProcessor__process_frame_task_handlerr{   s    r0   __create_process_taskz$FrameProcessor.__create_process_task  sD    ##((%%'(,(8(89Z9Z9\(]D% )r/   c                 ~    | j                   ryd| _        t        j                         | _        | j                          y)z'Reset non-system frame processing task.NF)r^   rp   rq   rC  rt   r:  r{   s    r0   __reset_process_taskz#FrameProcessor.__reset_process_task  s0    ##%*"&}}""$r/   c                    t        j                         }| j                  j                         su| j                  j	                         }|d   }t        |t              r|j                  |       | j                  j                          | j                  j                         su|j                         sM|j	                         }| j                  j                  |       |j                          |j                         sLyy)z(Reset non-system frame processing queue.r   N)	rq   rr   rs   empty
get_nowaitrD   r   
put_nowait	task_done)r@   	new_queuerB   rH   s       r0   __reset_process_queuez$FrameProcessor.__reset_process_queue  s     MMO	 &&,,.''224DGE%!56$$T*  **, &&,,. //#'')D  ++D1! //#r/   c                 ~   K   | j                   r+| j                  | j                          d{    d| _         yy7 w)z,Cancel the non-system frame processing task.N)ru   r   r{   s    r0   __cancel_process_taskz$FrameProcessor.__cancel_process_task  s9     $$""4#<#<===(,D% %=s   +=;=c                 L  K   	 | j                  d|       d {    | j                  ||       d {    |r || ||       d {    | j                  d|       d {    y 7 M7 57 #7 # t        $ r)}| j                  d| |       d {  7   Y d }~y d }~ww xY ww)NrV   rX   zError processing frame: r9  )r  r   r   r   )r@   rH   r   r   r;  s        r0   __process_framezFrameProcessor.__process_frame  s     	Y**+DeLLL $$UI666tUI666**+CUKKK M 7 7K 	Y//.Fqc,JVW/XXX	Ys~   B$A/ A'A/ A)A/ A+A/ !A-"A/ &B$'A/ )A/ +A/ -A/ /	B!8BBBB$B!!B$c                   K   	 | j                   j                          d{   \  }}}| j                  r| j                  rst	        j
                  |  d       | j                  j                          d{    | j                  j                          d| _        t	        j
                  |  d       t        |t              r| j                  |||       d{    nM| j                  r'| j                  j                  |||f       d{    nt        |  d|j                         | j                   j                          F7 )7 7 s7 Bw)zHandle frames from the input queue.

        It only processes system frames. Other frames are queue for another task
        to execute.

        Nz : system frame processing pausedFz!: system frame processing resumedz0: __process_queue is None when processing frame )rm   rL   rl   rn   r   r   waitr   rD   r   r   rs   rE   RuntimeErrorrR   rR  r   s       r0   __input_frame_task_handlerz)FrameProcessor.__input_frame_task_handler  s-     151C1C1G1G1I+I(UIx00T5G5Gv%EFG((--///""((*491v%FGH%-**5)XFFF%%**..y(/KLLL"fLUZZLY  ((*' +I 0 GLsG   EE
AE7E8A#EE2EE<EEEEc                   K   	 d| _         | j                  j                          d{   \  }}}|| _         | j                  r| j                  rst        j                  |  d       | j                  j                          d{    | j                  j                          d| _        t        j                  |  d       | j                  |||       d{    | j                  j                          7 7 u7 #w)z0Handle non-system frames from the process queue.Nz: frame processing pausedFz: frame processing resumed)rv   rs   rL   rp   rt   r   r   rZ  r   r   rR  r   s       r0   __process_frame_task_handlerz+FrameProcessor.__process_frame_task_handler(  s     +/D(151E1E1I1I1K+K(UIx+0D())d.B.Bv%>?@**//111$$**,-2*v%?@A&&uiBBB  **,!  ,L 2
 Cs5   &C?C9AC?C;AC?C= C?;C?=C?)N)g      ?)r   r1   )NF)cr(   r)   r*   r+   r
   strboolr!   r<   propertyintr|   rR   r	   r   r   r   r   r   r   r   r   r   r   r   r"   r5   r   r   r   r   floatr   r   r   r   r   r   r   r   r   r   r   rq   Taskr   r   r   r3   r   r   r   r   r   AbstractEventLoopr   r%   r,   r   rM   r   r   r   r   r   r   r   r   r   r   r   r  r  r   r  r(  r   r   r   r   r   r   r   r   r   r   r   r  r  r   r   r/  r  r:  r   r   rD  rJ  rN   rO   s   @r0   r1   r1      sh   * ##(37W< smW< !	W<
 /0W<r C   c   	D!12 	 	 $'7"8   h/0   (#34   ) ), $ $ * * . . 
-2J)K 
- 
- "o " "	d 2+ 2 IM 
huo 
 FJ 
-8E? 
- OS PHUO P LP 
-% 
-	-M 	-	-# 	-A
-3>Y >hsm >w|| > ;gll ;Xe_ ; x 6:!4 : *89 2 9 9 2 %3$=$=,0	GG "G =)	G,)''%' '> '@ *.	"7"7 I&"7 	"7H>J >, JXIbIb Ee E E 6 OR ,e ,(GtE{ G$!GE !GF%: %$+K +1#;>\#\ 12$=@_$_ 2"#U #U> #UJE Z+^%"&-YY'5YAI-AXY +8-r/   )7r+   rq   r  r  r   enumr   typingr   r   r   r   r	   r
   r   r   r   logurur   6pipecat.audio.interruptions.base_interruption_strategyr   pipecat.clocks.base_clockr   pipecat.frames.framesr   r   r   r   r   r   r   r   r   r   r   pipecat.metrics.metricsr   r   pipecat.observers.base_observerr   r   r    2pipecat.processors.metrics.frame_processor_metricsr!   "pipecat.utils.asyncio.task_managerr"   pipecat.utils.base_objectr#   r%   rM   r3   PriorityQueuer9   rG  r1   r.   r/   r0   <module>rr     s       ! 
 
 
  [ /    ? U U T > 0	T 	 *E>BIdOST , , ,0'// 0j "# m-Z m-r/   