# Copyright 2025 the HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math

from ...feature_extraction_utils import BatchFeature
from ...image_utils import ImageInput, make_nested_list_of_images
from ...processing_utils import (
    ProcessingKwargs,
    ProcessorMixin,
    TextKwargs,
    Unpack,
)
from ...tokenization_utils_base import BatchEncoding, TextInput
from ...utils import auto_docstring, logging


# Module-level logger, named after this module.
logger = logging.get_logger(__name__)


class Lfm2VlTextKwargs(TextKwargs, total=False):
    """
    use_image_special_tokens (`bool`, *optional*, defaults to `True`):
        Whether to use special image tokens (`<|image_start|>` and `<|image_end|>`) to delimit image sequences
        in the text. When enabled, images are wrapped with these tokens to clearly mark image boundaries.
        When disabled, only the image token itself is used without delimiters.
    """

    # Extra text option on top of `TextKwargs`. `Lfm2VlProcessor.__call__` pops it from the merged text
    # kwargs before tokenizing and forwards it to `expand_text_with_placeholders`.
    use_image_special_tokens: bool | None


class Lfm2VlProcessorKwargs(ProcessingKwargs, total=False):
    # Keyword-argument schema that `Lfm2VlProcessor.__call__` hands to `_merge_kwargs`.
    text_kwargs: Lfm2VlTextKwargs
    # Per-modality default values.
    # NOTE(review): presumably applied by `_merge_kwargs` whenever the caller does not override them —
    # confirm in `ProcessorMixin`.
    _defaults = {
        "images_kwargs": {
            # The processor reads `image_rows`, `image_cols` and `image_sizes` from the image processor
            # output; presumably this flag is what makes the image processor return them — TODO confirm.
            "return_row_col_info": True,
        },
        "text_kwargs": {
            "use_image_special_tokens": True,
            "add_special_tokens": False,
            "padding": False,
            "is_split_into_words": False,
        },
    }


@auto_docstring
class Lfm2VlProcessor(ProcessorMixin):
    def __init__(
        self,
        image_processor,
        tokenizer,
        chat_template: str | None = None,
        **kwargs,
    ):
        # Special-token strings are read from the tokenizer when it defines them; otherwise the literal
        # defaults below are used.
        self.image_token = getattr(tokenizer, "image_token", "<image>")
        # Prefer an id exposed by the tokenizer; otherwise look the image token up through the tokenizer.
        self.image_token_id = (
            tokenizer.image_token_id
            if hasattr(tokenizer, "image_token_id")
            else tokenizer.convert_tokens_to_ids(self.image_token)
        )
        self.image_start_token = getattr(tokenizer, "image_start_token", "<|image_start|>")
        self.image_end_token = getattr(tokenizer, "image_end_token", "<|image_end|>")
        self.image_thumbnail_token = getattr(tokenizer, "image_thumbnail_token", "<|img_thumbnail|>")
        super().__init__(image_processor, tokenizer, chat_template=chat_template, **kwargs)

    @auto_docstring
    def __call__(
        self,
        images: ImageInput | list[ImageInput] | list[list[ImageInput]] | None = None,
        text: TextInput | list[TextInput] | None = None,
        **kwargs: Unpack[Lfm2VlProcessorKwargs],
    ) -> BatchEncoding:
        # `text` is mandatory: image-only calls are rejected because every image needs a placeholder
        # token in the text to be expanded.
        if text is None and images is None:
            raise ValueError("You must provide one of `text` or `images`.")

        if images is not None and text is None:
            raise ValueError(
                "You must provide `text` when `images` is provided. Minimal text consists of a single image token."
            )

        output_kwargs = self._merge_kwargs(
            Lfm2VlProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )

        # Normalize a single prompt to a batch of one. Anything else must be a list (or tuple) whose
        # elements are all strings; every other input is rejected with a clear `TypeError` instead of
        # failing later with an unrelated error.
        if isinstance(text, str):
            text = [text]
        elif not (isinstance(text, (list, tuple)) and all(isinstance(sample, str) for sample in text)):
            raise TypeError("Invalid input text. Please provide a string, or a list of strings")

        # Number of image placeholders found in each prompt.
        n_images_in_text = [sample.count(self.image_token) for sample in text]
        if sum(n_images_in_text) > 0 and images is None:
            raise ValueError(f"We detected {sum(n_images_in_text)} tokens in the text but no images were passed")

        inputs = {}
        # This option is consumed by the processor itself, so it is removed before the remaining text
        # kwargs are forwarded to the tokenizer.
        use_image_special_tokens = output_kwargs["text_kwargs"].pop("use_image_special_tokens")

        if images is not None:
            images = self.image_processor.fetch_images(images)
            batched_images = make_nested_list_of_images(images)
            vision_inputs = self.image_processor(batched_images, **output_kwargs["images_kwargs"])

            # Each prompt must contain exactly one placeholder per image of the matching sample.
            n_images_in_images = [len(sublist) for sublist in batched_images]
            if n_images_in_images != n_images_in_text:
                raise ValueError(
                    f"The number of images in the text {n_images_in_text} and images {n_images_in_images} should be the same."
                )

            # The row/col/size entries are only needed to expand the placeholders, so they are popped
            # and do not end up in the returned inputs.
            text = self.expand_text_with_placeholders(
                text,
                batched_images,
                image_rows=vision_inputs.pop("image_rows"),
                image_cols=vision_inputs.pop("image_cols"),
                image_sizes=vision_inputs.pop("image_sizes"),
                use_image_special_tokens=use_image_special_tokens,
                **output_kwargs["images_kwargs"],
            )
            inputs.update(vision_inputs)

        # `return_tensors` is withheld from the tokenizer call and handed to `BatchFeature` instead.
        return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)

        text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"])
        inputs.update(text_inputs)

        # NOTE(review): the signature is annotated `BatchEncoding`, but a `BatchFeature` is returned.
        return BatchFeature(inputs, tensor_type=return_tensors)

    def expand_text_with_placeholders(
        self,
        text: list[str],
        images: list[list[ImageInput]],
        image_rows: list[list[int]],
        image_cols: list[list[int]],
        image_sizes: list[list[int]],
        use_image_special_tokens: bool,
        **images_kwargs,
    ) -> list[str]:
        """
        Replace every image placeholder in `text` with the expanded token string of its image.

        Args:
            text: Prompts, one per sample, each containing one `self.image_token` per image of the sample.
            images: Images grouped per sample; only the number of images per sample is used here.
            image_rows: Tile row counts, consumed one entry per image in batch order.
            image_cols: Tile column counts, consumed one entry per image in batch order.
            image_sizes: `(height, width)` pairs, consumed one entry per image in batch order.
            use_image_special_tokens: Whether to emit the start/end, tile-position and thumbnail markers.
            **images_kwargs: Image-processing options; `use_thumbnail`, `tile_size`, `downsample_factor` and
                `encoder_patch_size` are read from here, falling back to the image processor attributes.

        Returns:
            The prompts with each placeholder expanded, in the same order as `text`.

        Raises:
            IndexError: If a prompt contains fewer placeholders than its sample has images.

        NOTE(review): the annotations of `image_rows` / `image_cols` suggest nested lists, but the loop
        consumes one entry per image and passes it on as a scalar count — confirm against the image
        processor output.
        """
        use_thumbnail = images_kwargs.get("use_thumbnail", self.image_processor.use_thumbnail)
        # A single iterator shared by all samples: the per-image entries are flattened across the batch.
        image_data = iter(zip(image_rows, image_cols, image_sizes))

        prompt_strings = []
        for sample_text, sample_images in zip(text, images):
            text_parts = sample_text.split(self.image_token)
            result_parts = []

            for part_index in range(len(sample_images)):
                # Text preceding the current placeholder, followed by the expansion of that placeholder.
                result_parts.append(text_parts[part_index])

                rows, cols, image_size = next(image_data)
                tokens_per_tile, tokens_for_image = self._get_image_num_tokens(image_size, **images_kwargs)
                result_parts.append(
                    self._build_image_tokens(
                        rows,
                        cols,
                        tokens_per_tile,
                        tokens_for_image,
                        use_thumbnail,
                        use_image_special_tokens,
                    )
                )

            # Everything after the last expanded placeholder is kept verbatim. With matching counts this
            # is exactly the trailing text segment; surplus placeholders (only possible when this method
            # is called directly) are re-joined with the image token rather than silently dropped.
            trailing_parts = text_parts[len(sample_images) :]
            if trailing_parts:
                result_parts.append(self.image_token.join(trailing_parts))

            prompt_strings.append("".join(result_parts))

        return prompt_strings

    def _build_image_tokens(
        self,
        rows: int,
        cols: int,
        tokens_per_tile: int,
        tokens_for_image: int,
        use_thumbnail: bool,
        use_image_special_tokens: bool,
    ) -> str:
        """
        Build the expanded token string for a single image.

        Args:
            rows: Number of tile rows of the image.
            cols: Number of tile columns of the image.
            tokens_per_tile: Number of image tokens emitted for each tile.
            tokens_for_image: Number of image tokens emitted for the single-tile image or the thumbnail.
            use_thumbnail: Whether a thumbnail section follows the tiles of a multi-tile image.
            use_image_special_tokens: Whether to emit the start/end, tile-position and thumbnail markers.

        Returns:
            The concatenated marker and image-token string for the image.
        """
        parts = []

        if use_image_special_tokens:
            parts.append(self.image_start_token)

        is_multi_tile = rows > 1 or cols > 1
        if is_multi_tile:
            # Tiles are emitted row by row; the position markers are 1-based.
            for row in range(rows):
                for col in range(cols):
                    if use_image_special_tokens:
                        parts.append(f"<|img_row_{row + 1}_col_{col + 1}|>")
                    parts.append(self.image_token * tokens_per_tile)

            if use_thumbnail:
                if use_image_special_tokens:
                    parts.append(self.image_thumbnail_token)
                parts.append(self.image_token * tokens_for_image)
        else:
            # A single-tile image is represented only by the tokens of the resized image.
            parts.append(self.image_token * tokens_for_image)

        if use_image_special_tokens:
            parts.append(self.image_end_token)

        return "".join(parts)

    def _compute_tokens_per_tile(self, tile_size: int, encoder_patch_size: int, downsample_factor: int) -> int:
        """
        Compute the number of tokens for a single square tile.

        The tile side is floor-divided into encoder patches, the patch count is divided by
        `downsample_factor` rounding up, and the result is squared.
        """
        num_patches = tile_size // encoder_patch_size
        downsampled_patches = math.ceil(num_patches / downsample_factor)
        return downsampled_patches * downsampled_patches

    def _compute_tokens_for_image(self, image_size: list[int], encoder_patch_size: int, downsample_factor: int) -> int:
        """
        Compute the number of tokens for a resized image (used for single-tile or thumbnail).

        `image_size` is unpacked as `(height, width)`; each side is floor-divided into encoder patches
        and then divided by `downsample_factor` rounding up. The result is the product of both sides.
        """
        image_height, image_width = image_size
        patches_h = math.ceil((image_height // encoder_patch_size) / downsample_factor)
        patches_w = math.ceil((image_width // encoder_patch_size) / downsample_factor)
        return patches_h * patches_w

    def _get_image_num_tokens(self, image_size: list[int], **images_kwargs) -> tuple[int, int]:
        """
        Compute token counts for image processing.

        `tile_size`, `downsample_factor` and `encoder_patch_size` are taken from `images_kwargs` when
        present and from the image processor attributes otherwise.

        Returns:
            tuple[int, int]: (tokens_per_tile, tokens_for_image)
                - tokens_per_tile: tokens for each tile in multi-tile mode
                - tokens_for_image: tokens for the resized image (single-tile) or thumbnail (multi-tile)
        """
        tile_size = images_kwargs.get("tile_size", self.image_processor.tile_size)
        downsample_factor = images_kwargs.get("downsample_factor", self.image_processor.downsample_factor)
        encoder_patch_size = images_kwargs.get("encoder_patch_size", self.image_processor.encoder_patch_size)

        tokens_per_tile = self._compute_tokens_per_tile(tile_size, encoder_patch_size, downsample_factor)
        tokens_for_image = self._compute_tokens_for_image(image_size, encoder_patch_size, downsample_factor)

        return tokens_per_tile, tokens_for_image

    def batch_decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to the tokenizer's [`~PreTrainedTokenizer.batch_decode`]. Please
        refer to the docstring of this method for more information.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to the tokenizer's [`~PreTrainedTokenizer.decode`]. Please refer to
        the docstring of this method for more information.
        """
        return self.tokenizer.decode(*args, **kwargs)

    @property
    def model_input_names(self):
        """Tokenizer input names (without `token_type_ids`) followed by the image processor input names."""
        tokenizer_input_names = self.tokenizer.model_input_names
        image_processor_input_names = self.image_processor.model_input_names

        # LFM2-VL has no dedicated tokenizer class and uses the Base class with default model input names
        tokenizer_input_names = [name for name in tokenizer_input_names if name != "token_type_ids"]
        return list(tokenizer_input_names + image_processor_input_names)


# Public names exported by this module.
__all__ = ["Lfm2VlProcessor"]
