Перейти к содержанию

Мозги нашего BENa

Ниже представленна документация всего бекенда нашего проекта.

fraud_analysis

Classes

AmountExtractor

Сервис для извлечения денежных сумм из текстовых жалоб.

Source code in src_back/fraud_analysis.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class AmountExtractor:
    """Сервис для извлечения денежных сумм из текстовых жалоб."""

    def __init__(self):
        """Инициализирует регулярные выражения для поиска валют."""
        self.amount_pattern = re.compile(
            r'(\d+)[\s]?(?:р|руб|рублей|₽|[\.,][\s]?р)',
            re.IGNORECASE
        )
        self.loss_keyword_pattern = re.compile(
            r'пропали\s+(\d+)',
            re.IGNORECASE
        )

    def extract(self, text: Union[str, float]) -> Optional[int]:
        """
        Извлекает сумму транзакции из текста.

        Args:
            text: Входящий текст жалобы.

        Returns:
            Optional[int]: Извлеченная сумма или None, если совпадений не найдено.
        """
        if pd.isna(text):
            return None

        text_lower = str(text).lower()

        match = self.amount_pattern.search(text_lower)
        if match:
            return int(match.group(1))

        loss_match = self.loss_keyword_pattern.search(text_lower)
        if loss_match:
            return int(loss_match.group(1))

        return None
Functions
__init__()

Инициализирует регулярные выражения для поиска валют.

Source code in src_back/fraud_analysis.py
19
20
21
22
23
24
25
26
27
28
def __init__(self):
    """Инициализирует регулярные выражения для поиска валют."""
    self.amount_pattern = re.compile(
        r'(\d+)[\s]?(?:р|руб|рублей|₽|[\.,][\s]?р)',
        re.IGNORECASE
    )
    self.loss_keyword_pattern = re.compile(
        r'пропали\s+(\d+)',
        re.IGNORECASE
    )
extract(text)

Извлекает сумму транзакции из текста.

Parameters:

Name Type Description Default
text Union[str, float]

Входящий текст жалобы.

required

Returns:

Type Description
Optional[int]

Optional[int]: Извлеченная сумма или None, если совпадений не найдено.

Source code in src_back/fraud_analysis.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def extract(self, text: Union[str, float]) -> Optional[int]:
    """
    Извлекает сумму транзакции из текста.

    Args:
        text: Входящий текст жалобы.

    Returns:
        Optional[int]: Извлеченная сумма или None, если совпадений не найдено.
    """
    if pd.isna(text):
        return None

    text_lower = str(text).lower()

    match = self.amount_pattern.search(text_lower)
    if match:
        return int(match.group(1))

    loss_match = self.loss_keyword_pattern.search(text_lower)
    if loss_match:
        return int(loss_match.group(1))

    return None

EcosystemDB

Слой доступа к данным (DAL) для работы с БД экосистемы.

Source code in src_back/fraud_analysis.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class EcosystemDB:
    """Слой доступа к данным (DAL) для работы с БД экосистемы."""

    def __init__(self, db_path: str):
        """
        Инициализирует параметры подключения.
        Само подключение происходит через менеджер контекста (with).
        """
        self.db_path = db_path
        self.conn: Optional[sqlite3.Connection] = None

    def __enter__(self):
        """Открывает соединение при входе в блок with."""
        self.conn = sqlite3.connect(self.db_path)
        self.conn.row_factory = sqlite3.Row
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Гарантированно закрывает соединение при выходе из блока with."""
        if self.conn:
            self.conn.close()

    def find_transaction_info(self, victim_id: int, amount: int) -> Optional[sqlite3.Row]:
        """
        Ищет транзакцию в банке на основе ID жертвы и суммы.

        Args:
            victim_id: Внутренний ID пользователя в банке.
            amount: Сумма транзакции.

        Returns:
            Optional[sqlite3.Row]: Данные транзакции или None.
        """
        query = """
            SELECT 
                v.account as victim_account,
                v.phone as victim_phone,
                t.account_in as fraud_account,
                t.event_date as transaction_date,
                f.userId as fraud_bank_id,
                f.fio as fraud_fio,
                f.phone as fraud_phone
            FROM bank_clients v
            JOIN bank_transactions t ON t.account_out = v.account
            LEFT JOIN bank_clients f ON f.account = t.account_in
            WHERE v.userId = ? AND t.value = ?
            ORDER BY t.event_date DESC
            LIMIT 1
        """
        cursor = self.conn.cursor()
        cursor.execute(query, (victim_id, amount))
        return cursor.fetchone()

    def get_calls(self, victim_phone: str, fraud_phone: str) -> List[Dict[str, Any]]:
        """Получает историю звонков между жертвой и подозреваемым."""
        query = """
            SELECT 
                event_date, from_call, to_call, duration_sec
            FROM mobile_build
            WHERE (from_call = ? AND to_call = ?)
               OR (from_call = ? AND to_call = ?)
        """
        cursor = self.conn.cursor()
        cursor.execute(query, (victim_phone, fraud_phone, fraud_phone, victim_phone))
        return [dict(row) for row in cursor.fetchall()]

    def get_market_activity(self, fraud_bank_id: int) -> List[Dict[str, Any]]:
        """Находит активность мошенника на маркетплейсе через маппинг ID."""
        query = """
            SELECT 
                md.event_date, md.contact_fio, md.contact_phone, md.address
            FROM ecosystem_mapping em
            JOIN market_place_delivery md ON md.user_id = em.marketplace_id
            WHERE em.bank_id = ?
        """
        cursor = self.conn.cursor()
        cursor.execute(query, (fraud_bank_id,))
        return [dict(row) for row in cursor.fetchall()]
Functions
__enter__()

Открывает соединение при входе в блок with.

Source code in src_back/fraud_analysis.py
67
68
69
70
71
def __enter__(self):
    """Открывает соединение при входе в блок with."""
    self.conn = sqlite3.connect(self.db_path)
    self.conn.row_factory = sqlite3.Row
    return self
__exit__(exc_type, exc_val, exc_tb)

Гарантированно закрывает соединение при выходе из блока with.

Source code in src_back/fraud_analysis.py
73
74
75
76
def __exit__(self, exc_type, exc_val, exc_tb):
    """Гарантированно закрывает соединение при выходе из блока with."""
    if self.conn:
        self.conn.close()
__init__(db_path)

Инициализирует параметры подключения. Само подключение происходит через менеджер контекста (with).

Source code in src_back/fraud_analysis.py
59
60
61
62
63
64
65
def __init__(self, db_path: str):
    """
    Инициализирует параметры подключения.
    Само подключение происходит через менеджер контекста (with).
    """
    self.db_path = db_path
    self.conn: Optional[sqlite3.Connection] = None
find_transaction_info(victim_id, amount)

Ищет транзакцию в банке на основе ID жертвы и суммы.

Parameters:

Name Type Description Default
victim_id int

Внутренний ID пользователя в банке.

required
amount int

Сумма транзакции.

required

Returns:

Type Description
Optional[Row]

Optional[sqlite3.Row]: Данные транзакции или None.

Source code in src_back/fraud_analysis.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def find_transaction_info(self, victim_id: int, amount: int) -> Optional[sqlite3.Row]:
    """
    Ищет транзакцию в банке на основе ID жертвы и суммы.

    Args:
        victim_id: Внутренний ID пользователя в банке.
        amount: Сумма транзакции.

    Returns:
        Optional[sqlite3.Row]: Данные транзакции или None.
    """
    query = """
        SELECT 
            v.account as victim_account,
            v.phone as victim_phone,
            t.account_in as fraud_account,
            t.event_date as transaction_date,
            f.userId as fraud_bank_id,
            f.fio as fraud_fio,
            f.phone as fraud_phone
        FROM bank_clients v
        JOIN bank_transactions t ON t.account_out = v.account
        LEFT JOIN bank_clients f ON f.account = t.account_in
        WHERE v.userId = ? AND t.value = ?
        ORDER BY t.event_date DESC
        LIMIT 1
    """
    cursor = self.conn.cursor()
    cursor.execute(query, (victim_id, amount))
    return cursor.fetchone()
get_calls(victim_phone, fraud_phone)

Получает историю звонков между жертвой и подозреваемым.

Source code in src_back/fraud_analysis.py
109
110
111
112
113
114
115
116
117
118
119
120
def get_calls(self, victim_phone: str, fraud_phone: str) -> List[Dict[str, Any]]:
    """Получает историю звонков между жертвой и подозреваемым."""
    query = """
        SELECT 
            event_date, from_call, to_call, duration_sec
        FROM mobile_build
        WHERE (from_call = ? AND to_call = ?)
           OR (from_call = ? AND to_call = ?)
    """
    cursor = self.conn.cursor()
    cursor.execute(query, (victim_phone, fraud_phone, fraud_phone, victim_phone))
    return [dict(row) for row in cursor.fetchall()]
get_market_activity(fraud_bank_id)

Находит активность мошенника на маркетплейсе через маппинг ID.

Source code in src_back/fraud_analysis.py
122
123
124
125
126
127
128
129
130
131
132
133
def get_market_activity(self, fraud_bank_id: int) -> List[Dict[str, Any]]:
    """Находит активность мошенника на маркетплейсе через маппинг ID."""
    query = """
        SELECT 
            md.event_date, md.contact_fio, md.contact_phone, md.address
        FROM ecosystem_mapping em
        JOIN market_place_delivery md ON md.user_id = em.marketplace_id
        WHERE em.bank_id = ?
    """
    cursor = self.conn.cursor()
    cursor.execute(query, (fraud_bank_id,))
    return [dict(row) for row in cursor.fetchall()]

FraudInvestigator

Оркестратор процесса расследования мошенничества.

Source code in src_back/fraud_analysis.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
class FraudInvestigator:
    """Оркестратор процесса расследования мошенничества."""

    def __init__(self, db_path: str, complaints_path: str, output_dir: str):
        self.db_path = db_path
        self.extractor = AmountExtractor()
        self.complaints_path = complaints_path
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.cases = []

    def run(self):
        """Запускает полный цикл анализа данных."""
        logger.info("Загрузка данных жалоб...")
        try:
            df = pd.read_csv(self.complaints_path, sep='\t')
        except Exception as e:
            logger.error(f"Ошибка при загрузке файла: {e}")
            return

        records = df.to_dict('records')
        logger.info(f"Обработка {len(records)} записей...")

        with EcosystemDB(self.db_path) as db:
            for row in records:
                # Обработка опечатки в названии колонки
                v_id = row.get('userId') if pd.notnull(row.get('userId')) else row.get('uerId')
                text = row.get('text')

                amount = self.extractor.extract(text)
                if not amount:
                    continue

                trans = db.find_transaction_info(v_id, amount)
                if trans:
                    case = self._process_fraud_case(db, v_id, text, row.get('event_date'), amount, trans)
                    self.cases.append(case)

        self._save_results()
        logger.info("Расследование завершено.")

    def _process_fraud_case(self, db: EcosystemDB, v_id: int, text: str,
                            date: str, amount: int, trans: sqlite3.Row) -> Dict[str, Any]:
        """
        Обогащает данные кейса информацией из других систем (мобильная связь, маркетплейс).
        """
        case = {
            'complaint_id': v_id,
            'complaint_text': text,
            'complaint_date': date,
            'extracted_amount': amount,
            'victim_account': trans['victim_account'],
            'victim_phone': trans['victim_phone'],
            'fraud_account': trans['fraud_account'],
            'transaction_date': trans['transaction_date'],
            'fraud_bank_owner_id': trans['fraud_bank_id'],
            'fraud_bank_owner_fio': trans['fraud_fio'],
            'fraud_bank_owner_phone': trans['fraud_phone']
        }

        calls = db.get_calls(case['victim_phone'], case['fraud_bank_owner_phone'])
        case['calls_data'] = calls
        case['has_calls'] = 1 if calls else 0

        market = db.get_market_activity(case['fraud_bank_owner_id'])
        case['market_data'] = market
        case['has_market_activity'] = 1 if market else 0

        return case

    def _save_results(self):
        """Сохраняет результаты в CSV и готовит данные для графовой БД."""
        if not self.cases:
            logger.warning("Совпадений не найдено.")
            return

        logger.info(f"Найдено {len(self.cases)} потенциальных кейсов. Сохранение...")

        df_main = pd.DataFrame(self.cases).drop(columns=['calls_data', 'market_data'])
        df_main.to_csv(self.output_dir / "fraud_cases_detected.csv", index=False)
Functions
run()

Запускает полный цикл анализа данных.

Source code in src_back/fraud_analysis.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def run(self):
    """Запускает полный цикл анализа данных."""
    logger.info("Загрузка данных жалоб...")
    try:
        df = pd.read_csv(self.complaints_path, sep='\t')
    except Exception as e:
        logger.error(f"Ошибка при загрузке файла: {e}")
        return

    records = df.to_dict('records')
    logger.info(f"Обработка {len(records)} записей...")

    with EcosystemDB(self.db_path) as db:
        for row in records:
            # Обработка опечатки в названии колонки
            v_id = row.get('userId') if pd.notnull(row.get('userId')) else row.get('uerId')
            text = row.get('text')

            amount = self.extractor.extract(text)
            if not amount:
                continue

            trans = db.find_transaction_info(v_id, amount)
            if trans:
                case = self._process_fraud_case(db, v_id, text, row.get('event_date'), amount, trans)
                self.cases.append(case)

    self._save_results()
    logger.info("Расследование завершено.")

db_creator

Classes

DataPopulator

Класс для наполнения базы данных экосистемы синтетическими данными.

Этот класс отвечает за создание схемы таблиц, генерацию профилей пользователей, имитацию транзакций, звонков и сценариев мошенничества.

Attributes:

Name Type Description
conn

Подключение к базе данных SQLite.

cursor

Объект курсора для выполнения SQL-запросов.

Source code in src_back/db_creator.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class DataPopulator:
    """Класс для наполнения базы данных экосистемы синтетическими данными.

    Этот класс отвечает за создание схемы таблиц, генерацию профилей пользователей,
    имитацию транзакций, звонков и сценариев мошенничества.

    Attributes:
        conn: Подключение к базе данных SQLite.
        cursor: Объект курсора для выполнения SQL-запросов.
    """

    def __init__(self, db_path: str):
        """Инициализирует подключение к базе данных.

        Args:
            db_path: Путь к файлу базы данных SQLite.
        """
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()

    def setup_schema(self):
        """Создает структуру таблиц в базе данных.

        Удаляет существующие таблицы, если они есть, и создает новые согласно
        схеме экосистемы (unified_users, bank_clients, transactions и др.).
        """
        self.cursor.executescript("""
            DROP TABLE IF EXISTS unified_users;
            DROP TABLE IF EXISTS bank_clients;
            DROP TABLE IF EXISTS bank_transactions;
            DROP TABLE IF EXISTS market_place_delivery;
            DROP TABLE IF EXISTS mobile_build;
            DROP TABLE IF EXISTS mobile_clients;
            DROP TABLE IF EXISTS ecosystem_mapping;

            CREATE TABLE unified_users (
                unique_id TEXT, mobile_id TEXT, bank_id TEXT, marketplace_id TEXT,
                phone_mobile REAL, fio_mobile TEXT, address TEXT, account TEXT,
                phone_bank REAL, fio_bank TEXT, event_date TEXT, contact_fio TEXT,
                contact_phone REAL, address_market TEXT
            );
            CREATE TABLE bank_clients (userId TEXT, account TEXT, phone INTEGER, fio TEXT);
            CREATE TABLE bank_transactions (event_date TEXT, account_out TEXT, account_in TEXT, value REAL);
            CREATE TABLE market_place_delivery (event_date TEXT, user_id TEXT, contact_fio TEXT, contact_phone INTEGER, address TEXT);
            CREATE TABLE mobile_build (event_date TEXT, from_call INTEGER, to_call INTEGER, duration_sec INTEGER);
            CREATE TABLE mobile_clients (client_id TEXT, phone INTEGER, fio TEXT, address TEXT);
            CREATE TABLE ecosystem_mapping (unique_id TEXT, mobile_id TEXT, bank_id TEXT, marketplace_id TEXT);
        """)

    def generate_data(self, n_users: int = 100, n_frauds: int = 10):
        """Генерирует синтетические данные и наполняет ими таблицы.

        Процесс включает создание "мастер-данных" пользователей, наполнение
        источников (банк, мобильный оператор, маркетплейс), имитацию звонков
        злоумышленников и последующих подозрительных транзакций.

        Args:
            n_users: Общее количество уникальных пользователей для генерации.
            n_frauds: Количество генерируемых сценариев мошенничества.
        """
        # 1. Генерируем "Мастер-данные" пользователей
        users_pool = []
        for i in range(n_users):
            phone = int(f"79{random.randint(100000000, 999999999)}")
            user = {
                'unique_id': f"UID_{i + 1:03d}",
                'fio': fake.name(),
                'phone': phone,
                'address': fake.address(),
                'bank_id': f"B_{fake.unique.random_int(1000, 9999)}",
                'mobile_id': f"MOB_{fake.unique.random_int(1000, 9999)}",
                'market_id': f"MKT_{fake.unique.random_int(1000, 9999)}",
                'account': f"40817810{random.randint(100000000000, 999999999999)}",
                'is_fraudster': False
            }
            users_pool.append(user)

        # Помечаем некоторых как мошенников
        fraudsters = random.sample(users_pool, n_frauds)
        for f in fraudsters:
            f['is_fraudster'] = True

        # 2. Наполняем таблицы источников
        complaints = []

        for u in users_pool:
            # Bank
            self.cursor.execute("INSERT INTO bank_clients VALUES (?,?,?,?)",
                                (u['bank_id'], u['account'], u['phone'], u['fio']))
            # Mobile
            self.cursor.execute("INSERT INTO mobile_clients VALUES (?,?,?,?)",
                                (u['mobile_id'], u['phone'], u['fio'], u['address']))
            # Mapping
            self.cursor.execute("INSERT INTO ecosystem_mapping VALUES (?,?,?,?)",
                                (u['unique_id'], u['mobile_id'], u['bank_id'], u['market_id']))

            # Marketplace activity
            if random.random() > 0.3:
                self.cursor.execute("INSERT INTO market_place_delivery VALUES (?,?,?,?,?)",
                                    (fake.date_this_month().strftime('%Y-%m-%d'), u['market_id'],
                                     u['fio'], u['phone'], u['address']))

            # Unified Users
            self.cursor.execute("""
                INSERT INTO unified_users VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            """, (u['unique_id'], u['mobile_id'], u['bank_id'], u['market_id'],
                  u['phone'], u['fio'], u['address'], u['account'],
                  u['phone'], u['fio'], fake.date_this_month().strftime('%Y-%m-%d'),
                  u['fio'], u['phone'], u['address']))

        # 3. Генерируем сценарии мошенничества
        victims = [u for u in users_pool if not u['is_fraudster']]

        for fraudster in fraudsters:
            victim = random.choice(victims)
            amount = random.choice([1500, 5000, 12000, 45000, 90000])
            dt_call = fake.date_time_this_month()
            dt_trans = dt_call + timedelta(minutes=random.randint(5, 30))

            # Звонок
            self.cursor.execute("INSERT INTO mobile_build VALUES (?,?,?,?)",
                                (dt_call.strftime('%Y-%m-%d %H:%M:%S'),
                                 fraudster['phone'], victim['phone'], random.randint(30, 300)))

            # Транзакция
            self.cursor.execute("INSERT INTO bank_transactions VALUES (?,?,?,?)",
                                (dt_trans.strftime('%Y-%m-%d %H:%M:%S'),
                                 victim['account'], fraudster['account'], amount))

            # Жалоба (в TSV)
            complaints.append({
                'userId': victim['bank_id'],
                'text': f"Помогите! {amount} руб. украли со счета после звонка.",
                'event_date': (dt_trans + timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S')
            })

        # 4. Добавляем обычный шум
        for _ in range(n_users * 2):
            u1, u2 = random.sample(users_pool, 2)
            self.cursor.execute("INSERT INTO bank_transactions VALUES (?,?,?,?)",
                                (fake.date_time_this_month().strftime('%Y-%m-%d %H:%M:%S'),
                                 u1['account'], u2['account'], random.randint(100, 2000)))
            self.cursor.execute("INSERT INTO mobile_build VALUES (?,?,?,?)",
                                (fake.date_time_this_month().strftime('%Y-%m-%d %H:%M:%S'),
                                 u1['phone'], u2['phone'], random.randint(10, 100)))

        self.conn.commit()

        # Сохранение TSV
        pd.DataFrame(complaints).to_csv(COMPLAINTS_TSV, sep='\t', index=False)
        print(f"База данных успешно наполнена. Сгенерировано {n_users} пользователей и {n_frauds} кейсов мошенничества.")

    def close(self):
        """Закрывает соединение с базой данных."""
        self.conn.close()
Functions
__init__(db_path)

Инициализирует подключение к базе данных.

Parameters:

Name Type Description Default
db_path str

Путь к файлу базы данных SQLite.

required
Source code in src_back/db_creator.py
43
44
45
46
47
48
49
50
def __init__(self, db_path: str):
    """Инициализирует подключение к базе данных.

    Args:
        db_path: Путь к файлу базы данных SQLite.
    """
    self.conn = sqlite3.connect(db_path)
    self.cursor = self.conn.cursor()
close()

Закрывает соединение с базой данных.

Source code in src_back/db_creator.py
184
185
186
def close(self):
    """Закрывает соединение с базой данных."""
    self.conn.close()
generate_data(n_users=100, n_frauds=10)

Генерирует синтетические данные и наполняет ими таблицы.

Процесс включает создание "мастер-данных" пользователей, наполнение источников (банк, мобильный оператор, маркетплейс), имитацию звонков злоумышленников и последующих подозрительных транзакций.

Parameters:

Name Type Description Default
n_users int

Общее количество уникальных пользователей для генерации.

100
n_frauds int

Количество генерируемых сценариев мошенничества.

10
Source code in src_back/db_creator.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def generate_data(self, n_users: int = 100, n_frauds: int = 10):
    """Генерирует синтетические данные и наполняет ими таблицы.

    Процесс включает создание "мастер-данных" пользователей, наполнение
    источников (банк, мобильный оператор, маркетплейс), имитацию звонков
    злоумышленников и последующих подозрительных транзакций.

    Args:
        n_users: Общее количество уникальных пользователей для генерации.
        n_frauds: Количество генерируемых сценариев мошенничества.
    """
    # 1. Генерируем "Мастер-данные" пользователей
    users_pool = []
    for i in range(n_users):
        phone = int(f"79{random.randint(100000000, 999999999)}")
        user = {
            'unique_id': f"UID_{i + 1:03d}",
            'fio': fake.name(),
            'phone': phone,
            'address': fake.address(),
            'bank_id': f"B_{fake.unique.random_int(1000, 9999)}",
            'mobile_id': f"MOB_{fake.unique.random_int(1000, 9999)}",
            'market_id': f"MKT_{fake.unique.random_int(1000, 9999)}",
            'account': f"40817810{random.randint(100000000000, 999999999999)}",
            'is_fraudster': False
        }
        users_pool.append(user)

    # Помечаем некоторых как мошенников
    fraudsters = random.sample(users_pool, n_frauds)
    for f in fraudsters:
        f['is_fraudster'] = True

    # 2. Наполняем таблицы источников
    complaints = []

    for u in users_pool:
        # Bank
        self.cursor.execute("INSERT INTO bank_clients VALUES (?,?,?,?)",
                            (u['bank_id'], u['account'], u['phone'], u['fio']))
        # Mobile
        self.cursor.execute("INSERT INTO mobile_clients VALUES (?,?,?,?)",
                            (u['mobile_id'], u['phone'], u['fio'], u['address']))
        # Mapping
        self.cursor.execute("INSERT INTO ecosystem_mapping VALUES (?,?,?,?)",
                            (u['unique_id'], u['mobile_id'], u['bank_id'], u['market_id']))

        # Marketplace activity
        if random.random() > 0.3:
            self.cursor.execute("INSERT INTO market_place_delivery VALUES (?,?,?,?,?)",
                                (fake.date_this_month().strftime('%Y-%m-%d'), u['market_id'],
                                 u['fio'], u['phone'], u['address']))

        # Unified Users
        self.cursor.execute("""
            INSERT INTO unified_users VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
        """, (u['unique_id'], u['mobile_id'], u['bank_id'], u['market_id'],
              u['phone'], u['fio'], u['address'], u['account'],
              u['phone'], u['fio'], fake.date_this_month().strftime('%Y-%m-%d'),
              u['fio'], u['phone'], u['address']))

    # 3. Генерируем сценарии мошенничества
    victims = [u for u in users_pool if not u['is_fraudster']]

    for fraudster in fraudsters:
        victim = random.choice(victims)
        amount = random.choice([1500, 5000, 12000, 45000, 90000])
        dt_call = fake.date_time_this_month()
        dt_trans = dt_call + timedelta(minutes=random.randint(5, 30))

        # Звонок
        self.cursor.execute("INSERT INTO mobile_build VALUES (?,?,?,?)",
                            (dt_call.strftime('%Y-%m-%d %H:%M:%S'),
                             fraudster['phone'], victim['phone'], random.randint(30, 300)))

        # Транзакция
        self.cursor.execute("INSERT INTO bank_transactions VALUES (?,?,?,?)",
                            (dt_trans.strftime('%Y-%m-%d %H:%M:%S'),
                             victim['account'], fraudster['account'], amount))

        # Жалоба (в TSV)
        complaints.append({
            'userId': victim['bank_id'],
            'text': f"Помогите! {amount} руб. украли со счета после звонка.",
            'event_date': (dt_trans + timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S')
        })

    # 4. Добавляем обычный шум
    for _ in range(n_users * 2):
        u1, u2 = random.sample(users_pool, 2)
        self.cursor.execute("INSERT INTO bank_transactions VALUES (?,?,?,?)",
                            (fake.date_time_this_month().strftime('%Y-%m-%d %H:%M:%S'),
                             u1['account'], u2['account'], random.randint(100, 2000)))
        self.cursor.execute("INSERT INTO mobile_build VALUES (?,?,?,?)",
                            (fake.date_time_this_month().strftime('%Y-%m-%d %H:%M:%S'),
                             u1['phone'], u2['phone'], random.randint(10, 100)))

    self.conn.commit()

    # Сохранение TSV
    pd.DataFrame(complaints).to_csv(COMPLAINTS_TSV, sep='\t', index=False)
    print(f"База данных успешно наполнена. Сгенерировано {n_users} пользователей и {n_frauds} кейсов мошенничества.")
setup_schema()

Создает структуру таблиц в базе данных.

Удаляет существующие таблицы, если они есть, и создает новые согласно схеме экосистемы (unified_users, bank_clients, transactions и др.).

Source code in src_back/db_creator.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def setup_schema(self):
    """Создает структуру таблиц в базе данных.

    Удаляет существующие таблицы, если они есть, и создает новые согласно
    схеме экосистемы (unified_users, bank_clients, transactions и др.).
    """
    self.cursor.executescript("""
        DROP TABLE IF EXISTS unified_users;
        DROP TABLE IF EXISTS bank_clients;
        DROP TABLE IF EXISTS bank_transactions;
        DROP TABLE IF EXISTS market_place_delivery;
        DROP TABLE IF EXISTS mobile_build;
        DROP TABLE IF EXISTS mobile_clients;
        DROP TABLE IF EXISTS ecosystem_mapping;

        CREATE TABLE unified_users (
            unique_id TEXT, mobile_id TEXT, bank_id TEXT, marketplace_id TEXT,
            phone_mobile REAL, fio_mobile TEXT, address TEXT, account TEXT,
            phone_bank REAL, fio_bank TEXT, event_date TEXT, contact_fio TEXT,
            contact_phone REAL, address_market TEXT
        );
        CREATE TABLE bank_clients (userId TEXT, account TEXT, phone INTEGER, fio TEXT);
        CREATE TABLE bank_transactions (event_date TEXT, account_out TEXT, account_in TEXT, value REAL);
        CREATE TABLE market_place_delivery (event_date TEXT, user_id TEXT, contact_fio TEXT, contact_phone INTEGER, address TEXT);
        CREATE TABLE mobile_build (event_date TEXT, from_call INTEGER, to_call INTEGER, duration_sec INTEGER);
        CREATE TABLE mobile_clients (client_id TEXT, phone INTEGER, fio TEXT, address TEXT);
        CREATE TABLE ecosystem_mapping (unique_id TEXT, mobile_id TEXT, bank_id TEXT, marketplace_id TEXT);
    """)

Functions

normalize_phone(phone)

Нормализует номер телефона, удаляя все нецифровые символы.

Parameters:

Name Type Description Default
phone Any

Номер телефона в любом формате (строка, число или NaN).

required

Returns:

Type Description
Optional[str]

Строка, состоящая только из цифр, или None, если входное значение пустое.

Source code in src_back/db_creator.py
18
19
20
21
22
23
24
25
26
27
28
29
def normalize_phone(phone: Any) -> Optional[str]:
    """Нормализует номер телефона, удаляя все нецифровые символы.

    Args:
        phone: Номер телефона в любом формате (строка, число или NaN).

    Returns:
        Строка, состоящая только из цифр, или None, если входное значение пустое.
    """
    if pd.isna(phone):
        return None
    return re.sub(r'\D', '', str(phone))