
    qi                     b    d Z ddlZddlmZmZmZ ddlmZ ddlm	Z	m
Z
 defdZ G d d	e
      Zy)
z8Producer processor for frame filtering and distribution.    N)	AwaitableCallableList)Frame)FrameDirectionFrameProcessorframec                    K   | S w)zDefault transformer that returns the frame unchanged.

    Args:
        frame: The frame to transform.

    Returns:
        The same frame without modifications.
     )r	   s    W/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/processors/producer_processor.pyidentity_transformerr      s      Ls   c                        e Zd ZdZedddeegee   f   deegee   f   def fdZ	d Z
d	ed
ef fdZd	efdZ xZS )ProducerProcessora|  A processor that filters frames and distributes them to multiple consumers.

    This processor receives frames, applies a filter to determine which frames
    should be sent to consumers (ConsumerProcessor), optionally transforms those
    frames, and distributes them to registered consumer queues. It can also pass
    frames through to the next processor in the pipeline.
    T)transformerpassthroughfilterr   r   c                Z    t         |           || _        || _        || _        g | _        y)a2  Initialize the producer processor.

        Args:
            filter: Async function that determines if a frame should be produced.
                   Must return True for frames to be sent to consumers.
            transformer: Async function to transform frames before sending to consumers.
                        Defaults to identity_transformer which returns frames unchanged.
            passthrough: Whether to pass frames through to the next processor.
                        If True, all frames continue downstream regardless of filter result.
        N)super__init___filter_transformer_passthrough
_consumers)selfr   r   r   	__class__s       r   r   zProducerProcessor.__init__%   s.    " 	''/1    c                 d    t        j                         }| j                  j                  |       |S )zAdd a new consumer and return its associated queue.

        Returns:
            asyncio.Queue: The queue for the newly added consumer.
        )asyncioQueuer   append)r   queues     r   add_consumerzProducerProcessor.add_consumer<   s&     u%r   r	   	directionc                 :  K   t         |   ||       d{    | j                  |       d{   rA| j                  |       d{    | j                  r| j                  ||       d{    yy| j                  ||       d{    y7 y7 b7 K7 '7 w)a}  Process an incoming frame and determine whether to produce it.

        If the frame meets the filter criteria, it will be transformed and added
        to all consumer queues. If passthrough is enabled, the original frame
        will also be sent downstream.

        Args:
            frame: The frame to process.
            direction: The direction of the frame flow.
        N)r   process_framer   _producer   
push_frame)r   r	   r#   r   s      r   r%   zProducerProcessor.process_frameF   s      g#E9555e$$$--&&&  ooeY777 ! //%333 	6$&73sU   BBBBB	B
%B/B0BBBBBBBc                    K   | j                   D ]4  }| j                  |       d{   }|j                  |       d{    6 y7  7 	w)z!Produce a frame to all consumers.N)r   r   put)r   r	   consumer	new_frames       r   r&   zProducerProcessor._produceZ   sE      	*H"//66I,,y)))	*6)s    $AAAA	 A	A)__name__
__module____qualname____doc__r   r   r   r   boolr   r"   r   r%   r&   __classcell__)r   s   @r   r   r      sy     <P 2 %)D/122 ugy'778	2
 2.4 4> 4(*E *r   r   )r/   r   typingr   r   r   pipecat.frames.framesr   "pipecat.processors.frame_processorr   r   r   r   r   r   r   <module>r5      s3    ?  , , ' M	e 	B* B*r   