From 21694716e822956400f4cdc378e9bbdc3058e9c0 Mon Sep 17 00:00:00 2001 From: Digimer Date: Sun, 20 Aug 2017 03:36:33 -0400 Subject: [PATCH] * Made more progress on the automatic DB resync code. It now pulls the table columns and reads the data from the DB. Signed-off-by: Digimer --- AN/Tools/Database.pm | 233 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 211 insertions(+), 22 deletions(-) diff --git a/AN/Tools/Database.pm b/AN/Tools/Database.pm index 5b9404e0..d0649965 100755 --- a/AN/Tools/Database.pm +++ b/AN/Tools/Database.pm @@ -1019,6 +1019,9 @@ sub connect source => $source, tables => $tables, }); + + ### TEMP + $an->Database->resync_databases; } die; @@ -1033,7 +1036,7 @@ sub connect $an->Database->archive_databases({}); # Sync the database, if needed. - $an->Database->resync_databases({tables => $tables}); + $an->Database->resync_databases; # Add ourselves to the database, if needed. $an->Database->insert_or_update_hosts; @@ -2497,8 +2500,184 @@ sub resync_databases my $an = $self->parent; $an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0125", variables => { method => "Database->resync_databases()" }}); - # Get a list if tables. Note that we'll only sync a given table with peers that have the same table. - my $table_array = ref($parameter->{tables}) eq "ARRAY" ? $parameter->{tables} : []; + # If a resync isn't needed, just return. + if (not $an->data->{sys}{database}{resync_needed}) + { + return(0); + } + + ### NOTE: Don't sort this array, we need to resync in the order that the user passed the tables to us + ### to avoid trouble with primary/foreign keys. + # We're going to use the array of tables assembles by _find_behind_databases() stored in 'sys::database::check_tables' + foreach my $table (@{$an->data->{sys}{database}{check_tables}}) + { + # If the 'schema' is 'public', there is no table in the history schema. If there is a host + # column, the resync will be restricted to entries from this host uuid. + my $schema = $an->data->{sys}{database}{table}{$table}{schema}; + my $host_column = $an->data->{sys}{database}{table}{$table}{host_column}; + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + table => $table, + schema => $schema, + host_column => $host_column, + }}); + + # Get all the columns in this table. + my $query = "SELECT column_name, is_nullable, data_type FROM information_schema.columns WHERE table_schema = ".$an->data->{sys}{use_db_fh}->quote($schema)." AND table_name = ".$an->data->{sys}{use_db_fh}->quote($table)." AND column_name != 'history_id';"; + $an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0124", variables => { query => $query }}); + + my $results = $an->Database->query({query => $query, source => $THIS_FILE, line => __LINE__}); + my $count = @{$results}; + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { + results => $results, + count => $count, + }}); + foreach my $row (@{$results}) + { + my $column_name = $row->[0]; + my $not_null = $row->[1] eq "NO" ? 1 : 0; + my $data_type = $row->[2]; + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { + column_name => $column_name, + not_null => $not_null, + data_type => $data_type, + }}); + + $an->data->{sys}{database}{table}{$table}{column}{$column_name}{not_null} = $not_null; + $an->data->{sys}{database}{table}{$table}{column}{$column_name}{data_type} = $data_type; + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { + "sys::database::table::${table}::column::${column_name}::not_null" => $an->data->{sys}{database}{table}{$table}{column}{$column_name}{not_null}, + "sys::database::table::${table}::column::${column_name}::data_type" => $an->data->{sys}{database}{table}{$table}{column}{$column_name}{data_type}, + }}); + } + + # Now read in the data from the different databases. + foreach my $id (sort {$a cmp $b} keys %{$an->data->{database}}) + { + # ... + $an->data->{db_resync}{$id}{sql} = []; + $an->data->{db_resync}{$id}{public}{sql} = []; + $an->data->{db_resync}{$id}{history}{sql} = []; + + # Read in the history schema + my $query = "SELECT "; + my $read_columns = []; + foreach my $column_name (sort {$a cmp $b} keys %{$an->data->{sys}{database}{table}{$table}{column}}) + { + # We'll skip the host column as we'll use it in the conditional. + next if $column_name eq $host_column; + next if $column_name eq "modified_date"; + $query .= $column_name.", "; + + push @{$read_columns}, $column_name; + } + + # Manually add modified_date. + push @{$read_columns}, "modified_date"; + $query .= "modified_date FROM ".$schema.".".$table." "; + + # Restrict to this host if a host column was found. + if ($host_column) + { + $query .= "WHERE ".$host_column." = ".$an->data->{sys}{use_db_fh}->quote($an->data->{sys}{host_uuid}); + } + $query .= ";"; + $an->Log->entry({level => 2, key => "log_0074", variables => { id => $id, query => $query }}); + + $an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0124", variables => { query => $query }}); + + my $results = $an->Database->query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__}); + my $count = @{$results}; + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { + results => $results, + count => $count, + }}); + next if not $count; + + foreach my $row (@{$results}) + { + for (my $i = 0; $i < @{$read_columns}; $i++) + { + my $column_name = $read_columns->[$i]; + my $column_value = defined $row->[$i] ? $row->[$i] : "NULL"; + my $not_null = $an->data->{sys}{database}{table}{$table}{column}{$column_name}{not_null}; + my $data_type = $an->data->{sys}{database}{table}{$table}{column}{$column_name}{data_type}; + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + column_name => $column_name, + column_value => $column_value, + not_null => $not_null, + data_type => $data_type, + }}); + if ((not $not_null) && ($column_value eq "NULL")) + { + $column_value = ""; + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { column_value => $column_value }}); + } + } + } + die; + + +# $an->Log->entry({log_level => 3, message_key => "an_variables_0002", message_variables => { +# name1 => "id", value1 => $id, +# name2 => "query", value2 => $query +# }, file => $THIS_FILE, line => __LINE__}); +# +# # Do the query against the source DB and loop through the results. +# my $results = $an->DB->do_db_query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__}); +# my $count = @{$results}; +# $an->Log->entry({log_level => 3, message_key => "an_variables_0002", message_variables => { +# name1 => "results", value1 => $results, +# name2 => "count", value2 => $count, +# }, file => $THIS_FILE, line => __LINE__}); +# foreach my $row (@{$results}) +# { +# my $host_uuid = $row->[0]; +# my $host_location_uuid = $row->[1]; +# my $host_name = $row->[2]; +# my $host_type = $row->[3]; +# my $host_emergency_stop = $row->[4] ? "TRUE" : "FALSE"; +# my $host_stop_reason = $row->[5] ? $row->[5] : ""; +# my $host_health = $row->[6] ? $row->[6] : ""; +# my $modified_date = $row->[7]; +# $an->Log->entry({log_level => 3, message_key => "an_variables_0008", message_variables => { +# name1 => "host_uuid", value1 => $host_uuid, +# name2 => "host_location_uuid", value2 => $host_location_uuid, +# name3 => "host_name", value3 => $host_name, +# name4 => "host_type", value4 => $host_type, +# name5 => "host_emergency_stop", value5 => $host_emergency_stop, +# name6 => "host_stop_reason", value6 => $host_stop_reason, +# name7 => "host_health", value7 => $host_health, +# name8 => "modified_date", value8 => $modified_date, +# }, file => $THIS_FILE, line => __LINE__}); +# +# # Record this in the unified and local hashes. +# $an->data->{db_data}{unified}{hosts}{modified_date}{$modified_date}{host_uuid}{$host_uuid} = { +# host_location_uuid => $host_location_uuid, +# host_name => $host_name, +# host_type => $host_type, +# host_emergency_stop => $host_emergency_stop, +# host_stop_reason => $host_stop_reason, +# host_health => $host_health, +# }; +# +# $an->data->{db_data}{$id}{hosts}{host_uuid}{$host_uuid}{'exists'} = 1; +# $an->data->{db_data}{$id}{hosts}{host_uuid}{$host_uuid}{seen} = 0; +# $an->data->{db_data}{$id}{hosts}{modified_date}{$modified_date}{host_uuid}{$host_uuid} = { +# host_location_uuid => $host_location_uuid, +# host_name => $host_name, +# host_type => $host_type, +# host_emergency_stop => $host_emergency_stop, +# host_stop_reason => $host_stop_reason, +# host_health => $host_health, +# }; +# } + + # Free some RAM. + delete $an->data->{db_resync}{$id}{sql}; + delete $an->data->{db_resync}{$id}{public}{sql}; + delete $an->data->{db_resync}{$id}{history}{sql}; + } + } # Show tables; # SELECT table_schema, table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema NOT IN ('pg_catalog', 'information_schema') ORDER BY table_name ASC, table_schema DESC; @@ -2757,25 +2936,30 @@ sub _find_behind_databases # Now, look through the core tables, plus any tables the user might have passed, for differing # 'modified_date' entries, or no entries in one DB with entries in the other (as can happen with a # newly setup db). - my $check_tables = []; + $an->data->{sys}{database}{check_tables} = []; foreach my $table (@{$an->data->{sys}{database}{core_tables}}) { $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { table => $table }}); - push @{$check_tables}, $table; + push @{$an->data->{sys}{database}{check_tables}}, $table; } if (ref($tables) eq "ARRAY") { foreach my $table (@{$tables}) { $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { table => $table }}); - push @{$check_tables}, $table; - - # This will store the most recent time stamp. - $an->data->{sys}{database}{table}{$table}{last_updated} = 0; - + push @{$an->data->{sys}{database}{check_tables}}, $table; } } + # Preset all tables to have an initial 'modified_date' of 0. + foreach my $table (sort {$a cmp $b} @{$an->data->{sys}{database}{check_tables}}) + { + $an->data->{sys}{database}{table}{$table}{last_updated} = 0; + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { + "sys::database::table::${table}::last_updated" => $an->data->{sys}{database}{table}{$table}{last_updated}, + }}); + } + # Look at all the databases and find the most recent time stamp (and the ID of the DB). my $source_updated_time = 0; foreach my $id (sort {$a cmp $b} keys %{$an->data->{database}}) @@ -2791,7 +2975,7 @@ sub _find_behind_databases # Loop through the tables in this DB. For each table, we'll record the most recent time # stamp. Later, We'll look through again and any table/DB with an older time stamp will be # behind and a resync will be needed. - foreach my $table (@{$check_tables}) + foreach my $table (@{$an->data->{sys}{database}{check_tables}}) { # Does this table exist yet? my $query = "SELECT COUNT(*) FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema = 'public' AND table_name = ".$an->data->{sys}{use_db_fh}->quote($table).";"; @@ -2842,18 +3026,23 @@ ORDER BY my $last_updated = $an->Database->query({id => $id, query => $query, source => $THIS_FILE, line => __LINE__})->[0]->[0]; $last_updated = 0 if not defined $last_updated; - # Record this table's last modified_date for later comparison. + # Record this table's last modified_date for later comparison. We'll also + # record the schema and host column, if found, to save looking the same thing + # up later if we do need a resync. $an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated} = $last_updated; - $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + $an->data->{sys}{database}{table}{$table}{schema} = $schema; + $an->data->{sys}{database}{table}{$table}{host_column} = $host_column; + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { "sys::database::table::${table}::id::${id}::last_updated" => $an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated}, "sys::database::table::${table}::last_updated" => $an->data->{sys}{database}{table}{$table}{last_updated}, - + "sys::database::table::${table}::schema" => $an->data->{sys}{database}{table}{$table}{schema}, + "sys::database::table::${table}::host_column" => $an->data->{sys}{database}{table}{$table}{host_column}, }}); if ($an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated} > $an->data->{sys}{database}{table}{$table}{last_updated}) { $an->data->{sys}{database}{table}{$table}{last_updated} = $an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated}; - $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { "sys::database::table::${table}::last_updated" => $an->data->{sys}{database}{table}{$table}{last_updated}, }}); } @@ -2865,12 +3054,12 @@ ORDER BY # databases. If it has, trigger a resync. foreach my $table (sort {$a cmp $b} keys %{$an->data->{sys}{database}{table}}) { - $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { "sys::database::table::${table}::last_updated" => $an->data->{sys}{database}{table}{$table}{last_updated}, }}); foreach my $id (sort {$a cmp $b} keys %{$an->data->{sys}{database}{table}{$table}{id}}) { - $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { "sys::database::table::${table}::id::${id}::last_updated" => $an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated}, }}); if ($an->data->{sys}{database}{table}{$table}{last_updated} > $an->data->{sys}{database}{table}{$table}{id}{$id}{last_updated}) @@ -2908,30 +3097,30 @@ sub _mark_database_as_behind $an->Log->entry({source => $THIS_FILE, line => __LINE__, level => 3, key => "log_0125", variables => { method => "Database->_mark_database_as_behind()" }}); my $id = $parameter->{id} ? $parameter->{id} : ""; - $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { id => $id }}); + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { id => $id }}); $an->data->{sys}{database}{to_update}{$id}{behind} = 1; $an->data->{sys}{database}{resync_needed} = 1; - $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { "sys::database::to_update::${id}::behind" => $an->data->{sys}{database}{to_update}{$id}{behind}, "sys::database::resync_needed" => $an->data->{sys}{database}{resync_needed}, }}); # We can't trust this database for reads, so switch to another database for reads if # necessary. - $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { id => $id, "sys::read_db_id" => $an->data->{sys}{read_db_id}, }}); if ($id eq $an->data->{sys}{read_db_id}) { # Switch. - $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { ">> sys::read_db_id" => $an->data->{sys}{read_db_id} }}); + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { ">> sys::read_db_id" => $an->data->{sys}{read_db_id} }}); foreach my $this_id (sort {$a cmp $b} keys %{$an->data->{database}}) { next if $this_id eq $id; $an->data->{sys}{read_db_id} = $this_id; - $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 2, list => { "<< sys::read_db_id" => $an->data->{sys}{read_db_id} }}); + $an->Log->variables({source => $THIS_FILE, line => __LINE__, level => 3, list => { "<< sys::read_db_id" => $an->data->{sys}{read_db_id} }}); last; } }