* Made more progress on the automatic DB resync code. It now pulls the table columns and reads the data from the DB.

Signed-off-by: Digimer <digimer@alteeve.ca>
main
Digimer 7 years ago
parent ce607238f8
commit 21694716e8
  1. 233
      AN/Tools/Database.pm

@ -1019,6 +1019,9 @@ sub connect
source => $source,
tables => $tables,
});
### TEMP
$an->Database->resync_databases;
}
die;
@ -1033,7 +1036,7 @@ sub connect
$an->Database->archive_databases({});
# Sync the database, if needed.
$an->Database->resync_databases({tables => $tables});
$an->Database->resync_databases;
# Add ourselves to the database, if needed.
$an->Database->insert_or_update_hosts;
@ -2497,8 +2500,184 @@ sub resync_databases
my $an = $self->parent;
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0125", variables => { method => "Database->resync_databases()" }});
# Get a list if tables. Note that we'll only sync a given table with peers that have the same table.
my $table_array = ref($parameter->{tables}) eq "ARRAY" ? $parameter->{tables} : [];
# If a resync isn't needed, just return.
if (not $an->data->{sys}{database}{resync_needed})
{
return(0);
}
### NOTE: Don't sort this array, we need to resync in the order that the user passed the tables to us
### to avoid trouble with primary/foreign keys.
# We're going to use the array of tables assembles by _find_behind_databases() stored in 'sys::database::check_tables'
foreach my $table (@{$an->data->{sys}{database}{check_tables}})
{
# If the 'schema' is 'public', there is no table in the history schema. If there is a host
# column, the resync will be restricted to entries from this host uuid.
my $schema = $an->data->{sys}{database}{table}{$table}{schema};
my $host_column = $an->data->{sys}{database}{table}{$table}{host_column};
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
table => $table,
schema => $schema,
host_column => $host_column,
}});
# Get all the columns in this table.
my $query = "SELECT column_name, is_nullable, data_type FROM information_schema.columns WHERE table_schema = ".$an->data->{sys}{use_db_fh}->quote($schema)." AND table_name = ".$an->data->{sys}{use_db_fh}->quote($table)." AND column_name != 'history_id';";
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0124", variables => { query => $query }});
my $results = $an->Database->query({query => $query, source => $THIS_FILE, line => __LINE__});
my $count = @{$results};
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
results => $results,
count => $count,
}});
foreach my $row (@{$results})
{
my $column_name = $row->[0];
my $not_null = $row->[1] eq "NO" ? 1 : 0;
my $data_type = $row->[2];
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
column_name => $column_name,
not_null => $not_null,
data_type => $data_type,
}});
$an->data->{sys}{database}{table}{$table}{column}{$column_name}{not_null} = $not_null;
$an->data->{sys}{database}{table}{$table}{column}{$column_name}{data_type} = $data_type;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"sys::database::table::${table}::column::${column_name}::not_null" => $an->data->{sys}{database}{table}{$table}{column}{$column_name}{not_null},
"sys::database::table::${table}::column::${column_name}::data_type" => $an->data->{sys}{database}{table}{$table}{column}{$column_name}{data_type},
}});
}
# Now read in the data from the different databases.
foreach my $id (sort {$a cmp $b} keys %{$an->data->{database}})
{
# ...
$an->data->{db_resync}{$id}{sql} = [];
$an->data->{db_resync}{$id}{public}{sql} = [];
$an->data->{db_resync}{$id}{history}{sql} = [];
# Read in the history schema
my $query = "SELECT ";
my $read_columns = [];
foreach my $column_name (sort {$a cmp $b} keys %{$an->data->{sys}{database}{table}{$table}{column}})
{
# We'll skip the host column as we'll use it in the conditional.
next if $column_name eq $host_column;
next if $column_name eq "modified_date";
$query .= $column_name.", ";
push @{$read_columns}, $column_name;
}
# Manually add modified_date.
push @{$read_columns}, "modified_date";
$query .= "modified_date FROM ".$schema.".".$table." ";
# Restrict to this host if a host column was found.
if ($host_column)
{
$query .= "WHERE ".$host_column." = ".$an->data->{sys}{use_db_fh}->quote($an->data->{sys}{host_uuid});
}
$query .= ";";
$an->Log->entry({level => 2, key => "log_0074", variables => { id => $id, query => $query }});
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0124", variables => { query => $query }});
my $results = $an->Database->query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__});
my $count = @{$results};
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
results => $results,
count => $count,
}});
next if not $count;
foreach my $row (@{$results})
{
for (my $i = 0; $i < @{$read_columns}; $i++)
{
my $column_name = $read_columns->[$i];
my $column_value = defined $row->[$i] ? $row->[$i] : "NULL";
my $not_null = $an->data->{sys}{database}{table}{$table}{column}{$column_name}{not_null};
my $data_type = $an->data->{sys}{database}{table}{$table}{column}{$column_name}{data_type};
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
column_name => $column_name,
column_value => $column_value,
not_null => $not_null,
data_type => $data_type,
}});
if ((not $not_null) && ($column_value eq "NULL"))
{
$column_value = "";
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { column_value => $column_value }});
}
}
}
die;
# $an->Log->entry({log_level => 3, message_key => "an_variables_0002", message_variables => {
# name1 => "id", value1 => $id,
# name2 => "query", value2 => $query
# }, file => $THIS_FILE, line => __LINE__});
#
# # Do the query against the source DB and loop through the results.
# my $results = $an->DB->do_db_query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__});
# my $count = @{$results};
# $an->Log->entry({log_level => 3, message_key => "an_variables_0002", message_variables => {
# name1 => "results", value1 => $results,
# name2 => "count", value2 => $count,
# }, file => $THIS_FILE, line => __LINE__});
# foreach my $row (@{$results})
# {
# my $host_uuid = $row->[0];
# my $host_location_uuid = $row->[1];
# my $host_name = $row->[2];
# my $host_type = $row->[3];
# my $host_emergency_stop = $row->[4] ? "TRUE" : "FALSE";
# my $host_stop_reason = $row->[5] ? $row->[5] : "";
# my $host_health = $row->[6] ? $row->[6] : "";
# my $modified_date = $row->[7];
# $an->Log->entry({log_level => 3, message_key => "an_variables_0008", message_variables => {
# name1 => "host_uuid", value1 => $host_uuid,
# name2 => "host_location_uuid", value2 => $host_location_uuid,
# name3 => "host_name", value3 => $host_name,
# name4 => "host_type", value4 => $host_type,
# name5 => "host_emergency_stop", value5 => $host_emergency_stop,
# name6 => "host_stop_reason", value6 => $host_stop_reason,
# name7 => "host_health", value7 => $host_health,
# name8 => "modified_date", value8 => $modified_date,
# }, file => $THIS_FILE, line => __LINE__});
#
# # Record this in the unified and local hashes.
# $an->data->{db_data}{unified}{hosts}{modified_date}{$modified_date}{host_uuid}{$host_uuid} = {
# host_location_uuid => $host_location_uuid,
# host_name => $host_name,
# host_type => $host_type,
# host_emergency_stop => $host_emergency_stop,
# host_stop_reason => $host_stop_reason,
# host_health => $host_health,
# };
#
# $an->data->{db_data}{$id}{hosts}{host_uuid}{$host_uuid}{'exists'} = 1;
# $an->data->{db_data}{$id}{hosts}{host_uuid}{$host_uuid}{seen} = 0;
# $an->data->{db_data}{$id}{hosts}{modified_date}{$modified_date}{host_uuid}{$host_uuid} = {
# host_location_uuid => $host_location_uuid,
# host_name => $host_name,
# host_type => $host_type,
# host_emergency_stop => $host_emergency_stop,
# host_stop_reason => $host_stop_reason,
# host_health => $host_health,
# };
# }
# Free some RAM.
delete $an->data->{db_resync}{$id}{sql};
delete $an->data->{db_resync}{$id}{public}{sql};
delete $an->data->{db_resync}{$id}{history}{sql};
}
}
# Show tables;
# SELECT table_schema, table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema NOT IN ('pg_catalog', 'information_schema') ORDER BY table_name ASC, table_schema DESC;
@ -2757,25 +2936,30 @@ sub _find_behind_databases
# Now, look through the core tables, plus any tables the user might have passed, for differing
# 'modified_date' entries, or no entries in one DB with entries in the other (as can happen with a
# newly setup db).
my $check_tables = [];
$an->data->{sys}{database}{check_tables} = [];
foreach my $table (@{$an->data->{sys}{database}{core_tables}})
{
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { table => $table }});
push @{$check_tables}, $table;
push @{$an->data->{sys}{database}{check_tables}}, $table;
}
if (ref($tables) eq "ARRAY")
{
foreach my $table (@{$tables})
{
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { table => $table }});
push @{$check_tables}, $table;
# This will store the most recent time stamp.
$an->data->{sys}{database}{table}{$table}{last_updated} = 0;
push @{$an->data->{sys}{database}{check_tables}}, $table;
}
}
# Preset all tables to have an initial 'modified_date' of 0.
foreach my $table (sort {$a cmp $b} @{$an->data->{sys}{database}{check_tables}})
{
$an->data->{sys}{database}{table}{$table}{last_updated} = 0;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"sys::database::table::${table}::last_updated" => $an->data->{sys}{database}{table}{$table}{last_updated},
}});
}
# Look at all the databases and find the most recent time stamp (and the ID of the DB).
my $source_updated_time = 0;
foreach my $id (sort {$a cmp $b} keys %{$an->data->{database}})
@ -2791,7 +2975,7 @@ sub _find_behind_databases
# Loop through the tables in this DB. For each table, we'll record the most recent time
# stamp. Later, We'll look through again and any table/DB with an older time stamp will be
# behind and a resync will be needed.
foreach my $table (@{$check_tables})
foreach my $table (@{$an->data->{sys}{database}{check_tables}})
{
# Does this table exist yet?
my $query = "SELECT COUNT(*) FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema = 'public' AND table_name = ".$an->data->{sys}{use_db_fh}->quote($table).";";
@ -2842,18 +3026,23 @@ ORDER BY
my $last_updated = $an->Database->query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__})->[0]->[0];
$last_updated = 0 if not defined $last_updated;
# Record this table's last modified_date for later comparison.
# Record this table's last modified_date for later comparison. We'll also
# record the schema and host column, if found, to save looking the same thing
# up later if we do need a resync.
$an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated} = $last_updated;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
$an->data->{sys}{database}{table}{$table}{schema} = $schema;
$an->data->{sys}{database}{table}{$table}{host_column} = $host_column;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"sys::database::table::${table}::id::${id}::last_updated" => $an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated},
"sys::database::table::${table}::last_updated" => $an->data->{sys}{database}{table}{$table}{last_updated},
"sys::database::table::${table}::schema" => $an->data->{sys}{database}{table}{$table}{schema},
"sys::database::table::${table}::host_column" => $an->data->{sys}{database}{table}{$table}{host_column},
}});
if ($an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated} > $an->data->{sys}{database}{table}{$table}{last_updated})
{
$an->data->{sys}{database}{table}{$table}{last_updated} = $an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated};
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"sys::database::table::${table}::last_updated" => $an->data->{sys}{database}{table}{$table}{last_updated},
}});
}
@ -2865,12 +3054,12 @@ ORDER BY
# databases. If it has, trigger a resync.
foreach my $table (sort {$a cmp $b} keys %{$an->data->{sys}{database}{table}})
{
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"sys::database::table::${table}::last_updated" => $an->data->{sys}{database}{table}{$table}{last_updated},
}});
foreach my $id (sort {$a cmp $b} keys %{$an->data->{sys}{database}{table}{$table}{id}})
{
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"sys::database::table::${table}::id::${id}::last_updated" => $an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated},
}});
if ($an->data->{sys}{database}{table}{$table}{last_updated} > $an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated})
@ -2908,30 +3097,30 @@ sub _mark_database_as_behind
$an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0125", variables => { method => "Database->_mark_database_as_behind()" }});
my $id = $parameter->{id} ? $parameter->{id} : "";
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { id => $id }});
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { id => $id }});
$an->data->{sys}{database}{to_update}{$id}{behind} = 1;
$an->data->{sys}{database}{resync_needed} = 1;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
"sys::database::to_update::${id}::behind" => $an->data->{sys}{database}{to_update}{$id}{behind},
"sys::database::resync_needed" => $an->data->{sys}{database}{resync_needed},
}});
# We can't trust this database for reads, so switch to another database for reads if
# necessary.
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => {
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => {
id => $id,
"sys::read_db_id" => $an->data->{sys}{read_db_id},
}});
if ($id eq $an->data->{sys}{read_db_id})
{
# Switch.
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { ">> sys::read_db_id" => $an->data->{sys}{read_db_id} }});
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { ">> sys::read_db_id" => $an->data->{sys}{read_db_id} }});
foreach my $this_id (sort {$a cmp $b} keys %{$an->data->{database}})
{
next if $this_id eq $id;
$an->data->{sys}{read_db_id} = $this_id;
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "<< sys::read_db_id" => $an->data->{sys}{read_db_id} }});
$an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { "<< sys::read_db_id" => $an->data->{sys}{read_db_id} }});
last;
}
}

Loading…
Cancel
Save