ail-framework/bin/modules/LibInjection.py


#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The LibInjection Module
================================
This module consumes the Redis list created by the Urls module.
It tries to identify SQL injections in URLs with libinjection.
"""
import os
import sys
import redis
import urllib.request
import pylibinjection
from datetime import datetime
from pyfaup.faup import Faup
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from packages.Item import Item
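class LibInjection(AbstractModule):
    """Flag items whose URLs look like SQL injection, using libinjection.

    Each message is ``"<url> <item id>"``. The URL's resource path and query
    string are passed to ``pylibinjection.detect_sqli``; on a hit the module
    logs a warning, forwards the item to the ``Duplicate`` and ``Tags``
    queues, and increments a per-TLD monthly counter in the statistics store.

    ``redis_logger``, ``module_name`` and ``send_message_to_queue`` are not
    defined here; presumably they come from ``AbstractModule``.
    """

    def __init__(self):
        """Create the URL parser and open the statistics connection."""
        super(LibInjection, self).__init__()

        self.faup = Faup()

        config_loader = ConfigLoader()
        self.server_statistics = config_loader.get_redis_conn("ARDB_Statistics")

        self.redis_logger.info(f"Module: {self.module_name} Launched")

    @staticmethod
    def _to_bytes(value):
        """Return ``value.encode()`` when possible, else ``value`` unchanged.

        Compatibility shim for parser fields that may be ``str``, ``bytes``
        or ``None`` (flagged "remove me" in the original code). Only the
        failures this shim is meant to absorb are caught, so
        ``KeyboardInterrupt``/``SystemExit`` are no longer swallowed.
        """
        try:
            return value.encode()
        except (AttributeError, UnicodeError):
            return value

    @staticmethod
    def _to_str(value):
        """Return ``value.decode()`` when possible, else ``value`` unchanged.

        Counterpart of ``_to_bytes`` with the same narrowed exception set.
        """
        try:
            return value.decode()
        except (AttributeError, UnicodeError):
            return value

    @staticmethod
    def _escape_unprintable(text):
        """Return *text* with non-printable characters backslash-escaped.

        The URL is untrusted and, once unquoted, may contain newlines or
        terminal control sequences; escaping keeps a crafted URL from forging
        extra console/log lines or driving the operator's terminal.
        """
        return ''.join(
            ch if ch.isprintable() else ch.encode('unicode_escape').decode('ascii')
            for ch in text
        )

    def compute(self, message):
        """Check one ``"<url> <item id>"`` message for SQL injection.

        Args:
            message: two whitespace-separated fields, the URL and the item id.

        Raises:
            ValueError: if *message* does not split into exactly two fields.
        """
        # Local renamed from ``id`` so the builtin is not shadowed.
        url, raw_item_id = message.split()

        self.faup.decode(url)
        url_parsed = self.faup.get()

        resource_path = self._to_bytes(url_parsed['resource_path'])
        query_string = self._to_bytes(url_parsed['query_string'])

        result_path = {'sqli': False}
        result_query = {'sqli': False}

        # NOTE(review): both components are handed to libinjection exactly as
        # the parser returned them. If they are still percent-encoded at this
        # point, encoded payloads may not be recognised - confirm whether
        # decoding before detection is intended.
        if resource_path is not None:
            result_path = pylibinjection.detect_sqli(resource_path)

        if query_string is not None:
            result_query = pylibinjection.detect_sqli(query_string)

        if result_path['sqli'] is True or result_query['sqli'] is True:
            item = Item(raw_item_id)
            item_id = item.get_id()
            print(f"Detected (libinjection) SQL in URL: {item_id}")
            # Escape control characters before echoing attacker-supplied data.
            print(self._escape_unprintable(urllib.request.unquote(url)))

            to_print = f'LibInjection;{item.get_source()};{item.get_date()};{item.get_basename()};Detected SQL in URL;{item_id}'
            self.redis_logger.warning(to_print)

            # Send to duplicate
            self.send_message_to_queue(item_id, 'Duplicate')

            # Add tag
            msg = f'infoleak:automatic-detection="sql-injection";{item_id}'
            self.send_message_to_queue(msg, 'Tags')

            # Statistics: one hash per month, one field per TLD.
            tld = self._to_str(url_parsed['tld'])
            if tld is not None:
                date = datetime.now().strftime("%Y%m")
                self.server_statistics.hincrby(f'SQLInjection_by_tld:{date}', tld, 1)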
if __name__ == "__main__":
    # Script entry point: build the module and hand control to its run loop.
    LibInjection().run()