
    qi{.                        d Z ddlZddlmZ ddlmZmZmZmZm	Z	 ddl
mZmZ ddlmZmZmZ ddlmZmZmZ ddlmZ dd	lmZ dd
lmZ eefZe G d d             Z G d de      Z G d de      Z G d de      Z G d de      Z y)u  Observer for tracking pipeline startup timing.

This module provides an observer that measures how long each processor's
``start()`` method takes during pipeline startup. It works by tracking
when a ``StartFrame`` arrives at a processor (``on_process_frame``) versus
when it leaves (``on_push_frame``), giving the exact ``start()`` duration
for each processor in the pipeline.

It also measures transport timing — the time from ``StartFrame`` to the
first ``BotConnectedFrame`` (SFU transports only) and ``ClientConnectedFrame``
— via a separate ``on_transport_timing_report`` event.

Example::

    observer = StartupTimingObserver()

    @observer.event_handler("on_startup_timing_report")
    async def on_report(observer, report):
        for t in report.processor_timings:
            print(f"{t.processor_name}: {t.duration_secs:.3f}s")

    @observer.event_handler("on_transport_timing_report")
    async def on_transport(observer, report):
        if report.bot_connected_secs is not None:
            print(f"Bot connected in {report.bot_connected_secs:.3f}s")
        print(f"Client connected in {report.client_connected_secs:.3f}s")

    task = PipelineTask(pipeline, observers=[observer])
    N)	dataclass)DictListOptionalTupleType)	BaseModelField)BotConnectedFrameClientConnectedFrame
StartFrame)BaseObserverFrameProcessedFramePushed)BasePipeline)PipelineSource)FrameProcessorc                   &    e Zd ZU dZeed<   eed<   y)_ArrivalInfoz<Internal record of when a StartFrame arrived at a processor.	processorarrival_ts_nsN)__name__
__module____qualname____doc__r   __annotations__int     [/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/observers/startup_timing_observer.pyr   r   5   s    Fr   r   c                   0    e Zd ZU dZeed<   eed<   eed<   y)ProcessorStartupTiminga.  Startup timing for a single processor.

    Parameters:
        processor_name: The name of the processor.
        start_offset_secs: Offset in seconds from the StartFrame to when this
            processor's start() began.
        duration_secs: How long the processor's start() took, in seconds.
    processor_namestart_offset_secsduration_secsN)r   r   r   r   strr   floatr   r   r    r"   r"   =   s     r   r"   c                   H    e Zd ZU dZeed<   eed<    ee      Ze	e
   ed<   y)StartupTimingReporta8  Report of startup timings for all measured processors.

    Parameters:
        start_time: Unix timestamp when the first processor began starting.
        total_duration_secs: Total wall-clock time from first to last processor start.
        processor_timings: Per-processor timing data, in pipeline order.
    
start_timetotal_duration_secs)default_factoryprocessor_timingsN)r   r   r   r   r'   r   r
   listr-   r   r"   r   r   r    r)   r)   L   s+     6;D6Qt23Qr   r)   c                   D    e Zd ZU dZeed<   dZee   ed<   dZee   ed<   y)TransportTimingReportai  Time from pipeline start to transport connection milestones.

    Parameters:
        start_time: Unix timestamp of the StartFrame (pipeline start).
        bot_connected_secs: Seconds from StartFrame to first BotConnectedFrame
            (only set for SFU transports).
        client_connected_secs: Seconds from StartFrame to first ClientConnectedFrame.
    r*   Nbot_connected_secsclient_connected_secs)	r   r   r   r   r'   r   r1   r   r2   r   r   r    r0   r0   Z   s-     *..-18E?1r   r0   c                        e Zd ZdZdddeeee   df      f fdZdede	fd	Z
d
 ZdefdZdefdZdefdZdefdZd Z xZS )StartupTimingObservera:  Observer that measures processor startup times during pipeline initialization.

    Tracks how long each processor's ``start()`` method takes by measuring the
    time between when a ``StartFrame`` arrives at a processor and when it is
    pushed downstream. This captures WebSocket connections, API authentication,
    model loading, and other initialization work.

    Also measures transport timing, the time from ``StartFrame`` to connection
    milestones:

    - ``bot_connected_secs``: When the bot joins the transport room
      (SFU transports only, triggered by ``BotConnectedFrame``).
    - ``client_connected_secs``: When a remote participant connects
      (triggered by ``ClientConnectedFrame``).

    By default, internal pipeline processors (``PipelineSource``, ``Pipeline``)
    are excluded from the report. Pass ``processor_types`` to measure only
    specific types.

    Event handlers available:

    - on_startup_timing_report: Called once after startup completes with the full
      timing report.
    - on_transport_timing_report: Called once when the first client connects with a
      TransportTimingReport containing client_connected_secs and bot_connected_secs
      (if available).

    Example::

        observer = StartupTimingObserver(
            processor_types=(STTService, TTSService)
        )

        @observer.event_handler("on_startup_timing_report")
        async def on_report(observer, report):
            for t in report.processor_timings:
                logger.info(f"{t.processor_name}: {t.duration_secs:.3f}s")

        @observer.event_handler("on_transport_timing_report")
        async def on_transport(observer, report):
            if report.bot_connected_secs is not None:
                logger.info(f"Bot connected in {report.bot_connected_secs:.3f}s")
            logger.info(f"Client connected in {report.client_connected_secs:.3f}s")

        task = PipelineTask(pipeline, observers=[observer])

    Args:
        processor_types: Optional tuple of processor types to measure. If None,
            all non-internal processors are measured.
    N)processor_typesr5   .c                    t        |   di | || _        i | _        g | _        d| _        d| _        d| _        d| _        d| _	        d| _
        | j                  d       | j                  d       y)a  Initialize the startup timing observer.

        Args:
            processor_types: Optional tuple of processor types to measure.
                If None, all non-internal processors are measured.
            **kwargs: Additional arguments passed to parent class.
        NFon_startup_timing_reporton_transport_timing_reportr   )super__init___processor_types	_arrivals_timings_start_frame_id_startup_timing_reported_transport_timing_reported_start_frame_arrival_ns_bot_connected_secs_start_wall_clock_register_event_handler)selfr5   kwargs	__class__s      r    r:   zStartupTimingObserver.__init__   s     	"6" / 35 79 /3 ).% +0' 7;$ 59  37$$%?@$$%ABr   r   returnc                 h    | j                   t        || j                         S t        |t               S )zCheck if a processor should be tracked for timing.

        Args:
            processor: The processor to check.

        Returns:
            True if the processor matches the filter or no filter is set.
        )r;   
isinstance_INTERNAL_TYPES)rE   r   s     r    _should_trackz#StartupTimingObserver._should_track   s3       ,i)>)>??i999r   c                 Z   K   | j                   r| j                          d{    yy7 w)zEmit the startup timing report when the pipeline has fully started.

        Called by the ``PipelineTask`` after the ``StartFrame`` has been
        processed by all processors, including nested ``ParallelPipeline``
        branches.
        N)r=   _emit_report)rE   s    r    on_pipeline_startedz)StartupTimingObserver.on_pipeline_started   s)      ==##%%% %s    +)+datac                   K   | j                   ryt        |j                  t              sy| j                  F|j                  j
                  | _        |j                  | _        t        j                         | _	        n$|j                  j
                  | j                  k7  ry| j                  |j                        rCt        |j                  |j                        | j                  |j                  j
                  <   yyw)z{Record when a StartFrame arrives at a processor.

        Args:
            data: The frame processing event data.
        N)r   r   )r?   rJ   framer   r>   id	timestamprA   timerC   rL   r   r   r<   )rE   rP   s     r    on_process_framez&StartupTimingObserver.on_process_frame   s      (($**j1 '#'::==D +/>>D(%)YY[D"ZZ]]d222dnn-0<..1DNN4>>,,- .s   C>D c                   K   t        |j                  t              r| j                  |       yt        |j                  t              r| j                  |       d{    y| j                  ryt        |j                  t              sy| j                  $|j                  j                  | j                  k7  ry| j                  j                  |j                  j                  d      }|y|j                  |j                  z
  }|dz  }|j                  | j                  z
  dz  }| j                   j#                  t%        |j&                  j(                  ||             y7 w)zRecord when a StartFrame leaves a processor and compute the delta.

        Also handles ``BotConnectedFrame`` and ``ClientConnectedFrame`` to
        measure transport timing.

        Args:
            data: The frame push event data.
        N    eA)r#   r$   r%   )rJ   rR   r   _handle_bot_connectedr   _handle_client_connectedr?   r   r>   rS   r<   popsourcerT   r   rA   r=   appendr"   r   name)rE   rP   arrivalduration_nsr%   r$   s         r    on_push_framez#StartupTimingObserver.on_push_frame   s$     djj"34&&t,djj"67//555(($**j1+

AUAU0U..$$T[[^^T:?nnw'<'<<#c)$22T5Q5QQUXX"&0055"3+	
) 6s   AE'E$DE'c                 |    | j                   | j                  y|j                  | j                  z
  }|dz  | _         y)z7Record bot connected timing on first BotConnectedFrame.NrX   )rB   rA   rT   )rE   rP   delta_nss      r    rY   z+StartupTimingObserver._handle_bot_connected#  s<    ##/43O3O3W>>D$@$@@#+c> r   c                   K   | j                   s| j                  yd| _         |j                  | j                  z
  }|dz  }t        | j                  xs d| j
                  |      }| j                  d|       d{    y7 w)z;Emit transport timing report on first ClientConnectedFrame.NTrX           )r*   r1   r2   r8   )r@   rA   rT   r0   rC   rB   _call_event_handler)rE   rP   rc   r2   reports        r    rZ   z.StartupTimingObserver._handle_client_connected+  s     **d.J.J.R*.'>>D$@$@@ (3&--4#77"7

 &&'CVLLLs   A:B<B=Bc                    K   | j                   ryd| _         t        d | j                  D              }t        | j                  xs d|| j                        }| j                  d|       d{    y7 w)z)Build and emit the startup timing report.NTc              3   4   K   | ]  }|j                     y w)N)r%   ).0ts     r    	<genexpr>z5StartupTimingObserver._emit_report.<locals>.<genexpr>@  s     ;AOO;s   re   )r*   r+   r-   r7   )r?   sumr=   r)   rC   rf   )rE   totalrg   s      r    rN   z"StartupTimingObserver._emit_report:  si     (((,%;T]];;$--4 %"mm
 &&'A6JJJs   A,A6.A4/A6)r   r   r   r   r   r   r   r   r:   boolrL   rO   r   rV   r   ra   rY   rZ   rN   __classcell__)rG   s   @r    r4   r4   i   s    1l GK)C "%^(<c(A"BC)CV:~ :$ :&> 2(
 (
T2+ 2M; MKr   r4   )!r   rU   dataclassesr   typingr   r   r   r   r   pydanticr	   r
   pipecat.frames.framesr   r   r   pipecat.observers.base_observerr   r   r   pipecat.pipeline.base_pipeliner   pipecat.pipeline.pipeliner   "pipecat.processors.frame_processorr   rK   r   r"   r)   r0   r4   r   r   r    <module>ry      s   <  ! 4 4 % U U U U 7 4 = "<0   Y R) R2I 2_KL _Kr   