
    qi/                     ,   d Z ddlZddlZddlZddlmZ ddlmZm	Z	m
Z
mZmZmZmZmZmZmZ ddlmZ ddlmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z- ddl.m/Z/m0Z0 ddl1m2Z2m3Z3 ddl4m5Z5 ddl6m7Z7 ddl8m9Z9 ddl:m;Z;m<Z< ddl=m>Z>m?Z?m@Z@ ddlAmBZB ddlCmDZD ddlEmFZFmGZGmHZH ddlImJZJmKZKmLZL ddlMmNZNmOZOmPZP ddlQmRZR ddlSmTZT ddlUmVZV dZWeWdz  ZXdZYdZZ ed      Z[ G d d e2      Z\ G d! d"e      Z] G d# d$e;      Z^y)%a  Pipeline task implementation for managing frame processing pipelines.

This module provides the main PipelineTask class that orchestrates pipeline
execution, frame routing, lifecycle management, and monitoring capabilities
including heartbeats, idle detection, and observer integration.
    N)Path)
AnyAsyncIterableDictIterableListOptionalSetTupleTypeTypeVar)logger)	BaseModel
ConfigDictField)BaseInterruptionStrategy)	BaseClock)SystemClock)BotSpeakingFrameCancelFrameCancelTaskFrameEndFrameEndTaskFrame
ErrorFrameFrameHeartbeatFrameInterruptionFrameInterruptionTaskFrameMetricsFrame
StartFrame	StopFrameStopTaskFrameUserSpeakingFrame)ProcessingMetricsDataTTFBMetricsData)BaseObserverFramePushed)TurnTrackingObserver)UserBotLatencyObserver)BasePipeline)BasePipelineTaskPipelineTaskParams)PipelinePipelineSinkPipelineSource)TaskObserver)LLMUserContextAggregator)FrameDirectionFrameProcessorFrameProcessorSetup)RTVIObserverRTVIObserverParamsRTVIProcessor)BaseTaskManagerTaskManagerTaskManagerParams)is_tracing_available)TracingContext)TurnTraceObserverg      ?
   i,  g      4@Tc                   \     e Zd ZdZdej
                  deee   df   f fdZ	de
fdZ xZS )IdleFrameObserverzIdle timeout observer.

    This observer waits for specific frames being generated in the pipeline. If
    the frames are generated the given asyncio event is set. If the event is not
    set it means the pipeline is probably idle.

    
idle_eventidle_timeout_frames.c                \    t         |           || _        || _        t	               | _        y)zInitialize the observer.

        Args:
            idle_event: The event to set if the idle timeout frames are being pushed.
            idle_timeout_frames: A tuple with the frames that should set the event when received
        N)super__init___idle_event_idle_timeout_framesset_processed_frames)selfrB   rC   	__class__s      G/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/pipeline/task.pyrF   zIdleFrameObserver.__init__O   s*     	%$7!!$    datac                 Z  K   |j                   j                  | j                  v ry| j                  j                  |j                   j                         t	        |j                   t
              s t	        |j                   | j                        r| j                  j                          yyw)z~Callback executed when a frame is pushed in the pipeline.

        Args:
            data: The frame push event data.
        N)	frameidrJ   add
isinstancer    rH   rG   rI   )rK   rO   s     rM   on_push_framezIdleFrameObserver.on_push_frame[   sv      ::==D222""4::==1djj*-DJJHaHa1b  " 2cs   B)B+)__name__
__module____qualname____doc__asyncioEventr   r   r   rF   r'   rU   __classcell__rL   s   @rM   rA   rA   F   s=    
'gmm 
'%PTUZP[]`P`Ja 
'# #rN   rA   c                      e Zd ZU dZ ed      ZdZeed<   dZ	e
ed<   dZe
ed<   d	Zeed
<   d	Zeed<   d	Zeed<   eZeed<    ee      Zee   ed<    ee      Zee   ed<   d	Zeed<   dZeed<    ee      Zeeef   ed<   y)PipelineParamsa  Configuration parameters for pipeline execution.

    These parameters are usually passed to all frame processors through
    StartFrame. For other generic pipeline task parameters use PipelineTask
    constructor arguments instead.

    Parameters:
        allow_interruptions: Whether to allow pipeline interruptions.

            .. deprecated:: 0.0.99
                Use  `LLMUserAggregator`'s new `user_turn_strategies` parameter instead.

        audio_in_sample_rate: Input audio sample rate in Hz.
        audio_out_sample_rate: Output audio sample rate in Hz.
        enable_heartbeats: Whether to enable heartbeat monitoring.
        enable_metrics: Whether to enable metrics collection.
        enable_usage_metrics: Whether to enable usage metrics.
        heartbeats_period_secs: Period between heartbeats in seconds.
        interruption_strategies: [deprecated] Strategies for bot interruption behavior.

            .. deprecated:: 0.0.99
                Use  `LLMUserAggregator`'s new `user_turn_strategies` parameter instead.

        observers: [deprecated] Use `observers` arg in `PipelineTask` class.

            .. deprecated:: 0.0.58
                Use the `observers` argument in the `PipelineTask` class instead.

        report_only_initial_ttfb: Whether to report only initial time to first byte.
        send_initial_empty_metrics: Whether to send initial empty metrics.
        start_metadata: Additional metadata for pipeline start.
    T)arbitrary_types_allowedallow_interruptionsi>  audio_in_sample_ratei]  audio_out_sample_rateFenable_heartbeatsenable_metricsenable_usage_metricsheartbeats_period_secs)default_factoryinterruption_strategies	observersreport_only_initial_ttfbsend_initial_empty_metricsstart_metadataN) rV   rW   rX   rY   r   model_configra   bool__annotations__rb   intrc   rd   re   rf   HEARTBEAT_SECSrg   floatr   listri   r   r   rj   r&   rk   rl   dictrm   r   strr    rN   rM   r_   r_   k   s    B d;L $$ %#%!&3&#t# ND !&$&$2E2>CTX>YT":;Y$)$$?ItL!?%*d*'++%*4%@NDcN@rN   r_   c            $           e Zd ZdZdddeddddddeefeddddddede	e
   de	e   d	ed
edede	e   de	e   dedededeee   df   de	e   de	ee      de	e   de	e   de	e   f" fdZede
fd       Zedefd       Zede	e   fd       Zede	e   fd       Zedefd       Z edeee   df   fd       Z!edeee   df   fd        Z"d!ef fd"Z#d#efd$Z$d#efd%Z%d&eee   df   fd'Z&d&eee   df   fd(Z'd&eee   df   fd)Z(d&eee   df   fd*Z)defd+Z*d, Z+dd-d.e	e   fd/Z,de-fd0Z.e/j`                  fd1ed2e/fd3Z1e/j`                  fd4e2e   e3e   z  d2e/fd5Z4dd-d.e	e   fd6Z5d7 Z6d8 Z7d9 Z8d: Z9d; Z:d< Z;de<fd=Z=d1efd>Z>d1efd?Z?d@ Z@de-fdAZAdBefdCZBdD ZCd1ed2e/fdEZDd1ed2e/fdFZEdG ZFdH ZGdI ZHdefdJZIdK ZJdL ZKdM ZLdeMeeNf   fdNZOdOePdPeeQ   de	eQ   fdQZR xZSS )RPipelineTaska/	  Manages the execution of a pipeline, handling frame processing and task lifecycle.

    This class orchestrates pipeline execution with comprehensive monitoring,
    event handling, and lifecycle management. It provides event handlers for
    various pipeline states and frame types, idle detection, heartbeat monitoring,
    and observer integration.

    Event handlers available:

    - on_frame_reached_upstream: Called when upstream frames reach the source
    - on_frame_reached_downstream: Called when downstream frames reach the sink
    - on_idle_timeout: Called when pipeline is idle beyond timeout threshold
    - on_pipeline_started: Called when pipeline starts with StartFrame
    - on_pipeline_stopped: [deprecated] Called when pipeline stops with StopFrame

            .. deprecated:: 0.0.86
                Use `on_pipeline_finished` instead.

    - on_pipeline_ended: [deprecated] Called when pipeline ends with EndFrame

            .. deprecated:: 0.0.86
                Use `on_pipeline_finished` instead.

    - on_pipeline_cancelled: [deprecated] Called when pipeline is cancelled with CancelFrame

            .. deprecated:: 0.0.86
                Use `on_pipeline_finished` instead.

    - on_pipeline_finished: Called after the pipeline has reached any terminal state.
          This includes:

              - StopFrame: pipeline was stopped (processors keep connections open)
              - EndFrame: pipeline ended normally
              - CancelFrame: pipeline was cancelled

          Use this event for cleanup, logging, or post-processing tasks. Users can inspect
          the frame if they need to handle specific cases.

    - on_pipeline_error: Called when an error occurs with ErrorFrame

    Example::

        @task.event_handler("on_frame_reached_upstream")
        async def on_frame_reached_upstream(task, frame):
            ...

        @task.event_handler("on_idle_timeout")
        async def on_pipeline_idle_timeout(task):
            ...

        @task.event_handler("on_pipeline_started")
        async def on_pipeline_started(task, frame):
            ...

        @task.event_handler("on_pipeline_finished")
        async def on_pipeline_finished(task, frame):
            ...

        @task.event_handler("on_pipeline_error")
        async def on_pipeline_error(task, frame):
            ...
    NTF)paramsadditional_span_attributescancel_on_idle_timeoutcancel_timeout_secscheck_dangling_tasksclockconversation_idenable_tracingenable_turn_trackingenable_rtvirC   idle_timeout_secsrj   rtvi_processorrtvi_observer_paramstask_managerpipelinerz   r{   r|   r}   r~   r   r   r   r   r   rC   .r   rj   r   r   r   c                
   t         |           |xs
 t               | _        |xs i | _        || _        || _        || _        |xs
 t               | _	        || _
        |	xr
 t               | _        |
| _        || _        | j                  j                  rZddl}|j#                         5  |j%                  d       |j'                  dt(               ddd       | j                  j                  }|xs g }d| _        d| _        d| _        d| _        | j                  r*t3               | _        |j5                  | j*                         | j                  r| j*                  rt7               | _        t9               | _        |j5                  | j,                         t;        | j*                  | j,                  | j                  | j                  | j0                        | _        |j5                  | j.                         d| _        d| _        |xs
 tA               | _!        tE        jF                         | _$        d| _%        tE        jF                         | _&        d| _'        d| _(        d| _)        d}| jU                  |tV              }tY        d |D              }|r|st[        j\                  |  d       n|s|rt[        j\                  |  d	       nf|r"|r t[        j^                  |  d
       || _)        nB|r@|xs
 tW               | _)        |j5                  | jR                  ja                  |             d}| jR                  r(| jb                  je                  d      dtV        fd       }tE        jf                         | _4        d| _5        | j                  r(tm        | jh                  |      }|j5                  |       tE        jf                         | _7        tE        jf                         | _8        tE        jf                         | _9        tu        | jv                  |  d      }ty        | jz                  |  d      | _>        |r| jR                  |gn|g}t        ||| j|                        | _@        t        || jB                        | _B        t               | _D        t               | _E        | j                  d       | j                  d       | j                  d       | j                  d       | j                  d       | j                  d       | j                  d       | j                  d       | j                  d       y# 1 sw Y   txY w)as  Initialize the PipelineTask.

        Args:
            pipeline: The pipeline to execute.
            params: Configuration parameters for the pipeline.
            additional_span_attributes: Optional dictionary of attributes to propagate as
                OpenTelemetry conversation span attributes.
            cancel_on_idle_timeout: Whether the pipeline task should be cancelled if
                the idle timeout is reached.
            cancel_timeout_secs: Timeout (in seconds) to wait for cancellation to happen
                cleanly.
            check_dangling_tasks: Whether to check for processors' tasks finishing properly.
            clock: Clock implementation for timing operations.
            conversation_id: Optional custom ID for the conversation.
            enable_rtvi: Whether to automatically add RTVI support to the pipeline.
            enable_tracing: Whether to enable tracing.
            enable_turn_tracking: Whether to enable turn tracking.
            idle_timeout_frames: A tuple with the frames that should trigger an idle
                timeout if not received within `idle_timeout_seconds`.
            idle_timeout_secs: Timeout (in seconds) to consider pipeline idle or
                None. If a pipeline is idle the pipeline task will be cancelled
                automatically.
            observers: List of observers for monitoring pipeline execution.
            rtvi_observer_params: The RTVI observer parameter to use if RTVI is enabled.
            rtvi_processor: The RTVI processor to add if RTVI is enabled.
            task_manager: Optional task manager for handling asyncio tasks.
        r   NalwayszGField 'observers' is deprecated, use the 'observers' parameter instead.)latency_trackerr   r{   tracing_contextFc              3   <   K   | ]  }t        |t                y wN)rT   r5   ).0os     rM   	<genexpr>z(PipelineTask.__init__.<locals>.<genexpr>O  s     %UajL&A%Us   zZ: RTVIProcessor found in pipeline but no RTVIObserver in observers. Make sure to add both.zZ: RTVIObserver found in observers but no RTVIProcessor in pipeline. Make sure to add both.z|: RTVIProcessor and RTVIObserver found, skipping default ones. They are both added by default, no need to add them yourself.)rz   Ton_client_readyrtvic                 @   K   | j                          d {    y 7 wr   )set_bot_ready)r   s    rM   r   z.PipelineTask.__init__.<locals>.on_client_readyh  s     ((***s   )rB   rC   z::Source)namez::Sink)sourcesink)rj   r   on_frame_reached_upstreamon_frame_reached_downstreamon_idle_timeouton_pipeline_startedon_pipeline_stoppedon_pipeline_endedon_pipeline_cancelledon_pipeline_finishedon_pipeline_error)GrE   rF   r_   _params_additional_span_attributes_cancel_on_idle_timeout_cancel_timeout_secs_check_dangling_tasksr   _clock_conversation_idr;   _enable_tracing_enable_turn_tracking_idle_timeout_secsrj   warningscatch_warningssimplefilterwarnDeprecationWarning_turn_tracking_observer_user_bot_latency_observer_turn_trace_observer_tracing_contextr(   appendr<   r)   r=   	_finished
_cancelledr9   _task_managerrZ   Queue_push_queue_process_push_task_heartbeat_queue_heartbeat_push_task_heartbeat_monitor_task_rtvi_find_processorr7   anyr   errorwarningcreate_rtvi_observerr   event_handlerr[   rG   _idle_monitor_taskrA   _pipeline_start_event_pipeline_end_event_pipeline_finished_eventr/   _source_push_framer.   _sink_push_frame_sinkr-   	_pipeliner0   	_observerrI   _reached_upstream_types_reached_downstream_types_register_event_handler)rK   r   rz   r{   r|   r}   r~   r   r   r   r   r   rC   r   rj   r   r   r   r   prepend_rtviexternal_rtviexternal_observer_foundr   idle_frame_observerr   
processorsrL   s                             rM   rF   zPipelineTask.__init__   s   ` 	1!1+E+K('=$$7!%9",{} /-H2F2H%9""3<<!!((* %%h/]& ..IO	GK$LP'AE!:>%%+?+AD(T99:D$@$@$2$4D!.D.FD+T<<=(9,, $ ? ? $ 5 5+/+K+K $ 5 5)D% T667 *:[] #==?:> !(<@!?C$ 
,,X}E"%%U9%U"U!8LL& ) ) #:LL& ) ) 6NN& P P 'DJ':=?DJTZZ<<DX<YZL::YY$$%67+M + 8+ #==?:>"""3++$7# 01 &-]]_" $+==?  )0%   7 7h>OP!$"7"7foN
 0<djj(+(
!*V$**M
 &	HZHZ[ :=$;>5&$$%@A$$%BC$$%67$$%:;$$%:;$$%89$$%<=$$%;<$$%89S s   ((UUreturnc                     | j                   S )zxGet the pipeline parameters for this task.

        Returns:
            The pipeline parameters configuration.
        )r   rK   s    rM   rz   zPipelineTask.params  s     ||rN   c                     | j                   S )zGet the full pipeline managed by this pipeline task.

        This will also include any internal processors added by the pipeline task.

        Returns:
            The pipeline managed by the pipeline task.
        )r   r   s    rM   r   zPipelineTask.pipeline       ~~rN   c                     | j                   S )zGet the turn tracking observer if enabled.

        Returns:
            The turn tracking observer instance or None if not enabled.
        )r   r   s    rM   turn_tracking_observerz#PipelineTask.turn_tracking_observer  s     +++rN   c                     | j                   S )zGet the turn trace observer if enabled.

        Returns:
            The turn trace observer instance or None if not enabled.
        )r   r   s    rM   turn_trace_observerz PipelineTask.turn_trace_observer  s     (((rN   c                 N    | j                   st        |  d      | j                   S )zGet the RTVI processor if RTVI is enabled.

        Returns:
            The RTVI processor added to the pipeline when RTVI is enabled.
        z RTVI is not enabled.)r   	Exceptionr   s    rM   r   zPipelineTask.rtvi  s(     zztf$9:;;zzrN   c                 ,    t        | j                        S )zGet the currently configured upstream frame type filters.

        Returns:
            Tuple of frame types that trigger the on_frame_reached_upstream event.
        )tupler   r   s    rM   reached_upstream_typesz#PipelineTask.reached_upstream_types  s     T1122rN   c                 ,    t        | j                        S )zGet the currently configured downstream frame type filters.

        Returns:
            Tuple of frame types that trigger the on_frame_reached_downstream event.
        )r   r   r   s    rM   reached_downstream_typesz%PipelineTask.reached_downstream_types  s     T3344rN   
event_namec                     |dv rHddl }|j                         5  |j                  d       |j                  d| dt               ddd       t
        |   |      S # 1 sw Y   xY w)zDecorator for registering event handlers.

        Args:
            event_name: The name of the event to handle.

        Returns:
            The decorator function that registers the handler.
        )r   r   r   r   Nr   zEvent 'z4' is deprecated, use 'on_pipeline_finished' instead.)r   r   r   r   r   rE   r   )rK   r   r   rL   s      rM   r   zPipelineTask.event_handler  sn     ^^((* %%h/j\)]^& w$Z00 s   ,AA&observerc                 :    | j                   j                  |       y)zAdd an observer to monitor pipeline execution.

        Args:
            observer: The observer to add to the pipeline monitoring.
        N)r   add_observerrK   r   s     rM   r   zPipelineTask.add_observer  s     	##H-rN   c                 V   K   | j                   j                  |       d{    y7 w)zRemove an observer from pipeline monitoring.

        Args:
            observer: The observer to remove from pipeline monitoring.
        N)r   remove_observerr   s     rM   r   zPipelineTask.remove_observer  s       nn,,X666s   )')typesc                 $    t        |      | _        y)zSet which frame types trigger the on_frame_reached_upstream event.

        Args:
            types: Tuple of frame types to monitor for upstream events.
        N)rI   r   rK   r   s     rM   set_reached_upstream_filterz(PipelineTask.set_reached_upstream_filter  s     (+5z$rN   c                 $    t        |      | _        y)zSet which frame types trigger the on_frame_reached_downstream event.

        Args:
            types: Tuple of frame types to monitor for downstream events.
        N)rI   r   r   s     rM   set_reached_downstream_filterz*PipelineTask.set_reached_downstream_filter  s     *-U&rN   c                 :    | j                   j                  |       y)zAdd frame types to trigger the on_frame_reached_upstream event.

        Args:
            types: Tuple of frame types to add to upstream monitoring.
        N)r   updater   s     rM   add_reached_upstream_filterz(PipelineTask.add_reached_upstream_filter  s     	$$++E2rN   c                 :    | j                   j                  |       y)zAdd frame types to trigger the on_frame_reached_downstream event.

        Args:
            types: Tuple of frame types to add to downstream monitoring.
        N)r   r   r   s     rM   add_reached_downstream_filterz*PipelineTask.add_reached_downstream_filter&  s     	&&--e4rN   c                     | j                   S )zCheck if the pipeline task has finished execution.

        This indicates whether the tasks has finished, meaninig all processors
        have stopped.

        Returns:
            True if all processors have stopped and the task is complete.
        )r   r   s    rM   has_finishedzPipelineTask.has_finished.  r   rN   c                    K   t        j                  d|  d       | j                  t                      d{    y7 w)zSchedule the pipeline to stop after processing all queued frames.

        Sends an EndFrame to gracefully terminate the pipeline once all
        current processing is complete.
        zTask z scheduled to stop when doneN)r   debugqueue_framer   r   s    rM   stop_when_donezPipelineTask.stop_when_done9  s4      	uTF">?@xz***s   6A >A reasonr  c                ^   K   | j                   s| j                  |       d{    yy7 w)zRequest the running pipeline to cancel.

        Args:
            reason: Optional reason to indicate why the pipeline is being cancelled.
        r   N)r   _cancelrK   r  s     rM   cancelzPipelineTask.cancelB  s+      ~~,,f,--- -s   "-+-c                 Z  K   | j                         ry| j                  |       d{    | j                          d{    	 | j                          d{    	 t        j                  d|  d       | j                          d{    | j                  r| j                          d| _        t        j                  d|  d       y7 7 7 w# t        j
                  $ rM t        j                  d|  d       | j                          d{  7   | j                          d{  7    w xY w7 # t        j                  d|  d       | j                          d{  7   | j                  r| j                          d| _        t        j                  d|  d       w xY ww)zStart and manage the pipeline execution until completion or cancellation.

        Args:
            params: Configuration parameters for pipeline execution.
        NzPipeline task z got cancelled from outside...z is finishing...Tz has finished)r   _setup_create_tasks_wait_for_pipeline_finishedrZ   CancelledErrorr   r   r  _cancel_tasksr   _print_dangling_tasksr   )rK   rz   s     rM   runzPipelineTask.runK  st      kk&!!!
   """	?22444 LL>$/?@A$$&&&))**,!DNLL>$}=>; 	"
 	# 5%% 	LL>$/MNO,,.   22444	 ' LL>$/?@A$$&&&))**,!DNLL>$}=>s   &F+CF+ CF+C CC ,F+D6AF+F+C ?D3DD3+D.,D33D8 6F+8-F(%E(&AF((F+rQ   	directionc                    K   |t         j                  k(  r$| j                  j                  |       d{    y| j                  j                  ||       d{    y7 *7 w)aX  Queue a single frame to be pushed through the pipeline.

        Downstream frames are pushed from the beginning of the pipeline.
        Upstream frames are pushed from the end of the pipeline.

        Args:
            frame: The frame to be processed.
            direction: The direction to push the frame. Defaults to downstream.
        N)r2   
DOWNSTREAMr   putr   r   rK   rQ   r  s      rM   r   zPipelineTask.queue_framet  sS      111""&&u---**((	::: .:s!   2A#A$A#A!A#!A#framesc                    K   t        |t              r&|2 3 d{   }| j                  ||       d{    $t        |t              r"|D ]  }| j                  ||       d{     yy7 R7 :6 y7 w)ay  Queue multiple frames to be pushed through the pipeline.

        Downstream frames are pushed from the beginning of the pipeline.
        Upstream frames are pushed from the end of the pipeline.

        Args:
            frames: An iterable or async iterable of frames to be processed.
            direction: The direction to push the frames. Defaults to downstream.
        N)rT   r   r   r   )rK   r  r  rQ   s       rM   queue_frameszPipelineTask.queue_frames  s      fm,% 9 9e&&ui888) 9&&ui8889 *98  & 9sA   A4A0A,A0A4A./A4#A2$A4,A0.A40A4c                   K   | j                   sCt        j                  d|         d| _         | j                  t	        |             d{    yy7 w)zInternal cancellation logic for the pipeline task.

        Args:
            reason: Optional reason to indicate why the pipeline is being cancelled.
        zCancelling pipeline task Tr   N)r   r   r   r   r   r  s     rM   r  zPipelineTask._cancel  sM      LL4TF;<"DO"";f#=>>>  ?s   A
AAAc                    K   | j                   j                  | j                         |  d      | _        | j                  j                          d{    | j                  S 7 w)z/Create and start all pipeline processing tasks.z::_process_push_queueN)r   create_task_process_push_queuer   r   startr   s    rM   r  zPipelineTask._create_tasks  s^     "&"4"4"@"@$$&4&0E(F#
 nn""$$$&&& 	%s   AA%A#A%c                    | j                   j                  rr| j                  e| j                  j	                  | j                         |  d      | _        | j                  j	                  | j                         |  d      | _        yyy)zHStart heartbeat tasks if heartbeats are enabled and not already running.Nz::_heartbeat_push_handlerz::_heartbeat_monitor_handler)r   rd   r   r   r  _heartbeat_push_handler_heartbeat_monitor_handlerr   r   s    rM   _maybe_start_heartbeat_tasksz)PipelineTask._maybe_start_heartbeat_tasks  s    <<))d.G.G.O(,(:(:(F(F,,.4&8Q0R)D% ,0+=+=+I+I//1dV;W3X,D(	 /P)rN   c                     | j                   r3| j                  j                  | j                         |  d      | _        yy)z9Start idle monitoring task if idle timeout is configured.z::_idle_monitor_handlerN)r   r   r  _idle_monitor_handlerr   r   s    rM   _maybe_start_idle_taskz#PipelineTask._maybe_start_idle_task  s?    ""&*&8&8&D&D**,6M.N'D# #rN   c                 @  K   | j                   j                          d{    | j                  r4| j                  j	                  | j                         d{    d| _        | j                          d{    | j                          d{    y7 u7 >7 !7 w)z"Cancel all running pipeline tasks.N)r   stopr   r   cancel_task_maybe_cancel_heartbeat_tasks_maybe_cancel_idle_taskr   s    rM   r  zPipelineTask._cancel_tasks  s     nn!!###""$$001H1HIII&*D#00222**,,, 	$ J 	3,sE   BB8BBB8B9BBBBBBc                 D  K   | j                   j                  sy| j                  r4| j                  j	                  | j                         d{    d| _        | j
                  r5| j                  j	                  | j
                         d{    d| _        yy7 M7 w)z+Cancel heartbeat tasks if they are running.N)r   rd   r   r   r$  r   r   s    rM   r%  z*PipelineTask._maybe_cancel_heartbeat_tasks  s     ||--$$$$001J1JKKK(,D%''$$001M1MNNN+/D( ( L Os$   AB B?B BB B c                    K   | j                   r5| j                  j                  | j                          d{    d| _         yy7 w)z-Cancel idle monitoring task if it is running.N)r   r   r$  r   s    rM   r&  z$PipelineTask._maybe_cancel_idle_task  s?     ""$$001H1HIII&*D# #Is   5AAAc                     | j                   j                         }g }|D ]N  }|j                  t        |j                  d             |j                  t        |j                  d             P t        |      S )zDCreate an initial metrics frame with zero values for all processors.g        )	processorvalue)rO   )r   processors_with_metricsr   r%   r   r$   r   )rK   r   rO   ps       rM   _initial_metrics_framez#PipelineTask._initial_metrics_frame  sf    ^^;;=
 	LAKK!&&DEKK-cJK	L &&rN   c                    K   t        j                  |  d| d       | j                  j                          d{    | j                  j	                          t        j                  |  d| d       y7 :w)zDWait for the specified start frame to reach the end of the pipeline.z: Starting. Waiting for $ to reach the end of the pipeline...N: z8 reached the end of the pipeline, pipeline is now ready.)r   r   r   waitclear)rK   rQ   s     rM   _wait_for_pipeline_startz%PipelineTask._wait_for_pipeline_start  sj     v5eW<`ab((--///""((*vRw&^_` 	0s   9A8A6;A8c                 |   K    fd}t        j                    d d       t        t              r |        d{    n= j                  j                          d{    t        j                    d d        j                  j                           j                  j                          y7 w7 Vw)z>Wait for the specified frame to reach the end of the pipeline.c                    K   	 t        j                  j                  j                         j                         d {    t        j                   d  d       j                  d        d {    j                  d        d {    y 7 T# t         j                  $ r t        j                   d  d       Y gw xY w7 U7 =# j                  d        d {  7   j                  d        d {  7   w xY ww)Ntimeoutr1  z! reached the end of the pipeline.z: timeout waiting for z= to reach the end of the pipeline (being blocked somewhere?).r   r   )
rZ   wait_forr   r2  r   r   r   TimeoutErrorr   _call_event_handler)rQ   rK   s   rM   wait_for_cancelz<PipelineTask._wait_for_pipeline_end.<locals>.wait_for_cancel  s     N&&,,113T=V=V   vRw.OPQ ../FNNN../EuMMM '' f25'9vw
 OM ../FNNN../EuMMMs   D<B BB !D6C7DCDB .C	C C		C DDD&C)'DDDDz: Closing. Waiting for r0  Nr1  z6 reached the end of the pipeline, pipeline is closing.)	r   r   rT   r   r   r2  r3  r   rI   )rK   rQ   r<  s   `` rM   _wait_for_pipeline_endz#PipelineTask._wait_for_pipeline_end  s     	N 	v4UG;_`ae[)!###**//111LLD6E7*`ab  &&( 	%%))+ $1s$   <B< B8"B<#B:$AB<:B<c                    K   | j                   j                          d {    | j                   j                          | j                  r| j                   d {    d | _        y y 7 G7 wr   )r   r2  r3  r   r   s    rM   r	  z(PipelineTask._wait_for_pipeline_finished  s`     ++00222%%++-""))))&*D# # 	3 *s!   A,A(9A,A*A,*A,c                 x  K   | j                          d{    | j                          d{    t        |j                        }| j                  j                  |       t        | j                  | j                  | j                        }| j                  j                  |       d{    y7 7 7 	w)z,Set up the pipeline task and all processors.N)loop)r   r   r   )
_load_setup_files_load_observer_filesr:   r@  r   setupr4   r   r   r   )rK   rz   
mgr_paramsrC  s       rM   r  zPipelineTask._setup  s      $$&&& '')))&FKK8
  ,#++++^^

 nn""5))) 	' 	* 	*s2   B:B4B:B6A?B:.B8/B:6B:8B:cleanup_pipelinec                 R  K   | j                          d{    | j                  r"| j                  j                          d{    | j                  r&t        | d      r| j                  j                          |r#| j                  j                          d{    yy7 7 ^7 
w)z*Clean up the pipeline task and processors.Nr   )cleanupr   r   hasattrr   end_conversation_tracingr   )rK   rE  s     rM   _cleanupzPipelineTask._cleanup&  s      lln >>..((*** GD2H$I%%>>@ ..((***  	 + +s4   B'B!-B'B#AB'B%B'#B'%B'c                   K   | j                   j                          | j                          t        | j                  j
                  | j                  j                  | j                  j                  | j                  j                  | j                  | j                  j                  | j                  j                  | j                  j                  | j                  	      }| j                         |_        | j                   j#                  |       d{    | j%                  |       d{    | j                  j                  rG| j                  j&                  r1| j                   j#                  | j)                                d{    d}d}|r| j*                  j-                          d{   }| j                   j#                  |       d{    t/        |t0        t2        t4        f      r| j7                  |       d{    t/        |t0        t2        t4        f       }t/        |t4               }| j*                  j9                          |r| j;                  |       d{    y7 ^7 H7 7 7 7 t7 w)a8  Process frames from the push queue and send them through the pipeline.

        This is the task that runs the pipeline for the first time by sending
        a StartFrame and by pushing any other frames queued by the user. It runs
        until the tasks is cancelled or stopped (e.g. with an EndFrame).
        )	ra   rb   rc   re   r   rf   rk   ri   r   NT)r   r  r!  r    r   ra   rb   rc   re   r   rf   rk   ri   r   _create_start_metadatametadatar   r   r4  rl   r.  r   getrT   r   r   r!   r=  	task_donerJ  )rK   start_framerunningrE  rQ   s        rM   r  z PipelineTask._process_push_queue7  s     	##%  $ @ @!%!B!B"&,,"D"D<<66//!%!B!B%)\\%J%J$(LL$H$H 11

  $::<nn((555 ++K888<<&&4<<+R+R..,,T-H-H-JKKK**..00E..,,U333%+x!CD11%888$U[(I,NOOG#-eY#??&&(  mm,---% 	6 	9 L
 138 	.s   DJI2J-I5.AJ
I8'J2I:3"JI<3J	I>
AJJ,J -J5J8J:J<J>J Jc                 Z  K   t        |t        | j                              r| j                  d|       d{    t        |t              rHt        j                  |  d|        | j                  t        |j                               d{    yt        |t              rHt        j                  |  d|        | j                  t        |j                               d{    yt        |t              r<t        j                  |  d|        | j                  t                      d{    yt        |t              rFt        j                  |  d|        | j                  j                  t!                      d{    yt        |t"              r{| j                  d|       d{    |j$                  r:t        j&                  d	|        | j                  t                      d{    yt        j(                  |  d
|        yy7 7 7 :7 7 7 s7 0w)aF  Process frames coming upstream from the pipeline.

        This is the task that processes frames coming upstream from the
        pipeline. These frames might indicate, for example, that we want the
        pipeline to be stopped (e.g. EndTaskFrame) in which case we would send
        an EndFrame down the pipeline.
        r   Nz: received end task frame r   z: received cancel task frame z: received stop task frame z#: received interruption task frame r   zA fatal error occurred: z: Something went wrong: )rT   r   r   r;  r   r   r   r   r   r  r   r   r"   r!   r   r   r   r   fatalr   r   r  s      rM   r   zPipelineTask._source_push_frameb  s     eU4#?#?@A**+FNNNe\*LLD6!;E7CD""85<<#@AAA/LLD6!>ugFG"";ell#CDDD}-LLD6!<UGDE""9;///45
 LLD6!DUGLM..,,->-@AAAz***+>FFF{{7w?@&&{}555$'?wGH ++ O
 B E 0 BF 6s   5H+HAH+HAH+&H 'AH+2H#3AH+H%	*H+3H'4AH+8H)9"H+H+ H+#H+%H+'H+)H+c                   K   t        |t        | j                              r| j                  d|       d{    t        |t              rg| j                  d|       d{    | j
                  j                          d{    | j                          | j                  j                          yt        |t              rO| j                  d|       d{    | j                  d|       d{    | j                  j                          yt        |t              rO| j                  d|       d{    | j                  d|       d{    | j                  j                          yt        |t              r| j                  j                          yt        |t              r$| j                  j!                  |       d{    yy7 7 s7 T7 7 7 7 7 w)a3  Process frames coming downstream from the pipeline.

        This tasks process frames coming downstream from the pipeline. For
        example, heartbeat frames or an EndFrame which would indicate all
        processors have handled the EndFrame and therefore we can exit the task
        cleanly.
        r   Nr   r   r   r   )rT   r   r   r;  r    r   r   r  r   rI   r   r   r!   r   r   r   r  r  s      rM   r   zPipelineTask._sink_push_frame  s     eU4#A#ABC**+H%PPPeZ(**+@%HHH..44666 --/&&**,x(**+>FFF**+A5III$$((*y)**+@%HHH**+A5III$$((*{+$$((*~.''++E222 /+ Q I6 GI II
 3s   5G%G)G%!G"!G%GAG%GG%2G3AG%7G8G%G!A8G%
G#G%G%G%G%G%G%!G%#G%c                   K   	 | j                   j                  t        | j                  j	                                      d{    t        j                  | j                  j                         d{    w7 67 w)z+Push heartbeat frames at regular intervals.)	timestampN)	r   r   r   r   get_timerZ   sleepr   rg   r   s    rM   r  z$PipelineTask._heartbeat_push_handler  s`      ..,,^dkkFZFZF\-]^^^-- C CDDD  _Ds$   AA?A;0A?5A=6A?=A?c                   K   t         }	 	 t        j                  | j                  j	                         |       d{   }| j
                  j                         |j                  z
  dz  }t        j                  |  d| d       | j                  j                          7 d# t        j                  $ r t        j                  |  d| d       Y 3w xY ww)az  Monitor heartbeat frames for processing time and timeout detection.

        This task monitors heartbeat frames. If a heartbeat frame has not
        been received for a long period a warning will be logged. It also logs
        the time that a heartbeat frame takes to processes, that is how long it
        takes for the heartbeat frame to traverse all the pipeline.
        r7  Ni ʚ;z: heartbeat frame processed in z secondsz-: heartbeat frame not received for more than )HEARTBEAT_MONITOR_SECSrZ   r9  r   rN  r   rW  rV  r   tracerO  r:  r   )rK   	wait_timerQ   process_times       rM   r  z'PipelineTask._heartbeat_monitor_handler  s      +	%..t/D/D/H/H/JT]^^ $ 4 4 6 HMYv%D\NRZ[\%%//1 ^ '' fI)T\]s9   C2B$ B"A#B$ !C"B$ $.CCCCc                 <  K   d}|r`	 t        j                  | j                  j                         | j                         d{    | j                  j                          |r_yy7 ## t         j                  $ r | j                          d{  7  }Y 5w xY ww)a  Monitor pipeline activity and detect idle conditions.

        Tracks frame activity and triggers idle timeout events when the
        pipeline hasn't received relevant frames within the timeout period.

        Note: Heartbeats are excluded from idle detection.
        Tr7  N)rZ   r9  rG   r2  r   r3  r:  _idle_timeout_detected)rK   rQ  s     rM   r   z"PipelineTask._idle_monitor_handler  s      >&&t'7'7'<'<'>H_H_```  &&( `'' > $ ; ; ===>sL   B<A* A(A* #B&B(A* *&BBBBBBc                    K   | j                   ryt        j                  d       | j                  d       d{    | j                  r.t        j                  d       | j                          d{    yy7 ?7 w)zHandle idle timeout detection and optional cancellation.

        Returns:
            Whether the pipeline task should continue running.
        FzIdle timeout detected.r   Nz3Idle pipeline detected, cancelling pipeline task...T)r   r   r   r;  r   r  r   s    rM   r_  z#PipelineTask._idle_timeout_detected  sj      ??/0&&'8999''NNPR++- 	:  s!   7A=A98A=2A;3A=;A=c           	        K   t         j                  j                  dd      j                  d      D cg c]  }|s|	 }}|D ]  }	 t	        |      j                         }|j                  }t        j                  j                  |t        |            }|r|j                  rt        j                  |  d|        t        j                  j                  |      }|j                  j                  |       t!        |d      r|j#                  |        d{    nt        j$                  |  d| d        yc c}w 7 (# t&        $ r(}t        j(                  |  d	| d
|        Y d}~+d}~ww xY ww)aP  Dynamically setup pipeline task from files listed in PIPECAT_SETUP_FILES.

        Each file should contain a `setup_pipeline_task(task)` async function
        that receives the `PipelineTask` instance and can perform any custom
        setup (e.g., adding event handlers, observers, or modifying task
        configuration).

        PIPECAT_SETUP_FILES :z running setup from setup_pipeline_taskNz setup file z$ has no setup_pipeline_task functionz# error running external setup from r1  )osenvironrN  splitr   resolvestem	importlibutilspec_from_file_locationrv   loaderr   r   module_from_specexec_modulerH  re  r   r   r   )rK   fsetup_filespathmodule_namespecmodulees           rM   rA  zPipelineTask._load_setup_files  sK     #%**..1F"K"Q"QRU"V\QZ[q\\ 	SASAw("ii ~~==k3t9UDKKLLD6)=dV!DE '^^<<TBFKK++F3 v'<=$88>>>#fL6Z[!	S ] ?
  Sv%H2aSQRRSsR   2E7D<D< E7CEE E9E7E	E4E/)E7/E44E7c           	      ^  K   t         j                  j                  dd      j                  d      D cg c]  }|s|	 }}|D ]  }ddl}|j                         5  |j                  d       |j                  dt               ddd       	 t        |      j                         }|j                  }t        j                  j                  |t        |            }|rt!        j"                  |  d|        t        j                  j%                  |      }|j&                  j)                  |       |j+                  |        d{   }|D ]  }	| j-                  |	          yc c}w # 1 sw Y   xY w7 1# t.        $ r(}
t!        j0                  |  d	| d
|
        Y d}
~
ad}
~
ww xY ww)zGDynamically load observers from files listed in PIPECAT_OBSERVER_FILES.PIPECAT_OBSERVER_FILESrc  rd  r   Nr   zObserver files (and environment variable `PIPECAT_OBSERVER_FILES`) is deprecated, use setup files instead (and `PIPECAT_SETUP_FILES`) instead.z loading observers from z' error loading external observers from r1  )rf  rg  rN  rh  r   r   r   r   r   r   ri  rj  rk  rl  rm  rv   r   r   ro  rn  rp  create_observersr   r   r   )rK   rq  observer_filesr   rs  rt  ru  rv  rj   r   rw  s              rM   rB  z!PipelineTask._load_observer_files	  s    %'ZZ^^4Lb%Q%W%WX[%\b`a!bb 	WA((* %%h/ e&WAw("ii ~~==k3t9ULLD6)A$!HI '^^<<TBFKK++F3 '-&=&=d&C CI$- 4))(34/	W c & !D  Wv%LQCrRSQTUVVWsj   2F-E&E& F-(E+F-B8E9E7E9"	F-+E4	0F-7E99	F*F%F-%F**F-c                     | j                   j                         D cg c]  }|j                          }}|rt        j                  d|        yyc c}w )z=Log any dangling tasks that haven't been properly cleaned up.zDangling tasks detected: N)r   current_tasksget_namer   r   )rK   ttaskss      rM   r  z"PipelineTask._print_dangling_tasks(  sL    '+'9'9'G'G'IJ!JJNN6ug>?  Ks   Ac                     i }| j                  | j                  t              rd|d<   |j                  | j                  j
                         |S )z?Build and return start metadata including user-provided values.Tdeprecated_openaillmcontext)r   r   r1   r   r   rm   )rK   rm   s     rM   rL  z#PipelineTask._create_start_metadata.  sI     0HI<@N89 	dll99:rN   r*  processor_typec                 r    t        ||      r|S |j                  D ]  }| j                  ||      }|s|c S  y)z?Recursively find a processor of the given type in the pipeline.N)rT   r   r   )rK   r*  r  r-  founds        rM   r   zPipelineTask._find_processor;  sF    i0%% 	A((N;E	 rN   )TrV   rW   rX   rY   CANCEL_TIMEOUT_SECSr   r#   IDLE_TIMEOUT_SECSr*   r	   r_   ru   ro   rs   r   rv   r   r   r   r   r&   r7   r6   r8   rF   propertyrz   r   r(   r   r=   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r,   r  r2   r  r   r   r   r  r  r  r  r!  r  r%  r&  r   r.  r4  r=  r	  r  rJ  r  r   r   r  r  r   r_  rA  rB  r  r   r   rL  r3   r?   r   r\   r]   s   @rM   ry   ry      sE   =F ,059'+%8%)%))-$%) 8HJ[7\->2626=A26'G:G: (	G:
 %-TNG: !%G: #G: #G: 	"G: "#G: G: #G: G: #4;#34G: $E?G:  D./!G:" !/#G:$ ''9:%G:& /'G:R    ,   ,1E(F , , )X.?%@ ) ) m   3d5k3.>(? 3 3 5%US0@*A 5 51 1*.\ .7l 72tE{C7G1H 245ec9I3J 43tE{C7G1H 355ec9I3J 5	d 	+ 7; .hsm .'? 2 '?T 9G8Q8Q;;'5;( %3$=$=9-"669 "9* 8< 	?x} 	?'	-0+' 'aE a,% ,<+*#5 *$+t +").V%Ie %I %IN3E 3n 3BE(> d $S@W>@S#X 	 	a 	U]^_U` 	rN   ry   )_rY   rZ   importlib.utilrk  rf  pathlibr   typingr   r   r   r   r   r	   r
   r   r   r   logurur   pydanticr   r   r   6pipecat.audio.interruptions.base_interruption_strategyr   pipecat.clocks.base_clockr   pipecat.clocks.system_clockr   pipecat.frames.framesr   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   pipecat.metrics.metricsr$   r%   pipecat.observers.base_observerr&   r'   (pipecat.observers.turn_tracking_observerr(   +pipecat.observers.user_bot_latency_observerr)   pipecat.pipeline.base_pipeliner*   pipecat.pipeline.base_taskr+   r,   pipecat.pipeline.pipeliner-   r.   r/   pipecat.pipeline.task_observerr0   +pipecat.processors.aggregators.llm_responser1   "pipecat.processors.frame_processorr2   r3   r4   "pipecat.processors.frameworks.rtvir5   r6   r7   "pipecat.utils.asyncio.task_managerr8   r9   r:   pipecat.utils.tracing.setupr;   %pipecat.utils.tracing.tracing_contextr<   )pipecat.utils.tracing.turn_trace_observerr=   rr   rZ  r  r  r?   rA   r_   ry   rw   rN   rM   <module>r     s      	  ` ` `  1 1 [ / 3    " K E I N 7 K L L 7 P b b ^ ^ ^ ^ < @ G'",    CL"# "#J/AY /Adg# grN   