chg: [objs processed] xxhash messages

This commit is contained in:
terrtia 2023-09-07 10:38:03 +02:00
parent b459498db2
commit bb3dad2873
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
3 changed files with 24 additions and 18 deletions

View file

@ -6,7 +6,7 @@ import sys
import datetime import datetime
import time import time
from hashlib import sha256 import xxhash
sys.path.append(os.environ['AIL_BIN']) sys.path.append(os.environ['AIL_BIN'])
################################## ##################################
@ -80,12 +80,12 @@ class AILQueue:
# raise Exception(f'Error: queue {self.name}, no AIL object provided') # raise Exception(f'Error: queue {self.name}, no AIL object provided')
else: else:
obj_global_id, mess = row_mess obj_global_id, mess = row_mess
sha256_mess = sha256(message.encode()).hexdigest() m_hash = xxhash.xxh3_64_hexdigest(message)
add_processed_obj(obj_global_id, sha256_mess, module=self.name) add_processed_obj(obj_global_id, m_hash, module=self.name)
return obj_global_id, sha256_mess, mess return obj_global_id, m_hash, mess
def end_message(self, obj_global_id, sha256_mess): def end_message(self, obj_global_id, m_hash):
end_processed_obj(obj_global_id, sha256_mess, module=self.name) end_processed_obj(obj_global_id, m_hash, module=self.name)
def send_message(self, obj_global_id, message='', queue_name=None): def send_message(self, obj_global_id, message='', queue_name=None):
if not self.subscribers_modules: if not self.subscribers_modules:
@ -100,14 +100,14 @@ class AILQueue:
message = f'{obj_global_id};{message}' message = f'{obj_global_id};{message}'
if obj_global_id != '::': if obj_global_id != '::':
sha256_mess = sha256(message.encode()).hexdigest() m_hash = xxhash.xxh3_64_hexdigest(message)
else: else:
sha256_mess = None m_hash = None
# Add message to all modules # Add message to all modules
for module_name in self.subscribers_modules[queue_name]: for module_name in self.subscribers_modules[queue_name]:
if sha256_mess: if m_hash:
add_processed_obj(obj_global_id, sha256_mess, queue=module_name) add_processed_obj(obj_global_id, m_hash, queue=module_name)
r_queues.rpush(f'queue:{module_name}:in', message) r_queues.rpush(f'queue:{module_name}:in', message)
# stats # stats
@ -192,23 +192,23 @@ def get_processed_obj_queues(obj_global_id):
def get_processed_obj(obj_global_id): def get_processed_obj(obj_global_id):
return {'modules': get_processed_obj_modules(obj_global_id), 'queues': get_processed_obj_queues(obj_global_id)} return {'modules': get_processed_obj_modules(obj_global_id), 'queues': get_processed_obj_queues(obj_global_id)}
def add_processed_obj(obj_global_id, sha256_mess, module=None, queue=None): def add_processed_obj(obj_global_id, m_hash, module=None, queue=None):
obj_type = obj_global_id.split(':', 1)[0] obj_type = obj_global_id.split(':', 1)[0]
new_obj = r_obj_process.sadd(f'objs:process', obj_global_id) new_obj = r_obj_process.sadd(f'objs:process', obj_global_id)
# first process: # first process:
if new_obj: if new_obj:
r_obj_process.zadd(f'objs:process:{obj_type}', {obj_global_id: int(time.time())}) r_obj_process.zadd(f'objs:process:{obj_type}', {obj_global_id: int(time.time())})
if queue: if queue:
r_obj_process.zadd(f'obj:queues:{obj_global_id}', {f'{queue}:{sha256_mess}': int(time.time())}) r_obj_process.zadd(f'obj:queues:{obj_global_id}', {f'{queue}:{m_hash}': int(time.time())})
if module: if module:
r_obj_process.zadd(f'obj:modules:{obj_global_id}', {f'{module}:{sha256_mess}': int(time.time())}) r_obj_process.zadd(f'obj:modules:{obj_global_id}', {f'{module}:{m_hash}': int(time.time())})
r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{module}:{sha256_mess}') r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{module}:{m_hash}')
def end_processed_obj(obj_global_id, sha256_mess, module=None, queue=None): def end_processed_obj(obj_global_id, m_hash, module=None, queue=None):
if queue: if queue:
r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{sha256_mess}') r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{m_hash}')
if module: if module:
r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{sha256_mess}') r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{m_hash}')
# TODO HANDLE QUEUE DELETE # TODO HANDLE QUEUE DELETE
# process completed # process completed
@ -322,7 +322,7 @@ def save_queue_digraph():
if __name__ == '__main__': if __name__ == '__main__':
# clear_modules_queues_stats() # clear_modules_queues_stats()
# save_queue_digraph() # save_queue_digraph()
oobj_global_id = 'item::submitted/2023/06/22/submitted_f656119e-f2ea-49d7-9beb-fb97077f8fe5.gz' oobj_global_id = 'item::submitted/2023/09/06/submitted_75fb9ff2-8c91-409d-8bd6-31769d73db8f.gz'
while True: while True:
print(get_processed_obj(oobj_global_id)) print(get_processed_obj(oobj_global_id))
time.sleep(0.5) time.sleep(0.5)

View file

@ -154,6 +154,11 @@ host = localhost
port = 6381 port = 6381
db = 0 db = 0
[Redis_Process]
host = localhost
port = 6381
db = 2
[Redis_Mixer_Cache] [Redis_Mixer_Cache]
host = localhost host = localhost
port = 6381 port = 6381

View file

@ -17,6 +17,7 @@ websockets>9.0
crcmod crcmod
mmh3>2.5 mmh3>2.5
ssdeep>3.3 ssdeep>3.3
xxhash>3.1.0
# ZMQ # ZMQ
zmq zmq