
    qi#&                     b   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	m
Z
mZmZmZmZmZ ddlmZmZ 	 ddlZ ej(                  dd	        ej(                  d
d	       ddlmZmZ  G d de      Zy# e$ r7Z ej4                  de         ej4                  d        ede       dZ[ww xY w)z2GStreamer pipeline source integration for Pipecat.    N)Optional)logger)	BaseModel)CancelFrameEndFrameFrameOutputAudioRawFrameOutputImageRawFrame
StartFrameSystemFrame)FrameDirectionFrameProcessorGstz1.0GstApp)r   r   zException: zIn order to use GStreamer, you need to `pip install pipecat-ai[gstreamer]`. Also, you need to install GStreamer in your system.zMissing module: c                   v    e Zd ZdZ G d de      Zdddedee   f fdZd	e	d
e
f fdZd	efdZd	efdZd	efdZdej&                  dej(                  fdZdej,                  dej.                  fdZdej.                  fdZdej.                  fdZdej8                  fdZdej8                  fdZ xZS )GStreamerPipelineSourcea+  A frame processor that uses GStreamer pipelines as media sources.

    This processor creates and manages GStreamer pipelines to generate audio and video
    output frames. It handles pipeline lifecycle, decoding, format conversion, and
    frame generation with configurable output parameters.
    c                   ^    e Zd ZU dZdZeed<   dZeed<   dZe	e   ed<   dZ
eed	<   d
Zeed<   y)$GStreamerPipelineSource.OutputParamsa  Output configuration parameters for GStreamer pipeline.

        Parameters:
            video_width: Width of output video frames in pixels.
            video_height: Height of output video frames in pixels.
            audio_sample_rate: Sample rate for audio output. If None, uses frame sample rate.
            audio_channels: Number of audio channels for output.
            clock_sync: Whether to synchronize output with pipeline clock.
        i   video_widthi  video_heightNaudio_sample_rate   audio_channelsT
clock_sync)__name__
__module____qualname____doc__r   int__annotations__r   r   r   r   r   bool     ^/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/processors/gstreamer/pipeline_source.pyOutputParamsr   0   s@    	  Sc+/8C=/
Dr#   r%   N)
out_paramspipeliner&   c                   t        |   di | |xs t        j                         | _        d| _        t        j                          t        j                  j                  d      | _
        t        j                  |d      }t        j                  j                  dd      }|j                  d| j                         | j                  j!                  |       | j                  j!                  |       |j#                  |       | j                  j%                         }|j'                          |j                  d| j(                         y)	a*  Initialize the GStreamer pipeline source.

        Args:
            pipeline: GStreamer pipeline description string for the source.
            out_params: Output configuration parameters. If None, uses defaults.
            **kwargs: Additional arguments passed to parent FrameProcessor.
        r   playerT	decodebinNz	pad-addedmessager"   )super__init__r   r%   _out_params_sample_rater   initPipelinenew_playerparse_bin_from_descriptionElementFactorymakeconnect_decodebin_callbackaddlinkget_busadd_signal_watch_on_gstreamer_message)selfr'   r&   kwargssourcer*   bus	__class__s          r$   r-   z GStreamerPipelineSource.__init__A   s     	"6"%O)@)M)M)O
||''1//$?&&++K>	+t'?'?@ #Ill""$It99:r#   frame	directionc                   K   t         |   ||       d{    t        |t              r4| j	                  ||       d{    | j                  |       d{    yt        |t              r4| j                  |       d{    | j	                  ||       d{    yt        |t              r| j	                  ||       d{    yt        |t              r4| j	                  ||       d{    | j                  |       d{    y| j	                  ||       d{    y7 7 7 7 7 7 o7 F7 /7 w)zProcess incoming frames and manage GStreamer pipeline lifecycle.

        Args:
            frame: The frame to process.
            direction: The direction of frame processing.
        N)r,   process_frame
isinstancer   
push_frame_startr   _cancelr   r   _stop)r>   rC   rD   rB   s      r$   rF   z%GStreamerPipelineSource.process_frame_   s     g#E9555 eZ( //%333++e$$${+,,u%%%//%333{+//%333x( //%333**U### //%333- 	6 4$%3 4
 4# 4s   ED.)ED1ED3)ED5ED7*E	D9
*E4D;5ED=E(D?)E1E3E5E7E9E;E=E?Ec                    K   | j                   j                  xs |j                  | _        | j                  j                  t        j                  j                         yw)zStart the GStreamer pipeline.N)	r.   r   audio_out_sample_rater/   r3   	set_stater   StatePLAYINGr>   rC   s     r$   rI   zGStreamerPipelineSource._start~   sA      ,,>>]%B]B]syy001s   AA c                 r   K   | j                   j                  t        j                  j                         yw)zStop the GStreamer pipeline.Nr3   rN   r   rO   NULLrQ   s     r$   rK   zGStreamerPipelineSource._stop         syy~~.   57c                 r   K   | j                   j                  t        j                  j                         yw)zCancel the GStreamer pipeline.NrS   rQ   s     r$   rJ   zGStreamerPipelineSource._cancel   rU   rV   rA   r+   c                     |j                   }|t        j                  j                  k(  r0|j	                         \  }}t        j                  |  d| d|        y)zHandle GStreamer bus messages.z error: z : T)typer   MessageTypeERRORparse_errorr   error)r>   rA   r+   terrdebugs         r$   r=   z-GStreamerPipelineSource._on_gstreamer_message   sO    LL%%% ,,.JCLLD6#c%9:r#   r*   padc                     |j                         j                         }|j                  d      r| j                  |       y|j                  d      r| j	                  |       yy)z'Handle new pads from decodebin element.audiovideoN)get_current_caps	to_string
startswith_decodebin_audio_decodebin_video)r>   r*   ra   caps_strings       r$   r8   z+GStreamerPipelineSource._decodebin_callback   sV    **,668!!'*!!#&##G,!!#& -r#   c                    t         j                  j                  dd      }t         j                  j                  dd      }t         j                  j                  dd      }t         j                  j                  dd      }t         j                  j	                  d| j
                   d| j                  j                   d      }|j                  d	|       t         j                  j                  d
d      }|j                  dd       |j                  d| j                  j                         |j                  d| j                         | j                  j                  |       | j                  j                  |       | j                  j                  |       | j                  j                  |       | j                  j                  |       |j                          |j                          |j                          |j                          |j                          |j                  |       |j                  |       |j                  |       |j                  |       |j!                  d      }|j                  |       y)z8Set up audio processing pipeline from decoded audio pad.queueNaudioconvertaudioresample
capsfilterzaudio/x-raw,format=S16LE,rate=z
,channels=z,layout=interleavedcapsappsinkemit-signalsTsync
new-samplesink)r   r5   r6   Capsfrom_stringr/   r.   r   set_propertyr   r7   _appsink_audio_new_sampler3   r9   sync_state_with_parentr:   get_static_pad)	r>   ra   queue_audiorm   rn   audiocapsfilter	audiocapsappsink_audio	queue_pads	            r$   rh   z(GStreamerPipelineSource._decodebin_audio   s   ((--gt<))..~tD**//F,,11,EHH((,T->->,?z$JZJZJiJiIjj}~
	 	$$VY7**//	4@"">48""64+;+;+F+FGlD,J,JK%&')'**,++-,,...0,,.&-(?+]+..v6	r#   c                 $   t         j                  j                  dd      }t         j                  j                  dd      }t         j                  j                  dd      }t         j                  j                  dd      }t         j                  j	                  d| j
                  j                   d| j
                  j                         }|j                  d|       t         j                  j                  d	d      }|j                  d
d       |j                  d| j
                  j                         |j                  d| j                         | j                  j                  |       | j                  j                  |       | j                  j                  |       | j                  j                  |       | j                  j                  |       |j                          |j                          |j                          |j                          |j                          |j                  |       |j                  |       |j                  |       |j                  |       |j!                  d      }|j                  |       y)z8Set up video processing pipeline from decoded video pad.rl   Nvideoconvert
videoscalero   zvideo/x-raw,format=RGB,width=z,height=rp   rq   rr   Trs   rt   ru   )r   r5   r6   rv   rw   r.   r   r   rx   r   r7   _appsink_video_new_sampler3   r9   rz   r:   r{   )	r>   ra   queue_videor   r   videocapsfilter	videocapsappsink_videor   s	            r$   ri   z(GStreamerPipelineSource._decodebin_video   s   ((--gt<))..~tD'',,\4@
,,11,EHH((+D,<,<,H,H+IRVRbRbRoRoQpq
	 	$$VY7**//	4@"">48""64+;+;+F+FGlD,J,JK%&$)'**,++-))+..0,,.&*%(]+..v6	r#   rq   c                    |j                         j                         }|j                  t        j                  j
                        \  }}t        |j                  | j                  | j                  j                        }t        j                  | j                  |      | j                                |j                  |       t        j                   j"                  S )z0Handle new audio samples from GStreamer appsink.)rc   sample_ratenum_channels)pull_sample
get_buffermapr   MapFlagsREADr	   datar/   r.   r   asynciorun_coroutine_threadsaferH   get_event_loopunmap
FlowReturnOKr>   rq   buffer_inforC   s         r$   ry   z1GStreamerPipelineSource._appsink_audio_new_sample   s    $$&113JJs||001	D#))))))88

 	(()?ATATAVWT~~   r#   c                    |j                         j                         }|j                  t        j                  j
                        \  }}t        |j                  | j                  j                  | j                  j                  fd      }t        j                  | j                  |      | j                                |j                  |       t        j                   j"                  S )z0Handle new video samples from GStreamer appsink.RGB)imagesizeformat)r   r   r   r   r   r   r
   r   r.   r   r   r   r   rH   r   r   r   r   r   s         r$   r   z1GStreamerPipelineSource._appsink_video_new_sample   s    $$&113JJs||001	D#))""..0@0@0M0MN

 	(()?ATATAVWT~~   r#   ) r   r   r   r   r   r%   strr   r-   r   r   rF   r   rI   r   rK   r   rJ   r   BusMessager=   ElementPadr8   rh   ri   r   AppSinkry   r   __classcell__)rB   s   @r$   r   r   (   s     y  " OS ;C ;Xl5K ;<4 4> 4>2* 2
/ //; / 3;; 'S[[ 'sww ' CGG  D!CGG !F! !! !r#   r   )r   r   typingr   logurur   pydanticr   pipecat.frames.framesr   r   r   r	   r
   r   r   "pipecat.processors.frame_processorr   r   girequire_versiongi.repositoryr   r   ModuleNotFoundErrorer]   	Exceptionr   r"   r#   r$   <module>r      s    9       N,Bue$Bx')T!n T!  ,FLL;qc"#FLL 	J &qc*
++,s   2A2 2B.72B))B.