batch_secgen.rb: Threads have their own DB connections (fixes the message type 0xXX arrived from server while idle problem); List now accepts status types (e.g. --running, --failed/--error, --todo); and other minor tweaks

This commit is contained in:
thomashaw
2017-11-06 13:05:09 +00:00
parent 818752fef3
commit 60f7618808

View File

@@ -7,6 +7,8 @@ require_relative '../helpers/print.rb'
require_relative '../helpers/constants.rb'
# Globals
@status_enum = {:todo => 'todo', :running => 'running', :success => 'success', :error => 'error', :failed => 'error'}
@prepared_statements = []
@secgen_args = ''
@ranges_in_table = nil
@@ -76,14 +78,17 @@ end
def get_list_opts
list_options = misc_opts + [['--id', GetoptLong::REQUIRED_ARGUMENT],
['--all', GetoptLong::OPTIONAL_ARGUMENT]]
['--all', GetoptLong::OPTIONAL_ARGUMENT],
['--todo', GetoptLong::NO_ARGUMENT],
['--running', GetoptLong::NO_ARGUMENT],
['--failed', '--error', GetoptLong::NO_ARGUMENT]]
parse_opts(GetoptLong.new(*list_options))
end
def get_reset_opts
list_options = misc_opts + [['--all', GetoptLong::NO_ARGUMENT],
['--running', GetoptLong::NO_ARGUMENT],
['--failed', '--error' ,GetoptLong::NO_ARGUMENT]]
['--failed', '--error', GetoptLong::NO_ARGUMENT]]
options = parse_opts(GetoptLong.new(*list_options))
if !options[:running] and !options[:failed] and !options[:all]
@@ -99,7 +104,7 @@ def get_delete_opts
['--all', GetoptLong::OPTIONAL_ARGUMENT],
['--failed', GetoptLong::OPTIONAL_ARGUMENT]]
options = parse_opts(GetoptLong.new(*delete_options))
if options[:id] == '' and !options[:all] and !options[:failed]
if !options[:id] and !options[:all] and !options[:failed]
Print.err 'Error: The delete command requires an argument.'
usage
else
@@ -108,7 +113,7 @@ def get_delete_opts
end
def parse_opts(opts)
options = {:instances => '', :max_threads => 9, :id => '', :all => false}
options = {:instances => '', :max_threads => 3, :id => nil, :all => false}
opts.each do |opt, arg|
case opt
when '--instances'
@@ -125,6 +130,8 @@ def parse_opts(opts)
options[:running] = true
when '--failed'
options[:failed] = true
when '--todo'
options[:todo] = true
else
Print.err 'Invalid argument'
exit(false)
@@ -144,14 +151,14 @@ def add(options)
instances.to_i.times do |count|
instance_args = "--prefix batch_job_#{(count+1).to_s} " + @secgen_args
instance_args = generate_range_arg(db_conn, options) + instance_args
insert_row(db_conn, count.to_s, instance_args)
insert_row(db_conn, @prepared_statements, count.to_s, instance_args)
end
elsif instances.size > 0
named_prefixes = instances.split(',')
named_prefixes.each_with_index do |named_prefix, count|
instance_secgen_args = "--prefix #{named_prefix} " + @secgen_args
instance_secgen_args = generate_range_arg(db_conn, options) + instance_secgen_args
insert_row(db_conn, count.to_s, instance_secgen_args)
insert_row(db_conn, @prepared_statements, count.to_s, instance_secgen_args)
end
end
db_conn.finish
@@ -170,26 +177,28 @@ def start(options)
current_threads = []
outer_loop_db_conn = PG::Connection.open(:dbname => 'batch_secgen')
while true
if (get_jobs(outer_loop_db_conn).size > 0) and (current_threads.size < options[:max_threads].to_i)
if (get_jobs(outer_loop_db_conn, @prepared_statements).size > 0) and (current_threads.size < options[:max_threads].to_i)
current_threads << Thread.new {
db_conn = PG::Connection.open(:dbname => 'batch_secgen')
current_job = get_jobs(db_conn)[0]
threadwide_statements = []
current_job = get_jobs(db_conn, threadwide_statements)[0]
job_id = current_job['id']
update_status(db_conn, job_id, :running)
update_status(db_conn, threadwide_statements, job_id, :running)
secgen_args = current_job['secgen_args']
# execute secgen
puts "Running job_id(#{job_id}): secgen.rb #{secgen_args}"
stdout, stderr, status = Open3.capture3("ruby secgen.rb #{secgen_args}")
puts "Job #{job_id} Complete"
# Update job status and back-up paths
if status.exitstatus == 0
update_status(db_conn, job_id, :success)
puts "Job #{job_id} Complete: successful"
update_status(db_conn, threadwide_statements, job_id, :success)
log_prefix = ''
backup_path = 'batch/successful/'
else
update_status(db_conn, job_id, :error)
puts "Job #{job_id} Complete: failed"
update_status(db_conn, threadwide_statements, job_id, :error)
log_prefix = 'ERROR_'
backup_path = 'batch/failed/'
end
@@ -222,10 +231,10 @@ def start(options)
db_conn.finish
}
sleep(1)
sleep(5)
else
current_threads.delete_if { |thread| !thread.alive? }
sleep(2) # don't use a busy-waiting loop, choose a blocking sleep that frees up CPU
sleep(5) # don't use a busy-waiting loop, choose a blocking sleep that frees up CPU
end
end
@@ -233,14 +242,21 @@ end
def list(options)
db_conn = PG::Connection.open(:dbname => 'batch_secgen')
if options[:id] == ''
if options[:id]
items = [select_id(db_conn, @prepared_statements, options[:id])]
elsif options[:running]
items = select_status(db_conn, @prepared_statements, :running)
elsif options[:failed]
items = select_status(db_conn, @prepared_statements, :failed)
elsif options[:todo]
items = select_status(db_conn, @prepared_statements, :todo)
else #all
items = select_all(db_conn)
items.each do |row|
Print.info row
end
else
Print.info select_id(db_conn, options[:id])
end
items.each do |row|
Print.info row
end
db_conn.finish
end
@@ -248,21 +264,21 @@ end
def reset(options)
db_conn = PG::Connection.open(:dbname => 'batch_secgen')
if options[:all]
update_all_to_status(db_conn, :todo)
update_all_to_status(db_conn, @prepared_statements, :todo)
end
if options[:running]
update_all_by_status(db_conn, :running, :todo)
update_all_by_status(db_conn, @prepared_statements, :running, :todo)
end
if options[:failed]
update_all_by_status(db_conn, :error, :todo)
update_all_by_status(db_conn, @prepared_statements, :error, :todo)
end
db_conn.finish
end
def delete(options)
db_conn = PG::Connection.open(:dbname => 'batch_secgen')
if options[:id] != ''
delete_id(db_conn, options[:id])
if options[:id]
delete_id(db_conn, @prepared_statements, options[:id])
elsif options[:failed]
delete_failed(db_conn)
elsif options[:all]
@@ -272,12 +288,15 @@ def delete(options)
end
# Database interactions
def insert_row(db_conn, statement_id, secgen_args)
def insert_row(db_conn, prepared_statements, statement_id, secgen_args)
statement = "insert_row_#{statement_id}"
# Add --shutdown and strip trailing whitespace
secgen_args = '--shutdown ' + secgen_args.strip
Print.info "Adding to queue: '#{statement}' '#{secgen_args}' 'todo'"
db_conn.prepare(statement, 'insert into queue (secgen_args, status) values ($1, $2)')
unless prepared_statements.include? statement
db_conn.prepare(statement, 'insert into queue (secgen_args, status) values ($1, $2)')
prepared_statements << statement
end
db_conn.exec_prepared(statement, [secgen_args, 'todo'])
end
@@ -285,38 +304,49 @@ def select_all(db_conn)
db_conn.exec_params('SELECT * FROM queue;')
end
def select_all_todo(db_conn)
db_conn.exec_params("SELECT * FROM queue where status = 'todo';")
def select_status(db_conn, prepared_statements, status)
statement = "select_status_#{status}"
unless prepared_statements.include? statement
db_conn.prepare(statement, 'SELECT * FROM queue where status = $1;')
prepared_statements << statement
end
db_conn.exec_prepared(statement, [@status_enum[status]])
end
def select_id(db_conn, id)
def select_id(db_conn, prepared_statements, id)
statement = "select_id_#{id}"
db_conn.prepare(statement, 'SELECT * FROM queue where id = $1;')
unless prepared_statements.include? statement
db_conn.prepare(statement, 'SELECT * FROM queue where id = $1;')
prepared_statements << statement
end
db_conn.exec_prepared(statement, [id]).first
end
def update_status(db_conn, job_id, status)
status_enum = {:todo => 'todo', :running => 'running', :success => 'success', :error => 'error'}
def update_status(db_conn, prepared_statements, job_id, status)
statement = "update_status_#{job_id}_#{status}"
db_conn.prepare(statement, 'UPDATE queue SET status = $1 WHERE id = $2')
db_conn.exec_prepared(statement, [status_enum[status], job_id])
unless prepared_statements.include? statement
db_conn.prepare(statement, 'UPDATE queue SET status = $1 WHERE id = $2')
prepared_statements << statement
end
db_conn.exec_prepared(statement, [@status_enum[status], job_id])
end
def update_all_by_status(db_conn, from_status, to_status)
status_enum = {:todo => 'todo', :running => 'running', :success => 'success', :error => 'error'}
def update_all_by_status(db_conn, prepared_statements, from_status, to_status)
statement = "mass_update_status_#{from_status}_#{to_status}"
db_conn.prepare(statement, 'UPDATE queue SET status = $1 WHERE status = $2')
db_conn.exec_prepared(statement, [status_enum[to_status], status_enum[from_status]])
unless prepared_statements.include? statement
db_conn.prepare(statement, 'UPDATE queue SET status = $1 WHERE status = $2')
prepared_statements << statement
end
db_conn.exec_prepared(statement, [@status_enum[to_status], @status_enum[from_status]])
end
def update_all_to_status(db_conn, to_status)
status_enum = {:todo => 'todo', :running => 'running', :success => 'success', :error => 'error'}
def update_all_to_status(db_conn, prepared_statements, to_status)
statement = "mass_update_to_status_#{to_status}"
db_conn.prepare(statement, 'UPDATE queue SET status = $1')
db_conn.exec_prepared(statement, [status_enum[to_status]])
unless prepared_statements.include? statement
db_conn.prepare(statement, 'UPDATE queue SET status = $1')
prepared_statements << statement
end
db_conn.exec_prepared(statement, [@status_enum[to_status]])
end
def delete_failed(db_conn)
@@ -341,15 +371,18 @@ def delete_all(db_conn)
end
end
def delete_id(db_conn, id)
def delete_id(db_conn, prepared_statements, id)
Print.info "Deleting job_id: #{id}"
statement = "delete_job_id_#{id}"
db_conn.prepare(statement, 'DELETE FROM queue where id = $1')
unless prepared_statements.include? statement
db_conn.prepare(statement, 'DELETE FROM queue where id = $1')
prepared_statements << statement
end
db_conn.exec_prepared(statement, [id])
end
def get_jobs(db_conn)
select_all_todo(db_conn).to_a
def get_jobs(db_conn, prepared_statements)
select_status(db_conn, prepared_statements, :todo).to_a
end
def secgen_arg_network_ranges(secgen_args)