
    qiV                         d Z ddlZddlZddlmZmZmZmZ ddlm	Z	 ddl
mZmZmZ ddlmZ e	 G d d             Z G d	 d
      Z G d de      Zy)zTask observer for managing pipeline frame observers.

This module provides a proxy observer system that manages multiple observers
for pipeline frame events, ensuring that observer processing doesn't block
the main pipeline execution.
    N)AnyDictListOptional)	dataclass)BaseObserverFrameProcessedFramePushed)BaseTaskManagerc                   X    e Zd ZU dZej
                  ed<   ej                  ed<   eed<   y)Proxya{  Proxy data for managing observer tasks and queues.

    This represents is the data received from the main observer that
    is queued for later processing.

    Parameters:
        queue: Queue for frame data awaiting observer processing.
        task: Asyncio task running the observer's frame processing loop.
        observer: The actual observer instance being proxied.
    queuetaskobserverN)	__name__
__module____qualname____doc__asyncioQueue__annotations__Taskr        P/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/pipeline/task_observer.pyr   r      s#    	 ==
,,r   r   c                       e Zd ZdZy)_PipelineStartedSignalzDInternal sentinel queued to observers when the pipeline has started.N)r   r   r   r   r   r   r   r   r   *   s    Nr   r   c                        e Zd ZdZdddeee      def fdZdefdZ	defd	Z
d
 Zd Z fdZd ZdefdZdefdZdedefdZdee   deeef   fdZdefdZdej4                  defdZ xZS )TaskObservera  Proxy observer that manages multiple observers without blocking the pipeline.

    This is a pipeline frame observer that is meant to be used as a proxy to
    the user provided observers. That is, this is the observer that should be
    passed to the frame processors. Then, every time a frame is pushed this
    observer will call all the observers registered to the pipeline task.

    This observer makes sure that passing frames to observers doesn't block the
    pipeline by creating a queue and a task for each user observer. When a frame
    is received, it will be put in a queue for efficiency and later processed by
    each task.
    N)	observersr    task_managerc                V    t        |   di | |xs g | _        || _        d| _        y)a  Initialize the TaskObserver.

        Args:
            observers: List of observers to manage. Defaults to empty list.
            task_manager: Task manager for creating and managing observer tasks.
            **kwargs: Additional arguments passed to the base observer.
        Nr   )super__init__
_observers_task_manager_proxies)selfr    r!   kwargs	__class__s       r   r$   zTaskObserver.__init__>   s2     	"6"#/r) 	r   r   c                     | j                   j                  |       | j                  r!| j                  |      }|| j                  |<   yy)zjAdd a new observer to the managed list.

        Args:
            observer: The observer to add.
        N)r%   appendr'   _create_proxyr(   r   proxys      r   add_observerzTaskObserver.add_observerS   sB     	x( ==&&x0E&+DMM(# r   c                 *  K   | j                   rW|| j                   v rI| j                   |   }| j                   |= | j                  j                  |j                         d{    || j                  v r| j                  j                  |       yy7 /w)ztRemove an observer and clean up its resources.

        Args:
            observer: The observer to remove.
        N)r'   r&   cancel_taskr   r%   remover.   s      r   remove_observerzTaskObserver.remove_observerb   s|      ==X6MM(+Eh'$$00<<< t&OO""8, ' =s   AB!B"0Bc                 L   K   | j                  | j                        | _        yw)zStart all proxy observer tasks.N)_create_proxiesr%   r'   r(   s    r   startzTaskObserver.startt   s     ,,T__=s   "$c                    K   | j                   sy| j                   j                         D ]/  }| j                  j                  |j                         d{    1 y7 w)zStop all proxy observer tasks.N)r'   valuesr&   r2   r   )r(   r/   s     r   stopzTaskObserver.stopx   sM     }}]]))+ 	=E$$00<<<	=<s   AAAAc                    K   t         |           d{    | j                  sy| j                  D ]  }|j                          d{     y7 ;7 	w)zCleanup all proxy observers.N)r#   cleanupr'   )r(   r/   r*   s     r   r=   zTaskObserver.cleanup   sN     go}}]] 	"E--/!!	" 	  "s!   AA3A	A
AAc                 R   K   | j                  t                      d{    y7 w)z9Forward pipeline started signal to all managed observers.N)_send_to_proxyr   r7   s    r   on_pipeline_startedz TaskObserver.on_pipeline_started   s     !!"8":;;;s   '%'datac                 B   K   | j                  |       d{    y7 wzQueue frame data for all managed observers.

        Args:
            data: The frame push event data to distribute to observers.
        Nr?   r(   rA   s     r   on_process_framezTaskObserver.on_process_frame         !!$'''   c                 B   K   | j                  |       d{    y7 wrC   rD   rE   s     r   on_push_framezTaskObserver.on_push_frame   rG   rH   returnc                     t        j                         }| j                  j                  | j	                  ||      d| d      }t        |||      }|S )z%Create a proxy for a single observer.zTaskObserver::z::_proxy_task_handler)r   r   r   )r   r   r&   create_task_proxy_task_handlerr   )r(   r   r   r   r/   s        r   r-   zTaskObserver._create_proxy   sU    !!--$$UH5XJ&;<
 Ex@r   c                 D    i }|D ]  }| j                  |      }|||<    |S )z!Create proxies for all observers.)r-   )r(   r    proxiesr   r/   s        r   r6   zTaskObserver._create_proxies   s7    ! 	&H&&x0E %GH	& r   c                    K   | j                   j                         D ]%  }|j                  j                  |       d {    ' y 7 w)N)r'   r:   r   put)r(   rA   r/   s      r   r?   zTaskObserver._send_to_proxy   s;     ]]))+ 	(E++//$'''	('s   <AAAr   c                 &  K   d}t        j                  |j                        }t        |j                        dkD  rFddl}|j                         5  |j                  d       |j                  dt               ddd       d}	 |j                          d{   }t        |t              r|j                          d{    nt        |t              rl|rP|j                  |j                  |j                   |j"                  |j$                  |j&                         d{    nC|j                  |       d{    n)t        |t(              r|j+                  |       d{    |j-                          # 1 sw Y   xY w7 7 7 j7 R7 *w)z.Handle frame processing for a single observer.F   r   NalwayszObserver `on_push_frame(source, destination, frame, direction, timestamp)` is deprecated, us `on_push_frame(data: FramePushed)` instead.T)inspect	signaturerJ   len
parameterswarningscatch_warningssimplefilterwarnDeprecationWarningget
isinstancer   r@   r
   sourcedestinationframe	direction	timestampr	   rF   	task_done)r(   r   r   on_push_frame_deprecatedrW   rZ   rA   s          r   rN   z TaskObserver._proxy_task_handler   sY    #( %%h&<&<=	y##$q(((* %%h/ _& (,$$D$ 6722444D+.+"00T%5%5tzz4>>SWSaSa   #00666D.1//555OO   % 5 75ss   AF(E:8FF'F>F	?A!F F!F:F;)F$F%F:F?	F	FFFF)r   r   r   r   r   r   r   r   r$   r0   r4   r8   r;   r=   r@   r	   rF   r
   rJ   r   r-   r   r6   r   r?   r   r   rN   __classcell__)r*   s   @r   r   r   0   s      37
 D./
 &	
*,\ ,-l -$>="<(> (( (l u l); \SXEX@Y ( (w}}  r   r   )r   r   rV   typingr   r   r   r   attrr   pipecat.observers.base_observerr   r	   r
   "pipecat.utils.asyncio.task_managerr   r   r   r   r   r   r   <module>rm      sU      , ,  U U >   "	 	c< cr   