I was recently brought into a data migration project that required copying all of the data from two schemas in one database into the public schema of another database. To make things more interesting, a few of the tables existed in both source schemas, so their contents would need to be merged in the target database.
My first step was to copy the raw source data into the target database. Once the source data was migrated and merged into the target database, I needed to verify that the data was complete and nothing was lost. A quick check of row counts showed that they matched across the schemas, but matching counts alone do not prove the data itself is intact.
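For reference, one way to get exact per-table counts in a single pass is the query_to_xml trick against information_schema.tables. This is a sketch, not the exact check I ran; the schema names are the ones used in the final script below.

SELECT t.table_schema,
       t.table_name,
       (xpath('/row/cnt/text()',
              query_to_xml(format('SELECT COUNT(*) AS cnt FROM %I.%I',
                                  t.table_schema, t.table_name),
                           false, true, '')))[1]::text::bigint AS row_count
FROM information_schema.tables t
WHERE t.table_type = 'BASE TABLE'
  AND t.table_schema IN ('public', 'source_1', 'source_2')
ORDER BY t.table_name, t.table_schema;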
One option was comparing the actual data in the tables. That turned out to be problematic because of nullable columns: if I concatenated the columns into a TEXT string for easy comparison, a single NULL value would nullify the whole string. I could have worked around this by wrapping every nullable column in a COALESCE call, but that was more work than I wanted. To keep things simple I decided to generate and compare checksums of the data instead.
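To make the problem concrete, here is a sketch against a hypothetical person table (the table and column names are made up for illustration). Naive concatenation is poisoned by the first NULL it touches, while hashing the whole record is not, because casting a row to TEXT renders NULL columns as empty fields.

-- NULL in middle_name nullifies the entire concatenated string
SELECT id, first_name || middle_name || last_name AS concatenated
FROM public.person;

-- Hashing the whole record sidesteps the NULL problem
SELECT id, MD5(p::TEXT) AS checksum
FROM public.person p;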
First, I created a query to get a list of the tables I cared about from information_schema.tables. For each of these tables I queried the primary key definition from information_schema.table_constraints and information_schema.key_column_usage. Using this information I constructed a SELECT statement that returned the primary key value and an MD5 checksum of the entire record for every row in the table. I sent the table name, primary key value and checksum to output using a RAISE INFO statement.
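Stripped of the loop machinery, the primary key lookup boils down to a join like this (shown here for a hypothetical public.person table; the script below derives the schema and table names from information_schema.tables instead of hard-coding them):

SELECT kcu.column_name, kcu.ordinal_position
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
  ON kcu.constraint_name = tc.constraint_name
 AND kcu.table_schema    = tc.table_schema
 AND kcu.table_name      = tc.table_name
WHERE tc.constraint_type = 'PRIMARY KEY'
  AND tc.table_schema    = 'public'
  AND tc.table_name      = 'person'
ORDER BY kcu.ordinal_position;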
Unfortunately this didn’t really work. The result was millions of lines of text that needed to be compared to find the differences. My compare tool (Notepad++ Compare Plugin) choked on the volume of data. I am sure that it would have eventually returned results but I was not willing to wait hours or days for them.
At this point I added the schema name to the data elements I was collecting and INSERTed them all into a table in the database. I was then able to write a query that would find and DELETE the matching rows, leaving behind only the unmatched rows. This allowed me to identify a couple of dozen rows that I tracked back to the few tables that existed in both of the source schemas. Success!
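A DELETE along these lines does the job, using the ipg_checksum_data table that the script below populates (a sketch, not necessarily the exact statement I ran). Because the EXISTS subqueries see the table as it was when the statement started, both the public copy and the source copy of a matching pair are removed by the single DELETE; whatever survives is the set of rows to investigate.

DELETE FROM ipg_checksum_data d
WHERE (d.table_schema = 'public'
       AND EXISTS (SELECT 1
                   FROM ipg_checksum_data s
                   WHERE s.table_schema IN ('source_1', 'source_2')
                     AND s.table_name = d.table_name
                     AND s.pk_value   = d.pk_value
                     AND s.checksum   = d.checksum))
   OR (d.table_schema IN ('source_1', 'source_2')
       AND EXISTS (SELECT 1
                   FROM ipg_checksum_data p
                   WHERE p.table_schema = 'public'
                     AND p.table_name = d.table_name
                     AND p.pk_value   = d.pk_value
                     AND p.checksum   = d.checksum));

-- Anything left behind is an unmatched row
SELECT * FROM ipg_checksum_data ORDER BY table_name, table_schema, pk_value;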
Miscellaneous Notes
- In order for this to work, every table being migrated has to have a primary key. Since that is our standard, this was not a problem.
- To avoid the need to use dblink or foreign data wrappers I bulk copied the source data into the target database by piping the output of a pg_dump command into psql.
- After the data was successfully migrated to the target database I renamed the schemas in the source database so the application could not find them.
The final version of the script I used can be found below. I am sure there are things I could have done better but it got the job done. I created and executed a stored procedure so that I could control when COMMITs happened.
CREATE OR REPLACE PROCEDURE ipg_validate_data_with_checksums() AS $$
DECLARE
    table_rec  RECORD;
    pkey_rec   RECORD;
    cols_rec   RECORD;
    data_rec   RECORD;
    data_query TEXT;
    order_by   TEXT;
    order_pad  TEXT;
    pkey_cols  TEXT;
    pkey_pad   TEXT;
BEGIN
    -- Staging table that holds one checksum per row per schema
    CREATE TABLE IF NOT EXISTS ipg_checksum_data (
        table_schema TEXT NOT NULL,
        table_name   TEXT NOT NULL,
        pk_value     TEXT NOT NULL,
        checksum     TEXT NOT NULL,
        PRIMARY KEY (table_schema, table_name, pk_value, checksum)
    );
    COMMIT;

    TRUNCATE TABLE ipg_checksum_data;
    COMMIT;

    -- Walk every base table in the schemas being compared
    FOR table_rec IN
        SELECT *
        FROM information_schema.tables
        WHERE table_type = 'BASE TABLE'
          AND table_schema IN ('public', 'source_1', 'source_2')
        ORDER BY table_schema ASC, table_name ASC
    LOOP
        RAISE INFO 'table_rec: %', table_rec;

        -- Find the table's primary key constraint
        SELECT *
        FROM information_schema.table_constraints
        WHERE table_schema = table_rec.table_schema
          AND table_name = table_rec.table_name
          AND constraint_type = 'PRIMARY KEY'
        INTO pkey_rec;
        RAISE INFO 'pkey_rec: %', pkey_rec;

        pkey_cols = '';
        order_by  = '';
        pkey_pad  = '';
        order_pad = '';

        -- Build the concatenated primary key expression and the ORDER BY list
        FOR cols_rec IN
            SELECT *
            FROM information_schema.key_column_usage
            WHERE table_schema = pkey_rec.table_schema
              AND table_name = pkey_rec.table_name
              AND constraint_name = pkey_rec.constraint_name
            ORDER BY ordinal_position ASC
        LOOP
            RAISE INFO 'cols_rec: %', cols_rec;
            order_by  = order_by||order_pad||cols_rec.column_name||' ASC';
            pkey_cols = pkey_cols||pkey_pad||cols_rec.column_name;
            order_pad = ',';
            pkey_pad  = '||';
        END LOOP;

        data_query = 'SELECT '||pkey_cols||'::TEXT AS pk_value, * FROM '||
                     pkey_rec.table_schema||'.'||pkey_rec.table_name||
                     ' ORDER BY '||order_by;
        RAISE INFO 'data_query: %', data_query;

        -- A NULL query means the table had no primary key
        IF data_query IS NOT NULL THEN
            -- Checksum every row and stash it with its primary key value
            FOR data_rec IN EXECUTE data_query LOOP
                --RAISE INFO 'data_rec: %,%,%,%','pkey_rec.table_schema',pkey_rec.table_name,data_rec.pkey,MD5(data_rec::TEXT);
                INSERT INTO ipg_checksum_data (table_schema, table_name, pk_value, checksum)
                VALUES (pkey_rec.table_schema, pkey_rec.table_name, data_rec.pk_value, MD5(data_rec::TEXT));
            END LOOP;
            COMMIT;
        ELSE
            RAISE INFO 'Resulting query is null for %.%', pkey_rec.table_schema, pkey_rec.table_name;
            RETURN;
        END IF;
    END LOOP;
END
$$ LANGUAGE plpgsql;

CALL ipg_validate_data_with_checksums();