
    qi                         d Z ddlmZ ddlmZmZ ddlmZ ddlm	Z	m
Z
mZmZ ddlmZ ddlmZmZmZ ddlmZmZmZ  G d	 d
e      Zy)a  Parallel pipeline implementation for concurrent frame processing.

This module provides a parallel pipeline that processes frames through multiple
sub-pipelines concurrently, with coordination for system frames and proper
handling of pipeline lifecycle events.
    )chain)DictList)logger)CancelFrameEndFrameFrame
StartFrame)BasePipeline)PipelinePipelineSinkPipelineSource)FrameDirectionFrameProcessorFrameProcessorSetupc                        e Zd ZdZ fdZed        Zeded   fd       Zdee	   fdZ
def fd	Z fd
Zdedef fdZdedefdZdedefdZd Z xZS )ParallelPipelinea~  Pipeline that processes frames through multiple sub-pipelines concurrently.

    Creates multiple parallel processing branches from the provided processor lists,
    coordinating frame flow and ensuring proper synchronization of lifecycle events
    like EndFrames. Each branch runs independently while system frames are handled
    specially to maintain pipeline coordination.
    c                 F   t         |           t        |      dk(  rt        d      g | _        t               | _        i | _        d| _        g | _	        t        j                  d|  d       |D ]  }t        |t              st        d| d      t        | j                        }t        | j                   |  d| 	      }t#        | j$                  |  d
| 	      }t'        |||      }| j                  j)                  |        t        j                  d|  d       y)a6  Initialize the parallel pipeline with processor lists.

        Args:
            *args: Variable number of processor lists, each becoming a parallel branch.

        Raises:
            Exception: If no processor lists are provided.
            TypeError: If any argument is not a list of processors.
        r   z,ParallelPipeline needs at least one argumentFz	Creating z
 pipelineszParallelPipeline argument z is not a listz::Source)namez::Sink)sourcesinkzFinished creating N)super__init__len	Exception
_pipelinesset	_seen_ids_frame_counter_synchronizing_buffered_framesr   debug
isinstancelist	TypeErrorr   _parallel_push_framer   _pipeline_sink_push_framer   append)selfargs
processorsnum_pipelinesr   r   pipeline	__class__s          T/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/pipeline/parallel_pipeline.pyr   zParallelPipeline.__init__"   s    	t9>JLL.0$)DFyj12 	-Jj$/"<ZL WXX0M $))4&0PF   > >vVTaSbEcdD  
6EHOO""8,	-" 	)$z:;    c                     | j                   S )a?  Return the list of sub-processors contained within this processor.

        Only compound processors (e.g. pipelines and parallel pipelines) have
        sub-processors. Non-compound processors will return an empty list.

        Returns:
            The list of sub-processors if this is a compound processor.
        r   r)   s    r/   r+   zParallelPipeline.processorsS   s     r0   returnr   c                     | j                   S )a  Return the list of entry processors for this processor.

        Entry processors are the first processors in a compound processor
        (e.g. pipelines, parallel pipelines). Note that pipelines can also be an
        entry processor as pipelines are processors themselves. Non-compound
        processors will simply return an empty list.

        Returns:
            The list of entry processors.
        r2   r3   s    r/   entry_processorsz!ParallelPipeline.entry_processors_   s     r0   c                 `    t        t        j                  d | j                  D                    S )zCollect processors that can generate metrics from all parallel branches.

        Returns:
            List of frame processors that support metrics collection from all branches.
        c              3   <   K   | ]  }|j                           y w)N)processors_with_metrics).0ps     r/   	<genexpr>z;ParallelPipeline.processors_with_metrics.<locals>.<genexpr>s   s     '](A(A(C']s   )r$   r   from_iterabler   r3   s    r/   r9   z(ParallelPipeline.processors_with_metricsm   s$     E''']T__']]^^r0   setupc                    K   t         |   |       d{    | j                  D ]  }|j                  |       d{     y7 /7 	w)zSet up the parallel pipeline and all its branches.

        Args:
            setup: Configuration for frame processor setup.

        Raises:
            TypeError: If any processor list argument is not actually a list.
        N)r   r>   r   )r)   r>   r;   r.   s      r/   r>   zParallelPipeline.setupu   sH      gmE""" 	!A''%.  	! 	# s   A
A'A
AA
A
c                    K   t         |           d{    | j                  D ]  }|j                          d{     y7 .7 	w)z4Clean up the parallel pipeline and all its branches.N)r   cleanupr   )r)   r;   r.   s     r/   rA   zParallelPipeline.cleanup   sB     go 	A))+	 	 s   AA&AAAAframe	directionc                   K   t         |   ||       d{    t        |t        t        t
        f      rct        | j                        | j                  |j                  <   d| _
        | j                          d{    | j                          d{    | j                  D ]  }|j                  ||       d{     y7 7 J7 47 w)zProcess frames through all parallel branches with lifecycle coordination.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow.
        NT)r   process_framer#   r
   r   r   r   r   r   idr    pause_processing_system_framespause_processing_framesqueue_frame)r)   rB   rC   r;   r.   s       r/   rE   zParallelPipeline.process_frame   s      g#E9555 ej(K@A,/,@D)"&D55777..000  	2A--y111	2 	6 80 2sF   CCA%C=C>CC
(C>C?CC
CCc                   K   |j                   | j                  vrj| j                  j                  |j                          | j                  r| j                  j                  ||f       y| j                  ||       d{    yy7 w)a   Push frames while avoiding duplicates using frame ID tracking.

        During lifecycle frame synchronization, non-lifecycle frames are buffered
        to prevent them from escaping the parallel pipeline before all branches
        have finished processing the lifecycle frame.
        N)rF   r   addr    r!   r(   
push_frame)r)   rB   rC   s      r/   r&   z%ParallelPipeline._parallel_push_frame   sn      884>>)NNuxx(""%%,,eY-?@ooeY777 *
 8s   A=B?B Bc                 4  K   t        |t        t        t        f      r| j                  j                  |j                  d      }|dkD  r:| j                  |j                  xx   dz  cc<   | j                  |j                     }|dk(  rjd| _        | j                  ||       d {    | j                          d {    | j                          d {    | j                          d {    y y | j                  ||       d {    y 7 i7 S7 =7 '7 w)Nr      F)r#   r
   r   r   r   getrF   r    r&   _flush_buffered_framesresume_processing_system_framesresume_processing_frames)r)   rB   rC   frame_counters       r/   r'   z*ParallelPipeline._pipeline_sink_push_frame   s     ej(K@A //33EHHa@Mq ##EHH-2- $ 3 3EHH = !&+#//yAAA11333::<<<33555 " ++E9=== B3<5=sZ   B"D$D%D<D=DDD,D-DD	DDDDDc                 ~   K   | j                   }g | _         |D ]  \  }}| j                  ||       d{    ! y7 w)zGFlush frames that were buffered during lifecycle frame synchronization.N)r!   rL   )r)   framesrB   rC   s       r/   rP   z'ParallelPipeline._flush_buffered_frames   sD     && " & 	4E9//%333	43s   1=;=)__name__
__module____qualname____doc__r   propertyr+   r   r6   r   r9   r   r>   rA   r	   r   rE   r&   r'   rP   __classcell__)r.   s   @r/   r   r      s    +<b 	 	 $'7"8  _n)= _!!4 !2 2> 2(8 8. 8>U >~ >&4r0   r   N)rY   	itertoolsr   typingr   r   logurur   pipecat.frames.framesr   r   r	   r
   pipecat.pipeline.base_pipeliner   pipecat.pipeline.pipeliner   r   r   "pipecat.processors.frame_processorr   r   r   r    r0   r/   <module>rd      s8       J J 7 L L b bi4| i4r0   