
    qi)                         d Z ddlmZmZmZ ddlmZ ddlmZ ddl	m
Z
mZ ddlmZ ddlmZ ddlmZ dd	lmZ erdd
lmZmZ  e       rddlmZ dd
lmZmZ  G d de
      Zy)zTurn trace observer for OpenTelemetry tracing in Pipecat.

This module provides an observer that creates trace spans for each conversation
turn, integrating with the turn tracking system to provide hierarchical tracing
of conversation flows.
    )TYPE_CHECKINGDictOptional)logger)
StartFrame)BaseObserverFramePushed)TurnTrackingObserver)UserBotLatencyObserver)is_tracing_available)TracingContext)SpanSpanContext)tracec                        e Zd ZdZ	 	 	 ddededee   dee   dee	   f
 fdZ
defd	Zd
efdZddee   fdZd ZdefdZdededefdZded   fdZdeded   fdZ xZS )TurnTraceObserverav  Observer that creates trace spans for each conversation turn.

    This observer uses TurnTrackingObserver to track turns and creates
    OpenTelemetry spans for each turn. Service spans (STT, LLM, TTS)
    become children of the turn spans.

    If conversation tracing is enabled, turns become children of a
    conversation span that encapsulates the entire session.
    turn_trackerlatency_trackerconversation_idadditional_span_attributestracing_contextc                     t        
   d
i | | _        | _        |xs
 t	                _        d _        d _        i  _        t               rt        j                  d      nd _        d _        | _        |xs i  _        |j!                  d       fd       }|j!                  d       fd       }|j!                  d       fd	       }	y)a  Initialize the turn trace observer.

        Args:
            turn_tracker: The turn tracking observer to monitor.
            latency_tracker: The latency tracking observer for user-bot latency.
            conversation_id: Optional conversation ID for grouping turns.
            additional_span_attributes: Additional attributes to add to spans.
            tracing_context: Pipeline-scoped tracing context for span hierarchy.
            **kwargs: Additional arguments passed to parent class.
        Nr   zpipecat.turnon_turn_startedc                 D   K   j                  |       d {    y 7 wN)_handle_turn_started)trackerturn_numberselfs     [/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/utils/tracing/turn_trace_observer.pyr   z3TurnTraceObserver.__init__.<locals>.on_turn_startedN   s     ++K888     on_turn_endedc                 H   K   j                  |||       d {    y 7 wr   )_handle_turn_ended)r   r   durationwas_interruptedr   s       r    r"   z1TurnTraceObserver.__init__.<locals>.on_turn_endedR   s     ))+xQQQs   " "on_latency_measuredc                 D   K   j                  |       d {    y 7 wr   )_handle_latency_measured)r   latency_secondsr   s     r    r'   z7TurnTraceObserver.__init__.<locals>.on_latency_measuredV   s     //@@@r!    )super__init___turn_tracker_latency_trackerr   _tracing_context_current_span_current_turn_number_trace_context_mapr   r   
get_tracer_tracer_conversation_span_conversation_id_additional_span_attributesevent_handler)r   r   r   r   r   r   kwargsr   r"   r'   	__class__s   `         r    r-   zTurnTraceObserver.__init__-   s    & 	"6") / / C>3C/3)*!<>;O;Qu''7W[ 59 /+E+K(		#	#$5	6	9 
7	9 
	#	#O	4	R 
5	R 
	&	&'<	=	A 
>	A    r*   c                    K   | j                   rOt               rD| j                   j                  d|       t        j                  d| j
                   d|dd       yyyw)a  Handle latency measurement events.

        Called when the latency tracker measures user-to-bot latency.
        Adds the latency as an attribute to the current turn span.

        Args:
            latency_seconds: The measured latency in seconds.
        zturn.user_bot_latency_secondszTurn z user-bot latency: z.3fsN)r1   r   set_attributer   debugr2   )r   r*   s     r    r)   z*TurnTraceObserver._handle_latency_measuredZ   sa      "6"8,,-Lo^LL1122EoVYEZZ[\ #9s   AAdatac                    K   t        |j                  t              r)| j                  s| j	                  | j
                         yyyw)a;  Process a frame without modifying it.

        Handles StartFrame to begin conversation tracing early, ensuring
        that any spans created before Turn 1 (e.g., from flow initialization)
        are properly attached to the conversation trace.

        Args:
            data: The frame push event data.
        N)
isinstanceframer   r6   start_conversation_tracingr7   )r   rA   s     r    on_push_framezTurnTraceObserver.on_push_framei   s;      djj*-d6M6M++D,A,AB 7N-s   AAc                 h   t               r| j                  sy|,t        j                         }t	        j
                  d|        || _        | j                  j                  d      | _        | j                  j                  d|       | j                  j                  dd       | j                  xs i j                         D ]!  \  }}| j                  j                  ||       # | j                  j                  | j                  j                         |       t	        j
                  d|        y)zStart a new conversation span.

        Args:
            conversation_id: Optional custom ID for the conversation. If None, a UUID will be generated.
        NzGenerated new conversation ID: conversationconversation.idzconversation.typevoicez!Started tracing for Conversation )r   r5   r   generate_conversation_idr   r@   r7   
start_spanr6   r?   r8   itemsr0   set_conversation_contextget_span_context)r   r   kvs       r    rE   z,TurnTraceObserver.start_conversation_tracingv   s    $%T\\ ",EEGOLL:?:KLM / #',,"9"9."I 	--.?Q--.A7K55;BBD 	8DAq##11!Q7	8 	66##446	
 	88IJKr<   c                 D   t               sy| j                  rt        j                  d| j                   d       | j                  j                  dd       | j                  j                  dd       | j                  j                          d| _        | j                  j                  d       | j                  rf| j                  j                          d| _	        | j                  j                  d       t        j                  d| j                          d| _        yy)zEEnd the current conversation span and ensure the last turn is closed.NzEnding Turn z due to conversation endturn.was_interruptedTzturn.ended_by_conversation_endzEnded tracing for Conversation )r   r1   r   r@   r2   r?   endr0   set_turn_contextr6   rN   r7   r   s    r    end_conversation_tracingz*TurnTraceObserver.end_conversation_tracing   s    #% LL<(A(A'BBZ[\,,-CTJ,,-MtT""$!%D !!2248 ""##'')&*D# !!::4@LL:4;P;P:QRS$(D! #r<   r   c                   K   t               r| j                  sy|dk(  r'| j                  s| j                  | j                         d}| j                  r| j
                  j                         }| j                  j                  d|      | _        || _	        | j                  j                  d|       | j                  j                  dd       | j                  r&| j                  j                  d| j                         | j                  j                         | j                  |<   | j
                  j                  | j                  j                                t        j                  d	|        yw)
z1Handle a turn start event by creating a new span.N   turn)contextzturn.numberz	turn.typerH   rI   zStarted tracing for Turn )r   r5   r6   rE   r7   r0   get_conversation_contextrL   r1   r2   r?   rO   r3   rU   r   r@   )r   r   parent_contexts      r    r   z&TurnTraceObserver._handle_turn_started   s3    #%T\\!D$;$;++D,A,AB ""!22KKMN "\\44V^4T$/! 	((D((nE   ,,->@U@UV 04/A/A/R/R/T, 	..t/A/A/R/R/TU0>?s   E2E4r%   r&   c                 r  K   t               r| j                  sy|| j                  k(  r| j                  j                  d|       | j                  j                  d|       | j                  j	                          d| _        | j
                  j                  d       t        j                  d|        yyw)z3Handle a turn end event by ending the current span.Nzturn.duration_secondsrS   zEnded tracing for Turn )	r   r1   r2   r?   rT   r0   rU   r   r@   )r   r   r%   r&   s       r    r$   z$TurnTraceObserver._handle_turn_ended   s     #%T-?-? $333,,-DhO,,-C_U ""$!%D !!2248LL2;-@A 4s   B5B7returnr   c                 d    t               r| j                  sy| j                  j                         S )zGet the span context for the current turn.

        This can be used by services to create child spans.

        Returns:
            The current turn's span context or None if not available.
        N)r   r1   rO   rV   s    r    get_current_turn_contextz*TurnTraceObserver.get_current_turn_context   s*     $%T-?-?!!2244r<   c                 N    t               sy| j                  j                  |      S )a  Get the span context for a specific turn.

        This can be used by services to create child spans.

        Args:
            turn_number: The turn number to get context for.

        Returns:
            The specified turn's span context or None if not available.
        N)r   r3   get)r   r   s     r    get_turn_contextz"TurnTraceObserver.get_turn_context   s$     $%&&**;77r<   )NNNr   )__name__
__module____qualname____doc__r
   r   r   strdictr   r-   floatr)   r	   rF   rE   rW   intr   boolr$   ra   rd   __classcell__)r;   s   @r    r   r   "   s     *.5948+A*+A 0+A "#	+A
 %-TN+A ".1+AZe C CL(3- LB):@c @BBC B5 B[_ B(5(=*A 58C 8H]4K 8r<   r   N)rh   typingr   r   r   logurur   pipecat.frames.framesr   pipecat.observers.base_observerr   r	   (pipecat.observers.turn_tracking_observerr
   +pipecat.observers.user_bot_latency_observerr   pipecat.utils.tracing.setupr   %pipecat.utils.tracing.tracing_contextr   opentelemetry.tracer   r   opentelemetryr   r   r+   r<   r    <module>ry      sI    1 0  , E I N < @ 5#5b8 b8r<   