
    qi9                        d Z ddlZddlmZ ddlmZmZmZ ddlm	Z	m
Z
 ddlmZmZmZmZmZmZmZmZmZ ddlmZmZ ddlmZmZ dd	lmZ  G d
 de	      Z G d de	      Z G d de	      Z G d de	      Z  G d de      Z!y)aM  Observer for tracking user-to-bot response latency.

This module provides an observer that monitors the time between when a user
stops speaking and when the bot starts speaking, emitting events when latency
is measured. Optionally collects per-service latency breakdown metrics
(TTFB, text aggregation) when ``enable_metrics=True``.
    N)deque)DictListOptional)	BaseModelField)	BotStartedSpeakingFrameClientConnectedFrameFunctionCallInProgressFrameFunctionCallResultFrameInterruptionFrameMetricsFrameUserStoppedSpeakingFrameVADUserStartedSpeakingFrameVADUserStoppedSpeakingFrame)TextAggregationMetricsDataTTFBMetricsData)BaseObserverFramePushed)FrameDirectionc                   D    e Zd ZU dZeed<   dZee   ed<   eed<   eed<   y)TTFBBreakdownMetricsaD  TTFB measurement with timestamp for timeline placement.

    Parameters:
        processor: Name of the processor that reported the TTFB.
        model: Optional model name associated with the metric.
        start_time: Unix timestamp when the TTFB measurement started.
        duration_secs: TTFB duration in seconds.
    	processorNmodel
start_timeduration_secs)	__name__
__module____qualname____doc__str__annotations__r   r   float     ]/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/observers/user_bot_latency_observer.pyr   r   (   s(     NE8C=r%   r   c                   0    e Zd ZU dZeed<   eed<   eed<   y)TextAggregationBreakdownMetricsa  Text aggregation measurement with timestamp for timeline placement.

    Parameters:
        processor: Name of the processor that reported the metric.
        start_time: Unix timestamp when text aggregation started.
        duration_secs: Aggregation duration in seconds.
    r   r   r   Nr   r   r   r    r!   r"   r#   r$   r%   r&   r(   r(   8   s     Nr%   r(   c                   0    e Zd ZU dZeed<   eed<   eed<   y)FunctionCallMetricsa  Latency for a single function call execution.

    Parameters:
        function_name: Name of the function that was called.
        start_time: Unix timestamp when execution started.
        duration_secs: Time in seconds from execution start to result.
    function_namer   r   Nr)   r$   r%   r&   r+   r+   F   s     r%   r+   c                       e Zd ZU dZ ee      Zee   e	d<   dZ
ee   e	d<   dZee   e	d<   dZee   e	d<    ee      Zee   e	d<   d	ee   fd
Zy)LatencyBreakdowna  Per-service latency breakdown for a single user-to-bot cycle.

    Collected between ``VADUserStoppedSpeakingFrame`` and
    ``BotStartedSpeakingFrame`` when ``enable_metrics=True`` in
    :class:`~pipecat.pipeline.task.PipelineParams`.

    Parameters:
        ttfb: Time-to-first-byte metrics from each service in the pipeline.
        text_aggregation: First text aggregation measurement, representing
            the latency cost of sentence aggregation in the TTS pipeline.
        user_turn_start_time: Unix timestamp when the user turn started
            (actual user silence, adjusted for VAD stop_secs). ``None`` if
            no ``VADUserStoppedSpeakingFrame`` was observed.
        user_turn_secs: Duration in seconds of the user's turn, measured
            from when the user actually stopped speaking to when the turn
            was released (``UserStoppedSpeakingFrame``). This includes
            VAD silence detection, STT finalization, and any turn analyzer
            wait. ``None`` if no ``UserStoppedSpeakingFrame`` was observed
            (e.g. no turn analyzer configured).
        function_calls: Latency for each function call executed during
            this cycle. Empty if no function calls occurred.
    )default_factoryttfbNtext_aggregationuser_turn_start_timeuser_turn_secsfunction_callsreturnc                    g }| j                   8| j                  ,|j                  | j                   d| j                  ddf       | j                  D ]:  }|j                  |j                  |j
                   d|j                  ddf       < | j                  D ]:  }|j                  |j                  |j                   d|j                  ddf       < | j                  rD| j                  }|j                  |j                  |j
                   d|j                  ddf       |j                  d        |D cg c]  \  }}|	 c}}S c c}}w )	a*  Return human-readable event labels sorted by start time.

        Collects all sub-metrics into a flat list, sorts by ``start_time``,
        and returns formatted strings suitable for logging.

        Returns:
            List of formatted strings, one per event, in chronological order.
        zUser turn: z.3fsz: TTFB z: z: text aggregation c                     | d   S )Nr   r$   )es    r&   <lambda>z7LatencyBreakdown.chronological_events.<locals>.<lambda>   s
    !A$ r%   )key)r2   r3   appendr0   r   r   r   r4   r,   r1   sort)selfeventstfcta_labels          r&   chronological_eventsz%LatencyBreakdown.chronological_eventsr   sO    !$$0T5H5H5TMM444DDWDWX[C\\]6^_` 	YAMM1<<AKK=PS?TTU)VWX	Y %% 	[BMM2==R-=-=,>bAQAQRU@VVW*XYZ	[   &&BMM2<<.0CBDTDTUXCYYZ [\ 	'&,-(!U---s   E)r   r   r   r    r   listr0   r   r   r"   r1   r   r(   r2   r#   r3   r4   r+   r!   rE   r$   r%   r&   r.   r.   T   sq    . (-T'BD$#
$BBFh>?F,0(5/0&*NHUO*05d0KND,-K.d3i .r%   r.   c                   L     e Zd ZdZdd fd
ZdefdZd Zdefd	Z	d
 Z
 xZS )UserBotLatencyObserveru}  Observer that tracks user-to-bot response latency.

    Measures the time between when a user stops speaking (VADUserStoppedSpeakingFrame)
    and when the bot starts speaking (BotStartedSpeakingFrame). Emits events when
    latency is measured, allowing consumers to log, trace, or otherwise process
    the latency data.

    When ``enable_metrics=True`` in pipeline params, also collects per-service
    latency breakdown (TTFB, text aggregation) and emits an
    ``on_latency_breakdown`` event alongside the existing latency measurement.

    This observer follows the composition pattern used by TurnTrackingObserver,
    acting as a reusable component for latency measurement.

    Events:
        on_latency_measured(observer, latency_seconds): Emitted when
            time-to-first-bot-speech is calculated. Measures the time from
            when the user stopped speaking to when the bot starts speaking.
        on_latency_breakdown(observer, breakdown): Emitted at each
            ``BotStartedSpeakingFrame`` with a :class:`LatencyBreakdown`
            containing per-service metrics collected during the user→bot cycle.
        on_first_bot_speech_latency(observer, latency_seconds): Emitted once,
            the first time ``BotStartedSpeakingFrame`` arrives after
            ``ClientConnectedFrame``. Measures the time from client connection
            to the first bot speech.
    d   )
max_framesc                H   t        |   di | d| _        d| _        d| _        d| _        d| _        t               | _        t        |      | _
        g | _        d| _        i | _        g | _        | j                  d       | j                  d       | j                  d       y)at  Initialize the user-bot latency observer.

        Sets up tracking for processed frames and user speech timing
        to calculate response latencies.

        Args:
            max_frames: Maximum number of frame IDs to keep in history for
                duplicate detection. Defaults to 100.
            **kwargs: Additional arguments passed to parent class.
        NF)maxlenon_latency_measuredon_latency_breakdownon_first_bot_speech_latencyr$   )super__init___user_stopped_time_user_turn_start_time
_user_turn_client_connected_time_first_bot_speech_measuredset_processed_framesr   _frame_history_ttfb_text_aggregation_function_call_starts_function_call_metrics_register_event_handler)r>   rJ   kwargs	__class__s      r&   rQ   zUserBotLatencyObserver.__init__   s     	"6"376:"+/ 8<#05' '*e%**%= 24
LPCE"AC#$$%:;$$%;<$$%BCr%   datac                 P  K   |j                   t        j                  k7  ry|j                  j                  | j
                  v ry| j
                  j                  |j                  j                         | j                  j                  |j                  j                         t        | j
                        t        | j                        kD  rt        | j                        | _        t        |j                  t              r&| j                  t        j                         | _        yt        |j                  t              r-d| _        d| _        d| _        | j'                          d| _        yt        |j                  t*              rD|j                  j,                  |j                  j.                  z
  | _        | j                   | _        yt        |j                  t0              r4| j                   't        j                         | j                   z
  | _        yyt        |j                  t2              r| j'                          yt        |j                  t4              rL|j                  j6                  t        j                         f| j8                  |j                  j:                  <   yt        |j                  t<              ru| j8                  j?                  |j                  j:                  d      }|B|\  }}| j@                  j                  tC        ||t        j                         |z
               yyt        |j                  tD              r| jG                  |j                         yt        |j                  tH              r| jK                          d{    yy7 w)az  Process frames to track speech timing and calculate latency.

        Tracks VAD events and bot speaking events to measure the time between
        user stopping speech and bot starting speech. Also accumulates metrics
        from MetricsFrame for the latency breakdown.

        Args:
            data: Frame push event containing the frame and direction information.
        NT)r,   r   r   )&	directionr   
DOWNSTREAMframeidrX   addrY   r<   lenrW   
isinstancer
   rU   timer   rR   rS   rT   _reset_accumulatorsrV   r   	timestamp	stop_secsr   r   r   r,   r\   tool_call_idr   popr]   r+   r   _handle_metrics_framer	   _handle_bot_started_speaking)r>   ra   startr,   r   s        r&   on_push_framez$UserBotLatencyObserver.on_push_frame   s     >>^666 ::==D222""4::==1""4::==1t%%&T-@-@)AA%()<)<%=D" djj"67**2.2iik+ djj"=>&*D#)-D&"DO$$& /3D+

$?@ '+jj&:&:TZZ=Q=Q&QD#)-)@)@D&

$<= &&2"&))+0G0G"G 3

$56$$&

$?@

((		CD&&tzz'>'>? 

$;<..224::3J3JDQE ,1)z++22'&3#-&*iikJ&> ! 

L1&&tzz2

$;<33555 =5s   NN&N$N&c           	      v  K   d}| j                   P| j                  sDd| _        t        j                         | j                   z
  }| j                  d|       d{    d}| j                  Dt        j                         | j                  z
  }d| _        | j                  d|       d{    d}|rt        t        | j                        | j                  | j                  | j                  t        | j                              }| j                  d|       d{    | j                          yy7 7 7 w)z=Handle BotStartedSpeakingFrame to emit latency and breakdown.FNTrO   rM   )r0   r1   r2   r3   r4   rN   )rU   rV   rj   _call_event_handlerrR   r.   rF   rZ   r[   rS   rT   r]   rk   )r>   emit_breakdownlatency	breakdowns       r&   rq   z3UserBotLatencyObserver._handle_bot_started_speaking  s     &&24;Z;Z.2D+iikD$?$??G**+H'RRR!N"".iikD$;$;;G&*D#**+@'JJJ!N($**%!%!7!7%)%?%?##D$?$?@I **+A9MMM$$&  S K Ns8   AD9D3AD9*D5+A1D9D7D95D97D9re   c           	      0   | j                   duxr | j                   }| j                  |syt        j                         }|j                  D ]  }t        |t              rc|j                  dkD  rT| j                  j                  t        |j                  |j                  ||j                  z
  |j                               vt        |t              s| j                  t        |j                  ||j                  z
  |j                        | _         y)u  Extract latency metrics from a MetricsFrame.

        Accumulates metrics when a measurement is in progress: either a
        user→bot cycle (after ``VADUserStoppedSpeakingFrame``) or the
        first-bot-speech window (after ``ClientConnectedFrame``).
        Nr   )r   r   r   r   )r   r   r   )rU   rV   rR   rj   ra   ri   r   valuerZ   r<   r   r   r   r   r[   r(   )r>   re   waiting_for_first_speechnowmetrics_datas        r&   rp   z,UserBotLatencyObserver._handle_metrics_frame6  s     ''t3[D<[<[8[ 	! ""*3Kiik!JJ 	L,8\=O=ORS=S

!!("."8"8*00#&););#;&2&8&8	 L*DE ))1-L"."8"8#&););#;&2&8&8.D*	r%   c                 X    g | _         d| _        d| _        d| _        i | _        g | _        y)z$Clear per-cycle metric accumulators.N)rZ   r[   rS   rT   r\   r]   )r>   s    r&   rk   z*UserBotLatencyObserver._reset_accumulatorsX  s0    
!%%)"%'"&(#r%   )r   r   r   r    rQ   r   rs   rq   r   rp   rk   __classcell__)r`   s   @r&   rH   rH      s:    6 &)  DDJ6 J6X'8 <  D)r%   rH   )"r    rj   collectionsr   typingr   r   r   pydanticr   r   pipecat.frames.framesr	   r
   r   r   r   r   r   r   r   pipecat.metrics.metricsr   r   pipecat.observers.base_observerr   r   "pipecat.processors.frame_processorr   r   r(   r+   r.   rH   r$   r%   r&   <module>r      sw      ' ' %
 
 
 F =9  i ) 9.y 9.xO)\ O)r%   