Automated Weekly Data Ingestion Using PowerShell, AWS S3, and SQL Server

Background

This month, my team was tasked with implementing a weekly data import from a publicly available government website. The initial proposal involved manually downloading a ZIP file each week, after which I would be responsible for extracting the CSV files, validating the data, and normalizing it into a SQL Server database.

As soon as the phrase “someone will manually download” was mentioned, it was clear this approach would not scale. Since the data source was publicly accessible, automation was an obvious and more reliable solution. Due to a tight deadline, however, I initially deferred proposing automation and planned to introduce it later if time allowed.

A few days later, project delays on both teams created the opportunity to implement a fully automated solution end-to-end.

Final Schema

Weekly download from website.


Solution Overview

Before writing any code, I outlined the overall data flow to ensure the solution was both scalable and maintainable:

  1. Import the initial dataset into a staging database, then populate production tables.
  2. Create a PowerShell script to download the weekly incremental ZIP files, extract them, and upload the contents to an AWS S3 bucket.
  3. Move the files from S3 to an AWS RDS SQL Server instance, where they can be efficiently bulk imported.
  4. MERGE the staging data into production tables with auditing and error handling.

Step One – Initial Data Load

The initial ZIP file was very large and contained millions of records. The source data was inconsistent, and no formal schema or column length definitions were provided.

To establish an initial schema:

  • I analyzed a smaller incremental file.
  • Based on observed values, I made conservative estimates for column sizes (e.g., VARCHAR(n)).
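As a rough illustration of the sizing approach above, the following Python sketch scans a CSV sample, records the longest observed value per column, and pads it to suggest a VARCHAR length. The padding factor, the minimum width, and the sample rows are assumptions made for this example, not values from the project.

```python
import csv
import io

def estimate_varchar_sizes(csv_text, padding=2.0, minimum=10):
    """Suggest a VARCHAR length per column from the longest observed value.

    padding and minimum are illustrative safety margins (assumptions).
    """
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    longest = [0] * len(header)
    for row in reader:
        # Ignore any extra trailing fields beyond the header width
        for i, value in enumerate(row[:len(header)]):
            longest[i] = max(longest[i], len(value))
    return {name: max(minimum, int(width * padding))
            for name, width in zip(header, longest)}

# Hypothetical sample rows using column names that appear in the source data
sample = (
    "NPI,Provider_First_Name,Provider_Sex_Code\n"
    "1234567890,ALEXANDRIA,F\n"
    "1234567891,BO,M\n"
)
sizes = estimate_varchar_sizes(sample)
for name, size in sizes.items():
    print(f"{name} VARCHAR({size})")
```

A generous padding factor costs little in a staging table and reduces the chance that a later weekly file is truncated or rejected.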

Most of the incremental files were relatively small (under 10 GB), allowing them to be imported using SQL Server BULK INSERT. However, the main baseline file exceeded the 10 GB RDS file transfer limit.

Because this file was a one-time import, I copied it locally and created a lightweight SSIS package to load the data into the staging database.

Simple SSIS package to import initial data.


Step Two – Database Design

Several CSV files contained repeated column patterns such as:

identifier_1, identifier_2, … identifier_15

Rather than storing these in a wide table, I normalized the design by breaking them out into child tables. This reduced redundancy, improved query performance, and simplified downstream joins.
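To make the breakout concrete, here is a minimal Python sketch of the unpivot step: each non-empty identifier_N column becomes one child row keyed by the parent NPI and an ordinal position. The function name and the sample values are hypothetical; the actual work in this project is done by stored procedures.

```python
def unpivot_identifiers(row, prefix="identifier_", count=15, key="NPI"):
    """Turn identifier_1..identifier_N columns into (key, ordinal, value) child rows."""
    children = []
    for ordinal in range(1, count + 1):
        value = (row.get(f"{prefix}{ordinal}") or "").strip()
        if value:  # skip empty slots so the child table holds only real values
            children.append({key: row[key], "OrdinalId": ordinal, "identifier": value})
    return children

# Hypothetical wide row: slot 2 is empty, and slots 4-15 are absent
wide_row = {"NPI": "1234567890", "identifier_1": "A100",
            "identifier_2": "", "identifier_3": "C300"}
child_rows = unpivot_identifiers(wide_row)
for child in child_rows:
    print(child)
```

Keeping the ordinal position in the child row gives each value a stable key, which is what lets a later merge match on the parent key plus the ordinal.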

Staging DB Schema


Step Three – Weekly File Automation (PowerShell)

The next step was to automate the weekly file ingestion process:

  • Identify the most recent weekly ZIP file on the public website.
  • Download and extract the ZIP file.
  • Remove unneeded files.
  • Upload the remaining CSV files to an AWS S3 bucket.
  • Generate a dynamic file list (since filenames change weekly).

This file list is later used as an anchor to ensure consistency between S3 and RDS.
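The first item in the list above, picking the most recent weekly ZIP, can be sketched in a few lines of Python. This is only an illustration: the href values are invented, and it assumes the file names sort chronologically, which may not hold for every naming scheme.

```python
def newest_weekly_link(hrefs, search_string="Weekly_V2.zip"):
    """Return the matching href that sorts last by name, or None if nothing matches."""
    matches = [h for h in hrefs if search_string in h]
    return max(matches) if matches else None

# Hypothetical hrefs; the real page's link format may differ
hrefs = [
    "./about.html",
    "./Data_20260105_Weekly_V2.zip",
    "./Data_20260112_Weekly_V2.zip",
    "./Data_Monthly_V2.zip",
]
latest = newest_weekly_link(hrefs)
print(latest)
```

If the file names embed dates in a format that does not sort correctly as text, the date would need to be parsed out and compared instead.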

# By David Speight
# 20260122
# This script will find the most recent WEEKLY NPI update file, download it and extract the files.

param(
    [string]$Url = "https://download.branch.gov/public.html",
    [string]$SearchString = "Weekly_V2.zip",
    [string]$OutputFile = "D:\MyProject\ZIP\WeeklyLinks.csv",
    [string]$UrlBase = "https://download.branch.gov/folder"
)

# Clear the output file if it already exists
if (Test-Path -Path $OutputFile) { Clear-Content -Path $OutputFile }

# Archive last week's files in the S3 bucket
aws s3 mv s3://my-bucket/ssis/ProjectFolder/FileDrop/ s3://my-bucket/ssis/ProjectFolder/Archive/ --recursive --profile MyAwsCliProfile


try {
    # Force use of TLS 1.2 before any web request; many modern websites require it
    [Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12

    # Fetch the content of the webpage
    Write-Host "Fetching webpage from $Url..."
    $response = Invoke-WebRequest -Uri $Url -UseBasicParsing -ErrorAction Stop

    # Extract all links and filter for the search string
    Write-Host "Filtering links for the string '$SearchString'..."
    $filteredLinks = $response.Links | Where-Object { $_.href -like "*$SearchString*" }

    # Process and save the filtered links
    if ($filteredLinks) {
        Write-Host "Found $($filteredLinks.Count) matching links."

        # Strip the leading character from each href and sort descending,
        # so the newest file name comes first (assumes names sort chronologically)
        $fileNames = @($filteredLinks | ForEach-Object { $_.href.Remove(0,1) } | Sort-Object -Descending)
        $FileName  = $fileNames[0]
        $finalLink = $UrlBase + $FileName

        # Clear Files Folder
        Get-ChildItem -Path "D:\MyProject\ZIP\Files" -Filter "*pfile*" -File | Remove-Item -Force

        # Save the full list of matching links, newest first, to a text file
        $fileNames | ForEach-Object { $UrlBase + $_ } | Out-File -FilePath $OutputFile

        $url = $finalLink
        # Define the local path to save the downloaded ZIP file
        $downloadPath = "D:\MyProject\ZIP" + $FileName
        $extractPath = "D:\MyProject\ZIP\Files"

        # 1. Download the file
        Write-Host "Downloading file from: $url" -ForegroundColor Yellow
        try {
            Invoke-WebRequest -Uri $url -OutFile $downloadPath -ErrorAction Stop
            Write-Host "File downloaded successfully to: $downloadPath" -ForegroundColor Green
        } catch {
            Write-Host "Error during download: $_" -ForegroundColor Red
            exit
        }

        # 2. Unzip the file
        Write-Host "Unzipping file to: $extractPath" -ForegroundColor Yellow
        try {
            Expand-Archive -LiteralPath $downloadPath -DestinationPath $extractPath -Force
            Write-Host "File unzipped successfully to: $extractPath" -ForegroundColor Green
        } catch {
            Write-Host "Error during unzipping: $_" -ForegroundColor Red
            exit
        }


        #Remove Files we won't need
        Get-ChildItem -Path "D:\MyProject\ZIP\Files" -Filter "*OTHER_Prefix*" -File | Remove-Item -Force
        Get-ChildItem -Path "D:\MyProject\ZIP\Files" -Filter "*fileheader*" -File | Remove-Item -Force

        # 3. Optional: Remove the original ZIP file after extraction
        Remove-Item -Path $downloadPath -Force

    } else {
        Write-Host "No links found containing the string '$SearchString'."
    }
} catch {
    Write-Warning "An error occurred while accessing the URL or processing the request: $($_.Exception.Message)"
}

# Move the files to S3 bucket on AWS
aws s3 mv "D:\MyProject\ZIP\Files\" s3://my-bucket/ssis/ProjectFolder/FileDrop/ --recursive  --profile MyAwsCliProfile  

# List the files into a text file
aws s3 ls s3://my-bucket/ssis/ProjectFolder/FileDrop/ --recursive --profile MyAwsCliProfile > D:\MyProject\ZIP\WeeklyList.csv

# Copy List to  S3 bucket on AWS
aws s3 cp "D:\MyProject\ZIP\WeeklyList.csv" s3://my-bucket/ssis/ProjectFolder/FileDrop/ --profile MyAwsCliProfile 

(PowerShell script continues unchanged)

Once the files are uploaded:

  • A stored procedure copies the files from S3 to the RDS file system.
  • Another procedure performs the staging load and production merge.

Note: All steps are scheduled using Windows Task Scheduler for PowerShell and SQL Server Agent for database jobs.


Step Four – Scheduling

The PowerShell script is scheduled to run weekly using Windows Task Scheduler.
SQL Server stored procedures are scheduled via SQL Server Agent, ensuring a clean separation of responsibilities.


Step Five – Staging and Production Merge

To simplify troubleshooting and improve resiliency, I split the database logic into two stored procedures:

  1. npidata_weekly_copy_S3_to_RDS
    • Clears old files from the RDS directory.
    • Downloads the latest weekly files from S3.
    • Loads the weekly file inventory into a tracking table.
  2. npidata_Weekly_InsertUpdateMerge
    • Validates file availability.
    • Truncates staging tables.
    • Performs bulk imports.
    • Populates normalized child tables.
    • Merges data into production with full auditing and email alerts.
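The file inventory mentioned above comes from the text output of aws s3 ls, where each line carries a timestamp, a size, and an object key. The Python sketch below shows one way to split such lines and keep only the CSV data files. It is an illustration with made-up listing lines, and it splits on whitespace instead of using fixed character offsets.

```python
from datetime import datetime

def parse_s3_listing(text):
    """Parse `aws s3 ls` lines into dicts with file_date, file_size, and key."""
    files = []
    for line in text.splitlines():
        parts = line.split(None, 3)  # date, time, size, key (key may contain spaces)
        if len(parts) != 4:
            continue  # skip blank lines and folder-prefix entries
        date_part, time_part, size, key = parts
        files.append({
            "file_date": datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H:%M:%S"),
            "file_size": int(size),
            "key": key,
        })
    return files

def weekly_csv_files(files):
    """Keep only .csv data files, excluding the WeeklyList inventory itself."""
    return [f for f in files
            if f["key"].lower().endswith(".csv") and "WeeklyList" not in f["key"]]

# Hypothetical listing lines in the CLI's "date time size key" layout
listing = (
    "2026-01-22 10:15:02       2048 ssis/ProjectFolder/FileDrop/WeeklyList.csv\n"
    "2026-01-22 10:14:57    5242880 ssis/ProjectFolder/FileDrop/npidata_pfile_sample.csv\n"
)
data_files = weekly_csv_files(parse_s3_listing(listing))
for f in data_files:
    print(f["file_date"], f["file_size"], f["key"])
```

Splitting on whitespace tolerates sizes of any width, whereas fixed offsets depend on the exact column layout of the listing.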

Each major step writes to an audit table, allowing:

  • Partial success without restarting the entire pipeline
  • Easier root-cause analysis if failures occur
  • Safe re-execution of individual steps
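The audit pattern can be illustrated with a small, self-contained Python sketch that uses an in-memory SQLite table as a stand-in for the SQL Server audit table. The table and column names here are simplified and hypothetical. The idea is that a parent row marks the run, each step adds a child row, and a query on the child rows shows where a failed run stopped.

```python
import sqlite3
from datetime import datetime, timezone

def utc_now():
    return datetime.now(timezone.utc).isoformat()

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE audit (
        audit_key INTEGER PRIMARY KEY AUTOINCREMENT,
        parent_audit_key INTEGER,
        step_name TEXT NOT NULL,
        started_at TEXT NOT NULL,
        stopped_at TEXT,
        succeeded TEXT NOT NULL
    )
""")

def log_step(conn, parent_key, step_name, succeeded):
    """Insert one audit row and return its key."""
    cur = conn.execute(
        "INSERT INTO audit (parent_audit_key, step_name, started_at, stopped_at, succeeded) "
        "VALUES (?, ?, ?, ?, ?)",
        (parent_key, step_name, utc_now(), utc_now(), "Y" if succeeded else "N"),
    )
    return cur.lastrowid

def last_completed_step(conn, parent_key):
    """Return the name of the most recent successful step for a run, if any."""
    row = conn.execute(
        "SELECT step_name FROM audit WHERE parent_audit_key = ? AND succeeded = 'Y' "
        "ORDER BY audit_key DESC LIMIT 1",
        (parent_key,),
    ).fetchone()
    return row[0] if row else None

# The parent row starts as 'N' and would be flipped to 'Y' when the run finishes
run_key = log_step(conn, None, "BEGIN: Weekly import", succeeded=False)
log_step(conn, run_key, "Tables Truncated", succeeded=True)
log_step(conn, run_key, "Bulk insert completed", succeeded=True)
print(last_completed_step(conn, run_key))
```

With this layout, a rerun can look up the last successful step and resume from the next one instead of repeating the whole pipeline.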

USE [DBStaging]
GO
/****** Object:  StoredProcedure [dbo].[npidata_weekly_copy_S3_to_RDS]    Script Date: 2/4/2026 10:07:28 AM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- =============================================
-- Author:		David Speight
-- Create date: 20260123
-- Description:	This will copy the NPI files from AWS-S3 to the D:\ drive on RDS.
--				Old files will first be deleted from the folder on RDS
-- NOTE: We DO NOT delete the WeeklyList file, but instead use it as an anchor to keep folders live
-- =============================================
CREATE   PROCEDURE [dbo].[npidata_weekly_copy_S3_to_RDS]
AS
BEGIN

	SET NOCOUNT ON;

	
    -- Delete all the old files in our directory
	exec msdb.dbo.rds_delete_from_filesystem @rds_file_path='D:\S3\MyProject\', @force_delete=1;

	-- Recreate File and File List
	exec msdb.dbo.rds_download_from_s3 @s3_arn_of_file='arn:aws:s3:::my-bucket/ssis/MyDataDrop/FileDrop/WeeklyList.csv', @rds_file_path='D:\S3\MyProject\WeeklyList.csv', @overwrite_file=1; 

	-- Delay the process by 2 minutes 20 seconds 
	-- NOTE: If other backup processes happen to be running at the same time, 
	-- this could be delayed 5-90 minutes, and the next step would then fail

	WAITFOR DELAY '00:02:20';

	-- DEBUGGING Queries
	--EXEC msdb.dbo.rds_gather_file_details; 
	--SELECT * FROM msdb.dbo.rds_fn_list_file_details(task ID returned by the above execution)
	-- EXEC msdb.dbo.rds_task_status  --< checks all tasks in the rds scheduler

	-- Now we load the list of files into a SQL Server table for reference and record.
	IF OBJECT_ID(N'TempDb..#tempBakFileList',N'U') IS NOT NULL DROP TABLE #tempBakFileList
	CREATE TABLE #tempBakFileList (FileDetail VARCHAR(MAX))
	BULK INSERT #tempBakFileList FROM 'D:\S3\MyProject\WeeklyList.csv' WITH(FIELDTERMINATOR = ',')

	INSERT INTO [dbo].[npidata_weekly_files]
	([FileDate], [FileSize], [FileName], [UploadUtcDate])
	SELECT 
	CONVERT(datetime,LEFT(FileDetail,19)) AS [FileDate],
	SUBSTRING(FileDetail,20,12) AS [FileSize],
	SUBSTRING(FileDetail,32,250) AS [FileName],
	GETUTCDATE() AS [UploadUtcDate]
	FROM #tempBakFileList 
	WHERE RIGHT(SUBSTRING(FileDetail,32,250),4) = '.csv'
	AND SUBSTRING(FileDetail,32,250) NOT LIKE '%WeeklyList%'
	AND 
	NOT EXISTS (	SELECT [FileDate], [FileSize], [FileName], [UploadUtcDate]
			FROM	[dbo].[npidata_weekly_files]
			WHERE	[FileName] = SUBSTRING(FileDetail,32,250)
			AND CONVERT(DATE,[UploadUtcDate]) = CONVERT(DATE,GETUTCDATE()) );

	-- NEXT COPY FILES FROM S3 to RDS
	DECLARE @tFileList AS TABLE (ID INT NOT NULL IDENTITY(1,1), FileName varchar(100) null)
		
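	-- NOTE: [FileName] holds the full S3 key from the recursive listing; the offset of 32
	-- assumes a 31-character key prefix, so adjust it to match your own folder path.
	INSERT INTO @tFileList
	SELECT SUBSTRING([FileName], 32,250) AS [FileName]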
	FROM   [dbo].[npidata_weekly_files] 
	WHERE CONVERT(Date,[UploadUtcDate]) = CONVERT(DATE,getutcdate()) 
	ORDER BY [FileName] -- this assures we process the right data in order
			--select * from @tFileList

	DECLARE @FilePathSource varchar(250);	
	DECLARE @FilePathDestination varchar(250);	
	DECLARE @iLoop INT;
	SELECT @iLoop = COUNT(*) FROM @tFileList;

	WHILE @iLoop > 0 
	BEGIN

		SELECT	@FilePathSource = CONCAT('arn:aws:s3:::my-bucket/ssis/MyDataDrop/FileDrop/',FileName),
			@FilePathDestination = CONCAT('D:\S3\MyProject\',FileName)
		FROM @tFileList
		WHERE ID = @iLoop;

		EXEC msdb.dbo.rds_download_from_s3 @s3_arn_of_file=@FilePathSource, @rds_file_path=@FilePathDestination, @overwrite_file=1; 

		SELECT @iLoop = @iLoop - 1;
	END;

	-- NOW WE WAIT 7 minutes 20 seconds for the AWS scheduler to move our files
	WAITFOR DELAY '00:07:20';

	-- Now we want to use that file list to import data into the staging tables
	-- We will run a second sproc to do that: [dbo].[npidata_Weekly_InsertUpdateMerge]
	-- This has been broken into separate sprocs to make it easier to fix issues.

END
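
As the comments in the procedure above point out, a fixed delay can be too short when other tasks are queued. One alternative is to poll the task status until every transfer has finished. The Python sketch below shows only the polling logic: the status strings and the get_statuses callable are placeholders, and in practice the statuses might be read from msdb.dbo.rds_task_status.

```python
import time

def wait_for_tasks(get_statuses, timeout_seconds=900, poll_seconds=15, sleep=time.sleep):
    """Poll until every task reports SUCCESS; raise on ERROR or timeout.

    get_statuses is a caller-supplied function returning a list of status
    strings (placeholder values, an assumption for this sketch).
    """
    waited = 0
    while True:
        statuses = get_statuses()
        if any(s == "ERROR" for s in statuses):
            raise RuntimeError("a file transfer task failed")
        if statuses and all(s == "SUCCESS" for s in statuses):
            return waited
        if waited >= timeout_seconds:
            raise TimeoutError("file transfer tasks did not finish in time")
        sleep(poll_seconds)
        waited += poll_seconds

# Simulated status responses; sleep is stubbed out so the example runs instantly
responses = iter([
    ["IN_PROGRESS", "CREATED"],
    ["SUCCESS", "IN_PROGRESS"],
    ["SUCCESS", "SUCCESS"],
])
elapsed = wait_for_tasks(lambda: next(responses), poll_seconds=15, sleep=lambda s: None)
print(f"all tasks finished after about {elapsed} seconds")
```

Polling returns as soon as the transfers complete and fails loudly when they do not, which avoids both unnecessary waiting and silent partial loads.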

USE [DBStaging]
GO
/****** Object:  StoredProcedure [dbo].[npidata_Weekly_InsertUpdateMerge]    Script Date: 2/4/2026 10:08:04 AM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

-- =============================================
-- Author:		David Speight
-- Create date: 20260121
-- Description: This will INSERT, UPDATE, and MERGE weekly file
-- =============================================
CREATE   PROCEDURE [dbo].[npidata_Weekly_InsertUpdateMerge]
@EmailAlertAddress varchar(150) = 'email@domain.com'
AS
BEGIN

	SET NOCOUNT ON;
	-- DEBUGGING Queries
	--EXEC msdb.dbo.rds_gather_file_details; 
	--SELECT * FROM msdb.dbo.rds_fn_list_file_details(task ID returned by the above execution)
	-- EXEC msdb.dbo.rds_task_status  --< checks all tasks in the rds scheduler

	-- START LOG HERE
	DECLARE @ParentAuditKey INT = 1;
	INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt],  [SuccessfulProcessingInd])
	VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','BEGIN: Weekly NPI Data Import', 1, getutcdate(), null, 0, 'N');
	SELECT @ParentAuditKey = SCOPE_IDENTITY();

	-- STEP ONE:  Check to make sure files were downloaded
	IF (SELECT COUNT(*) FROM   [dbo].[npidata_weekly_files] WHERE CONVERT(Date,[UploadUtcDate]) = CONVERT(DATE,getutcdate())) != 4
	BEGIN
		--  END LOG HERE as FAILURE
		UPDATE dbo.DimAudit
		SET  ParentAuditKey = @ParentAuditKey, ExecStopDT = getutcdate(), [SuccessfulProcessingInd] = 'N'
		WHERE AuditKey = @ParentAuditKey

		EXEC msdb.dbo.sp_send_dbmail
			@profile_name = 'MYMAIL',
			@recipients = @EmailAlertAddress,
			@body = 'The job using sproc npidata_Weekly_InsertUpdateMerge at DBStaging did not find any data files and was aborted. If you get this message try re-running ONLY this step of the job to complete the import: EXEC [dbo].[npidata_Weekly_InsertUpdateMerge] ''youremail@yourdomain''; ',
		@subject = 'FAIL: Staging npidata_Weekly_InsertUpdateMerge job';

	END
	ELSE
	BEGIN

		-- STEP TWO:  TRUNCATE TABLES
		TRUNCATE TABLE [dbo].[endpoint_pfile_week_stage];
		TRUNCATE TABLE [dbo].[npidata_pfile_week_stage];
		TRUNCATE TABLE [dbo].[othername_pfile_week_stage];
		TRUNCATE TABLE [dbo].[pl_pfile_week_stage];
		TRUNCATE TABLE [dbo].[npidata_pfile_step2_stage];
		TRUNCATE TABLE [dbo].[npidata_pfile_license_number_stage2];
		TRUNCATE TABLE [dbo].[npidata_pfile_other_identifier_stage2];
			

		INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt],  [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','Tables Truncated', 1, getutcdate(), getutcdate(), 0, 'Y');

		-- STEP THREE: Get our file names
		DECLARE @tFileList AS TABLE (ID INT NOT NULL IDENTITY(1,1), FileName varchar(100) null);
		
		INSERT INTO @tFileList
		SELECT CONCAT( 'D:\S3\MyProject\',SUBSTRING([FileName], 32,250) ) AS [FileName]
		FROM   [dbo].[npidata_weekly_files] 
		WHERE CONVERT(Date,[UploadUtcDate]) = CONVERT(DATE,getutcdate()) 
		ORDER BY [FileName]; -- this assures we process the right data in order

		DECLARE @FilePath1 varchar(250);	-- endpoint
		DECLARE @FilePath2 varchar(250);	-- npidata
		DECLARE @FilePath3 varchar(250);	-- othername
		DECLARE @FilePath4 varchar(250);	-- pl

		SELECT	@FilePath1	=	[FileName]	FROM @tFileList WHERE ID = 1;
		SELECT	@FilePath2	=	[FileName]	FROM @tFileList WHERE ID = 2;
		SELECT	@FilePath3	=	[FileName]	FROM @tFileList WHERE ID = 3;
		SELECT	@FilePath4	=	[FileName]	FROM @tFileList WHERE ID = 4;

		DECLARE @tSql varchar(1000);
		-- 1  Update ENDPOINT table
		SELECT @tSql = [dbo].[BulkInsertCreateTsql]( '[dbo].[endpoint_pfile_week_stage]',@FilePath1,'CSV',2,'0x0a',',','"');
		EXEC (@tSql);
		-- 2  Update NPIDATA table
		SELECT @tSql = [dbo].[BulkInsertCreateTsql]( '[dbo].[npidata_pfile_week_stage]',@FilePath2,'CSV',2,'0x0a',',','"');
		EXEC (@tSql);
		-- 3  Update OTHERNAME table
		SELECT @tSql = [dbo].[BulkInsertCreateTsql]( '[dbo].[othername_pfile_week_stage]',@FilePath3,'CSV',2,'0x0a',',','"');
		EXEC (@tSql);
		-- 4  Update PL table
		SELECT @tSql = [dbo].[BulkInsertCreateTsql]( '[dbo].[pl_pfile_week_stage]',@FilePath4,'CSV',2,'0x0a',',','"');
		EXEC (@tSql);


		INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt],  [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','RDS Bulk insert Completed', 1, getutcdate(), getutcdate(), 0, 'Y');

		-- Populate the child breakout tables
		EXEC [dbo].[npidata_pfile_license_number_populate] @ParentAuditKey;

		EXEC [dbo].[npidata_pfile_other_identifier_populate] @ParentAuditKey;

		INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt],  [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','NPI Child Tables populated', 1, getutcdate(), getutcdate(), 0, 'Y');


	-- STEP FOUR:  MERGE: INSERT/UPDATE ANY NPI DATA to PROD
	DECLARE @MergeOutput TABLE (
			ActionType NVARCHAR(20)
		);

	SET IDENTITY_INSERT [DBProd].[dbo].[providers] ON;
	MERGE INTO [DBProd].[dbo].[providers] AS T
		USING ( SELECT  [NPI], [Entity_Type_Code], [Replacement_NPI], 
						[Employer_Identification_Number_EIN], 
						[Provider_Organization_Name_Legal_Business_Name], [Provider_Last_Name_Legal_Name], 
						[Provider_First_Name], [Provider_Middle_Name], [Provider_Name_Prefix_Text], 
						[Provider_Name_Suffix_Text], [Provider_Credential_Text], 
						[Provider_Other_Organization_Name], [Provider_Other_Organization_Name_Type_Code], 
						[Provider_Other_Last_Name], [Provider_Other_First_Name], [Provider_Other_Middle_Name], 
						[Provider_Other_Name_Prefix_Text], [Provider_Other_Name_Suffix_Text], 
						[Provider_Other_Credential_Text], [Provider_Other_Last_Name_Type_Code], 
						[Provider_First_Line_Business_Mailing_Address], 
						[Provider_Second_Line_Business_Mailing_Address], 
						[Provider_Business_Mailing_Address_City_Name], 
						[Provider_Business_Mailing_Address_State_Name], 
						[Provider_Business_Mailing_Address_Postal_Code], 
						[Provider_Business_Mailing_Address_Country_Code_If_outside_U_S], 
						[Provider_Business_Mailing_Address_Telephone_Number], 
						[Provider_Business_Mailing_Address_Fax_Number], 
						[Provider_First_Line_Business_Practice_Location_Address], 
						[Provider_Second_Line_Business_Practice_Location_Address], 
						[Provider_Business_Practice_Location_Address_City_Name], 
						[Provider_Business_Practice_Location_Address_State_Name], 
						[Provider_Business_Practice_Location_Address_Postal_Code],
						[Provider_Business_Practice_Location_Address_Country_Code_If_outside_U_S], 
						[Provider_Business_Practice_Location_Address_Telephone_Number], 
						[Provider_Business_Practice_Location_Address_Fax_Number], [Provider_Enumeration_Date], 
						[Last_Update_Date], [NPI_Deactivation_Reason_Code], [NPI_Deactivation_Date], 
						[NPI_Reactivation_Date], [Provider_Sex_Code], [Authorized_Official_Last_Name], 
						[Authorized_Official_First_Name], [Authorized_Official_Middle_Name], 
						[Authorized_Official_Title_or_Position], [Authorized_Official_Telephone_Number], 
						[Is_Sole_Proprietor], [Is_Organization_Subpart], [Parent_Organization_LBN], 
						[Parent_Organization_TIN], [Authorized_Official_Name_Prefix_Text], 
						[Authorized_Official_Name_Suffix_Text], [Authorized_Official_Credential_Text], 
						[Certification_Date], 
						1 AS [isActive], getutcdate() AS [updatedAt], 1 AS [IsUpdate]
				FROM [dbo].[npidata_pfile_week_stage] ) AS S
		ON      (T.NPI = S.NPI)
		WHEN MATCHED THEN
			UPDATE SET	T.[Entity_Type_Code] = S.[Entity_Type_Code],
						T.[Replacement_NPI] = S.[Replacement_NPI],
						T.[Employer_Identification_Number_EIN] = S.[Employer_Identification_Number_EIN],
						T.[Provider_Organization_Name_Legal_Business_Name] = S.[Provider_Organization_Name_Legal_Business_Name],
						T.[Provider_Last_Name_Legal_Name] = S.[Provider_Last_Name_Legal_Name],
						T.[Provider_First_Name] = S.[Provider_First_Name],
						T.[Provider_Middle_Name] = S.[Provider_Middle_Name],
						T.[Provider_Name_Prefix_Text] = S.[Provider_Name_Prefix_Text],
						T.[Provider_Name_Suffix_Text] = S.[Provider_Name_Suffix_Text],
						T.[Provider_Credential_Text] = S.[Provider_Credential_Text],
						T.[Provider_Other_Organization_Name] = S.[Provider_Other_Organization_Name],
						T.[Provider_Other_Organization_Name_Type_Code] = S.[Provider_Other_Organization_Name_Type_Code],
						T.[Provider_Other_Last_Name] = S.[Provider_Other_Last_Name],
						T.[Provider_Other_First_Name] = S.[Provider_Other_First_Name],
						T.[Provider_Other_Middle_Name] = S.[Provider_Other_Middle_Name],
						T.[Provider_Other_Name_Prefix_Text] = S.[Provider_Other_Name_Prefix_Text],
						T.[Provider_Other_Name_Suffix_Text] = S.[Provider_Other_Name_Suffix_Text],
						T.[Provider_Other_Credential_Text] = S.[Provider_Other_Credential_Text],
						T.[Provider_Other_Last_Name_Type_Code] = S.[Provider_Other_Last_Name_Type_Code],
						T.[Provider_First_Line_Business_Mailing_Address] = S.[Provider_First_Line_Business_Mailing_Address],
						T.[Provider_Second_Line_Business_Mailing_Address] = S.[Provider_Second_Line_Business_Mailing_Address],
						T.[Provider_Business_Mailing_Address_City_Name] = S.[Provider_Business_Mailing_Address_City_Name],
						T.[Provider_Business_Mailing_Address_State_Name] = S.[Provider_Business_Mailing_Address_State_Name],
						T.[Provider_Business_Mailing_Address_Postal_Code] = S.[Provider_Business_Mailing_Address_Postal_Code],
						T.[Provider_Business_Mailing_Address_Country_Code_If_outside_U_S] = S.[Provider_Business_Mailing_Address_Country_Code_If_outside_U_S],
						T.[Provider_Business_Mailing_Address_Telephone_Number] = S.[Provider_Business_Mailing_Address_Telephone_Number],
						T.[Provider_Business_Mailing_Address_Fax_Number] = S.[Provider_Business_Mailing_Address_Fax_Number],
						T.[Provider_First_Line_Business_Practice_Location_Address] = S.[Provider_First_Line_Business_Practice_Location_Address],
						T.[Provider_Second_Line_Business_Practice_Location_Address] = S.[Provider_Second_Line_Business_Practice_Location_Address],
						T.[Provider_Business_Practice_Location_Address_City_Name] = S.[Provider_Business_Practice_Location_Address_City_Name],
						T.[Provider_Business_Practice_Location_Address_State_Name] = S.[Provider_Business_Practice_Location_Address_State_Name],
						T.[Provider_Business_Practice_Location_Address_Postal_Code] = S.[Provider_Business_Practice_Location_Address_Postal_Code],
						T.[Provider_Business_Practice_Location_Address_Country_Code_If_outside_U_S] = S.[Provider_Business_Practice_Location_Address_Country_Code_If_outside_U_S],
						T.[Provider_Business_Practice_Location_Address_Telephone_Number] = S.[Provider_Business_Practice_Location_Address_Telephone_Number],
						T.[Provider_Business_Practice_Location_Address_Fax_Number] = S.[Provider_Business_Practice_Location_Address_Fax_Number],
						T.[Provider_Enumeration_Date] = S.[Provider_Enumeration_Date],
						T.[Last_Update_Date] = S.[Last_Update_Date],
						T.[NPI_Deactivation_Reason_Code] = S.[NPI_Deactivation_Reason_Code],
						T.[NPI_Deactivation_Date] = S.[NPI_Deactivation_Date],
						T.[NPI_Reactivation_Date] = S.[NPI_Reactivation_Date],
						T.[Provider_Sex_Code] = S.[Provider_Sex_Code],
						T.[Authorized_Official_Last_Name] = S.[Authorized_Official_Last_Name],
						T.[Authorized_Official_First_Name] = S.[Authorized_Official_First_Name],
						T.[Authorized_Official_Middle_Name] = S.[Authorized_Official_Middle_Name],
						T.[Authorized_Official_Title_or_Position] = S.[Authorized_Official_Title_or_Position],
						T.[Authorized_Official_Telephone_Number] = S.[Authorized_Official_Telephone_Number],
						T.[Is_Sole_Proprietor] = S.[Is_Sole_Proprietor],
						T.[Is_Organization_Subpart] = S.[Is_Organization_Subpart],
						T.[Parent_Organization_LBN] = S.[Parent_Organization_LBN],
						T.[Parent_Organization_TIN] = S.[Parent_Organization_TIN],
						T.[Authorized_Official_Name_Prefix_Text] = S.[Authorized_Official_Name_Prefix_Text],
						T.[Authorized_Official_Name_Suffix_Text] = S.[Authorized_Official_Name_Suffix_Text],
						T.[Authorized_Official_Credential_Text] = S.[Authorized_Official_Credential_Text],
						T.[Certification_Date] = S.[Certification_Date],
						T.[isActive] = 1,
						T.[updatedAt] = getutcdate(),
						T.[IsUpdate] = 1
		WHEN NOT MATCHED THEN
			INSERT ([NPI], [Entity_Type_Code], [Replacement_NPI], 
						[Employer_Identification_Number_EIN], 
						[Provider_Organization_Name_Legal_Business_Name], [Provider_Last_Name_Legal_Name], 
						[Provider_First_Name], [Provider_Middle_Name], [Provider_Name_Prefix_Text], 
						[Provider_Name_Suffix_Text], [Provider_Credential_Text], 
						[Provider_Other_Organization_Name], [Provider_Other_Organization_Name_Type_Code], 
						[Provider_Other_Last_Name], [Provider_Other_First_Name], [Provider_Other_Middle_Name], 
						[Provider_Other_Name_Prefix_Text], [Provider_Other_Name_Suffix_Text], 
						[Provider_Other_Credential_Text], [Provider_Other_Last_Name_Type_Code], 
						[Provider_First_Line_Business_Mailing_Address], 
						[Provider_Second_Line_Business_Mailing_Address], 
						[Provider_Business_Mailing_Address_City_Name], 
						[Provider_Business_Mailing_Address_State_Name], 
						[Provider_Business_Mailing_Address_Postal_Code], 
						[Provider_Business_Mailing_Address_Country_Code_If_outside_U_S], 
						[Provider_Business_Mailing_Address_Telephone_Number], 
						[Provider_Business_Mailing_Address_Fax_Number], 
						[Provider_First_Line_Business_Practice_Location_Address], 
						[Provider_Second_Line_Business_Practice_Location_Address], 
						[Provider_Business_Practice_Location_Address_City_Name], 
						[Provider_Business_Practice_Location_Address_State_Name], 
						[Provider_Business_Practice_Location_Address_Postal_Code],
						[Provider_Business_Practice_Location_Address_Country_Code_If_outside_U_S], 
						[Provider_Business_Practice_Location_Address_Telephone_Number], 
						[Provider_Business_Practice_Location_Address_Fax_Number], [Provider_Enumeration_Date], 
						[Last_Update_Date], [NPI_Deactivation_Reason_Code], [NPI_Deactivation_Date], 
						[NPI_Reactivation_Date], [Provider_Sex_Code], [Authorized_Official_Last_Name], 
						[Authorized_Official_First_Name], [Authorized_Official_Middle_Name], 
						[Authorized_Official_Title_or_Position], [Authorized_Official_Telephone_Number], 
						[Is_Sole_Proprietor], [Is_Organization_Subpart], [Parent_Organization_LBN], 
						[Parent_Organization_TIN], [Authorized_Official_Name_Prefix_Text], 
						[Authorized_Official_Name_Suffix_Text], [Authorized_Official_Credential_Text], 
						[Certification_Date], 
						[isActive], [updatedAt]) 
			VALUES (S.[NPI], S.[Entity_Type_Code], S.[Replacement_NPI], 
						S.[Employer_Identification_Number_EIN], 
						S.[Provider_Organization_Name_Legal_Business_Name], S.[Provider_Last_Name_Legal_Name], 
						S.[Provider_First_Name], S.[Provider_Middle_Name], S.[Provider_Name_Prefix_Text], 
						S.[Provider_Name_Suffix_Text], S.[Provider_Credential_Text], 
						S.[Provider_Other_Organization_Name], S.[Provider_Other_Organization_Name_Type_Code], 
						S.[Provider_Other_Last_Name], S.[Provider_Other_First_Name], S.[Provider_Other_Middle_Name], 
						S.[Provider_Other_Name_Prefix_Text], S.[Provider_Other_Name_Suffix_Text], 
						S.[Provider_Other_Credential_Text], S.[Provider_Other_Last_Name_Type_Code], 
						S.[Provider_First_Line_Business_Mailing_Address], 
						S.[Provider_Second_Line_Business_Mailing_Address], 
						S.[Provider_Business_Mailing_Address_City_Name], 
						S.[Provider_Business_Mailing_Address_State_Name], 
						S.[Provider_Business_Mailing_Address_Postal_Code], 
						S.[Provider_Business_Mailing_Address_Country_Code_If_outside_U_S], 
						S.[Provider_Business_Mailing_Address_Telephone_Number], 
						S.[Provider_Business_Mailing_Address_Fax_Number], 
						S.[Provider_First_Line_Business_Practice_Location_Address], 
						S.[Provider_Second_Line_Business_Practice_Location_Address], 
						S.[Provider_Business_Practice_Location_Address_City_Name], 
						S.[Provider_Business_Practice_Location_Address_State_Name], 
						S.[Provider_Business_Practice_Location_Address_Postal_Code],
						S.[Provider_Business_Practice_Location_Address_Country_Code_If_outside_U_S], 
						S.[Provider_Business_Practice_Location_Address_Telephone_Number], 
						S.[Provider_Business_Practice_Location_Address_Fax_Number], S.[Provider_Enumeration_Date], 
						S.[Last_Update_Date], S.[NPI_Deactivation_Reason_Code], S.[NPI_Deactivation_Date], 
						S.[NPI_Reactivation_Date], S.[Provider_Sex_Code], S.[Authorized_Official_Last_Name], 
						S.[Authorized_Official_First_Name], S.[Authorized_Official_Middle_Name], 
						S.[Authorized_Official_Title_or_Position], S.[Authorized_Official_Telephone_Number], 
						S.[Is_Sole_Proprietor], S.[Is_Organization_Subpart], S.[Parent_Organization_LBN], 
						S.[Parent_Organization_TIN], S.[Authorized_Official_Name_Prefix_Text], 
						S.[Authorized_Official_Name_Suffix_Text], S.[Authorized_Official_Credential_Text], 
						S.[Certification_Date],
						1, getutcdate() )
		OUTPUT $action INTO @MergeOutput; -- Output the action type into the table variable

		DECLARE @InsertCount INT, @UpdateCount INT;
		SELECT
			@InsertCount = SUM(CASE WHEN ActionType = 'INSERT' THEN 1 ELSE 0 END),
			@UpdateCount = SUM(CASE WHEN ActionType = 'UPDATE' THEN 1 ELSE 0 END)
		FROM
			@MergeOutput;
	
		SET IDENTITY_INSERT [DBProd].[dbo].[providers] OFF;

		INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt], [UpdateRowCnt], [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','MERGE NPIDATA', 1, getutcdate(), getutcdate(), @InsertCount, @UpdateCount, 'Y');


	-- STEP FIVE: MERGE CHILD TABLES TO PROD
		DECLARE @MergeOutputLicense TABLE (
			ActionType NVARCHAR(20)
		);

		MERGE INTO [DBProd].[dbo].[provider_licenses] AS T
		USING ( SELECT  [NPI], [OrdinalId],
						[Provider_License_Number], 
						[Provider_License_Number_State_Code], 
						[Healthcare_Provider_Taxonomy_Code], 
						[Healthcare_Provider_Primary_Taxonomy_Switch], 
						[Healthcare_Provider_Taxonomy_Group], 
						[isActive], 
						[updatedAt]
		FROM [dbo].[npidata_pfile_license_number_stage2] ) AS S
		ON      (T.NPI = S.NPI 
				AND T.OrdinalId = S.OrdinalId
				)
		WHEN MATCHED THEN
			UPDATE SET	T.Provider_License_Number = S.Provider_License_Number, 
						T.Provider_License_Number_State_Code = S.Provider_License_Number_State_Code, 
						T.Healthcare_Provider_Taxonomy_Code = S.Healthcare_Provider_Taxonomy_Code, 
						T.Healthcare_Provider_Primary_Taxonomy_Switch = S.Healthcare_Provider_Primary_Taxonomy_Switch, 
						T.Healthcare_Provider_Taxonomy_Group = S.Healthcare_Provider_Taxonomy_Group, 
						T.isActive = 1, 
						T.updatedAT = getutcdate()
		WHEN NOT MATCHED BY TARGET THEN
			INSERT ([NPI], [OrdinalId], [Provider_License_Number], [Provider_License_Number_State_Code], [Healthcare_Provider_Taxonomy_Code], [Healthcare_Provider_Primary_Taxonomy_Switch], [Healthcare_Provider_Taxonomy_Group], [isActive], [updatedAt]) 
			VALUES (S.NPI, S.OrdinalId, S.Provider_License_Number, S.Provider_License_Number_State_Code, S.Healthcare_Provider_Taxonomy_Code, S.Healthcare_Provider_Primary_Taxonomy_Switch, S.Healthcare_Provider_Taxonomy_Group, 1, getutcdate() )
		OUTPUT $action INTO @MergeOutputLicense; -- Output the action type into the table variable

		DECLARE @InsertCountLicense INT, @UpdateCountLicense INT;
		SELECT
			@InsertCountLicense = SUM(CASE WHEN ActionType = 'INSERT' THEN 1 ELSE 0 END),
			@UpdateCountLicense = SUM(CASE WHEN ActionType = 'UPDATE' THEN 1 ELSE 0 END)
		FROM
			@MergeOutputLicense;

		INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt], [UpdateRowCnt], [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','MERGE NPIDATA-License', 1, getutcdate(), getutcdate(), @InsertCountLicense, @UpdateCountLicense, 'Y');


		DECLARE @MergeOutputOther TABLE (
			ActionType NVARCHAR(20)
		);

		MERGE INTO [DBProd].[dbo].[provider_other_identifiers] AS T
		USING ( SELECT  [NPI], [OrdinalId],
						[Other_Provider_Identifier], 
						[Other_Provider_Identifier_Type_Code], 
						[Other_Provider_Identifier_State], 
						[Other_Provider_Identifier_Issuer], 
						[isActive], 
						[updatedAt]
		FROM [dbo].[npidata_pfile_other_identifier_stage2] ) AS S
		ON      (T.NPI = S.NPI 
				AND T.OrdinalId = S.OrdinalId
				)
		WHEN MATCHED THEN
			UPDATE SET	T.Other_Provider_Identifier = S.Other_Provider_Identifier, 
						T.Other_Provider_Identifier_Type_Code = S.Other_Provider_Identifier_Type_Code, 
						T.Other_Provider_Identifier_State = S.Other_Provider_Identifier_State, 
						T.Other_Provider_Identifier_Issuer = S.Other_Provider_Identifier_Issuer, 
						T.isActive = 1, 
						T.updatedAT = getutcdate()
		WHEN NOT MATCHED BY TARGET THEN
			INSERT ([NPI], [OrdinalId], [Other_Provider_Identifier], [Other_Provider_Identifier_Type_Code], [Other_Provider_Identifier_State], [Other_Provider_Identifier_Issuer], [isActive], [updatedAt]) 
			VALUES (S.NPI, S.OrdinalId, S.Other_Provider_Identifier, S.Other_Provider_Identifier_Type_Code, S.Other_Provider_Identifier_State, S.Other_Provider_Identifier_Issuer, 1, getutcdate() )
		OUTPUT $action INTO @MergeOutputOther; -- Output the action type into the table variable

		DECLARE @InsertCountOther INT, @UpdateCountOther INT;
		SELECT
			@InsertCountOther = SUM(CASE WHEN ActionType = 'INSERT' THEN 1 ELSE 0 END),
			@UpdateCountOther = SUM(CASE WHEN ActionType = 'UPDATE' THEN 1 ELSE 0 END)
		FROM
			@MergeOutputOther; -- count actions from this MERGE, not from the license MERGE
	
		INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt], [UpdateRowCnt], [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','MERGE NPIDATA-Other', 1, getutcdate(), getutcdate(), @InsertCountOther, @UpdateCountOther, 'Y');


		-- STEP SIX: INSERT data from all other tables into PROD where NOT EXISTS
		INSERT INTO [DBProd].[dbo].[provider_endpoints]
		([NPI], [Endpoint_Type], [Endpoint_Type_Description], [Endpoint], [Affiliation], [Endpoint_Description], [Affiliation_Legal_Business_Name], [Use_Code], [Use_Description], [Other_Use_Description], [Content_Type], [Content_Description], [Other_Content_Description], [Affiliation_Address_Line_One], [Affiliation_Address_Line_Two], [Affiliation_Address_City], [Affiliation_Address_State], [Affiliation_Address_Country], [Affiliation_Address_Postal_Code])
		SELECT 
		[NPI], [Endpoint_Type], [Endpoint_Type_Description], [Endpoint], [Affiliation], [Endpoint_Description], [Affiliation_Legal_Business_Name], [Use_Code], [Use_Description], [Other_Use_Description], [Content_Type], [Content_Description], [Other_Content_Description], [Affiliation_Address_Line_One], [Affiliation_Address_Line_Two], [Affiliation_Address_City], [Affiliation_Address_State], [Affiliation_Address_Country], [Affiliation_Address_Postal_Code]
		FROM [dbo].[endpoint_pfile_week_stage] s
		WHERE NOT EXISTS (  SELECT NPI FROM [DBProd].[dbo].[provider_endpoints]
							WHERE	[NPI] = s.NPI AND 
									COALESCE([Endpoint_Type], '') = COALESCE(s.[Endpoint_Type], '') AND
									COALESCE([Endpoint_Type_Description], '') = COALESCE(s.[Endpoint_Type_Description], '') AND
									COALESCE([Endpoint], '') = COALESCE(s.[Endpoint], '') AND
									COALESCE([Affiliation], '') = COALESCE(s.[Affiliation], '') AND
									COALESCE([Endpoint_Description], '') = COALESCE(s.[Endpoint_Description], '') AND
									COALESCE([Affiliation_Legal_Business_Name], '') = COALESCE(s.[Affiliation_Legal_Business_Name], '') AND
									COALESCE([Use_Code], '') = COALESCE(s.[Use_Code], '') AND
									COALESCE([Use_Description], '') = COALESCE(s.[Use_Description], '') AND
									COALESCE([Other_Use_Description],'') = COALESCE(s.[Other_Use_Description], '') AND
									COALESCE([Content_Type], '') = COALESCE(s.[Content_Type], '') AND
									COALESCE([Content_Description], '') = COALESCE(s.[Content_Description], '') AND
									COALESCE([Other_Content_Description], '') = COALESCE(s.[Other_Content_Description], '') AND
									COALESCE([Affiliation_Address_Line_One], '') = COALESCE(s.[Affiliation_Address_Line_One], '') AND
									COALESCE([Affiliation_Address_Line_Two], '') = COALESCE(s.[Affiliation_Address_Line_Two], '') AND
									COALESCE([Affiliation_Address_City], '') = COALESCE(s.[Affiliation_Address_City], '') AND
									COALESCE([Affiliation_Address_State], '') = COALESCE(s.[Affiliation_Address_State], '') AND
									COALESCE([Affiliation_Address_Country], '') = COALESCE(s.[Affiliation_Address_Country], '') AND
									COALESCE([Affiliation_Address_Postal_Code], '') = COALESCE(s.[Affiliation_Address_Postal_Code], '')
									)
		;

		INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt], [UpdateRowCnt], [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','INSERT Endpoint', 1, getutcdate(), getutcdate(), @@ROWCOUNT, 0, 'Y');


		INSERT INTO [DBProd].[dbo].[provider_other_names]
		([NPI], [Provider_Other_Organization_Name], [Provider_Other_Organization_Name_Type_Code])
		SELECT 
		[NPI], [Provider_Other_Organization_Name], [Provider_Other_Organization_Name_Type_Code]
		FROM [dbo].[othername_pfile_week_stage] s
		WHERE NOT EXISTS (  SELECT NPI FROM [DBProd].[dbo].[provider_other_names]
							WHERE	[NPI] = s.NPI AND 
									COALESCE([Provider_Other_Organization_Name], '') = COALESCE(s.[Provider_Other_Organization_Name], '') AND
									COALESCE([Provider_Other_Organization_Name_Type_Code], '') = COALESCE(s.[Provider_Other_Organization_Name_Type_Code], '')
							);

		INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt], [UpdateRowCnt], [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','INSERT Othername', 1, getutcdate(), getutcdate(), @@ROWCOUNT, 0, 'Y');

		INSERT INTO [DBProd].[dbo].[provider_locations]
		([NPI], [Provider_Secondary_Practice_Location_Address_Address_Line_1], [Provider_Secondary_Practice_Location_Address_Address_Line_2], [Provider_Secondary_Practice_Location_Address_City_Name], [Provider_Secondary_Practice_Location_Address_State_Name], [Provider_Secondary_Practice_Location_Address_Postal_Code], [Provider_Secondary_Practice_Location_Address_Country_Code_If_outside_U_S], [Provider_Secondary_Practice_Location_Address_Telephone_Number], [Provider_Secondary_Practice_Location_Address_Telephone_Extension], [Provider_Practice_Location_Address_Fax_Number])
		SELECT
		[NPI], [Provider_Secondary_Practice_Location_Address_Address_Line_1], [Provider_Secondary_Practice_Location_Address_Address_Line_2], [Provider_Secondary_Practice_Location_Address_City_Name], [Provider_Secondary_Practice_Location_Address_State_Name], [Provider_Secondary_Practice_Location_Address_Postal_Code], [Provider_Secondary_Practice_Location_Address_Country_Code_If_outside_U_S], [Provider_Secondary_Practice_Location_Address_Telephone_Number], [Provider_Secondary_Practice_Location_Address_Telephone_Extension], [Provider_Practice_Location_Address_Fax_Number]
		FROM [dbo].[pl_pfile_week_stage] s
		WHERE NOT EXISTS (  SELECT NPI FROM [DBProd].[dbo].[provider_locations]
							WHERE	[NPI] = s.NPI AND 
									COALESCE([Provider_Secondary_Practice_Location_Address_Address_Line_1], '') = COALESCE(s.[Provider_Secondary_Practice_Location_Address_Address_Line_1], '') AND
									COALESCE([Provider_Secondary_Practice_Location_Address_Address_Line_2], '') = COALESCE(s.[Provider_Secondary_Practice_Location_Address_Address_Line_2], '') AND
									COALESCE([Provider_Secondary_Practice_Location_Address_City_Name], '') = COALESCE(s.[Provider_Secondary_Practice_Location_Address_City_Name], '') AND
									COALESCE([Provider_Secondary_Practice_Location_Address_State_Name], '') = COALESCE(s.[Provider_Secondary_Practice_Location_Address_State_Name], '') AND
									COALESCE([Provider_Secondary_Practice_Location_Address_Postal_Code], '') = COALESCE(s.[Provider_Secondary_Practice_Location_Address_Postal_Code], '') AND
									COALESCE([Provider_Secondary_Practice_Location_Address_Country_Code_If_outside_U_S], '') = COALESCE(s.[Provider_Secondary_Practice_Location_Address_Country_Code_If_outside_U_S], '') AND
									COALESCE([Provider_Secondary_Practice_Location_Address_Telephone_Number], '') = COALESCE(s.[Provider_Secondary_Practice_Location_Address_Telephone_Number], '') AND
									COALESCE([Provider_Secondary_Practice_Location_Address_Telephone_Extension], '') = COALESCE(s.[Provider_Secondary_Practice_Location_Address_Telephone_Extension], '') AND
									COALESCE([Provider_Practice_Location_Address_Fax_Number], '') = COALESCE(s.[Provider_Practice_Location_Address_Fax_Number], '')
							);

		INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt], [UpdateRowCnt], [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_Weekly_InsertUpdateMerge]','INSERT Pl', 1, getutcdate(), getutcdate(), @@ROWCOUNT, 0, 'Y');


		--  END LOG HERE as SUCCESS
		UPDATE dbo.DimAudit
		SET ParentAuditKey = @ParentAuditKey, ExecStopDT = getutcdate(), [SuccessfulProcessingInd] = 'Y'
		WHERE AuditKey = @ParentAuditKey

	END

END

GO
/****** Object:  StoredProcedure [dbo].[npidata_pfile_license_number_populate]    Script Date: 2/4/2026 10:50:03 AM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- =============================================
-- Author:		David Speight
-- Create date: 20260120
-- Description:	npidata normalization step
-- =============================================
CREATE   PROCEDURE [dbo].[npidata_pfile_license_number_populate]
@ParentAuditKey INT
AS
BEGIN
	-- SET NOCOUNT ON added to prevent extra result sets from
	-- interfering with SELECT statements.
	SET NOCOUNT ON;

	--   TRUNCATE TABLE npidata_pfile_license_number_stage2

	-- Loop counter: numeric suffix of the column group, processed from _15 down to _1.
	DECLARE @ordinalId INT = 15;
	DECLARE @tSql varchar(2000);

	WHILE @ordinalId > 0
	BEGIN

	SELECT  @tSql = CONCAT('INSERT INTO [dbo].[npidata_pfile_license_number_stage2]
		( [NPI], [OrdinalId], [Provider_License_Number], [Provider_License_Number_State_Code], 
		[Healthcare_Provider_Taxonomy_Code], [Healthcare_Provider_Primary_Taxonomy_Switch], 
		[Healthcare_Provider_Taxonomy_Group],
		[isActive], [updatedAt])
		SELECT 	[NPI], ',@ordinalId,', 
		[Provider_License_Number_',@ordinalId,'], 
		[Provider_License_Number_State_Code_',@ordinalId,'], 
		[Healthcare_Provider_Taxonomy_Code_',@ordinalId,'], 
		[Healthcare_Provider_Primary_Taxonomy_Switch_',@ordinalId,'], 
		[Healthcare_Provider_Taxonomy_Group_',@ordinalId,'], 
		1, getutcdate() 
		FROM  [dbo].[npidata_pfile_week_stage]
		WHERE LEN([Provider_License_Number_',@ordinalId,']) > 0 OR 
		LEN([Provider_License_Number_State_Code_',@ordinalId,']) > 0 OR 
		LEN([Healthcare_Provider_Taxonomy_Code_',@ordinalId,']) > 0 OR 
		LEN([Healthcare_Provider_Primary_Taxonomy_Switch_',@ordinalId,']) > 0  OR 
		LEN([Healthcare_Provider_Taxonomy_Group_',@ordinalId,']) > 0 
		;')

		EXEC(@tSql);		

		INSERT INTO DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt],  [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_pfile_license_number_stage2]','SPROC: [npidata_pfile_license_number_populate]', @ordinalId, getutcdate(), getutcdate(), @@ROWCOUNT, 'Y');

		SELECT @ordinalId = @ordinalId - 1;
	END;

END

GO

/****** Object:  StoredProcedure [dbo].[npidata_pfile_other_identifier_populate]    Script Date: 2/4/2026 10:52:44 AM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- =============================================
-- Author:		David Speight
-- Create date: 20260120
-- Description:	npidata normalization step for identifier
-- =============================================
CREATE   PROCEDURE [dbo].[npidata_pfile_other_identifier_populate] 
@ParentAuditKey INT
AS
BEGIN
	-- SET NOCOUNT ON added to prevent extra result sets from
	-- interfering with SELECT statements.
	SET NOCOUNT ON;

	--   TRUNCATE TABLE npidata_pfile_other_identifier_stage2

	-- Loop counter: numeric suffix of the column group, processed from _50 down to _1.
	DECLARE @ordinalId INT = 50;
	DECLARE @tSql varchar(2000);

	WHILE @ordinalId > 0
	BEGIN

	SELECT  @tSql = CONCAT('INSERT INTO [dbo].[npidata_pfile_other_identifier_stage2]
		( [NPI], [OrdinalId], 
		[Other_Provider_Identifier], [Other_Provider_Identifier_Type_Code], 
		[Other_Provider_Identifier_State], [Other_Provider_Identifier_Issuer],
		[isActive], [updatedAt])
		SELECT 	[NPI], ',@ordinalId,', 
		[Other_Provider_Identifier_',@ordinalId,'], 
		[Other_Provider_Identifier_Type_Code_',@ordinalId,'], 
		[Other_Provider_Identifier_State_',@ordinalId,'], 
		[Other_Provider_Identifier_Issuer_',@ordinalId,'], 
		1, getutcdate() 
		FROM  [dbo].[npidata_pfile_week_stage]
		WHERE LEN([Other_Provider_Identifier_',@ordinalId,']) > 0 OR 
		LEN([Other_Provider_Identifier_Type_Code_',@ordinalId,']) > 0 OR 
		LEN([Other_Provider_Identifier_State_',@ordinalId,']) > 0 OR 
		LEN([Other_Provider_Identifier_Issuer_',@ordinalId,']) > 0  
		;')

		EXEC(@tSql);		

		INSERT INTO DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor], [ExecStartDT], [ExecStopDT], [InsertRowCnt],  [SuccessfulProcessingInd])
		VALUES(@ParentAuditKey, '[npidata_pfile_other_identifier_stage2]','SPROC: [npidata_pfile_other_identifier_populate]', @ordinalId, getutcdate(), getutcdate(), @@ROWCOUNT, 'Y');

		SELECT @ordinalId = @ordinalId - 1;
	END;

END

GO


Error Handling and Auditing

Key safeguards include:

  • Row-level audit logging
  • Conditional aborts when expected files are missing
  • Email notifications via sp_send_dbmail
  • Insert/update counts captured during MERGE operations

This design allows DBAs and developers to rerun only the failed components rather than restarting the entire workflow.
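
As a rough illustration of how these safeguards can fit together, the sketch below wraps one of the normalization procedures in TRY/CATCH, writes a failure row to dbo.DimAudit, and sends an email through Database Mail. The mail profile name, the recipient address, the 'N' failure flag, and the placeholder audit key are assumptions made for illustration; the actual procedures may handle failures differently.

```sql
-- Hypothetical wrapper around one normalization step.
-- Assumed values: mail profile, recipient, 'N' failure flag, placeholder audit key.
DECLARE @ParentAuditKey INT = 1;  -- placeholder; normally the key created at the start of the run

BEGIN TRY
    EXEC [dbo].[npidata_pfile_license_number_populate] @ParentAuditKey = @ParentAuditKey;
END TRY
BEGIN CATCH
    DECLARE @ErrMsg NVARCHAR(4000) = ERROR_MESSAGE();

    -- Record the failure alongside the success rows written by the procedures.
    INSERT INTO dbo.DimAudit ([ParentAuditKey], [TableName], [PkgName], [PkgVersionMajor],
                              [ExecStartDT], [ExecStopDT], [InsertRowCnt], [UpdateRowCnt],
                              [SuccessfulProcessingInd])
    VALUES (@ParentAuditKey, '[npidata_pfile_license_number_stage2]',
            'SPROC: [npidata_pfile_license_number_populate]', 1,
            GETUTCDATE(), GETUTCDATE(), 0, 0, 'N');

    -- Notify the team through Database Mail.
    EXEC msdb.dbo.sp_send_dbmail
         @profile_name = 'DBA Mail Profile',      -- assumed profile name
         @recipients   = 'dba-team@example.com',  -- assumed address
         @subject      = 'NPI weekly load failed',
         @body         = @ErrMsg;

    THROW;  -- re-raise so the calling SQL Server Agent step is marked as failed
END CATCH;
```

Re-raising the error with THROW lets the calling job step report failure, so the failed component can be rerun on its own once the cause is fixed.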


Final Step – SQL Server Agent Scheduling

The final production step is scheduling both stored procedures in SQL Server Agent:

  1. npidata_weekly_copy_S3_to_RDS
  2. npidata_Weekly_InsertUpdateMerge

These jobs are scheduled during low-activity maintenance windows to avoid contention with backups and other system tasks.
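
One possible way to set this up is a single Agent job with two steps, so the merge only runs after the copy succeeds. The sketch below uses the standard msdb job procedures; the job name, schedule name, staging database name (DBStage), the Sunday 02:00 run time, and the assumption that both procedures can be called without parameters are all illustrative and not taken from the actual environment.

```sql
USE msdb;
GO

-- Assumed names: job, schedule, and staging database are placeholders.
EXEC dbo.sp_add_job @job_name = N'NPI Weekly Load';

EXEC dbo.sp_add_jobstep
     @job_name          = N'NPI Weekly Load',
     @step_name         = N'Copy S3 files to RDS',
     @subsystem         = N'TSQL',
     @database_name     = N'DBStage',   -- assumed staging database name
     @command           = N'EXEC dbo.npidata_weekly_copy_S3_to_RDS;',
     @on_success_action = 3;            -- 3 = go to the next step

EXEC dbo.sp_add_jobstep
     @job_name          = N'NPI Weekly Load',
     @step_name         = N'Merge staging into production',
     @subsystem         = N'TSQL',
     @database_name     = N'DBStage',   -- assumed staging database name
     @command           = N'EXEC dbo.npidata_Weekly_InsertUpdateMerge;',
     @on_success_action = 1;            -- 1 = quit the job reporting success

EXEC dbo.sp_add_schedule
     @schedule_name          = N'Weekly Sunday 02:00',
     @freq_type              = 8,       -- 8 = weekly
     @freq_interval          = 1,       -- 1 = Sunday
     @freq_recurrence_factor = 1,       -- every week
     @active_start_time      = 020000;  -- HHMMSS

EXEC dbo.sp_attach_schedule
     @job_name      = N'NPI Weekly Load',
     @schedule_name = N'Weekly Sunday 02:00';

-- Register the job on the local server so the Agent will run it.
EXEC dbo.sp_add_jobserver @job_name = N'NPI Weekly Load';
```

Because a job step quits with failure by default when it errors, a failed copy stops the job before the merge step touches production tables.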


Conclusion

By automating the entire ingestion pipeline—from public download to production merge—we eliminated manual intervention, reduced operational risk, and created a repeatable, auditable process. The combination of PowerShell, AWS S3, SQL Server staging, and controlled MERGE logic provides a robust foundation that can easily scale as data volumes grow.