| |
|
| |
|
| | """# shared_subspace_encoder.py"""
|
| |
|
| | from typing import Optional
|
| |
|
| | import torch
|
| | from torch import nn
|
| |
|
| | from transformers.configuration_utils import PretrainedConfig
|
| | from transformers.modeling_utils import PreTrainedModel
|
| | from transformers.modeling_attn_mask_utils import _prepare_4d_attention_mask_for_sdpa
|
| |
|
| | from ..layers.mla import MultiheadLatentAttention, RotaryEmbedding
|
| | from ..layers.feedforward import SubspaceFeedForward
|
| | from ..models.shared_space_config import SharedSpaceDecoderConfig
|
| |
|
| | """
|
| | RMSNorm
|
| | From: https://huggingface.co/deepseek-ai/DeepSeek-R1/blob/main/modeling_deepseek.py
|
| | """
|
| |
|
| | class DeepseekV3RMSNorm(nn.Module):
|
| | def __init__(self, hidden_size, eps=1e-6):
|
| | """
|
| | DeepseekV3RMSNorm is equivalent to T5LayerNorm
|
| | """
|
| | super().__init__()
|
| | self.weight = nn.Parameter(torch.ones(hidden_size))
|
| | self.variance_epsilon = eps
|
| |
|
| | def forward(self, hidden_states):
|
| | input_dtype = hidden_states.dtype
|
| | hidden_states = hidden_states.to(torch.float32)
|
| | variance = hidden_states.pow(2).mean(-1, keepdim=True)
|
| | hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
|
| | return self.weight * hidden_states.to(input_dtype)
|
| |
|
| | def create_norm_layer(hidden_size: int, config: SharedSpaceDecoderConfig) -> nn.Module:
|
| | """
|
| | Create a normalization layer based on the config norm_type.
|
| |
|
| | Args:
|
| | hidden_size: The dimension to normalize over
|
| | config: Configuration containing norm_type and epsilon values
|
| |
|
| | Returns:
|
| | Either a LayerNorm or RMSNorm layer
|
| | """
|
| | if config.norm_type == "layernorm":
|
| | return nn.LayerNorm(hidden_size, eps=config.layer_norm_eps)
|
| | elif config.norm_type == "rmsnorm":
|
| | return DeepseekV3RMSNorm(hidden_size, eps=config.rms_norm_eps)
|
| | else:
|
| |
|
| | raise ValueError(f"Unknown norm_type: {config.norm_type}")
|
| |
|
| | """#### *PreTrainedModel"""
|
| |
|
| | class SharedSpaceDecoderPreTrainedModel(PreTrainedModel):
|
| | """
|
| | The **PreTrainedModel object:
|
| | - Is instantiated when TODO
|
| | - Initializes:
|
| | - TODO
|
| | - Provides access to TODO
|
| | - Executes TODO
|
| | """
|
| |
|
| | config_class = SharedSpaceDecoderConfig
|
| | base_model_prefix = "model"
|
| |
|
| | def _init_weights(self, module: nn.Module) -> None:
|
| | """Weight initialization hook used by :class:`PreTrainedModel`.
|
| |
|
| | ``PreTrainedModel.post_init`` will recursively apply this function to
|
| | every submodule right after construction. HuggingFace models override
|
| | it so that creating a model from scratch yields the same initialization
|
| | as ``from_pretrained`` when no checkpoint is supplied.
|
| |
|
| | This decoder-specific initialization strategy includes:
|
| | - Proper handling of configurable normalization layers (LayerNorm or RMSNorm)
|
| | - Special initialization for language modeling heads
|
| | - Considerations for causal attention and autoregressive modeling
|
| | - Support for both dense and decomposed vocabulary embeddings
|
| | """
|
| |
|
| | if isinstance(module, nn.Linear):
|
| |
|
| | module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
|
| | if module.bias is not None:
|
| | module.bias.data.zero_()
|
| |
|
| | elif isinstance(module, nn.Embedding):
|
| |
|
| | module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
|
| | if module.padding_idx is not None:
|
| | module.weight.data[module.padding_idx].zero_()
|
| |
|
| | elif isinstance(module, DeepseekV3RMSNorm):
|
| |
|
| | module.weight.data.fill_(1.0)
|
| |
|
| | elif isinstance(module, nn.LayerNorm):
|
| |
|
| | module.bias.data.zero_()
|
| | module.weight.data.fill_(1.0)
|
| |
|
| |
|
| | class SharedSpaceDecoderLayer(nn.Module):
|
| | """
|
| | The **Layer object:
|
| | - Is instantiated by :class:`SharedSpaceDecoderModel` for each
|
| | Transformer block in the decoder.
|
| | - Initializes:
|
| | - ``self_attn`` – multi-head latent attention implementing either
|
| | dense or latent projections depending on the configuration.
|
| | - ``ffn`` – a :class:`SubspaceFeedForward` block.
|
| | - RMSNorm layers for pre-attention and pre-FFN normalization.
|
| | - Provides access to the attention and feed-forward submodules via the
|
| | attributes ``self_attn`` and ``ffn``.
|
| | - Executes a single decoder block in :meth:`forward`.
|
| | """
|
| |
|
| | def __init__(self, config: SharedSpaceDecoderConfig, layer_idx: int) -> None:
|
| |
|
| | super().__init__()
|
| |
|
| |
|
| | self.attn_input_norm = create_norm_layer(config.hidden_size, config)
|
| |
|
| |
|
| | self.self_attn = MultiheadLatentAttention(config, layer_idx)
|
| |
|
| |
|
| | self.ffn_input_norm = create_norm_layer(config.hidden_size, config)
|
| |
|
| |
|
| | self.ffn = SubspaceFeedForward(config, layer_idx)
|
| |
|
| | def forward(
|
| | self,
|
| | hidden_states: torch.Tensor,
|
| | position_embeddings: tuple[torch.Tensor, torch.Tensor],
|
| | attention_mask: Optional[torch.Tensor],
|
| | ) -> torch.Tensor:
|
| |
|
| |
|
| |
|
| |
|
| | residual_strm = hidden_states
|
| |
|
| |
|
| | attn_input = self.attn_input_norm(hidden_states)
|
| |
|
| |
|
| | attn_output = self.self_attn(
|
| | attn_input,
|
| | position_embeddings,
|
| | attention_mask,
|
| | )
|
| |
|
| |
|
| |
|
| | hidden_states = residual_strm + attn_output
|
| |
|
| |
|
| |
|
| |
|
| | residual_strm = hidden_states
|
| |
|
| |
|
| | ffn_input = self.ffn_input_norm(hidden_states)
|
| |
|
| |
|
| | ffn_output = self.ffn(ffn_input)
|
| |
|
| |
|
| | hidden_states = residual_strm + ffn_output
|
| |
|
| | return hidden_states
|
| |
|
| |
|
| | class SharedSpaceDecoderModel(SharedSpaceDecoderPreTrainedModel):
|
| | """
|
| | The **Model object:
|
| | - Initializes:
|
| | - The vocabulary embeddings (and optional decomposition)
|
| | - Position embeddings (calculated in RotaryEmbedding)
|
| | - All of the **Layer objects.
|
| | - Provides interface to vocab embeddings.
|
| | - Executes the whole decoder model in `forward` with causal attention.
|
| |
|
| | This is the base decoder without the language modeling head.
|
| | Use SubspaceDecoderForCausalLM for language modeling tasks.
|
| | """
|
| |
|
| | def __init__(self, config: SharedSpaceDecoderConfig) -> None:
|
| | super().__init__(config)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | if config.vocab_subspace:
|
| |
|
| |
|
| |
|
| | self.vocab_embed = nn.Embedding(
|
| | config.vocab_size,
|
| | config.vocab_rank
|
| | )
|
| |
|
| |
|
| |
|
| |
|
| | self.vocab_proj = nn.Linear(
|
| | config.vocab_rank,
|
| | config.hidden_size,
|
| | bias=False
|
| | )
|
| |
|
| |
|
| | else:
|
| |
|
| | self.vocab_embed = nn.Embedding(
|
| | config.vocab_size,
|
| | config.hidden_size
|
| | )
|
| |
|
| | self.vocab_proj = None
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | self.rope = RotaryEmbedding(config)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | layers = []
|
| |
|
| |
|
| | for i in range(config.num_hidden_layers):
|
| |
|
| | layers.append(
|
| | SharedSpaceDecoderLayer(
|
| | config,
|
| | layer_idx = i
|
| | )
|
| | )
|
| |
|
| |
|
| | self.layers = nn.ModuleList(layers)
|
| |
|
| |
|
| | self.post_init()
|
| |
|
| |
|
| |
|
| |
|
| | def embed(self, input_ids: torch.LongTensor) -> torch.Tensor:
|
| | """
|
| | Return token embeddings for input ids.
|
| | This will perform the up projection to model space if the vocabulary is
|
| | decomposed.
|
| |
|
| | input_ids have shape [batch_size, seq_len]
|
| | """
|
| |
|
| |
|
| | if self.vocab_proj is not None:
|
| |
|
| |
|
| |
|
| |
|
| | x = self.vocab_embed(input_ids)
|
| |
|
| |
|
| | return(self.vocab_proj(x))
|
| |
|
| |
|
| | else:
|
| |
|
| | return self.vocab_embed(input_ids)
|
| |
|
| | def forward(
|
| | self,
|
| | input_ids: torch.LongTensor,
|
| | attention_mask: Optional[torch.Tensor] = None,
|
| | **kwargs,
|
| | ) -> torch.Tensor:
|
| | """
|
| | Run the full decoder stack with causal attention.
|
| |
|
| | Inputs:
|
| | input_ids [batch_size, seq_len]
|
| | attention_mask [batch_size, seq_len] - 1 for real tokens, 0 for padding
|
| |
|
| | Returns:
|
| | Final decoder layer output [batch_size, seq_len, model_size]
|
| | """
|
| |
|
| |
|
| |
|
| | hidden_states = self.embed(input_ids)
|
| |
|
| |
|
| |
|
| |
|
| | seq_len = hidden_states.size(1)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | R_cos = self.rope.cos[:seq_len]
|
| | R_sin = self.rope.sin[:seq_len]
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | """
|
| | use_sdpa_attention_masks = (
|
| | self.attn_implementation == "sdpa"
|
| | and self.position_embedding_type == "absolute"
|
| | and head_mask is None
|
| | and not output_attentions
|
| | )
|
| | """
|
| |
|
| |
|
| |
|
| | if True:
|
| |
|
| |
|
| | extended_attention_mask = _prepare_4d_attention_mask_for_sdpa(
|
| | attention_mask,
|
| | hidden_states.dtype,
|
| | tgt_len = seq_len
|
| | )
|
| | attention_mask = extended_attention_mask
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | for layer_i, layer in enumerate(self.layers):
|
| |
|
| |
|
| | hidden_states = layer(
|
| | hidden_states,
|
| | (R_cos, R_sin),
|
| | attention_mask,
|
| | )
|
| |
|
| |
|
| | return hidden_states
|
| |
|
| |
|