| | import os |
| | import sys |
| | import csv |
| | import json |
| | from collections import defaultdict |
| | import networkx as nx |
| | from datetime import datetime |
| | import logging |
| |
|
| |
|
def col2idx(cols):
    """Map each column name to its position in ``cols``.

    :param cols: Iterable of column names (e.g. a CSV header row)
    :return: dict from column name to zero-based index; a repeated name keeps its last index
    """
    return {name: pos for pos, name in enumerate(cols)}
| |
|
| |
|
def load_alert_param(_alert_param_csv):
    """Load an alert parameter file.

    The first row is the header and must contain the columns ``count``, ``type``,
    ``schedule_id``, ``min_accounts``, ``max_accounts``, ``min_amount``, ``max_amount``,
    ``min_period``, ``max_period``, ``bank_id`` and ``is_sar``.

    Blank rows are skipped instead of raising ``IndexError``; they are still counted, so
    the returned keys keep matching the row positions in the file.

    :param _alert_param_csv: Alert parameter CSV file
    :return: dict of line number of the parameter file and parameter set as dict
    :raises KeyError: if a required column is missing from the header
    """
    param_data = dict()
    # newline="" leaves newline handling to the csv module, as its documentation recommends.
    with open(_alert_param_csv, "r", newline="") as _rf:
        reader = csv.reader(_rf)
        header = next(reader)
        name2idx = {name: idx for idx, name in enumerate(header)}
        count_idx = name2idx["count"]
        type_idx = name2idx["type"]
        schedule_idx = name2idx["schedule_id"]
        min_acct_idx = name2idx["min_accounts"]
        max_acct_idx = name2idx["max_accounts"]
        min_amt_idx = name2idx["min_amount"]
        max_amt_idx = name2idx["max_amount"]
        min_period_idx = name2idx["min_period"]
        max_period_idx = name2idx["max_period"]
        bank_idx = name2idx["bank_id"]
        sar_idx = name2idx["is_sar"]

        # The header is row 1, so the first data row is numbered 2.
        for line_num, row in enumerate(reader, start=2):
            if not row:
                # csv.reader yields [] for a blank line; there is nothing to parse.
                continue
            param_data[line_num] = {
                "count": int(row[count_idx]),
                "type": row[type_idx],
                "ordered": int(row[schedule_idx]) > 0,
                "accounts": (int(row[min_acct_idx]), int(row[max_acct_idx])),
                "amount": (float(row[min_amt_idx]), float(row[max_amt_idx])),
                "period": (int(row[min_period_idx]), int(row[max_period_idx])),
                # An empty bank ID is treated as "accounts may span multiple banks".
                "multiple_banks": row[bank_idx] == "",
                "sar": row[sar_idx].lower() == "true",
            }

    return param_data
| |
|
| |
|
def load_alert_tx(_alert_tx_schema, _alert_tx_csv):
    """Load an alert-related transaction CSV file and construct subgraphs.

    Column positions are taken from the schema: the position of each entry whose
    ``dataType`` is ``alert_id``, ``alert_type``, ``orig_id``, ``dest_id``, ``amount`` or
    ``timestamp`` is used as the CSV column index. Every transaction row becomes one
    directed edge (originator -> beneficiary) with ``amount`` and ``date`` attributes in
    the graph of its alert. Only the date part (text before ``T``) of the timestamp is kept.

    :param _alert_tx_schema: Sequence of column descriptors (dicts with a ``dataType`` key)
    :param _alert_tx_csv: Alert transaction CSV file (the first row is skipped as a header)
    :return: dict of alert ID and alert transaction subgraph
    :raises ValueError: if the schema lacks one of the required ``dataType`` entries
    """
    required = ("alert_id", "alert_type", "orig_id", "dest_id", "amount", "timestamp")
    col_idx = dict.fromkeys(required)
    for i, col in enumerate(_alert_tx_schema):
        data_type = col.get("dataType")
        if data_type in required:
            col_idx[data_type] = i

    # Fail early with a readable message instead of a "list indices must be integers"
    # TypeError on the first data row.
    missing = [name for name in required if col_idx[name] is None]
    if missing:
        raise ValueError("Alert transaction schema lacks dataType(s): " + ", ".join(missing))

    alert_idx = col_idx["alert_id"]
    type_idx = col_idx["alert_type"]
    orig_idx = col_idx["orig_id"]
    bene_idx = col_idx["dest_id"]
    amt_idx = col_idx["amount"]
    date_idx = col_idx["timestamp"]

    alert_graphs = defaultdict(nx.DiGraph)
    # newline="" leaves newline handling to the csv module, as its documentation recommends.
    with open(_alert_tx_csv, "r", newline="") as _rf:
        reader = csv.reader(_rf)
        next(reader)  # skip the header row
        for row in reader:
            if not row:
                # csv.reader yields [] for a blank line; there is nothing to parse.
                continue
            alert_id = row[alert_idx]
            date_str = row[date_idx].split("T")[0]
            date = datetime.strptime(date_str, "%Y-%m-%d")
            sub_g = alert_graphs[alert_id]
            sub_g.add_edge(row[orig_idx], row[bene_idx], amount=float(row[amt_idx]), date=date)
            sub_g.graph["alert_id"] = alert_id
            sub_g.graph["alert_type"] = row[type_idx]

    return alert_graphs
| |
|
| |
|
def satisfies_params(alert_sub_g, param):
    """Tell whether an alert subgraph is consistent with one parameter set.

    The checks run in this order: pattern shape (only for the ``cycle``,
    ``scatter_gather`` and ``gather_scatter`` types), number of accounts, amount of the
    first transaction on the earliest date, and the period in days (inclusive of both
    the first and the last transaction date).

    :param alert_sub_g: Alert subgraph
    :param param: Alert parameters as dict from a parameter file
    :return: True if every check passes, otherwise False
    """
    alert_id = alert_sub_g.graph["alert_id"]
    num_accounts = alert_sub_g.number_of_nodes()

    edge_attrs = [data for _src, _dst, data in alert_sub_g.edges(data=True)]
    tx_dates = [data["date"] for data in edge_attrs]
    first_date = min(tx_dates)
    last_date = max(tx_dates)
    period = (last_date - first_date).days + 1
    first_day_amounts = [data["amount"] for data in edge_attrs if data["date"] == first_date]
    init_amount = first_day_amounts[0]

    # Shape check: other alert types have no dedicated topology test here.
    pattern = param["type"]
    if pattern == "cycle":
        shape_ok = is_cycle(alert_sub_g)
    elif pattern == "scatter_gather":
        shape_ok = is_scatter_gather(alert_sub_g)
    elif pattern == "gather_scatter":
        shape_ok = is_gather_scatter(alert_sub_g)
    else:
        shape_ok = True
    if not shape_ok:
        return False

    min_acct, max_acct = param["accounts"]
    accounts_ok = min_acct <= num_accounts <= max_acct
    if not accounts_ok:
        message = "Alert %s: The number of accounts %d is not within [%d, %d]" % (
            alert_id, num_accounts, min_acct, max_acct)
        logging.info(message)
        return False

    min_amt, max_amt = param["amount"]
    amount_ok = min_amt <= init_amount <= max_amt
    if not amount_ok:
        message = "Alert %s: initial amount %f is not within [%f, %f]" % (
            alert_id, init_amount, min_amt, max_amt)
        logging.info(message)
        return False

    min_period, max_period = param["period"]
    period_ok = min_period <= period <= max_period
    if not period_ok:
        message = "Alert %s: period %d is not within [%d, %d]" % (
            alert_id, period, min_period, max_period)
        logging.info(message)
        return False

    return True
| |
|
| |
|
def is_cycle(alert_sub_g: nx.DiGraph, is_ordered: bool = True):
    """Check whether the alert subgraph is a cycle pattern.

    The subgraph must contain exactly one simple cycle. When ``is_ordered`` is true the
    transactions, sorted by date, must additionally form a chain (each originator is the
    previous beneficiary).

    :param alert_sub_g: Alert subgraph with an ``alert_id`` graph attribute and
        ``amount``/``date`` edge attributes
    :param is_ordered: Whether to also check the transaction ordering
    :return: True if the subgraph passes all checks, otherwise False
    """
    alert_id = alert_sub_g.graph["alert_id"]
    cycles = list(nx.simple_cycles(alert_sub_g))
    if len(cycles) != 1:
        logging.info("Alert %s is not a cycle pattern" % alert_id)
        return False
    if not is_ordered:
        return True

    # sorted() accepts both the list returned by networkx 1.x and the edge view returned
    # by networkx 2.x+, which has no in-place sort(). Like list.sort(), it is stable.
    edges = sorted(alert_sub_g.edges(data=True), key=lambda e: e[2]["date"])
    next_orig = None
    next_amt = sys.float_info.max
    next_date = datetime(1970, 1, 1)  # sentinel earlier than any expected transaction date
    for orig, bene, attr in edges:
        if next_orig is not None and orig != next_orig:
            logging.info("Alert %s is not a cycle pattern" % alert_id)
            return False
        next_orig = bene

        amount = attr["amount"]
        # NOTE(review): only an amount equal to the previous one is rejected here; the
        # float-max sentinel hints that a decreasing-amount check may have been intended.
        # Left unchanged -- confirm against the generator before tightening.
        if amount == next_amt:
            logging.info("Alert %s cycle transaction amounts are unordered" % alert_id)
            return False
        next_amt = amount

        date = attr["date"]
        if date < next_date:
            logging.info("Alert %s cycle transactions are chronologically unordered" % alert_id)
            return False
        next_date = date
    return True
| |
|
| |
|
def is_scatter_gather(alert_sub_g: nx.DiGraph, is_ordered: bool = True):
    """Check whether the alert subgraph is a scatter-gather pattern.

    Expected shape: one originator (no incoming edge) sends to every intermediate
    account, and every intermediate account (one incoming, one outgoing edge) sends to
    one beneficiary (no outgoing edge). When ``is_ordered`` is true, each scatter
    transaction must not be later than, and must be strictly larger than, the matching
    gather transaction.

    :param alert_sub_g: Alert subgraph with an ``alert_id`` graph attribute and
        ``amount``/``date`` edge attributes
    :param is_ordered: Whether to also check the date and amount ordering
    :return: True if the subgraph passes all checks, otherwise False
    """
    alert_id = alert_sub_g.graph["alert_id"]
    num_accts = alert_sub_g.number_of_nodes()
    num_mid = num_accts - 2  # every account except the originator and the beneficiary
    # dict() accepts both the dict returned by networkx 1.x and the degree view returned
    # by networkx 2.x+, which has no items() method.
    out_degrees = dict(alert_sub_g.out_degree())
    in_degrees = dict(alert_sub_g.in_degree())
    orig = None
    bene = None
    mid_accts = list()
    for n, out_d in out_degrees.items():
        in_d = in_degrees[n]
        # Both degrees are tested together so that, with a single intermediate account
        # (num_mid == 1), the intermediate (1 in, 1 out) is not mistaken for the originator.
        if in_d == 0 and out_d == num_mid:
            orig = n
        elif out_d == 0 and in_d == num_mid:
            bene = n
        elif in_d == 1 and out_d == 1:
            mid_accts.append(n)
        else:
            logging.info("Alert %s is not a scatter-gather pattern: invalid vertex degree %d -> [%s] -> %d"
                         % (alert_id, in_d, n, out_d))
            return False
    if len(mid_accts) != num_mid:
        logging.info("Not a scatter-gather pattern: %s" % alert_id)
        return False

    if is_ordered:
        for mid in mid_accts:
            scatter_attr = alert_sub_g.get_edge_data(orig, mid)
            gather_attr = alert_sub_g.get_edge_data(mid, bene)
            if scatter_attr is None:
                logging.info("Alert %s is not a scatter-gather pattern: scatter edge %s -> %s not found"
                             % (alert_id, orig, mid))
                return False
            if gather_attr is None:
                logging.info("Alert %s is not a scatter-gather pattern: gather edge %s -> %s not found"
                             % (alert_id, mid, bene))
                # Without this return the lookups below would fail on None.
                return False

            if scatter_attr["date"] > gather_attr["date"]:
                logging.info("Alert %s scatter-gather transactions are chronologically unordered" % alert_id)
                return False
            if scatter_attr["amount"] <= gather_attr["amount"]:
                logging.info("Alert %s scatter-gather transaction amounts are unordered" % alert_id)
                return False

    return True
| |
|
| |
|
def is_gather_scatter(alert_sub_g: nx.DiGraph, is_ordered: bool = True):
    """Check whether the alert subgraph is a gather-scatter pattern.

    Expected shape: originators (no incoming, one outgoing edge) all send to a single
    hub, which sends to beneficiaries (one incoming, no outgoing edge). A subgraph
    without any beneficiary is rejected. When ``is_ordered`` is true, no scatter
    transaction may be earlier than the latest gather transaction, and each scatter
    amount must be strictly below the gathered total divided by the number of
    beneficiaries.

    :param alert_sub_g: Alert subgraph with an ``alert_id`` graph attribute and
        ``amount``/``date`` edge attributes
    :param is_ordered: Whether to also check the date and amount ordering
    :return: True if the subgraph passes all checks, otherwise False
    """
    alert_id = alert_sub_g.graph["alert_id"]
    num_accts = alert_sub_g.number_of_nodes()
    # dict() accepts both the dict returned by networkx 1.x and the degree view returned
    # by networkx 2.x+, which has no items() method.
    out_degrees = dict(alert_sub_g.out_degree())
    in_degrees = dict(alert_sub_g.in_degree())

    orig_accts = [n for n, d in out_degrees.items() if d == 1 and in_degrees[n] == 0]
    bene_accts = [n for n, d in in_degrees.items() if d == 1 and out_degrees[n] == 0]
    num_orig = len(orig_accts)
    num_bene = len(bene_accts)
    hub_accts = [n for n, d in out_degrees.items() if d == num_bene and in_degrees[n] == num_orig]
    if len(hub_accts) != 1 or (num_orig + num_bene + 1) != num_accts:
        logging.info("Alert %s is not a gather-scatter pattern" % alert_id)
        return False
    if num_bene == 0:
        # No scatter phase at all; this also protects the per-beneficiary division below.
        logging.info("Alert %s is not a gather-scatter pattern" % alert_id)
        return False

    hub = hub_accts[0]
    last_gather_date = datetime(1970, 1, 1)  # sentinel earlier than any expected transaction date
    total_gather_amount = 0.0
    for orig in orig_accts:
        attr = alert_sub_g.get_edge_data(orig, hub)
        if attr is None:
            logging.info("Alert %s is not a gather-scatter pattern: gather edge %s -> %s not found"
                         % (alert_id, orig, hub))
            return False
        last_gather_date = max(last_gather_date, attr["date"])
        total_gather_amount += attr["amount"]

    if is_ordered:
        max_scatter_amount = total_gather_amount / num_bene
        for bene in bene_accts:
            attr = alert_sub_g.get_edge_data(hub, bene)
            if attr is None:
                return False
            if attr["date"] < last_gather_date:
                logging.info("Alert %s gather-scatter transactions are chronologically unordered " % alert_id)
                return False
            if max_scatter_amount <= attr["amount"]:
                logging.info("Alert %s gather-scatter transaction amounts are unordered" % alert_id)
                return False

    return True
| |
|
| |
|
class AlertValidator:
    """Validate alert transaction subgraphs against the alert parameter file."""

    def __init__(self, conf_json, sim_name=None):
        """Read the configuration and load the schema, alert parameters and alert graphs.

        Also configures the root logger to write (overwriting) to
        ``alert_validations.log`` in the simulation output directory.

        :param conf_json: Path to the configuration JSON file
        :param sim_name: Simulation name; when None, ``general.simulation_name`` from
            the configuration is used
        """
        with open(conf_json, "r") as rf:
            self.conf = json.load(rf)

        if sim_name is None:
            sim_name = self.conf["general"]["simulation_name"]
        self.sim_name = sim_name
        self.input_dir = self.conf["input"]["directory"]
        self.output_dir = os.path.join(self.conf["output"]["directory"], self.sim_name)

        schema_path = os.path.join(self.input_dir, self.conf["input"]["schema"])
        with open(schema_path, "r") as rf:
            self.schema = json.load(rf)

        log_file = os.path.join(self.output_dir, "alert_validations.log")
        logging.basicConfig(filename=log_file, filemode="w", level=logging.INFO)

        # Alert parameter sets, keyed by their line number in the parameter file.
        self.alert_param_file = self.conf["input"]["alert_patterns"]
        alert_param_path = os.path.join(self.input_dir, self.alert_param_file)
        self.alert_params = load_alert_param(alert_param_path)

        # Alert transaction subgraphs, keyed by alert ID. The schema loaded above is
        # reused instead of reading and parsing the same file a second time.
        alert_tx_file = self.conf["output"]["alert_transactions"]
        alert_tx_path = os.path.join(self.output_dir, alert_tx_file)
        self.alert_graphs = load_alert_tx(self.schema["alert_tx"], alert_tx_path)

    def validate_single(self, alert_id):
        """Check one alert subgraph against every parameter set of the same alert type.

        :param alert_id: ID of the alert to validate
        :return: True as soon as one parameter set is satisfied, otherwise False
        :raises KeyError: if no subgraph exists for ``alert_id``
        """
        if alert_id not in self.alert_graphs:
            raise KeyError("No such alert ID: %s" % alert_id)
        sub_g = self.alert_graphs[alert_id]
        alert_type = sub_g.graph["alert_type"]
        for line_num, param in self.alert_params.items():
            if param["type"] != alert_type:
                continue
            if satisfies_params(sub_g, param):
                logging.info("The alert %s subgraph matches the parameter %s:%d, data %s" %
                             (alert_id, self.alert_param_file, line_num, str(param)))
                return True

        # Reached only when no parameter set of this alert type was satisfied.
        logging.warning("The alert subgraph (ID:%s, Type:%s) does not match any parameter sets"
                        % (alert_id, alert_type))
        return False

    def validate_all(self):
        """Validate every loaded alert subgraph and print the matched/unmatched totals."""
        num_alerts = len(self.alert_graphs)
        num_matched = sum(1 for alert_id in self.alert_graphs if self.validate_single(alert_id))
        num_unmatched = num_alerts - num_matched
        print("Total number of alerts: %d, matched: %d, unmatched: %d" % (num_alerts, num_matched, num_unmatched))
| |
|
| |
|
if __name__ == "__main__":
    # Command-line entry point: validate every alert of one simulation run.
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: python3 %s [ConfJson] [SimName]" % argv[0])
        # sys.exit is always available; the bare exit() helper is only injected by the
        # site module and is missing under "python -S" or in frozen applications.
        sys.exit(1)

    _conf_json = argv[1]
    _sim_name = argv[2] if len(argv) >= 3 else None  # optional override of the configured name
    av = AlertValidator(_conf_json, _sim_name)
    av.validate_all()
| |
|