
    qiO	                     f    d Z ddlZddlmZmZ ddlmZmZmZ ddl	m
Z
mZ ddlmZ  G d de      Zy)	z@Async generator processor for frame serialization and streaming.    N)AnyAsyncGenerator)CancelFrameEndFrameFrame)FrameDirectionFrameProcessor)FrameSerializerc                   R     e Zd ZdZdef fdZdedef fdZde	e
df   fd	Z xZS )
AsyncGeneratorProcessoraw  A frame processor that serializes frames and provides them via async generator.

    This processor passes frames through unchanged while simultaneously serializing
    them and making the serialized data available through an async generator interface.
    Useful for streaming frame data to external consumers while maintaining the
    normal frame processing pipeline.
    
serializerc                d    t        |   di | || _        t        j                         | _        y)zInitialize the async generator processor.

        Args:
            serializer: The frame serializer to use for converting frames to data.
            **kwargs: Additional arguments passed to the parent FrameProcessor.
        N )super__init___serializerasyncioQueue_data_queue)selfr   kwargs	__class__s      T/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/processors/async_generator.pyr   z AsyncGeneratorProcessor.__init__   s+     	"6"%"==?    frame	directionc                   K   t         |   ||       d{    | j                  ||       d{    t        |t        t
        f      r$| j                  j                  d       d{    y| j                  j                  |       d{   }|r$| j                  j                  |       d{    yy7 7 7 S7 17 w)zProcess frames by passing them through and queuing serialized data.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        N)
r   process_frame
push_frame
isinstancer   r   r   putr   	serialize)r   r   r   datar   s       r   r   z%AsyncGeneratorProcessor.process_frame)   s      g#E9555ooeY///ek845""&&t,,,))33E::D&&**4000  	6/ -:0sU   CB:CB<8C*B>+#CC $C3C4C<C>C CCreturnNc                z   K   d}|r1| j                   j                          d{   }|du}|r| |r0yy7 w)zGenerate serialized frame data asynchronously.

        Yields:
            Serialized frame data from the internal queue until a termination
            signal (None) is received.
        TN)r   get)r   runningr#   s      r   	generatorz!AsyncGeneratorProcessor.generator;   sA      ))--//D$&G
	 /s   ";9;;)__name__
__module____qualname____doc__r
   r   r   r   r   r   r   r(   __classcell__)r   s   @r   r   r      s<    	+o 	+1 1> 1$T	!: r   r   )r,   r   typingr   r   pipecat.frames.framesr   r   r   "pipecat.processors.frame_processorr   r	   #pipecat.serializers.base_serializerr
   r   r   r   r   <module>r2      s0    G  & 
 N ?2n 2r   