# Copyright 2024 Microsoft Research and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor class for Kosmos2_5.
"""

from ...image_processing_utils import BatchFeature
from ...image_utils import ImageInput
from ...processing_utils import ProcessingKwargs, ProcessorMixin, Unpack
from ...tokenization_utils_base import TextInput
from ...utils import auto_docstring, is_torch_available


if is_torch_available():
    import torch


class Kosmos2_5ProcessorKwargs(ProcessingKwargs, total=False):
    _defaults = {
        "text_kwargs": {
            "padding": True,
            "return_token_type_ids": False,
            "stride": 0,
            "truncation": True,
        },
        "images_kwargs": {
            "max_patches": 4096,
        },
        "common_kwargs": {"return_tensors": "pt"},
    }


@auto_docstring
class Kosmos2_5Processor(ProcessorMixin):
    def __init__(self, image_processor, tokenizer, num_image_tokens: int = 2048):
        r"""
        num_image_tokens (`int`, *optional*, defaults to 2048):
            Number of image tokens used as a placeholder.
        """
        self.image_start_token = tokenizer.boi_token  # "<image>" : fixed token for the start of image
        self.image_end_token = tokenizer.eoi_token  # "</image>" : fixed token for the end of image
        self.image_token = tokenizer.image_token  # "<s>" : within a <image> ... </image> pair, these <s> tokens indicate they are positions reserved for an image
        self.num_image_tokens = num_image_tokens
        super().__init__(image_processor, tokenizer)

    @auto_docstring
    def __call__(
        self,
        images: ImageInput | None = None,
        text: TextInput | list[TextInput] = None,
        **kwargs: Unpack[Kosmos2_5ProcessorKwargs],
    ) -> BatchFeature:
        if images is None and text is None:
            raise ValueError("You have to specify either images or text.")

        if images is None:
            raise ValueError("Kosmos2_5Processor requires images to be passed.")

        output_kwargs = self._merge_kwargs(
            Kosmos2_5ProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )
        encoding = BatchFeature()

        if images is not None:
            image_encoding = self.image_processor(images, **output_kwargs["images_kwargs"])
            image_encoding.pop("rows")
            image_encoding.pop("cols")
            encoding.update(image_encoding)

        prompt = f"{self.tokenizer.bos_token}{self.image_start_token}{self.image_token * self.num_image_tokens}{self.image_end_token}"

        if text is not None:
            if isinstance(text, str):
                text = [prompt + text]
            else:
                text = [prompt + t for t in text]
            input = self.tokenizer(text, **output_kwargs["text_kwargs"])

            batch_size, seq_len = input.input_ids.shape
            image_embeds_position_mask = [0, -1] + [1] * self.num_image_tokens + [-1]
            image_embeds_position_mask += [0] * (seq_len - len(image_embeds_position_mask))
            image_embeds_position_mask = (
                torch.LongTensor(image_embeds_position_mask).unsqueeze(0).repeat(batch_size, 1)
            )

            encoding.update(
                {
                    "input_ids": input.input_ids,
                    "attention_mask": input.attention_mask,
                    "image_embeds_position_mask": image_embeds_position_mask,
                }
            )

        return encoding

    def batch_decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to Kosmos2_5TokenizerFast's [`~PreTrainedTokenizer.batch_decode`].
        Please refer to the docstring of this method for more information.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to Kosmos2_5TokenizerFast's [`~PreTrainedTokenizer.decode`]. Please
        refer to the docstring of this method for more information.
        """
        return self.tokenizer.decode(*args, **kwargs)

    @property
    def model_input_names(self):
        tokenizer_input_names = self.tokenizer.model_input_names
        image_processor_input_names = self.image_processor.model_input_names
        return list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))


__all__ = ["Kosmos2_5Processor"]
