chg: [AIL items + Onion] create AIL item objects + Onion module refactor

This commit is contained in:
Terrtia 2021-05-14 14:42:16 +02:00
parent ab14ec0144
commit 4bbff47989
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
5 changed files with 350 additions and 247 deletions

View file

@ -22,254 +22,165 @@ Requirements
""" """
import time import time
from packages import Paste
from pubsublogger import publisher
import datetime import datetime
import os import os
import base64 import sys
import subprocess
import redis
import signal
import re import re
from pyfaup.faup import Faup from module.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from lib import crawlers
from lib import regex_helper
from packages.Item import Item
from Helper import Process ## Manually fetch first page if crawler is disabled
# import base64
class TimeoutException(Exception): # import subprocess
pass #
# torclient_host = '127.0.0.1'
def timeout_handler(signum, frame): # torclient_port = 9050
raise TimeoutException #
# def fetch(p, r_cache, urls, domains):
signal.signal(signal.SIGALRM, timeout_handler) # now = datetime.datetime.now()
# path = os.path.join('onions', str(now.year).zfill(4),
def fetch(p, r_cache, urls, domains, path): # str(now.month).zfill(2),
failed = [] # str(now.day).zfill(2),
downloaded = [] # str(int(time.mktime(now.utctimetuple()))))
print('{} Urls to fetch'.format(len(urls))) # failed = []
for url, domain in zip(urls, domains): # downloaded = []
if r_cache.exists(url) or url in failed: # print('{} Urls to fetch'.format(len(urls)))
continue # for url, domain in zip(urls, domains):
to_fetch = base64.standard_b64encode(url.encode('utf8')) # if r_cache.exists(url) or url in failed:
print('fetching url: {}'.format(to_fetch)) # continue
process = subprocess.Popen(["python", './tor_fetcher.py', to_fetch], # to_fetch = base64.standard_b64encode(url.encode('utf8'))
stdout=subprocess.PIPE) # print('fetching url: {}'.format(to_fetch))
while process.poll() is None: # process = subprocess.Popen(["python", './tor_fetcher.py', to_fetch],
time.sleep(1) # stdout=subprocess.PIPE)
# while process.poll() is None:
if process.returncode == 0: # time.sleep(1)
r_cache.setbit(url, 0, 1) #
r_cache.expire(url, 360000) # if process.returncode == 0:
downloaded.append(url) # r_cache.setbit(url, 0, 1)
print('downloaded : {}'.format(downloaded)) # r_cache.expire(url, 360000)
'''tempfile = process.stdout.read().strip() # downloaded.append(url)
tempfile = tempfile.decode('utf8') # print('downloaded : {}'.format(downloaded))
#with open(tempfile, 'r') as f: # '''tempfile = process.stdout.read().strip()
filename = path + domain + '.gz' # tempfile = tempfile.decode('utf8')
fetched = f.read() # #with open(tempfile, 'r') as f:
content = base64.standard_b64decode(fetched) # filename = path + domain + '.gz'
save_path = os.path.join(os.environ['AIL_HOME'], # fetched = f.read()
p.config.get("Directories", "pastes"), # content = base64.standard_b64decode(fetched)
filename) # save_path = os.path.join(os.environ['AIL_HOME'],
dirname = os.path.dirname(save_path) # p.config.get("Directories", "pastes"),
if not os.path.exists(dirname): # filename)
os.makedirs(dirname) # dirname = os.path.dirname(save_path)
with open(save_path, 'w') as ff: # if not os.path.exists(dirname):
ff.write(content) # os.makedirs(dirname)
p.populate_set_out(save_path, 'Global') # with open(save_path, 'w') as ff:
p.populate_set_out(url, 'ValidOnion') # ff.write(content)
p.populate_set_out(fetched, 'FetchedOnion')''' # p.populate_set_out(save_path, 'Global')
yield url # p.populate_set_out(url, 'ValidOnion')
#os.unlink(tempfile) # p.populate_set_out(fetched, 'FetchedOnion')'''
else: # yield url
r_cache.setbit(url, 0, 0) # #os.unlink(tempfile)
r_cache.expire(url, 3600) # else:
failed.append(url) # r_cache.setbit(url, 0, 0)
print('Failed at downloading', url) # r_cache.expire(url, 3600)
print(process.stdout.read()) # failed.append(url)
print('Failed:', len(failed), 'Downloaded:', len(downloaded)) # print('Failed at downloading', url)
# print(process.stdout.read())
# print('Failed:', len(failed), 'Downloaded:', len(downloaded))
class Onion(AbstractModule):
    """
    Onion module: extract .onion URLs from AIL items, tag the items and
    push the discovered domains to the crawler discovery queue.
    """

    def __init__(self):
        super(Onion, self).__init__()

        config_loader = ConfigLoader()
        self.r_cache = config_loader.get_redis_conn("Redis_Cache")
        self.r_onion = config_loader.get_redis_conn("ARDB_Onion")

        self.pending_seconds = config_loader.get_config_int("Onion", "max_execution_time")
        # regex timeout
        self.regex_timeout = 30

        self.faup = crawlers.get_faup()
        self.redis_cache_key = regex_helper.generate_redis_cache_key(self.module_name)

        # activate_crawler = p.config.get("Crawler", "activate_crawler")

        # Thanks to Faup project for this regex
        # https://github.com/stricaud/faup
        self.url_regex = "((http|https|ftp)?(?:\://)?([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.onion)(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*)"
        self.i2p_regex = "((http|https|ftp)?(?:\://)?([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.i2p)(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*)"
        # The compiled objects are discarded: these calls only make an
        # invalid pattern fail at start-up instead of on the first item.
        re.compile(self.url_regex)
        re.compile(self.i2p_regex)

        self.redis_logger.info(f"Module: {self.module_name} Launched")

        # TEMP var: SAVE I2P Domain (future I2P crawler)
        self.save_i2p = config_loader.get_config_boolean("Onion", "save_i2p")

    def compute(self, message):
        """
        Process one queue message.

        :param message: '<item id> <score>' (two whitespace-separated fields)

        Every URL matched by ``self.url_regex`` whose domain is a valid onion
        domain is collected. If at least one is found the item is tagged and,
        when the crawler is activated, each URL is sent to the onion
        discovery queue; otherwise a warning is logged.
        """
        # list of tuples: (url, subdomains, domain)
        urls_to_crawl = []

        print(message)
        # the score is part of the message format but is not used here
        item_id, _score = message.split()
        item = Item(item_id)
        # Use the real item content (a hard-coded debug string used to
        # overwrite it, making every item match the same test domains).
        item_content = item.get_content()

        # max execution time on regex
        res = regex_helper.regex_findall(self.module_name, self.redis_cache_key, self.url_regex, item.get_id(), item_content)
        for x in res:
            # String to tuple
            x = x[2:-2].replace(" '", "").split("',")
            url = x[0]
            subdomain = x[4].lower()
            self.faup.decode(url)
            url_unpack = self.faup.get()
            try: ## TODO: # FIXME: check faup version
                # some faup versions return bytes, others str
                domain = url_unpack['domain'].decode().lower()
            except Exception:
                domain = url_unpack['domain'].lower()
            print('----')
            print(url)
            print(subdomain)
            print(domain)

            if crawlers.is_valid_onion_domain(domain):
                urls_to_crawl.append((url, subdomain, domain))

        to_print = f'Onion;{item.get_source()};{item.get_date()};{item.get_basename()};'
        if not urls_to_crawl:
            self.redis_logger.info(f'{to_print}Onion related;{item.get_id()}')
            return

        # TAG Item
        msg = f'infoleak:automatic-detection="onion";{item.get_id()}'
        self.send_message_to_queue('Tags', msg)

        if crawlers.is_crawler_activated():
            for url, subdomain, domain in urls_to_crawl:
                crawlers.add_item_to_discovery_queue('onion', domain, subdomain, url, item.get_id())
        else:
            self.redis_logger.warning(f'{to_print}Detected {len(urls_to_crawl)} .onion(s);{item.get_id()}')
            # keep manual fetcher ????
            ## Manually fetch first page if crawler is disabled
            # for url in fetch(p, r_cache, urls, domains_list):
            #     publisher.info('{}Checked {};{}'.format(to_print, url, PST.p_rel_path))
if __name__ == "__main__":
module = Onion()
module.run()
##########################

View file

@ -19,7 +19,8 @@ import uuid
import subprocess import subprocess
from datetime import datetime, timedelta from datetime import datetime, timedelta
from urllib.parse import urlparse from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from pyfaup.faup import Faup from pyfaup.faup import Faup
@ -41,6 +42,7 @@ r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata")
r_serv_onion = config_loader.get_redis_conn("ARDB_Onion") r_serv_onion = config_loader.get_redis_conn("ARDB_Onion")
r_cache = config_loader.get_redis_conn("Redis_Cache") r_cache = config_loader.get_redis_conn("Redis_Cache")
PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes"))
activate_crawler = config_loader.get_config_str("Crawler", "activate_crawler")
config_loader = None config_loader = None
faup = Faup() faup = Faup()
@ -76,6 +78,64 @@ def is_valid_onion_domain(domain):
def get_faup(): def get_faup():
return faup return faup
# # # # # # # #
# #
# FAVICON #
# #
# # # # # # # #
def get_favicon_from_html(html, domain, url):
    """
    Return the set of favicon URLs of a page.

    :param html: page HTML
    :param domain: domain used to build the fallback favicon URL
    :param url: page URL (provides the scheme and the base for relative hrefs)

    When the page declares no icon, the root favicon
    '<scheme>://<domain>/favicon.ico' is returned instead.
    """
    icons = extract_favicon_from_html(html, url)
    if len(icons) == 0:
        # add root favicon
        page_scheme = urlparse(url).scheme
        icons.add(f'{page_scheme}://{domain}/favicon.ico')
    print(icons)
    return icons
def extract_favicon_from_html(html, url):
    """
    Extract the favicon URLs declared in the <head> of an HTML page.

    :param html: page HTML
    :param url: page URL, used as base to resolve relative hrefs
    :return: set of absolute favicon URLs (empty if the page has no <head>
             or declares no icon)

    'data:' hrefs (inline base64 icons) are skipped for now.
    """
    favicon_urls = set()

    soup = BeautifulSoup(html, 'html.parser')
    # no <head>: nothing to search
    if not soup.head:
        return favicon_urls

    set_icons = set()
    # If there are multiple <link rel="icon">s, the browser uses their media,
    # type, and sizes attributes to select the most appropriate icon.
    # If several icons are equally appropriate, the last one is used.
    # If the most appropriate icon is later found to be inappropriate,
    # for example because it uses an unsupported format,
    # the browser proceeds to the next-most appropriate, and so on.
    # # DEBUG: /!\ firefox load all favicon ???

    # iOS Safari 'apple-touch-icon'
    # Safari pinned tabs 'mask-icon'
    # Android Chrome 'manifest'
    # Edge and IE 12:
    #   - <meta name="msapplication-TileColor" content="#aaaaaa"> <meta name="theme-color" content="#ffffff">
    #   - <meta name="msapplication-config" content="/icons/browserconfig.xml">

    # desktop browser 'shortcut icon' (older browser), 'icon'
    for favicon_tag in ['icon', 'shortcut icon']:
        for icon in soup.head.find_all('link', attrs={'rel': lambda x: x and x.lower() == favicon_tag, 'href': True}):
            set_icons.add(icon)

    base_scheme = urlparse(url).scheme
    for tag in set_icons:
        icon_url = tag.get('href')
        if not icon_url:
            continue
        if icon_url.startswith('data:'):
            # # TODO: handle base64 favicon
            continue
        # urljoin resolves relative paths and scheme-relative ('//host/path')
        # references against the page URL, so the href must not be altered
        # beforehand (collapsing '//' into '/' dropped the icon's host).
        icon_url = urljoin(url, icon_url)
        icon_url = urlparse(icon_url, scheme=base_scheme).geturl()
        favicon_urls.add(icon_url)
    return favicon_urls
# # # - - # # #
################################################################################ ################################################################################
# # TODO: handle prefix cookies # # TODO: handle prefix cookies
@ -412,6 +472,13 @@ def api_create_cookie(user_id, cookiejar_uuid, cookie_dict):
#### CRAWLER GLOBAL #### #### CRAWLER GLOBAL ####
## TODO: # FIXME: config db, dynamic load
def is_crawler_activated():
    """
    Return True when the module-level 'activate_crawler' setting is the
    string 'True', False for any other value.
    """
    crawler_enabled = activate_crawler == 'True'
    return crawler_enabled
def get_crawler_all_types():
    """
    Return the list of crawler types: 'onion' and 'regular'.
    """
    crawler_types = ['onion', 'regular']
    return crawler_types
def get_all_spash_crawler_status(): def get_all_spash_crawler_status():
crawler_metadata = [] crawler_metadata = []
all_crawlers = r_cache.smembers('all_splash_crawlers') all_crawlers = r_cache.smembers('all_splash_crawlers')
@ -741,6 +808,83 @@ def api_add_crawled_item(dict_crawled):
create_item_metadata(item_id, domain, 'last_url', port, 'father') create_item_metadata(item_id, domain, 'last_url', port, 'father')
#### CRAWLER QUEUES #### #### CRAWLER QUEUES ####
## queues priority:
# 1 - priority queue
# 2 - discovery queue
# 3 - default queue
##
def get_all_queues_names():
    """
    Return the crawler queue names, highest priority first.
    """
    queues_names = ['priority', 'discovery', 'default']
    return queues_names
def get_all_queues_keys():
    """
    Return the crawler queue key templates, highest priority first.

    Each template has one '{}' placeholder that callers fill with the
    queue type via str.format().
    """
    queues_keys = [
        '{}_crawler_priority_queue',
        '{}_crawler_discovery_queue',
        '{}_crawler_queue',
    ]
    return queues_keys
def get_queue_key_by_name(queue_name):
    """
    Return the queue key template matching a queue name.

    :param queue_name: 'priority', 'discovery' or anything else (default queue)
    :return: key template with one '{}' placeholder for the queue type
    """
    if queue_name == 'discovery':
        return '{}_crawler_discovery_queue'
    if queue_name == 'priority':
        return '{}_crawler_priority_queue'
    # default
    return '{}_crawler_queue'
def get_stats_elem_to_crawl_by_queue_type(queue_type):
    """
    Return the number of queued elements per queue name for one queue type.

    :param queue_type: value substituted into each queue key template
    :return: dict {queue name: cardinality of the matching set}
    """
    return {
        name: r_serv_onion.scard(get_queue_key_by_name(name).format(queue_type))
        for name in get_all_queues_names()
    }
def get_all_queues_stats():
    """
    Return the queue statistics of every crawler type and every splash.

    :return: dict {queue type: {queue name: number of queued elements}}

    Crawler types are filled first, then the values returned by
    get_all_splash(), so a splash entry overrides a crawler type that has
    the same name.
    """
    # (a leftover debug print of get_all_crawlers_queues_types() was removed:
    # it only wrote to stdout and its result was never used)
    dict_stats = {}
    for queue_type in get_crawler_all_types():
        dict_stats[queue_type] = get_stats_elem_to_crawl_by_queue_type(queue_type)
    for queue_type in get_all_splash():
        dict_stats[queue_type] = get_stats_elem_to_crawl_by_queue_type(queue_type)
    return dict_stats
def add_item_to_discovery_queue(queue_type, domain, subdomain, url, item_id):
    """
    Queue a URL found in an item so that its domain gets crawled.

    :param queue_type: crawler type, used as prefix/infix of every Redis key
    :param domain: domain checked against the 'blacklist_<queue_type>' set
    :param subdomain: host used as identifier in the up/down/queue sets
    :param url: URL to crawl
    :param item_id: id of the item the URL was found in

    Nothing is queued when the domain is blacklisted, when the subdomain is
    already known as up this month or down today, or when it is already in
    the domain crawler queue. Otherwise '<url>;<item_id>' goes to the
    discovery queue if the subdomain has no 'first_seen' metadata, else to
    the default queue.
    """
    # read the clock once so both keys refer to the same day
    now = datetime.now()
    date_month = now.strftime("%Y%m")
    date = now.strftime("%Y%m%d")

    # check blacklist
    if r_serv_onion.sismember(f'blacklist_{queue_type}', domain):
        return

    # too many subdomain # # FIXME: move to crawler module ?
    labels = subdomain.split('.')
    if len(labels) > 3:
        # keep the two labels preceding the TLD; index the split labels,
        # not the characters of the string
        subdomain = f'{labels[-3]}.{labels[-2]}.{queue_type}'

    if not r_serv_onion.sismember(f'month_{queue_type}_up:{date_month}', subdomain) and not r_serv_onion.sismember(f'{queue_type}_down:{date}', subdomain):
        if not r_serv_onion.sismember(f'{queue_type}_domain_crawler_queue', subdomain):
            r_serv_onion.sadd(f'{queue_type}_domain_crawler_queue', subdomain)
            msg = f'{url};{item_id}'
            # First time we see this domain => Add to discovery queue (priority=2)
            if not r_serv_onion.hexists(f'{queue_type}_metadata:{subdomain}', 'first_seen'):
                r_serv_onion.sadd(f'{queue_type}_crawler_discovery_queue', msg)
                print(f'sent to discovery queue: {subdomain}')
            # Add to default queue (priority=3)
            else:
                r_serv_onion.sadd(f'{queue_type}_crawler_queue', msg)
                print(f'sent to queue: {subdomain}')
def remove_task_from_crawler_queue(queue_name, queue_type, key_to_remove):
    """
    Remove one element from a crawler queue.

    :param queue_name: queue key template (formatted with queue_type)
    :param queue_type: value substituted into the template
    :param key_to_remove: set member to remove
    """
    redis_key = queue_name.format(queue_type)
    r_serv_onion.srem(redis_key, key_to_remove)
# # TODO: keep auto crawler ?
def clear_crawler_queues():
    """
    Delete every crawler queue (all key templates x all crawler types).
    """
    for key_template in get_all_queues_keys():
        redis_keys = [key_template.format(crawler_type) for crawler_type in get_crawler_all_types()]
        for redis_key in redis_keys:
            r_serv_onion.delete(redis_key)
###################################################################################
def get_nb_elem_to_crawl_by_type(queue_type): # # TODO: rename me
    """
    Return the total number of elements queued for a queue type, summed
    over the priority, discovery and default queues.
    """
    key_templates = (
        '{}_crawler_priority_queue',
        '{}_crawler_discovery_queue',
        '{}_crawler_queue',
    )
    return sum(r_serv_onion.scard(template.format(queue_type)) for template in key_templates)
###################################################################################
def get_all_crawlers_queues_types(): def get_all_crawlers_queues_types():
all_queues_types = set() all_queues_types = set()
all_splash_name = get_all_crawlers_to_launch_splash_name() all_splash_name = get_all_crawlers_to_launch_splash_name()
@ -782,9 +926,8 @@ def get_elem_to_crawl_by_queue_type(l_queue_type):
# 2 - discovery queue # 2 - discovery queue
# 3 - normal queue # 3 - normal queue
## ##
all_queue_key = ['{}_crawler_priority_queue', '{}_crawler_discovery_queue', '{}_crawler_queue']
for queue_key in all_queue_key: for queue_key in get_all_queues_keys():
for queue_type in l_queue_type: for queue_type in l_queue_type:
message = r_serv_onion.spop(queue_key.format(queue_type)) message = r_serv_onion.spop(queue_key.format(queue_type))
if message: if message:
@ -801,12 +944,6 @@ def get_elem_to_crawl_by_queue_type(l_queue_type):
return {'url': url, 'paste': item_id, 'type_service': crawler_type, 'queue_type': queue_type, 'original_message': message} return {'url': url, 'paste': item_id, 'type_service': crawler_type, 'queue_type': queue_type, 'original_message': message}
return None return None
def get_nb_elem_to_crawl_by_type(queue_type):
nb = r_serv_onion.scard('{}_crawler_priority_queue'.format(queue_type))
nb += r_serv_onion.scard('{}_crawler_discovery_queue'.format(queue_type))
nb += r_serv_onion.scard('{}_crawler_queue'.format(queue_type))
return nb
#### ---- #### #### ---- ####
# # # # # # # # # # # # # # # # # # # # # # # #
@ -1281,8 +1418,9 @@ def test_ail_crawlers():
#### ---- #### #### ---- ####
if __name__ == '__main__': if __name__ == '__main__':
res = get_splash_manager_version() # res = get_splash_manager_version()
res = test_ail_crawlers() # res = test_ail_crawlers()
res = is_test_ail_crawlers_successful() # res = is_test_ail_crawlers_successful()
print(res) # print(res)
print(get_test_ail_crawlers_message()) # print(get_test_ail_crawlers_message())
#print(get_all_queues_stats())

View file

@ -15,7 +15,6 @@ import time
from pubsublogger import publisher from pubsublogger import publisher
from Helper import Process from Helper import Process
class AbstractModule(ABC): class AbstractModule(ABC):
""" """
Abstract Module class Abstract Module class
@ -38,6 +37,7 @@ class AbstractModule(ABC):
self.redis_logger.port = 6380 self.redis_logger.port = 6380
# Channel name to publish logs # Channel name to publish logs
self.redis_logger.channel = 'Script' self.redis_logger.channel = 'Script'
# # TODO: refactor logging
# TODO modify generic channel Script to a namespaced channel like: # TODO modify generic channel Script to a namespaced channel like:
# publish module logs to script:<ModuleName> channel # publish module logs to script:<ModuleName> channel
# self.redis_logger.channel = 'script:%s'%(self.module_name) # self.redis_logger.channel = 'script:%s'%(self.module_name)
@ -51,6 +51,23 @@ class AbstractModule(ABC):
# Setup the I/O queues # Setup the I/O queues
self.process = Process(self.queue_name) self.process = Process(self.queue_name)
def get_message(self):
    """
    Fetch the next message from the Redis Queue (QueueIn).

    The message format depends on the module.
    ex: '<item id>'

    :return: the value returned by self.process.get_from_set()
    """
    message = self.process.get_from_set()
    return message
def send_message_to_queue(self, queue_name, message):
    """
    Send message to queue
    :param queue_name: queue or module name
    :param message: message to send in queue

    ex: send_message_to_queue('Global', item_id)
    """
    # populate_set_out is called with the message first, then the queue name
    self.process.populate_set_out(message, queue_name)
def run(self): def run(self):
""" """
@ -59,8 +76,8 @@ class AbstractModule(ABC):
# Endless loop processing messages from the input queue # Endless loop processing messages from the input queue
while self.proceed: while self.proceed:
# Get one message (paste) from the QueueIn (copy of Redis_Global publish) # Get one message (ex:item id) from the Redis Queue (QueueIn)
message = self.process.get_from_set() message = self.get_message()
if message is None: if message is None:
self.computeNone() self.computeNone()

View file

@ -25,6 +25,7 @@ import Decoded
import Screenshot import Screenshot
import Username import Username
from ail_objects import AbstractObject
from item_basic import * from item_basic import *
config_loader = ConfigLoader.ConfigLoader() config_loader = ConfigLoader.ConfigLoader()
@ -549,7 +550,42 @@ def delete_domain_node(item_id):
for child_id in get_all_domain_node_by_item_id(item_id): for child_id in get_all_domain_node_by_item_id(item_id):
delete_item(child_id) delete_item(child_id)
class Item(AbstractObject):
    """
    AIL Item Object. (strings)
    """

    # NOTE(review): the methods below call `item_basic.<function>`; the only
    # import visible nearby is `from item_basic import *`, which does not
    # bind the module name - confirm `item_basic` is imported as a module.

    def __init__(self, id):
        super().__init__('item', id)

    def get_date(self, separator=False):
        """
        Return the item date.

        :param separator: forwarded to item_basic.get_item_date as add_separator
        """
        item_date = item_basic.get_item_date(self.id, add_separator=separator)
        return item_date

    def get_source(self):
        """
        Return the item source/feeder name.
        """
        source = item_basic.get_source(self.id)
        return source

    def get_basename(self):
        """
        Return the last path component of the item id.
        """
        basename = os.path.basename(self.id)
        return basename

    def get_content(self):
        """
        Return the item content.
        """
        content = item_basic.get_item_content(self.id)
        return content
# if __name__ == '__main__': # if __name__ == '__main__':
#
# item = Item('')
# res = item.get_date(separator=True)
# print(res)
# import Domain # import Domain
# domain = Domain.Domain('domain.onion') # domain = Domain.Domain('domain.onion')
# for domain_history in domain.get_domain_history(): # for domain_history in domain.get_domain_history():

View file

@ -77,6 +77,7 @@ minTopPassList=5
max_execution_time = 90 max_execution_time = 90
[Onion] [Onion]
save_i2p = False
max_execution_time = 180 max_execution_time = 180
[PgpDump] [PgpDump]