Production-hardened ETL pipeline with data quality gates, retry logic, idempotent loads, and full test coverage. Built with Apache Airflow + PostgreSQL.
The two extraction stages run in parallel; transform and load then run sequentially.
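The sketch below models that ordering with the standard library's `graphlib` rather than with Airflow itself. The task names are hypothetical stand-ins for the four stages that follow. In an Airflow DAG file, the same dependencies would typically be declared as `[extract_shipments, extract_tiers] >> transform >> load_analytics`.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical task names; each maps to the set of tasks it depends on.
DEPENDENCIES: Dict[str, Set[str]] = {
    "extract_shipments": set(),
    "extract_tiers": set(),
    "transform": {"extract_shipments", "extract_tiers"},
    "load_analytics": {"transform"},
}


def execution_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves; tasks within one wave can run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(DEPENDENCIES))
# [['extract_shipments', 'extract_tiers'], ['transform'], ['load_analytics']]
```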
Pull from Flask API with retry logic and idempotent staging
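```python
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

# API_BASE (the Flask service's base URL) is defined elsewhere in the pipeline.


@retry(stop=stop_after_attempt(3),
       wait=wait_exponential(multiplier=1, min=2, max=10),
       reraise=True)  # re-raise the last error instead of tenacity's RetryError
def fetch_shipments():
    """GET all shipments, retrying any failure (HTTP errors included) up to 3 attempts."""
    resp = requests.get(f"{API_BASE}/shipments", timeout=30)
    resp.raise_for_status()
    return resp.json()
```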
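```python
def extract_to_staging(conn):
    """Replace staging.shipments with the current API snapshot (idempotent)."""
    data = fetch_shipments()
    cur = conn.cursor()
    # TRUNCATE and INSERT run in one transaction: nothing is committed unless
    # both succeed, and a re-run replaces the staged rows instead of appending.
    cur.execute("TRUNCATE staging.shipments")
    cur.executemany(
        """INSERT INTO staging.shipments
               (shipment_id, customer_id, cost, status, ship_date, created_at)
           VALUES (%s, %s, %s, %s, %s, %s)""",
        # Assumption: the API returns created_at, which transform() dedups on.
        [(r["id"], r["customer"], r["cost"], r["status"], r["date"],
          r["created_at"]) for r in data],
    )
    conn.commit()
```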
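The `wait_exponential(multiplier=1, min=2, max=10)` setting is easy to misread. Assume tenacity computes each wait as `multiplier * 2 ** (attempt_number - 1)` clamped to `[min, max]` (check this against your installed version). In that case, three attempts produce only two waits of 2 seconds each, and the `max=10` cap never applies. The stdlib-only sketch below reproduces that schedule. `backoff_schedule` and `call_with_retry` are hypothetical helpers, not tenacity APIs. `sleep` is injected so the retry path can be tested without actually waiting.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_schedule(attempts: int, multiplier: float = 1,
                     minimum: float = 2, maximum: float = 10) -> List[float]:
    """Waits between attempts: multiplier * 2 ** (n - 1), clamped to [minimum, maximum]."""
    # No wait follows the final attempt, so there are attempts - 1 waits.
    return [max(minimum, min(multiplier * 2 ** (n - 1), maximum))
            for n in range(1, attempts)]


def call_with_retry(fn: Callable[[], T], attempts: int = 3,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying on any exception; re-raise the last error when out of attempts."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    waits = backoff_schedule(attempts)
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            sleep(waits[attempt])
    raise AssertionError("unreachable")


# Usage: a hypothetical fetch that fails twice, then succeeds.
calls = {"n": 0}


def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network error")
    return [{"id": "SHP001"}]


slept: List[float] = []
result = call_with_retry(flaky_fetch, sleep=slept.append)
print(result, slept)  # [{'id': 'SHP001'}] [2, 2]
```

Waits only reach the 10-second cap from the fifth retry onward, so with `stop_after_attempt(3)` the total backoff is about 4 seconds.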
Load tier classifications from CSV reference data
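```python
import pandas as pd

# DATA_DIR (a pathlib.Path) is defined elsewhere in the pipeline.


def extract_tiers_to_staging(conn):
    """Replace staging.customer_tiers with the reference CSV (idempotent)."""
    df = pd.read_csv(
        DATA_DIR / "customer_tiers.csv",
        dtype={"customer_id": str, "tier": str},  # keep IDs as text
    )
    # Validate expected columns exist. Raise instead of assert, because
    # assert statements are stripped when Python runs with -O.
    required = {"customer_id", "tier"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {sorted(missing)}")

    cur = conn.cursor()
    cur.execute("TRUNCATE staging.customer_tiers")
    cur.executemany(
        """INSERT INTO staging.customer_tiers
               (customer_id, tier) VALUES (%s, %s)""",
        # Plain tuples avoid building a pandas Series per row, as iterrows() does.
        list(df[["customer_id", "tier"]].itertuples(index=False, name=None)),
    )
    conn.commit()
```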
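The sketch below performs the same header check with the standard library's `csv.DictReader` instead of pandas, so it runs without extra dependencies. `read_tier_rows` is a hypothetical helper. It keeps IDs as strings, mirroring `dtype=str` above, so a value like `007` keeps its leading zeros. It returns plain tuples in the shape `cursor.executemany` expects.

```python
import csv
import io
from typing import Iterable, List, Tuple

REQUIRED_COLUMNS = {"customer_id", "tier"}


def read_tier_rows(lines: Iterable[str]) -> List[Tuple[str, str]]:
    """Parse tier CSV rows, failing loudly if required columns are missing."""
    reader = csv.DictReader(lines)
    missing = REQUIRED_COLUMNS - set(reader.fieldnames or [])
    if missing:
        # An explicit exception survives `python -O`, unlike `assert`.
        raise ValueError(f"Missing columns: {sorted(missing)}")
    return [(row["customer_id"], row["tier"]) for row in reader]


rows = read_tier_rows(io.StringIO("customer_id,tier\n007,Gold\n012,Silver\n"))
print(rows)
```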
Deduplicate, apply the null, cost, and cancellation gates, and left-join customer tiers
```python
def transform(conn):
    """Rebuild staging.shipments_with_tiers through the four quality gates."""
    cur = conn.cursor()
    cur.execute("TRUNCATE staging.shipments_with_tiers")
    cur.execute("""
        INSERT INTO staging.shipments_with_tiers
        SELECT
            s.shipment_id,
            s.customer_id,
            s.cost,
            s.ship_date,
            COALESCE(t.tier, 'Unknown') AS tier   -- customers with no tier row
        FROM (
            -- Gate 1: deduplicate, keeping the latest row per shipment_id
            SELECT DISTINCT ON (shipment_id) *
            FROM staging.shipments
            ORDER BY shipment_id, created_at DESC
        ) s
        LEFT JOIN staging.customer_tiers t
            ON s.customer_id = t.customer_id
        WHERE
            s.customer_id IS NOT NULL        -- Gate 2: null customer_id
            AND s.cost > 0                   -- Gate 3: negative or zero cost
            AND s.status != 'CANCELLED'      -- Gate 4: cancelled (also drops NULL status)
    """)
    conn.commit()
```
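The four gates can be exercised without PostgreSQL. This sketch runs an equivalent query against an in-memory SQLite database with made-up sample rows. It makes two substitutions that are assumptions of the sketch, not part of the pipeline. First, `DISTINCT ON` is PostgreSQL-specific, so the sketch keeps the latest row per `shipment_id` with `ROW_NUMBER()`, which needs SQLite 3.25 or later. Second, it compares the cancelled gate's default form against SQLite's NULL-safe `IS NOT`; PostgreSQL's equivalent is `IS DISTINCT FROM`. The comparison shows that a plain `!=` drops rows whose `status` is NULL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE shipments (shipment_id TEXT, customer_id TEXT, cost REAL,
                        status TEXT, ship_date TEXT, created_at TEXT);
CREATE TABLE customer_tiers (customer_id TEXT, tier TEXT);
""")
conn.executemany("INSERT INTO shipments VALUES (?, ?, ?, ?, ?, ?)", [
    ("SHP001", "C1", 120.0, "DELIVERED", "2025-01-03", "2025-01-03T08:00"),
    ("SHP002", "C2", 80.0, "PENDING", "2025-01-04", "2025-01-04T08:00"),
    ("SHP002", "C2", 95.0, "DELIVERED", "2025-01-04", "2025-01-05T09:00"),  # newer duplicate
    ("SHP003", None, 60.0, "DELIVERED", "2025-01-06", "2025-01-06T08:00"),   # null customer
    ("SHP004", "C3", -10.0, "DELIVERED", "2025-01-07", "2025-01-07T08:00"),  # negative cost
    ("SHP005", "C1", 40.0, "CANCELLED", "2025-01-08", "2025-01-08T08:00"),   # cancelled
    ("SHP006", "C9", 55.0, None, "2025-01-09", "2025-01-09T08:00"),          # NULL status
])
conn.executemany("INSERT INTO customer_tiers VALUES (?, ?)",
                 [("C1", "Gold"), ("C2", "Silver")])

QUERY = """
SELECT s.shipment_id, s.customer_id, s.cost, COALESCE(t.tier, 'Unknown')
FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY shipment_id ORDER BY created_at DESC) AS rn
    FROM shipments
) s
LEFT JOIN customer_tiers t ON s.customer_id = t.customer_id
WHERE s.rn = 1                        -- Gate 1: latest version only
  AND s.customer_id IS NOT NULL       -- Gate 2
  AND s.cost > 0                      -- Gate 3
  AND s.status {op} 'CANCELLED'       -- Gate 4
ORDER BY s.shipment_id
"""
null_safe = conn.execute(QUERY.format(op="IS NOT")).fetchall()  # keeps NULL status
strict = conn.execute(QUERY.format(op="!=")).fetchall()         # drops NULL status
print(null_safe)
print(strict)
```

Whether NULL-status rows should survive is a data-contract decision; the sketch only makes the difference visible.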
Aggregate by tier + month into final analytics table
```python
def load_analytics(conn):
    """Idempotent: TRUNCATE + INSERT in one transaction on every run."""
    cur = conn.cursor()
    cur.execute("TRUNCATE analytics.shipping_spend_by_tier")
    # No ORDER BY: a table has no inherent row order, so sort when querying.
    cur.execute("""
        INSERT INTO analytics.shipping_spend_by_tier
            (tier, month, total_spend, shipment_count)
        SELECT
            tier,
            DATE_TRUNC('month', ship_date) AS month,
            SUM(cost) AS total_spend,
            COUNT(*) AS shipment_count
        FROM staging.shipments_with_tiers
        GROUP BY tier, DATE_TRUNC('month', ship_date)
    """)
    conn.commit()  # the TRUNCATE and INSERT commit together, or neither does

    cur.execute("SELECT COUNT(*) FROM analytics.shipping_spend_by_tier")
    n = cur.fetchone()[0]
    print(f"Loaded {n} rows into analytics")
```
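Idempotency is straightforward to check: run the load twice and compare the results. This sketch uses SQLite as a stand-in with simplified, hypothetical table names. SQLite has no `TRUNCATE`, so the sketch uses `DELETE`, and `strftime('%Y-%m', ship_date)` replaces `DATE_TRUNC`. As in the PostgreSQL version, the delete and the insert share one transaction. PostgreSQL's `TRUNCATE` is transactional too, so a failed load leaves the previous table contents in place.

```python
import sqlite3


def load_analytics(conn: sqlite3.Connection) -> int:
    """Rebuild the aggregate table from scratch; safe to run any number of times."""
    with conn:  # one transaction: commits on success, rolls back on error
        conn.execute("DELETE FROM spend_by_tier")  # SQLite has no TRUNCATE
        conn.execute("""
            INSERT INTO spend_by_tier (tier, month, total_spend, shipment_count)
            SELECT tier, strftime('%Y-%m', ship_date), SUM(cost), COUNT(*)
            FROM shipments_with_tiers
            GROUP BY tier, strftime('%Y-%m', ship_date)
        """)
    return conn.execute("SELECT COUNT(*) FROM spend_by_tier").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE shipments_with_tiers (shipment_id TEXT, tier TEXT, cost REAL, ship_date TEXT);
CREATE TABLE spend_by_tier (tier TEXT, month TEXT, total_spend REAL, shipment_count INTEGER);
""")
conn.executemany("INSERT INTO shipments_with_tiers VALUES (?, ?, ?, ?)", [
    ("SHP001", "Gold", 120.0, "2025-01-03"),
    ("SHP002", "Gold", 80.0, "2025-01-20"),
    ("SHP003", "Silver", 50.0, "2025-02-02"),
])
conn.commit()

first = load_analytics(conn)
snapshot = sorted(conn.execute("SELECT * FROM spend_by_tier").fetchall())
second = load_analytics(conn)  # re-run: rebuilds the table, no duplicate rows
print(first, second, snapshot)
```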
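Every record passes through four quality gates: 21 raw records in, 17 clean records out to `staging.shipments_with_tiers`, which feeds the analytics aggregation.

1. Extract: 21 shipment records pulled from the Flask API.
2. Gate 1, deduplicate: remove the SHP002 duplicate, keeping the latest version by `created_at`.
3. Gate 2, null check: remove records with no `customer_id`.
4. Gate 3, cost check: remove negative or zero shipping costs.
5. Gate 4, cancellation check: exclude cancelled orders. The remaining records are ready for aggregation.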
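The funnel counts are also worth logging on every run. This minimal pure-Python sketch assumes records shaped like the staging rows; the sample records are invented. It applies each gate in order and records how many records survive. A sudden jump in one gate's rejections, for example null `customer_id` values, then stands out in the task log.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]


def dedupe_latest(records: List[Record]) -> List[Record]:
    """Gate 1: keep the row with the newest created_at per shipment_id."""
    latest: Dict[Any, Record] = {}
    for r in records:
        key = r["shipment_id"]
        if key not in latest or r["created_at"] > latest[key]["created_at"]:
            latest[key] = r
    return list(latest.values())


GATES: List[Tuple[str, Callable[[List[Record]], List[Record]]]] = [
    ("dedup", dedupe_latest),
    ("null customer_id", lambda rs: [r for r in rs if r["customer_id"] is not None]),
    ("non-positive cost", lambda rs: [r for r in rs if r["cost"] > 0]),
    # Mirrors SQL `status != 'CANCELLED'`, which also drops NULL statuses.
    ("cancelled", lambda rs: [r for r in rs
                              if r["status"] is not None and r["status"] != "CANCELLED"]),
]


def run_funnel(records: List[Record]) -> Tuple[List[Record], List[Tuple[str, int]]]:
    """Apply each gate in order, recording how many records survive it."""
    counts = [("raw", len(records))]
    for name, gate in GATES:
        records = gate(records)
        counts.append((name, len(records)))
    return records, counts


sample = [
    {"shipment_id": "SHP001", "customer_id": "C1", "cost": 120.0, "status": "DELIVERED", "created_at": "2025-01-03T08:00"},
    {"shipment_id": "SHP002", "customer_id": "C2", "cost": 80.0, "status": "PENDING", "created_at": "2025-01-04T08:00"},
    {"shipment_id": "SHP002", "customer_id": "C2", "cost": 95.0, "status": "DELIVERED", "created_at": "2025-01-05T09:00"},
    {"shipment_id": "SHP003", "customer_id": None, "cost": 60.0, "status": "DELIVERED", "created_at": "2025-01-06T08:00"},
    {"shipment_id": "SHP004", "customer_id": "C3", "cost": 0.0, "status": "DELIVERED", "created_at": "2025-01-07T08:00"},
    {"shipment_id": "SHP005", "customer_id": "C1", "cost": 40.0, "status": "CANCELLED", "created_at": "2025-01-08T08:00"},
]
clean, counts = run_funnel(sample)
for name, n in counts:
    print(f"{name:<18} {n}")
```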
Six critical fixes that transformed a fragile script into a production-ready pipeline.
Pipeline output: shipping spend aggregated by customer tier and month, refreshed on every DAG run.
| Customer Tier | Month | Total Spend | Shipments | Avg Spend / Shipment | Spend Share |
|---|---|---|---|---|---|
| Gold | 2025-01 | $45,230 | 8 | $5,654 | 48.8% |
| Silver | 2025-01 | $28,150 | 5 | $5,630 | 30.4% |
| Bronze | 2025-01 | $15,890 | 4 | $3,973 | 17.1% |
| Unknown | 2025-01 | $3,450 | 2 | $1,725 | 3.7% |
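Both derived columns follow from Total Spend and Shipments. Avg Spend / Shipment is total spend ÷ shipments, and Spend Share is the tier's spend ÷ $92,720, the sum across all tiers. The short check below uses `Decimal` with half-up rounding. Python's built-in `round()` rounds halves to even, so it would turn Bronze's $3,972.50 into $3,972 rather than the $3,973 shown.

```python
from decimal import Decimal, ROUND_HALF_UP

# (tier, total spend in dollars, shipment count), copied from the table above.
ROWS = [("Gold", 45_230, 8), ("Silver", 28_150, 5),
        ("Bronze", 15_890, 4), ("Unknown", 3_450, 2)]


def half_up(value: Decimal, step: str) -> Decimal:
    """Round to the given step ("1", "0.1", ...) with halves rounded away from zero."""
    return value.quantize(Decimal(step), rounding=ROUND_HALF_UP)


total = sum(Decimal(spend) for _, spend, _ in ROWS)  # $92,720
derived = {
    tier: (half_up(Decimal(spend) / count, "1"),          # avg spend per shipment
           half_up(Decimal(spend) * 100 / total, "0.1"))  # spend share, percent
    for tier, spend, count in ROWS
}
for tier, (avg, share) in derived.items():
    print(f"{tier:<8} avg ${avg:,}  share {share}%")
```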
Fully containerized: three Docker services, started with a single `docker-compose up`.
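| Component | Role | Version |
|---|---|---|
| Apache Airflow | DAG orchestration & scheduling | 2.8.1 |
| PostgreSQL | Staging & analytics schemas | 15.4 |
| Flask | Shipment data source service | 3.0 |
| Docker Compose | 3 services, isolated network | 2.24 |
| Test framework | 28 tests, 100% pass rate | 8.1 |
| Python | pandas · tenacity · psycopg2 | 3.12 |

28 tests across 3 files cover extraction, transformation, loading, and edge cases.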
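Below is a sketch of what one such test might look like. A hand-written fake connection records the SQL it receives, so the staging functions' TRUNCATE, INSERT, COMMIT sequence can be checked without a database. `FakeConn`, `FakeCursor`, and `stage_rows` are hypothetical stand-ins, not the project's actual fixtures. A runner such as pytest would collect the `test_` functions automatically.

```python
from typing import Any, List, Sequence, Tuple

Event = Tuple[str, Any]


class FakeCursor:
    """Records SQL calls instead of sending them to PostgreSQL."""

    def __init__(self, log: List[Event]):
        self.log = log

    def execute(self, sql: str, params: Any = None) -> None:
        self.log.append(("execute", sql.strip()))

    def executemany(self, sql: str, rows: Sequence[Any]) -> None:
        self.log.append(("executemany", len(rows)))


class FakeConn:
    def __init__(self) -> None:
        self.log: List[Event] = []

    def cursor(self) -> FakeCursor:
        return FakeCursor(self.log)

    def commit(self) -> None:
        self.log.append(("commit", None))


def stage_rows(conn, rows) -> None:
    """Hypothetical function with the same shape as extract_to_staging."""
    cur = conn.cursor()
    cur.execute("TRUNCATE staging.shipments")
    cur.executemany("INSERT INTO staging.shipments VALUES (%s, %s)", rows)
    conn.commit()


def test_truncates_before_insert_and_commits_once():
    conn = FakeConn()
    stage_rows(conn, [("SHP001", "C1"), ("SHP002", "C2")])
    assert conn.log == [
        ("execute", "TRUNCATE staging.shipments"),
        ("executemany", 2),
        ("commit", None),
    ]


def test_empty_payload_still_clears_staging():
    conn = FakeConn()
    stage_rows(conn, [])
    assert conn.log[0] == ("execute", "TRUNCATE staging.shipments")
    assert conn.log[-1] == ("commit", None)
```

The second test pins down a behavior worth deciding deliberately: with this pattern, an empty API response empties staging.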