Dataexpert.io Data Engineering Bootcamp Capstone Project – B.A.S.E. (Business Assistance for Security Enhancements) Subsidy Program
I won Best Capstone Project in Zach's January Dataexpert.io Data Engineering Bootcamp
A data engineering capstone project that builds a program for Colorado’s OEDIT (Office of Economic Development and International Trade) to allocate security system subsidies to businesses using historical crime, population, and income data trends from 1997-2020.
Github Repo
data-expert-data-engineering-capstone github repo
About
My name is Taylor Ortiz and I enrolled in Zach’s Dataexpert.io Data Engineering bootcamp as part of the January 2025 cohort. One of the requirements for achieving the highest certification in the bootcamp is to complete a capstone project that showcases the skills attained during the bootcamp and the general skills that I possess as a data engineer and data architect. The following capstone business case, KPIs, and use cases are fictional and made up by me for the sake of the exercise (although I think it has merit for an actual State of Colorado OEDIT program one day). The technology stack was built entirely from scratch using publicly accessible data from data.colorado.gov and CDPHE Open Data. Enjoy!
Features
9,048,771 rows of source data extracted
A 28-task DAG (Directed Acyclic Graph) using Apache Airflow data pipeline orchestration running in an Astronomer cloud production environment
Data visualization of 16 required KPIs and use cases, as well as overall Colorado crime density, using Grafana Labs dashboards
A user experience for Colorado business entities to search for their business, understand the tier structure, and discover what subsidy tier their business qualifies for using Grafana Labs dashboards
Apache Spark jobs with Amazon Web Services (AWS) and Geoapify Geocoding API
Medallion architecture data design patterns
Comprehensive architecture diagram
Comprehensive data dictionary displaying all data sources and manipulations used
Capstone Requirements
Identify a problem you’d like to solve.
Scope the project and choose datasets.
Use at least 2 different data sources and formats (e.g., CSV, JSON, APIs), with at least 1 million rows.
Define your use case: analytics table, relational database, dashboard, etc.
Choose your tech stack and justify your choices.
Explore and assess the data.
Identify and resolve data quality issues.
Document your data cleaning steps.
Create a data model diagram and dictionary.
Build ETL pipelines to transform and load the data.
Run pipelines and perform data quality checks.
Visualize or present the data through dashboards, applications, or reports.
Your data pipeline must be running in a production cloud environment
Project Overview
Problem Statement
The State of Colorado has hired your data firm to develop internal back-office visualizations and a front facing business user experience to support a critical initiative for the Office of Economic Development and International Trade (OEDIT). The program, B.A.S.E. (Business Assistance for Security Enhancements), is designed to evaluate and qualify businesses across the state for security system subsidies to protect their business based on historical crime trends, income, and population data by city and county. Historical data from 1997 to 2020, as well as new data extracted daily, will be analyzed to determine which security tiers businesses qualify for.
Additionally, the State of Colorado has sixteen KPIs and use cases they would like to see visualized from the extracted, transformed and aggregated data to assist them in planning, funding and outreach initiatives.
This tool will allow OEDIT to automatically notify active businesses in good standing about the subsidies they qualify for, streamlining the application process for business owners seeking to participate. Lastly, the State requires the creation of an intuitive user experience that enables businesses to search for their assigned tier, providing them with easy access to their eligibility information.
KPIs and Use Cases
County Level KPIs and Use Cases
KPI: Set a goal for each police agency within a given county to reduce crime by 5% within the next fiscal year, based on a baseline crime count from historical data.
Use Case: Calculate crime count for police agencies in a given county to get a baseline number.
KPI: Bring 100% free self defense and resident safety programs to lower income counties.
Use Case: Calculate the average median household income per county.
KPI: Evaluate additional police agency support required in counties that show a correlation between rising population and rising crime.
Use Case: Show population trends for each county alongside corresponding crime trends by year.
KPI: Identify counties with an average annual population growth rate exceeding 10% from 1997–2020, and adjust state support allocations to improve funding alignment for resident programs.
Use Case: Show population trends for each county from 1997–2020.
KPI: Inform police agencies of which crime categories are most prevalent in their respective county.
Use Case: Calculate the totals for each crime category per county from 1997 to 2020 to gauge frequency.
KPI: Enable police agencies to be proactive by identifying which months of the year have a higher volume of crime.
Use Case: Show average seasonal crime trends for each month by county.
KPI: Provide an interactive visual representation of crime density across counties for the entire state.
Use Case: Create a geo-map of crime density for counties from 1997–2020.
KPI: Drive further marketing outreach for supportive safety programs in lower income counties.
Use Case: Show average income per capita for counties.
KPI: Display crime rates compared to median household income to pinpoint high-risk areas.
Use Case: Compare crime data with median household income for each county.
KPI: Illustrate crime type distribution by county across Property, Person and Society crimes to better understand where to allocate safety resources.
Use Case: Analyze the distribution of different crime types across each county from 1997–2020 for Property, Person and Society crimes.
City Level KPIs and Use Cases
KPI: Deploy an interactive dashboard displaying seasonal crime trends for each city to detect seasonal peaks to guide targeted patrol planning.
Use Case: Show seasonal crime trends for the year in each city.
KPI: Identify and report the top three crime categories most prevalent during daytime (6 AM–6 PM) in each city to optimize resource allocation during peak hours.
Use Case: Determine which crimes are more likely to happen during the day for a city.
KPI: Calculate the average age of individuals involved in each crime category across cities to support the development of tailored intervention programs.
Use Case: Compute the average age for crime categories across cities.
KPI: Identify and report the top three crime categories most common at night (6 PM–6 AM) in each city to inform optimized night patrol scheduling.
Use Case: Determine which crimes are more likely to happen at night for a city.
KPI: Develop a dynamic visualization that shows the percentage distribution of crime types by city by Property, Person and Society to support targeted law enforcement initiatives.
Use Case: Display crime type distribution by city.
KPI: Provide an analysis dashboard showing average crime trends by day of the week for each city, highlighting peak crime days to drive strategic patrol scheduling.
Use Case: Show crime trends on average by day of the week to determine when to patrol more.
Data Sources
Crimes in Colorado (2016-2020): Offenses in Colorado for 2016 through 2020 by Agency from the FBI’s Crime Data Explorer.
Number of rows: 3.1M
Crimes in Colorado (1997-2015): Crime stats for the State of Colorado from 1997 to 2015. Data provided by the CDPS and the FBI’s Crime Data Explorer (CDE).
Number of rows: 4.95M
Personal Income in Colorado: Income (per capita or total) for each county by year with rank and population. From Colorado Department of Labor and Employment (CDLE), since 1969.
Number of rows: 10k
Population Projections in Colorado: Actual and predicted population data by gender and age from the Department of Local Affairs (DOLA), from 1990 to 2040.
Number of rows: 382k
Business Entities in Colorado: Colorado Business Entities (corporations, LLCs, etc.) registered with the Colorado Department of State (CDOS) since 1864.
Number of rows: 2.81M
Colorado County Boundaries: This feature class contains county boundaries for all 64 Colorado counties and 2010 US Census attributes data describing the population within each county.
Number of rows: 64
Data Tables
Bronze Layer (raw) data
colorado_business_entities_raw
Number of rows: 984,368
colorado_city_county_zip_raw
Number of rows: 760
colorado_county_coordinates_raw
Number of rows: 64
colorado_crimes_1997_2015_raw
Number of rows: 4,952,282
colorado_crimes_2016_2020_raw
Number of rows: 3,101,365
colorado_income_raw
Number of rows: 9,932
Silver Layer (transform) data
colorado_business_entities
Number of rows: 817,324 (grows by Airflow pipeline)
colorado_business_entities_duplicates
Number of rows: 721 (grows by Airflow pipeline)
colorado_business_entities_misfit_entities
Number of rows: 71 (grows by Airflow pipeline)
colorado_business_entities_with_ranking
Number of rows: 817,324 (grows by Airflow pipeline)
colorado_crime_population_trends
Number of rows: 1,536
colorado_crime_tier_arson
Number of rows: 72
colorado_crime_tier_burglary
Number of rows: 79
colorado_crime_tier_county_rank
Number of rows: 79
colorado_crime_tier_larceny_theft
Number of rows: 79
colorado_crime_tier_property_destruction
Number of rows: 79
colorado_crime_tier_robbery
Number of rows: 70
colorado_crime_tier_stolen_property
Number of rows: 75
colorado_crime_tier_vehicle_theft
Number of rows: 79
colorado_crimes
Number of rows: 8,053,647
colorado_crimes_2016_2020_with_cities
Number of rows: 2,771,984
colorado_crimes_with_cities
Number of rows: 7,724,266
colorado_final_county_tier_rank
Number of rows: 64
colorado_income_1997_2020
Number of rows: 4,604
colorado_income_household_tier_rank
Number of rows: 64
colorado_population_raw
Number of rows: 381,504
colorado_temp_geocoded_entities
Number of rows: 2,033
Gold Layer (aggregate) data
colorado_avg_age_per_crime_category_1997_2020
Number of rows: 3,154
colorado_city_crime_counts_1997_2020
Number of rows: 182
colorado_city_crime_time_likelihood
Number of rows: 3,206
colorado_city_seasonal_crime_rates_1997_2020
Number of rows: 2,196
colorado_county_agency_crime_counts
Number of rows: 526
colorado_county_average_income_1997_2020
Number of rows: 64
colorado_county_crime_per_capita_1997_2020
Number of rows: 64
colorado_county_crime_per_capita_with_coordinates
Number of rows: 64
colorado_county_crime_vs_population_1997_2020
Number of rows: 1,665
colorado_county_seasonal_crime_rates_1997_2020
Number of rows: 911
colorado_crime_category_totals_per_county_1997_2020
Number of rows: 1,520
colorado_crime_type_distribution_by_city
Number of rows: 549
colorado_crime_vs_median_household_income_1997_2020
Number of rows: 1,536
colorado_population_crime_per_capita_rank
Number of rows: 64
colorado_total_population_per_county_1997_2020
Number of rows: 1,536
coloraodo_county_crime_rate_per_capita
Number of rows: 1,458
Access the entire capstone data dictionary for more detailed information on the datasets used.
Technologies Used
Python: programming language used for Spark jobs with AWS and Iceberg loads, Geoapify APIs and Python callables in Airflow orchestration
SQL: fundamental for querying, transforming, and aggregating structured data.
Apache Airflow: orchestration tool that manages, schedules and automates the daily business entity extract from the Colorado Information Marketplace
AWS: S3 bucket storage for CSV extracts
Tabular: data lake operations
Trino: distributed SQL query engine for medallion architecture
Geoapify: geocoding API for address validation of business entities
Grafana: dashboards for data visualizations of KPIs, use cases and business user experience
Astronomer: data orchestration platform that provides a managed service for Apache Airflow orchestrations in the cloud
Apache Spark: open source distributed computing system for processing source extracts from AWS to Iceberg and backfilling / verifying address data with Geoapify
Architecture
Business Entity Tier Ranking
Subsidy Tiers
Tiers are represented as a range of 1 through 4 in the B.A.S.E. program. 1 indicates the lowest level need and 4 indicates the highest level need for security system subsidies. Each subsidy tier below offers a gradual increase of security system services based on the tier that your business qualifies for. The idea is that tiers will be backfilled and assigned on the source business entities dataset and as new business entities are added daily to the Colorado Information Marketplace, those records will also be assigned a tier through the Airflow orchestration that we will cover below. However, how are tiers actually calculated and assigned? Read on!
How Tiers Are Calculated and Assigned
Step 1: Identify Criteria
Crimes:
Out of all the crime categories that exist, I chose 7 that closely resemble property-related or adjacent crimes that would factor into the ranking:
Destruction/Damage/Vandalism of Property
Burglary/Breaking & Entering
Larceny/Theft Offenses
Motor Vehicle Theft
Robbery
Arson
Stolen Property Offenses
Population:
Identify crime per capita (per 1,000 residents) for all counties between the years of 1997 and 2020.
Income:
Identify Median Household Income for all counties between the years of 1997 and 2020.
Step 2: Establish Individual Rankings
For each metric, we transform raw data into a standardized ranking by following a similar process:
Crime Categories
For each of the 7 selected crime categories:
Aggregate Data:
Count the number of incidents per county.
Compute Percentile Ranks:
Use a percentile function (e.g., PERCENT_RANK()) to determine each county’s standing relative to others.
Assign Tiers:
Counties at or above the 75th percentile (top 25%) receive an assignment of 4. Counties from the 50th up to the 75th percentile receive an assignment of 3. Counties from the 25th up to the 50th percentile receive an assignment of 2. Counties below the 25th percentile receive an assignment of 1.
Population-Adjusted Crime (Crime Per Capita)
We adjust raw crime counts by county population to calculate the crime rate per 1,000 residents. This involves:
Calculating Yearly Crime Rates:
Join crime data with population figures.
Averaging:
Compute an average crime rate per county over the selected years.
Ranking:
Counties at or above the 75th percentile (top 25%) receive an assignment of 4. Counties from the 50th up to the 75th percentile receive an assignment of 3. Counties from the 25th up to the 50th percentile receive an assignment of 2. Counties below the 25th percentile receive an assignment of 1.
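As a rough illustration of the per-capita calculation described above, the following Python sketch computes a yearly crime rate per 1,000 residents and then averages it per county. The function names and the sample figures are hypothetical placeholders, not taken from the project code or the Colorado datasets.

```python
from statistics import mean


def crime_rate_per_1000(crime_count: int, population: int) -> float:
    """Crimes per 1,000 residents for a single county-year."""
    if population <= 0:
        raise ValueError("population must be positive")
    return crime_count * 1000 / population


def average_rate_by_county(rows):
    """rows: iterable of (county, year, crime_count, population) tuples.

    Returns {county: average yearly crime rate per 1,000 residents}.
    """
    rates = {}
    for county, _year, crimes, population in rows:
        rates.setdefault(county, []).append(crime_rate_per_1000(crimes, population))
    return {county: mean(values) for county, values in rates.items()}


# Placeholder figures for illustration only (not real Colorado data).
sample_rows = [
    ("Example County A", 2019, 500, 10_000),
    ("Example County A", 2020, 700, 10_000),
    ("Example County B", 2019, 30, 2_000),
]
average_rates = average_rate_by_county(sample_rows)
```

The averaged rate per county is the value that would then be percentile-ranked and mapped to a tier.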
Income
For median household income, we:
Aggregate Income Data:
Average the income per county across the years.
Compute Percentile Ranks:
Establish how each county compares to others.
Assign Tiers:
Income is the inverse of the previous two. We want to rank the highest-income counties the lowest because they would require less subsidy assistance than lower-income counties. Therefore, counties at or above the 75th percentile (top 25%) receive an assignment of 1. Counties from the 50th up to the 75th percentile receive an assignment of 2. Counties from the 25th up to the 50th percentile receive an assignment of 3. Counties below the 25th percentile receive an assignment of 4.
Below is an example distribution. While this is not a perfect distribution, I was pretty happy with the result from a public dataset. With a mostly even distribution based on percentage, we can start assigning tiers based on the rubric mentioned above.
Below is an example query for evaluating the Destruction/Damage/Vandalism of Property crime category and assigning a tier to counties:
CREATE TABLE tayloro.colorado_crime_tier_property_destruction AS
WITH crime_aggregated AS (
    -- Step 1: Aggregate property destruction crime counts per county
    SELECT
        county,
        COUNT(*) AS total_property_destruction_crimes
    FROM academy.tayloro.colorado_crimes
    WHERE offense_category_name = 'Destruction/Damage/Vandalism of Property'
    GROUP BY county
),
crime_percentiles AS (
    -- Step 2: Compute percentile rank for property destruction crime counts per county
    SELECT
        county,
        total_property_destruction_crimes,
        PERCENT_RANK() OVER (ORDER BY total_property_destruction_crimes) AS crime_percentile
    FROM crime_aggregated
),
crime_tiers AS (
    -- Step 3: Assign tiers based on percentile rankings (4 = highest need)
    SELECT
        county,
        total_property_destruction_crimes,
        crime_percentile,
        CASE
            WHEN crime_percentile >= 0.75 THEN 4 -- Counties with the highest counts (Top 25%)
            WHEN crime_percentile >= 0.50 THEN 3 -- Next 25% (50%-75%)
            WHEN crime_percentile >= 0.25 THEN 2 -- Next 25% (25%-50%)
            ELSE 1 -- Counties with the lowest counts (Bottom 25%)
        END AS crime_tier
    FROM crime_percentiles
)
SELECT * FROM crime_tiers;
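To make the percentile boundaries concrete, here is a minimal Python sketch of what PERCENT_RANK() computes, (rank - 1) / (rows - 1) with ties sharing the lowest rank, followed by the tier rubric from the text (top quartile receives tier 4). The `invert` flag is a hypothetical helper for the income case, where the rubric is flipped. The county labels and counts are made up for illustration.

```python
def percent_rank(values):
    """Mimic SQL PERCENT_RANK() over an ascending ordering of `values`."""
    n = len(values)
    if n <= 1:
        return [0.0] * n
    ordered = sorted(values)
    # ordered.index(v) counts the strictly smaller values, which equals rank - 1.
    return [ordered.index(v) / (n - 1) for v in values]


def tier_from_percentile(percentile, invert=False):
    """Map a percentile in [0, 1] to a tier from 1 to 4 (4 = highest need)."""
    if percentile >= 0.75:
        tier = 4
    elif percentile >= 0.50:
        tier = 3
    elif percentile >= 0.25:
        tier = 2
    else:
        tier = 1
    # For income the rubric is inverted: the top quartile gets tier 1.
    return 5 - tier if invert else tier


# Made-up crime counts per county, for illustration only.
counts = {"A": 10, "B": 40, "C": 40, "D": 90, "E": 200}
percentiles = dict(zip(counts, percent_rank(list(counts.values()))))
crime_tiers = {county: tier_from_percentile(p) for county, p in percentiles.items()}
income_style_tiers = {county: tier_from_percentile(p, invert=True) for county, p in percentiles.items()}
```

With only five rows the tiers cannot split evenly into quartiles, which is the same effect that makes a real distribution "mostly even" rather than perfect.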
Step 3: Merge Individual Crime Rankings to Form a Unified Crime Tier County Rank
After calculating individual crime tiers for each of the 7 selected crime categories, the next step is to merge these rankings into a single unified score per county. This unified score, referred to as the overall crime tier, is computed by averaging the individual tiers for each county and then rounding the result to the nearest whole number. The process involves the following steps:
Unioning the Individual Tables:
All individual crime tier tables (e.g., for property destruction, burglary, larceny/theft, etc.) are combined using a UNION ALL. Each record is tagged with its corresponding table name for identification.
Pivoting and Aggregating Data:
The combined dataset is grouped by county. For each crime category, the query extracts the crime tier and computes the average of these tiers across all categories.
Final Ordering:
The resulting unified table is sorted by the overall crime tier in descending order, thereby highlighting counties with the highest aggregated crime tiers.
Below is the SQL query that performs these operations:
CREATE TABLE tayloro.colorado_crime_tier_county_rank AS
WITH county_scores AS (
    SELECT
        county_name,
        -- Select the crime tiers for each category
        MAX(CASE WHEN table_name = 'colorado_crime_tier_property_destruction' THEN crime_tier ELSE NULL END) AS property_destruction_tier,
        MAX(CASE WHEN table_name = 'colorado_crime_tier_burglary' THEN crime_tier ELSE NULL END) AS burglary_tier,
        MAX(CASE WHEN table_name = 'colorado_crime_tier_larceny_theft' THEN crime_tier ELSE NULL END) AS larceny_theft_tier,
        MAX(CASE WHEN table_name = 'colorado_crime_tier_vehicle_theft' THEN crime_tier ELSE NULL END) AS vehicle_theft_tier,
        MAX(CASE WHEN table_name = 'colorado_crime_tier_robbery' THEN crime_tier ELSE NULL END) AS robbery_tier,
        MAX(CASE WHEN table_name = 'colorado_crime_tier_arson' THEN crime_tier ELSE NULL END) AS arson_tier,
        MAX(CASE WHEN table_name = 'colorado_crime_tier_stolen_property' THEN crime_tier ELSE NULL END) AS stolen_property_tier,
        -- Calculate the average of all crime tiers
        ROUND(
            AVG(
                CASE
                    WHEN table_name IN (
                        'colorado_crime_tier_property_destruction',
                        'colorado_crime_tier_burglary',
                        'colorado_crime_tier_larceny_theft',
                        'colorado_crime_tier_vehicle_theft',
                        'colorado_crime_tier_robbery',
                        'colorado_crime_tier_arson',
                        'colorado_crime_tier_stolen_property'
                    ) THEN crime_tier
                    ELSE NULL
                END
            )
        ) AS overall_crime_tier -- Round to the nearest whole number
    FROM (
        -- Union all 7 crime tier tables
        SELECT county_name, 'colorado_crime_tier_property_destruction' AS table_name, crime_tier FROM academy.tayloro.colorado_crime_tier_property_destruction
        UNION ALL
        SELECT county_name, 'colorado_crime_tier_burglary' AS table_name, crime_tier FROM academy.tayloro.colorado_crime_tier_burglary
        UNION ALL
        SELECT county_name, 'colorado_crime_tier_larceny_theft' AS table_name, crime_tier FROM academy.tayloro.colorado_crime_tier_larceny_theft
        UNION ALL
        SELECT county_name, 'colorado_crime_tier_vehicle_theft' AS table_name, crime_tier FROM academy.tayloro.colorado_crime_tier_vehicle_theft
        UNION ALL
        SELECT county_name, 'colorado_crime_tier_robbery' AS table_name, crime_tier FROM academy.tayloro.colorado_crime_tier_robbery
        UNION ALL
        SELECT county_name, 'colorado_crime_tier_arson' AS table_name, crime_tier FROM academy.tayloro.colorado_crime_tier_arson
        UNION ALL
        SELECT county_name, 'colorado_crime_tier_stolen_property' AS table_name, crime_tier FROM academy.tayloro.colorado_crime_tier_stolen_property
    ) combined
    GROUP BY county_name
)
SELECT * FROM county_scores
ORDER BY overall_crime_tier DESC;
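The averaging step can be sketched in Python as follows. Like AVG in SQL, the sketch skips categories with no tier for a county (not every tier table covers every county). Python's built-in round() sends halves to the nearest even number, so a small half-up helper is used here on the assumption that the SQL engine rounds 2.5 up to 3. The function names and sample tiers are hypothetical.

```python
import math


def round_half_up(value: float) -> int:
    """Round a non-negative number to the nearest integer, with .5 going up."""
    return math.floor(value + 0.5)


def overall_crime_tier(category_tiers):
    """category_tiers: {category: tier or None}.

    Averages only the tiers that are present, then rounds to a whole tier.
    Returns None when the county has no tier in any category.
    """
    present = [tier for tier in category_tiers.values() if tier is not None]
    if not present:
        return None
    return round_half_up(sum(present) / len(present))


# Made-up tiers for one county; robbery is missing for this county.
example_county = {
    "property_destruction": 4,
    "burglary": 3,
    "larceny_theft": 4,
    "vehicle_theft": 3,
    "robbery": None,
    "arson": 2,
    "stolen_property": 3,
}
example_overall = overall_crime_tier(example_county)  # 19 / 6 = 3.17, rounds to 3
```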
Step 4: Merge Rankings for Final County Tier Rank
In this step, we combine the overall crime tier county rank, the income tier rank, and the population (crime per capita) rank to generate a final composite ranking for each county. Since our primary focus is on crime, we weigh the crime rank a little heavier in our final calculation.
The merging process involves:
Combining Ranks:
We perform a full outer join on the three individual ranking tables to ensure every county is represented.
Handling Missing Values:
COALESCE is used to manage missing values by defaulting to 0 when a rank is not available. Counties that are missing any of the three ranks are then excluded from the final table.
Calculating the Final Rank:
The final rank is computed as the average of the three rankings. With crime being our primary focus, its rank is given slightly more influence in this average.
Final Ordering:
Counties are ordered by the final average rank in ascending order, from the lowest overall tier to the highest.
Below is the SQL query that accomplishes these steps:
CREATE TABLE tayloro.colorado_final_county_tier_rank AS
WITH combined_ranks AS (
    SELECT
        COALESCE(c.county_name, i.county, p.county) AS county,
        COALESCE(c.overall_crime_tier, 0) AS crime_rank,
        COALESCE(i.income_tier, 0) AS income_rank,
        COALESCE(p.crime_per_capita_tier, 0) AS population_rank
    FROM academy.tayloro.colorado_crime_tier_county_rank c
    FULL OUTER JOIN academy.tayloro.colorado_income_household_tier_rank i
        ON LOWER(c.county_name) = LOWER(i.county)
    FULL OUTER JOIN academy.tayloro.colorado_population_crime_per_capita_rank p
        ON LOWER(c.county_name) = LOWER(p.county)
),
average_rank AS (
    SELECT
        county,
        crime_rank,
        income_rank,
        population_rank,
        ROUND(
            (crime_rank + income_rank + population_rank) /
            NULLIF((CASE WHEN crime_rank > 0 THEN 1 ELSE 0 END
                  + CASE WHEN income_rank > 0 THEN 1 ELSE 0 END
                  + CASE WHEN population_rank > 0 THEN 1 ELSE 0 END), 0)
        ) AS final_rank -- Calculate the average rank and round to the nearest whole number
    FROM combined_ranks
    WHERE crime_rank > 0 AND income_rank > 0 AND population_rank > 0 -- Ensure all ranks are present
)
SELECT * FROM average_rank
ORDER BY final_rank ASC; -- Order by the final average rank
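For readers who prefer to trace the arithmetic outside SQL, here is a minimal Python sketch that follows the query as written: missing ranks default to 0, any county without all three ranks is dropped, and the remaining ranks are combined with a plain mean and rounded. The function name and sample values are hypothetical, and half-up rounding is an assumption about how the SQL engine treats values such as 2.5.

```python
import math


def final_rank(crime_rank, income_rank, population_rank):
    """Combine three tier ranks into one final rank, or None if any is missing."""
    # Equivalent of COALESCE(rank, 0) for each input.
    ranks = [crime_rank or 0, income_rank or 0, population_rank or 0]
    # Equivalent of the WHERE clause: all three ranks must be present.
    if any(rank <= 0 for rank in ranks):
        return None
    return math.floor(sum(ranks) / len(ranks) + 0.5)


# Made-up rank combinations for illustration.
examples = {
    "all present, exact mean": final_rank(3, 4, 2),    # 9 / 3 = 3.0
    "all present, rounds up": final_rank(4, 4, 3),     # 11 / 3 = 3.67
    "income rank missing": final_rank(4, None, 3),     # excluded
}
```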
Step 5: Backfill Business Entities Table with Final Ranking
In this final step, we join the final county tier rank table with the business entities table. This allows us to backfill all business entities with their corresponding ranking information based on county. The query uses a LEFT JOIN to ensure that every business entity is retained, matching on a case-insensitive comparison of county names.
CREATE TABLE tayloro.colorado_business_entities_with_ranking AS
SELECT
    be.*,
    fr.crime_rank,
    fr.income_rank,
    fr.population_rank,
    fr.final_rank
FROM tayloro.colorado_business_entities be
LEFT JOIN tayloro.colorado_final_county_tier_rank fr
    ON LOWER(be.principalcounty) = LOWER(fr.county);
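The effect of the LEFT JOIN can be sketched in Python with a dictionary lookup: every entity is kept, and entities whose county has no match simply get empty rank fields. This assumes one rank row per county; the entity names, county name, and rank values below are made-up placeholders, and `backfill_rankings` is a hypothetical helper.

```python
NO_RANKS = {"crime_rank": None, "income_rank": None, "population_rank": None, "final_rank": None}


def backfill_rankings(entities, county_ranks):
    """Attach county rank fields to each entity, keeping unmatched entities.

    entities: list of dicts with a "principalcounty" key (may be None).
    county_ranks: {county name: dict of rank fields}.
    """
    # Lowercase the keys once so the comparison is case-insensitive.
    lookup = {county.lower(): ranks for county, ranks in county_ranks.items()}
    enriched = []
    for entity in entities:
        county = entity.get("principalcounty")
        ranks = lookup.get(county.lower(), NO_RANKS) if county else NO_RANKS
        enriched.append({**entity, **ranks})
    return enriched


# Placeholder data for illustration only.
ranks_by_county = {
    "Example County": {"crime_rank": 4, "income_rank": 2, "population_rank": 3, "final_rank": 3},
}
entities = [
    {"entityname": "Example Coffee LLC", "principalcounty": "EXAMPLE COUNTY"},
    {"entityname": "Example Farm Inc", "principalcounty": None},
]
enriched = backfill_rankings(entities, ranks_by_county)
```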
Below is a visualization of how these datasets were merged together to form a cohesive ranking across the three main categories.
KPI and Use Case Visualizations
These dashboards represent the 16 KPIs and use cases that were provided to the data firm by the state. Please note as well that Grafana allows us to create dynamic variables that enable us to choose our city or county and fetch dynamic data from those values in real time for visualization. Since these are static images, that will not be represented here, but is a huge component to the user experience.
Colorado County Dashboard
This county dashboard shows the following use cases:
Calculate crime count for police agencies in a given county to get a baseline number.
Calculate the average median household income per county.
Show population trends for each county alongside corresponding crime trends by year.
Show population trends for each county from 1997–2020.
Calculate the totals for each crime category per county from 1997 to 2020 to gauge frequency.
Show average seasonal crime trends for each month by county.
Create a geo-map of crime density for counties from 1997–2020.
Show average income per capita for counties.
Compare crime data with median household income for each county.
Analyze the distribution of different crime types across each county from 1997–2020 for Property, Person and Society crimes.
Colorado City Dashboard
This city dashboard shows the following use cases:
Show seasonal crime trends for the year in each city.
Determine which crimes are more likely to happen during the day for a city.
Compute the average age for crime categories across cities.
Determine which crimes are more likely to happen at night for a city.
Display crime type distribution by city.
Show crime trends on average by day of the week to determine when to patrol more.
Colorado Crime Density Dashboard
This dashboard shows overall crime density, measured as crime per capita (per 100k residents), across the entire State of Colorado. It’s cool to see the larger areas represent the highest density, and it’s not surprising that many of those are in the Denver metro area.
Business Entity Data Pipeline
Business Entity Daily DAG
The final part of the equation is our data pipeline that evaluates newly incoming business entities daily. This DAG calls out to the public Colorado Information Marketplace API every day at 6 AM MT and fetches yesterday’s Colorado Business Entity data for processing. It then cleans the data, joins matching County data based on Zip and City and then assigns an appropriate subsidy tier. This DAG involves several tasks to ingest, clean, validate, enrich, and load data from the data.colorado.gov API into production tables. Below is a breakdown of each task and its purpose.
Task Descriptions
create_business_entities_yesterday_stage
Description: Creates a staging table for business entities for yesterday’s data. This table is partitioned by principalcounty and sets up the structure to temporarily hold the data for further processing.
create_misfit_business_entities_table
Description: Creates a table for misfit business entities that do not meet the expected data patterns. This helps segregate records that require special attention or further review.
create_dupes_staging_table
Description: Creates a staging table for capturing potential duplicate records from the business entities data. It mirrors the structure of the main table and is partitioned by principalcounty.
create_duplicates_table
Description: Creates a permanent table to store confirmed duplicate records from the business entities dataset.
create_geo_validation_staging_table
Description: Creates a temporary table to hold the results of geo-validated address data. This table is partitioned by geo_county and is used to store responses from the Geoapify API.
create_addded_city_county_zip_staging_table
Description: Creates a staging table to store newly discovered city/county/zip combinations from geo-validation. This table is later used to update the reference data for address mapping.
fetch_and_insert_business_entities_yesterday
Description: Fetches business entity data for yesterday from the data.colorado.gov API and inserts the records into the staging table. This task calls the API, transforms the response into records, and executes an INSERT query.
filter_invalid_addresses
Description: Deletes records from the staging table that have NULL values in critical address fields (principalcity or principalzipcode), ensuring only valid addresses proceed to the next steps.
reformat_zip_code
Description: Reformats the principalzipcode field in the staging table to a standard 5-digit format, ensuring consistency for downstream processing.
update_county_on_staging_table
Description: Updates the principalcounty field in the staging table by joining with the cleaned reference table (colorado_city_county_zip). The matching is done using normalized (trimmed and lowercased) city names and ZIP codes.
process_unmatched_entities_task
Description: Processes records from the staging table where principalcounty is still NULL. It calls the Geoapify API to fetch validated address data for these unmatched entities, transforms the API responses, and inserts the results into the geo validation staging table.
check_validated_address_combos
Description: Checks validated address combinations by joining the geo validation staging table with the staging table. This task identifies new city/county/zip combinations not present in the reference table.
insert_validated_address_combos
Description: Inserts the new, validated city/county/zip combinations into the reference table (colorado_city_county_zip). This enhances the mapping accuracy for future data processing.
override_from_validated_addresses
Description: Updates the main business entities table by overriding existing address fields with geo-validated data. This ensures that the most accurate and standardized addresses are stored.
retry_update_county_on_staging_table
Description: Reattempts the update of the principalcounty field on the staging table for any records that remain unmatched after initial processing.
identify_duplicates_task
Description: Identifies duplicate records within the staging table by comparing key address and entity fields. The duplicates are then inserted into the duplicates staging table.
run_dq_check_null_values
Description: Runs a data quality check that counts records with NULL values in critical fields (entityname, principalcity, entitytype, entityformdate) to ensure data completeness.
run_dq_check_principalstate
Description: Executes a data quality check to verify that all records in the staging table have principalstate equal to ‘CO’.
run_dq_check_entityformdate
Description: Performs a data quality check to ensure that the entityformdate in the staging table matches the expected date (yesterday’s date).
run_dq_check_no_unexpected_duplicates
Description: Checks for any unexpected duplicates in the staging table by grouping on entityid and counting occurrences greater than one.
insert_new_records_task
Description: Inserts new business entity records from the staging table into the production table, excluding any records that have been flagged as duplicates and ensuring that principalcounty is populated.
insert_business_entities_with_ranking
Description: Inserts business entity records along with their associated ranking information by joining the staging table with the final county tier rank table.
insert_duplicates
Description: Inserts duplicate records from the duplicates staging table into the permanent duplicates table for further review or archival.
insert_misfits
Description: Inserts misfit records (those without a valid principalcounty) from the staging table into the misfits table, isolating records that could not be processed normally.
drop_staging_table_task
Description: Drops the staging table to clean up temporary data once processing is complete.
drop_dupes_staging_table_task
Description: Drops the duplicates staging table to remove temporary duplicate data after it has been archived.
drop_geo_validation_staging_table
Description: Drops the geo validation staging table that was used for holding geo API responses once the data has been processed and merged.
drop_added_city_county_zip_staging_table
Description: Drops the staging table for newly added city/county/zip combinations, finalizing the cleanup of temporary tables.
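To give a feel for what the cleaning and data quality tasks check, here is a small in-memory Python sketch of the ZIP reformatting and the four DQ checks. In the pipeline these steps run as queries against the staging table, so this is only an approximation under assumptions: the function names are hypothetical, the exact ZIP rule (keep the leading five digits) is a guess at the "standard 5-digit format", and the sample records are made up.

```python
import re
from collections import Counter
from datetime import date, timedelta

# Critical fields named in the run_dq_check_null_values description.
CRITICAL_FIELDS = ("entityname", "principalcity", "entitytype", "entityformdate")


def reformat_zip_code(raw_zip):
    """Keep the leading five digits of a ZIP code; return None if there are fewer."""
    if raw_zip is None:
        return None
    match = re.match(r"\s*(\d{5})", str(raw_zip))
    return match.group(1) if match else None


def run_dq_checks(records, expected_date):
    """Return failure counts per check; all zeros means the batch passes."""
    id_counts = Counter(record.get("entityid") for record in records)
    return {
        "null_critical_fields": sum(
            1 for record in records
            if any(record.get(field) is None for field in CRITICAL_FIELDS)
        ),
        "non_co_state": sum(1 for record in records if record.get("principalstate") != "CO"),
        "wrong_form_date": sum(1 for record in records if record.get("entityformdate") != expected_date),
        "duplicate_entity_ids": sum(1 for count in id_counts.values() if count > 1),
    }


# Placeholder batch: one clean record and two with deliberate problems.
expected = date(2025, 1, 14)  # stands in for "yesterday"
staged = [
    {"entityid": "1001", "entityname": "Example Coffee LLC", "principalcity": "Denver",
     "principalstate": "CO", "entitytype": "LLC", "entityformdate": expected},
    {"entityid": "1002", "entityname": None, "principalcity": "Boulder",
     "principalstate": "CO", "entitytype": "Corporation", "entityformdate": expected},
    {"entityid": "1002", "entityname": "Example Farm Inc", "principalcity": "Pueblo",
     "principalstate": "WY", "entitytype": "Corporation",
     "entityformdate": expected - timedelta(days=1)},
]
dq_report = run_dq_checks(staged, expected)
```

A DAG task built on checks like these would typically fail the run when any count is non-zero, so that bad batches never reach the production tables.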
DAG Flow Order
The task order looks like this:
Astronomer Cloud Deployment
One of the core requirements from Zach for the capstone project is that it must be running in a production cloud environment to count. Below are snapshots of my DAG in the Astronomer production environment running daily.
This capture shows the successes and failures of my DAG runs as I worked through errors and bugs.
Putting It All Together
Business Entity Search Dashboard
The dashboard below lets a business owner search for their business entity, see information about their business, review the available security system subsidy tiers, and find out exactly which tier their business qualifies for. In this example, Lost Coffee could search for its business and learn that it has been assigned a tier ranking of 3 based on all of the criteria explained above.
Challenges and Findings
In-depth Summary of Business Entities Data Cleaning and Update Process
Challenges Faced:
Mismatches in Data: Many records in the colorado_business_entities table lacked corresponding county information when joined with the reference table (colorado_city_county_zip).
Inconsistent Data Formatting: Typos, case discrepancies, and extra spaces in city names and ZIP codes led to failed matches.
Incomplete Reference Data: Missing city/ZIP combinations and duplicate entries in the reference table required augmentation and deduplication to ensure accurate mapping.
Process Overview:
Initial Analysis and Identification of Issues:
Ran queries to identify records in colorado_business_entities with no matching county in the reference table.
Examined unmatched city/ZIP pairs to determine the most frequent discrepancies.
Data Correction in the Business Entities Table:
Performed manual corrections for known typos and inconsistencies (e.g., updating “peyton co” to “Peyton”, correcting ZIP 80524 entries to “Fort Collins”).
Verified corrections by running SELECT queries against both the business entities and reference datasets.
Assessing Match Quality:
Executed queries to compare the count of matched versus unmatched records, revealing a high number of mismatches that required further attention.
Using the Geoapify API to Resolve Address Issues:
Leveraged the Geocoding API from Geoapify to validate address discrepancies and improve the address, city, county, and ZIP attributes of non-matching business entities.
One funny thing: while processing the remaining unmatched records, I blew well past the daily Geoapify limits, and I very much appreciate their service for letting me do so.
Inserting Missing Reference Data:
Augmented the reference table by inserting missing city/ZIP combinations (e.g., added entries for “Evans” in Weld County).
Inspected the reference table to ensure that the inserted data met quality standards.
Deduplicating the Reference Table:
Identified duplicate rows for certain city/ZIP pairs (e.g., “Burlington, 80807” and “New Raymer, 80742”).
Created a new deduplicated version of the reference table and replaced the original with the cleaned version.
Updating the Business Entities with County Data:
Employed a final update query (using a correlated subquery) to backfill the principalcounty field in colorado_business_entities by matching normalized city and ZIP code data from the cleaned reference table.
Verified the update by confirming that previously unmatched records were now successfully mapped.
Final Outcome:
Normalized Addresses: Data cleaning (trimming, lowercasing, casting ZIP codes) ensured consistent comparisons.
Accurate Mapping: Manual corrections and enhanced reference data led to a complete and unique set of city/ZIP combinations.
Successful Update: The final update query backfilled the principalcounty field for all records, reducing unmatched counts to zero.
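The backfill step described above can be sketched as follows. This is an illustration under stated assumptions rather than the query I actually ran: SQLite stands in for the production engine, and the column names principalcity, city, zip_code, and county are hypothetical (only the two table names, principalcounty, and principalzipcode appear in this write-up). The sketch shows the normalization (trim, lowercase, cast ZIP to integer) applied inside a correlated subquery.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE colorado_city_county_zip (city TEXT, zip_code INTEGER, county TEXT);
    CREATE TABLE colorado_business_entities (
        entityid INTEGER,
        principalcity TEXT,
        principalzipcode TEXT,
        principalcounty TEXT
    );
    INSERT INTO colorado_city_county_zip VALUES
        ('Peyton', 80831, 'El Paso'),
        ('Fort Collins', 80524, 'Larimer');
    -- Messy casing and whitespace, plus one row with no reference match.
    INSERT INTO colorado_business_entities VALUES
        (1, '  PEYTON ', '80831', NULL),
        (2, 'fort collins', ' 80524', NULL),
        (3, 'Nowhere', '00000', NULL);
    """
)

# Correlated subquery: for each entity row still missing a county, look up
# the county whose normalized city and integer ZIP both match.
conn.execute(
    """
    UPDATE colorado_business_entities
    SET principalcounty = (
        SELECT r.county
        FROM colorado_city_county_zip AS r
        WHERE LOWER(TRIM(r.city)) = LOWER(TRIM(colorado_business_entities.principalcity))
          AND r.zip_code = CAST(TRIM(colorado_business_entities.principalzipcode) AS INTEGER)
    )
    WHERE principalcounty IS NULL
    """
)

counties = dict(
    conn.execute("SELECT entityid, principalcounty FROM colorado_business_entities")
)
unmatched = conn.execute(
    "SELECT COUNT(*) FROM colorado_business_entities WHERE principalcounty IS NULL"
).fetchone()[0]
print(counties, unmatched)
```

The subquery only returns a single value per row if the reference table holds one row per city/ZIP pair, which is why the deduplication step has to happen before the backfill. Rows that still have no match stay NULL, and counting them is the verification step.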
Out-of-Memory (OOM) Exceptions and Spark Configuration Adjustments
When processing large datasets (over a million rows) using Spark, I frequently encountered out-of-memory exceptions—especially when fetching large CSV files from S3 and writing to Iceberg. The default Spark settings were insufficient for these workloads.
To address this challenge, I reviewed the Spark documentation and applied recommendations from the bootcamp. I adjusted the Spark configuration to allocate more memory to both the executors and the driver, and to optimize shuffle operations. The following settings helped mitigate the memory issues:
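from pyspark import SparkConf  # assumption: conf is a standard PySpark SparkConf
conf = SparkConf()
conf.set('spark.executor.memory', '16g') # Increase executor memory based on your system's capacity
conf.set('spark.sql.shuffle.partitions', '200') # Number of shuffle partitions (200 is also Spark's default); tune to your data volume
conf.set('spark.driver.memory', '16g') # Increase driver memory to handle larger workloads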
By increasing the memory allocation and tuning the shuffle partitions, I was able to process the large CSV files efficiently and write to Iceberg without running into out-of-memory errors.
Challenge: Ambiguous City and County Matching
Description:
Initially, my orchestration relied solely on a city and county dataset to map business entities to their respective counties. However, an issue arose where cities that span multiple counties were being randomly assigned to just one county.
Example:
Aurora: This city spans Arapahoe, Adams, and Douglas counties.
Without additional disambiguation, business entities in Aurora were incorrectly mapped to a single county at random, leading to inaccurate county assignments.
Resolution:
I enhanced the reference dataset by adding a ZIP code column.
By matching on both the city and ZIP code of a business entity’s address, the mapping became much more precise.
This change ensured that each business entity is matched to the correct county based on the distinct combination of city and ZIP code.
Outcome:
Incorporating the ZIP code significantly improved the accuracy of county assignments, eliminating the randomness in cases where cities span multiple counties.
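A small sketch of why the composite key helps. The reference rows below are placeholders invented for illustration (fake ZIP codes and generic county labels, not real reference data), and the function names are hypothetical. Keying on city alone leaves several candidate counties for one city, while keying on (city, ZIP) resolves to exactly one.

```python
from collections import defaultdict

# Placeholder rows: (city, zip_code, county). Values are illustrative only.
reference_rows = [
    ("Aurora", 11111, "County A"),
    ("Aurora", 22222, "County B"),
    ("Aurora", 33333, "County C"),
    ("Sampleville", 44444, "County D"),
]


def build_city_index(rows):
    """Map normalized city -> set of counties (exposes ambiguous cities)."""
    index = defaultdict(set)
    for city, _zip_code, county in rows:
        index[city.strip().lower()].add(county)
    return index


def build_city_zip_index(rows):
    """Map (normalized city, zip_code) -> county, one county per key."""
    return {
        (city.strip().lower(), zip_code): county
        for city, zip_code, county in rows
    }


def lookup_county(index, city, zip_code):
    """Return the county for a city/ZIP pair, or None when there is no match."""
    return index.get((city.strip().lower(), zip_code))


city_index = build_city_index(reference_rows)
city_zip_index = build_city_zip_index(reference_rows)

# Cities that cannot be resolved by name alone.
ambiguous_cities = sorted(
    city for city, counties in city_index.items() if len(counties) > 1
)
print(ambiguous_cities)
print(lookup_county(city_zip_index, " AURORA", 22222))
```

A lookup that returns None is a natural candidate for the misfits path rather than a guess at the county.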
Inconsistent Zip Code Formats Causing Casting Errors
Description:
Initially, the ETL process transformed API responses without validating the principalzipcode field. This oversight led to casting errors when attempting to convert zip code values in unexpected formats (e.g., "80137-6828") into an integer for insertion into the staging table.
Example:
Problem Record: A record with a principalzipcode value like "80137-6828" resulted in the following error during insertion:
TrinoUserError: TrinoUserError(type=USER_ERROR, name=INVALID_CAST_ARGUMENT, message="Cannot cast '80137-6828' to INT", query_id=…)
Resolution:
Data Transformation Enhancement:
I updated the transform_entity_data_to_records function to validate the zip code using a regular expression. Only zip codes matching exactly five digits (\d{5}) are accepted:
Valid zip codes are converted to an integer.
Invalid zip codes are set to Python’s None.
SQL Query Adjustment:
In the insert_into_staging_table function, I modified the query construction to check for None. If a zip code is None, it is replaced with the SQL literal NULL (without quotes) in the final query. This ensures that only a valid integer or a proper NULL value is inserted into the principalzipcode column.
Outcome:
By separating data validation from SQL query construction, this solution prevents casting errors and maintains data integrity. Only correctly formatted 5-digit zip codes (or NULL for invalid ones) are passed to the database, resulting in a robust and error-free ETL process.
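The two-step fix can be sketched like this. The helper names parse_zip and to_sql_literal are hypothetical and are not the bodies of transform_entity_data_to_records or insert_into_staging_table; they only illustrate the validation rule (exactly five digits, otherwise None) and the None-to-NULL rendering described above.

```python
import re

# Exactly five digits, nothing before or after (enforced by fullmatch below).
FIVE_DIGIT_ZIP = re.compile(r"\d{5}")


def parse_zip(value):
    """Return the ZIP as an int if it is exactly five digits, else None."""
    if value is None:
        return None
    text = str(value)
    if FIVE_DIGIT_ZIP.fullmatch(text):
        return int(text)
    return None


def to_sql_literal(zip_code):
    """Render a parsed ZIP for a string-built query: bare NULL or the integer."""
    return "NULL" if zip_code is None else str(zip_code)


for raw in ["80137", "80137-6828", "", None]:
    print(repr(raw), "->", to_sql_literal(parse_zip(raw)))
```

Because the value reaching the query is always either an integer or the bare word NULL, the cast can no longer fail on a ZIP+4 string. Where the database client supports bound parameters, passing None as a parameter is an alternative to rendering the literal by hand.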
Next Steps and Closing Thoughts
Next Steps
Here is a running list of things I would continue to refine or improve, plus new features I would introduce:
Address-level tier assignments that are specific to the latitude and longitude of each business location
Understanding residential vs. commercial businesses and how to delineate between them for the security subsidy
Write automation to handle the duplicates and misfits tables instead of remedying them manually
Build machine learning into the population projections data, using forecasted population growth to predict crime trends in the respective counties
Closing Thoughts
In closing, this capstone project was easily one of the hardest projects that I have ever done by myself end to end. I pushed myself, accomplished more than I ever thought I would, and had so much fun in the process. The most exciting thing about it is that while I only focused on 16 KPIs and use cases that I made up, these datasets could serve hundreds of other exciting and interesting use cases. I spent countless hours over 8 weeks finding the right datasets, refining the data into the KPIs and use cases you just reviewed, and building the experience that allows business owners to search for their qualifying subsidy tiers. I am so proud of myself and what I was able to accomplish from free data extracts. It was really cool for me to focus on data that is in my own backyard here in Colorado. I was able to accomplish things I never thought I would ever do, and I cannot wait to see what is next for my career after this.