#                🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
#           This file was automatically generated from src/transformers/models/higgs_audio_v2/modular_higgs_audio_v2.py.
#               Do NOT edit this file manually as any edits will be overwritten by the generation of
#             the file from the modular. If any change should be done, please apply the change to the
#                          modular_higgs_audio_v2.py file directly. One of our CI enforces this.
#                🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨
# Copyright 2025 the HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from collections.abc import Callable
from typing import Optional

import torch
import torch.nn as nn

from ... import initialization as init
from ...activations import ACT2FN
from ...cache_utils import Cache, DynamicCache
from ...integrations import use_kernel_forward_from_hub, use_kernel_func_from_hub, use_kernelized_func
from ...masking_utils import create_causal_mask
from ...modeling_layers import GradientCheckpointingLayer
from ...modeling_outputs import BaseModelOutputWithPast, CausalLMOutputWithPast
from ...modeling_rope_utils import ROPE_INIT_FUNCTIONS, dynamic_rope_update
from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel
from ...processing_utils import Unpack
from ...utils import TransformersKwargs, auto_docstring, can_return_tuple, logging
from ...utils.generic import maybe_autocast
from ...utils.output_capturing import capture_outputs
from .configuration_higgs_audio_v2 import HiggsAudioV2Config
from .generation_higgs_audio_v2 import HiggsAudioV2GenerationMixin


# Module-level logger, obtained from the library's `logging` helper and keyed by this module's name.
logger = logging.get_logger(__name__)


class HiggsAudioV2MLP(nn.Module):
    """Gated feed-forward block computing `down_proj(act_fn(gate_proj(x)) * up_proj(x))`."""

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.hidden_size = config.hidden_size
        self.intermediate_size = config.intermediate_size

        # All three projections share the same bias setting from the config.
        use_bias = config.mlp_bias
        self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=use_bias)
        self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=use_bias)
        self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=use_bias)
        self.act_fn = ACT2FN[config.hidden_act]

    def forward(self, x):
        # The activated gate branch multiplies the up branch element-wise before projecting back down.
        gate = self.act_fn(self.gate_proj(x))
        gated = gate * self.up_proj(x)
        return self.down_proj(gated)


@use_kernel_forward_from_hub("RMSNorm")
class HiggsAudioV2RMSNorm(nn.Module):
    """Root-mean-square normalization over the last dimension with a learned per-channel scale."""

    def __init__(self, hidden_size, eps: float = 1e-6) -> None:
        """
        HiggsAudioV2RMSNorm is equivalent to T5LayerNorm
        """
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.variance_epsilon = eps

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        # The statistics are computed in float32; the result is cast back before the scale is applied.
        original_dtype = hidden_states.dtype
        upcast = hidden_states.to(torch.float32)
        mean_square = upcast.pow(2).mean(-1, keepdim=True)
        normalized = upcast * torch.rsqrt(mean_square + self.variance_epsilon)
        return self.weight * normalized.to(original_dtype)

    def extra_repr(self):
        weight_shape = tuple(self.weight.shape)
        return f"{weight_shape}, eps={self.variance_epsilon}"


def rotate_half(x):
    """Rotates half the hidden dims of the input."""
    # Split the last dimension at its (floor) midpoint, then swap the halves and negate the trailing one.
    midpoint = x.shape[-1] // 2
    leading, trailing = x[..., :midpoint], x[..., midpoint:]
    return torch.cat([-trailing, leading], dim=-1)


@use_kernel_func_from_hub("rotary_pos_emb")
def apply_rotary_pos_emb(q, k, cos, sin, unsqueeze_dim=1):
    """Rotates the query and key tensors with the given Rotary Position Embedding tables.

    Args:
        q (`torch.Tensor`): The query tensor.
        k (`torch.Tensor`): The key tensor.
        cos (`torch.Tensor`): The cosine part of the rotary embedding.
        sin (`torch.Tensor`): The sine part of the rotary embedding.
        unsqueeze_dim (`int`, *optional*, defaults to 1):
            Dimension along which `cos` and `sin` are unsqueezed so that they broadcast against `q` and `k`.
            For example, with `cos`/`sin` of shape [batch_size, seq_len, head_dim]: use `unsqueeze_dim=1` when
            `q` and `k` have the shape [batch_size, heads, seq_len, head_dim], and `unsqueeze_dim=2` when they
            have the shape [batch_size, seq_len, heads, head_dim].
    Returns:
        `tuple(torch.Tensor)` comprising of the query and key tensors rotated using the Rotary Position Embedding.
    """
    cos, sin = cos.unsqueeze(unsqueeze_dim), sin.unsqueeze(unsqueeze_dim)
    rotated_query = (q * cos) + (rotate_half(q) * sin)
    rotated_key = (k * cos) + (rotate_half(k) * sin)
    return rotated_query, rotated_key


def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor:
    """
    Equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep): every key/value head is repeated `n_rep`
    times, taking the input from (batch, num_key_value_heads, seqlen, head_dim) to
    (batch, num_attention_heads, seqlen, head_dim). With `n_rep == 1` the input is returned as is.
    """
    batch, num_key_value_heads, slen, head_dim = hidden_states.shape
    if n_rep == 1:
        return hidden_states
    # Insert a repeat axis right after the head axis, broadcast it, then fold it into the head axis.
    expanded = hidden_states.unsqueeze(2).expand(batch, num_key_value_heads, n_rep, slen, head_dim)
    return expanded.reshape(batch, num_key_value_heads * n_rep, slen, head_dim)


def eager_attention_forward(
    module: nn.Module,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attention_mask: torch.Tensor | None,
    scaling: float,
    dropout: float = 0.0,
    **kwargs: Unpack[TransformersKwargs],
):
    """
    Plain matmul/softmax attention.

    Keys and values are first repeated `module.num_key_value_groups` times along the head axis. The scaled
    scores get `attention_mask` added (when given), go through a float32 softmax cast back to the query dtype,
    and then through dropout (active only when `module.training`).

    Returns the attention output with axes 1 and 2 swapped and made contiguous, together with the attention
    weights.
    """
    num_groups = module.num_key_value_groups
    expanded_keys = repeat_kv(key, num_groups)
    expanded_values = repeat_kv(value, num_groups)

    scores = torch.matmul(query, expanded_keys.transpose(2, 3)) * scaling
    if attention_mask is not None:
        scores = scores + attention_mask

    probabilities = nn.functional.softmax(scores, dim=-1, dtype=torch.float32).to(query.dtype)
    probabilities = nn.functional.dropout(probabilities, p=dropout, training=module.training)

    context = torch.matmul(probabilities, expanded_values)
    context = context.transpose(1, 2).contiguous()
    return context, probabilities


@use_kernelized_func(apply_rotary_pos_emb)
class HiggsAudioV2Attention(nn.Module):
    """Multi-headed attention from 'Attention Is All You Need' paper"""

    def __init__(self, config: HiggsAudioV2Config, layer_idx: int):
        """
        Args:
            config: Model configuration; provides the hidden size, head counts, attention bias and dropout.
            layer_idx: Index of this layer, passed to the key/value cache on update.
        """
        super().__init__()
        self.config = config
        self.layer_idx = layer_idx
        # Fall back to the derived head size when `head_dim` is missing *or* explicitly `None`, the same way
        # `HiggsAudioV2RotaryEmbedding.compute_default_rope_parameters` resolves it.
        self.head_dim = getattr(config, "head_dim", None) or config.hidden_size // config.num_attention_heads
        self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads
        self.scaling = self.head_dim**-0.5
        self.attention_dropout = config.attention_dropout
        self.is_causal = True

        self.q_proj = nn.Linear(
            config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.attention_bias
        )
        self.k_proj = nn.Linear(
            config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
        )
        self.v_proj = nn.Linear(
            config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
        )
        self.o_proj = nn.Linear(
            config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.attention_bias
        )

    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: tuple[torch.Tensor, torch.Tensor] | None = None,
        attention_mask: torch.Tensor | None = None,
        past_key_values: Cache | None = None,
        cache_position: torch.LongTensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Projects `hidden_states` to queries/keys/values, applies rotary embeddings, optionally updates the
        key/value cache, runs the configured attention implementation and projects the result back.

        Args:
            hidden_states: Input activations; every axis but the last is kept for the output.
            position_embeddings: `(cos, sin)` pair for the rotary embedding. Unpacked unconditionally, so it
                must be provided despite the `None` default.
            attention_mask: Mask handed to the attention implementation as is.
            past_key_values: Cache updated with this layer's keys/values when given.
            cache_position: Forwarded to the cache update.

        Returns:
            The projected attention output and the attention weights returned by the attention implementation.
        """
        input_shape = hidden_states.shape[:-1]
        # `-1` lets the head count be inferred separately for the query and the key/value projections.
        hidden_shape = (*input_shape, -1, self.head_dim)

        query_states = self.q_proj(hidden_states).view(hidden_shape).transpose(1, 2)
        key_states = self.k_proj(hidden_states).view(hidden_shape).transpose(1, 2)
        value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2)

        cos, sin = position_embeddings
        query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)

        if past_key_values is not None:
            # sin and cos are specific to RoPE models; cache_position needed for the static cache
            cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
            key_states, value_states = past_key_values.update(key_states, value_states, self.layer_idx, cache_kwargs)

        # Use the implementation selected in the config, falling back to the eager one defined in this file.
        attention_interface: Callable = ALL_ATTENTION_FUNCTIONS.get_interface(
            self.config._attn_implementation, eager_attention_forward
        )

        attn_output, attn_weights = attention_interface(
            self,
            query_states,
            key_states,
            value_states,
            attention_mask,
            dropout=0.0 if not self.training else self.attention_dropout,
            scaling=self.scaling,
            **kwargs,
        )

        # Merge the head axes back into a single feature axis before the output projection.
        attn_output = attn_output.reshape(*input_shape, -1).contiguous()
        attn_output = self.o_proj(attn_output)
        return attn_output, attn_weights


class HiggsAudioV2DecoderLayer(GradientCheckpointingLayer):
    """Decoder layer with one shared self-attention and separate norm/MLP modules for text and audio positions."""

    def __init__(self, config: HiggsAudioV2Config, layer_idx: int):
        super().__init__()
        self.hidden_size = config.hidden_size
        norm_eps = config.rms_norm_eps

        self.self_attn = HiggsAudioV2Attention(config=config, layer_idx=layer_idx)

        # Modules applied to the non-audio positions.
        self.mlp = HiggsAudioV2MLP(config)
        self.input_layernorm = HiggsAudioV2RMSNorm(config.hidden_size, eps=norm_eps)
        self.post_attention_layernorm = HiggsAudioV2RMSNorm(config.hidden_size, eps=norm_eps)

        # Modules applied to the audio positions.
        self.audio_mlp = HiggsAudioV2MLP(config)
        self.audio_input_layernorm = HiggsAudioV2RMSNorm(config.hidden_size, eps=norm_eps)
        self.audio_post_attention_layernorm = HiggsAudioV2RMSNorm(config.hidden_size, eps=norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: tuple[torch.Tensor, torch.Tensor] | None,
        attention_mask: torch.Tensor | None = None,
        audio_token_mask: torch.BoolTensor | None = None,
        position_ids: torch.LongTensor | None = None,
        past_key_values: Cache | None = None,
        use_cache: bool | None = False,
        cache_position: torch.LongTensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> torch.Tensor:
        """
        Runs pre-norm self-attention followed by a pre-norm MLP, each with a residual connection.

        When `audio_token_mask` is `None`, every position goes through the audio norm/MLP modules. Otherwise
        positions where the mask is `True` use the audio modules and all other positions use the text modules.
        """
        residual = hidden_states

        text_token_mask = None
        if audio_token_mask is not None:
            audio_token_mask = audio_token_mask.to(hidden_states.device)
            text_token_mask = ~audio_token_mask

        if audio_token_mask is None:
            hidden_states = self.audio_input_layernorm(hidden_states)
        else:
            # Normalize each modality with its own layer norm and write the results back in place of the inputs.
            normed_audio = self.audio_input_layernorm(hidden_states[audio_token_mask])
            hidden_states = hidden_states.masked_scatter(
                audio_token_mask.unsqueeze(-1), normed_audio.to(hidden_states.device)
            )
            normed_text = self.input_layernorm(hidden_states[text_token_mask])
            hidden_states = hidden_states.masked_scatter(
                text_token_mask.unsqueeze(-1), normed_text.to(hidden_states.device)
            )

        # Self Attention
        attn_output, _ = self.self_attn(
            hidden_states=hidden_states,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            use_cache=use_cache,
            cache_position=cache_position,
            position_embeddings=position_embeddings,
            **kwargs,
        )
        hidden_states = residual + attn_output

        if audio_token_mask is None:
            audio_update = self.audio_mlp(self.audio_post_attention_layernorm(hidden_states))
            return hidden_states + audio_update.to(hidden_states.device)

        # Both modalities are normalized before either residual update touches `hidden_states`.
        normed_text = self.post_attention_layernorm(hidden_states[text_token_mask])
        normed_audio = self.audio_post_attention_layernorm(hidden_states[audio_token_mask])

        text_update = self.mlp(normed_text)
        hidden_states[text_token_mask] += text_update.to(hidden_states.device)

        audio_update = self.audio_mlp(normed_audio)
        hidden_states[audio_token_mask] += audio_update.to(hidden_states.device)

        return hidden_states


class HiggsAudioV2Embeddings(nn.Module):
    """Embeds audio codebook indices through one shared table and sums the embeddings over the codebook axis."""

    def __init__(self, config):
        super().__init__()
        table_size = config.num_codebooks * config.codebook_size
        self.embed_audio_tokens = nn.Embedding(table_size, config.hidden_size)

        # Codebook `i` occupies the rows starting at `i * codebook_size`; the buffer is not saved in the state dict.
        codebook_offsets = torch.arange(config.num_codebooks) * config.codebook_size
        self.register_buffer("audio_tokens_offsets", codebook_offsets, persistent=False)

    def forward(self, input_ids):
        # Shift each codebook's indices into its own slice of the table, then sum over the second-to-last axis.
        shifted_ids = input_ids + self.audio_tokens_offsets
        return self.embed_audio_tokens(shifted_ids).sum(dim=-2)


@auto_docstring
class HiggsAudioV2PreTrainedModel(PreTrainedModel):
    config: HiggsAudioV2Config
    base_model_prefix = "model"

    # Checkpointing / device placement settings.
    supports_gradient_checkpointing = True
    _no_split_modules = ["HiggsAudioV2DecoderLayer"]
    _skip_keys_device_placement = ["past_key_values"]

    # Supported attention implementations.
    _supports_flash_attn = True
    _supports_sdpa = True
    _supports_flex_attn = True
    _supports_attention_backend = True

    _can_compile_fullgraph = True

    # Modules whose outputs are recorded under each output name.
    _can_record_outputs = {
        "hidden_states": HiggsAudioV2DecoderLayer,
        "attentions": HiggsAudioV2Attention,
    }

    @torch.no_grad()
    def _init_weights(self, module):
        """Runs the parent initialization, then refills the codebook offsets buffer of audio embedding modules."""
        super()._init_weights(module)

        if not isinstance(module, HiggsAudioV2Embeddings):
            return

        codebook_offsets = torch.arange(self.config.num_codebooks) * self.config.codebook_size
        init.copy_(module.audio_tokens_offsets, codebook_offsets)


class HiggsAudioV2RotaryEmbedding(nn.Module):
    """Computes the rotary position embedding cos/sin tensors for given position ids."""

    inv_freq: torch.Tensor  # fix linting for `register_buffer`

    def __init__(self, config: HiggsAudioV2Config, device=None):
        super().__init__()
        max_positions = config.max_position_embeddings
        self.max_seq_len_cached = max_positions
        self.original_max_seq_len = max_positions

        self.config = config
        self.rope_type = self.config.rope_parameters["rope_type"]

        # The "default" type is computed by this class; every other type comes from the shared registry.
        if self.rope_type == "default":
            rope_init_fn: Callable = self.compute_default_rope_parameters
        else:
            rope_init_fn = ROPE_INIT_FUNCTIONS[self.rope_type]
        inv_freq, self.attention_scaling = rope_init_fn(self.config, device)

        self.register_buffer("inv_freq", inv_freq, persistent=False)
        self.register_buffer("original_inv_freq", inv_freq.clone(), persistent=False)

    @staticmethod
    def compute_default_rope_parameters(
        config: HiggsAudioV2Config | None = None,
        device: Optional["torch.device"] = None,
        seq_len: int | None = None,
    ) -> tuple["torch.Tensor", float]:
        """
        Computes the inverse frequencies according to the original RoPE implementation
        Args:
            config ([`~transformers.PreTrainedConfig`]):
                The model configuration.
            device (`torch.device`):
                The device to use for initialization of the inverse frequencies.
            seq_len (`int`, *optional*):
                The current sequence length. Unused for this type of RoPE.
        Returns:
            Tuple of (`torch.Tensor`, `float`), containing the inverse frequencies for the RoPE embeddings and the
            post-processing scaling factor applied to the computed cos/sin (unused in this type of RoPE).
        """
        base = config.rope_parameters["rope_theta"]
        dim = getattr(config, "head_dim", None) or config.hidden_size // config.num_attention_heads

        # Even indices 0, 2, ... below `dim`, created as int64 and then moved to `device` as float.
        even_indices = torch.arange(0, dim, 2, dtype=torch.int64).to(device=device, dtype=torch.float)
        inv_freq = 1.0 / (base ** (even_indices / dim))

        attention_factor = 1.0  # Unused in this type of RoPE
        return inv_freq, attention_factor

    @torch.no_grad()
    @dynamic_rope_update  # power user: used with advanced RoPE types (e.g. dynamic rope)
    def forward(self, x, position_ids):
        batch_size = position_ids.shape[0]
        inv_freq_expanded = self.inv_freq[None, :, None].float().expand(batch_size, -1, 1).to(x.device)
        position_ids_expanded = position_ids[:, None, :].float()

        # "mps" and non-string device types are mapped to "cpu" for the autocast context.
        device_type = x.device.type
        if not isinstance(device_type, str) or device_type == "mps":
            device_type = "cpu"

        with maybe_autocast(device_type=device_type, enabled=False):  # Force float32
            freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(1, 2)
            emb = torch.cat((freqs, freqs), dim=-1)
            cos = emb.cos() * self.attention_scaling
            sin = emb.sin() * self.attention_scaling

        target_dtype = x.dtype
        return cos.to(dtype=target_dtype), sin.to(dtype=target_dtype)


@auto_docstring
class HiggsAudioV2Model(HiggsAudioV2PreTrainedModel):
    def __init__(self, config: HiggsAudioV2Config):
        super().__init__(config)
        self.padding_idx = config.pad_token_id
        self.vocab_size = config.vocab_size

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
        self.layers = nn.ModuleList(
            [HiggsAudioV2DecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)]
        )
        self.norm = HiggsAudioV2RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.rotary_emb = HiggsAudioV2RotaryEmbedding(config=config)
        self.gradient_checkpointing = False
        self.embed_audio_tokens = HiggsAudioV2Embeddings(config)

        # Initialize weights and apply final processing
        self.post_init()

    @capture_outputs
    @auto_docstring
    def forward(
        self,
        input_ids: torch.LongTensor | None = None,
        audio_input_ids: torch.LongTensor | None = None,
        attention_mask: torch.LongTensor | None = None,
        audio_input_ids_mask: torch.BoolTensor | None = None,
        position_ids: torch.LongTensor | None = None,
        past_key_values: Cache | None = None,
        inputs_embeds: torch.FloatTensor | None = None,
        cache_position: torch.LongTensor | None = None,
        use_cache: bool | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> BaseModelOutputWithPast:
        r"""
        audio_input_ids (`torch.LongTensor` of shape `(batch_size, num_audio_frames, num_codebooks)`, *optional*):
            Indices of audio codebook tokens.

            Indices can be obtained using [`HiggsAudioV2TokenizerModel.encode`].
        audio_input_ids_mask (`torch.BoolTensor` of shape `(batch_size, num_audio_frames)`, *optional*):
            Indicates which audio frames in `audio_input_ids` are valid.

        Returns:
            [`~models.modeling_outputs.BaseModelOutputWithPast`]:
                Usual decoder outputs with the placeholder positions already substituted by their corresponding
                audio embeddings.

        Example:

        ```python
        >>> from transformers import AutoProcessor, HiggsAudioV2Model
        >>> import torch
        >>> device = "cuda" if torch.cuda.is_available() else "cpu"
        >>> processor = AutoProcessor.from_pretrained("eustlb/higgs-audio-v2-generation-3B-base", device_map=device)
        >>> model = HiggsAudioV2Model.from_pretrained("eustlb/higgs-audio-v2-generation-3B-base", device_map=device)
        >>> conversation = [
        ...     {
        ...         "role": "system",
        ...         "content": [
        ...             {
        ...                 "type": "text",
        ...                 "text": "Generate audio following instruction."
        ...             }
        ...         ]
        ...     },
        ...     {
        ...         "role": "scene",
        ...         "content": [
        ...             {
        ...                 "type": "text",
        ...                 "text": "Audio is recorded from a quiet room."
        ...             }
        ...         ]
        ...     },
        ...     {
        ...         "role": "user",
        ...         "content": [
        ...             {
        ...                 "type": "text",
        ...                 "text": "It was the night before my birthday. Hooray! It's almost here! It may not be a holiday, but it's the best day of the year."
        ...             }
        ...         ]
        ...     },
        ...     {
        ...         "role": "assistant",
        ...         "content": [
        ...             {
        ...                 "type": "audio",
        ...                 "url": "https://huggingface.co/datasets/eustlb/dummy-audio-samples-higgs/resolve/main/belinda.wav"
        ...             }
        ...         ]
        ...     },
        ...     {
        ...         "role": "user",
        ...         "content": [
        ...             {
        ...                 "type": "text",
        ...                 "text": "The sun rises in the east and sets in the west. This simple fact has been observed by humans for thousands of years."
        ...             }
        ...         ]
        ...     }
        ... ]
        >>> inputs = processor.apply_chat_template(conversation, return_dict=True, tokenize=True, sampling_rate=24000, return_tensors="pt")
        >>> inputs = inputs.to(model.device)
        >>> outputs = model(**inputs)
        ```
        """
        if (input_ids is None) and (inputs_embeds is None) and (audio_input_ids is None):
            raise ValueError("You must specify at least one of input_ids, inputs_embeds, or audio_input_ids")

        if (input_ids is not None) and (inputs_embeds is not None):
            raise ValueError("Only one of input_ids or inputs_embeds can be provided")

        # `None` when neither `input_ids` nor `inputs_embeds` is given (audio-only call).
        audio_token_mask = self.get_placeholder_mask(input_ids, inputs_embeds, audio_input_ids_mask)

        if input_ids is not None:
            inputs_embeds = self.embed_tokens(input_ids)

        if audio_input_ids is not None:
            audio_embeds = self.embed_audio_tokens(audio_input_ids)

        if inputs_embeds is not None and audio_input_ids is not None:
            # Keep only the valid audio frames, then write them over the placeholder positions.
            audio_embeds = (
                audio_embeds[audio_input_ids_mask.to(audio_embeds.device)]
                if audio_input_ids_mask is not None
                else audio_embeds
            )
            inputs_embeds = inputs_embeds.masked_scatter(
                audio_token_mask[..., None].expand_as(inputs_embeds), audio_embeds.to(inputs_embeds.device)
            )
        elif audio_input_ids is not None:
            inputs_embeds = audio_embeds

        if use_cache and past_key_values is None:
            past_key_values = DynamicCache(config=self.config)

        if cache_position is None:
            past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
            cache_position: torch.Tensor = torch.arange(
                past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device
            )

        if position_ids is None:
            position_ids = cache_position.unsqueeze(0)

        causal_mask = create_causal_mask(
            config=self.config,
            inputs_embeds=inputs_embeds,
            attention_mask=attention_mask,
            cache_position=cache_position,
            past_key_values=past_key_values,
            position_ids=position_ids,
        )

        hidden_states = inputs_embeds
        # The rotary cos/sin pair is computed once and shared by all layers.
        position_embeddings = self.rotary_emb(hidden_states, position_ids)

        for decoder_layer in self.layers[: self.config.num_hidden_layers]:
            hidden_states = decoder_layer(
                hidden_states,
                attention_mask=causal_mask,
                audio_token_mask=audio_token_mask,
                position_ids=position_ids,
                past_key_values=past_key_values,
                cache_position=cache_position,
                position_embeddings=position_embeddings,
                **kwargs,
            )

        hidden_states = self.norm(hidden_states)
        return BaseModelOutputWithPast(
            last_hidden_state=hidden_states,
            past_key_values=past_key_values,
        )

    def _embeds_match_token(self, inputs_embeds: torch.FloatTensor, token_id: int) -> torch.BoolTensor:
        # Marks the positions whose embedding vector equals, element for element, the embedding of `token_id`.
        token_embed = self.embed_tokens(torch.tensor(token_id, dtype=torch.long, device=inputs_embeds.device))
        return (inputs_embeds == token_embed).all(-1)

    def get_placeholder_mask(
        self, input_ids: torch.LongTensor, inputs_embeds: torch.FloatTensor, audio_input_ids_mask: torch.LongTensor
    ):
        """
        Obtains the multimodal placeholder mask from `input_ids` or `inputs_embeds`: positions holding the audio
        token or the audio delay token are marked `True`.

        With `input_ids`, the ids are compared to `config.audio_token_id` and `config.audio_delay_token_id`.
        With `inputs_embeds` only, the embeddings are compared to the embeddings of those same two tokens, so that
        both input forms produce the same mask.

        `audio_input_ids_mask` is accepted but not used: the placeholder count is not checked against the number
        of audio frames here.

        If input_ids and inputs_embeds are None, we return None.
        Indeed this means we cannot determine the placeholder mask, the model is to be used in an audio-only mode.
        """
        if input_ids is None and inputs_embeds is None:
            return None

        elif input_ids is None:
            special_audio_mask = self._embeds_match_token(inputs_embeds, self.config.audio_token_id)
            # The ids path below also counts delay tokens as placeholders; mirror it here. A `None` delay id
            # matches nothing on the ids path, so it is skipped rather than embedded.
            audio_delay_token_id = self.config.audio_delay_token_id
            if audio_delay_token_id is not None:
                special_audio_mask = special_audio_mask | self._embeds_match_token(
                    inputs_embeds, audio_delay_token_id
                )

        else:
            special_audio_mask = (input_ids == self.config.audio_token_id) | (
                input_ids == self.config.audio_delay_token_id
            )

        return special_audio_mask


@auto_docstring(
    custom_intro="""
    The Higgs Audio model, a llama-like auto-regressive transformer model with dual-FFN.
    """
)
class HiggsAudioV2ForConditionalGeneration(HiggsAudioV2PreTrainedModel, HiggsAudioV2GenerationMixin):
    base_model_prefix = "model"
    # The text head is optional (see `use_text_head`), so its weight may be present in a checkpoint but unused.
    _keys_to_ignore_on_load_unexpected = ["text_lm_head.weight"]

    def __init__(self, config: HiggsAudioV2Config, use_text_head: bool = False):
        r"""
        use_text_head (`bool`, *optional*, defaults to False):
            Whether to use a text language model head. Such head is not required for generation,
            but can be used to compute the text loss when training.
        """
        super().__init__(config)
        self.model = HiggsAudioV2Model(config)
        self.audio_lm_head = nn.Linear(config.hidden_size, config.num_codebooks * config.codebook_size, bias=False)
        self.text_lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) if use_text_head else None

        self.post_init()

    def prepare_inputs_for_generation(
        self,
        input_ids: torch.LongTensor,
        audio_input_ids: torch.LongTensor | None = None,
        audio_input_ids_mask: torch.LongTensor | None = None,
        **kwargs,
    ):
        """
        Extends the parent's generation inputs with `audio_input_ids` and `audio_input_ids_mask`.

        When a cache is present, audio frames whose placeholder tokens are already cached are masked out. When
        no audio frame before the last one is still valid, only the last audio frame is passed, `input_ids` is
        removed from the inputs and the mask is dropped.
        """
        model_inputs = super().prepare_inputs_for_generation(input_ids, **kwargs)

        if audio_input_ids is not None and model_inputs.get("past_key_values") is not None:
            current_cache_length = model_inputs["cache_position"][0]
            audio_token_mask = (input_ids == self.config.audio_token_id) | (
                input_ids == self.config.audio_delay_token_id
            )
            # Number of audio placeholder tokens per sequence that are already in the cache.
            in_cache_num_audio_input_ids = audio_token_mask[:, :current_cache_length].sum(dim=-1)

            # already cached audio_input_ids should be masked
            # this assumes that audio_input_ids are right padded!
            valid_audio_input_ids = audio_input_ids_mask.cumsum(dim=-1) > in_cache_num_audio_input_ids[:, None]
            audio_input_ids_mask = audio_input_ids_mask & valid_audio_input_ids

        if audio_input_ids_mask is not None and (~audio_input_ids_mask[:, :-1]).all():
            # in decoding mode, we only pass audio_input_ids
            audio_input_ids = audio_input_ids[:, -1:, :].clone(memory_format=torch.contiguous_format)
            model_inputs.pop("input_ids", None)
            audio_input_ids_mask = None

        model_inputs["audio_input_ids"] = audio_input_ids
        model_inputs["audio_input_ids_mask"] = audio_input_ids_mask

        return model_inputs

    @auto_docstring
    @can_return_tuple
    def forward(
        self,
        input_ids: torch.LongTensor | None = None,
        attention_mask: torch.BoolTensor | None = None,
        audio_input_ids: torch.LongTensor | None = None,
        audio_input_ids_mask: torch.LongTensor | None = None,
        position_ids: torch.LongTensor | None = None,
        past_key_values: Cache | None = None,
        inputs_embeds: torch.FloatTensor | None = None,
        labels: torch.LongTensor | None = None,
        audio_labels: torch.LongTensor | None = None,
        use_cache: bool | None = None,
        cache_position: torch.LongTensor | None = None,
        logits_to_keep: int | torch.Tensor = 0,
        **kwargs: Unpack[TransformersKwargs],
    ):
        r"""
        audio_input_ids (`torch.LongTensor` of shape `(batch_size, num_audio_frames, num_codebooks)`, *optional*):
            Indices of audio codebook tokens.

            Indices can be obtained using [`HiggsAudioV2TokenizerModel.encode`].
        audio_input_ids_mask (`torch.BoolTensor` of shape `(batch_size, num_audio_frames)`, *optional*):
            Indicates which audio frames in `audio_input_ids` are valid.
        audio_labels (`torch.LongTensor` of shape `(batch_size, num_audio_frames, num_codebooks)`, *optional*):
            Labels for the audio codebook tokens for computing the masked language modeling loss. Indices should either be in
            `[0, ..., config.codebook_size]` or `-100`. Tokens with indices set to `-100` are ignored (masked), the loss is
            only computed for the tokens with labels in `[0, ..., config.codebook_size]`.
            Can be obtained using `output_labels=True` when calling [`HiggsAudioV2Processor`].

        Returns:
            [`~models.modeling_outputs.CausalLMOutputWithPast`]:
                A [`~models.modeling_outputs.CausalLMOutputWithPast`] containing the logits, loss (if labels are provided),
                and other outputs from the model.

        Example:

        ```python
        >>> from transformers import AutoProcessor, HiggsAudioV2ForConditionalGeneration
        >>> model_id = "eustlb/higgs-audio-v2-generation-3B-base"
        >>> processor = AutoProcessor.from_pretrained(model_id, device_map="auto")
        >>> model = HiggsAudioV2ForConditionalGeneration.from_pretrained(model_id, device_map="auto")
        >>> conversation = [
        ...     {
        ...         "role": "system",
        ...         "content": [
        ...             {
        ...                 "type": "text",
        ...                 "text": "Generate audio following instruction."
        ...             }
        ...         ]
        ...     },
        ...     {
        ...         "role": "scene",
        ...         "content": [
        ...             {
        ...                 "type": "text",
        ...                 "text": "Audio is recorded from a quiet room."
        ...             }
        ...         ]
        ...     },
        ...     {
        ...         "role": "user",
        ...         "content": [
        ...             {
        ...                 "type": "text",
        ...                 "text": "It was the night before my birthday. Hooray! It's almost here! It may not be a holiday, but it's the best day of the year."
        ...             }
        ...         ]
        ...     },
        ...     {
        ...         "role": "assistant",
        ...         "content": [
        ...             {
        ...                 "type": "audio",
        ...                 "url": "https://huggingface.co/datasets/eustlb/dummy-audio-samples-higgs/resolve/main/belinda.wav"
        ...             }
        ...         ]
        ...     },
        ...     {
        ...         "role": "user",
        ...         "content": [
        ...             {
        ...                 "type": "text",
        ...                 "text": "The sun rises in the east and sets in the west. This simple fact has been observed by humans for thousands of years."
        ...             }
        ...         ]
        ...     }
        ... ]
        >>> inputs = processor.apply_chat_template(conversation, return_dict=True, tokenize=True, sampling_rate=24000, return_tensors="pt")
        >>> inputs = inputs.to(model.device)
        >>> outputs = model(**inputs)
        ```
        """
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            audio_input_ids=audio_input_ids,
            audio_input_ids_mask=audio_input_ids_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            cache_position=cache_position,
            **kwargs,
        )

        hidden_states = outputs.last_hidden_state
        # Only compute necessary logits, and do not upcast them to float if we are not computing the loss
        slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
        logits = self.audio_lm_head(hidden_states[:, slice_indices, :])

        loss = None
        if audio_labels is not None:
            num_codebooks = self.config.num_codebooks
            audio_logits = logits.reshape(*logits.shape[:2], num_codebooks, self.config.codebook_size)
            # One label column per codebook (taken from the config rather than hard-coded), pre-filled with the
            # ignore index; only the audio placeholder positions then receive real labels.
            audio_labels_expanded = input_ids.new_full((*input_ids.shape[:2], num_codebooks), -100)
            audio_token_mask = self.model.get_placeholder_mask(input_ids, inputs_embeds, audio_input_ids_mask)
            audio_labels_expanded[audio_token_mask] = audio_labels[audio_input_ids_mask]

            # The audio loss is the sum of the per-codebook losses.
            codebook_losses = []
            for codebook_idx in range(num_codebooks):
                codebook_logits = audio_logits[:, :, codebook_idx, :]
                codebook_labels = audio_labels_expanded[:, :, codebook_idx]
                codebook_losses.append(
                    self.loss_function(codebook_logits, codebook_labels, self.config.codebook_size, **kwargs)
                )

            loss = sum(codebook_losses)

        if labels is not None:
            if self.text_lm_head is not None:
                text_logits = self.text_lm_head(hidden_states[:, slice_indices, :])
                text_loss = self.loss_function(text_logits, labels, self.config.vocab_size, **kwargs)
                loss = text_loss if loss is None else loss + text_loss
            else:
                logger.warning_once(
                    f"`labels` provided to {self.__class__.__name__} but `text_lm_head` is disabled. "
                    f"Text labels ignored. Set `use_text_head=True` in model init to enable text loss."
                )

        return CausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


# Names exported by this module.
__all__ = ["HiggsAudioV2ForConditionalGeneration", "HiggsAudioV2PreTrainedModel", "HiggsAudioV2Model"]
