
    qiW                        d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlZd dlmZ d dlmZ d d	lmZ d
dlmZ d
dlmZmZ d
dlmZ d
dlmZ d
dlmZmZmZ ddlmZ ddl m!Z!m"Z" ddl#m$Z$m%Z%m&Z&m'Z' ddl(m)Z)m*Z*m+Z+ ddl,m-Z-m.Z. 	 dZ/dZ0dZ1 G d dejd                        Z3 e        G d d             Z4 e        G d d             Z5 G d d       Z6y)!    N)abstractmethod)	Generator)contextmanager)ceil)perf_counter)nn)tqdm)logging_redirect_tqdm   )PretrainedConfig)CompileConfigGenerationConfig)LogitsProcessorList)logging)ContinuousBatchProcessorMetricsattach_tracertraced   )PagedAttentionCache)ContinuousBatchingAsyncIOsContinuousBatchingIOs)GenerationOutputRequestStateRequestStatuslogger)SCHEDULER_MAPPINGFIFOScheduler	Scheduler)attn_mask_is_neededpad_to_interval@   i @      c                       e Zd ZU eed<   ej                  ed<   ej                  ed<   ede	ddfd       Z
ededefd	       Zy)
ProtoPretrainedModelconfigdtypedeviceattn_implementationreturnNc                      y N )selfr(   s     l/opt/pipecat/venv/lib/python3.12/site-packages/transformers/generation/continuous_batching/continuous_api.pyset_attn_implementationz,ProtoPretrainedModel.set_attn_implementationF           generation_configc                      y r+   r,   )r-   r2   s     r.   _get_logits_processorz*ProtoPretrainedModel._get_logits_processorJ   r0   r1   )__name__
__module____qualname__r   __annotations__torchr&   r'   r   strr/   r   r   r4   r,   r1   r.   r$   r$   A   s`    ;;LL3 4   7G L_  r1   r$   c                       e Zd ZU eez  ed<   dededede	j                  de	j                  dej                  dej                  d	ej                  d
edededededededdf dZdefdZed/d       Zedededdfd       Zd/dZedefd       Zededdfd       Zed/d       Zedefd       Zed        Z ededdfd       Z!e ejD                         d e#jH                  d!e%d"eddfd#              Z&ed e#jH                  d$e'd!e%d"eddf
d%       Z( ed&'      d e#jH                  d$e'dejR                  fd(       Z* ed)'      d$e'd*ejR                  d!e%dejR                  fd+       Z+ ed,'      d-ejR                  d$e'd"eddfd.       Z,y)0ContinuousBatchProcessorinputs_and_outputscacher%   r2   input_queueoutput_queue
stop_eventmodel_devicemodel_dtype	schedulermanual_evictionuse_cuda_graphq_padding_interval_sizekv_padding_interval_sizemax_cached_graphsuse_async_batchingr)   Nc                    || _         || _        || _        || _        || _        || _        || _        || _        |	| _        |
| _	        t        |dd      dn|j                  | _        || _        || _        || _        || _        t        |dd      | _        d| _        |xs' | j                   duxr | j                   j$                   | _        |j(                  | _        t+        |j(                        | _        || _        | j.                  r#t1        |dz        }t3        |||||      | _        yt7        |||||      | _        y)a  Initialize the continuous batch processor.

        Args:
            cache: A [`PagedAttentionCache`] object
            config: The model configuration
            generation_config: The generation configuration
            input_queue: Queue for incoming requests
            output_queue: Queue for outgoing results
            stop_event: Event to signal processing should stop
            model_device: Device for model inputs/outputs
            model_dtype: Data type for model inputs/outputs
            scheduler: The [`Scheduler`] to use
            manual_eviction: Whether to manually evict blocks from the cache
            use_cuda_graph: Whether to use cuda graphs or not during CB. Check the docstring at the top of the file for
                more details.
            q_padding_interval_size: Padding granularity for queries in tokens.
            kv_padding_interval_size: Padding granularity for KV cache in tokens.
            max_cached_graphs: Maximum number of CUDA graphs to cache. Uses LRU eviction when full.
        sliding_windowNr   compile_configF   )r>   r%   r2   r?   r@   rA   rB   rC   rD   rE   getattrrL   rG   rH   rI   rF   rM   '_forward_process_and_sample_is_compileddynamic_pad_inputsmax_batch_tokensr   metricsrJ   r   r   r=   r   )r-   r>   r%   r2   r?   r@   rA   rB   rC   rD   rE   rF   rG   rH   rI   rJ   s                   r.   __init__z!ContinuousBatchProcessor.__init__T   sV   J 
!2&($(&". $+63CT#J#RaX^XmXm'>$(@%!2,4;<MO_ae4f7<4)rd.A.A.M.qVZViViVqVqRq !& 6 66u7M7MN #5"" $%6%: ;&@v|[:K'D# '<v|[:K'D#r1   c           	          d| j                    d| j                   d| j                  j                   d| j                  j                   d	| j
                  j                         j                         z   S )Nz%ContinuousBatchProcessor(input_queue=z, output_queue=z, active_requests=z, waiting_requests=))r?   r@   rD   active_requestswaiting_requestsr=   get_model_kwargs__repr__r-   s    r.   r[   z!ContinuousBatchProcessor.__repr__   sx    3D4D4D3E_UYUfUfTg h#~~==>>QRVR`R`RqRqQrrsu%%668AACD	
r1   c                    | j                   j                         sU	 | j                   j                         }|8| j                  j	                  |       | j                   j                         sTyy# t
        j                  $ r Y yt        $ rQ}t        j                  d| d       t               j                  d      }|| j                  ||       Y d}~d}~ww xY w)z?Pull new requests from the input queue and add to waiting list.NzError processing new request: Texc_infostate)r?   empty
get_nowaitrD   add_waiting_requestqueueEmpty	Exceptionr   errorlocalsget_handle_request_error)r-   r`   es      r.   _get_new_requestsz*ContinuousBatchProcessor._get_new_requests   s     ""((*9((335=2259 ""((* ;;  9=aSADQ&,hll7&;$..q%8	9s#   A1 A1 1CCACCrg   r`   c                    t         j                  |_        t        |      |_        t        |j                  t              r+| j                  j                  |j                        |_	        ng |_	        | j                  j                  |j                  |j                         | j                  j                  |j                                y)z(Handle general request processing error.N)r   FAILEDstatusr:   rg   
isinstance
request_idrD   !get_active_request_static_outputsgenerated_tokensrT   record_request_completioncreated_timer@   putto_generation_output)r-   rg   r`   s      r.   rj   z.ContinuousBatchProcessor._handle_request_error   s     %++%j e&&,%)^^%U%UV[VfVf%gE"%'E"..u/A/A5CSCSTe88:;r1   c           
      X   | j                   j                  r(| j                   j                  j                         \  }}n9t	        t        | j                   j                  j                                     \  }}t        j                  d| dt        |j                         dt        |j                         d       |j                         }t        j                  |_        | j                   j!                  |d       | j                   j#                  |       d| j                   _        y)a  Soft resets one active request by removing it from active requests and re-adding it to the waiting queue.

        The generated tokens are kept as part of the new request's initial prompt. When `block_new_requests` is False,
        the oldest request is offloaded; when True, the newest request is offloaded. This method also sets
        `block_new_requests` to True to prevent infinite loops of offloading and re-scheduling requests.
        zSoft resetting request z with z initial tokens and z generated tokensTevict_from_cacheN)rD   block_new_requestsrX   popitemnextiteritemsr   infoleninitial_tokensrs   !create_equivalent_initial_requestr   FINISHED_statusfinish_requestrc   )r-   rq   r`   	new_states       r.   soft_reset_one_requestz/ContinuousBatchProcessor.soft_reset_one_request   s     >>,, $ > > F F HJ $T$..*H*H*N*N*P%Q RJ%j\E<P<P8Q7RRf5))*++<>	

 ;;=	%..%%j4%H**95 -1)r1   c                 t   | j                          | j                  j                          | j                  j                         sy| j                  j                  t        | j                  j                        t        | j                  j                               | j                  j                  | j                  | j                  j                        }|>t        | j                  j                        dkD  r| j                          yt        d      |sy| j                  j                  |       | j                   j#                  |       | j                  j%                  | j                         t'        j(                  t*        j,                        r| j                   j/                         dd \  }}t'        j0                  dt        |       dt        | j                  j                         dt        | j                  j                         d	| d
| d| j                  j3                                 y)zPrepare tensors and metadata for the next model forward pass. Returns True if there are requests to process,
        False otherwise.FNr   z=No requests can be scheduled and no request can be offloaded.rN   zScheduled: z, Waiting: z
, Active: z	. cum Q: z
. cum KV: z, free blocks: T)rl   rD   clear_cancelled_requestshas_pending_requestsrT   record_queue_metricsr   rX   rY   schedule_batchrS   r>   	num_pagesr   RuntimeErrorrecord_batch_metricsr=   prepare_batch_tensorsrecord_kv_cache_memory_metricsr   isEnabledForr   DEBUGget_actual_lengthsdebugget_num_free_blocks)r-   requests_in_batchactual_query_lengthactual_key_lengths       r.   prepare_next_batchz+ContinuousBatchProcessor.prepare_next_batch   s    	 //1~~224))#dnn.L.L*MsSWSaSaSrSrOst !NN99$:O:OQUQ[Q[QeQef $4>>112Q6++-"#bcc  	))*;<556GH 	33DJJ?w}}-595L5L5_5_5abdcd5e2!2LLc"345[T^^EdEdAe@f gt~~==>?yI\H] ^,-_TZZ=[=[=]<^`
 r1   c                     |j                   s|j                  t        j                  k(  r*| j                  j                  |j                                yy)zCSend output to the queue based on streaming mode and request state.N)	streamingro   r   r   r@   rv   rw   )r-   r`   s     r.   _maybe_send_outputz+ContinuousBatchProcessor._maybe_send_output  s?     ??ellm.D.DD!!%"<"<">? Er1   c                 T   | j                   j                         \  }}d}|D ]  }|j                  }|j                  t        j
                  k(  r7| j                  r|j                  r|dz  }Kt        d|j                   d      |j                  r!|j                         dk(  rE| j                  j                  |j                  |j                         t        j                  |_        ||   }|dz  }|j                  |      }| j                   j#                  ||j$                         |rs| j                  j'                  |j                  |j                         | j(                  j+                  |j                  | j,                          d| j(                  _        | j1                  |       |j                  t        j2                  k(  s| j                   j#                  ||j$                          g g }	}| j(                  j4                  r| j(                  j4                  j7                         }
|
j8                  }d|
_        t;        |      D cg c]  }|
j                   d|  }}|D ]*  }|
j=                  |      | j(                  j>                  |<   , | j                   jA                  |
j                  |      \  }}|jC                  |       |	jC                  |       | j(                  j4                  r|r| j                   jE                  ||	       yyc c}w )	z0Update request states based on generated tokens.r   r   z!Tried to update FINISHED request z in sync mode.ry   Fz__child#N)#r=   prepare_batch_updater`   ro   r   r   rJ   has_new_tokenr   rq   generated_lenrT   record_ttft_metricru   DECODINGupdate_and_check_completionr>   !mark_shareable_blocks_as_completecomplete_blocksrt   rD   r   rE   r{   r   
PREFILLING_requests_to_forkpopnum_childrenrangeforkrX   fork_requestextend
copy_cache)r-   r   
new_tokenscurrent_logits_indexfuture_stater`   tokenis_finishedcopy_sourcecopy_destinationstate_to_forkr   inew_request_idsnew_request_idcopy_srccopy_dsts                    r.   update_batchz%ContinuousBatchProcessor.update_batch  s    )-(?(?(T(T(V%: - 	bL &&E||}555**#11,1,"%FuGWGWFXXf#ghh))&&(A-LL33E4F4FHXHXY#0#9#9EL"#78$)$ $??F

<<ULD`D`aLL::5;M;MuO_O_`NN11%2B2BZ^ZnZnVn1p8=DNN5''.!9!99

<<ULD`D`a?	bD )+B%nn.. NN<<@@BM(55L)*M&QVWcQdeA-":":!;8A3GeOe"1 dANASASTbAc..~>d "&!8!89Q9QSb!cHhx(##H- nn..  JJ!!+/?@  fs   L%c                 6    | j                   j                         S )z2Check if there are any active or waiting requests.)rD   r   r\   s    r.   r   z-ContinuousBatchProcessor.has_pending_requestsQ  s     ~~2244r1   c                     | j                   j                         d   }|D ]M  }| j                  ||j                         | j                  j                  |j                  j                         O y)z&Handle errors during batch processing.r   N)r=   r   rj   r`   rD   r   rq   )r-   rg   failed_future_statesr   s       r.   handle_batch_errorz+ContinuousBatchProcessor.handle_batch_errorV  sc      $66KKMaP0 	IL&&ul.@.@ANN)),*<*<*G*GH	Ir1   c                    t        | j                  j                  j                               }|D ]9  }| j	                  ||       | j                  j                  |j                         ; t        | j                  j                  j                               D ]9  }| j                  j                  j                  |      }| j	                  ||       ; | j                  j                  j                          y)zFail all active requests with the given error.

        Args:
            error: The error to report in the failure message
        N)listrD   rX   valuesrj   r   rq   rY   keysr   waiting_requests_orderclear)r-   rg   requestsr`   req_ids        r.   fail_all_requestsz*ContinuousBatchProcessor.fail_all_requests^  s     66==?@ 	<E&&ue4NN))%*:*:;	<
 4>>::??AB 	5FNN3377?E&&ue4	5
 	--335r1   modellogit_processor	do_samplec                 p   | j                   | j                  st        j                  | j                  | j                   j
                  | j                   j                  | j                   j                  | j                   j                  | j                   j                        | _        d| _        | j                  rx| j                  j                         \  }}}}}t        || j                  | j                        }t!        |      }t        || j"                  | j$                  j&                        }	nd\  }}	| j                  j)                  ||	      }
| j                  j*                  }| j,                  s>t        j.                  j1                  |      5  | j	                  ||
||       ddd       n:| j                  j2                  j5                  ||	      }|9t        j.                  j1                  |      5  |j7                          ddd       nt9        j:                  d||	f       t        j.                  j1                  |      5  | j	                  ||
||       ddd       t        j.                  j=                         }t        j.                  j?                  ||      5  | j	                  ||
||       ddd       | j                  j2                  jA                  ||	|       | j                  jC                          y# 1 sw Y   $xY w# 1 sw Y   0xY w# 1 sw Y   xY w# 1 sw Y   oxY w)z!Perform a single generation step.N)	fullgraphmoderQ   backendoptionsT)r   r   z8Creating graph for (padded_q, padded_read_index_size) = )stream)"rM   rP   r9   compile_forward_process_and_sampler   r   rQ   r   r   rR   r=   r   r    rG   rS   maxrH   r>   r   rZ   compute_streamrF   cudar   graphs	get_graphreplayr   r   	CUDAGraphgraph	set_graphretrieve_device_outputs)r-   r   r   r   r   _actual_read_sizespadded_qmax_read_index_sizepadded_read_index_size
batch_datar   r   s                r.   _generation_stepz)ContinuousBatchProcessor._generation_steps  s    *43_3_/4}}00--77((--++33++33++330D, <@D8 >B>U>U>h>h>j;A'8!&':D<X<XZ^ZoZopH"%&7"8%4#T%B%BDJJDXDX&" 04,H,,,==hH^_
00?? """">2 `00
OU^_` `
 ++22<<XG]^E ZZ&&~6 #LLN# # W8=S2T1XYZ ZZ&&~6 d44UJYbcd 

,,.ZZ%%eN%C d44UJYbcd ''..88CY[`a 	7797` `# #d d
d ds0   L:LL )L,LL L),L5r   c                     | j                   j                  |d          | j                  ||      }| j                  |||      }| j	                  |||       y)zThis function performs the forward pass, logits processing, and sampling; which are broken down into smaller
        function to be easier to trace with OpenTelemetry.	input_idsN)r=   carry_over_tokens_model_forward_process_logit_sample)r-   r   r   r   r   logitsprobss          r.   r   z4ContinuousBatchProcessor._forward_process_and_sample  sS     	11*[2IJ$$UJ7##JHUJ	2r1   model_forward	span_namec                 &     |di |j                   S )Nr,   )r   )r-   r   r   s      r.   r   z'ContinuousBatchProcessor._model_forward  s    "z")))r1   logit_processingr   c                     t        |d      r|j                  |d   |d          |j                  \  }}}|j                  ||z  |      }|d   j                  ||z        } |||      }	|	j                  |||      S )Nset_continuous_batching_contextlogits_indicescu_seq_lens_qr   )hasattrr   shapeview)
r-   r   r   r   
batch_sizeseq_len
vocab_size	logits_2dinput_ids_2dprocessed_logits_2ds
             r.   r   z'ContinuousBatchProcessor._process_logit  s    
 ?$EF;;JGW<XZdetZuv +1,,'
GZKK
W 4jA	!+.33J4HI-lIF"''
GZHHr1   samplingr   c                 z   |rKt         j                  j                  |d      }t        j                  |d   d      j                  d      }n(t        j                  |d      }|j                  d      }|j                  d      }|d   d | }||   }| j                  j                  d | j                  |       y )N)dimr   r   )num_samplesr   )r   
functionalsoftmaxr9   multinomialsqueezeargmaxsizer=   
output_idscopy_)r-   r   r   r   next_tokenstokensindicess          r.   r   z ContinuousBatchProcessor._sample  s    MM))%R)8E++E!H!DLLRPK,,u"5K%--a0K!!!$-.w7!'***7F399+Fr1   r)   N)-r5   r6   r7   r   r   r8   r   r   r   rd   Queue	threadingEventr9   r'   r&   r   boolintrU   r:   r[   r   rl   rf   r   rj   r   r   r   r   r   r   r   no_gradr   Moduler   r   dictr   Tensorr   r   r   r,   r1   r.   r<   r<   P   s   -0JJJL"L !L ,	L
 [[L kkL OOL llL [[L L L L "%L #&L L  !!L" 
#L\
# 
 9 9" <9 <\ <d < < 16 &D & &P @ @ @ @
 8A 8At 5d 5 5 I I 6y 6T 6 6( U]]_::bii ::BU ::bf ::ko ::  ::x 3yy3 3 -	3
 3 
3 3 o&*BII *4 *ELL * '* ()II(-IGZI	I *I" j!GU\\ Gt G GQU G "Gr1   r<   c                      e Zd ZdZ	 	 	 	 	 	 	 d.dedededededed	ed
ededz  ddfdZdedz  dede	dz  defdZ
ed/d       ZdefdZd0dededz  ddfdZd1dededz  ddfdZ	 	 	 	 d2dee   dedz  dedz  dededefdZ	 	 	 d3deee      dedz  dededdf
d Zdeddfd!Zd4dedz  dedz  dedz  fd"Zd# Zdedee   fd$Zed/d%       Zd/d&Z ed'(      d)eddfd*       Zed+ed)edz  ddfd,       Z ededdfd-       Z!y)5ContinuousBatchingManagerzManager for handling continuous batching of generation requests.

    This class provides the user interface for submitting generation requests,
    retrieving results, and managing the background generation thread.
    Nr   r2   rE   max_queue_sizerG   rH   rI   allow_block_sharingrJ   r)   c
           	         d|j                   j                  vr(|j                  d|j                   j                          |j                         | _        || _        || _        || _        t        j                  |      | _
        t        j                         | _        t        j                         | _        d| _        d| _        d| _        t        j$                         | _        ||j(                  n|}|| _        t+        |dd      | _        t+        |dd      | _        | j                  j1                  |      | _        t+        |d	d      }
|
|
nd
| _        | j7                  t+        |dd      t9        |xs |xs |      t+        |dd            | _        |	|	| _        nV| j:                  xr  t?        | j                  j                          | _        tA        jB                  d| j<                  d       |dkD  r|ntD        | _#        |dkD  r|ntH        | _%        |dkD  r|ntL        | _'        | j,                  rtQ        d      y)a&  Initialize the continuous batching manager.

        Args:
            model: The language model for generation
            generation_config: Configuration for generation parameters
            max_queue_size: Maximum size of the request queue (0 = unlimited)
            q_padding_interval_size: (optional) Padding granularity for queries in tokens. 0 uses default.
            kv_padding_interval_size: (optional) Padding granularity for KV cache in tokens. 0 uses default.
            max_cached_graphs: (optional) Maximum number of cached CUDA graphs. 0 uses default.
            allow_block_sharing: (optional) Whether to allow block sharing if the model has some full attention layers
            use_async_batching: Whether to use async API or not. If None, will be automatically detected.
        zpaged|)maxsizeNr   log_prob_generationFr   Tnum_return_sequencesr   rF   rM   )rF   user_specified_paramrM   zQNo behavior specified for use_async_batching, choosing self.use_async_batching = z because CUDA graphs are turned on and no attention mask is needed. If you want to save memory, you can disable asynchronous batching but it will degrade performance (by ~25% for some workloads).z(log_prob_generation is not supported yet))r%   _attn_implementationr/   evalr   rE   _allow_block_sharing_use_prefix_sharingrd   r  r?   r@   r  r  rA   batch_processor_generation_thread_request_counterLock_request_lockr2   rO   r"  r   r4   r   r#  _decide_use_cuda_graphsr  rF   rJ   r   r   r   Q_PADDING_INTERVAL_SIZErG   KV_PADDING_INTERVAL_SIZErH   MAX_CACHED_GRAPHSrI   NotImplementedError)r-   r   r2   rE   r  rG   rH   rI   r  rJ   r#  s              r.   rU   z"ContinuousBatchingManager.__init__  s@   2 5<<<<<))F5<<3T3T2U*VW ZZ\
.$7!#6  ;;~>!KKM#//+@D"& !&^^- 8I7PE33Vg!2#*+<>SUZ#[  !2KF48JJ4T4TUf4g&'8:PRVW<P<\$8bc!
 #::"#46FM!%&=&nAY&n]n!o"#46FM ; 
 )&8D# '+&9&9&hBUVZV`V`VgVgBh>hD#KKd$JaJaIe ff f (?'B#H_ 	$ )A1(D$Jb 	% 7H!6K!2Qb ##%&PQQ $r1   rF   r$  rM   c                 p   t         j                  j                         s8|r5t        j                  dt         j                  j                         d       y||S |ry||t         j
                  j                         j                  |j                  |j                        }|j                  dd      }|r#t        j                  d|j                  d       | S t        | j                  j                         }t        j                  d|d	| j                  j                  j                  d
       |S )a  Returns whether or not to use cuda graphs for continuous batching, depending on the following criteria:
        - (use_cuda_graph) which is the user choice
        - (user_specified_param): a boolean indicating if the user specified a parameter related to cuda graphs
        If none of the above criteria are met, we use a default heuristic based on the attention implementation: we turn
        on cuda graphs if and only if no attention mask is needed.
        z7use_cuda_graph is True but torch.cuda.is_available() = z: turning off cuda graphs.FTztriton.cudagraphsz%Compile config compile_config.mode = z uses cudagraphs, which usually does not work well with continuous batching. We recommend using mode 'default' or 'max-autotune-no-cudagraphs' instead.zINo behavior specified for use_cuda_graph, defaulting to use_cuda_graph = z2 because self.model.config._attn_implementation = zc. If you want to save memory, turn off cuda graphs, but they tend to improve performances by a lot.)r9   r   is_availabler   warning	_inductorlist_mode_optionsri   r   r   r   r   r%   r%  )r-   rF   r$  rM   r   compile_uses_cudagraphss         r.   r.  z1ContinuousBatchingManager._decide_use_cuda_graphsE  s$    zz&&(!YUZZ=T=T=V<ZZtuv%!!%oo779==n>Q>QSaSiSijG&-kk2Eu&M#&<n&9&9%= >v v /.. 11B1BCCX~FY Z9zz  559 :::	

 r1   c                     | j                   0| j                   j                         rt        j                  d       yt	        j
                  | j                        | _         | j                   j                          y)z'Start the background generation thread.Nz"Manager thread is already running.)target)r*  is_aliver   r5  r  Thread_run_generation_loopstartr\   s    r.   r>  zContinuousBatchingManager.startq  s\     "".43J3J3S3S3UNN?@"+"2"2$:S:S"T%%'r1   c                 V    | j                   duxr | j                   j                         S )z5Check if the background generation thread is running.N)r*  r;  r\   s    r.   
is_runningz$ContinuousBatchingManager.is_running{  s'    &&d2Yt7N7N7W7W7YYr1   blocktimeoutc                    | j                   t        j                  d       nV| j                   j                  j                  r6t        j
                  d| j                   j                  j                          | j                  t        j                  d       yt               }| j                  j                         s/| j                  j                          t        j
                  d       |r| j                  ||       d| _         y)zSignal the background thread to stop.

        Args:
            block: Whether to wait for the thread to stop
            timeout: Maximum time to wait for the thread to stop
        Nz%
Batch processor was not initialized.z-
Prefix sharing was on. Total prefix length: zManager not started.z'Stopping continuous batching manager...)r)  r   r5  r>   use_prefix_sharingr   _total_prefix_lengthr*  r   rA   is_setsetjoin)r-   rA  rB  stop_trigger_times       r.   stopzContinuousBatchingManager.stop  s     'NNCD##))<<DTEYEYE_E_EtEtDuv ""*NN12(N%%'OO!KKABII'1#r1   rI  c                    | j                   | j                   j                  |       | j                   j                         rt        j                  d| d       yt               }t        j                  d||z
  dd       d| _         yy)zWait for the background thread to finish.

        Args:
            timeout: Maximum time to wait for the thread to stop
        NrB  z3Generation thread did not exit after join timeout (z).z*Continuous Batching Manager stopped after z.2fzs.)r*  rH  r;  r   r5  r   r   )r-   rI  rB  ends       r.   rH  zContinuousBatchingManager.join  s     "".##(((9&&//1!TU\T]]_`a"nHO`I`adHeeghi*.' /r1   r   rq   max_new_tokensr   record_timestampsc           	      |   |9| j                   5  d| j                   }| xj                  dz  c_        ddd       || j                  j                  n|}t	        |t        |      | j                  dz
  ||| j                  j                  |      }| j                  j                  |dd       |S # 1 sw Y   xY w)a/  Add a new generation request to the queue.

        Args:
            input_ids: Input token IDs to use as prompt
            request_id: Optional custom request ID (auto-generated if None)
            **kwargs: Additional generation parameters

        Returns:
            str: The request ID
        Nreq_r   )rq   r   r   rO  rN  eos_token_idr   T
   rA  rB  )
r-  r+  r2   rN  r   r   r#  rR  r?   rv   )r-   r   rq   rN  r   rO  r`   s          r.   add_requestz%ContinuousBatchingManager.add_request  s    $ ## +#D$9$9#:;
%%*%+ CQBX//>>^l !	?22Q6/)//<<
 	U$;'+ +s   %B22B;inputsc                    | j                   5  t        | j                  | j                  t        |      z         D cg c]  }d| 	 }}| xj                  t        |      z  c_        d d d        t	        t        |            }| j                  rt        |d d      }|D ]  \  }}	| j                  |	||||        y c c}w # 1 sw Y   ^xY w)NrQ  c                     | d   S )Nr   r,   )xs    r.   <lambda>z8ContinuousBatchingManager.add_requests.<locals>.<lambda>  s
    !A$ r1   T)keyreverse)	r-  r   r+  r   r   zipr(  sortedrU  )
r-   rV  rN  r   rO  r   request_idsids_and_inputsrq   r   s
             r.   add_requestsz&ContinuousBatchingManager.add_requests  s      	1/4T5J5JDLaLadghndoLo/pq!T!:qKq!!S[0!	1 c+v67###NPTUN%3 	b!J	Y
NIO`a	b r	1 	1s   /CC  C CCc                 h    | j                   &| j                   j                  j                  |       yy)zkCancel a request by its ID.

        Args:
            request_id: The ID of the request to cancel
        N)r)  rD   set_request_cancellationr-   rq   s     r.   cancel_requestz(ContinuousBatchingManager.cancel_request  s/     +  **CCJO ,r1   c                    | j                   | j                  j                         ry	 | j                  j                  d|      }|+|j                  |k7  r| j                  j                  |       y|S # t        j                  $ r Y yw xY w)zRetrieve one result from the output queue.

        Args:
            timeout: Maximum time to wait for a result

        Returns:
            Optional[GenerationOutput]: The result data or None if timeout
        NTrT  )r*  r@   ra   ri   rq   rv   rd   re   )r-   rq   rB  results       r.   
get_resultz$ContinuousBatchingManager.get_result  s     ""*t/@/@/F/F/H	&&**w*GF%&*;*;z*I!!%%f-M{{ 		s   A	A5 3A5 5B
Bc              #      K   | j                   \| j                   j                         rA| j                  d      }|| | j                   | j                   j                         r?yyyyw)z.Iterate over results as they become available.N皙?rL  )r*  r;  rh  )r-   rg  s     r.   __iter__z"ContinuousBatchingManager.__iter__  sf     %%1d6M6M6V6V6X__S_1F! %%1d6M6M6V6V6X16X1s   A%A,(A,c              #   P  K   d}| j                   | j                   j                         ry|sv| j                  |d      }|| | j                  %| j                  j                  j                  |      }| j                   | j                   j                         r|ssyyyyyyw)zMIterate over results matching a specific request id as they become available.FNrj  )rq   rB  )r*  r;  rh  r)  rD   request_is_cancelled)r-   rq   request_cancelledrg  s       r.   request_id_iterz)ContinuousBatchingManager.request_id_iter  s     !%%1d6M6M6V6V6Xar__
C_HF!##/$($8$8$B$B$W$WXb$c! %%1d6M6M6V6V6Xar6X1ar6X1s   BB& B&c                     | j                   t        d      | j                   j                  | j                  | j                  | j
                         y)z=Perform a single generation step. This is mostly cuda graphedNzNTried to perform a generation step before the batch processor was initialized.)r)  r   r   r   r   r   r\   s    r.   r   z*ContinuousBatchingManager._generation_step  sE     'opp--djj$:N:NPTP^P^_r1   c                    d}	 t               }t        | j                  j                  | j                  | j                  j
                  | j                  j                  t        | j                  dd      | j                        }|j                  | _
        t        j                  dt               |z
   d       d}t        | j                  d      rLt        j                  | j                  j                   d      }|&t        j"                  d| d       t$        }nt$        }t               }t'        || j                  j                  | j                  | j(                  | j*                  | j,                  | j                  j
                  | j                  j                   ||| j.                        | j.                  | j0                  | j2                  | j4                  | j6                  | j8                  	      }|| _        d
| _        t        j                  dt               |z
   d       | j:                  j8                  r@|j?                         stA        d      | jC                          | xj<                  dz  c_        | j,                  jE                         r|jG                         rR| jI                  |       | xj<                  dz  c_        | j,                  jE                         sA|jG                         rRtK        |jL                  tN              r8d|jL                  jP                  z
  |jL                  _(        |jS                          t        jZ                  d       y# tT        $ r6}t        jV                  d| d       | jY                  ||       Y d}~Pd}~ww xY w# t        jZ                  d       w xY w)z6Main processing loop running in the background thread.N_tp_size)tp_sizer  zPagedAttentionCache created in z secondsrD   zScheduler 'z ' not found. Defaulting to FIFO.)r>   r%   r2   r?   r@   rA   rB   rC   rD   rE   rF   rG   rH   rI   rJ   r   zbatch_processor created in z$Failed to bootstrap the first batch.r   zError in generation loop: Tr^   zGeneration loop finished.).r   r   r   r%   r2   r'   r&   rO   r'  rD  r(  r   r   r   r   ri   rD   r5  r   r<   r?   r@   rA   rE   rF   rG   rH   rI   rJ   r)  current_batchr   r   r   rF  r   _inner_generation_looprp   r=   r   current_pairr   rf   rg   _handle_critical_errorr   )r-   r)  t0paged_attention_cacherD   t1rk   s          r.   r=  z.ContinuousBatchingManager._run_generation_loop  s   ;?B	5B$7

!!&&

!!

  

J=$($=$=%! (='O'OD$LL:<>B;N:OxXYIt--{;-11$2H2H2R2RTXY	$NN[;[#\] -I *	B6+zz(("&"8"8 ,,!..??!ZZ.. JJ,,#$94;O;OP $ 4 4#22(,(D(D)-)F)F"&"8"8#'#:#:O" $3D !"DLL6|~7J6K8TU ##66&99;&'MNN%%'""a'"--/O4X4X4Z++O<""a'" --/O4X4X4Z /<<>XYBCoFhFhFuFuBu22?,,. KK34	  	<LL5aS9DI''?;;	< KK34s7   K&M$ +M$ <AM$ $	N#-,NN& N##N& &N=generation_loopr   r)  c                 f    |j                         sy | j                          |j                          y r+   )r   r   r   )r-   r)  s     r.   ru  z0ContinuousBatchingManager._inner_generation_loopf  s+     113$$&r1   rg   c                     | j                   j                          	 	 | j                  j                         }||j	                  ||       /# t
        j                  $ r Y nw xY w||j                  |       yy)z:Handle critical errors that terminate the generation loop.N)rA   rG  r?   rb   rj   rd   re   r   )r-   rg   r)  req_datas       r.   rw  z0ContinuousBatchingManager._handle_critical_errorn  s~     		++668".#99%J  {{ 		 &--e4 's   0A A"!A"c                     | j                   st        d      | j                  &| j                  j                  j	                  |       yy)zSEvict a request from the cache. It is assumed that the request is already finished.z0Manual eviction is not enabled for this manager.N)rE   r   r)  rD   r   rd  s     r.   evict_request_from_cachez2ContinuousBatchingManager.evict_request_from_cache  sD     ##QRR+  **99*E ,r1   )Fr   r   r   r   TNr  )TNr+   )NNFF)NFF)NN)"r5   r6   r7   __doc__r$   r   r  r  rU   r   r.  r   r>  r@  floatrJ  rH  r   r:   rU  ra  re  r   rh  rk  r   ro  r   r=  r<   ru  rf   rw  r  r,   r1   r.   r  r    s    !&'(()!"$(*.SR#SR ,SR 	SR
 SR "%SR #&SR SR "SR !4KSR 
SRj*t* "* &,	*
 
*X ( (ZD Z
$$ $ $ $:/e /edl /d /$ "&%)"'&9& $J& d
	&
 &  & 
&V &*"'bT#Yb d
b 	b
  b 
b&P P PS4Z  YilpYp *d# d)<L2M d ` `E5N '('6N 'SW ' )' 5I 5H`cgHg 5lp 5 5$ F3 F4 F Fr1   r  c                   h   e Zd ZU dZeed<   e	 	 	 	 	 	 	 	 	 	 ddedz  dededededed	ed
e	dz  dedz  dede
e   fd       Z	 	 	 	 	 	 	 	 ddedz  dededededededz  dedefdZe ej                          	 	 	 	 	 	 	 	 ddeee      dedz  dededededededz  dedeeef   fd              Zy)ContinuousMixinz?Mixin class for models to add continuous batching capabilities.r2   NrE   r  rG   rH   r  rA  rB  rJ   rI   r)   c           
   #     K   | j                  ||||||
||	      }|j                          	 | t        j                  d       |j	                  ||       y # t        j                  d       |j	                  ||       w xY ww)N)r2   rE   r  rG   rH   rI   r  rJ   z!Continuous batching loop finishedrT  )init_continuous_batchingr>  r   r   rJ  )r-   r2   rE   r  rG   rH   r  rA  rB  rJ   rI   managers               r.   #continuous_batching_context_managerz3ContinuousMixin.continuous_batching_context_manager  s      ///+)$;%=/ 31 0 	
 		7MLL3 LLugL6 LL3 LLugL6s   *BA )B*BBc	                    t        | d      rt        | d      rt        | d      st        d      ||n| j                  }	|	t        d      |	j                  t        j                  d       d|	_        t        | |	|||||||	      S )	a  Initialize a manager for continuous batching inference.

        Args:
            generation_config: An optional generation configuration, which may contain a CompileConfig object
            manual_eviction: Whether to manually evict requests from the cache
            max_queue_size: Maximum size of the input request queue
            q_padding_interval_size: Padding granularity for queries in tokens. 0 uses default.
            kv_padding_interval_size: Padding granularity for KV cache in tokens. 0 uses default.
            allow_block_sharing: A flag to allow block sharing if the model has some full attention layers
            use_async_batching: Whether to use async API or not. If None, will be automatically detected.
            max_cached_graphs: Maximum number of cached CUDA graphs. 0 uses default.
        Returns:
            `ContinuousBatchingManager`: The manager instance to add requests and retrieve results.
        r%   r'   r&   z;Model must have 'config', 'device', and 'dtype' attributes.z8A GenerationConfig must be provided or set in the model.zE`eos_token_id` not set in GenerationConfig. Setting to -1 (disabled).r  )	r   r2   rE   r  rG   rH   r  rJ   rI   )r   AttributeErrorr2   
ValueErrorrR  r   r5  r  )
r-   r2   rE   r  rG   rH   r  rJ   rI   
gen_configs
             r.   r  z(ContinuousMixin.init_continuous_batching  s    2 tX&gdH.EWUY[bMc !^__*;*G&TMcMc
WXX""*NNbc&(J# )(+)$;%= 31/

 
	
r1   rV  rO  progress_barc
           
      \   |si S t        j                         t        j                  k  rt        j                  d       d}i }|| j
                  n|}t        |      |j                  |j                  ndz  }| j                  ||||	|dd|      }t        t         g      }t        || d| d	d
      }|5 }|5  |5 }	 |j                  ||
j                  d      |       d}||k  r|j                  d      }|r8|j                  }|j                         rM|||<   |dz  }|j!                  d       n1|j#                         s!t        j$                  d       t'        d       n||k  rddd       ddd       ddd       i }t+        t        |            D ]:  }|j                  d|       }|	||d| <   "t        j$                  d| d       < |S # t(        $ r$}t        j$                  d| d       Y d}~d}~ww xY w# 1 sw Y   xY w# 1 sw Y   xY w# 1 sw Y   xY w)a4  Generate sequences for a batch of prompts using continuous batching.

        Args:
            inputs: List of input token sequences (prompts)
            generation_config: Optional generation configuration
            q_padding_interval_size: Padding granularity for queries in tokens. 0 uses default.
            kv_padding_interval_size: Padding granularity for KV cache in tokens. 0 uses default.
            allow_block_sharing: A flag to allow block sharing if the model has some full attention layers
            record_timestamps: If set to true, the requests will have a timestamp for each token generated
            progress_bar: If set to true, a progress bar will be displayed
            use_async_batching: Whether to use async double buffering or not. If None, will be automatically detected.
            max_cached_graphs: Maximum number of cached CUDA graphs. 0 uses default.
            **kwargs: Additional generation parameters

        Returns:
            `dict[str, GenerationOutput]`: a dictionary of request ids to GenerationOutput objects
        z=Progress bar is disabled when logger level is less than DEBUGFNr   T   )r2   rG   rH   rI   r  rA  rB  rJ   zSolving z	 requestsrequest)totaldisabledescunitrN  )rV  rN  rO  r   rL  z*Generation thread terminated unexpectedly.zCReturning results of generate_batch despite unexpected termination.zError during batch generation: r^   rQ  zRequest req_z not found in results.)r   getEffectiveLevelr   r   r5  r2   r   r#  r  r
   r	   ra  ri   rh  rq   r   updater@  rg   printrf   r   )r-   rV  r2   rG   rH   r  rO  r  rJ   rI   kwargsresultsgen_cfgnum_requests
manager_cm
logging_cmpbar_cmr  pbarfinished_countrg  r   rk   reordered_resultsr   s                            r.   generate_batchzContinuousMixin.generate_batch  sq   @ I##%6NNZ[ L ,=,E$((K\6{gFbFbFng&B&Btuv==/$;%=/ 31 > 	

 +F84
%%L>3	
  	S7J 	S 	S4S$$!&**=M2Nbs %  "#$|3$///:F!'!2!2!--/.4GFO*a/N KKN&113"LL)UV!"gh! %|3	S 	S 	S0 s6{# 	GA[[4s,F!06!D*-|A3.DEF	G !   S>qcBTRRS+	S 	S 	S 	S 	S 	Ssa   8H";H>H
 B,G-H5H"	H	#H	=H
H	H

HHH	H""H+)
NFr   r   r   TTNNr   )NFr   r   r   TNr   )Nr   r   TFTNr   )r5   r6   r7   r  r   r8   r   r  r  r  r   r  r  r  r   r9   inference_moder   r  r:   r   r  r,   r1   r.   r  r    s	   I'' 6: %'(()$( $*.!"7+d27 7 	7
 "%7 #&7 "7 7 7 !4K7 7 
,	-7 7F 6: %'(()$(*.!"/
+d2/
 /
 	/

 "%/
 #&/
 "/
 !4K/
 /
 
#/
d U 6:'(()$("'!*.!"[!T#Y[! ,d2[! "%	[!
 #&[! "[!  [! [! !4K[! [! 
c##	$[!  [!r1   r  )7rd   r  abcr   collections.abcr   
contextlibr   mathr   timer   r9   r   r	   tqdm.contrib.loggingr
   configuration_utilsr   generation.configuration_utilsr   r   generation.logits_processr   utils.loggingr   utils.metricsr   r   r   r>   r   input_outputsr   r   r   r   r   r   r   rD   r   r   r   utilsr   r    r/  r0  r1  r  r$   r<   r  r  r,   r1   r.   <module>r     s       % %      6 3 M < $ S S & L K K B B 7&  #  299  SG SG SGn ^F ^F ^FBv! v!r1   