* Resync'ing tables works!! (Well, kind of, loooots more testing needed).

Signed-off-by: Digimer <digimer@alteeve.ca>
main
Digimer 7 years ago
parent 1aefa7fb8a
commit 1d1258e8b5
  1. 286
      AN/Tools/Database.pm

@ -872,11 +872,11 @@ sub connect
# Pick a timestamp for this run, if we haven't yet.
if (not $an->data->{sys}{db_timestamp})
{
my $query = "SELECT cast(now() AS timestamp with time zone)";
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { query => $query }});
my $query = "SELECT cast(now() AS timestamp with time zone)::timestamptz(0);";
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { query => $query }});
$an->data->{sys}{db_timestamp} = $an->Database->query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__})->[0]->[0];
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { "sys::db_timestamp" => $an->data->{sys}{db_timestamp} }});
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "sys::db_timestamp" => $an->data->{sys}{db_timestamp} }});
}
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
@ -2506,6 +2506,7 @@ sub resync_databases
return(0);
}
### TODO: I don't know if this works yet for tables without a '*_host_uuid' column
### NOTE: Don't sort this array, we need to resync in the order that the user passed the tables to us
### to avoid trouble with primary/foreign keys.
# We're going to use the array of tables assembled by _find_behind_databases() stored in
@ -2536,15 +2537,15 @@ sub resync_databases
{
$query = "SELECT column_name FROM information_schema.columns WHERE table_catalog = 'scancore' AND table_schema = 'public' AND table_name = ".$an->data->{sys}{use_db_fh}->quote($table)." AND data_type = 'uuid' AND is_nullable = 'NO' AND (column_name = ".$an->data->{sys}{use_db_fh}->quote($column1)." OR column_name = ".$an->data->{sys}{use_db_fh}->quote($column2).");";
}
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0124", variables => { query => $query }});
my $uuid_column = $an->Database->query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__})->[0]->[0];
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0124", variables => { query => $query }});
my $uuid_column = $an->Database->query({query => $query, source => $THIS_FILE, line => __LINE__})->[0]->[0];
$uuid_column = "" if not defined $uuid_column;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { uuid_column => $uuid_column }});
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { uuid_column => $uuid_column }});
next if not $uuid_column;
# Get all the columns in this table.
$query = "SELECT column_name, is_nullable, data_type FROM information_schema.columns WHERE table_schema = ".$an->data->{sys}{use_db_fh}->quote($schema)." AND table_name = ".$an->data->{sys}{use_db_fh}->quote($table)." AND column_name != 'history_id';";
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0124", variables => { query => $query }});
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0124", variables => { query => $query }});
my $results = $an->Database->query({query => $query, source => $THIS_FILE, line => __LINE__});
my $count = @{$results};
@ -2572,10 +2573,9 @@ sub resync_databases
}
# Now read in the data from the different databases.
foreach my $id (sort {$a cmp $b} keys %{$an->data->{database}})
foreach my $id (sort {$a cmp $b} keys %{$an->data->{cache}{db_fh}})
{
# ...
$an->data->{db_resync}{$id}{sql} = [];
$an->data->{db_resync}{$id}{public}{sql} = [];
$an->data->{db_resync}{$id}{history}{sql} = [];
@ -2583,6 +2583,7 @@ sub resync_databases
my $query = "SELECT modified_date, $uuid_column, ";
my $read_columns = [];
push @{$read_columns}, "modified_date";
push @{$read_columns}, $uuid_column;
foreach my $column_name (sort {$a cmp $b} keys %{$an->data->{sys}{database}{table}{$table}{column}})
{
# We'll skip the host column as we'll use it in the conditional.
@ -2604,7 +2605,7 @@ sub resync_databases
$query .= " WHERE ".$host_column." = ".$an->data->{sys}{use_db_fh}->quote($an->data->{sys}{host_uuid});
}
$query .= ";";
$an->Log->entry({level => 2, key => "log_0074", variables => { id => $id, query => $query }});
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0074", variables => { id => $id, query => $query }});
my $results = $an->Database->query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__});
my $count = @{$results};
@ -2624,7 +2625,7 @@ sub resync_databases
my $column_value = defined $row->[$i] ? $row->[$i] : "NULL";
my $not_null = $an->data->{sys}{database}{table}{$table}{column}{$column_name}{not_null};
my $data_type = $an->data->{sys}{database}{table}{$table}{column}{$column_name}{data_type};
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"s1:i" => $i,
"s2:column_name" => $column_name,
"s3:column_value" => $column_value,
@ -2634,14 +2635,14 @@ sub resync_databases
if ((not $not_null) && ($column_value eq "NULL"))
{
$column_value = "";
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { column_value => $column_value }});
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { column_value => $column_value }});
}
# The modified_date should be the first column.
if ($column_name eq "modified_date")
{
$modified_date = $column_value;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { modified_date => $modified_date }});
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { modified_date => $modified_date }});
next;
}
@ -2649,13 +2650,13 @@ sub resync_databases
if ($column_name eq $uuid_column)
{
$row_uuid = $column_value;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { row_uuid => $row_uuid }});
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { row_uuid => $row_uuid }});
# This is used to determine if a given entry needs to be
# updated or inserted into the public schema
$an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{'exists'} = 1;
$an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{seen} = 0;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"db_data::${id}::${table}::${uuid_column}::${row_uuid}::exists" => $an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{'exists'},
"db_data::${id}::${table}::${uuid_column}::${row_uuid}::seen" => $an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{seen},
}});
@ -2663,7 +2664,7 @@ sub resync_databases
}
die $THIS_FILE." ".__LINE__."; This row's modified_date wasn't the first column returned in query: [$query]\n" if not $modified_date;
die $THIS_FILE." ".__LINE__."; This row's UUID column: [$uuid_column] wasn't the second column returned in query: [$query]\n" if not $modified_date;
die $THIS_FILE." ".__LINE__."; This row's UUID column: [$uuid_column] wasn't the second column returned in query: [$query]\n" if not $row_uuid;
# Record this in the unified and local hashes. Note that we'll handle
# the 'hosts' table in a special way, then the rest depending on
@ -2675,94 +2676,180 @@ sub resync_databases
else
{
# This table isn't restricted to given hosts.
$an->data->{db_data}{unified}{hosts}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}{$column_name} = $column_value;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
"db_data::unified::hosts::modified_date::${modified_date}::${uuid_column}::${row_uuid}::${column_name}" => $an->data->{db_data}{unified}{hosts}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}{$column_name},
$an->data->{db_data}{unified}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}{$column_name} = $column_value;
$an->data->{db_data}{$id}{hosts}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}{$column_name} = $column_value;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"db_data::unified::${table}::modified_date::${modified_date}::${uuid_column}::${row_uuid}::${column_name}" => $an->data->{db_data}{unified}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}{$column_name},
"db_data::${id}::${table}::modified_date::${modified_date}::${uuid_column}::${row_uuid}::${column_name}" => $an->data->{db_data}{$id}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}{$column_name},
}});
}
# $an->data->{db_data}{unified}{hosts}{modified_date}{$modified_date}{host_uuid}{$host_uuid} = {
# host_location_uuid => $host_location_uuid,
# host_name => $host_name,
# host_type => $host_type,
# host_emergency_stop => $host_emergency_stop,
# host_stop_reason => $host_stop_reason,
# host_health => $host_health,
# };
#
# $an->data->{db_data}{$id}{hosts}{host_uuid}{$host_uuid}{'exists'} = 1;
# $an->data->{db_data}{$id}{hosts}{host_uuid}{$host_uuid}{seen} = 0;
# $an->data->{db_data}{$id}{hosts}{modified_date}{$modified_date}{host_uuid}{$host_uuid} = {
# host_location_uuid => $host_location_uuid,
# host_name => $host_name,
# host_type => $host_type,
# host_emergency_stop => $host_emergency_stop,
# host_stop_reason => $host_stop_reason,
# host_health => $host_health,
# };
}
}
die;
# Now all the data is read in, we can see what might be missing from each DB.
foreach my $modified_date (sort {$b cmp $a} keys %{$an->data->{db_data}{unified}{$table}{modified_date}})
{
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { modified_date => $modified_date }});
foreach my $row_uuid (sort {$a cmp $b} keys %{$an->data->{db_data}{unified}{$table}{modified_date}{$modified_date}{$uuid_column}})
{
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { row_uuid => $row_uuid }});
foreach my $id (sort {$a cmp $b} keys %{$an->data->{cache}{db_fh}})
{
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { id => $id }});
# For each 'row_uuid' we see;
# - Check if we've *seen* it before
# |- If not seen; See if it *exists* in the public schema yet.
# | |- If so, check to see if the entry in the public schema is up to date.
# | | \- If not, _UPDATE_ public schema.
# | \- If not, do an _INSERT_ into public schema.
# \- If we have seen, see if it exists at the current timestamp.
# \- If not, _INSERT_ it into history schema.
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"db_data::${id}::${table}::${uuid_column}::${row_uuid}::seen" => $an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{seen}
}});
if (not $an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{seen})
{
# Mark this record as now having been seen.
$an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{seen} = 1;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"db_data::${id}::${table}::${uuid_column}::${row_uuid}::seen" => $an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{seen}
}});
# Does it exist?
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"db_data::${id}::${table}::${uuid_column}::${row_uuid}::exists" => $an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{'exists'}
}});
if ($an->data->{db_data}{$id}{$table}{$uuid_column}{$row_uuid}{'exists'})
{
# It exists, but does it exist at this time stamp?
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"db_data::${id}::${table}::modified_date::${modified_date}::${uuid_column}::${row_uuid}" => $an->data->{db_data}{$id}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid},
}});
if (not $an->data->{db_data}{$id}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid})
{
# No, so UPDATE it. We'll build the query now...
my $query = "UPDATE public.$table SET ";
foreach my $column_name (sort {$a cmp $b} keys %{$an->data->{db_data}{unified}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}})
{
my $column_value = $an->data->{sys}{use_db_fh}->quote($an->data->{db_data}{unified}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}{$column_name});
$column_value =~ s/'NULL'/NULL/g;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
column_name => $column_name,
column_value => $column_value,
}});
$query .= "$column_name = ".$an->data->{sys}{use_db_fh}->quote().", ";
}
$query .= "modified_date = ".$an->data->{sys}{use_db_fh}->quote($modified_date)." WHERE $uuid_column = ".$an->data->{sys}{use_db_fh}->quote($row_uuid).";";
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0074", variables => { id => $id, query => $query }});
# Now record the query in the array
push @{$an->data->{db_resync}{$id}{public}{sql}}, $query;
} # if not exists - timestamp
} # if exists
else
{
# It doesn't exist, so INSERT it. We need to
# build entries for the column names and
# values at the same time to make certain
# they're in the same order.
my $columns = "";
my $values = "";
foreach my $column_name (sort {$a cmp $b} keys %{$an->data->{db_data}{unified}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}})
{
my $column_value = $an->data->{sys}{use_db_fh}->quote($an->data->{db_data}{unified}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}{$column_name});
$column_value =~ s/'NULL'/NULL/g;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
column_name => $column_name,
column_value => $column_value,
}});
$columns .= $column_name.", ";
$values .= $column_value.", ";
}
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
columns => $columns,
'values' => $values,
}});
my $query = "INSERT INTO public.$table (".$uuid_column.", ".$columns."modified_date) VALUES (".$an->data->{sys}{use_db_fh}->quote($row_uuid).", ".$values.$an->data->{sys}{use_db_fh}->quote($modified_date).");";
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0074", variables => { id => $id, query => $query }});
# Now record the query in the array
push @{$an->data->{db_resync}{$id}{public}{sql}}, $query;
} # if not exists
} # if not seen
else
{
### NOTE: If the table doesn't have a history schema,
### we skip this.
next if $schema eq "public";
# We've seen this row_uuid before, so it is just a
# question of whether the entry for the current
# timestamp exists in the history schema.
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"db_data::${id}::${table}::modified_date::${modified_date}::${uuid_column}::${row_uuid}" => $an->data->{db_data}{$id}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid},
}});
if (not $an->data->{db_data}{$id}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid})
{
# It doesn't exist at this timestamp, so INSERT
# it into the history schema. We need to build
# entries for the column names and values at the
# same time to make certain they're in the same order.
my $columns = "";
my $values = "";
foreach my $column_name (sort {$a cmp $b} keys %{$an->data->{db_data}{unified}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}})
{
my $column_value = $an->data->{sys}{use_db_fh}->quote($an->data->{db_data}{unified}{$table}{modified_date}{$modified_date}{$uuid_column}{$row_uuid}{$column_name});
$column_value =~ s/'NULL'/NULL/g;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
column_name => $column_name,
column_value => $column_value,
}});
$columns .= $column_name.", ";
$values .= $column_value.", ";
}
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
columns => $columns,
'values' => $values,
}});
my $query = "INSERT INTO history.$table (".$uuid_column.", ".$columns."modified_date) VALUES (".$an->data->{sys}{use_db_fh}->quote($row_uuid).", ".$values.$an->data->{sys}{use_db_fh}->quote($modified_date).");";
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 2, key => "log_0074", variables => { id => $id, query => $query }});
# Now record the query in the array
push @{$an->data->{db_resync}{$id}{history}{sql}}, $query;
} # if not exists - timestamp
} # if seen
} # foreach $id
} # foreach $row_uuid
} # foreach $modified_date ...
# $an->Log->entry({log_level => 3, message_key => "an_variables_0002", message_variables => {
# name1 => "id", value1 => $id,
# name2 => "query", value2 => $query
# }, file => $THIS_FILE, line => __LINE__});
#
# # Do the query against the source DB and loop through the results.
# my $results = $an->DB->do_db_query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__});
# my $count = @{$results};
# $an->Log->entry({log_level => 3, message_key => "an_variables_0002", message_variables => {
# name1 => "results", value1 => $results,
# name2 => "count", value2 => $count,
# }, file => $THIS_FILE, line => __LINE__});
# foreach my $row (@{$results})
# {
# my $host_uuid = $row->[0];
# my $host_location_uuid = $row->[1];
# my $host_name = $row->[2];
# my $host_type = $row->[3];
# my $host_emergency_stop = $row->[4] ? "TRUE" : "FALSE";
# my $host_stop_reason = $row->[5] ? $row->[5] : "";
# my $host_health = $row->[6] ? $row->[6] : "";
# my $modified_date = $row->[7];
# $an->Log->entry({log_level => 3, message_key => "an_variables_0008", message_variables => {
# name1 => "host_uuid", value1 => $host_uuid,
# name2 => "host_location_uuid", value2 => $host_location_uuid,
# name3 => "host_name", value3 => $host_name,
# name4 => "host_type", value4 => $host_type,
# name5 => "host_emergency_stop", value5 => $host_emergency_stop,
# name6 => "host_stop_reason", value6 => $host_stop_reason,
# name7 => "host_health", value7 => $host_health,
# name8 => "modified_date", value8 => $modified_date,
# }, file => $THIS_FILE, line => __LINE__});
#
# # Record this in the unified and local hashes.
# $an->data->{db_data}{unified}{hosts}{modified_date}{$modified_date}{host_uuid}{$host_uuid} = {
# host_location_uuid => $host_location_uuid,
# host_name => $host_name,
# host_type => $host_type,
# host_emergency_stop => $host_emergency_stop,
# host_stop_reason => $host_stop_reason,
# host_health => $host_health,
# };
#
# $an->data->{db_data}{$id}{hosts}{host_uuid}{$host_uuid}{'exists'} = 1;
# $an->data->{db_data}{$id}{hosts}{host_uuid}{$host_uuid}{seen} = 0;
# $an->data->{db_data}{$id}{hosts}{modified_date}{$modified_date}{host_uuid}{$host_uuid} = {
# host_location_uuid => $host_location_uuid,
# host_name => $host_name,
# host_type => $host_type,
# host_emergency_stop => $host_emergency_stop,
# host_stop_reason => $host_stop_reason,
# host_health => $host_health,
# };
# }
# Free up memory by deleting the DB data from the main hash.
delete $an->data->{db_data};
# Free some RAM.
delete $an->data->{db_resync}{$id}{sql};
delete $an->data->{db_resync}{$id}{public}{sql};
delete $an->data->{db_resync}{$id}{history}{sql};
# Do the UPDATEs and INSERTs now and then release the memory.
foreach my $id (sort {$a cmp $b} keys %{$an->data->{cache}{db_fh}})
{
# Merge the queries for both schemas into one array, with public schema
# queries being first, then delete the arrays holding them to free memory
# before we start the resync.
my $merged = [];
@{$merged} = (@{$an->data->{db_resync}{$id}{public}{sql}}, @{$an->data->{db_resync}{$id}{history}{sql}});
undef $an->data->{db_resync}{$id}{public}{sql};
undef $an->data->{db_resync}{$id}{history}{sql};
if (@{$merged} > 0)
{
$an->Database->write({id => $id, query => $merged, source => $THIS_FILE, line => __LINE__});
undef $merged;
}
}
die;
}
}
@ -2934,10 +3021,7 @@ sub write
#if ($an->Log->db_transactions())
if (1)
{
$an->Log->entry({source => $source, line => $line, secure => $secure, level => 2, key => "log_0074", variables => {
id => $id,
query => $query,
}});
$an->Log->entry({source => $source, line => $line, secure => $secure, level => 2, key => "log_0074", variables => { id => $id, query => $query }});
}
if (not $an->data->{cache}{db_fh}{$id})

Loading…
Cancel
Save