chg: [API] get domain metadata (minimal)

This commit is contained in:
Terrtia 2019-12-11 13:58:43 +01:00
parent 0fb4990d98
commit e71a181bb9
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
2 changed files with 87 additions and 21 deletions

View file

@ -134,6 +134,28 @@ def get_domain_items_crawled(domain, domain_type, port, epoch=None, items_link=F
def get_link_tree(): def get_link_tree():
pass pass
def get_domain_first_seen(domain, domain_type=None, r_format="str"):
'''
Get domain first seen date
:param domain: crawled domain
:type domain: str
:param domain_type: domain type
:type domain_type: str
:return: domain first seen date
:rtype: str
'''
if not domain_type:
domain_type = get_domain_type(domain)
first_seen = r_serv_onion.hget('{}_metadata:{}'.format(domain_type, domain), 'first_seen')
if first_seen is not None:
if r_format=="int":
first_seen = int(first_seen)
else:
first_seen = '{}/{}/{}'.format(first_seen[0:4], first_seen[4:6], first_seen[6:8])
return first_seen
def get_domain_last_check(domain, domain_type=None, r_format="str"): def get_domain_last_check(domain, domain_type=None, r_format="str"):
''' '''
Get domain last check date Get domain last check date
@ -180,6 +202,44 @@ def get_domain_tags(domain):
''' '''
return Tag.get_item_tags(domain) return Tag.get_item_tags(domain)
def get_domain_metadata(domain, domain_type, first_seen=True, last_ckeck=True, status=True, ports=True, tags=False):
'''
Get Domain basic metadata
:param first_seen: get domain first_seen
:type first_seen: boolean
:param last_ckeck: get domain last_check
:type last_ckeck: boolean
:param ports: get all domain ports
:type ports: boolean
:param tags: get all domain tags
:type tags: boolean
:return: a dict of all metadata for a given domain
:rtype: dict
'''
dict_metadata = {}
if first_seen:
res = get_domain_first_seen(domain, domain_type=domain_type)
if res is not None:
dict_metadata['first_seen'] = res
if last_ckeck:
res = get_domain_last_check(domain, domain_type=domain_type)
if res is not None:
dict_metadata['last_check'] = res
if status:
dict_metadata['status'] = is_domain_up(domain, domain_type)
if ports:
dict_metadata['ports'] = get_domain_all_ports(domain, domain_type)
if tags:
dict_metadata['tags'] = get_domain_tags(domain)
return dict_metadata
def get_domain_metadata_basic(domain, domain_type=None):
if not domain_type:
domain_type = get_domain_type(domain)
return get_domain_metadata(domain, domain_type, first_seen=True, last_ckeck=True, status=True, ports=False)
def get_domain_cryptocurrency(domain, currencies_type=None, get_nb=False): def get_domain_cryptocurrency(domain, currencies_type=None, get_nb=False):
''' '''
Retun all cryptocurrencies of a given domain. Retun all cryptocurrencies of a given domain.
@ -287,12 +347,15 @@ def get_domain_history_with_status(domain, domain_type, port): # TODO: add date_
def verify_if_domain_exist(domain): def verify_if_domain_exist(domain):
return r_serv_onion.exists('{}_metadata:{}'.format(get_domain_type(domain), domain)) return r_serv_onion.exists('{}_metadata:{}'.format(get_domain_type(domain), domain))
## API ##
def api_verify_if_domain_exist(domain): def api_verify_if_domain_exist(domain):
if not verify_if_domain_exist(domain): if not verify_if_domain_exist(domain):
return ({'status': 'error', 'reason': 'Unknow Domain'}, 404) return ({'status': 'error', 'reason': 'Domain not found'}, 404)
else: else:
return None return None
## CLASS ##
class Domain(object): class Domain(object):
"""docstring for Domain.""" """docstring for Domain."""
@ -318,10 +381,7 @@ class Domain(object):
:return: domain first seen date :return: domain first seen date
:rtype: str :rtype: str
''' '''
first_seen = r_serv_onion.hget('{}_metadata:{}'.format(self.type, self.domain), 'first_seen') return get_domain_first_seen(self.domain, domain_type=self.type)
if first_seen is not None:
first_seen = '{}/{}/{}'.format(first_seen[0:4], first_seen[4:6], first_seen[6:8])
return first_seen
def get_domain_last_check(self): def get_domain_last_check(self):
''' '''
@ -371,22 +431,7 @@ class Domain(object):
:return: a dict of all metadata for a given domain :return: a dict of all metadata for a given domain
:rtype: dict :rtype: dict
''' '''
dict_metadata = {} return get_domain_metadata(self.domain, self.type, first_seen=first_seen, last_ckeck=last_ckeck, status=status, ports=ports, tags=tags)
if first_seen:
res = self.get_domain_first_seen()
if res is not None:
dict_metadata['first_seen'] = res
if last_ckeck:
res = self.get_domain_last_check()
if res is not None:
dict_metadata['last_check'] = res
if status:
dict_metadata['status'] = self.is_domain_up()
if ports:
dict_metadata['ports'] = self.get_domain_all_ports()
if tags:
dict_metadata['tags'] = self.get_domain_tags()
return dict_metadata
def get_domain_tags(self): def get_domain_tags(self):
''' '''

View file

@ -13,6 +13,9 @@ import json
import redis import redis
import datetime import datetime
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import Domain
import Import_helper import Import_helper
import Cryptocurrency import Cryptocurrency
import Pgp import Pgp
@ -134,6 +137,9 @@ def authErrors(user_role):
# ============ API CORE ============= # ============ API CORE =============
def create_json_response(data_dict, response_code):
return Response(json.dumps(data_dict, indent=2, sort_keys=True), mimetype='application/json'), int(response_code)
# ============ FUNCTIONS ============ # ============ FUNCTIONS ============
def is_valid_uuid_v4(header_uuid): def is_valid_uuid_v4(header_uuid):
@ -454,6 +460,21 @@ def get_item_cryptocurrency_bitcoin():
return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1] return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1]
''' '''
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# # # # # # # # # # # # # # DOMAIN # # # # # # # # # # # # # # # # #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@restApi.route("api/v1/get/domain/metadata/minimal", methods=['POST'])
@token_required('analyst')
def get_domain_metadata_minimal():
data = request.get_json()
domain = data.get('domain', None)
# error handler
res = Domain.api_verify_if_domain_exist(domain)
if res:
return create_json_response(res[0], res[1])
res = Domain.get_domain_metadata_basic(domain)
return create_json_response(res, 200)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# # # # # # # # # # # # # IMPORT # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # IMPORT # # # # # # # # # # # # # # # # # #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #