diff --git a/multimolecule/models/__init__.py b/multimolecule/models/__init__.py index 1076773d..044ef004 100644 --- a/multimolecule/models/__init__.py +++ b/multimolecule/models/__init__.py @@ -6,6 +6,13 @@ RnaBertForTokenClassification, RnaBertModel, ) +from .rnafm import ( + RnaFmConfig, + RnaFmForMaskedLM, + RnaFmForSequenceClassification, + RnaFmForTokenClassification, + RnaFmModel, +) from .rnamsm import ( RnaMsmConfig, RnaMsmForMaskedLM, @@ -27,6 +34,11 @@ "RnaBertForMaskedLM", "RnaBertForSequenceClassification", "RnaBertForTokenClassification", + "RnaFmConfig", + "RnaFmForMaskedLM", + "RnaFmForSequenceClassification", + "RnaFmForTokenClassification", + "RnaFmModel", "RnaMsmConfig", "RnaMsmModel", "RnaMsmForMaskedLM", diff --git a/multimolecule/models/rnafm/__init__.py b/multimolecule/models/rnafm/__init__.py new file mode 100644 index 00000000..bee6fb5c --- /dev/null +++ b/multimolecule/models/rnafm/__init__.py @@ -0,0 +1,31 @@ +from transformers import ( + AutoConfig, + AutoModel, + AutoModelForMaskedLM, + AutoModelForSequenceClassification, + AutoModelForTokenClassification, + AutoModelWithLMHead, + AutoTokenizer, +) + +from multimolecule.tokenizers.rna import RnaTokenizer + +from .configuration_rnafm import RnaFmConfig +from .modeling_rnafm import RnaFmForMaskedLM, RnaFmForSequenceClassification, RnaFmForTokenClassification, RnaFmModel + +__all__ = [ + "RnaFmConfig", + "RnaFmModel", + "RnaTokenizer", + "RnaFmForMaskedLM", + "RnaFmForSequenceClassification", + "RnaFmForTokenClassification", +] + +AutoConfig.register("rnafm", RnaFmConfig) +AutoModel.register(RnaFmConfig, RnaFmModel) +AutoModelForMaskedLM.register(RnaFmConfig, RnaFmForMaskedLM) +AutoModelForSequenceClassification.register(RnaFmConfig, RnaFmForSequenceClassification) +AutoModelForTokenClassification.register(RnaFmConfig, RnaFmForTokenClassification) +AutoModelWithLMHead.register(RnaFmConfig, RnaFmForTokenClassification) +AutoTokenizer.register(RnaFmConfig, RnaTokenizer) diff --git a/multimolecule/models/rnafm/configuration_rnafm.py b/multimolecule/models/rnafm/configuration_rnafm.py new file mode 100644 index 00000000..713e0129 --- /dev/null +++ b/multimolecule/models/rnafm/configuration_rnafm.py @@ -0,0 +1,130 @@ +from transformers.utils import logging + +from ..configuration_utils import HeadConfig, MaskedLMHeadConfig, PretrainedConfig + +logger = logging.get_logger(__name__) + + +class RnaFmConfig(PretrainedConfig): + r""" + This is the configuration class to store the configuration of a [`RnaFmModel`]. It is used to instantiate a RNA-FM + model according to the specified arguments, defining the model architecture. Instantiating a configuration with the + defaults will yield a similar configuration to that of the RNA-FM + [ml4bio/RNA-FM](https://github.com/ml4bio/RNA-FM) architecture. + + Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the + documentation from [`PretrainedConfig`] for more information. + + + Args: + vocab_size (`int`, *optional*): + Vocabulary size of the RNA-FM model. Defines the number of different tokens that can be represented by the + `inputs_ids` passed when calling [`RnaFmModel`]. + hidden_size (`int`, *optional*, defaults to 768): + Dimensionality of the encoder layers and the pooler layer. + num_hidden_layers (`int`, *optional*, defaults to 12): + Number of hidden layers in the Transformer encoder. + num_attention_heads (`int`, *optional*, defaults to 12): + Number of attention heads for each attention layer in the Transformer encoder. + intermediate_size (`int`, *optional*, defaults to 3072): + Dimensionality of the "intermediate" (often named feed-forward) layer in the Transformer encoder. + hidden_dropout (`float`, *optional*, defaults to 0.1): + The dropout probability for all fully connected layers in the embeddings, encoder, and pooler. + attention_dropout (`float`, *optional*, defaults to 0.1): + The dropout ratio for the attention probabilities. + max_position_embeddings (`int`, *optional*, defaults to 1026): + The maximum sequence length that this model might ever be used with. Typically set this to something large + just in case (e.g., 512 or 1024 or 2048). + initializer_range (`float`, *optional*, defaults to 0.02): + The standard deviation of the truncated_normal_initializer for initializing all weight matrices. + layer_norm_eps (`float`, *optional*, defaults to 1e-12): + The epsilon used by the layer normalization layers. + pad_token_id (`int`, *optional*, defaults to 0): + The index of the padding token in the vocabulary. This must be included in the config because certain parts + of the RnaBert code use this instead of the attention mask. + bos_token_id (`int`, *optional*, defaults to 1): + The index of the bos token in the vocabulary. This must be included in the config because of the + contact and other prediction heads removes the bos and padding token when predicting outputs. + mask_token_id (`int`, *optional*, defaults to 4): + The index of the mask token in the vocabulary. This must be included in the config because of the + "mask-dropout" scaling trick, which will scale the inputs depending on the number of masked tokens. + position_embedding_type (`str`, *optional*, defaults to `"absolute"`): + Type of position embedding. Choose one of `"absolute"`, `"relative_key"`, `"relative_key_query", "rotary"`. + For positional embeddings use `"absolute"`. For more information on `"relative_key"`, please refer to + [Self-Attention with Relative Position Representations (Shaw et al.)](https://arxiv.org/abs/1803.02155). + For more information on `"relative_key_query"`, please refer to *Method 4* in [Improve Transformer Models + with Better Relative Position Embeddings (Huang et al.)](https://arxiv.org/abs/2009.13658). + is_decoder (`bool`, *optional*, defaults to `False`): + Whether the model is used as a decoder or not. If `False`, the model is used as an encoder. + use_cache (`bool`, *optional*, defaults to `True`): + Whether or not the model should return the last key/values attentions (not used by all models). Only + relevant if `config.is_decoder=True`. + emb_layer_norm_before (`bool`, *optional*): + Whether to apply layer normalization after embeddings but before the main stem of the network. + token_dropout (`bool`, defaults to `False`): + When this is enabled, masked tokens are treated as if they had been dropped out by input dropout. + + Examples: + + ```python + >>> from multimolecule import RnaFmModel, RnaFmConfig + + >>> # Initializing a RNA-FM style configuration >>> configuration = RnaFmConfig() + + >>> # Initializing a model from the configuration >>> model = RnaFmModel(configuration) + + >>> # Accessing the model configuration >>> configuration = model.config + ``` + """ + + model_type = "rnafm" + + def __init__( + self, + vocab_size=25, + hidden_size=640, + num_hidden_layers=12, + num_attention_heads=20, + intermediate_size=5120, + hidden_act="gelu", + hidden_dropout=0.1, + attention_dropout=0.1, + max_position_embeddings=1026, + initializer_range=0.02, + layer_norm_eps=1e-12, + pad_token_id=0, + bos_token_id=1, + mask_token_id=4, + position_embedding_type="absolute", + use_cache=True, + emb_layer_norm_before=True, + token_dropout=True, + head=None, + lm_head=None, + **kwargs, + ): + if head is None: + head = {} + if lm_head is None: + lm_head = {} + head.setdefault("hidden_size", hidden_size) + lm_head.setdefault("hidden_size", hidden_size) + super().__init__(pad_token_id=pad_token_id, bos_token_id=bos_token_id, mask_token_id=mask_token_id, **kwargs) + + self.vocab_size = vocab_size + self.hidden_size = hidden_size + self.num_hidden_layers = num_hidden_layers + self.num_attention_heads = num_attention_heads + self.intermediate_size = intermediate_size + self.hidden_act = hidden_act + self.hidden_dropout = hidden_dropout + self.attention_dropout = attention_dropout + self.max_position_embeddings = max_position_embeddings + self.initializer_range = initializer_range + self.layer_norm_eps = layer_norm_eps + self.position_embedding_type = position_embedding_type + self.use_cache = use_cache + self.emb_layer_norm_before = emb_layer_norm_before + self.token_dropout = token_dropout + self.head = HeadConfig(**head) + self.lm_head = MaskedLMHeadConfig(**lm_head) diff --git a/multimolecule/models/rnafm/convert_checkpoint.py b/multimolecule/models/rnafm/convert_checkpoint.py new file mode 100644 index 00000000..a93ec0b9 --- /dev/null +++ b/multimolecule/models/rnafm/convert_checkpoint.py @@ -0,0 +1,158 @@ +import os +from typing import Optional + +import chanfig +import torch +from torch import nn + +from multimolecule.models import RnaFmConfig as Config +from multimolecule.models import RnaFmForMaskedLM as Model +from multimolecule.tokenizers.rna.utils import get_special_tokens_map, get_tokenizer_config, get_vocab_list + +try: + from huggingface_hub import HfApi +except ImportError: + HfApi = None + + +torch.manual_seed(1013) + +CONFIG = { + "architectures": ["RnaFmModel"], + "attention_dropout": 0.1, + "hidden_act": "gelu", + "hidden_dropout": 0.1, + "hidden_size": 640, + "intermediate_size": 5120, + "max_position_embeddings": 1026, + "num_attention_heads": 20, + "num_hidden_layers": 12, + "max_tokens_per_msa": 2**14, + "num_labels": 1, +} + +original_vocab_list = [ + "", + "", + "", + "", + "A", + "C", + "G", + "U", + "R", + "Y", + "K", + "M", + "S", + "W", + "B", + "D", + "H", + "V", + "N", + "-", + "", + "", + "", + "", + "", +] +vocab_list = get_vocab_list() + + +def _convert_checkpoint(config, original_state_dict): + state_dict = {} + for key, value in original_state_dict.items(): + key = "rnafm" + key[7:] + key = key.replace("LayerNorm", "layer_norm") + key = key.replace("gamma", "weight") + key = key.replace("beta", "bias") + key = key.replace("rnafm.encoder.emb_layer_norm_before", "rnafm.embeddings.layer_norm") + key = key.replace("rnafm.encoder.embed_tokens", "rnafm.embeddings.word_embeddings") + key = key.replace("rnafm.encoder.embed_positions", "rnafm.embeddings.position_embeddings") + key = key.replace("layers", "layer") + key = key.replace("self_attn", "attention.self") + key = key.replace("q_proj", "query") + key = key.replace("k_proj", "key") + key = key.replace("v_proj", "value") + key = key.replace("self.out_proj", "output.dense") + key = key.replace("self_layer_norm", "layer_norm") + key = key.replace("final_layer_norm", "layer_norm") + key = key.replace("fc1", "intermediate.dense") + key = key.replace("fc2", "output.dense") + key = key.replace("regression", "decoder") + key = key.replace("rnafm.encoder.lm_head", "lm_head") + key = key.replace("lm_head.dense", "lm_head.transform.dense") + key = key.replace("lm_head.layer_norm", "lm_head.transform.layer_norm") + key = key.replace("lm_head.weight", "lm_head.decoder.weight") + key = key.replace("rnafm.encoder.contact_head", "contact_head") + state_dict[key] = value + + state_vocab_size = state_dict["rnafm.embeddings.word_embeddings.weight"].size(0) + original_vocab_size = len(original_vocab_list) + if state_vocab_size != original_vocab_size: + raise ValueError( + f"Vocabulary size do not match. Expected to have {original_vocab_size}, but got {state_vocab_size}." + ) + word_embed = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id) + word_embed_weight = word_embed.weight.data + predictions_decoder_weight = torch.zeros((config.vocab_size, config.hidden_size)) + predictions_bias = torch.zeros(config.vocab_size) + # nn.init.normal_(pos_embed.weight, std=0.02) + for original_index, original_token in enumerate(original_vocab_list): + new_index = vocab_list.index(original_token) + word_embed_weight[new_index] = state_dict["rnafm.embeddings.word_embeddings.weight"][original_index] + predictions_decoder_weight[new_index] = state_dict["lm_head.decoder.weight"][original_index] + predictions_bias[new_index] = state_dict["lm_head.bias"][original_index] + state_dict["rnafm.embeddings.word_embeddings.weight"] = word_embed_weight + state_dict["lm_head.decoder.weight"] = predictions_decoder_weight + state_dict["lm_head.bias"] = predictions_bias + state_dict["lm_head.decoder.bias"] = state_dict["lm_head.bias"] + return state_dict + + +def convert_checkpoint(convert_config): + config = Config.from_dict(chanfig.FlatDict(CONFIG)) + config.vocab_size = len(vocab_list) + + model = Model(config) + + ckpt = torch.load(convert_config.checkpoint_path, map_location=torch.device("cpu")) + state_dict = _convert_checkpoint(config, ckpt) + + model.load_state_dict(state_dict) + model.save_pretrained(convert_config.output_path, safe_serialization=True) + model.save_pretrained(convert_config.output_path, safe_serialization=False) + chanfig.NestedDict(get_special_tokens_map()).json( + os.path.join(convert_config.output_path, "special_tokens_map.json") + ) + chanfig.NestedDict(get_tokenizer_config()).json(os.path.join(convert_config.output_path, "tokenizer_config.json")) + + if convert_config.push_to_hub: + if HfApi is None: + raise ImportError("Please install huggingface_hub to push to the hub.") + api = HfApi() + api.create_repo( + convert_config.repo_id, + token=convert_config.token, + exist_ok=True, + ) + api.upload_folder( + repo_id=convert_config.repo_id, folder_path=convert_config.output_path, token=convert_config.token + ) + + +@chanfig.configclass +class ConvertConfig: + checkpoint_path: str + output_path: str = Config.model_type + push_to_hub: bool = False + repo_id: str = f"multimolecule/{output_path}" + token: Optional[str] = None + + +if __name__ == "__main__": + config = ConvertConfig() + config.parse() # type: ignore[attr-defined] + convert_checkpoint(config) diff --git a/multimolecule/models/rnafm/modeling_rnafm.py b/multimolecule/models/rnafm/modeling_rnafm.py new file mode 100755 index 00000000..7953c05f --- /dev/null +++ b/multimolecule/models/rnafm/modeling_rnafm.py @@ -0,0 +1,1023 @@ +from typing import List, Optional, Tuple, Union + +import torch +import torch.utils.checkpoint +from torch import Tensor, nn +from torch.nn import functional as F +from transformers.activations import ACT2FN +from transformers.modeling_outputs import ( + BaseModelOutputWithPastAndCrossAttentions, + BaseModelOutputWithPoolingAndCrossAttentions, + MaskedLMOutput, + SequenceClassifierOutput, + TokenClassifierOutput, +) +from transformers.modeling_utils import PreTrainedModel, find_pruneable_heads_and_indices, prune_linear_layer +from transformers.utils import logging + +from ..modeling_utils import ( + ContactPredictionHead, + MaskedLMHead, + SequenceClassificationHead, + TokenClassificationHead, + apply_rotary_pos_emb, +) +from .configuration_rnafm import RnaFmConfig + +logger = logging.get_logger(__name__) + + +class RnaFmPreTrainedModel(PreTrainedModel): + """ + An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained + models. + """ + + config_class = RnaFmConfig + base_model_prefix = "rnafm" + supports_gradient_checkpointing = True + _no_split_modules = ["RnaFmLayer", "RnaFmEmbeddings"] + + # Copied from transformers.models.bert.modeling_bert.BertPreTrainedModel._init_weights + def _init_weights(self, module: nn.Module): + """Initialize the weights""" + if isinstance(module, nn.Linear): + # Slightly different from the TF version which uses truncated_normal for initialization + # cf https://github.com/pytorch/pytorch/pull/5617 + module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) + if module.bias is not None: + module.bias.data.zero_() + elif isinstance(module, nn.Embedding): + module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) + if module.padding_idx is not None: + module.weight.data[module.padding_idx].zero_() + elif isinstance(module, nn.LayerNorm): + module.bias.data.zero_() + module.weight.data.fill_(1.0) + + +class RnaFmModel(RnaFmPreTrainedModel): + """ + + The model can behave as an encoder (with only self-attention) as well as a decoder, in which case a layer of + cross-attention is added between the self-attention layers, following the architecture described in [Attention is + all you need](https://arxiv.org/abs/1706.03762) by Ashish Vaswani, Noam Shazeer, Niki Parmar, Jakob Uszkoreit, + Llion Jones, Aidan N. Gomez, Lukasz Kaiser and Illia Polosukhin. + + To behave as an decoder the model needs to be initialized with the `is_decoder` argument of the configuration set + to `True`. To be used in a Seq2Seq model, the model needs to initialized with both `is_decoder` argument and + `add_cross_attention` set to `True`; an `encoder_hidden_states` is then expected as an input to the forward pass. + """ + + def __init__(self, config, add_pooling_layer=True): + super().__init__(config) + self.config = config + self.pad_token_id = config.pad_token_id + self.embeddings = RnaFmEmbeddings(config) + self.encoder = RnaFmEncoder(config) + self.pooler = RnaFmPooler(config) if add_pooling_layer else None + + # Initialize weights and apply final processing + self.post_init() + + def get_input_embeddings(self): + return self.embeddings.word_embeddings + + def set_input_embeddings(self, value): + self.embeddings.word_embeddings = value + + def _prune_heads(self, heads_to_prune): + """ + Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} See base + class PreTrainedModel + """ + for layer, heads in heads_to_prune.items(): + self.encoder.layer[layer].attention.prune_heads(heads) + + def forward( + self, + input_ids: Optional[Tensor] = None, + attention_mask: Optional[Tensor] = None, + position_ids: Optional[Tensor] = None, + head_mask: Optional[Tensor] = None, + inputs_embeds: Optional[Tensor] = None, + encoder_hidden_states: Optional[Tensor] = None, + encoder_attention_mask: Optional[Tensor] = None, + past_key_values: Optional[List[torch.FloatTensor]] = None, + use_cache: Optional[bool] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + ) -> Union[Tuple[Tensor, ...], BaseModelOutputWithPoolingAndCrossAttentions]: + r""" + encoder_hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): + Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention if + the model is configured as a decoder. + encoder_attention_mask (`torch.FloatTensor` of shape `(batch_size, sequence_length)`, *optional*): + Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used in + the cross-attention if the model is configured as a decoder. Mask values selected in `[0, 1]`: + + - 1 for tokens that are **not masked**, + - 0 for tokens that are **masked**. + past_key_values (`tuple(tuple(torch.FloatTensor))` of length `config.n_layers` with each tuple having 4 tensors + of shape `(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`): + Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding. + + If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that + don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all + `decoder_input_ids` of shape `(batch_size, sequence_length)`. + use_cache (`bool`, *optional*): + If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see + `past_key_values`). + """ + output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions + output_hidden_states = ( + output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states + ) + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + + if self.config.is_decoder: + use_cache = use_cache if use_cache is not None else self.config.use_cache + else: + use_cache = False + + if input_ids is not None and inputs_embeds is not None: + raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time") + elif input_ids is not None: + self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask) + input_shape = input_ids.size() + elif inputs_embeds is not None: + input_shape = inputs_embeds.size()[:-1] + else: + raise ValueError("You have to specify either input_ids or inputs_embeds") + + batch_size, seq_length = input_shape + device = input_ids.device if input_ids is not None else inputs_embeds.device # type: ignore[union-attr] + + # past_key_values_length + past_key_values_length = past_key_values[0][0].shape[2] if past_key_values is not None else 0 + + if attention_mask is None: + attention_mask = ( + input_ids.ne(self.pad_token_id) + if self.pad_token_id is not None + else torch.ones(((batch_size, seq_length + past_key_values_length)), device=device) + ) + + # We can provide a self-attention mask of dimensions [batch_size, from_seq_length, to_seq_length] + # ourselves in which case we just need to make it broadcastable to all heads. + extended_attention_mask: Tensor = self.get_extended_attention_mask(attention_mask, input_shape) + + # If a 2D or 3D attention mask is provided for the cross-attention + # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length] + if self.config.is_decoder and encoder_hidden_states is not None: + encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size() + encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length) + if encoder_attention_mask is None: + encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device) + encoder_extended_attention_mask = self.invert_attention_mask(encoder_attention_mask) + else: + encoder_extended_attention_mask = None + + # Prepare head mask if needed + # 1.0 in head_mask indicate we keep the head + # attention_probs has shape bsz x n_heads x N x N + # input head_mask has shape [num_heads] or [num_hidden_layers x num_heads] + # and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length] + head_mask = self.get_head_mask(head_mask, self.config.num_hidden_layers) + + embedding_output = self.embeddings( + input_ids=input_ids, + position_ids=position_ids, + attention_mask=attention_mask, + inputs_embeds=inputs_embeds, + past_key_values_length=past_key_values_length, + ) + encoder_outputs = self.encoder( + embedding_output, + attention_mask=extended_attention_mask, + head_mask=head_mask, + encoder_hidden_states=encoder_hidden_states, + encoder_attention_mask=encoder_extended_attention_mask, + past_key_values=past_key_values, + use_cache=use_cache, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + ) + sequence_output = encoder_outputs[0] + pooled_output = self.pooler(sequence_output) if self.pooler is not None else None + + if not return_dict: + return (sequence_output, pooled_output) + encoder_outputs[1:] + + return BaseModelOutputWithPoolingAndCrossAttentions( + last_hidden_state=sequence_output, + pooler_output=pooled_output, + past_key_values=encoder_outputs.past_key_values, + hidden_states=encoder_outputs.hidden_states, + attentions=encoder_outputs.attentions, + cross_attentions=encoder_outputs.cross_attentions, + ) + + +class RnaFmForMaskedLM(RnaFmPreTrainedModel): + _tied_weights_keys = ["lm_head.decoder.weight"] + + def __init__(self, config: RnaFmConfig): + super().__init__(config) + + if config.is_decoder: + logger.warning( + "If you want to use `RnaFmForMaskedLM` make sure `config.is_decoder=False` for " + "bi-directional self-attention." + ) + + self.rnafm = RnaFmModel(config, add_pooling_layer=False) + self.contact_head = ContactPredictionHead( + config, in_features=config.num_hidden_layers * config.num_attention_heads + ) + self.lm_head = MaskedLMHead(config) + + # Initialize weights and apply final processing + self.post_init() + + def get_output_embeddings(self): + return self.lm_head.decoder + + def set_output_embeddings(self, new_embeddings): + self.lm_head.decoder = new_embeddings + + def forward( + self, + input_ids: Optional[Tensor] = None, + attention_mask: Optional[Tensor] = None, + position_ids: Optional[Tensor] = None, + head_mask: Optional[Tensor] = None, + inputs_embeds: Optional[Tensor] = None, + encoder_hidden_states: Optional[Tensor] = None, + encoder_attention_mask: Optional[Tensor] = None, + labels: Optional[Tensor] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + ) -> Union[Tuple[Tensor, ...], MaskedLMOutput]: + r""" + labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): + Labels for computing the masked language modeling loss. Indices should be in `[-100, 0, ..., + config.vocab_size]` (see `input_ids` docstring) Tokens with indices set to `-100` are ignored (masked), the + loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]` + """ + + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + + outputs = self.rnafm( + input_ids, + attention_mask=attention_mask, + position_ids=position_ids, + head_mask=head_mask, + inputs_embeds=inputs_embeds, + encoder_hidden_states=encoder_hidden_states, + encoder_attention_mask=encoder_attention_mask, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + ) + logits = self.lm_head(outputs) + + masked_lm_loss = None + if labels is not None: + masked_lm_loss = F.cross_entropy(logits.view(-1, self.config.vocab_size), labels.view(-1)) + + if not return_dict: + output = (logits,) + outputs[2:] + return ((masked_lm_loss,) + output) if masked_lm_loss is not None else output + + return MaskedLMOutput( + loss=masked_lm_loss, + logits=logits, + hidden_states=outputs.hidden_states, + attentions=outputs.attentions, + ) + + def predict_contacts(self, tokens, attention_mask): + attns = self(tokens, attention_mask=attention_mask, return_dict=True, output_attentions=True).attentions + attns = torch.stack(attns, dim=1) # Matches the original model layout + # In the original model, attentions for padding tokens are completely zeroed out. + # This makes no difference most of the time because the other tokens won't attend to them, + # but it does for the contact prediction task, which takes attentions as input, + # so we have to mimic that here. + attns *= attention_mask.unsqueeze(1).unsqueeze(2).unsqueeze(3) + attns *= attention_mask.unsqueeze(1).unsqueeze(2).unsqueeze(4) + return self.contact_head(tokens, attns) + + +class RnaFmForSequenceClassification(RnaFmPreTrainedModel): + def __init__(self, config: RnaFmConfig): + super().__init__(config) + self.num_labels = config.num_labels + self.config = config + + self.rnafm = RnaFmModel(config, add_pooling_layer=False) + self.classifier = SequenceClassificationHead(config) + + # Initialize weights and apply final processing + self.post_init() + + def forward( + self, + input_ids: Optional[Tensor] = None, + attention_mask: Optional[Tensor] = None, + position_ids: Optional[Tensor] = None, + head_mask: Optional[Tensor] = None, + inputs_embeds: Optional[Tensor] = None, + labels: Optional[Tensor] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + ) -> Union[Tuple[Tensor, ...], SequenceClassifierOutput]: + r""" + labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*): + Labels for computing the sequence classification/regression loss. Indices should be in `[0, ..., + config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If + `config.num_labels > 1` a classification loss is computed (Cross-Entropy). + """ + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + + outputs = self.rnafm( + input_ids, + attention_mask=attention_mask, + position_ids=position_ids, + head_mask=head_mask, + inputs_embeds=inputs_embeds, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + ) + logits = self.classifier(outputs) + + loss = None + if labels is not None: + if self.config.problem_type is None: + if self.num_labels == 1: + self.config.problem_type = "regression" + elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int): + self.config.problem_type = "single_label_classification" + else: + self.config.problem_type = "multi_label_classification" + if self.config.problem_type == "regression": + loss = ( + F.mse_loss(logits.squeeze(), labels.squeeze()) + if self.num_labels == 1 + else F.mse_loss(logits, labels) + ) + elif self.config.problem_type == "single_label_classification": + loss = F.cross_entropy(logits.view(-1, self.num_labels), labels.view(-1)) + elif self.config.problem_type == "multi_label_classification": + loss = F.binary_cross_entropy_with_logits(logits, labels) + if not return_dict: + output = (logits,) + outputs[2:] + return ((loss,) + output) if loss is not None else output + + return SequenceClassifierOutput( + loss=loss, + logits=logits, + hidden_states=outputs.hidden_states, + attentions=outputs.attentions, + ) + + +class RnaFmForTokenClassification(RnaFmPreTrainedModel): + def __init__(self, config: RnaFmConfig): + super().__init__(config) + self.num_labels = config.num_labels + + self.rnafm = RnaFmModel(config, add_pooling_layer=False) + self.classifier = TokenClassificationHead(config) + + # Initialize weights and apply final processing + self.post_init() + + def forward( + self, + input_ids: Optional[Tensor] = None, + attention_mask: Optional[Tensor] = None, + position_ids: Optional[Tensor] = None, + head_mask: Optional[Tensor] = None, + inputs_embeds: Optional[Tensor] = None, + labels: Optional[Tensor] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + ) -> Union[Tuple[Tensor, ...], TokenClassifierOutput]: + r""" + labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): + Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`. + """ + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + + outputs = self.rnafm( + input_ids, + attention_mask=attention_mask, + position_ids=position_ids, + head_mask=head_mask, + inputs_embeds=inputs_embeds, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + ) + logits = self.classifier(outputs) + + loss = None + if labels is not None: + if self.config.problem_type is None: + if self.num_labels == 1: + self.config.problem_type = "regression" + elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int): + self.config.problem_type = "single_label_classification" + else: + self.config.problem_type = "multi_label_classification" + if self.config.problem_type == "regression": + loss = ( + F.mse_loss(logits.squeeze(), labels.squeeze()) + if self.num_labels == 1 + else F.mse_loss(logits, labels) + ) + elif self.config.problem_type == "single_label_classification": + loss = F.cross_entropy(logits.view(-1, self.num_labels), labels.view(-1)) + elif self.config.problem_type == "multi_label_classification": + loss = F.binary_cross_entropy_with_logits(logits, labels) + if not return_dict: + output = (logits,) + outputs[2:] + return ((loss,) + output) if loss is not None else output + + return TokenClassifierOutput( + loss=loss, + logits=logits, + hidden_states=outputs.hidden_states, + attentions=outputs.attentions, + ) + + +class RnaFmEmbeddings(nn.Module): + """ + Same as BertEmbeddings with a tiny tweak for positional embeddings indexing. + """ + + def __init__(self, config: RnaFmConfig): + super().__init__() + self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id) + + if config.emb_layer_norm_before: + self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) + else: + self.layer_norm = None + # position_ids (1, len position emb) is contiguous in memory and exported when serialized + self.position_embedding_type = getattr(config, "position_embedding_type", "absolute") + self.register_buffer( + "position_ids", torch.arange(config.max_position_embeddings).expand((1, -1)), persistent=False + ) + + self.padding_idx = config.pad_token_id + if self.position_embedding_type == "absolute": + self.position_embeddings = nn.Embedding( + config.max_position_embeddings, config.hidden_size, padding_idx=self.padding_idx + ) + else: + self.position_embeddings = None + self.token_dropout = config.token_dropout + self.mask_token_id = config.mask_token_id + + def forward( + self, input_ids=None, attention_mask=None, position_ids=None, inputs_embeds=None, past_key_values_length=0 + ): + if position_ids is None: + if input_ids is not None: + # Create the position ids from the input token ids. Any padded tokens remain padded. + position_ids = create_position_ids_from_input_ids(input_ids, self.padding_idx, past_key_values_length) + else: + position_ids = self.create_position_ids_from_inputs_embeds(inputs_embeds) + # This is a bug in the original implementation + position_ids += 1 + + if inputs_embeds is None: + inputs_embeds = self.word_embeddings(input_ids) + + embeddings = inputs_embeds + + if self.token_dropout: + embeddings = embeddings.masked_fill((input_ids == self.mask_token_id).unsqueeze(-1), 0.0) + mask_ratio_train = 0.15 * 0.8 # Hardcoded as the ratio used in all RNAFM model training runs + src_lengths = attention_mask.sum(-1) + mask_ratio_observed = (input_ids == self.mask_token_id).sum(-1).float() / src_lengths + embeddings = (embeddings * (1 - mask_ratio_train) / (1 - mask_ratio_observed)[:, None, None]).to( + embeddings.dtype + ) + + if self.position_embeddings is not None: + position_embeddings = self.position_embeddings(position_ids) + embeddings = embeddings + position_embeddings + + if self.layer_norm is not None: + embeddings = self.layer_norm(embeddings) + if attention_mask is not None: + embeddings = (embeddings * attention_mask.unsqueeze(-1)).to(embeddings.dtype) + return embeddings + + def create_position_ids_from_inputs_embeds(self, inputs_embeds): + """ + We are provided embeddings directly. We cannot infer which are padded so just generate sequential position ids. + + Args: + inputs_embeds: Tensor + + Returns: Tensor + """ + input_shape = inputs_embeds.size()[:-1] + sequence_length = input_shape[1] + + position_ids = torch.arange( + self.padding_idx + 1, sequence_length + self.padding_idx + 1, dtype=torch.long, device=inputs_embeds.device + ) + return position_ids.unsqueeze(0).expand(input_shape) + + +class RotaryEmbedding(torch.nn.Module): + """ + Rotary position embeddings based on those in + [RoFormer](https://huggingface.co/docs/transformers/model_doc/roformer). Query and keys are transformed by rotation + matrices which depend on their relative positions. + """ + + def __init__(self, dim: int): + super().__init__() + # Generate and save the inverse frequency buffer (non trainable) + inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2, dtype=torch.int64).float() / dim)) + self.register_buffer("inv_freq", inv_freq) + + self._seq_len_cached = None + self._cos_cached = None + self._sin_cached = None + + def _update_cos_sin_tables(self, x, seq_dimension=2): + seq_len = x.shape[seq_dimension] + + # Reset the tables if the sequence length has changed, + # or if we're on a new device (possibly due to tracing for instance) + if seq_len != self._seq_len_cached or self._cos_cached.device != x.device: + self._seq_len_cached = seq_len + t = torch.arange(x.shape[seq_dimension], device=x.device).type_as(self.inv_freq) + freqs = torch.outer(t, self.inv_freq) + emb = torch.cat((freqs, freqs), dim=-1).to(x.device) + + self._cos_cached = emb.cos()[None, None, :, :] + self._sin_cached = emb.sin()[None, None, :, :] + + return self._cos_cached, self._sin_cached + + def forward(self, q: Tensor, k: Tensor) -> Tuple[Tensor, Tensor]: + self._cos_cached, self._sin_cached = self._update_cos_sin_tables(k, seq_dimension=-2) + + return ( + apply_rotary_pos_emb(q, self._cos_cached, self._sin_cached), + apply_rotary_pos_emb(k, self._cos_cached, self._sin_cached), + ) + + +class RnaFmEncoder(nn.Module): + def __init__(self, config: RnaFmConfig): + super().__init__() + self.config = config + self.layer = nn.ModuleList([RnaFmLayer(config) for _ in range(config.num_hidden_layers)]) + self.emb_layer_norm_after = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) + self.gradient_checkpointing = False + + def forward( + self, + hidden_states: Tensor, + attention_mask: Optional[torch.FloatTensor] = None, + head_mask: Optional[torch.FloatTensor] = None, + encoder_hidden_states: Optional[torch.FloatTensor] = None, + encoder_attention_mask: Optional[torch.FloatTensor] = None, + past_key_values: Optional[Tuple[Tuple[torch.FloatTensor, ...], ...]] = None, + use_cache: Optional[bool] = None, + output_attentions: bool = False, + output_hidden_states: bool = False, + return_dict: bool = True, + ) -> Union[Tuple[Tensor, ...], BaseModelOutputWithPastAndCrossAttentions]: + all_hidden_states = () if output_hidden_states else None + all_self_attentions = () if output_attentions else None + all_cross_attentions = () if output_attentions and self.config.add_cross_attention else None + + if self.gradient_checkpointing and self.training and use_cache: + logger.warning_once( + "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..." + ) + use_cache = False + + next_decoder_cache = () if use_cache else None + for i, layer_module in enumerate(self.layer): + if output_hidden_states: + all_hidden_states = all_hidden_states + (hidden_states,) # type: ignore[operator] + + layer_head_mask = head_mask[i] if head_mask is not None else None + past_key_value = past_key_values[i] if past_key_values is not None else None + + if self.gradient_checkpointing and self.training: + layer_outputs = self._gradient_checkpointing_func( + layer_module.__call__, + hidden_states, + attention_mask, + layer_head_mask, + encoder_hidden_states, + encoder_attention_mask, + past_key_value, + output_attentions, + ) + else: + layer_outputs = layer_module( + hidden_states, + attention_mask, + layer_head_mask, + encoder_hidden_states, + encoder_attention_mask, + past_key_value, + output_attentions, + ) + + hidden_states = layer_outputs[0] + if use_cache: + next_decoder_cache = next_decoder_cache + (layer_outputs[-1],) # type: ignore[operator] + if output_attentions: + all_self_attentions = all_self_attentions + (layer_outputs[1],) # type: ignore[operator] + if self.config.add_cross_attention: + all_cross_attentions = all_cross_attentions + (layer_outputs[2],) # type: ignore[operator] + + if self.emb_layer_norm_after: + hidden_states = self.emb_layer_norm_after(hidden_states) + + if output_hidden_states: + all_hidden_states = all_hidden_states + (hidden_states,) # type: ignore[operator] + + if not return_dict: + return tuple( + v + for v in [ + hidden_states, + next_decoder_cache, + all_hidden_states, + all_self_attentions, + all_cross_attentions, + ] + if v is not None + ) + return BaseModelOutputWithPastAndCrossAttentions( + last_hidden_state=hidden_states, + past_key_values=next_decoder_cache, + hidden_states=all_hidden_states, + attentions=all_self_attentions, + cross_attentions=all_cross_attentions, + ) + + +class RnaFmLayer(nn.Module): + def __init__(self, config: RnaFmConfig): + super().__init__() + self.chunk_size_feed_forward = config.chunk_size_feed_forward + self.seq_len_dim = 1 + self.attention = RnaFmAttention(config) + self.is_decoder = config.is_decoder + self.add_cross_attention = config.add_cross_attention + if self.add_cross_attention: + if not self.is_decoder: + raise ValueError(f"{self} should be used as a decoder model if cross attention is added") + self.crossattention = RnaFmAttention(config) + self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) + self.intermediate = RnaFmIntermediate(config) + self.output = RnaFmOutput(config) + + def forward( + self, + hidden_states: Tensor, + attention_mask: Optional[torch.FloatTensor] = None, + head_mask: Optional[torch.FloatTensor] = None, + encoder_hidden_states: Optional[torch.FloatTensor] = None, + encoder_attention_mask: Optional[torch.FloatTensor] = None, + past_key_value: Optional[Tuple[Tuple[torch.FloatTensor, ...], ...]] = None, + output_attentions: bool = False, + ) -> Tuple[Tensor, ...]: + # decoder uni-directional self-attention cached key/values tuple is at positions 1,2 + self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None + self_attention_outputs = self.attention( + hidden_states, + attention_mask, + head_mask, + output_attentions=output_attentions, + past_key_value=self_attn_past_key_value, + ) + attention_output = self_attention_outputs[0] + + # if decoder, the last output is tuple of self-attn cache + if self.is_decoder: + outputs = self_attention_outputs[1:-1] + present_key_value = self_attention_outputs[-1] + else: + outputs = self_attention_outputs[1:] # add self attentions if we output attention weights + + cross_attn_present_key_value = None + if self.is_decoder and encoder_hidden_states is not None: + if not hasattr(self, "crossattention"): + raise AttributeError( + f"If `encoder_hidden_states` are passed, {self} has to be instantiated" + " with cross-attention layers by setting `config.add_cross_attention=True`" + ) + + # cross_attn cached key/values tuple is at positions 3,4 of past_key_value tuple + cross_attn_past_key_value = past_key_value[-2:] if past_key_value is not None else None + cross_attention_outputs = self.crossattention( + attention_output, + attention_mask, + head_mask, + encoder_hidden_states, + encoder_attention_mask, + cross_attn_past_key_value, + output_attentions, + ) + attention_output = cross_attention_outputs[0] + outputs = outputs + cross_attention_outputs[1:-1] # add cross attentions if we output attention weights + + # add cross-attn cache to positions 3,4 of present_key_value tuple + cross_attn_present_key_value = cross_attention_outputs[-1] + present_key_value = present_key_value + cross_attn_present_key_value + + layer_output = self.feed_forward_chunk(attention_output) + + outputs = (layer_output,) + outputs + + # if decoder, return the attn key/values as the last output + if self.is_decoder: + outputs = outputs + (present_key_value,) + + return outputs + + def feed_forward_chunk(self, attention_output): + attention_output_ln = self.layer_norm(attention_output) + intermediate_output = self.intermediate(attention_output_ln) + layer_output = self.output(intermediate_output, attention_output) + return layer_output + + +class RnaFmAttention(nn.Module): + def __init__(self, config: RnaFmConfig): + super().__init__() + self.self = RnaFmSelfAttention(config) + self.output = RnaFmSelfOutput(config) + self.pruned_heads = set() # type: ignore[var-annotated] + self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) + + def prune_heads(self, heads): + if len(heads) == 0: + return + heads, index = find_pruneable_heads_and_indices( + heads, self.self.num_attention_heads, self.self.attention_head_size, self.pruned_heads + ) + + # Prune linear layers + self.self.query = prune_linear_layer(self.self.query, index) + self.self.key = prune_linear_layer(self.self.key, index) + self.self.value = prune_linear_layer(self.self.value, index) + self.output.dense = prune_linear_layer(self.output.dense, index, dim=1) + + # Update hyper params and store pruned heads + self.self.num_attention_heads = self.self.num_attention_heads - len(heads) + self.self.all_head_size = self.self.attention_head_size * self.self.num_attention_heads + self.pruned_heads = self.pruned_heads.union(heads) + + def forward( + self, + hidden_states: Tensor, + attention_mask: Optional[torch.FloatTensor] = None, + head_mask: Optional[torch.FloatTensor] = None, + encoder_hidden_states: Optional[torch.FloatTensor] = None, + encoder_attention_mask: Optional[torch.FloatTensor] = None, + past_key_value: Optional[Tuple[Tuple[torch.FloatTensor, ...], ...]] = None, + output_attentions: bool = False, + ) -> Tuple[Tensor, ...]: + hidden_states_ln = self.layer_norm(hidden_states) + self_outputs = self.self( + hidden_states_ln, + attention_mask, + head_mask, + encoder_hidden_states, + encoder_attention_mask, + past_key_value, + output_attentions, + ) + attention_output = self.output(self_outputs[0], hidden_states) + outputs = (attention_output,) + self_outputs[1:] # add attentions if we output them + return outputs + + +class RnaFmSelfAttention(nn.Module): + def __init__(self, config, position_embedding_type=None): + super().__init__() + if config.hidden_size % config.num_attention_heads != 0 and not hasattr(config, "embedding_size"): + raise ValueError( + f"The hidden size ({config.hidden_size}) is not a multiple of the number of attention " + f"heads ({config.num_attention_heads})" + ) + + self.num_attention_heads = config.num_attention_heads + self.attention_head_size = int(config.hidden_size / config.num_attention_heads) + self.all_head_size = self.num_attention_heads * self.attention_head_size + + self.query = nn.Linear(config.hidden_size, self.all_head_size) + self.key = nn.Linear(config.hidden_size, self.all_head_size) + self.value = nn.Linear(config.hidden_size, self.all_head_size) + + self.dropout = nn.Dropout(config.attention_dropout) + self.position_embedding_type = position_embedding_type or getattr(config, "position_embedding_type", "absolute") + self.rotary_embeddings = None + if self.position_embedding_type == "relative_key" or self.position_embedding_type == "relative_key_query": + self.max_position_embeddings = config.max_position_embeddings + self.distance_embedding = nn.Embedding(2 * config.max_position_embeddings - 1, self.attention_head_size) + elif self.position_embedding_type == "rotary": + self.rotary_embeddings = RotaryEmbedding(dim=self.attention_head_size) + + self.is_decoder = config.is_decoder + + def transpose_for_scores(self, x: Tensor) -> Tensor: + new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size) + x = x.view(new_x_shape) + return x.permute(0, 2, 1, 3) + + def forward( + self, + hidden_states: Tensor, + attention_mask: Optional[torch.FloatTensor] = None, + head_mask: Optional[torch.FloatTensor] = None, + encoder_hidden_states: Optional[torch.FloatTensor] = None, + encoder_attention_mask: Optional[torch.FloatTensor] = None, + past_key_value: Optional[Tuple[Tuple[torch.FloatTensor, ...], ...]] = None, + output_attentions: bool = False, + ) -> Tuple[Tensor, ...]: + mixed_query_layer = self.query(hidden_states) + + # If this is instantiated as a cross-attention module, the keys + # and values come from an encoder; the attention mask needs to be + # such that the encoder's padding tokens are not attended to. + is_cross_attention = encoder_hidden_states is not None + + if is_cross_attention and past_key_value is not None: + # reuse k,v, cross_attentions + key_layer = past_key_value[0] + value_layer = past_key_value[1] + attention_mask = encoder_attention_mask + elif is_cross_attention: + key_layer = self.transpose_for_scores(self.key(encoder_hidden_states)) + value_layer = self.transpose_for_scores(self.value(encoder_hidden_states)) + attention_mask = encoder_attention_mask + elif past_key_value is not None: + key_layer = self.transpose_for_scores(self.key(hidden_states)) + value_layer = self.transpose_for_scores(self.value(hidden_states)) + key_layer = torch.cat([past_key_value[0], key_layer], dim=2) + value_layer = torch.cat([past_key_value[1], value_layer], dim=2) + else: + key_layer = self.transpose_for_scores(self.key(hidden_states)) + value_layer = self.transpose_for_scores(self.value(hidden_states)) + + query_layer = self.transpose_for_scores(mixed_query_layer) + + query_layer = query_layer * self.attention_head_size**-0.5 + + if self.is_decoder: + # if cross_attention save Tuple(Tensor, Tensor) of all cross attention key/value_states. + # Further calls to cross_attention layer can then reuse all cross-attention + # key/value_states (first "if" case) + # if uni-directional self-attention (decoder) save Tuple(Tensor, Tensor) of + # all previous decoder key/value_states. Further calls to uni-directional self-attention + # can concat previous decoder key/value_states to current projected key/value_states (third "elif" case) + # if encoder bi-directional self-attention `past_key_value` is always `None` + past_key_value = (key_layer, value_layer) + + if self.position_embedding_type == "rotary": + query_layer, key_layer = self.rotary_embeddings(query_layer, key_layer) + + # Take the dot product between "query" and "key" to get the raw attention scores. + attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2)) # type: ignore[attr-defined] + + if self.position_embedding_type == "relative_key" or self.position_embedding_type == "relative_key_query": + seq_length = hidden_states.size()[1] + position_ids_l = torch.arange(seq_length, dtype=torch.long, device=hidden_states.device).view(-1, 1) + position_ids_r = torch.arange(seq_length, dtype=torch.long, device=hidden_states.device).view(1, -1) + distance = position_ids_l - position_ids_r + positional_embedding = self.distance_embedding(distance + self.max_position_embeddings - 1) + positional_embedding = positional_embedding.to(dtype=query_layer.dtype) # fp16 compatibility + + if self.position_embedding_type == "relative_key": + relative_position_scores = torch.einsum("bhld,lrd->bhlr", query_layer, positional_embedding) + attention_scores = attention_scores + relative_position_scores + elif self.position_embedding_type == "relative_key_query": + relative_position_scores_query = torch.einsum("bhld,lrd->bhlr", query_layer, positional_embedding) + relative_position_scores_key = torch.einsum("bhrd,lrd->bhlr", key_layer, positional_embedding) + attention_scores = attention_scores + relative_position_scores_query + relative_position_scores_key + + if attention_mask is not None: + # Apply the attention mask is (precomputed for all layers in RnaFmModel forward() function) + attention_scores = attention_scores + attention_mask + + # Normalize the attention scores to probabilities. + attention_probs = nn.functional.softmax(attention_scores, dim=-1) + + # This is actually dropping out entire tokens to attend to, which might + # seem a bit unusual, but is taken from the original Transformer paper. + attention_probs = self.dropout(attention_probs) + + # Mask heads if we want to + if head_mask is not None: + attention_probs = attention_probs * head_mask + + context_layer = torch.matmul(attention_probs.to(value_layer.dtype), value_layer) # type: ignore[attr-defined] + + context_layer = context_layer.permute(0, 2, 1, 3).contiguous() + new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,) + context_layer = context_layer.view(new_context_layer_shape) + + outputs = (context_layer, attention_probs) if output_attentions else (context_layer,) + + if self.is_decoder: + outputs = outputs + (past_key_value,) + return outputs + + +class RnaFmSelfOutput(nn.Module): + def __init__(self, config: RnaFmConfig): + super().__init__() + self.dense = nn.Linear(config.hidden_size, config.hidden_size) + self.dropout = nn.Dropout(config.hidden_dropout) + + def forward(self, hidden_states, input_tensor): + hidden_states = self.dense(hidden_states) + hidden_states = self.dropout(hidden_states) + hidden_states = hidden_states + input_tensor + return hidden_states + + +class RnaFmIntermediate(nn.Module): + def __init__(self, config: RnaFmConfig): + super().__init__() + self.dense = nn.Linear(config.hidden_size, config.intermediate_size) + if isinstance(config.hidden_act, str): + self.intermediate_act_fn = ACT2FN[config.hidden_act] + else: + self.intermediate_act_fn = config.hidden_act + + def forward(self, hidden_states: Tensor) -> Tensor: + hidden_states = self.dense(hidden_states) + hidden_states = self.intermediate_act_fn(hidden_states) + return hidden_states + + +class RnaFmOutput(nn.Module): + def __init__(self, config: RnaFmConfig): + super().__init__() + self.dense = nn.Linear(config.intermediate_size, config.hidden_size) + self.dropout = nn.Dropout(config.hidden_dropout) + + def forward(self, hidden_states: Tensor, input_tensor: Tensor) -> Tensor: + hidden_states = self.dense(hidden_states) + hidden_states = self.dropout(hidden_states) + hidden_states = hidden_states + input_tensor + return hidden_states + + +# Copied from transformers.models.bert.modeling_bert.BertPooler +class RnaFmPooler(nn.Module): + def __init__(self, config: RnaFmConfig): + super().__init__() + self.dense = nn.Linear(config.hidden_size, config.hidden_size) + self.activation = nn.Tanh() + + def forward(self, hidden_states: Tensor) -> Tensor: + # We "pool" the model by simply taking the hidden state corresponding + # to the first token. + first_token_tensor = hidden_states[:, 0] + pooled_output = self.dense(first_token_tensor) + pooled_output = self.activation(pooled_output) + return pooled_output + + +def create_position_ids_from_input_ids(input_ids, padding_idx, past_key_values_length=0): + """ + Replace non-padding symbols with their position numbers. Position numbers begin at padding_idx+1. Padding symbols + are ignored. This is modified from fairseq's `utils.make_positions`. + + Args: + x: Tensor x: + + Returns: Tensor + """ + # The series of casts and type-conversions here are carefully balanced to both work with ONNX export and XLA. + mask = input_ids.ne(padding_idx).int() + incremental_indices = (torch.cumsum(mask, dim=1).type_as(mask) + past_key_values_length) * mask + return incremental_indices.long() + padding_idx