| | from transformers import PreTrainedModel |
| | import torch |
| | import joblib, os |
| | import numpy as np |
| | from sentence_transformers import SentenceTransformer |
| | from transformers import AutoTokenizer |
| |
|
| |
|
| | from .nugget_model_utils import CustomRobertaWithPOS as NuggetModel |
| | from .args_model_utils import CustomRobertaWithPOS as ArgumentModel |
| | from .realis_model_utils import CustomRobertaWithPOS as RealisModel |
| |
|
| | from .configuration import CybersecurityKnowledgeGraphConfig |
| |
|
| | from .event_nugget_predict import create_dataloader as event_nugget_dataloader |
| | from .event_realis_predict import create_dataloader as event_realis_dataloader |
| | from .event_arg_predict import create_dataloader as event_argument_dataloader |
| |
|
class CybersecurityKnowledgeGraphModel(PreTrainedModel):
    """Tag the tokens of a text with event nugget, argument and realis labels.

    The class bundles three per-token classifiers (nugget, argument, realis)
    together with the dataloader factories that prepare their inputs, and
    maps the predicted class indices to the label names held in the config.
    """

    config_class = CybersecurityKnowledgeGraphConfig

    def __init__(self, config):
        """Create the tokenizer, the three taggers and the label tables.

        Args:
            config: Configuration object providing the model paths
                (``event_nugget_model_path``, ``event_argument_model_path``,
                ``event_realis_model_path``), the label lists
                (``event_nugget_list``, ``event_args_list``, ``realis_list``)
                and the ``arg_2_role`` mapping.
        """
        super().__init__(config)
        self.tokenizer = AutoTokenizer.from_pretrained("ehsanaghaei/SecureBERT")

        # The paths are only recorded; this constructor does not load
        # anything from them.
        self.event_nugget_model_path = config.event_nugget_model_path
        self.event_argument_model_path = config.event_argument_model_path
        self.event_realis_model_path = config.event_realis_model_path

        # Factories that turn raw text into the dataloader each tagger needs.
        self.event_nugget_dataloader = event_nugget_dataloader
        self.event_argument_dataloader = event_argument_dataloader
        self.event_realis_dataloader = event_realis_dataloader

        # NOTE(review): the class counts are hard-coded; presumably they match
        # the lengths of the label lists in the config - confirm.
        self.event_nugget_model = NuggetModel(num_classes = 11)
        self.event_argument_model = ArgumentModel(num_classes = 43)
        self.event_realis_model = RealisModel(num_classes_realis = 4)

        self.role_classifiers = {}
        self.embed_model = SentenceTransformer('all-MiniLM-L6-v2')

        # Index -> label name tables used to render the predictions.
        self.event_nugget_list = config.event_nugget_list
        self.event_args_list = config.event_args_list
        self.realis_list = config.realis_list
        self.arg_2_role = config.arg_2_role

    def forward(self, text):
        """Predict nugget, argument and realis labels for every token of ``text``.

        Args:
            text: Input handed unchanged to the three dataloader factories.

        Returns:
            A flat list with one dict per non-special token, in row order.
            Each dict has the keys ``"id"`` (token id), ``"token"`` (decoded
            token), ``"nugget"``, ``"argument"`` and ``"realis"`` (label
            names looked up in the config lists).
        """
        nugget_dataloader, _ = self.event_nugget_dataloader(text)
        argument_dataloader, _ = self.event_argument_dataloader(self.event_nugget_model, text)
        realis_dataloader, _ = self.event_realis_dataloader(self.event_nugget_model, text)

        nugget_pred = self.forward_model(self.event_nugget_model, nugget_dataloader)
        # A row whose nugget predictions are all class 0 is treated as having
        # no event nugget; its argument and realis predictions stay at 0.
        no_nuggets = torch.all(nugget_pred == 0, dim=1)

        argument_preds = torch.zeros_like(nugget_pred)
        realis_preds = torch.zeros_like(nugget_pred)
        if not bool(no_nuggets.all()):
            # Run each downstream tagger once, not once per row: the
            # predictions do not depend on which row is being filled in.
            argument_pred = self.forward_model(self.event_argument_model, argument_dataloader)
            realis_pred = self.forward_model(self.event_realis_model, realis_dataloader)
            n_rows = nugget_pred.size(0)
            for idx, no_nugget in enumerate(no_nuggets):
                if no_nugget:
                    continue
                argument_preds[idx] = self._select_row(argument_pred, idx, n_rows)
                realis_preds[idx] = self._select_row(realis_pred, idx, n_rows)

        # Joined along the last dimension, the same way forward_model joins
        # the predictions, so ids and labels stay aligned.
        input_ids = torch.cat([batch["input_ids"] for batch in nugget_dataloader], dim=-1)

        # Built once: looking the property up per token rebuilds the list.
        special_tokens = set(self.tokenizer.all_special_tokens)

        structured_output = []
        for b, row_ids in enumerate(input_ids):
            decoded = [self.tokenizer.decode(token) for token in row_ids]
            keep = [token not in special_tokens for token in decoded]
            # CPU boolean mask; it can index tensors on any device.
            token_mask = torch.tensor(keep, dtype=torch.bool)

            filtered_ids = row_ids[token_mask]
            filtered_tokens = [token for token, kept in zip(decoded, keep) if kept]
            filtered_nuggets = nugget_pred[b][token_mask]
            filtered_args = argument_preds[b][token_mask]
            filtered_realis = realis_preds[b][token_mask]

            structured_output.extend(
                {
                    "id": token_id.item(),
                    "token": token,
                    "nugget": self.event_nugget_list[int(nugget.item())],
                    "argument": self.event_args_list[int(arg.item())],
                    "realis": self.realis_list[int(realis.item())],
                }
                for token_id, token, nugget, arg, realis in zip(
                    filtered_ids, filtered_tokens, filtered_nuggets, filtered_args, filtered_realis
                )
            )

        return structured_output

    @staticmethod
    def _select_row(pred, idx, n_rows):
        """Return the part of ``pred`` to store in row ``idx``.

        When ``pred`` has one leading entry per row, that row is returned.
        Otherwise ``pred`` is returned whole so that the assignment falls
        back to broadcasting (e.g. a single leading row shared by all rows).
        """
        if pred.dim() >= 2 and pred.size(0) == n_rows:
            return pred[idx]
        return pred

    def forward_model(self, model, dataloader):
        """Run ``model`` over ``dataloader`` and return the predicted class ids.

        Args:
            model: Callable invoked as ``model(**batch)`` that returns logits
                with the classes on the last dimension.
            dataloader: Iterable of keyword-argument dicts for ``model``.

        Returns:
            The per-batch ``argmax`` results concatenated along the last
            dimension.

        Raises:
            RuntimeError: Raised by ``torch.cat`` when ``dataloader`` yields
                no batches.
        """
        predicted_label = []
        for batch in dataloader:
            # Inference only: no autograd graph is needed.
            with torch.no_grad():
                batch_predicted_label = model(**batch).argmax(-1)
            predicted_label.append(batch_predicted_label)
        # NOTE(review): batches are joined on the last dimension, not the
        # first; presumably every batch carries the same rows - confirm
        # against the dataloader factories.
        return torch.cat(predicted_label, dim=-1)