
    qi                     n    d Z ddlZddlmZ ddlmZ ddlmZmZm	Z	m
Z
mZmZ ddlmZmZ  G d de      Zy)	zTurn tracking observer for conversation flow monitoring.

This module provides an observer that monitors conversation turns in a pipeline,
tracking when turns start and end based on user and bot speech patterns.
    N)deque)logger)BotStartedSpeakingFrameBotStoppedSpeakingFrameCancelFrameEndFrame
StartFrameUserStartedSpeakingFrame)BaseObserverFramePushedc                        e Zd ZdZd fd	ZdefdZdefdZd ZdefdZ	defdZ
defd	Zdefd
ZdefdZdefdZdedefdZ xZS )TurnTrackingObservera[  Observer that tracks conversation turns in a pipeline.

    This observer monitors the flow of conversation by tracking when turns
    start and end based on user and bot speaking patterns. It handles
    interruptions, timeouts, and maintains turn state throughout the pipeline.

    Turn tracking logic:

    - The first turn starts immediately when the pipeline starts (StartFrame)
    - Subsequent turns start when the user starts speaking
    - A turn ends when the bot stops speaking and either:

      - The user starts speaking again
      - A timeout period elapses with no more bot speech
    c                 
   t        |   di | d| _        d| _        d| _        d| _        d| _        || _        d| _        t               | _
        t        |      | _        | j                  d       | j                  d       y)a  Initialize the turn tracking observer.

        Args:
            max_frames: Maximum number of frame IDs to keep in history for
                duplicate detection. Defaults to 100.
            turn_end_timeout_secs: Timeout in seconds after bot stops speaking
                before automatically ending the turn. Defaults to 2.5.
            **kwargs: Additional arguments passed to the parent observer.
        r   FN)maxlenon_turn_startedon_turn_ended )super__init___turn_count_is_turn_active_is_bot_speaking_has_bot_spoken_turn_start_time_turn_end_timeout_secs_end_turn_timerset_processed_framesr   _frame_history_register_event_handler)self
max_framesturn_end_timeout_secskwargs	__class__s       Z/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/observers/turn_tracking_observer.pyr   zTurnTrackingObserver.__init__.   s     	"6"$ %$ !&;## "%#:6$$%67$$_5    datac                   K   |j                   j                  | j                  v ry| j                  j                  |j                   j                         | j                  j                  |j                   j                         t        | j                        t        | j                        kD  rt        | j                        | _        t        |j                   t              r*| j                  dk(  r| j                  |       d{    yyt        |j                   t              r| j                  |       d{    yt        |j                   t              r| j                  |       d{    yt        |j                   t               r&| j"                  r| j%                  |       d{    yt        |j                   t&        t(        f      r| j+                  |       d{    yy7 7 7 7 F7 w)zProcess frame events for turn tracking.

        Args:
            data: Frame push event data containing the frame and metadata.
        Nr   )frameidr   addr   appendlenr   
isinstancer	   r   _start_turnr
   _handle_user_started_speakingr   _handle_bot_started_speakingr   r   _handle_bot_stopped_speakingr   r   _handle_pipeline_endr!   r(   s     r&   on_push_framez"TurnTrackingObserver.on_push_frameH   sw     ::==D222""4::==1""4::==1 t%%&T-@-@)AA%()<)<%=D"djj*-1$&&t,,, %

$<=44T:::

$;<33D999 

$;<AVAV33D999

X{$;<++D111 = -:9 :1sZ   DG:G04G:;G2<3G:/G40?G:/G609G:)G8*G:2G:4G:6G:8G:c                       j                          t        j                         }|j                   j                   fd       _        y)z!Schedule turn end with a timeout.c                  L    t        j                  j                               S )N)asynciocreate_task_end_turn_after_timeout)r(   r!   s   r&   <lambda>z9TurnTrackingObserver._schedule_turn_end.<locals>.<lambda>s   s    G''(D(DT(JK r'   N)_cancel_turn_end_timerr9   get_event_loop
call_laterr   r   )r!   r(   loops   `` r&   _schedule_turn_endz'TurnTrackingObserver._schedule_turn_endj   s>     	##% %%'#''K 
r'   c                 `    | j                   r"| j                   j                          d| _         yy)z'Cancel the turn end timer if it exists.N)r   cancel)r!   s    r&   r=   z+TurnTrackingObserver._cancel_turn_end_timerv   s*      '')#'D   r'   c                    K   | j                   rS| j                  sFt        j                  d| j                   d       | j                  |d       d{    d| _        yyy7 w)z#End turn after timeout has expired.Turn z ending due to timeoutFwas_interruptedN)r   r   r   tracer   	_end_turnr   r5   s     r&   r;   z,TurnTrackingObserver._end_turn_after_timeout|   s]     (=(=LL5!1!1 22HIJ..u.===#'D  )>=s   AA%A#A%c                   K   | j                   rL| j                          | j                  |d       d{    d| _         | j                  |       d{    y| j                  rQ| j
                  rE| j                          | j                  |d       d{    | j                  |       d{    y| j                  s| j                  |       d{    yt        j                  d| j                          y7 7 7 k7 T7 0w)z5Handle user speaking events, including interruptions.TrF   NFz!User is already speaking in Turn )	r   r=   rI   r0   r   r   r   rH   r   r5   s     r&   r1   z2TurnTrackingObserver._handle_user_started_speaking   s       '')..t.<<<$)D!""4(((!!d&:&:'')..u.===""4(((%%""4((( LL<T=M=M<NOP =( >( )sX   3DDDDADDD2D3%DD	)DDDD	Dc                 H   K   d| _         d| _        | j                          yw)zHandle bot speaking events.TN)r   r   r=   r5   s     r&   r2   z1TurnTrackingObserver._handle_bot_started_speaking   s"      $###%s    "c                 <   K   d| _         | j                  |       yw)z#Handle bot stopped speaking events.FN)r   rA   r5   s     r&   r3   z1TurnTrackingObserver._handle_bot_stopped_speaking   s      % 	%s   c                    K   | j                   r,| j                          | j                  |d       d{    yy7 w)z@Handle pipeline end or cancellation by flushing any active turn.TrF   N)r   r=   rI   r5   s     r&   r4   z)TurnTrackingObserver._handle_pipeline_end   s;     '')..t.<<<	   =s   3><>c                   K   d| _         d| _        | xj                  dz  c_        |j                  | _        t        j                  d| j                   d       | j                  d| j                         d{    y7 w)zStart a new turn.TF   rE   z startedr   N)r   r   r   	timestampr   r   rH   _call_event_handlerr5   s     r&   r0   z TurnTrackingObserver._start_turn   sl     #$A $uT--.h78&&'8$:J:JKKKs   A7B9A?:BrG   c           	        K   | j                   sy|j                  | j                  z
  dz  }d| _         |rdnd}t        j                  d| j
                   d| d|d	d
       | j                  d| j
                  ||       d{    y7 w)zEnd the current turn.Ni ʚ;Finterrupted	completedrE    z after z.2fsr   )r   rP   r   r   rH   r   rQ   )r!   r(   rG   durationstatuss        r&   rI   zTurnTrackingObserver._end_turn   s     ##NNT%:%::mK$"1{uT--.axwxnANO&&8H8H(Tcddds   BBB
B)d   g      @)__name__
__module____qualname____doc__r   r   r6   rA   r=   r;   r1   r2   r3   r4   r0   boolrI   __classcell__)r%   s   @r&   r   r      s     64 2  2D

{ 

((+ (Q Q(&{ &&{ &={ =Lk L
eK 
e$ 
er'   r   )r]   r9   collectionsr   logurur   pipecat.frames.framesr   r   r   r   r	   r
   pipecat.observers.base_observerr   r   r   r   r'   r&   <module>rd      s5        Fde< der'   