
    qi                         d Z ddlmZmZmZmZ ddlmZ ddlm	Z	 ddl
mZmZmZ  G d de      Z G d d	e      Z G d
 de	      Zy)a  Pipeline implementation for connecting and managing frame processors.

This module provides the main Pipeline class that connects frame processors
in sequence and manages frame flow between them, along with helper classes
for pipeline source and sink operations.
    )Callable	CoroutineListOptional)Frame)BasePipeline)FrameDirectionFrameProcessorFrameProcessorSetupc                   J     e Zd ZdZdeeegef   f fdZdedef fdZ	 xZ
S )PipelineSourcezSource processor that forwards frames to an upstream handler.

    This processor acts as the entry point for a pipeline, forwarding
    downstream frames to the next processor and upstream frames to a
    provided upstream handler function.
    upstream_push_framec                 6    t        |   dddi| || _        y)zInitialize the pipeline source.

        Args:
            upstream_push_frame: Coroutine function to handle upstream frames.
            **kwargs: Additional arguments passed to parent class.
        enable_direct_modeTN )super__init___upstream_push_frame)selfr   kwargs	__class__s      K/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/pipeline/pipeline.pyr   zPipelineSource.__init__   s"     	;D;F;$7!    frame	directionc                   K   t         |   ||       d{    |xt        j                  k(  r | j	                  ||       d{    yt        j
                  k(  r| j                  ||       d{    yy7 b7 57 
wzProcess frames and route them based on direction.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow.
        N)r   process_framer	   UPSTREAMr   
DOWNSTREAM
push_framer   r   r   r   s      r   r   zPipelineSource.process_frame'   st      g#E9555(((//yAAA**ooeY777 + 	6 B73   B A:.B A<,B 3A>4B <B >B __name__
__module____qualname____doc__r   r   r	   r   r   r   __classcell__r   s   @r   r   r      s;    8He^5Li5W,X 88 8> 8 8r   r   c                   J     e Zd ZdZdeeegef   f fdZdedef fdZ	 xZ
S )PipelineSinkzSink processor that forwards frames to a downstream handler.

    This processor acts as the exit point for a pipeline, forwarding
    upstream frames to the previous processor and downstream frames to a
    provided downstream handler function.
    downstream_push_framec                 6    t        |   dddi| || _        y)zInitialize the pipeline sink.

        Args:
            downstream_push_frame: Coroutine function to handle downstream frames.
            **kwargs: Additional arguments passed to parent class.
        r   TNr   )r   r   _downstream_push_frame)r   r-   r   r   s      r   r   zPipelineSink.__init__?   s"     	;D;F;&;#r   r   r   c                   K   t         |   ||       d{    |xt        j                  k(  r | j	                  ||       d{    yt        j
                  k(  r| j                  ||       d{    yy7 b7 57 
wr   )r   r   r	   r   r!   r    r/   r"   s      r   r   zPipelineSink.process_frameK   ss      g#E9555(((ooeY777**11%CCC + 	6 8Cr#   r$   r*   s   @r   r,   r,   7   s@    
<%-un.Ey.P%Q
<D D> D Dr   r,   c                        e Zd ZdZddddee   dee   dee   f fdZed        Z	ed	ed
   fd       Z
d Zdef fdZ fdZdedef fdZdefdZd Zd Z xZS )Pipelinea#  Main pipeline implementation that connects frame processors in sequence.

    Creates a linear chain of frame processors with automatic source and sink
    processors for external frame handling. Manages processor lifecycle and
    provides metrics collection from contained processors.
    N)sourcesink
processorsr3   r4   c                   t         |   d       |xs t        | j                  |  d      | _        |xs t        | j                  |  d      | _        | j                  g|z   | j                  gz   | _        | j                          y)a  Initialize the pipeline with a list of processors.

        Args:
            processors: List of frame processors to connect in sequence.
            source: An optional pipeline source processor.
            sink: An optional pipeline sink processor.
        T)r   z::Source)namez::SinkN)	r   r   r   r!   _sourcer,   _sink_processors_link_processors)r   r5   r3   r4   r   s       r   r   zPipeline.__init__c   s     	D1 XhFW!XP\$//4&P
26,,*1LPTPZPZ|1[r   c                     | j                   S )a?  Return the list of sub-processors contained within this processor.

        Only compound processors (e.g. pipelines and parallel pipelines) have
        sub-processors. Non-compound processors will return an empty list.

        Returns:
            The list of sub-processors if this is a compound processor.
        )r:   r   s    r   r5   zPipeline.processors   s     r   returnr
   c                     | j                   gS )a  Return the list of entry processors for this processor.

        Entry processors are the first processors in a compound processor
        (e.g. pipelines, parallel pipelines). Note that pipelines can also be an
        entry processor as pipelines are processors themselves. Non-compound
        processors will simply return an empty list.

        Returns:
            The list of entry processors.
        )r8   r=   s    r   entry_processorszPipeline.entry_processors   s     ~r   c                     g }| j                   D ]B  }|j                         r|j                  |       |j                  |j	                                D |S )a  Return processors that can generate metrics.

        Recursively collects all processors that support metrics generation,
        including those from nested pipelines.

        Returns:
            List of frame processors that can generate metrics.
        )r5   can_generate_metricsappendextendprocessors_with_metrics)r   servicesps      r   rE   z Pipeline.processors_with_metrics   sP      	9A%%'"OOA5578	9 r   setupc                 v   K   t         |   |       d{    | j                  |       d{    y7 7 w)zSet up the pipeline and all contained processors.

        Args:
            setup: Configuration for frame processor setup.
        N)r   rH   _setup_processors)r   rH   r   s     r   rH   zPipeline.setup   s8      gmE"""$$U+++ 	#+s   959799c                 r   K   t         |           d{    | j                          d{    y7 7 w)z3Clean up the pipeline and all contained processors.N)r   cleanup_cleanup_processors)r   r   s    r   rL   zPipeline.cleanup   s2     go&&((( 	 (s   737577r   r   c                 b  K   t         |   ||       d{    |t        j                  k(  r3| j                  j                  |t        j                         d{    y|t        j                  k(  r3| j                  j                  |t        j                         d{    yy7 7 N7 
w)zProcess frames by routing them through the pipeline.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow.
        N)r   r   r	   r    r8   queue_framer   r9   r"   s      r   r   zPipeline.process_frame   s      g#E9555111,,**5.2K2KLLL.111**((0G0GHHH 2	 	6 MHs5   B/B)AB/B+AB/"B-#B/+B/-B/c                 d   K   | j                   D ]  }|j                  |       d{     y7 w)z&Set up all processors in the pipeline.N)r:   rH   )r   rH   rG   s      r   rJ   zPipeline._setup_processors   s/     !! 	!A''%.  	! s   $0.0c                 b   K   | j                   D ]  }|j                          d{     y7 w)z(Clean up all processors in the pipeline.N)r:   rL   )r   rG   s     r   rM   zPipeline._cleanup_processors   s-     !! 	A))+	s   #/-/c                 p    | j                   d   }| j                   dd D ]  }|j                  |       |} y)z5Link all processors in sequence and set their parent.r      N)r:   link)r   prevcurrs      r   r;   zPipeline._link_processors   s>    "$$QR( 	DIIdOD	r   )r%   r&   r'   r(   r   r
   r   r   propertyr5   r@   rE   r   rH   rL   r   r	   r   rJ   rM   r;   r)   r*   s   @r   r2   r2   [   s     ,0)- (  (	 
 ~& 8 	  	  $'7"8   ,!4 ,)
I I> I!-@ !

r   r2   N)r(   typingr   r   r   r   pipecat.frames.framesr   pipecat.pipeline.base_pipeliner   "pipecat.processors.frame_processorr	   r
   r   r   r,   r2   r   r   r   <module>r\      sH    7 6 ' 7 b b8^ 8D!D> !DHy| yr   