I was recently brought into a data migration project that required copying all of the data from two schemas in one database to the public schema in another database. To make this more interesting, a few of the tables were in both of the schemas in the source database and I would need to merge them into the target database.

My first step was to copy the raw souce data into the target database. Once the source data was migrated and merged into the target database I needed to verify that the data was complete and nothing was lost. A quick check of row counts showed that they matched across the schemas but I needed to verify that the data was complete.

One option was comparing the actual data in the tables. This turned out to be problematic because of the nullable columns. If I concatenated them into a TEXT string for easy comparison, a null value would nullify the whole string. I could have gotten past this by wrapping the nullable columns in a COALESCE function call but that was too much work. To keep things simple I decided to generate and compare checksums of the data.

First, I created a query to get a list of the tables I cared about from information_schema.tables. For each of these tables I queried the definition of the primary key from information_schema.table_constraints and information_schema.key_column_usage. Using this information I constructed a SELECT statement that would return the primary key value and a MD5 checksum of the entire record for every row in the table. I sent the table name, primary key value and checksum to output using RAISE INFO statement.

Unfortunately this didn’t really work. The result was millions of lines of text that needed to be compared to find the differences. My compare tool (Notepad++ Compare Plugin) choked on the volume of data. I am sure that it would have eventually returned results but I was not willing to wait hours or days for them.

At this point I added the schema name to the data elements I was collecting and INSERTed them all into a table in the database. I was then able to write a query that would find and DELETE the matching rows, leaving behind only the unmatched rows. This allowed me to identify a couple of dozen rows that I tracked back to the few tables that existed in both of the source schemas. Success!

Miscellaneous Notes

  • In order for this to work, every table being migrated has to have a primary key. Since that is our standard this was not a problem.
  • To avoid the need to use dblink or foreign data wrappers I bulk copied the source data into the target database by piping the output of a pg_dump command into psql.
  • After the data was successfully migrated to the target database I renamed the schemas in the source database so the application could not find them.

The final version of the script I used can be found below. I am sure there are things I could have done better but it got the job done. I created and executed a stored procedure so that I could control when COMMITs happened.

CREATE OR REPLACE PROCEDURE ipg_validate_data_with_checksums()
AS $$

DECLARE

    table_rec           RECORD;
    pkey_rec            RECORD;
    cols_rec            RECORD;
    data_rec            RECORD;
    data_query          TEXT;
    order_by            TEXT;
    order_pad           TEXT;
    pkey_cols           TEXT;
    pkey_pad            TEXT;

BEGIN

    CREATE TABLE IF NOT EXISTS ipg_checksum_data (
        table_schema        TEXT NOT NULL,
        table_name          TEXT NOT NULL,
        pk_value            TEXT NOT NULL,
        checksum            TEXT NOT NULL,
        PRIMARY KEY (table_schema,table_name,pk_value,checksum)
    );
    COMMIT;
    TRUNCATE TABLE ipg_checksum_data;
    COMMIT;
    
    FOR table_rec IN SELECT * 
                     FROM information_schema.tables 
                     WHERE table_type = 'BASE TABLE' 
                     AND table_schema IN ('public','source_1','source_2')
                     ORDER BY table_schema ASC, table_name ASC
    LOOP
    
        RAISE INFO 'table_rec: %',table_rec;
        SELECT * 
        FROM information_schema.table_constraints 
        WHERE table_schema = table_rec.table_schema 
        AND table_name = table_rec.table_name 
        AND constraint_type = 'PRIMARY KEY' 
        INTO pkey_rec;
        RAISE INFO 'pkey_rec: %',pkey_rec;
        
        pkey_cols = '';
        order_by = '';
        pkey_pad = '';
        order_pad = '';
        
        FOR cols_rec IN SELECT * 
                        FROM information_schema.key_column_usage 
                        WHERE table_schema = pkey_rec.table_schema
                        AND table_name = pkey_rec.table_name
                        AND constraint_name = pkey_rec.constraint_name 
                        ORDER BY ordinal_position ASC
        LOOP
        
            RAISE INFO 'cols_rec: %',cols_rec;
            order_by = order_by||order_pad||cols_rec.column_name||' ASC';
            pkey_cols = pkey_cols||pkey_pad||cols_rec.column_name;
            order_pad = ',';
            pkey_pad = '||';
            
        END LOOP;
        
        data_query = 'SELECT '||pkey_cols||'::TEXT AS pk_value, * FROM '||
                                   pkey_rec.table_schema||'.'||pkey_rec.table_name||
                                   ' ORDER BY '||order_by;
        
        RAISE INFO 'data_query: %',data_query;

        IF data_query IS NOT NULL THEN
        
            FOR data_rec IN EXECUTE( data_query )
            LOOP
        
                --RAISE INFO 'data_rec: %,%,%,%','pkey_rec.table_schema',pkey_rec.table_name,data_rec.pkey,MD5(data_rec::TEXT);
                INSERT INTO ipg_checksum_data(table_schema,table_name,pk_value,checksum)
                VALUES(pkey_rec.table_schema,pkey_rec.table_name,data_rec.pk_value,MD5(data_rec::TEXT));
            
            END LOOP;
            COMMIT;
            
        ELSE
            
            RAISE INFO 'Resulting query is null for %.%',pkey_rec.table_schema,pkey_rec.table_name;
            RETURN;
            
        END IF;

    END LOOP;
        
END $$ LANGUAGE plpgsql;

CALL ipg_validate_data_with_checksums();

Leave a Reply

Your email address will not be published. Required fields are marked *