
    qi)                         d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZmZmZmZ ddlmZ dd	lmZ dd
lmZmZmZ e G d de             Z G d de      Z G d de      Z G d de      Zy)a  Synchronous parallel pipeline implementation for concurrent frame processing.

This module provides a pipeline that processes frames through multiple parallel
pipelines simultaneously, synchronizing their output to maintain frame ordering
and prevent duplicate processing.
    N)	dataclass)chain)List)logger)ControlFrameEndFrameFrameSystemFrame)BasePipeline)Pipeline)FrameDirectionFrameProcessorFrameProcessorSetupc                       e Zd ZdZy)	SyncFramezControl frame used to synchronize parallel pipeline processing.

    This frame is sent through parallel pipelines to determine when the
    internal pipelines have finished processing a batch of frames.
    N)__name__
__module____qualname____doc__     Y/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/pipeline/sync_parallel_pipeline.pyr   r      s     	r   r   c                   P     e Zd ZdZdej
                  f fdZdedef fdZ	 xZ
S )SyncParallelPipelineSourcezSource processor for synchronous parallel pipeline processing.

    Routes frames to parallel pipelines and collects upstream responses
    for synchronization purposes.
    upstream_queuec                 4    t         |   d       || _        y)zInitialize the sync parallel pipeline source.

        Args:
            upstream_queue: Queue for collecting upstream frames from the pipeline.
        Tenable_direct_modeN)super__init__	_up_queue)selfr   	__class__s     r   r    z#SyncParallelPipelineSource.__init__-   s     	D1'r   frame	directionc                   K   t         |   ||       d{    |xt        j                  k(  r% | j                  j                  |       d{    yt        j                  k(  r| j                  ||       d{    yy7 k7 57 
wzProcess frames and route them based on direction.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow.
        N)r   process_framer   UPSTREAMr!   put
DOWNSTREAM
push_framer"   r$   r%   r#   s      r   r(   z(SyncParallelPipelineSource.process_frame6   su      g#E9555(((nn((///**ooeY777 + 	6 07s3   B	B7B	B,B	<B=B	B	B	r   r   r   r   asyncioQueuer    r	   r   r(   __classcell__r#   s   @r   r   r   &   s0    (w}} (8 8> 8 8r   r   c                   P     e Zd ZdZdej
                  f fdZdedef fdZ	 xZ
S )SyncParallelPipelineSinkzSink processor for synchronous parallel pipeline processing.

    Collects downstream frames from parallel pipelines and routes
    upstream frames back through the pipeline.
    downstream_queuec                 4    t         |   d       || _        y)zInitialize the sync parallel pipeline sink.

        Args:
            downstream_queue: Queue for collecting downstream frames from the pipeline.
        Tr   N)r   r    _down_queue)r"   r5   r#   s     r   r    z!SyncParallelPipelineSink.__init__M   s     	D1+r   r$   r%   c                   K   t         |   ||       d{    |xt        j                  k(  r | j	                  ||       d{    yt        j
                  k(  r$| j                  j                  |       d{    yy7 k7 >7 
wr'   )r   r(   r   r)   r,   r+   r7   r*   r-   s      r   r(   z&SyncParallelPipelineSink.process_frameV   sw      g#E9555(((ooeY777**&&**5111 + 	6 81s3   B	B.B	B5B	<B=B	B	B	r.   r2   s   @r   r4   r4   F   s0    , ,2 2> 2 2r   r4   c                        e Zd ZdZ fdZed        Zeded   fd       Zdee	   fdZ
def fd	Z fd
Zdedef fdZ xZS )SyncParallelPipelinea5  Pipeline that processes frames through multiple parallel pipelines synchronously.

    Creates multiple parallel processing paths that all receive the same input frames
    and produces synchronized output. Each parallel path is a separate pipeline that
    processes frames independently, with synchronization points to ensure consistent
    ordering and prevent duplicate frame processing.

    The pipeline uses SyncFrame control frames to coordinate between parallel paths
    and ensure all paths have completed processing before moving to the next frame.
    c                    t         |           t        |      dk(  rt        d      g | _        g | _        g | _        t        j                         | _	        t        j                         | _
        t        j                  d|  d       |D ]  }t        |t              st        d| d      t        j                         }t        j                         }t!        |      }t#        |      }| j
                  j%                  ||d       | j                  j%                  ||d       t'        |||      }| j                  j%                  |        t        j                  d	|  d       y
)a  Initialize the synchronous parallel pipeline.

        Args:
            *args: Variable number of processor lists, each representing a parallel pipeline path.
                   Each argument should be a list of FrameProcessor instances.

        Raises:
            Exception: If no arguments are provided.
            TypeError: If any argument is not a list of processors.
        r   z0SyncParallelPipeline needs at least one argumentz	Creating z
 pipelineszSyncParallelPipeline argument z is not a list)	processorqueue)sourcesinkzFinished creating N)r   r    len	Exception_sinks_sources
_pipelinesr/   r0   r!   r7   r   debug
isinstancelist	TypeErrorr   r4   appendr   )	r"   args
processorsup_queue
down_queuer>   r?   pipeliner#   s	           r   r    zSyncParallelPipeline.__init__r   s0    	t9>NPP "==?yj12 	-Jj$/"@N [\\ }}H J/9F+J7D MM  v
!KLKKTHEF  
6EHOO""8,#	-& 	)$z:;r   c                     | j                   S )a?  Return the list of sub-processors contained within this processor.

        Only compound processors (e.g. pipelines and parallel pipelines) have
        sub-processors. Non-compound processors will return an empty list.

        Returns:
            The list of sub-processors if this is a compound processor.
        )rD   r"   s    r   rK   zSyncParallelPipeline.processors   s     r   returnr   c                     | j                   S )a  Return the list of entry processors for this processor.

        Entry processors are the first processors in a compound processor
        (e.g. pipelines, parallel pipelines). Note that pipelines can also be an
        entry processor as pipelines are processors themselves. Non-compound
        processors will simply return an empty list.

        Returns:
            The list of entry processors.
        )rC   rP   s    r   entry_processorsz%SyncParallelPipeline.entry_processors   s     }}r   c                 `    t        t        j                  d | j                  D                    S )zCollect processors that can generate metrics from all parallel pipelines.

        Returns:
            List of frame processors that support metrics collection from all parallel paths.
        c              3   <   K   | ]  }|j                           y w)N)processors_with_metrics).0ps     r   	<genexpr>z?SyncParallelPipeline.processors_with_metrics.<locals>.<genexpr>   s     '](A(A(C']s   )rG   r   from_iterablerD   rP   s    r   rV   z,SyncParallelPipeline.processors_with_metrics   s$     E''']T__']]^^r   setupc                    K   t         |   |       d{    t        j                  | j                  D cg c]  }|j                  |       c}  d{    y7 Fc c}w 7 w)zSet up the parallel pipeline and all contained processors.

        Args:
            setup: Configuration for frame processor setup.
        N)r   r[   r/   gatherrD   )r"   r[   rX   r#   s      r   r[   zSyncParallelPipeline.setup   sO      gmE"""nntG!qwwu~GHHH 	#GHs,   A&A!A&AA&A$A&A&c                    K   t         |           d{    t        j                  | j                  D cg c]  }|j                          c}  d{    y7 Ec c}w 7 w)z<Clean up the parallel pipeline and all contained processors.N)r   cleanupr/   r]   rD   )r"   rX   r#   s     r   r_   zSyncParallelPipeline.cleanup   sI     gonnDOODqqyy{DEEE 	 DEs,   A$A!A$AA$A"A$A$r$   r%   c                 L  K   t         |   ||       d{    dt        j                  dt        dt
        fd}|t
        j                  k(  rFt        j                  | j                  D cg c]  } ||| j                  ||       c}  d{    nX|t
        j                  k(  rEt        j                  | j                  D cg c]  } ||| j                  ||       c}  d{    t               }| j                  j                         s| j                  j                          d{   }|j                   |vrC| j#                  |t
        j                         d{    |j%                  |j                          | j                  j'                          | j                  j                         st               }| j                  j                         s| j                  j                          d{   }|j                   |vrC| j#                  |t
        j                         d{    |j%                  |j                          | j                  j'                          | j                  j                         syy7 pc c}w 7  c c}w 7 7 l7 97 7 qw)a  Process frames through all parallel pipelines with synchronization.

        Distributes frames to all parallel pipelines and synchronizes their output
        to maintain proper ordering and prevent duplicate processing. Uses SyncFrame
        control frames to coordinate between parallel paths.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow.
        N
main_queuer$   r%   c                 T  K   | d   }| d   }|j                  ||       d {    t        |t        t        f      r|j	                          d {   }t        |t        t        f      r|j                  |       d {    y t        |t        t        f      sY|j                  |       d {    |j                          |j	                          d {   }t        |t        t        f      sXy y |j                  t               |       d {    |j	                          d {   }t        |t              sS|j                  |       d {    |j                          |j	                          d {   }t        |t              sRy y 7 q7 F7 7 7 7 7 w7 P7 *w)Nr<   r=   )r(   rF   r
   r   getr*   	task_doner   )objra   r$   r%   r<   r=   	new_frames          r   wait_for_syncz9SyncParallelPipeline.process_frame.<locals>.wait_for_sync   sJ     K(ILE))%;;;%+x!89"'))+-	i+x)@A$..333([(4KL(nnY777)*/))+$5	 )[(4KL
  --ik9EEE"'))+-	$Y	:$..333OO%&+iik 1I %Y	: < .3 8$5E-3 1s    F(F-F(F.F(?F /F(/F0'F(FF(3F(F F(*F"+(F(F$'F(;F&<F(F(F(F(F(F( F("F($F(&F()r   r(   r/   r0   r	   r   r)   r]   rB   r!   r+   rC   r7   setemptyrc   idr,   addrd   )r"   r$   r%   rg   sseen_idsr#   s         r   r(   z"SyncParallelPipeline.process_frame   s     g#E9555	2$]]	238	2ES	22 ///..NRkkZ-4>>5)DZ   .333..PTP]P]^1-4#3#3UIF^   5..&&(..,,..Exxx'ooe^-D-DEEEUXX&NN$$& ..&&( 5""((***..00Exxx'ooe^-F-FGGGUXX&&&( ""((*k 	6H [ _ /E 1Gs   J$JAJ$,J
J$J5J$J!J$(J)AJ$.J/5J$$J%AJ$9AJ$:J ;5J$0J"1AJ$J$
J$J$J$J$ J$"J$)r   r   r   r   r    propertyrK   r   rS   r   rV   r   r[   r_   r	   r   r(   r1   r2   s   @r   r:   r:   f   s    	+<b 	 	 $'7"8  _n)= _I!4 IF
E) E)> E) E)r   r:   )r   r/   dataclassesr   	itertoolsr   typingr   logurur   pipecat.frames.framesr   r   r	   r
   pipecat.pipeline.base_pipeliner   pipecat.pipeline.pipeliner   "pipecat.processors.frame_processorr   r   r   r   r   r4   r:   r   r   r   <module>rw      sp     !    L L 7 . b b 	 	 	8 8@2~ 2@r)< r)r   