mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-26 15:57:16 +00:00
124 lines
3.6 KiB
Python
Executable file
124 lines
3.6 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*-coding:UTF-8 -*
|
|
|
|
"""
|
|
The BankAccount Module
|
|
======================
|
|
|
|
It apply IBAN regexes on item content and warn if above a threshold.
|
|
|
|
"""
|
|
|
|
import redis
|
|
import time
|
|
import redis
|
|
import datetime
|
|
import re
|
|
import string
|
|
from itertools import chain
|
|
|
|
from packages import Item
|
|
from pubsublogger import publisher
|
|
|
|
from Helper import Process
|
|
|
|
import signal
|
|
|
|
class TimeoutException(Exception):
|
|
pass
|
|
|
|
def timeout_handler(signum, frame):
|
|
raise TimeoutException
|
|
|
|
signal.signal(signal.SIGALRM, timeout_handler)
|
|
|
|
_LETTERS_IBAN = chain(enumerate(string.digits + string.ascii_uppercase),
|
|
enumerate(string.ascii_lowercase, 10))
|
|
LETTERS_IBAN = {ord(d): str(i) for i, d in _LETTERS_IBAN}
|
|
|
|
def iban_number(iban):
|
|
return (iban[4:] + iban[:4]).translate(LETTERS_IBAN)
|
|
|
|
def is_valid_iban(iban):
|
|
iban_numb = iban_number(iban)
|
|
iban_numb_check = iban_number(iban[:2] + '00' + iban[4:])
|
|
check_digit = '{:0>2}'.format(98 - (int(iban_numb_check) % 97))
|
|
if check_digit == iban[2:4] and int(iban_numb) % 97 == 1:
|
|
# valid iban
|
|
print('valid iban')
|
|
return True
|
|
return False
|
|
|
|
def check_all_iban(l_iban, obj_id):
|
|
nb_valid_iban = 0
|
|
for iban in l_iban:
|
|
iban = iban[0]+iban[1]+iban[2]
|
|
iban = ''.join(e for e in iban if e.isalnum())
|
|
#iban = iban.upper()
|
|
res = iban_regex_verify.findall(iban)
|
|
date = datetime.datetime.now().strftime("%Y%m")
|
|
if res:
|
|
print('checking '+iban)
|
|
if is_valid_iban(iban):
|
|
print('------')
|
|
nb_valid_iban = nb_valid_iban + 1
|
|
server_statistics.hincrby('iban_by_country:'+date, iban[0:2], 1)
|
|
|
|
if(nb_valid_iban > 0):
|
|
to_print = 'Iban;{};{};{};'.format(Item.get_source(obj_id), Item.get_item_date(obj_id), Item.get_basename(obj_id))
|
|
publisher.warning('{}Checked found {} IBAN;{}'.format(
|
|
to_print, nb_valid_iban, obj_id))
|
|
msg = 'infoleak:automatic-detection="iban";{}'.format(obj_id)
|
|
p.populate_set_out(msg, 'Tags')
|
|
|
|
#Send to duplicate
|
|
p.populate_set_out(obj_id, 'Duplicate')
|
|
|
|
if __name__ == "__main__":
|
|
publisher.port = 6380
|
|
publisher.channel = "Script"
|
|
|
|
config_section = 'BankAccount'
|
|
|
|
p = Process(config_section)
|
|
max_execution_time = p.config.getint("BankAccount", "max_execution_time")
|
|
|
|
# ARDB #
|
|
server_statistics = redis.StrictRedis(
|
|
host=p.config.get("ARDB_Statistics", "host"),
|
|
port=p.config.getint("ARDB_Statistics", "port"),
|
|
db=p.config.getint("ARDB_Statistics", "db"),
|
|
decode_responses=True)
|
|
|
|
publisher.info("BankAccount started")
|
|
|
|
#iban_regex = re.compile(r'\b[A-Za-z]{2}[0-9]{2}(?:[ ]?[0-9]{4}){4}(?:[ ]?[0-9]{1,2})?\b')
|
|
iban_regex = re.compile(r'\b([A-Za-z]{2}[ \-]?[0-9]{2})(?=(?:[ \-]?[A-Za-z0-9]){9,30})((?:[ \-]?[A-Za-z0-9]{3,5}){2,6})([ \-]?[A-Za-z0-9]{1,3})\b')
|
|
iban_regex_verify = re.compile(r'^([A-Z]{2})([0-9]{2})([A-Z0-9]{9,30})$')
|
|
|
|
|
|
while True:
|
|
|
|
message = p.get_from_set()
|
|
|
|
if message is not None:
|
|
|
|
obj_id = Item.get_item_id(message)
|
|
|
|
content = Item.get_item_content(obj_id)
|
|
|
|
signal.alarm(max_execution_time)
|
|
try:
|
|
l_iban = iban_regex.findall(content)
|
|
except TimeoutException:
|
|
print ("{0} processing timeout".format(obj_id))
|
|
continue
|
|
else:
|
|
signal.alarm(0)
|
|
|
|
if(len(l_iban) > 0):
|
|
check_all_iban(l_iban, obj_id)
|
|
|
|
else:
|
|
publisher.debug("Script BankAccount is Idling 10s")
|
|
time.sleep(10)
|