@ -1412,242 +1412,298 @@ sub run_jobs
# Get a list of pending or incomplete jobs.
my $ended_within = $startup ? 1 : 300;
my $return = $anvil->Database->get_jobs({ended_within => $ended_within});
my $count = @{$return};
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
ended_within => $ended_within,
'return' => $return,
count => $count,
}});
foreach my $hash_ref (@{$return})
{
my $job_uuid = $hash_ref->{job_uuid};
my $job_command = $hash_ref->{job_command};
my $job_data = $hash_ref->{job_data};
my $job_picked_up_by = $hash_ref->{job_picked_up_by};
my $job_picked_up_at = $hash_ref->{job_picked_up_at};
my $job_updated = $hash_ref->{job_updated};
my $job_name = $hash_ref->{job_name};
my $job_progress = $hash_ref->{job_progress};
my $job_title = $hash_ref->{job_title};
my $job_description = $hash_ref->{job_description};
my $job_status = $hash_ref->{job_status};
my $started_seconds_ago = $job_picked_up_at ? (time - $job_picked_up_at) : 0;
my $updated_seconds_ago = $job_updated ? (time - $job_updated) : 0;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
job_uuid => $job_uuid,
job_command => $job_command,
job_data => $job_data,
job_picked_up_by => $job_picked_up_by,
job_picked_up_at => $job_picked_up_at,
job_updated => $job_updated,
job_name => $job_name,
job_progress => $job_progress,
job_title => $job_title,
job_description => $job_description,
job_status => $job_status,
started_seconds_ago => $started_seconds_ago,
updated_seconds_ago => $updated_seconds_ago,
}});
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { ended_within => $ended_within }});
# To minimize the chance of race conditions, any given command will be called only once at a
# time. If two jobs of the same command exist, only one will be called.
if ($job_progress != 100)
$anvil->Database->get_jobs({ended_within => $ended_within});
foreach my $modified_date (sort {$a cmp $b} keys %{$anvil->data->{jobs}{modified_date}})
{
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { modified_date => $modified_date }});
foreach my $job_uuid (sort {$a cmp $b} keys %{$anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}})
{
my $short_command = $job_command;
$short_command =~ s/\s.*$//;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { short_command => $short_command }});
if (exists $anvil->data->{sys}{started}{$short_command})
# Reload the jobs so we get an updated view of them.
$anvil->Database->get_jobs({ended_within => $ended_within});
# Collect the data.
my $job_command = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_command};
my $short_command = $job_command;
$short_command =~ s/\s.*$//;
my $job_data = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_data};
my $job_picked_up_by = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_picked_up_by};
my $job_picked_up_at = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_picked_up_at};
my $job_updated = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_updated};
my $job_name = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_name};
my $job_progress = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_progress};
my $job_title = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_title};
my $job_description = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_description};
my $job_status = $anvil->data->{jobs}{modified_date}{$modified_date}{job_uuid}{$job_uuid}{job_status};
my $started_seconds_ago = $job_picked_up_at ? (time - $job_picked_up_at) : 0;
my $updated_seconds_ago = $job_updated ? (time - $job_updated) : 0;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
's01:job_uuid' => $job_uuid,
's02:job_command' => $job_command,
's03:short_command' => $short_command,
's04:job_data' => $job_data,
's05:job_picked_up_by' => $job_picked_up_by,
's06:job_picked_up_at' => $job_picked_up_at,
's07:job_updated' => $job_updated,
's08:job_name' => $job_name,
's09:job_progress' => $job_progress,
's10:job_title' => $job_title,
's11:job_description' => $job_description,
's12:job_status' => $job_status,
's13:started_seconds_ago' => $started_seconds_ago,
's14:updated_seconds_ago' => $updated_seconds_ago,
}});
# To minimize the chance of race conditions, any given command will be called only
# once at a time. If two jobs of the same command exist, only one will be called.
if ($job_progress != 100)
{
if (exists $anvil->data->{sys}{started}{$short_command})
{
# Skip it.
my $started_job = $anvil->data->{sys}{started}{$short_command};
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0737", variables => {
started_job => $started_job,
job_uuid => $job_uuid,
command => $short_command,
}});
next;
}
$anvil->data->{sys}{started}{$short_command} = $job_uuid;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "sys::started::${short_command}" => $anvil->data->{sys}{started}{$short_command} }});
}
# If this is a start-up call, only start jobs whose status is 'anvil_startup'.
if (($startup) && ($job_status ne "anvil_startup"))
{
# Skip it.
my $started_job = $anvil->data->{sys}{started}{$short_command};
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0737", variables => {
started_job => $started_job,
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0639", variables => {
job_uuid => $job_uuid,
command => $short_command,
job_command => $job_command,
}});
next;
}
$anvil->data->{sys}{started}{$short_command} = $job_uuid;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "sys::started::${short_command}" => $anvil->data->{sys}{started}{$short_command} }});
}
# If this is a start-up call, only start jobs whose status is 'anvil_startup'.
if (($startup) && ($job_status ne "anvil_startup"))
{
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0639", variables => {
job_uuid => $job_uuid,
job_command => $job_command,
}});
next;
}
if ($job_progress ne "100")
{
$anvil->data->{sys}{jobs_running} = 1;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "sys::jobs_running" => $anvil->data->{sys}{jobs_running} }});
}
# See if the job was picked up by a now-dead instance.
if ($job_picked_up_by)
{
# Check if the PID is still active.
$anvil->System->pids({ignore_me => 1});
if ($job_progress == 100)
{
# This is a job that might have just completed, clear the started value.
$anvil->data->{jobs}{$job_uuid}{started} = 0;
$job_picked_up_at = 0;
$job_picked_up_by = 0;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
job_picked_up_at => $job_picked_up_at,
job_picked_up_by => $job_picked_up_by,
"jobs::${job_uuid}::started" => $anvil->data->{jobs}{$job_uuid}{started},
}});
}
else
{
$anvil->data->{sys}{jobs_running} = 1;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "sys::jobs_running" => $anvil->data->{sys}{jobs_running} }});
}
### TODO: Add a check to verify the job isn't hung.
# Skip if this job is in progress.
if (not exists $anvil->data->{pids}{$job_picked_up_by})
# See if the job was picked up by a now-dead instance.
if ($job_picked_up_by)
{
# If the job is done, just clear the 'job_picked_up_by' and be done.
if ($job_progress ne "100")
# Check if the PID is still active.
$anvil->System->pids({ignore_me => 1});
### TODO: Add a check to verify the job isn't hung.
# Skip if this job is in progress.
if (not exists $anvil->data->{pids}{$job_picked_up_by})
{
# It's possible that the job updated to 100% and exited after we
# gathered the job data, so we won't restart until we've seen it not
# running and not at 100% after 5 loops.
if ((not exists $anvil->data->{lost_job_count}{$job_uuid}) or (not defined $anvil->data->{lost_job_count}{$job_uuid}))
# If the job is done, just clear the 'job_picked_up_by' and be done.
if ($job_progress ne "100")
{
$anvil->data->{lost_job_count}{$job_uuid} = 0;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "lost_job_count::${job_uuid}" => $anvil->data->{lost_job_count}{$job_uuid} }});
# It's possible that the job updated to 100% and exited after
# we gathered the job data, so we won't restart until we've
# seen it not running and not at 100% after 5 loops.
if ((not exists $anvil->data->{lost_job_count}{$job_uuid}) or (not defined $anvil->data->{lost_job_count}{$job_uuid}))
{
$anvil->data->{lost_job_count}{$job_uuid} = 0;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "lost_job_count::${job_uuid}" => $anvil->data->{lost_job_count}{$job_uuid} }});
}
if ($anvil->data->{lost_job_count}{$job_uuid} > 5)
{
# The previous job is gone, but the job isn't
# finished. Start it again.
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 1, priority => "alert", key => "warning_0007", variables => {
command => $job_command,
pid => $job_picked_up_by,
percent => $job_progress,
}});
# Clear some variables.
$job_progress = 0;
$job_status = "message_0056";
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
job_progress => $job_progress,
job_status => $job_status,
}});
# Clear the job.
$anvil->Job->clear({debug => 2, job_uuid => $job_uuid});
$anvil->data->{lost_job_count}{$job_uuid} = 0;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "lost_job_count::${job_uuid}" => $anvil->data->{lost_job_count}{$job_uuid} }});
}
else
{
$anvil->data->{lost_job_count}{$job_uuid}++;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "lost_job_count::${job_uuid}" => $anvil->data->{lost_job_count}{$job_uuid} }});
}
}
if ($anvil->data->{lost_job_count}{$job_uuid} > 5)
{
# The previous job is gone, but the job isn't finished. Start it again.
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 1, priority => "alert", key => "warning_0007", variables => {
command => $job_command,
pid => $job_picked_up_by,
percent => $job_progress,
}});
# Clear some variables.
$job_progress = 0;
$job_status = "message_0056";
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
job_progress => $job_progress,
job_status => $job_status,
}});
# Clear the job.
$anvil->Job->clear({debug => 2, job_uuid => $job_uuid});
$anvil->data->{lost_job_count}{$job_uuid} = 0;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "lost_job_count::${job_uuid}" => $anvil->data->{lost_job_count}{$job_uuid} }});
}
else
{
$anvil->data->{lost_job_count}{$job_uuid}++;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "lost_job_count::${job_uuid}" => $anvil->data->{lost_job_count}{$job_uuid} }});
}
# Clear the PID
$job_picked_up_by = 0;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { job_picked_up_by => $job_picked_up_by }});
}
elsif ($job_progress ne "100")
{
# The job is running.
$anvil->data->{jobs_started}{$short_command} = $job_uuid;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "jobs_started::${short_command}" => $anvil->data->{jobs_started}{$short_command} }});
}
# Clear the PID
$job_picked_up_by = 0;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { job_picked_up_by => $job_picked_up_by }});
}
}
# Convert the double-banged strings into a proper message.
my $say_title = $job_title ? $anvil->Words->parse_banged_string({key_string => $job_title}) : "";
my $say_description = $job_description ? $anvil->Words->parse_banged_string({key_string => $job_description}) : "";
my $say_status = $job_status ? $anvil->Words->parse_banged_string({key_string => $job_status}) : "";
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
job_title => $job_title,
say_description => $say_description,
say_status => $say_status,
}});
# Convert the double-banged strings into a proper message.
my $say_title = $job_title ? $anvil->Words->parse_banged_string({key_string => $job_title}) : "";
my $say_description = $job_description ? $anvil->Words->parse_banged_string({key_string => $job_description}) : "";
my $say_status = $job_status ? $anvil->Words->parse_banged_string({key_string => $job_status}) : "";
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
job_title => $job_title,
say_description => $say_description,
say_status => $say_status,
}});
# Make the status HTML friendly. Strip any embedded HTML then encode the text string.
if ($say_status)
{
my $html_strip = HTML::Strip->new();
$say_status = $html_strip->parse($say_status);
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { say_status => $say_status }});
# Now make the resulting text string HTML friendly
my $text_to_html = HTML::FromText->new({
urls => 1,
email => 1,
lines => 1,
# Make the status HTML friendly. Strip any embedded HTML then encode the text string.
if ($say_status)
{
my $html_strip = HTML::Strip->new();
$say_status = $html_strip->parse($say_status);
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { say_status => $say_status }});
# Now make the resulting text string HTML friendly
my $text_to_html = HTML::FromText->new({
urls => 1,
email => 1,
lines => 1,
});
$say_status = $text_to_html->parse($say_status);
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { say_status => $say_status }});
}
# Add this to the jobs.json file
my $json_string = to_json ({
job_uuid => $job_uuid,
job_command => $job_command,
job_data => $job_data,
job_picked_up_at => $job_picked_up_at,
job_updated => $job_updated,
job_name => $job_name,
job_progress => $job_progress,
job_title => $say_title,
job_description => $say_description,
job_status => $say_status,
started_seconds_ago => $started_seconds_ago,
updated_seconds_ago => $updated_seconds_ago,
});
$say_status = $text_to_html->parse($say_status);
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { say_status => $say_status }});
}
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { json_string => $json_string }});
$jobs_file .= $json_string.",\n";
# Add this to the jobs.json file
my $json_string = to_json ({
job_uuid => $job_uuid,
job_command => $job_command,
job_data => $job_data,
job_picked_up_at => $job_picked_up_at,
job_updated => $job_updated,
job_name => $job_name,
job_progress => $job_progress,
job_title => $say_title,
job_description => $say_description,
job_status => $say_status,
started_seconds_ago => $started_seconds_ago,
updated_seconds_ago => $updated_seconds_ago,
});
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { json_string => $json_string }});
$jobs_file .= $json_string.",\n";
# If the job is done, move on.
next if $job_progress == 100;
next if $anvil->data->{switches}{'no-start'};
# If the job is done, move on.
next if $job_progress eq "100";
next if $anvil->data->{switches}{'no-start'};
# If 'startup' is set, we only care if 'job_status' is 'anvil_startup'
if ((not $startup) && ($say_status eq "anvil_startup"))
{
# Skip this, it will run next time anvil-daemon restarts.
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0593", variables => {
command => $job_command,
job_uuid => $job_uuid,
}});
next;
}
# If 'startup' is set, we only care if 'job_status' is 'anvil_startup'
if ((not $startup) && ($say_status eq "anvil_startup"))
{
# Skip this, it will run next time anvil-daemon restarts.
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0593", variables => {
command => $job_command,
job_uuid => $job_uuid,
}});
next;
}
# If the job is not running, and we've not started any other of the same command this
# loop, start it.
if (not $job_picked_up_by)
{
if (exists $anvil->data->{jobs_started}{$short_command})
{
# Is the job_uuid associated with this command done?
my $started_job_uuid = $anvil->data->{jobs_started}{$short_command};
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { started_job_uuid => $started_job_uuid }});
# If the job is not running, start it.
if (not $job_picked_up_by)
{
my $command = $job_command." --job-uuid ".$job_uuid;
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 1, key => "log_0210", variables => { command => $command }});
if (exists $anvil->data->{jobs}{running}{$started_job_uuid})
{
if ($anvil->data->{jobs}{running}{$started_job_uuid}{job_progress} != 100)
{
# Don't start it in this pass.
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 1, key => "log_0741", variables => {
command => $job_command,
this_job_uuid => $job_uuid,
other_job_uuid => $started_job_uuid,
}});
next;
}
else
{
# The previous job is done, delete it.
$anvil->data->{jobs_started}{$short_command} = "";
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
"jobs_started::${short_command}" => $anvil->data->{jobs_started}{$short_command},
}});
}
}
}
# Have we started this job recently?
if (exists $anvil->data->{jobs}{$job_uuid}{started})
{
my $last_start = time - $anvil->data->{jobs}{$job_uuid}{started};
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { last_start => $last_start }});
my $command = $job_command." --job-uuid ".$job_uuid;
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 1, key => "log_0210", variables => { command => $command }});
if ($last_start < 60)
# Have we started this job recently?
if (exists $anvil->data->{jobs}{$job_uuid}{started})
{
# Skip, Started too recently.
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 1, key => "log_0578", variables => {
command => $command,
last_start => $last_start,
}});
next;
my $last_start = time - $anvil->data->{jobs}{$job_uuid}{started};
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { last_start => $last_start }});
if ($last_start < 60)
{
# Skip, Started too recently.
$anvil->Log->entry({source => $THIS_FILE, line => __LINE__, level => 1, key => "log_0578", variables => {
command => $command,
last_start => $last_start,
}});
next;
}
}
}
# Start the job, appending '--job-uuid' to the command.
($anvil->data->{jobs}{handles}{$job_uuid}, my $return_code) = $anvil->System->call({
background => 1,
stdout_file => "/tmp/anvil.job.".$job_uuid.".stdout",
stderr_file => "/tmp/anvil.job.".$job_uuid.".stderr",
shell_call => $command,
source => $THIS_FILE,
line => __LINE__,
});
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
"jobs::handles::${job_uuid}" => $anvil->data->{jobs}{handles}{$job_uuid},
return_code => $return_code,
}});
# Start the job, appending '--job-uuid' to the command.
($anvil->data->{jobs}{handles}{$job_uuid}, my $return_code) = $anvil->System->call({
background => 1,
stdout_file => "/tmp/anvil.job.".$job_uuid.".stdout",
stderr_file => "/tmp/anvil.job.".$job_uuid.".stderr",
shell_call => $command,
source => $THIS_FILE,
line => __LINE__,
});
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
"jobs::handles::${job_uuid}" => $anvil->data->{jobs}{handles}{$job_uuid},
return_code => $return_code,
}});
# Log the PID (the job should update the database).
my $pid = $anvil->data->{jobs}{handles}{$job_uuid}->pid();
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { pid => $pid }});
# Log the PID (the job should update the database).
my $pid = $anvil->data->{jobs}{handles}{$job_uuid}->pid();
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { pid => $pid }});
# Record that we've tried to start this job, so that we don't try to restart it for any reason for at least a minute.
$anvil->data->{jobs}{$job_uuid}{started} = time;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { 'jobs::$job_uuid::started' => $anvil->data->{jobs}{$job_uuid}{started} }});
# Record that we've tried to start this job, so that we don't try to restart it for any reason for at least a minute.
$anvil->data->{jobs}{$job_uuid}{started} = time;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { 'jobs::$job_uuid::started' => $anvil->data->{jobs}{$job_uuid}{started} }});
# Record that a job with this command has started
$anvil->data->{jobs_started}{$short_command} = $job_uuid;
$anvil->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "jobs_started::${short_command}" => $anvil->data->{jobs_started}{$short_command} }});
}
}
}