chg: [app] Improved data selection, condition checking and evaluation definition

This commit is contained in:
Sami Mokaddem 2024-06-28 17:14:05 +02:00
parent d13577212b
commit 1a2ff4c3c9
8 changed files with 107 additions and 66 deletions

View file

@ -134,9 +134,10 @@ def get_model_action(data: dict):
def is_accepted_query(data: dict) -> bool:
model, action = get_model_action(data)
if model in ['Event', 'Attribute', 'Object', 'Tag',]:
if action in ['add', 'edit', 'delete',]:
if data['Log']['change'].startswith('attribute_count'):
return False
if action in ['add', 'edit', 'delete', 'publish']:
# # improved condition below. It blocks some queries
# if data['Log']['change'].startswith('attribute_count'):
# return False
if data['Log']['change'].startswith('Validation errors:'):
return False
return True
@ -218,7 +219,7 @@ def check_inject(user_id: int, inject: dict, data: dict, context: dict) -> bool:
def is_valid_evaluation_context(user_id: int, inject_evaluation: dict, data: dict, context: dict) -> bool:
if 'evaluation_context' not in inject_evaluation:
if 'evaluation_context' not in inject_evaluation or len(inject_evaluation['evaluation_context']) == 0:
return True
if 'request_is_rest' in inject_evaluation['evaluation_context']:
if 'request_is_rest' in context:
@ -266,12 +267,19 @@ def get_data_to_validate(user_id: int, inject_evaluation: dict, data: dict) -> U
def parse_event_id_from_log(data: dict) -> Union[int, None]:
event_id_from_change_field_regex = r".*event_id \(.*\) => \((\d+)\).*"
event_id_from_title_field_regex = r".*from Event \((\d+)\).*"
if 'Log' in data:
log = data['Log']
if 'model' in log and 'model_id' in log and log['model'] == 'Event':
return int(log['model_id'])
if 'change' in log:
event_id_search = re.search(event_id_from_change_field_regex, log['change'], re.IGNORECASE)
if event_id_search is None:
return None
if event_id_search is not None:
event_id = event_id_search.group(1)
return event_id
if 'title' in log:
event_id_search = re.search(event_id_from_title_field_regex, log['title'], re.IGNORECASE)
if event_id_search is not None:
event_id = event_id_search.group(1)
return event_id
return None

View file

@ -236,12 +236,11 @@
}
},
{
".Event.Object[].Attribute": {
"comparison": "contains",
".Event.Object[].Attribute[] | select(.object_relation == \"email-body\")": {
"extract_type": "all",
"comparison": "count",
"values": [
{
"type": "email-body"
}
">=1"
]
}
}
@ -274,16 +273,11 @@
}
},
{
".Event.Object[].Attribute": {
"comparison": "contains",
".Event.Object[].Attribute[] | select((.type == \"filename\")).value": {
"extract_type": "all",
"comparison": "equals",
"values": [
{
"type": "malware-sample"
},
{
"type": "filename",
"value": "cryptolocker.exe"
}
"cryptolocker.exe"
]
}
}
@ -316,12 +310,11 @@
}
},
{
".Event.Object[] | select((.name == \"domain-ip\") or (.name == \"ip-port\"))": {
".Event.Object[] | select((.name == \"domain-ip\") or (.name == \"ip-port\")) | .Attribute[].value": {
"extract_type": "all",
"comparison": "contains",
"values": [
{
"value": "81.177.170.166"
}
"81.177.170.166"
]
}
}
@ -354,12 +347,11 @@
}
},
{
"[.Event.Object[], .Event.Attribute[]]": {
"comparison": "contains",
"[.Event.Object[].Attribute[], .Event.Attribute[]] | .[].value": {
"extract_type": "all",
"comparison": "contains-regex",
"values": [
{
"value": "HKCU\\SOFTWARE\\CryptoLocker VersionInfo"
}
"HKCU.+SOFTWARE.+CryptoLocker.*"
]
}
}
@ -392,12 +384,11 @@
}
},
{
".Event.Object[]": {
"comparison": "contains",
"[.Event.Object[].Attribute[], .Event.Attribute[]] | .[].value": {
"extract_type": "all",
"comparison": "contains-regex",
"values": [
{
"object_relation": "Public"
}
"-----BEGIN PUBLIC KEY-----.*"
]
}
}
@ -430,7 +421,8 @@
}
},
{
".Event.Tag[].name": {
".Event.Tag | select(length > 0) | .[].name": {
"extract_type": "all",
"comparison": "count",
"values": [
">=3"

View file

@ -6,8 +6,12 @@ import operator
# .Event.Attribute[] | select(.value == "evil.exe") | .Tag
def jq_extract(path: str, data: dict):
return jq.compile(path).input_value(data).first()
def jq_extract(path: str, data: dict, extract_type='first'):
query = jq.compile(path).input_value(data)
try:
return query.first() if extract_type == 'first' else query.all()
except StopIteration:
return None
##
@ -15,6 +19,8 @@ def jq_extract(path: str, data: dict):
##
def condition_satisfied(evaluation_config: dict, data_to_validate: Union[dict, list, str]) -> bool:
if type(data_to_validate) is bool:
data_to_validate = "1" if data_to_validate else "0"
if type(data_to_validate) is str:
return eval_condition_str(evaluation_config, data_to_validate)
elif type(data_to_validate) is list:
@ -61,15 +67,54 @@ def eval_condition_list(evaluation_config: dict, data_to_validate: str) -> bool:
if len(values) == 0:
return False
if comparison_type == 'contains' or comparison_type == 'equals':
data_to_validate_set = set(data_to_validate)
values_set = set(values)
if comparison_type == 'contains':
intersection = data_to_validate_set & values_set
if comparison_type == 'contains':
return len(intersection) == len(values_set)
elif comparison_type == 'equals':
intersection = data_to_validate_set & values_set
return len(intersection) == len(values_set) and len(intersection) == len(data_to_validate_set)
if comparison_type == 'contains-regex':
regex = re.compile(values[0])
for candidate in data_to_validate:
if regex.match(candidate):
return True
return False
elif comparison_type == 'count':
value = values[0]
if value.isdigit():
return len(data_to_validate) == value
elif value[:2] in comparators.keys():
count = len(data_to_validate)
value_operator = values[0][:2]
value = int(value[2:])
return comparators[value_operator](count, value)
elif value[0] in comparators.keys():
count = len(data_to_validate)
value_operator = value[0]
value = int(value[1:])
return comparators[value_operator](count, value)
return False
def eval_condition_dict(evaluation_config: dict, data_to_validate: str) -> bool:
comparison_type = evaluation_config['comparison']
values = evaluation_config['values']
comparators = {
'<': operator.lt,
'<=': operator.le,
'>': operator.gt,
'>=': operator.ge,
'=': operator.eq,
}
comparison_type = evaluation_config['comparison']
if comparison_type == 'contains':
pass
elif comparison_type == 'equals':
pass
elif comparison_type == 'count':
if values[0].isdigit():
return len(data_to_validate) == values[0]
@ -81,23 +126,12 @@ def eval_condition_list(evaluation_config: dict, data_to_validate: str) -> bool:
return False
def eval_condition_dict(evaluation_config: dict, data_to_validate: str) -> bool:
print('Condition on dict not supported yet')
comparison_type = evaluation_config['comparison']
if comparison_type == 'contains':
pass
elif comparison_type == 'equals':
pass
elif comparison_type == 'count':
pass
return False
def eval_data_filtering(user_id: int, inject_evaluation: dict, data: dict) -> bool:
for evaluation_params in inject_evaluation['parameters']:
for evaluation_path, evaluation_config in evaluation_params.items():
data_to_validate = jq_extract(evaluation_path, data)
data_to_validate = jq_extract(evaluation_path, data, evaluation_config.get('extract_type', 'first'))
if data_to_validate is None:
print('Could not extract data')
return False
if not condition_satisfied(evaluation_config, data_to_validate):
return False

View file

@ -20,7 +20,7 @@ def get(url, data={}, api_key=misp_apikey):
}
full_url = urljoin(misp_url, url)
response = requests.get(full_url, data=data, headers=headers, verify=not misp_skipssl)
return response.json() if response.headers['content-type'] == 'application/json' else response.text
return response.json() if response.headers['content-type'].startswith('application/json') else response.text
def post(url, data={}, api_key=misp_apikey):
@ -32,7 +32,7 @@ def post(url, data={}, api_key=misp_apikey):
}
full_url = urljoin(misp_url, url)
response = requests.post(full_url, data=json.dumps(data), headers=headers, verify=not misp_skipssl)
return response.json() if response.headers['content-type'] == 'application/json' else response.text
return response.json() if response.headers['content-type'].startswith('application/json') else response.text
def getEvent(event_id: int) -> Union[None, dict]:

View file

@ -101,7 +101,7 @@ def get_notification_message(data: dict) -> dict:
response_code = data.get('response_code', '?')
user_agent = data.get('user_agent', '?')
_, action = get_scope_action_from_url(url)
http_method = 'DELETE' if http_method == 'POST' and action == 'delete' else http_method # small override for UI
http_method = 'DELETE' if (http_method == 'POST' or http_method == 'PUT') and action == 'delete' else http_method # small override for UI
payload = get_request_post_body(data)
return {
'user': user,
@ -115,13 +115,18 @@ def get_notification_message(data: dict) -> dict:
}
def get_scope_action_from_url(url) -> str:
def get_scope_action_from_url(url) -> Union[str, None]:
split = url.split('/')
if len(split) > 2:
return (split[1], split[2],)
else:
return (None, None,)
def is_accepted_notification(notification) -> bool:
if notification['user_agent'] == 'misp-exercise-dashboard':
if notification['user_agent'] == 'misp-exercise-dashboard': # Ignore message generated from this app
return False
if '@' not in notification['user']: # Ignore message from system
return False
scope, action = get_scope_action_from_url(notification['url'])

View file

@ -65,7 +65,7 @@ def handleMessage(topic, s, message):
if topic == 'misp_json_audit':
user_id, email = notification_model.get_user_email_id_pair(data)
if user_id is not None:
if user_id is not None and '@' in email:
if user_id not in db.USER_ID_TO_EMAIL_MAPPING:
db.USER_ID_TO_EMAIL_MAPPING[user_id] = email
sio.emit('new_user', email)
@ -107,8 +107,10 @@ def forward_zmq_to_socketio():
while True:
message = zsocket.recv_string()
topic, s, m = message.partition(" ")
try:
handleMessage(topic, s, m)
try:
pass
# handleMessage(topic, s, m)
except Exception as e:
print(e)