chg: [SQLInjectionDetection LibInjection modules] add module class

This commit is contained in:
Terrtia 2021-06-07 16:07:08 +02:00
parent 73da7ae4c5
commit b50c7103e7
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
5 changed files with 195 additions and 212 deletions

View file

@ -1,114 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The LibInjection Module
================================
This module is consuming the Redis-list created by the Web module.
It tries to identify SQL Injections with libinjection.
"""
import time
import datetime
import redis
import string
import urllib.request
import re
import pylibinjection
import pprint
from pubsublogger import publisher
from Helper import Process
from packages import Paste
from pyfaup.faup import Faup
def analyse(url, path):
faup.decode(url)
url_parsed = faup.get()
pprint.pprint(url_parsed)
## TODO: # FIXME: remove me
try:
resource_path = url_parsed['resource_path'].encode()
except:
resource_path = url_parsed['resource_path']
## TODO: # FIXME: remove me
try:
query_string = url_parsed['query_string'].encode()
except:
query_string = url_parsed['query_string']
result_path = {'sqli' : False}
result_query = {'sqli' : False}
if resource_path is not None:
result_path = pylibinjection.detect_sqli(resource_path)
print("path is sqli : {0}".format(result_path))
if query_string is not None:
result_query = pylibinjection.detect_sqli(query_string)
print("query is sqli : {0}".format(result_query))
if result_path['sqli'] is True or result_query['sqli'] is True:
paste = Paste.Paste(path)
print("Detected (libinjection) SQL in URL: ")
print(urllib.request.unquote(url))
to_print = 'LibInjection;{};{};{};{};{}'.format(paste.p_source, paste.p_date, paste.p_name, "Detected SQL in URL", paste.p_rel_path)
publisher.warning(to_print)
#Send to duplicate
p.populate_set_out(path, 'Duplicate')
msg = 'infoleak:automatic-detection="sql-injection";{}'.format(path)
p.populate_set_out(msg, 'Tags')
#statistics
## TODO: # FIXME: remove me
try:
tld = url_parsed['tld'].decode()
except:
tld = url_parsed['tld']
if tld is not None:
date = datetime.datetime.now().strftime("%Y%m")
server_statistics.hincrby('SQLInjection_by_tld:'+date, tld, 1)
if __name__ == '__main__':
# If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh)
# Port of the redis instance used by pubsublogger
publisher.port = 6380
# Script is the default channel used for the modules.
publisher.channel = 'Script'
# Section name in bin/packages/modules.cfg
config_section = 'LibInjection'
# Setup the I/O queues
p = Process(config_section)
# Sent to the logging a description of the module
publisher.info("Try to detect SQL injection with LibInjection")
server_statistics = redis.StrictRedis(
host=p.config.get("ARDB_Statistics", "host"),
port=p.config.getint("ARDB_Statistics", "port"),
db=p.config.getint("ARDB_Statistics", "db"),
decode_responses=True)
faup = Faup()
# Endless loop getting messages from the input queue
while True:
# Get one message from the input queue
message = p.get_from_set()
if message is None:
publisher.debug("{} queue is empty, waiting".format(config_section))
time.sleep(10)
continue
else:
# Do something with the message from the queue
url, date, path = message.split()
analyse(url, path)

View file

@ -1,98 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The SQLInjectionDetection Module
================================
This module is consuming the Redis-list created by the Web module.
It test different possibility to makes some sqlInjection.
"""
import time
import datetime
import redis
import urllib.request
import re
from pubsublogger import publisher
from Helper import Process
from packages import Paste
from pyfaup.faup import Faup
# Reference: https://github.com/stamparm/maltrail/blob/master/core/settings.py
SQLI_REGEX = r"information_schema|sysdatabases|sysusers|floor\(rand\(|ORDER BY \d+|\bUNION\s+(ALL\s+)?SELECT\b|\b(UPDATEXML|EXTRACTVALUE)\(|\bCASE[^\w]+WHEN.*THEN\b|\bWAITFOR[^\w]+DELAY\b|\bCONVERT\(|VARCHAR\(|\bCOUNT\(\*\)|\b(pg_)?sleep\(|\bSELECT\b.*\bFROM\b.*\b(WHERE|GROUP|ORDER)\b|\bSELECT \w+ FROM \w+|\b(AND|OR|SELECT)\b.*/\*.*\*/|/\*.*\*/.*\b(AND|OR|SELECT)\b|\b(AND|OR)[^\w]+\d+['\") ]?[=><]['\"( ]?\d+|ODBC;DRIVER|\bINTO\s+(OUT|DUMP)FILE"
def analyse(url, path):
if is_sql_injection(url):
faup.decode(url)
url_parsed = faup.get()
paste = Paste.Paste(path)
print("Detected SQL in URL: ")
print(urllib.request.unquote(url))
to_print = 'SQLInjection;{};{};{};{};{}'.format(paste.p_source, paste.p_date, paste.p_name, "Detected SQL in URL", paste.p_rel_path)
publisher.warning(to_print)
#Send to duplicate
p.populate_set_out(path, 'Duplicate')
msg = 'infoleak:automatic-detection="sql-injection";{}'.format(path)
p.populate_set_out(msg, 'Tags')
#statistics
tld = url_parsed['tld']
if tld is not None:
## TODO: # FIXME: remove me
try:
tld = tld.decode()
except:
pass
date = datetime.datetime.now().strftime("%Y%m")
server_statistics.hincrby('SQLInjection_by_tld:'+date, tld, 1)
# Try to detect if the url passed might be an sql injection by appliying the regex
# defined above on it.
def is_sql_injection(url_parsed):
line = urllib.request.unquote(url_parsed)
return re.search(SQLI_REGEX, line, re.I) is not None
if __name__ == '__main__':
# If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh)
# Port of the redis instance used by pubsublogger
publisher.port = 6380
# Script is the default channel used for the modules.
publisher.channel = 'Script'
# Section name in bin/packages/modules.cfg
config_section = 'SQLInjectionDetection'
# Setup the I/O queues
p = Process(config_section)
# Sent to the logging a description of the module
publisher.info("Try to detect SQL injection")
server_statistics = redis.StrictRedis(
host=p.config.get("ARDB_Statistics", "host"),
port=p.config.getint("ARDB_Statistics", "port"),
db=p.config.getint("ARDB_Statistics", "db"),
decode_responses=True)
faup = Faup()
# Endless loop getting messages from the input queue
while True:
# Get one message from the input queue
message = p.get_from_set()
if message is None:
publisher.debug("{} queue is empty, waiting".format(config_section))
time.sleep(10)
continue
else:
# Do something with the message from the queue
url, date, path = message.split()
analyse(url, path)

102
bin/modules/LibInjection.py Executable file
View file

@ -0,0 +1,102 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The LibInjection Module
================================
This module is consuming the Redis-list created by the Urls module.
It tries to identify SQL Injections with libinjection.
"""
import os
import sys
import redis
import urllib.request
import pylibinjection
from datetime import datetime
from pyfaup.faup import Faup
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from packages.Item import Item
class LibInjection(AbstractModule):
"""docstring for LibInjection module."""
def __init__(self):
super(LibInjection, self).__init__()
self.faup = Faup()
config_loader = ConfigLoader()
self.server_statistics = config_loader.get_redis_conn("ARDB_Statistics")
self.redis_logger.info(f"Module: {self.module_name} Launched")
def compute(self, message):
url, id = message.split()
self.faup.decode(url)
url_parsed = self.faup.get()
## TODO: # FIXME: remove me
try:
resource_path = url_parsed['resource_path'].encode()
except:
resource_path = url_parsed['resource_path']
## TODO: # FIXME: remove me
try:
query_string = url_parsed['query_string'].encode()
except:
query_string = url_parsed['query_string']
result_path = {'sqli' : False}
result_query = {'sqli' : False}
if resource_path is not None:
result_path = pylibinjection.detect_sqli(resource_path)
#print(f'path is sqli : {result_path}')
if query_string is not None:
result_query = pylibinjection.detect_sqli(query_string)
#print(f'query is sqli : {result_query}')
if result_path['sqli'] is True or result_query['sqli'] is True:
item = Item(id)
item_id = item.get_id()
print(f"Detected (libinjection) SQL in URL: {item_id}")
print(urllib.request.unquote(url))
to_print = f'LibInjection;{item.get_source()};{item.get_date()};{item.get_basename()};Detected SQL in URL;{item_id}'
self.redis_logger.warning(to_print)
# Send to duplicate
self.send_message_to_queue(item_id, 'Duplicate')
# Add tag
msg = f'infoleak:automatic-detection="sql-injection";{item_id}'
self.send_message_to_queue(msg, 'Tags')
#statistics
## TODO: # FIXME: remove me
try:
tld = url_parsed['tld'].decode()
except:
tld = url_parsed['tld']
if tld is not None:
date = datetime.now().strftime("%Y%m")
self.server_statistics.hincrby(f'SQLInjection_by_tld:{date}', tld, 1)
if __name__ == "__main__":
module = LibInjection()
module.run()

2
bin/modules/README.md Normal file
View file

@ -0,0 +1,2 @@
AIL MODULES
===

View file

@ -0,0 +1,91 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The SQLInjectionDetection Module
================================
This module is consuming the Redis-list created by the Urls module.
It test different possibility to makes some sqlInjection.
"""
import os
import sys
import re
import redis
import urllib.request
from datetime import datetime
from pyfaup.faup import Faup
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from packages.Item import Item
class SQLInjectionDetection(AbstractModule):
"""docstring for SQLInjectionDetection module."""
# # TODO: IMPROVE ME
# Reference: https://github.com/stamparm/maltrail/blob/master/core/settings.py
SQLI_REGEX = r"information_schema|sysdatabases|sysusers|floor\(rand\(|ORDER BY \d+|\bUNION\s+(ALL\s+)?SELECT\b|\b(UPDATEXML|EXTRACTVALUE)\(|\bCASE[^\w]+WHEN.*THEN\b|\bWAITFOR[^\w]+DELAY\b|\bCONVERT\(|VARCHAR\(|\bCOUNT\(\*\)|\b(pg_)?sleep\(|\bSELECT\b.*\bFROM\b.*\b(WHERE|GROUP|ORDER)\b|\bSELECT \w+ FROM \w+|\b(AND|OR|SELECT)\b.*/\*.*\*/|/\*.*\*/.*\b(AND|OR|SELECT)\b|\b(AND|OR)[^\w]+\d+['\") ]?[=><]['\"( ]?\d+|ODBC;DRIVER|\bINTO\s+(OUT|DUMP)FILE"
def __init__(self):
super(SQLInjectionDetection, self).__init__()
self.faup = Faup()
config_loader = ConfigLoader()
self.server_statistics = config_loader.get_redis_conn("ARDB_Statistics")
self.redis_logger.info(f"Module: {self.module_name} Launched")
def compute(self, message):
url, id = message.split()
if self.is_sql_injection(url):
self.faup.decode(url)
url_parsed = self.faup.get()
item = Item(id)
item_id = item.get_id()
print(f"Detected SQL in URL: {item_id}")
print(urllib.request.unquote(url))
to_print = f'SQLInjection;{item.get_source()};{item.get_date()};{item.get_basename()};Detected SQL in URL;{item_id}'
self.redis_logger.warning(to_print)
# Send to duplicate
self.send_message_to_queue(item_id, 'Duplicate')
# Tag
msg = f'infoleak:automatic-detection="sql-injection";{item_id}'
self.send_message_to_queue(msg, 'Tags')
# statistics
tld = url_parsed['tld']
if tld is not None:
## TODO: # FIXME: remove me
try:
tld = tld.decode()
except:
pass
date = datetime.now().strftime("%Y%m")
self.server_statistics.hincrby(f'SQLInjection_by_tld:{date}', tld, 1)
# Try to detect if the url passed might be an sql injection by appliying the regex
# defined above on it.
def is_sql_injection(self, url_parsed):
line = urllib.request.unquote(url_parsed)
return re.search(SQLInjectionDetection.SQLI_REGEX, line, re.I) is not None
if __name__ == "__main__":
module = SQLInjectionDetection()
module.run()