1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| from rich.progress import track from rich import print import pandas as pd import hashlib import re
# Input CSV files, resolved relative to the current working directory and
# loaded with pandas when the script is run directly.
USERS_FILE = "users.csv"  # accounts and the permission group each one belongs to
PERMISSIONS_FILE = "permissions.csv"  # permission groups: allowed operations and table ids
TABLES_FILE = "tables.csv"  # table names, ids and their allowed time windows
ACTIONLOG_FILE = "actionlog.csv"  # the SQL action log that gets audited
def parse_sql_statement(sql_statement: str) -> tuple[str, str]:
    """Classify a SQL statement and extract the table it targets.

    Only ``INSERT INTO``, ``DELETE FROM``, ``UPDATE`` and ``SELECT ... FROM``
    statements are recognised. Matching is case-insensitive and tolerates
    leading whitespace.

    Args:
        sql_statement: The raw SQL text.

    Returns:
        A ``(method, table)`` tuple where ``method`` is one of ``"insert"``,
        ``"delete"``, ``"update"`` or ``"select"`` and ``table`` is the first
        run of word characters following the statement's table keyword.

    Raises:
        ValueError: If the statement matches none of the supported forms.
    """
    # Tried in order; every pattern is anchored at the start of the statement
    # by ``re.match``, so at most one of them can succeed.
    patterns = (
        ("insert", r'\s*insert\s+into\s+(\w+)'),
        ("delete", r'\s*delete\s+from\s+(\w+)'),
        ("update", r'\s*update\s+(\w+)'),
        # Non-greedy column list: the FIRST ``from`` is the one belonging to
        # the outer query. A greedy ``.+`` would skip ahead to the last
        # ``from`` and report a sub-query's table instead.
        ("select", r'\s*select\s+.+?\s+from\s+(\w+)'),
    )

    for method, pattern in patterns:
        # re.S lets the SELECT column list span several lines.
        match = re.match(pattern, sql_statement, re.I | re.S)
        if match:
            return method, match.group(1)

    raise ValueError("Invalid SQL statement")
def is_in_time(action_str: str, allow_times: str) -> bool:
    """Return whether a time of day falls inside any allowed window.

    Args:
        action_str: Time of day formatted as ``H:M:S``.
        allow_times: Comma-separated windows, each written ``start~end`` with
            both ends formatted as ``H:M:S``. Both ends are inclusive.

    Returns:
        ``True`` if ``action_str`` lies within at least one window, otherwise
        ``False``. A window whose start is later than its end is treated as
        crossing midnight (e.g. ``22:00:00~06:00:00``).

    Raises:
        ValueError: If a time or window is not in the expected format.
    """
    def to_timestamp(clock: str) -> int:
        """Convert ``H:M:S`` into seconds since midnight."""
        hour, minute, second = map(int, clock.split(":"))
        return hour * 3600 + minute * 60 + second

    # Loop-invariant: parse the action time once rather than per window.
    action_time = to_timestamp(action_str)

    for allow_time in allow_times.split(","):
        start_str, end_str = allow_time.split("~")
        start_time = to_timestamp(start_str)
        end_time = to_timestamp(end_str)

        if start_time <= end_time:
            # Ordinary same-day window.
            if start_time <= action_time <= end_time:
                return True
        elif action_time >= start_time or action_time <= end_time:
            # Window wraps past midnight: late evening or early morning.
            return True

    return False
if __name__ == "__main__":
    users = pd.read_csv(USERS_FILE)
    permissions = pd.read_csv(PERMISSIONS_FILE)
    tables = pd.read_csv(TABLES_FILE)
    actionlog = pd.read_csv(ACTIONLOG_FILE)

    # Each entry is [user_id, permission_id, table_id, log_id].
    threats: list[list[int]] = []

    def audit(log: pd.Series) -> None:
        """Check one action-log row and record it in ``threats`` if it violates policy.

        A row is recorded when its account is not present in the users table
        (with zeros for user, permission and table ids), or when the table is
        not among the permission group's tables, the operation is not among
        the group's operations, or the time lies outside the table's allowed
        windows.

        Raises:
            ValueError: If the logged SQL statement cannot be parsed.
            LookupError: If the user's permission group or the referenced
                table is missing from the loaded CSV data.
        """
        log_id = int(log["编号"])
        log_user = str(log["账号"])
        # Second space-separated field of the timestamp; presumably the
        # value is "<date> <H:M:S>" -- verify against the log data.
        log_time = str(log["操作时间"]).split(" ")[1]
        log_method, log_table = parse_sql_statement(log["执行操作"])

        user_query: pd.DataFrame = users[users["账号"] == log_user]
        if user_query.empty:
            # Unknown account: no user, permission or table id to report.
            threats.append([0, 0, 0, log_id])
            return

        user = user_query.iloc[0]
        user_id = int(user["编号"])
        user_permission = user["所属权限组编号"]

        permission_query: pd.DataFrame = \
            permissions[permissions["编号"] == user_permission]
        # Explicit raise rather than ``assert``: asserts vanish under ``-O``.
        if permission_query.empty:
            raise LookupError(
                f"permission group {user_permission!r} not found "
                f"(log entry {log_id})"
            )
        permission = permission_query.iloc[0]

        permission_id = int(permission["编号"])
        permission_method = str(permission["可操作权限"]).split(",")
        permission_table = list(
            map(int, str(permission["可操作表编号"]).split(","))
        )

        table_query: pd.DataFrame = tables[tables["表名"] == log_table]
        if table_query.empty:
            raise LookupError(
                f"table {log_table!r} not found (log entry {log_id})"
            )
        table = table_query.iloc[0]

        table_id = int(table["编号"])
        table_available_time = str(table["可操作时间段(时:分:秒)"])

        if table_id not in permission_table or \
                log_method not in permission_method or \
                not is_in_time(log_time, table_available_time):
            threats.append([user_id, permission_id, table_id, log_id])

    for _, log in track(actionlog.iterrows(), total=len(actionlog)):
        audit(log)

    # Zero-padded string key: orders by user, permission, table, then log id
    # for as long as every id fits its pad width (3/3/3/5 digits).
    threats.sort(key=lambda x: f"{x[0]:03d}_{x[1]:03d}_{x[2]:03d}_{x[3]:05d}")
    # The flag is the MD5 of the comma-joined "user_permission_table_log" ids.
    threats_str = ",".join(["_".join(map(str, threat)) for threat in threats])
    threats_hash = hashlib.md5(threats_str.encode()).hexdigest()
    print(f"Flag: [blue]{threats_hash}[/blue]")
|