2025-06-24 22:43:46 +05:30
|
|
|
import sqlite3
|
|
|
|
|
from contextlib import contextmanager
|
|
|
|
|
from typing import List, Tuple, Any, Optional, Union
|
|
|
|
|
import logging
|
2014-12-13 21:52:35 +02:00
|
|
|
from imports import globals
|
|
|
|
|
|
2025-06-24 22:43:46 +05:30
|
|
|
logger = logging.getLogger(__name__)
|
2014-12-13 21:52:35 +02:00
|
|
|
|
|
|
|
|
class DBHandler:
|
2025-06-24 22:43:46 +05:30
|
|
|
|
2014-12-13 21:52:35 +02:00
|
|
|
def __init__(self):
|
2025-06-24 22:43:46 +05:30
|
|
|
self.db_path = globals.vars.db_path
|
|
|
|
|
self._connection = None
|
2025-06-21 17:04:57 +07:00
|
|
|
self._connect()
|
2025-06-24 22:43:46 +05:30
|
|
|
|
2025-06-21 17:04:57 +07:00
|
|
|
def _connect(self):
|
2014-12-13 21:52:35 +02:00
|
|
|
try:
|
2025-06-24 22:43:46 +05:30
|
|
|
self._connection = sqlite3.connect(
|
|
|
|
|
self.db_path,
|
|
|
|
|
check_same_thread=False,
|
|
|
|
|
timeout=30.0
|
|
|
|
|
)
|
|
|
|
|
self._connection.execute("PRAGMA journal_mode=WAL")
|
|
|
|
|
self._connection.execute("PRAGMA foreign_keys=ON")
|
|
|
|
|
self._connection.execute("PRAGMA synchronous=NORMAL")
|
|
|
|
|
self._connection.execute("PRAGMA cache_size=10000")
|
|
|
|
|
self._connection.execute("PRAGMA temp_store=MEMORY")
|
|
|
|
|
|
|
|
|
|
logger.info("Database connection established successfully")
|
|
|
|
|
except sqlite3.Error as e:
|
|
|
|
|
logger.error(f"Database connection failed: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
@contextmanager
|
|
|
|
|
def get_cursor(self):
|
|
|
|
|
if not self._connection:
|
|
|
|
|
self._connect()
|
|
|
|
|
|
|
|
|
|
cursor = self._connection.cursor()
|
|
|
|
|
try:
|
|
|
|
|
yield cursor
|
|
|
|
|
self._connection.commit()
|
|
|
|
|
except sqlite3.Error as e:
|
|
|
|
|
self._connection.rollback()
|
|
|
|
|
logger.error(f"Database operation failed: {e}")
|
|
|
|
|
raise
|
|
|
|
|
finally:
|
|
|
|
|
cursor.close()
|
|
|
|
|
|
|
|
|
|
def _execute_query(self, query: str, params: Optional[Union[List, Tuple]] = None) -> List[Tuple]:
|
|
|
|
|
with self.get_cursor() as cursor:
|
|
|
|
|
if params:
|
|
|
|
|
cursor.execute(query, params)
|
|
|
|
|
else:
|
|
|
|
|
cursor.execute(query)
|
|
|
|
|
return cursor.fetchall()
|
|
|
|
|
|
|
|
|
|
def get_full_details(self) -> List[Tuple]:
|
|
|
|
|
return self._execute_query("SELECT * FROM Malwares")
|
|
|
|
|
|
|
|
|
|
def get_partial_details(self) -> List[Tuple]:
|
|
|
|
|
query = """
|
|
|
|
|
SELECT ID, TYPE, LANGUAGE, ARCHITECTURE, PLATFORM, NAME
|
|
|
|
|
FROM Malwares
|
|
|
|
|
"""
|
|
|
|
|
return self._execute_query(query)
|
|
|
|
|
|
|
|
|
|
def get_mal_list(self) -> List[Tuple]:
|
2025-06-21 17:04:57 +07:00
|
|
|
query = "SELECT ID, NAME, TYPE FROM Malwares"
|
2025-06-24 22:43:46 +05:30
|
|
|
return self._execute_query(query)
|
|
|
|
|
|
|
|
|
|
def get_mal_names(self) -> List[str]:
|
2025-06-21 17:04:57 +07:00
|
|
|
query = "SELECT NAME FROM Malwares"
|
2025-06-24 22:43:46 +05:30
|
|
|
results = self._execute_query(query)
|
|
|
|
|
return [row[0] for row in results if row[0]]
|
|
|
|
|
|
|
|
|
|
def get_mal_tags(self) -> List[str]:
|
|
|
|
|
query = "SELECT DISTINCT TAGS FROM Malwares WHERE TAGS IS NOT NULL AND TAGS != ''"
|
|
|
|
|
results = self._execute_query(query)
|
|
|
|
|
return [row[0] for row in results]
|
|
|
|
|
|
|
|
|
|
def get_mal_info(self, mal_id: int) -> List[Tuple]:
|
|
|
|
|
query = """
|
|
|
|
|
SELECT TYPE, NAME, VERSION, AUTHOR, LANGUAGE, DATE,
|
|
|
|
|
ARCHITECTURE, PLATFORM, TAGS
|
|
|
|
|
FROM Malwares
|
|
|
|
|
WHERE ID = ?
|
|
|
|
|
"""
|
|
|
|
|
return self._execute_query(query, (mal_id,))
|
|
|
|
|
|
|
|
|
|
def query(self, query: str, params: Optional[Union[List, Tuple, Any]] = None) -> List[Tuple]:
|
2025-06-21 17:04:57 +07:00
|
|
|
if globals.vars.DEBUG_LEVEL == 2:
|
2025-06-24 22:43:46 +05:30
|
|
|
logger.debug(f"Executing query: {query} with params: {params}")
|
|
|
|
|
|
|
|
|
|
if params is not None and not isinstance(params, (list, tuple)):
|
|
|
|
|
params = (params,)
|
|
|
|
|
|
|
|
|
|
return self._execute_query(query, params)
|
|
|
|
|
|
|
|
|
|
def execute_many(self, query: str, params_list: List[Tuple]) -> None:
|
|
|
|
|
with self.get_cursor() as cursor:
|
|
|
|
|
cursor.executemany(query, params_list)
|
|
|
|
|
|
|
|
|
|
def get_connection_info(self) -> dict:
|
|
|
|
|
with self.get_cursor() as cursor:
|
|
|
|
|
cursor.execute("PRAGMA database_list")
|
|
|
|
|
db_info = cursor.fetchall()
|
|
|
|
|
|
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM Malwares")
|
|
|
|
|
record_count = cursor.fetchone()[0]
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
'database_path': self.db_path,
|
|
|
|
|
'databases': db_info,
|
|
|
|
|
'malware_count': record_count,
|
|
|
|
|
'connection_active': self._connection is not None
|
|
|
|
|
}
|
|
|
|
|
|
2014-12-16 20:59:45 +02:00
|
|
|
def close_connection(self):
|
2025-06-24 22:43:46 +05:30
|
|
|
if self._connection:
|
|
|
|
|
try:
|
|
|
|
|
self._connection.close()
|
|
|
|
|
self._connection = None
|
|
|
|
|
logger.info("Database connection closed successfully")
|
|
|
|
|
except sqlite3.Error as e:
|
|
|
|
|
logger.error(f"Error closing database connection: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
2014-12-16 20:59:45 +02:00
|
|
|
def renew_connection(self):
|
2025-06-24 22:43:46 +05:30
|
|
|
self.close_connection()
|
|
|
|
|
self._connect()
|
|
|
|
|
|
|
|
|
|
def __enter__(self):
|
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
|
|
|
self.close_connection()
|
|
|
|
|
|
|
|
|
|
def __del__(self):
|
|
|
|
|
if hasattr(self, '_connection') and self._connection:
|
|
|
|
|
self._connection.close()
|