mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-10 08:38:28 +00:00
chg: [export MISP] export Items + Screenshots objects with relationships
This commit is contained in:
parent
b63fa51166
commit
4a732ea9f3
7 changed files with 364 additions and 9 deletions
293
bin/export/MispExport.py
Executable file
293
bin/export/MispExport.py
Executable file
|
@ -0,0 +1,293 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
import redis
|
||||
|
||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib'))
|
||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
|
||||
#from Cryptocurrency import cryptocurrency
|
||||
import Decoded
|
||||
import Domain
|
||||
import Item
|
||||
import Screenshot
|
||||
|
||||
import Correlate_object
|
||||
|
||||
# MISP
|
||||
from pymisp import MISPEvent, MISPObject, PyMISP
|
||||
|
||||
|
||||
def tag_misp_object_attributes(l_ref_obj_attr, tags):
|
||||
for obj_attr in l_ref_obj_attr:
|
||||
for tag in tags:
|
||||
obj_attr.add_tag(tag)
|
||||
|
||||
def export_ail_item(item_id):
|
||||
dict_metadata = Item.get_item({'id': item_id, 'date':True, 'tags':True, 'raw_content':True})[0]
|
||||
|
||||
#obj = MISPObject('ail-item', standalone=True)
|
||||
obj = MISPObject('ail-leak', standalone=True)
|
||||
obj.first_seen = dict_metadata['date']
|
||||
|
||||
l_obj_attr = []
|
||||
l_obj_attr.append( obj.add_attribute('first-seen', value=dict_metadata['date']) )
|
||||
l_obj_attr.append( obj.add_attribute('raw-data', value=item_id, data=dict_metadata['raw_content']) )
|
||||
|
||||
# add tags
|
||||
if dict_metadata['tags']:
|
||||
tag_misp_object_attributes(l_obj_attr, dict_metadata['tags'])
|
||||
return obj
|
||||
|
||||
# # TODO: create domain-port-history object
|
||||
def export_domain(domain):
|
||||
domain_obj = Domain.Domain(domain)
|
||||
dict_metadata = domain_obj.get_domain_metadata(tags=True)
|
||||
|
||||
# create domain-ip obj
|
||||
obj = MISPObject('domain-ip', standalone=True)
|
||||
obj.first_seen = dict_metadata['first_seen']
|
||||
obj.last_seen = dict_metadata['last_check']
|
||||
|
||||
l_obj_attr = []
|
||||
l_obj_attr.append( obj.add_attribute('first-seen', value=dict_metadata['first_seen']) )
|
||||
l_obj_attr.append( obj.add_attribute('last-seen', value=dict_metadata['last_check']) )
|
||||
l_obj_attr.append( obj.add_attribute('domain', value=domain) )
|
||||
|
||||
# add tags
|
||||
if dict_metadata['tags']:
|
||||
tag_misp_object_attributes(l_obj_attr, dict_metadata['tags'])
|
||||
|
||||
#print(obj.to_json())
|
||||
return obj
|
||||
|
||||
# TODO: add tags
|
||||
def export_decoded(sha1_string):
|
||||
|
||||
decoded_metadata = Decoded.get_decoded_metadata(sha1_string, tag=True)
|
||||
|
||||
obj = MISPObject('file')
|
||||
obj.first_seen = decoded_metadata['first_seen']
|
||||
obj.last_seen = decoded_metadata['last_seen']
|
||||
|
||||
l_obj_attr = []
|
||||
l_obj_attr.append( obj.add_attribute('sha1', value=sha1_string) )
|
||||
l_obj_attr.append( obj.add_attribute('mimetype', value=Decoded.get_decoded_item_type(sha1_string)) )
|
||||
l_obj_attr.append( obj.add_attribute('malware-sample', value=sha1_string, data=Decoded.get_decoded_file_content(sha1_string)) )
|
||||
|
||||
# add tags
|
||||
if decoded_metadata['tags']:
|
||||
tag_misp_object_attributes(l_obj_attr, decoded_metadata['tags'])
|
||||
|
||||
return obj
|
||||
|
||||
# TODO: add tags
|
||||
def export_screenshot(sha256_string):
|
||||
obj = MISPObject('file')
|
||||
|
||||
l_obj_attr = []
|
||||
l_obj_attr.append( obj.add_attribute('sha256', value=sha256_string) )
|
||||
l_obj_attr.append( obj.add_attribute('attachment', value=sha256_string, data=Screenshot.get_screenshot_file_content(sha256_string)) )
|
||||
|
||||
# add tags
|
||||
tags = Screenshot.get_screenshot_tags(sha256_string)
|
||||
if tags:
|
||||
tag_misp_object_attributes(l_obj_attr, tags)
|
||||
|
||||
return obj
|
||||
|
||||
# TODO: add tags
|
||||
def export_cryptocurrency(crypto_type, crypto_address):
|
||||
dict_metadata = cryptocurrency.get_metadata(crypto_address, crypto_type)
|
||||
|
||||
obj = MISPObject('coin-address')
|
||||
obj.first_seen = dict_metadata['first_seen']
|
||||
obj.last_seen = dict_metadata['last_seen']
|
||||
|
||||
l_obj_attr = []
|
||||
l_obj_attr.append( obj.add_attribute('address', value=crypto_address) )
|
||||
#l_obj_attr.append( obj.add_attribute('symbol', value=Cryptocurrency.get_cryptocurrency_symbol(crypto_type)) )
|
||||
l_obj_attr.append( obj.add_attribute('first-seen', value=dict_metadata['first_seen']) )
|
||||
l_obj_attr.append( obj.add_attribute('last-seen', value=dict_metadata['last_seen']) )
|
||||
|
||||
return obj
|
||||
|
||||
# filter objects to export, export only object who correlect which each other
|
||||
def filter_obj_linked(l_obj):
|
||||
for obj in l_obj:
|
||||
res = Correlate_object.get_object_correlation(obj['type'], obj['id'], obj.get('subtype', None))
|
||||
print(res)
|
||||
|
||||
|
||||
def export_object_list(l_obj, mode='union'):
|
||||
# filter elements to export
|
||||
if mode=='linked':
|
||||
filter_obj_linked(l_obj)
|
||||
|
||||
def add_relation_ship_to_create(set_relationship, dict_obj, dict_new_obj):
|
||||
global_id = Correlate_object.get_obj_global_id(dict_obj['type'], dict_obj['id'], dict_obj.get('subtype', None))
|
||||
global_id_new = Correlate_object.get_obj_global_id(dict_new_obj['type'], dict_new_obj['id'], dict_new_obj.get('subtype', None))
|
||||
if global_id > global_id_new:
|
||||
res = (global_id, global_id_new)
|
||||
else:
|
||||
res = (global_id_new, global_id)
|
||||
set_relationship.add( res )
|
||||
|
||||
# # TODO: add action by obj type
|
||||
# ex => Domain
|
||||
def add_obj_to_create(all_obj_to_export, set_relationship, dict_obj):
|
||||
all_obj_to_export.add(Correlate_object.get_obj_global_id(dict_obj['type'], dict_obj['id'], dict_obj.get('subtype', None)))
|
||||
|
||||
def add_obj_to_create_by_lvl(all_obj_to_export, set_relationship, dict_obj, lvl):
|
||||
# # TODO: filter by export mode or filter on all global ?
|
||||
if lvl >= 0:
|
||||
add_obj_to_create(all_obj_to_export, add_obj_to_create, dict_obj)
|
||||
|
||||
if lvl > 0:
|
||||
lvl = lvl - 1
|
||||
|
||||
# # TODO: filter by correlation types
|
||||
obj_correlations = Correlate_object.get_object_correlation(dict_obj['type'], dict_obj['id'], dict_obj.get('subtype', None))
|
||||
for obj_type in obj_correlations:
|
||||
dict_new_obj = {'type': obj_type}
|
||||
if obj_type=='pgp' or obj_type=='cryptocurrency':
|
||||
for subtype in obj_correlations[obj_type]:
|
||||
dict_new_obj['subtype'] = subtype
|
||||
for obj_id in obj_correlations[obj_type][subtype]:
|
||||
dict_new_obj['id'] = obj_id
|
||||
add_obj_to_create_by_lvl(all_obj_to_export, set_relationship, dict_new_obj, lvl)
|
||||
add_relation_ship_to_create(set_relationship, dict_obj, dict_new_obj)
|
||||
|
||||
else:
|
||||
for obj_id in obj_correlations[obj_type]:
|
||||
dict_new_obj['id'] = obj_id
|
||||
add_obj_to_create_by_lvl(all_obj_to_export, set_relationship, dict_new_obj, lvl)
|
||||
add_relation_ship_to_create(set_relationship, dict_obj, dict_new_obj)
|
||||
|
||||
|
||||
add_obj_to_create_by_lvl(all_obj_to_export, set_relationship, dict_obj, lvl)
|
||||
|
||||
|
||||
def create_list_of_objs_to_export(l_obj, mode='union'):
|
||||
all_obj_to_export = set()
|
||||
set_relationship = set()
|
||||
for obj in l_obj:
|
||||
add_obj_to_create_by_lvl(all_obj_to_export, set_relationship, obj, obj.get('lvl', 1))
|
||||
|
||||
# create MISP objects
|
||||
dict_misp_obj = create_all_misp_obj(all_obj_to_export, set_relationship)
|
||||
|
||||
# create object relationships
|
||||
for obj_global_id_1, obj_global_id_2 in set_relationship:
|
||||
dict_relationship = get_relationship_between_global_obj(obj_global_id_1, obj_global_id_2)
|
||||
if dict_relationship:
|
||||
obj_src = dict_misp_obj[dict_relationship['src']]
|
||||
obj_dest = dict_misp_obj[dict_relationship['dest']]
|
||||
obj_src.add_reference(obj_dest.uuid, dict_relationship['relation'], 'add a comment')
|
||||
|
||||
event = MISPEvent()
|
||||
event.info = 'AIL framework export'
|
||||
for obj_global_id in dict_misp_obj:
|
||||
misp_obj = dict_misp_obj[obj_global_id]
|
||||
if misp_obj:
|
||||
# add object to event
|
||||
event.add_object(dict_misp_obj[obj_global_id])
|
||||
|
||||
#print(event.to_json())
|
||||
|
||||
misp = PyMISP('https://127.0.0.1:8443/', 'uXgcN42b7xuL88XqK5hubwD8Q8596VrrBvkHQzB0', False)
|
||||
misp.add_event(event, pythonify=True)
|
||||
|
||||
|
||||
def create_all_misp_obj(all_obj_to_export, set_relationship):
|
||||
dict_misp_obj = {}
|
||||
for obj_global_id in all_obj_to_export:
|
||||
obj_type, obj_id = obj_global_id.split(':', 1)
|
||||
dict_misp_obj[obj_global_id] = create_misp_obj(obj_type, obj_id)
|
||||
|
||||
return dict_misp_obj
|
||||
|
||||
def create_misp_obj(obj_type, obj_id):
|
||||
if obj_type == 'item':
|
||||
return export_ail_item(obj_id)
|
||||
elif obj_type == 'decoded':
|
||||
return export_decoded(obj_id)
|
||||
elif obj_type == 'image':
|
||||
return export_screenshot(obj_id)
|
||||
elif obj_type == 'cryptocurrency':
|
||||
obj_subtype, obj_id = obj_id.split(':', 1)
|
||||
return export_cryptocurrency(obj_subtype, obj_id)
|
||||
elif obj_type == 'pgp':
|
||||
obj_subtype, obj_id = obj_id.split(':', 1)
|
||||
pass
|
||||
elif obj_type == 'domain':
|
||||
return export_domain(obj_id)
|
||||
|
||||
def get_relationship_between_global_obj(obj_global_id_1, obj_global_id_2):
|
||||
obj_type_1 = obj_global_id_1.split(':', 1)[0]
|
||||
obj_type_2 = obj_global_id_2.split(':', 1)[0]
|
||||
type_tuple = [obj_type_1, obj_type_2]
|
||||
|
||||
if 'image' in type_tuple: # or screenshot ## TODO:
|
||||
if obj_type_1 == 'image':
|
||||
src = obj_global_id_1
|
||||
dest = obj_global_id_2
|
||||
else:
|
||||
src = obj_global_id_2
|
||||
dest = obj_global_id_1
|
||||
return {'relation': 'screenshot-of', 'src': src, 'dest': dest}
|
||||
elif 'decoded' in type_tuple:
|
||||
if obj_type_1 == 'decoded':
|
||||
src = obj_global_id_1
|
||||
dest = obj_global_id_2
|
||||
else:
|
||||
src = obj_global_id_2
|
||||
dest = obj_global_id_1
|
||||
return {'relation': 'included-in', 'src': src, 'dest': dest}
|
||||
elif 'pgp':
|
||||
return None
|
||||
elif 'cryptocurrency':
|
||||
return None
|
||||
elif 'domain':
|
||||
return None
|
||||
|
||||
######
|
||||
#
|
||||
# EXPORT LVL DEFINITION:
|
||||
#
|
||||
# LVL 0 => PARTIAL Only add core item Correlation
|
||||
# LVL 1 => DETAILED Also add correlated_items correlation
|
||||
######
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
l_obj = [{'id': 'crawled/2019/11/08/6d3zimnpbwbzdgnp.onionf58258c8-c990-4707-b236-762a2b881183', 'type': 'item', 'lvl': 3},
|
||||
{'id': '6d3zimnpbwbzdgnp.onion', 'type': 'domain', 'lvl': 0},
|
||||
#{'id': '0xA4BB02A75E6AF448', 'type': 'pgp', 'subtype': 'key', 'lvl': 0},
|
||||
{'id': 'a92d459f70c4dea8a14688f585a5e2364be8b91fbf924290ead361d9b909dcf1', 'type': 'image', 'lvl': 3}]
|
||||
create_list_of_objs_to_export(l_obj, mode='union')
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#event = MISPEvent()
|
||||
#event.info = 'AIL framework export'
|
||||
#
|
||||
# obj_item = export_ail_item('crawled/2019/11/08/6d3zimnpbwbzdgnp.onionf58258c8-c990-4707-b236-762a2b881183')
|
||||
# event.add_object(obj_item)
|
||||
#
|
||||
# obj_domain = export_domain('2222222222xkrmay.onion')
|
||||
# event.add_object(obj_domain)
|
||||
#
|
||||
# obj_decoded = export_decoded('fc351baadefce6f702155fb908a9e84dd5dd0fa7')
|
||||
# obj_decoded.add_reference(obj_domain.uuid, 'injected-into', 'add a comment')
|
||||
# event.add_object(obj_decoded)
|
||||
|
||||
# obj_screenshot = export_screenshot('5fcc292ea8a699aa7a9ce93a704b78b8f493620ccdb2a5cebacb1069a4327211')
|
||||
# obj_screenshot.add_reference(obj_domain.uuid, 'screenshot-of')
|
||||
# event.add_object(obj_screenshot)
|
||||
|
||||
#print(event.to_json())
|
|
@ -47,7 +47,7 @@ def get_all_correlation_objects():
|
|||
def exist_object(object_type, correlation_id, type_id=None):
|
||||
if object_type == 'domain':
|
||||
return Domain.verify_if_domain_exist(correlation_id)
|
||||
elif object_type == 'paste':
|
||||
elif object_type == 'paste' or object_type == 'item':
|
||||
return Item.exist_item(correlation_id)
|
||||
elif object_type == 'decoded':
|
||||
return Decoded.exist_decoded(correlation_id)
|
||||
|
@ -75,19 +75,19 @@ def get_object_metadata(object_type, correlation_id, type_id=None):
|
|||
elif object_type == 'screenshot' or object_type == 'image':
|
||||
return Screenshot.get_metadata(correlation_id)
|
||||
|
||||
def get_object_correlation(object_type, value, correlation_names, correlation_objects, requested_correl_type=None):
|
||||
def get_object_correlation(object_type, value, correlation_names=None, correlation_objects=None, requested_correl_type=None):
|
||||
if object_type == 'domain':
|
||||
return Domain.get_domain_all_correlation(value, correlation_names=correlation_names)
|
||||
elif object_type == 'paste':
|
||||
elif object_type == 'paste' or object_type == 'item':
|
||||
return Item.get_item_all_correlation(value, correlation_names=correlation_names)
|
||||
elif object_type == 'decoded':
|
||||
return Decoded.get_decoded_correlated_object(value, correlation_objects)
|
||||
return Decoded.get_decoded_correlated_object(value, correlation_objects=correlation_objects)
|
||||
elif object_type == 'pgp':
|
||||
return Pgp.pgp.get_correlation_all_object(requested_correl_type, value, correlation_objects=correlation_objects)
|
||||
elif object_type == 'cryptocurrency':
|
||||
return Cryptocurrency.cryptocurrency.get_correlation_all_object(requested_correl_type, value, correlation_objects=correlation_objects)
|
||||
elif object_type == 'screenshot' or object_type == 'image':
|
||||
return Screenshot.get_screenshot_correlated_object(value, correlation_objects)
|
||||
return Screenshot.get_screenshot_correlated_object(value, correlation_objects=correlation_objects)
|
||||
return {}
|
||||
|
||||
def get_correlation_node_icon(correlation_name, correlation_type=None, value=None):
|
||||
|
@ -328,6 +328,18 @@ def get_graph_node_object_correlation(object_type, root_value, mode, correlation
|
|||
return {"nodes": create_graph_nodes(nodes, root_node_id), "links": create_graph_links(links)}
|
||||
|
||||
|
||||
def get_obj_global_id(obj_type, obj_id, obj_sub_type=None):
|
||||
if obj_sub_type:
|
||||
return '{}:{}:{}'.format(obj_type, obj_sub_type, obj_id)
|
||||
else:
|
||||
# # TODO: remove me
|
||||
if obj_type=='paste':
|
||||
obj_type='item'
|
||||
# # TODO: remove me
|
||||
if obj_type=='screenshot':
|
||||
obj_type='image'
|
||||
|
||||
return '{}:{}'.format(obj_type, obj_id)
|
||||
|
||||
######## API EXPOSED ########
|
||||
def sanitize_object_type(object_type):
|
||||
|
|
|
@ -5,6 +5,7 @@ import os
|
|||
import sys
|
||||
import redis
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
|
||||
import Item
|
||||
|
@ -16,6 +17,7 @@ import ConfigLoader
|
|||
|
||||
config_loader = ConfigLoader.ConfigLoader()
|
||||
r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata")
|
||||
HASH_DIR = config_loader.get_config_str('Directories', 'hash')
|
||||
config_loader = None
|
||||
|
||||
def get_decoded_item_type(sha1_string):
|
||||
|
@ -40,6 +42,14 @@ def nb_decoded_item_size(sha1_string):
|
|||
else:
|
||||
return int(nb)
|
||||
|
||||
def get_decoded_relative_path(sha1_string, mimetype=None):
|
||||
if not mimetype:
|
||||
mimetype = get_decoded_item_type(sha1_string)
|
||||
return os.path.join(HASH_DIR, mimetype, sha1_string[0:2], sha1_string)
|
||||
|
||||
def get_decoded_filepath(sha1_string, mimetype=None):
|
||||
return os.path.join(os.environ['AIL_HOME'], get_decoded_relative_path(sha1_string, mimetype=mimetype))
|
||||
|
||||
def exist_decoded(sha1_string):
|
||||
return r_serv_metadata.exists('metadata_hash:{}'.format(sha1_string))
|
||||
|
||||
|
@ -150,3 +160,10 @@ def get_decoded_correlated_object(sha1_string, correlation_objects=[]):
|
|||
def save_domain_decoded(domain, sha1_string):
|
||||
r_serv_metadata.sadd('hash_domain:{}'.format(domain), sha1_string) # domain - hash map
|
||||
r_serv_metadata.sadd('domain_hash:{}'.format(sha1_string), domain) # hash - domain ma
|
||||
|
||||
|
||||
def get_decoded_file_content(sha1_string, mimetype=None):
|
||||
filepath = get_decoded_filepath(sha1_string, mimetype=mimetype)
|
||||
with open(filepath, 'rb') as f:
|
||||
file_content = BytesIO(f.read())
|
||||
return file_content
|
||||
|
|
|
@ -5,13 +5,14 @@ import os
|
|||
import sys
|
||||
import redis
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
|
||||
import Item
|
||||
import Date
|
||||
import Tag
|
||||
|
||||
|
||||
import Correlate_object
|
||||
import ConfigLoader
|
||||
|
||||
config_loader = ConfigLoader.ConfigLoader()
|
||||
|
@ -27,8 +28,11 @@ def get_screenshot_rel_path(sha256_string, add_extension=False):
|
|||
screenshot_path = screenshot_path + '.png'
|
||||
return screenshot_path
|
||||
|
||||
def get_screenshot_filepath(sha256_string):
|
||||
return os.path.join(SCREENSHOT_FOLDER, get_screenshot_rel_path(sha256_string, add_extension=True))
|
||||
|
||||
def exist_screenshot(sha256_string):
|
||||
screenshot_path = os.path.join(SCREENSHOT_FOLDER, get_screenshot_rel_path(sha256_string, add_extension=True))
|
||||
screenshot_path = get_screenshot_filepath(sha256_string)
|
||||
return os.path.isfile(screenshot_path)
|
||||
|
||||
def get_metadata(sha256_string):
|
||||
|
@ -110,7 +114,7 @@ def get_screenshot_correlated_object(sha256_string, correlation_objects=[]):
|
|||
:rtype: dict
|
||||
'''
|
||||
if correlation_objects is None:
|
||||
correlation_objects = Correlation.get_all_correlation_objects()
|
||||
correlation_objects = Correlate_object.get_all_correlation_objects()
|
||||
decoded_correlation = {}
|
||||
for correlation_object in correlation_objects:
|
||||
if correlation_object == 'paste':
|
||||
|
@ -122,3 +126,10 @@ def get_screenshot_correlated_object(sha256_string, correlation_objects=[]):
|
|||
if res:
|
||||
decoded_correlation[correlation_object] = res
|
||||
return decoded_correlation
|
||||
|
||||
|
||||
def get_screenshot_file_content(sha256_string):
|
||||
filepath = get_screenshot_filepath(sha256_string)
|
||||
with open(filepath, 'rb') as f:
|
||||
file_content = BytesIO(f.read())
|
||||
return file_content
|
||||
|
|
|
@ -7,6 +7,7 @@ import redis
|
|||
|
||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
|
||||
import ConfigLoader
|
||||
import Correlate_object
|
||||
|
||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/'))
|
||||
import Date
|
||||
|
@ -235,7 +236,7 @@ class Correlation(object):
|
|||
|
||||
def get_correlation_all_object(self, correlation_type, correlation_value, correlation_objects=[]):
|
||||
if correlation_objects is None:
|
||||
correlation_objects = get_all_correlation_objects()
|
||||
correlation_objects = Correlate_object.get_all_correlation_objects()
|
||||
correlation_obj = {}
|
||||
for correlation_object in correlation_objects:
|
||||
if correlation_object == 'paste':
|
||||
|
|
|
@ -94,3 +94,9 @@ def save_cryptocurrency_data(cryptocurrency_name, date, item_path, cryptocurrenc
|
|||
domain = Item.get_item_domain(item_path)
|
||||
r_serv_metadata.sadd('domain_cryptocurrency_{}:{}'.format(cryptocurrency_name, domain), cryptocurrency_address)
|
||||
r_serv_metadata.sadd('set_domain_cryptocurrency_{}:{}'.format(cryptocurrency_name, cryptocurrency_address), domain)
|
||||
|
||||
def get_cryptocurrency_symbol(crypto_type):
|
||||
if type=='bitcoin':
|
||||
return 'BTC'
|
||||
else:
|
||||
return ''
|
||||
|
|
|
@ -6,6 +6,8 @@ import sys
|
|||
import gzip
|
||||
import redis
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/'))
|
||||
import Date
|
||||
import Tag
|
||||
|
@ -35,6 +37,9 @@ def exist_item(item_id):
|
|||
def get_item_id(full_path):
|
||||
return full_path.replace(PASTES_FOLDER, '', 1)
|
||||
|
||||
def get_item_filepath(item_id):
|
||||
return os.path.join(PASTES_FOLDER, item_id)
|
||||
|
||||
def get_item_date(item_id, add_separator=False):
|
||||
l_directory = item_id.split('/')
|
||||
if add_separator:
|
||||
|
@ -115,6 +120,10 @@ def get_item(request_dict):
|
|||
# UTF-8 outpout, # TODO: use base64
|
||||
dict_item['content'] = get_item_content(item_id)
|
||||
|
||||
raw_content = request_dict.get('raw_content', False)
|
||||
if raw_content:
|
||||
dict_item['raw_content'] = get_raw_content(item_id)
|
||||
|
||||
lines_info = request_dict.get('lines', False)
|
||||
if lines_info:
|
||||
dict_item['lines'] = get_lines_info(item_id, dict_item.get('content', 'None'))
|
||||
|
@ -293,3 +302,9 @@ def get_item_har_name(item_id):
|
|||
|
||||
def get_item_har(har_path):
|
||||
pass
|
||||
|
||||
def get_raw_content(item_id):
|
||||
filepath = get_item_filepath(item_id)
|
||||
with open(filepath, 'rb') as f:
|
||||
file_content = BytesIO(f.read())
|
||||
return file_content
|
||||
|
|
Loading…
Reference in a new issue