mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-27 00:07:16 +00:00
Added backward support + renamed index with timestamp
This commit is contained in:
parent
381e72ee99
commit
4005b72180
3 changed files with 55 additions and 22 deletions
|
@ -25,15 +25,23 @@ INDEX_SIZE_THRESHOLD = 500 #Mb
|
||||||
TIME_WAIT = 1.0 #sec
|
TIME_WAIT = 1.0 #sec
|
||||||
|
|
||||||
# return in bytes
|
# return in bytes
|
||||||
def check_index_size(indexnum):
|
def check_index_size(indexname):
|
||||||
global baseindexpath
|
global baseindexpath
|
||||||
the_index_name = "index_"+str(indexnum) if indexnum != 0 else "old_index"
|
the_index_name = indexname if indexname != "0" else "old_index"
|
||||||
the_index_name = os.path.join(baseindexpath, the_index_name)
|
the_index_name = join(baseindexpath, the_index_name)
|
||||||
cur_sum = 0
|
cur_sum = 0
|
||||||
for root, dirs, files in os.walk(the_index_name):
|
for root, dirs, files in os.walk(the_index_name):
|
||||||
cur_sum += sum(getsize(join(root, name)) for name in files)
|
cur_sum += sum(getsize(join(root, name)) for name in files)
|
||||||
return cur_sum
|
return cur_sum
|
||||||
|
|
||||||
|
def move_index_into_old_index_folder(baseindexpath):
|
||||||
|
command_move = "mv {} {}"
|
||||||
|
command_dir = "mkdir {}"
|
||||||
|
os.system(command_dir.format(join(baseindexpath, "old_index")))
|
||||||
|
for files in os.listdir(baseindexpath):
|
||||||
|
if not files == "old_index":
|
||||||
|
os.system(command_move.format(join(baseindexpath, files), join(join(baseindexpath, "old_index"), files)))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
publisher.port = 6380
|
publisher.port = 6380
|
||||||
|
@ -44,9 +52,9 @@ if __name__ == "__main__":
|
||||||
p = Process(config_section)
|
p = Process(config_section)
|
||||||
|
|
||||||
# Indexer configuration - index dir and schema setup
|
# Indexer configuration - index dir and schema setup
|
||||||
baseindexpath = os.path.join(os.environ['AIL_HOME'],
|
baseindexpath = join(os.environ['AIL_HOME'],
|
||||||
p.config.get("Indexer", "path"))
|
p.config.get("Indexer", "path"))
|
||||||
indexRegister_path = os.path.join(os.environ['AIL_HOME'],
|
indexRegister_path = join(os.environ['AIL_HOME'],
|
||||||
p.config.get("Indexer", "register"))
|
p.config.get("Indexer", "register"))
|
||||||
indexertype = p.config.get("Indexer", "type")
|
indexertype = p.config.get("Indexer", "type")
|
||||||
if indexertype == "whoosh":
|
if indexertype == "whoosh":
|
||||||
|
@ -57,23 +65,33 @@ if __name__ == "__main__":
|
||||||
os.mkdir(baseindexpath)
|
os.mkdir(baseindexpath)
|
||||||
|
|
||||||
# create the index register if not present
|
# create the index register if not present
|
||||||
if not os.path.isfile(indexRegister_path):
|
time_now = int(time.time())
|
||||||
|
if not os.path.isfile(indexRegister_path): #index are not organised
|
||||||
|
#move all files to old_index folder
|
||||||
|
move_index_into_old_index_folder(baseindexpath)
|
||||||
|
#create all_index.txt
|
||||||
with open(indexRegister_path, 'w') as f:
|
with open(indexRegister_path, 'w') as f:
|
||||||
f.write("1")
|
f.write(str(time_now))
|
||||||
|
#create dir
|
||||||
|
os.system("mkdir "+join(baseindexpath, str(time_now)))
|
||||||
|
|
||||||
with open(indexRegister_path, "r") as f:
|
with open(indexRegister_path, "r") as f:
|
||||||
allIndex = f.read()
|
allIndex = f.read()
|
||||||
allIndex = allIndex.split(',')
|
allIndex = allIndex.split(',') # format [time1,time2]
|
||||||
allIndex.sort()
|
allIndex.sort()
|
||||||
indexnum = int(allIndex[-1])
|
|
||||||
|
|
||||||
indexpath = os.path.join(baseindexpath, "index_"+str(indexnum))
|
try:
|
||||||
|
indexname = allIndex[-1].strip('\n\r')
|
||||||
|
except IndexError as e:
|
||||||
|
indexname = time_now
|
||||||
|
|
||||||
|
indexpath = join(baseindexpath, str(indexname))
|
||||||
if not exists_in(indexpath):
|
if not exists_in(indexpath):
|
||||||
ix = create_in(indexpath, schema)
|
ix = create_in(indexpath, schema)
|
||||||
else:
|
else:
|
||||||
ix = open_dir(indexpath)
|
ix = open_dir(indexpath)
|
||||||
|
|
||||||
last_refresh = time.time()
|
last_refresh = time_now
|
||||||
|
|
||||||
# LOGGING #
|
# LOGGING #
|
||||||
publisher.info("ZMQ Indexer is Running")
|
publisher.info("ZMQ Indexer is Running")
|
||||||
|
@ -90,17 +108,19 @@ if __name__ == "__main__":
|
||||||
continue
|
continue
|
||||||
docpath = message.split(" ", -1)[-1]
|
docpath = message.split(" ", -1)[-1]
|
||||||
paste = PST.get_p_content()
|
paste = PST.get_p_content()
|
||||||
print "Indexing :", docpath
|
print "Indexing - "+indexname+" :", docpath
|
||||||
|
|
||||||
|
|
||||||
if time.time() - last_refresh > TIME_WAIT: #avoid calculating the index's size at each message
|
if time.time() - last_refresh > TIME_WAIT: #avoid calculating the index's size at each message
|
||||||
last_refresh = time.time()
|
last_refresh = time.time()
|
||||||
if check_index_size(indexnum) > INDEX_SIZE_THRESHOLD*(1000*1000):
|
if check_index_size(indexname) > INDEX_SIZE_THRESHOLD*(1000*1000):
|
||||||
indexpath = os.path.join(baseindexpath, "index_"+str(indexnum+1))
|
timestamp = int(time.time())
|
||||||
ix = create_in(indexpath, schema, indexname=str(indexnum+1))
|
indexpath = join(baseindexpath, str(timestamp))
|
||||||
|
ix = create_in(indexpath, schema)
|
||||||
|
indexname = str(timestamp)
|
||||||
## Correctly handle the file
|
## Correctly handle the file
|
||||||
with open(indexRegister_path, "a") as f:
|
with open(indexRegister_path, "a") as f:
|
||||||
f.write(","+str(indexnum))
|
f.write(","+str(timestamp))
|
||||||
|
|
||||||
|
|
||||||
if indexertype == "whoosh":
|
if indexertype == "whoosh":
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
import redis
|
import redis
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import datetime
|
||||||
import flask
|
import flask
|
||||||
from flask import Flask, render_template, jsonify, request
|
from flask import Flask, render_template, jsonify, request
|
||||||
|
|
||||||
|
@ -33,10 +34,13 @@ indexRegister_path = os.path.join(os.environ['AIL_HOME'],
|
||||||
def get_current_index():
|
def get_current_index():
|
||||||
with open(indexRegister_path, "r") as f:
|
with open(indexRegister_path, "r") as f:
|
||||||
allIndex = f.read()
|
allIndex = f.read()
|
||||||
allIndex = allIndex.split(',')
|
allIndex = allIndex.split(',') # format [time1,time2]
|
||||||
allIndex.sort()
|
allIndex.sort()
|
||||||
indexnum = int(allIndex[-1])
|
try:
|
||||||
indexpath = os.path.join(baseindexpath, "index_"+str(indexnum))
|
indexname = allIndex[-1].strip('\n\r')
|
||||||
|
except IndexError as e:
|
||||||
|
indexname = "no-index"
|
||||||
|
indexpath = os.path.join(baseindexpath, indexname)
|
||||||
return indexpath
|
return indexpath
|
||||||
|
|
||||||
def get_index_list(selected_index=""):
|
def get_index_list(selected_index=""):
|
||||||
|
@ -44,23 +48,32 @@ def get_index_list(selected_index=""):
|
||||||
for dirs in os.listdir(baseindexpath):
|
for dirs in os.listdir(baseindexpath):
|
||||||
if os.path.isdir(os.path.join(baseindexpath, dirs)):
|
if os.path.isdir(os.path.join(baseindexpath, dirs)):
|
||||||
value = dirs
|
value = dirs
|
||||||
name = dirs + " - " + \
|
name = to_iso_date(dirs) + " - " + \
|
||||||
str(get_dir_size(dirs) / (1000*1000)) + " Mb " + \
|
str(get_dir_size(dirs) / (1000*1000)) + " Mb " + \
|
||||||
"(" + str(get_item_count(dirs)) + " Items" + ")"
|
"(" + str(get_item_count(dirs)) + " Items" + ")"
|
||||||
flag = dirs==selected_index.split('/')[-1]
|
flag = dirs==selected_index.split('/')[-1]
|
||||||
index_list.append([ value, name, flag])
|
index_list.append([ value, name, flag])
|
||||||
|
|
||||||
return index_list
|
return index_list
|
||||||
|
|
||||||
def get_dir_size(directory):
|
def get_dir_size(directory):
|
||||||
cur_sum = 0
|
cur_sum = 0
|
||||||
for directory, subdirs, files in os.walk(os.path.join(baseindexpath,directory)):
|
for directory, subdirs, files in os.walk(os.path.join(baseindexpath,directory)):
|
||||||
cur_sum += sum(os.path.getsize(os.path.join(directory, name)) for name in files)
|
try:
|
||||||
|
cur_sum += sum(os.path.getsize(os.path.join(directory, name)) for name in files)
|
||||||
|
except OSError as e: #File disappeared
|
||||||
|
pass
|
||||||
return cur_sum
|
return cur_sum
|
||||||
|
|
||||||
def get_item_count(dirs):
|
def get_item_count(dirs):
|
||||||
ix = index.open_dir(os.path.join(baseindexpath, dirs))
|
ix = index.open_dir(os.path.join(baseindexpath, dirs))
|
||||||
return ix.doc_count_all()
|
return ix.doc_count_all()
|
||||||
|
|
||||||
|
def to_iso_date(timestamp):
|
||||||
|
if timestamp == "old_index":
|
||||||
|
return "old_index"
|
||||||
|
return str(datetime.datetime.fromtimestamp(int(timestamp))).split()[0]
|
||||||
|
|
||||||
|
|
||||||
# ============ ROUTES ============
|
# ============ ROUTES ============
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,7 @@
|
||||||
<div class="row">
|
<div class="row">
|
||||||
<div class="col-md-12">
|
<div class="col-md-12">
|
||||||
<strong style="">Index: </strong>
|
<strong style="">Index: </strong>
|
||||||
<select class="form-control" id="index_name" style="display: inline-block; margin-bottom: 5px; width: 25%">
|
<select class="form-control" id="index_name" style="display: inline-block; margin-bottom: 5px; width: 30%">
|
||||||
{% for indexElem in index_list %}
|
{% for indexElem in index_list %}
|
||||||
<option {% if indexElem[2] %} selected="selected" {% endif %} value="{{ indexElem[0] }}" >{{ indexElem[1] }}</option>
|
<option {% if indexElem[2] %} selected="selected" {% endif %} value="{{ indexElem[0] }}" >{{ indexElem[1] }}</option>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
|
Loading…
Reference in a new issue