Writing a Python script to fetch data from an API is the easy part. Any junior developer can write a script that pulls HubSpot deals and dumps them into a CSV file.
But what happens six months later?
Suddenly, your “simple” data project has turned into a “Data Swamp”: scripts fail silently, duplicate records pile up, and nobody trusts the numbers on the dashboard.
If you are serious about building a robust and scalable data pipeline for your business data, you need to stop building fragile scripts and start building Data Lakehouses.
In this guide, we are going to move beyond the basics. We won’t just “dump data.” We are going to build a Production-Grade ELT Pipeline that ingests HubSpot data into AWS S3, cleans it using AWS Glue, and stores it in Apache Iceberg format.
By the end of this tutorial, you will have a pipeline that supports “Time Travel,” ensures zero duplicates, and costs pennies to run.
What We Will Build
We will focus on a single, critical data source: HubSpot Sales Data.
Ready to level up your architecture? Let’s dive in.
The Architecture: Designing for ROI and Reliability
Before we write a single line of code, we must understand the strategy. Many businesses fail at analytics not because they lack data, but because their infrastructure is too expensive to maintain or too fragile to trust.
A “Modern Data Lakehouse” architecture solves this by combining the low cost of cloud storage (Amazon S3) with the reliability of a traditional database (Apache Iceberg).
The Workflow
HubSpot API → Bronze (raw JSON landed in S3) → Silver (cleaned Apache Iceberg tables, built with AWS Glue) → Gold (aggregated sales KPIs, queryable in AWS Athena).
Strategic Prerequisites
To implement this, your team needs an AWS account with access to S3, Glue, and Athena, plus a HubSpot private app access token.
Step 1: Secure Ingestion (The Bronze Layer)
For a CTO or VP of Engineering, the nightmare scenario is “Silent Failure”—where data stops updating, but no one notices until the CEO opens a dashboard three days later.
We prevent this by building a Robust Ingestion Script. This script doesn’t just “grab data”; it handles API limits, paginates through thousands of records, and organizes files by date for easy auditing.
The Strategy: We utilize a “Partitioning” strategy. Instead of one giant file, we save data into folders like /year=2026/month=01/day=15/. This makes future queries faster and cheaper because we only scan the specific dates we need.
The Implementation: Here is the Python logic that securely fetches your HubSpot Deals and lands them in your “Bronze” S3 bucket.
```python
import requests
import json
import boto3
from datetime import datetime
import time
import os
from dotenv import load_dotenv

# 1. LOAD SECRETS
load_dotenv()  # Reads the .env file

# ------------------------------------------------------
# CONFIGURATION
# ------------------------------------------------------
HUBSPOT_ACCESS_TOKEN = os.getenv("HUBSPOT_ACCESS_TOKEN")
S3_BUCKET = os.getenv("S3_BUCKET_NAME")
S3_PREFIX = "bronze/source=hubspot"

# Safety Check
if not HUBSPOT_ACCESS_TOKEN:
    raise ValueError("❌ Missing HubSpot Token! Check your .env file.")


def ingest_hubspot_deals():
    print("⏳ Connecting to HubSpot API...")
    url = "https://api.hubapi.com/crm/v3/objects/deals"
    headers = {
        'Authorization': f'Bearer {HUBSPOT_ACCESS_TOKEN}',
        'Content-Type': 'application/json'
    }

    all_deals = []
    has_more = True
    after_cursor = None

    while has_more:
        # Define parameters for this specific "page" of data
        params = {
            'limit': 100,  # Max allowed by HubSpot
            'properties': 'dealname,amount,dealstage,closedate,pipeline'
        }
        # If we have a cursor from the last loop, add it to params
        if after_cursor:
            params['after'] = after_cursor
            print(f"   ... Fetching next batch (Cursor: {after_cursor})")

        response = requests.get(url, headers=headers, params=params)

        if response.status_code == 429:
            # Rate limited: back off, then retry the same page
            time.sleep(10)
            continue

        if response.status_code == 200:
            data = response.json()
            # Add this batch to our main list
            all_deals.extend(data['results'])

            # Check if there is a next page
            if 'paging' in data and 'next' in data['paging']:
                after_cursor = data['paging']['next']['after']
                # Sleep briefly to be nice to the API limits
                time.sleep(0.5)
            else:
                has_more = False
                after_cursor = None
        else:
            print(f"❌ Error: {response.status_code} - {response.text}")
            break

    print(f"📦 Total deals fetched: {len(all_deals)}")

    # ------------------------------------------------------
    # UPLOAD TO S3
    # ------------------------------------------------------
    if all_deals:
        # Add ingestion metadata
        ingestion_time = datetime.now().isoformat()
        for record in all_deals:
            record['ingestion_time'] = ingestion_time

        # File Naming: Hive-style date partition
        file_date = datetime.now().strftime("%Y-%m-%d")
        file_name = f"{S3_PREFIX}/date={file_date}/hubspot_deals_full.json"

        s3 = boto3.client('s3')
        print(f"🚀 Uploading {len(all_deals)} deals to S3...")
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=file_name,
            Body=json.dumps(all_deals)
        )
        print(f"✅ Success! Data landed at: s3://{S3_BUCKET}/{file_name}")


if __name__ == "__main__":
    ingest_hubspot_deals()
```
Code Walkthrough: Why This Design Wins
You might look at the script above and ask, “Why not just a simple 5-line script?”
A 5-line script works for a hobby project. It fails in production. Here is a breakdown of the specific architectural decisions embedded in this code:
Security by Design (the os.getenv configuration)
You should never hardcode API keys, passwords, or bucket names in a script. By using os.getenv, we force the application to read credentials from a secure environment file or AWS Secrets Manager. This keeps your code aligned with SOC 2 and ISO 27001 security practices.
Smart Pagination Logic (the while loop)
HubSpot, like most APIs, limits how much data you can grab at once (usually 100 records). If you have 5,000 deals, a basic script will only retrieve the first 100 and silently ignore the rest. Our while loop checks for a “Next Page” token. It keeps asking HubSpot for more data until every single record is safely inside your system. This guarantees 100% Data Completeness.
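Stripped of the HTTP details, the pagination pattern is easy to verify locally. Here is a minimal, self-contained sketch of the same cursor loop; fake_hubspot_page is a stand-in for the real API call and is purely illustrative:

```python
def fake_hubspot_page(after=None, limit=3):
    """Return one 'page' of deals plus a cursor, mimicking HubSpot's response shape."""
    deals = [{"id": str(i)} for i in range(10)]
    start = int(after) if after else 0
    page = {"results": deals[start:start + limit]}
    if start + limit < len(deals):
        page["paging"] = {"next": {"after": str(start + limit)}}
    return page

def fetch_all(page_fn):
    """Keep requesting pages until the API stops returning a 'next' cursor."""
    records, cursor = [], None
    while True:
        page = page_fn(after=cursor)
        records.extend(page["results"])
        cursor = page.get("paging", {}).get("next", {}).get("after")
        if cursor is None:
            break
    return records

all_deals = fetch_all(fake_hubspot_page)
print(len(all_deals))  # 10 — every record, not just the first page
```

If the loop stopped after the first response, only 3 of the 10 records would survive; the cursor check is what guarantees completeness.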
Cost-Optimized Partitioning (the date= key prefix)
We don’t just dump files randomly. We organize them into folders by date (/date=2026-02-06/).
Why this matters: When you query this data later using AWS Athena, you can ask for just “February 2026” data. Athena will skip all other folders, potentially reducing your cloud query costs by 99%.
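For illustration, the partition-key layout can be expressed as a tiny standalone helper (the function name is ours, not part of the pipeline):

```python
from datetime import date

def bronze_key(source: str, run_date: date, filename: str) -> str:
    """Build a Hive-style partitioned S3 key that Athena can prune on."""
    return f"bronze/source={source}/date={run_date.isoformat()}/{filename}"

key = bronze_key("hubspot", date(2026, 2, 6), "hubspot_deals_full.json")
print(key)  # bronze/source=hubspot/date=2026-02-06/hubspot_deals_full.json
```

Because the date is encoded in the key itself, a query filtered to one day never has to open the files for any other day.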
Step 2: The Transformation (The Silver Layer)
Now that our raw data is safely in S3, we face a new problem: JSON is messy.
To fix this, we build the Silver Layer. We will use AWS Glue (Serverless Spark) to convert that messy JSON into Apache Iceberg tables.
Why Iceberg? Iceberg acts like a database table sitting on top of your S3 files. It allows “Time Travel” (querying data as it looked yesterday) and prevents “Dirty Reads” (ensuring nobody sees half-written data while the pipeline is running).
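Time travel is exposed through plain SQL. The snippet below only assembles the query strings so you can see the syntax (the table name matches the one created later in this guide; the timestamp is an example); you would run the resulting SQL in Athena engine v3 or Spark SQL, both of which support Iceberg time travel:

```python
# Table created by the Silver Glue job below; the timestamp is an example.
TABLE = "marketing_data_lake_db.silver_hubspot_deals"

# Query the table exactly as it looked at a past point in time:
time_travel_sql = (
    f"SELECT * FROM {TABLE} "
    "FOR TIMESTAMP AS OF TIMESTAMP '2026-01-15 00:00:00 UTC'"
)

# Inspect the snapshot history via Iceberg's metadata tables (Athena syntax):
history_sql = 'SELECT * FROM "marketing_data_lake_db"."silver_hubspot_deals$history"'

print(time_travel_sql)
```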
The Glue Transformation Script
Here is the PySpark code that turns raw JSON into a polished, queryable Iceberg table.
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col

# 1. SETUP ENV
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# CONFIGURATION (Update Bucket Name Here)
BUCKET_NAME = "bucketname-goes-here"
BRONZE_PATH = f"s3://{BUCKET_NAME}/bronze/source=hubspot"
DATABASE_NAME = "marketing_data_lake_db"
TABLE_NAME = f"glue_catalog.{DATABASE_NAME}.silver_hubspot_deals"

# 2. READ RAW DATA
# We let Spark infer the schema from the JSON files
print("⏳ Reading raw JSON data...")
raw_df = spark.read.option("multiline", "true").json(BRONZE_PATH)

# 3. TRANSFORM & FLATTEN
# HubSpot data usually comes inside a "properties" struct.
# We verify that 'properties' exists, then flatten it.
if "properties" in raw_df.columns:
    silver_df = raw_df.select(
        col("id").alias("deal_id"),
        col("properties.dealname").alias("deal_name"),
        col("properties.amount").cast("double").alias("amount"),
        col("properties.dealstage").alias("stage"),
        col("properties.closedate").cast("timestamp").alias("close_date"),
        col("properties.pipeline").alias("pipeline"),
        col("ingestion_time").cast("timestamp")
    )
else:
    # Fallback if the structure is flat (depends on ingestion script version)
    silver_df = raw_df

# 4. WRITE TO ICEBERG
# This creates (or replaces) the Iceberg table
print(f"🚀 Writing to Iceberg table: {TABLE_NAME}")
silver_df.writeTo(TABLE_NAME) \
    .tableProperty("format-version", "2") \
    .createOrReplace()  # Use .append() in production; createOrReplace for dev testing

print("✅ Job Complete!")
job.commit()
```
Code Walkthrough: Turning Raw Data into Business Intelligence
Most data projects fail during the “Transformation” phase. Scripts break when data formats change, or they become too slow as data volumes grow. Our AWS Glue script is designed to bypass these common pitfalls.
Here is exactly what the code above is doing to protect your data pipeline:
Serverless Scalability (the Spark and Glue initialization)
We are initializing Apache Spark in a serverless environment. Why this matters: If you used a standard Python script on a server, a sudden spike in HubSpot data (like Black Friday sales) could crash the system due to lack of memory. Spark distributes the workload across multiple nodes automatically. You get the power of a supercomputer that turns off when you’re done, keeping costs minimal.
Automated Discovery (the Bronze path read)
We point Spark at the top-level Bronze prefix, and it reads through every /date=.../ folder beneath it. Why this matters: You don’t need to manually tell the script which dates to process. Whether you have one day of data or five years of history, the script automatically finds and ingests everything in the Bronze layer.
Flattening the Hierarchy (the select and alias calls)
HubSpot returns data in a deeply nested JSON format that is hostile to Excel, Power BI, or Tableau. Here, we programmatically “flatten” that structure.
Why this matters: We transform technical gibberish into business-friendly columns like Deal Name and Amount. This ensures that when your analyst opens Tableau, they see a clean table, not a mess of brackets and code.
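A quick pure-Python sketch of what that flattening does to a single record (the sample values below are made up):

```python
# One raw record, mirroring HubSpot's nested response shape.
raw = {
    "id": "901",
    "properties": {
        "dealname": "Acme renewal",
        "amount": "12500",       # HubSpot returns numbers as strings
        "dealstage": "closedwon",
    },
    "ingestion_time": "2026-02-06T09:00:00",
}

# Pull the nested fields up to the top level and fix the types,
# exactly as the Glue select/cast does at scale.
flat = {
    "deal_id": raw["id"],
    "deal_name": raw["properties"]["dealname"],
    "amount": float(raw["properties"]["amount"]),
    "stage": raw["properties"]["dealstage"],
}
print(flat["amount"])  # 12500.0
```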
The Iceberg Guarantee (the writeTo call)
This is the most important step. We are not just saving a file; we are committing a Database Transaction.
Why this matters: In older data lakes, if a job failed halfway through, you would end up with “corrupt” or partial files that broke dashboards. Apache Iceberg ensures ACID compliance: either the data is saved perfectly, or it isn’t saved at all. This means your CEO never sees a “half-loaded” dashboard.
Step 3: The Payoff (The Gold Layer)
We have ingested the data securely (Bronze). We have cleaned and structured it (Silver). Now, we reach the finish line: Analytics.
Because we used Apache Iceberg, we don’t need to copy this data into a separate warehouse to read it. We can query it directly where it sits in S3 using AWS Athena.
The “Gold” Aggregation
Here is the PySpark job that builds the daily sales KPIs you can query immediately to see your ROI.
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, to_date, sum as _sum, countDistinct, round as _round, avg

# 1. SETUP
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

DATABASE_NAME = "marketing_data_lake_db"
# We are creating a Gold table specifically for Sales KPIs
GOLD_TABLE_NAME = "glue_catalog.marketing_data_lake_db.gold_sales_daily"

# 2. LOAD SILVER TABLE (Reading from Iceberg)
print("⏳ Loading Silver HubSpot Table...")
df_hubspot = spark.table(f"glue_catalog.{DATABASE_NAME}.silver_hubspot_deals")

# 3. AGGREGATE SALES (The "Gold" Transformation)
# Business Logic:
#   - Filter for 'closedwon' deals only (Revenue)
#   - Group by Date
#   - Calculate Total Revenue, Count of Deals, and Average Deal Size
gold_df = df_hubspot \
    .filter(col("stage") == 'closedwon') \
    .withColumn("date", to_date(col("close_date"))) \
    .groupBy("date") \
    .agg(
        _sum("amount").alias("total_revenue"),
        countDistinct("deal_id").alias("deals_closed"),
        _round(avg("amount"), 2).alias("avg_deal_size")
    ) \
    .orderBy("date")  # Sort by date for clean reporting

# 4. WRITE TO GOLD ICEBERG TABLE
print(f"🚀 Writing Gold Sales KPI Data to {GOLD_TABLE_NAME}...")
gold_df.writeTo(GOLD_TABLE_NAME) \
    .tableProperty("format-version", "2") \
    .createOrReplace()

print("✅ Gold Sales Job Complete!")
job.commit()
```
The Result: Point AWS Athena at the gold table and, in seconds, you get an up-to-date view of your sales pipeline. No 24-hour delays. No complex exports. Just accurate data, ready for decision-making.
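Before deploying, you can sanity-check the gold KPI logic locally with plain Python on a handful of fake records (the deals below are made up, and this dict-based version only mirrors the Spark aggregation, it does not replace it):

```python
from collections import defaultdict

deals = [
    {"deal_id": "1", "stage": "closedwon", "close_date": "2026-02-01", "amount": 100.0},
    {"deal_id": "2", "stage": "closedwon", "close_date": "2026-02-01", "amount": 300.0},
    {"deal_id": "3", "stage": "closedlost", "close_date": "2026-02-01", "amount": 999.0},
]

# Filter to revenue only, then group by close date (same as the Glue job).
by_date = defaultdict(list)
for d in deals:
    if d["stage"] == "closedwon":
        by_date[d["close_date"]].append(d)

kpis = {
    day: {
        "total_revenue": sum(d["amount"] for d in won),
        "deals_closed": len({d["deal_id"] for d in won}),
        "avg_deal_size": round(sum(d["amount"] for d in won) / len(won), 2),
    }
    for day, won in by_date.items()
}
print(kpis["2026-02-01"])  # {'total_revenue': 400.0, 'deals_closed': 2, 'avg_deal_size': 200.0}
```

Note that the closed-lost deal’s 999.0 never touches the totals, which is exactly the behavior the filter in the Glue job guarantees.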
Conclusion: Build for the Future
Building a data pipeline isn’t just about moving data from A to B. It’s about building a foundation that your business can trust.
By choosing AWS S3 for storage, Glue for computation, and Iceberg for reliability, you have built a Lakehouse that is cheap to run, safe to trust, and ready to scale.
Don’t let your data become a swamp. If you are ready to implement a production-grade data architecture but need a partner to guide the way, Primedsoft is here to help. We specialize in turning messy data into clear, actionable insights.