In this part (7) I will continue building my performance monitor by setting up a PowerShell job to download the CSV files from S3. I will then use SSIS to import them into a staging table, and finally use SSIS to move the staged data to the final data warehouse database that reports will access.
On my staging server at the MAIN COMPANY I run this PowerShell script on a five (5) minute recurring schedule. It looks for new files in AWS S3 and copies them down to the local server.
# These lines may be needed if task is run from SQL Agent Job.
$env:Path += ';C:\Program Files\Amazon\AWSCLIV2\'
$env:USERPROFILE = "C:\users\$env:USERNAME"
# Capture a limit time so that new files that arrive mid-process are not archived or deleted.
$limit = (Get-Date)
$pathArchive = "D:\ssispc\archived\"
$pathStage = "D:\ssispc\staged\"
$bucket = "s3://MYHEADQUARTERS-database/ssis/PerfCounter/FileDrop/"
$bucketArchive = "s3://MYHEADQUARTERS-database/ssis/PerfCounter/FileArchive/"
$bucket2 = "s3://MYHEADQUARTERS-performance-monitor/PerfCounter/FileDrop/"
$bucketArchive2 = "s3://MYHEADQUARTERS-performance-monitor/PerfCounter/FileArchive/"
# Cross copy COMPANYA files from COMPANYA S3 to MYHEADQUARTERS S3
aws s3 cp s3://COMPANYA-database/ssis/PerfCounter/FileDrop/ s3://MYHEADQUARTERS-performance-monitor/PerfCounter/FileDrop/ --recursive --acl bucket-owner-full-control --profile prod
# Archive Prod files at prod
aws s3 mv s3://COMPANYA-database/ssis/PerfCounter/FileDrop/ s3://COMPANYA-database/ssis/PerfCounter/FileArchive/ --recursive --acl bucket-owner-full-control --profile prod
# Cross copy COMPANYB S3 to MYHEADQUARTERS S3
aws s3 cp s3://COMPANYB-database/ssis/PerfCounter/FileDrop/ s3://MYHEADQUARTERS-performance-monitor/PerfCounter/FileDrop/ --recursive --acl bucket-owner-full-control --profile impl
# Archive COMPANYB files
aws s3 mv s3://COMPANYB-database/ssis/PerfCounter/FileDrop/ s3://COMPANYB-database/ssis/PerfCounter/FileArchive/ --recursive --acl bucket-owner-full-control --profile impl
# Grab ALL FILES from S3 and copy to LOCAL for DW processing:
# Copy & Archive files from S3 - EC2 instance set up
aws s3 cp $bucket $pathStage --recursive
aws s3 mv $bucket $bucketArchive --recursive
# Copy & Archive files from S3 - RDS method
aws s3 cp $bucket2 $pathStage --recursive
aws s3 mv $bucket2 $bucketArchive2 --recursive
# Delete all files in subfolders older than 16 days (just over 2 weeks).
Set-Location -Path $pathArchive
Get-ChildItem -Path $pathArchive -Recurse -Force -File | Where-Object { $_.LastWriteTime -lt ((Get-Date).AddDays(-16)) } | ForEach-Object { Remove-Item -Path $_.FullName -Force }
The SQL Agent Job
I created this SQL Agent job to run the PowerShell script as well as the SSIS EXTRACTION PACKAGE that I will walk through next.
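If you would rather script the job than build it in the SSMS GUI, the T-SQL below is a minimal sketch of the same two-step job. The job, step, and schedule names, the script path, and the SSIS catalog path are my own placeholders (assumptions), so swap in whatever matches your environment and deployed package.
USE msdb;
GO
-- Create the job (the name is a placeholder).
EXEC dbo.sp_add_job @job_name = N'PerfCounter - S3 Download and Extract';

-- Step 1: run the PowerShell download script (the script path is a placeholder).
EXEC dbo.sp_add_jobstep
    @job_name          = N'PerfCounter - S3 Download and Extract',
    @step_name         = N'Download CSV files from S3',
    @subsystem         = N'PowerShell',
    @command           = N'& "D:\ssispc\scripts\Get-PerfCounterFiles.ps1"',
    @on_success_action = 3;   -- go to the next step

-- Step 2: run the deployed SSIS package (SSISDB folder/project names are placeholders).
EXEC dbo.sp_add_jobstep
    @job_name  = N'PerfCounter - S3 Download and Extract',
    @step_name = N'Run PerfCounterExtract',
    @subsystem = N'SSIS',
    @command   = N'/ISSERVER "\SSISDB\PerfCounter\PerfCounter\PerfCounterExtract.dtsx" /SERVER "." /CALLERINFO SQLAGENT /REPORTING E';

-- Schedule it every five (5) minutes, all day.
EXEC dbo.sp_add_schedule
    @schedule_name        = N'Every 5 minutes',
    @freq_type            = 4,   -- daily
    @freq_interval        = 1,
    @freq_subday_type     = 4,   -- minute interval
    @freq_subday_interval = 5;

EXEC dbo.sp_attach_schedule
    @job_name      = N'PerfCounter - S3 Download and Extract',
    @schedule_name = N'Every 5 minutes';

-- Register the job on the local server.
EXEC dbo.sp_add_jobserver @job_name = N'PerfCounter - S3 Download and Extract';
GO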
SSIS Package: PerfCounterExtract
First I will define my PROJECT PARAMS using the local paths on my MAIN COMPANY staging server where the CSV files were copied. These are the same paths that I defined in the PowerShell script above.
Next I will set up the PACKAGE VARIABLES that I will need.
CONNECTION MANAGERS
You will need to set up CONNECTION MANAGERS for the following:
| Server | Database / Connection |
| --- | --- |
| (local) | FLAT FILE CSV CONNECTION MANAGER |
| STAGING SERVER | ServerAnalysisDwStaging |
| MAIN DW SERVER | ServerAnalysisDw |
This is an OVERVIEW of the PACKAGE flow. I will not go into all of the details of each object; instead I will only hit the highlights of what actually needs custom configuration. Most programmers with SSIS experience should be able to figure out the rest.
AUDIT LOG BEGIN STEP: Use the following query on the GENERAL tab, while connecting to the ServerAnalysisDwStaging database.
INSERT INTO [Analysis].[PerfFileExtractionLog]
([ParentID], [ServerName], [FilPath], [FileName], [RunStartUtc], [RunEndUtc], [RunSuccess], [RowsAffected], [ErrorCount])
VALUES ( 0, 'DBSQLRPT01', 'SSIS EXTRACTION', 'START', GETUTCDATE(), NULL, 0,0,0);
SELECT MAX(LogID) AS ParentID
from Analysis.PerfFileExtractionLog;
On the RESULT SET tab configure as shown.
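For reference, here is a rough sketch of the audit table these queries write to. The column list comes straight from the INSERT above (including the [FilPath] spelling); the data types, sizes, and identity LogID are assumptions.
-- Sketch of the audit table, inferred from the INSERT statements in this package.
-- Column types and sizes are assumptions; adjust to your environment.
CREATE TABLE [Analysis].[PerfFileExtractionLog]
(
    [LogID]        int IDENTITY(1,1) NOT NULL PRIMARY KEY,
    [ParentID]     int           NOT NULL,
    [ServerName]   varchar(128)  NOT NULL,
    [FilPath]      varchar(512)  NULL,   -- column name as used by the package queries
    [FileName]     varchar(256)  NULL,
    [RunStartUtc]  datetime2(3)  NULL,
    [RunEndUtc]    datetime2(3)  NULL,
    [RunSuccess]   bit           NOT NULL DEFAULT 0,
    [RowsAffected] int           NOT NULL DEFAULT 0,
    [ErrorCount]   int           NOT NULL DEFAULT 0
);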
TRUNCATE STAGING STEP: Use the following query while connecting to the ServerAnalysisDwStaging database.
truncate table [Analysis].[PerfCounterLogStaging];
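The staging table itself is not shown in this series, so here is a minimal sketch of what it could look like, assuming every CSV column lands as a string (which is why the transform query further down has to CONVERT the values). The column names come from that query; the types and sizes are assumptions.
-- Sketch of the staging table the CSV rows are loaded into.
-- Everything is kept as varchar so the flat file import never fails on bad data;
-- the transform query converts and validates later. Sizes are assumptions.
CREATE TABLE [Analysis].[PerfCounterLogStaging]
(
    [PerfLocationID] varchar(20)  NULL,
    [MetricID]       varchar(20)  NULL,
    [PartName]       varchar(256) NULL,
    [DateKey]        varchar(10)  NULL,
    [TimeKey]        varchar(10)  NULL,
    [iVal]           varchar(30)  NULL,
    [dVal]           varchar(30)  NULL
);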
FOR EACH LOOP CONTAINER: Right click on the top bar, configure the COLLECTION property, and then configure the VARIABLE MAPPINGS.
INSIDE THE LOOP, we add another AUDIT LOG BEGIN STEP, configured with a query, PARAMETER MAPPINGS, and a RESULT SET.
INSERT INTO [Analysis].[PerfFileExtractionLog]
([ParentID], [ServerName], [FilPath], [FileName], [RunStartUtc], [RunEndUtc], [RunSuccess], [RowsAffected], [ErrorCount])
VALUES ( ?, 'DBSQLRPT01', ?, ?, GETUTCDATE(), NULL, 0,0,0);
SELECT MAX(LogID) AS ParentID
from Analysis.PerfFileExtractionLog;
DATA FLOW TASK: I add a data flow task to IMPORT THE CSV file. This task has its own set of objects. I will be adding a FLAT FILE SOURCE, two ROW COUNTS, and an OLE DB DESTINATION object here.
FLAT FILE SOURCE: set to use my FLAT FILE CSV CONNECTION MANAGER. See Part six (6) for how to create a CONNECTION MANAGER if you are not sure how to do this.
MOVE FILE TO ARCHIVE is a FILE SYSTEM TASK object. Now that I have extracted the data from the CSV file (in the previous data flow step), I will move the physical file into the archive folder that I defined earlier in my PROJECT PARAMS.
SQL AUDIT END (inside loop): Note that I have two AUDIT END tasks. The first marks the end of each looped file; the second (outside the loop) marks the end of the extraction.
UPDATE [Analysis].[PerfFileExtractionLog]
SET ParentID = ?,RunEndUtc = SYSDATETIME(),
RunSuccess=1,RowsAffected=?,ErrorCount=?
Where LogID = ?
SQL AUDIT END (MAIN)
UPDATE [Analysis].[PerfFileExtractionLog]
SET RunEndUtc = SYSDATETIME(),
RunSuccess=1
Where LogID = ?
Data Transformation Section
The next part of this package creates two AUDIT steps (a begin and an end) and a DATA FLOW to transform and copy the staged data to two final locations, one on the staging server and one on the main report server. This could be its own package, but I've included it in the same package for simplicity.
AUDIT LOG BEGIN
INSERT INTO [Analysis].[PerfFileExtractionLog]
([ParentID], [ServerName], [FilPath], [FileName], [RunStartUtc], [RunEndUtc], [RunSuccess], [RowsAffected], [ErrorCount])
VALUES ( 0, 'DBSQLRPT01', 'SSIS DATA LOAD', 'MULTI LOAD', GETUTCDATE(), NULL, 0,0,0);
SELECT MAX(LogID) AS ParentID
from Analysis.PerfFileExtractionLog;
DATA FLOW: DATA TRANSFORM overview of Objects
OLE DB SOURCE Configuration. Use the following query on the GENERAL tab, while connecting to the ServerAnalysisDwStaging database.
SELECT l.LocationID,
l.ClusterID,
CONVERT(int,p.MetricID) AS MetricID,
p.PartName,
CONVERT(int,p.DateKey) AS DateKey,
LEFT(p.TimeKey,6) AS TimeKey,
(CASE WHEN IsNumeric(p.iVal)=1 THEN CONVERT(bigint,p.iVal) ELSE NULL END) AS iVal,
(CASE WHEN IsNumeric(p.dVal)=1 THEN CONVERT(decimal(10,4),p.dVal) ELSE NULL END) AS dVal,
dbo.CreateDateTimeFrom2Strings(p.DateKey,p.TimeKey) AS UtcDate
FROM Analysis.PerfCounterLogStaging p
INNER JOIN Analysis.PerfLocation l on CONVERT(int,p.PerfLocationID) = l.id
WHERE NOT EXISTS (SELECT 1
FROM Analysis.PerfCounterStats q
WHERE q.LocationID = l.LocationID
and q.ClusterID = l.ClusterID
and q.MetricID = CONVERT(int,p.MetricID)
and q.PartName = p.PartName
and q.DateKey = CONVERT(int,p.DateKey)
and q.TimeKey = LEFT(p.TimeKey,6))
GROUP BY l.LocationID, l.ClusterID, p.MetricID, p.PartName, p.DateKey, p.TimeKey, p.iVal, p.dVal
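The query above calls a small helper function, dbo.CreateDateTimeFrom2Strings, that is not shown in this series. Here is a hypothetical sketch of it, assuming DateKey is a yyyyMMdd string and TimeKey begins with HHmmss (both assumptions based on how the keys are used in the query).
-- Hypothetical sketch of the helper used in the transform query.
-- Assumes @DateKey = 'yyyyMMdd' and @TimeKey starts with 'HHmmss'.
CREATE FUNCTION dbo.CreateDateTimeFrom2Strings
(
    @DateKey varchar(10),
    @TimeKey varchar(10)
)
RETURNS datetime2(0)
AS
BEGIN
    -- 'yyyyMMdd hh:mm:ss' converts unambiguously in SQL Server.
    RETURN CONVERT(datetime2(0),
                   @DateKey + ' ' +
                   LEFT(@TimeKey, 2) + ':' +
                   SUBSTRING(@TimeKey, 3, 2) + ':' +
                   SUBSTRING(@TimeKey, 5, 2));
END;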
MULTI CAST: The Multicast is going to route to an OLE DB STAGING DESTINATION and an OLE DB PRODUCTION DESTINATION used for reporting.
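Both destinations point at the same table shape. Here is a sketch of Analysis.PerfCounterStats inferred from the source query columns and the NOT EXISTS duplicate check; the key choice and the exact data types are assumptions.
-- Sketch of the destination table (same shape in staging and production).
-- Column types mirror the CONVERTs in the source query; the key is an assumption.
CREATE TABLE [Analysis].[PerfCounterStats]
(
    [LocationID] int           NOT NULL,
    [ClusterID]  int           NOT NULL,
    [MetricID]   int           NOT NULL,
    [PartName]   varchar(256)  NOT NULL,
    [DateKey]    int           NOT NULL,
    [TimeKey]    char(6)       NOT NULL,
    [iVal]       bigint        NULL,
    [dVal]       decimal(10,4) NULL,
    [UtcDate]    datetime2(0)  NULL,
    CONSTRAINT PK_PerfCounterStats
        PRIMARY KEY (LocationID, ClusterID, MetricID, PartName, DateKey, TimeKey)
);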
SQL AUDIT END
UPDATE [Analysis].[PerfFileExtractionLog]
SET RunEndUtc = SYSDATETIME(),
RunSuccess=1,RowsAffected = ?,ErrorCount = ?
Where LogID = ?
That concludes this part. At this point your data should be polling from each company location, creating CSV files that are uploaded to AWS S3, then downloaded at HEADQUARTERS and imported into the DW database.
In the next section we create an SSRS REPORTING DASHBOARD to monitor these stats.