diff --git a/lib/batch/batch_secgen.rb b/lib/batch/batch_secgen.rb index 6c126d96d..723972e29 100644 --- a/lib/batch/batch_secgen.rb +++ b/lib/batch/batch_secgen.rb @@ -7,6 +7,8 @@ require_relative '../helpers/print.rb' require_relative '../helpers/constants.rb' # Globals +@status_enum = {:todo => 'todo', :running => 'running', :success => 'success', :error => 'error', :failed => 'error'} +@prepared_statements = [] @secgen_args = '' @ranges_in_table = nil @@ -76,14 +78,17 @@ end def get_list_opts list_options = misc_opts + [['--id', GetoptLong::REQUIRED_ARGUMENT], - ['--all', GetoptLong::OPTIONAL_ARGUMENT]] + ['--all', GetoptLong::OPTIONAL_ARGUMENT], + ['--todo', GetoptLong::NO_ARGUMENT], + ['--running', GetoptLong::NO_ARGUMENT], + ['--failed', '--error', GetoptLong::NO_ARGUMENT]] parse_opts(GetoptLong.new(*list_options)) end def get_reset_opts list_options = misc_opts + [['--all', GetoptLong::NO_ARGUMENT], ['--running', GetoptLong::NO_ARGUMENT], - ['--failed', '--error' ,GetoptLong::NO_ARGUMENT]] + ['--failed', '--error', GetoptLong::NO_ARGUMENT]] options = parse_opts(GetoptLong.new(*list_options)) if !options[:running] and !options[:failed] and !options[:all] @@ -99,7 +104,7 @@ def get_delete_opts ['--all', GetoptLong::OPTIONAL_ARGUMENT], ['--failed', GetoptLong::OPTIONAL_ARGUMENT]] options = parse_opts(GetoptLong.new(*delete_options)) - if options[:id] == '' and !options[:all] and !options[:failed] + if !options[:id] and !options[:all] and !options[:failed] Print.err 'Error: The delete command requires an argument.' usage else @@ -108,7 +113,7 @@ def get_delete_opts end def parse_opts(opts) - options = {:instances => '', :max_threads => 9, :id => '', :all => false} + options = {:instances => '', :max_threads => 3, :id => nil, :all => false} opts.each do |opt, arg| case opt when '--instances' @@ -125,6 +130,8 @@ def parse_opts(opts) options[:running] = true when '--failed' options[:failed] = true + when '--todo' + options[:todo] = true else Print.err 'Invalid argument' exit(false) @@ -144,14 +151,14 @@ def add(options) instances.to_i.times do |count| instance_args = "--prefix batch_job_#{(count+1).to_s} " + @secgen_args instance_args = generate_range_arg(db_conn, options) + instance_args - insert_row(db_conn, count.to_s, instance_args) + insert_row(db_conn, @prepared_statements, count.to_s, instance_args) end elsif instances.size > 0 named_prefixes = instances.split(',') named_prefixes.each_with_index do |named_prefix, count| instance_secgen_args = "--prefix #{named_prefix} " + @secgen_args instance_secgen_args = generate_range_arg(db_conn, options) + instance_secgen_args - insert_row(db_conn, count.to_s, instance_secgen_args) + insert_row(db_conn, @prepared_statements, count.to_s, instance_secgen_args) end end db_conn.finish @@ -170,26 +177,28 @@ def start(options) current_threads = [] outer_loop_db_conn = PG::Connection.open(:dbname => 'batch_secgen') while true - if (get_jobs(outer_loop_db_conn).size > 0) and (current_threads.size < options[:max_threads].to_i) + if (get_jobs(outer_loop_db_conn, @prepared_statements).size > 0) and (current_threads.size < options[:max_threads].to_i) current_threads << Thread.new { db_conn = PG::Connection.open(:dbname => 'batch_secgen') - current_job = get_jobs(db_conn)[0] + threadwide_statements = [] + current_job = get_jobs(db_conn, threadwide_statements)[0] job_id = current_job['id'] - update_status(db_conn, job_id, :running) + update_status(db_conn, threadwide_statements, job_id, :running) secgen_args = current_job['secgen_args'] # execute secgen puts "Running job_id(#{job_id}): secgen.rb #{secgen_args}" stdout, stderr, status = Open3.capture3("ruby secgen.rb #{secgen_args}") - puts "Job #{job_id} Complete" # Update job status and back-up paths if status.exitstatus == 0 - update_status(db_conn, job_id, :success) + puts "Job #{job_id} Complete: successful" + update_status(db_conn, threadwide_statements, job_id, :success) log_prefix = '' backup_path = 'batch/successful/' else - update_status(db_conn, job_id, :error) + puts "Job #{job_id} Complete: failed" + update_status(db_conn, threadwide_statements, job_id, :error) log_prefix = 'ERROR_' backup_path = 'batch/failed/' end @@ -222,10 +231,10 @@ def start(options) db_conn.finish } - sleep(1) + sleep(5) else current_threads.delete_if { |thread| !thread.alive? } - sleep(2) # don't use a busy-waiting loop, choose a blocking sleep that frees up CPU + sleep(5) # don't use a busy-waiting loop, choose a blocking sleep that frees up CPU end end @@ -233,14 +242,21 @@ end def list(options) db_conn = PG::Connection.open(:dbname => 'batch_secgen') - if options[:id] == '' + if options[:id] + items = [select_id(db_conn, @prepared_statements, options[:id])] + elsif options[:running] + items = select_status(db_conn, @prepared_statements, :running) + elsif options[:failed] + items = select_status(db_conn, @prepared_statements, :failed) + elsif options[:todo] + items = select_status(db_conn, @prepared_statements, :todo) + else #all items = select_all(db_conn) - items.each do |row| - Print.info row - end - else - Print.info select_id(db_conn, options[:id]) end + items.each do |row| + Print.info row + end + db_conn.finish end @@ -248,21 +264,21 @@ end def reset(options) db_conn = PG::Connection.open(:dbname => 'batch_secgen') if options[:all] - update_all_to_status(db_conn, :todo) + update_all_to_status(db_conn, @prepared_statements, :todo) end if options[:running] - update_all_by_status(db_conn, :running, :todo) + update_all_by_status(db_conn, @prepared_statements, :running, :todo) end if options[:failed] - update_all_by_status(db_conn, :error, :todo) + update_all_by_status(db_conn, @prepared_statements, :error, :todo) end db_conn.finish end def delete(options) db_conn = PG::Connection.open(:dbname => 'batch_secgen') - if options[:id] != '' - delete_id(db_conn, options[:id]) + if options[:id] + delete_id(db_conn, @prepared_statements, options[:id]) elsif options[:failed] delete_failed(db_conn) elsif options[:all] @@ -272,12 +288,15 @@ def delete(options) end # Database interactions -def insert_row(db_conn, statement_id, secgen_args) +def insert_row(db_conn, prepared_statements, statement_id, secgen_args) statement = "insert_row_#{statement_id}" # Add --shutdown and strip trailing whitespace secgen_args = '--shutdown ' + secgen_args.strip Print.info "Adding to queue: '#{statement}' '#{secgen_args}' 'todo'" - db_conn.prepare(statement, 'insert into queue (secgen_args, status) values ($1, $2)') + unless prepared_statements.include? statement + db_conn.prepare(statement, 'insert into queue (secgen_args, status) values ($1, $2)') + prepared_statements << statement + end db_conn.exec_prepared(statement, [secgen_args, 'todo']) end @@ -285,38 +304,49 @@ def select_all(db_conn) db_conn.exec_params('SELECT * FROM queue;') end -def select_all_todo(db_conn) - db_conn.exec_params("SELECT * FROM queue where status = 'todo';") +def select_status(db_conn, prepared_statements, status) + statement = "select_status_#{status}" + unless prepared_statements.include? statement + db_conn.prepare(statement, 'SELECT * FROM queue where status = $1;') + prepared_statements << statement + end + db_conn.exec_prepared(statement, [@status_enum[status]]) end -def select_id(db_conn, id) +def select_id(db_conn, prepared_statements, id) statement = "select_id_#{id}" - db_conn.prepare(statement, 'SELECT * FROM queue where id = $1;') + unless prepared_statements.include? statement + db_conn.prepare(statement, 'SELECT * FROM queue where id = $1;') + prepared_statements << statement + end db_conn.exec_prepared(statement, [id]).first end -def update_status(db_conn, job_id, status) - status_enum = {:todo => 'todo', :running => 'running', :success => 'success', :error => 'error'} - +def update_status(db_conn, prepared_statements, job_id, status) statement = "update_status_#{job_id}_#{status}" - db_conn.prepare(statement, 'UPDATE queue SET status = $1 WHERE id = $2') - db_conn.exec_prepared(statement, [status_enum[status], job_id]) + unless prepared_statements.include? statement + db_conn.prepare(statement, 'UPDATE queue SET status = $1 WHERE id = $2') + prepared_statements << statement + end + db_conn.exec_prepared(statement, [@status_enum[status], job_id]) end -def update_all_by_status(db_conn, from_status, to_status) - status_enum = {:todo => 'todo', :running => 'running', :success => 'success', :error => 'error'} - +def update_all_by_status(db_conn, prepared_statements, from_status, to_status) statement = "mass_update_status_#{from_status}_#{to_status}" - db_conn.prepare(statement, 'UPDATE queue SET status = $1 WHERE status = $2') - db_conn.exec_prepared(statement, [status_enum[to_status], status_enum[from_status]]) + unless prepared_statements.include? statement + db_conn.prepare(statement, 'UPDATE queue SET status = $1 WHERE status = $2') + prepared_statements << statement + end + db_conn.exec_prepared(statement, [@status_enum[to_status], @status_enum[from_status]]) end -def update_all_to_status(db_conn, to_status) - status_enum = {:todo => 'todo', :running => 'running', :success => 'success', :error => 'error'} - +def update_all_to_status(db_conn, prepared_statements, to_status) statement = "mass_update_to_status_#{to_status}" - db_conn.prepare(statement, 'UPDATE queue SET status = $1') - db_conn.exec_prepared(statement, [status_enum[to_status]]) + unless prepared_statements.include? statement + db_conn.prepare(statement, 'UPDATE queue SET status = $1') + prepared_statements << statement + end + db_conn.exec_prepared(statement, [@status_enum[to_status]]) end def delete_failed(db_conn) @@ -341,15 +371,18 @@ def delete_all(db_conn) end end -def delete_id(db_conn, id) +def delete_id(db_conn, prepared_statements, id) Print.info "Deleting job_id: #{id}" statement = "delete_job_id_#{id}" - db_conn.prepare(statement, 'DELETE FROM queue where id = $1') + unless prepared_statements.include? statement + db_conn.prepare(statement, 'DELETE FROM queue where id = $1') + prepared_statements << statement + end db_conn.exec_prepared(statement, [id]) end -def get_jobs(db_conn) - select_all_todo(db_conn).to_a +def get_jobs(db_conn, prepared_statements) + select_status(db_conn, prepared_statements, :todo).to_a end def secgen_arg_network_ranges(secgen_args)