
    qi|                      t   d Z ddlZddlmZ ddlmZmZmZmZm	Z	m
Z
 ddlmZmZmZmZmZ ddlmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ e G d de             Z G d de      Z G d de      Z dddddddddede!dee	e"      dee	e"      de	e   de!deee      dee   de!de
e	e   e	e   f   fdZ#y)z2Testing utilities for Pipecat pipeline components.    N)	dataclass)	AwaitableCallableListOptionalSequenceTuple)EndFrameFrameHeartbeatFrame
StartFrameSystemFrame)BaseObserverFramePushed)Pipeline)PipelineRunner)PipelineParamsPipelineTask)FrameDirectionFrameProcessorc                        e Zd ZU dZdZeed<   y)
SleepFrameaN  A system frame that introduces a sleep delay in the test pipeline.

    This frame is used by the test framework to control timing between
    frame processing, allowing tests to separate system frames from
    data or control frames.

    Parameters:
        sleep: Duration to sleep in seconds before processing the next frame.
    g?sleepN)__name__
__module____qualname____doc__r   float__annotations__     E/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/tests/utils.pyr   r      s     E5r!   r   c                   L     e Zd ZdZdedeeeged   f   f fdZde	fdZ
 xZS )HeartbeatsObserverzObserver that monitors heartbeat frames from a specific processor.

    This observer watches for HeartbeatFrames from a target processor and
    invokes a callback when they are detected, useful for testing timing
    and lifecycle events.
    targetheartbeat_callbackNc                @    t        |   di | || _        || _        y)a.  Initialize the heartbeats observer.

        Args:
            target: The frame processor to monitor for heartbeat frames.
            heartbeat_callback: Async callback function to invoke when heartbeats are detected.
            **kwargs: Additional arguments passed to the parent observer.
        Nr    )super__init___target	_callback)selfr%   r&   kwargs	__class__s       r"   r)   zHeartbeatsObserver.__init__2   s#     	"6"+r!   datac                    K   |j                   }|j                  }|| j                  k(  r6t        |t              r%| j                  | j                  |       d{    yyy7 w)zHandle frame push events and detect heartbeats from target processor.

        Args:
            data: The frame push event data containing source and frame information.
        N)sourceframer*   
isinstancer   r+   )r,   r/   srcr2   s       r"   on_push_framez HeartbeatsObserver.on_push_frameD   sS      kk

$,,:e^#D..u555 $E5s   AA#A!A#)r   r   r   r   r   r   r   r   r)   r   r5   __classcell__r.   s   @r"   r$   r$   *   sA    , , %nn%EyQU%VW	,$
6 
6r!   r$   c                   ^     e Zd ZdZdddej
                  dedef fdZde	d	ef fd
Z
 xZS )QueuedFrameProcessora  A processor that captures frames in a queue for testing purposes.

    This processor intercepts frames flowing in a specific direction and
    stores them in a queue for later inspection during testing, while
    still allowing the frames to continue through the pipeline.
    T)ignore_startqueuequeue_directionr:   c                P    t         |   d       || _        || _        || _        y)a  Initialize the queued frame processor.

        Args:
            queue: The asyncio queue to store captured frames.
            queue_direction: The direction of frames to capture (UPSTREAM or DOWNSTREAM).
            ignore_start: Whether to ignore StartFrames when capturing.
        T)enable_direct_modeN)r(   r)   _queue_queue_direction_ignore_start)r,   r;   r<   r:   r.   s       r"   r)   zQueuedFrameProcessor.__init__Y   s,     	D1 /)r!   r2   	directionc                   K   t         |   ||       d{    || j                  k(  r?t        |t              r| j
                  s#| j                  j                  |       d{    | j                  ||       d{    y7 m7 !7 	w)zProcess frames and capture them in the queue if they match the direction.

        Args:
            frame: The frame to process.
            direction: The direction the frame is flowing.
        N)	r(   process_framer@   r3   r   rA   r?   put
push_frame)r,   r2   rB   r.   s      r"   rD   z"QueuedFrameProcessor.process_framel   sw      g#E9555---eZ08J8Jkkooe,,,ooeY/// 	6 -/s4   BBAB%B&B?B	 BB	B)r   r   r   r   asyncioQueuer   boolr)   r   rD   r6   r7   s   @r"   r9   r9   Q   sL     "* }}* (	*
 *&0 0> 0 0r!   r9   FT)enable_rtviexpected_down_framesexpected_up_framesr:   	observerspipeline_paramssend_end_frame	processorrJ   rK   rL   frames_to_sendr:   rM   rN   rO   returnc                  K   |xs g }|xs
 t               }t        j                         }	t        j                         }
t        |	t        j
                  |      }t        |
t        j                  |      }t        || |g      }t        |d|||      fd}t               }t        j                  |j                         |              d{    g }|
j                         sL|
j                          d{   }t        |t              rs|j!                  |       |
j                         sL|d}|D ]  }||j"                  j$                   dz  }  |dz  }d}|D ]  }||j$                   dz  } |dz  }t'        d	|       t'        d
|       t)        |      t)        |      k(  sJ t+        ||      D ]  \  }}t        ||      rJ  g }|	j                         s:|	j                          d{   }|j!                  |       |	j                         s:|St'        d|       t'        d|       t)        |      t)        |      k(  sJ t+        ||      D ]  \  }}t        ||      rJ  ||fS 7 7 7 w)a`  Run a test pipeline with the specified processor and validate frame flow.

    This function creates a test pipeline with the given processor, sends the
    specified frames through it, and validates that the expected frames are
    received in both upstream and downstream directions.

    Args:
        processor: The frame processor to test.
        enable_rtvi: Whether RTVI should be enabled in this test.
        expected_down_frames: Expected frame types flowing downstream (optional).
        expected_up_frames: Expected frame types flowing upstream (optional).
        frames_to_send: Sequence of frames to send through the processor.
        ignore_start: Whether to ignore StartFrames in frame validation.
        observers: Optional list of observers to attach to the pipeline.
        pipeline_params: Optional pipeline parameters.
        send_end_frame: Whether to send an EndFrame at the end of the test.

    Returns:
        Tuple containing (downstream_frames, upstream_frames) that were received.

    Raises:
        AssertionError: If the received frames don't match the expected frame types.
    )r;   r<   r:   F)cancel_on_idle_timeoutrJ   rM   paramsc                  P  K   t        j                  d       d {    D ]S  } t        | t              r(t        j                  | j                         d {    ;j	                  |        d {    U r"j	                  t                      d {    y y 7 7 G7 /7 w)Ng{Gz?)rG   r   r3   r   queue_framer
   )r2   rQ   rO   tasks    r"   push_frameszrun_test.<locals>.push_frames   s     mmD!!!# 	.E%,mmEKK000&&u---		. ""8:...  	" 1- /sE   B&B;B&B B&2B"3$B&B$B& B&"B&$B&N[z, ]zreceived DOWN frames =zexpected DOWN frames =zreceived UP frames =zexpected UP frames =)r   rG   rH   r9   r   UPSTREAM
DOWNSTREAMr   r   r   gatherrunemptygetr3   r
   appendr.   r   printlenzip)rP   rJ   rK   rL   rQ   r:   rM   rN   rO   received_upreceived_downr1   sinkpipelinerY   runnerreceived_down_framesr2   down_frames_printedexpected_frames_printedrealexpectedreceived_up_framesrX   s       `   `              @r"   run_testrq   {   s    F RI%9)9O--/KMMOM!&//!F
  &11!D D12H$D
/ F
..D);=
999
 )+!!##''))%*. ''. !!#
 '!) 	CEeoo&>&>%?r#BB	Cs""%) 	=E#%..)9'<<#	=3&&(;<&(?@'(C0D,EEEE!"68LM 	.ND(dH---	. ')!!oo''!!%( ! %$&89$&89%&#.@*AAAA!"46HI 	.ND(dH---	. !"455[ : *4 (sP   CJI=)J J 7J9BJ(J=J>%J$AJ6J JJ)$r   rG   dataclassesr   typingr   r   r   r   r   r	   pipecat.frames.framesr
   r   r   r   r   pipecat.observers.base_observerr   r   pipecat.pipeline.pipeliner   pipecat.pipeline.runnerr   pipecat.pipeline.taskr   r   "pipecat.processors.frame_processorr   r   r   r$   r9   rI   typerq   r    r!   r"   <module>r{      s'   9  ! G G  F . 2 > M   $6 $6N'0> '0Z 5937.204w6w6 w6 #8D>2	w6
 !$0w6 UOw6 w6 \*+w6 n-w6 w6 8E?HUO+,w6r!   