How to Efficiently Ingest HubSpot Data into Amazon S3 and Build a Production-Grade Lakehouse with Iceberg and AWS Glue

Writing a Python script to fetch data from an API is the easy part. Any junior developer can write a script that pulls HubSpot deals and dumps them into a CSV file. 

But what happens six months later? 

  • The file grows to 10GB.  
  • The script crashes because of memory issues.  
  • You have duplicate records everywhere because the API connection reset halfway through.  

Suddenly, your “simple” data project has turned into a “Data Swamp”: 

  • Unreliable,  
  • Unscalable, and  
  • Impossible to query without a headache. 

If you are serious about building a robust and scalable data pipeline for your business data, you need to stop building fragile scripts and start building Data Lakehouses. 

In this guide, we are going to move beyond the basics. We won’t just “dump data.” We are going to build a Production-Grade ELT Pipeline that ingests HubSpot data into AWS S3, cleans it using AWS Glue, and stores it in Apache Iceberg format. 

By the end of this tutorial, you will have a pipeline that supports “Time Travel,” ensures zero duplicates, and costs pennies to run. 

What We Will Build 

We will focus on a single, critical data source: HubSpot Sales Data. 

  1. Ingest: We will write a robust Python script to fetch data securely. 
  2. Store: We will land data in Amazon S3 (Bronze Layer). 
  3. Model: We will transform that data into an Iceberg Table (Silver Layer) using AWS Glue. 

Ready to level up your architecture? Let’s dive in. 

The Architecture: Designing for ROI and Reliability 

Before we write a single line of code, we must understand the strategy. Many businesses fail at analytics not because they lack data, but because their infrastructure is too expensive to maintain or too fragile to trust. 

A “Modern Data Lakehouse” architecture solves this by combining the low cost of cloud storage (Amazon S3) with the reliability of a traditional database (Apache Iceberg). 

The Workflow 

  1. Ingest (Bronze Layer): We fetch raw data from HubSpot and land it safely in S3. This creates an immutable audit trail. 
  2. Refine (Silver Layer): We use AWS Glue to clean the data and store it as Iceberg Tables. This ensures that if the HubSpot API changes, your reports don’t break. 
  3. Analyze (Gold Layer): The data is ready for Tableau, Power BI, Looker Studio, or any BI tool that can query it directly via Athena, without needing an expensive data warehouse like Redshift. 

Strategic Prerequisites 

To implement this, your team needs: 

  • An AWS Account: With access to S3, Glue, and Athena. 
  • HubSpot Private App Token: A secure way to access your CRM data without sharing user passwords. 
  • Python 3.x: The language of modern data engineering. 
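For reference, the ingestion script in Step 1 reads its secrets from a local .env file. A minimal sketch with placeholder values (the real token comes from your HubSpot Private App settings; never commit this file to version control):

```
# .env -- keep this file out of git
HUBSPOT_ACCESS_TOKEN=pat-xxxxxxxx
S3_BUCKET_NAME=your-company-data-lake
```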

Step 1: Secure Ingestion (The Bronze Layer) 

For a CTO or VP of Engineering, the nightmare scenario is “Silent Failure”—where data stops updating, but no one notices until the CEO opens a dashboard three days later. 

We prevent this by building a Robust Ingestion Script. This script doesn’t just “grab data”; it handles API limits, paginates through thousands of records, and organizes files by date for easy auditing. 

The Strategy: We utilize a “Partitioning” strategy. Instead of one giant file, we save data into folders like /year=2026/month=01/day=15/. This makes future queries faster and cheaper because we only scan the specific dates we need. 
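The folder layout is just careful string formatting. A minimal sketch of the multi-level variant described above (the helper name is ours, purely illustrative):

```python
from datetime import date

def partition_prefix(d: date) -> str:
    # Hive-style partition folders: one level per date component
    return f"year={d.year}/month={d.month:02d}/day={d.day:02d}/"

print(partition_prefix(date(2026, 1, 15)))  # year=2026/month=01/day=15/
```

The ingestion script below uses the simpler single-level date=YYYY-MM-DD variant; both are Hive-style layouts that Athena can prune by.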

The Implementation: Here is the Python logic that securely fetches your HubSpot Deals and lands them in your “Bronze” S3 bucket. 

import requests
import json
import boto3
from datetime import datetime
import time
import os
from dotenv import load_dotenv  # reads secrets from a local .env file

# 1. LOAD SECRETS
load_dotenv() # This reads the .env file

# CONFIGURATION
# ------------------------------------------------------
HUBSPOT_ACCESS_TOKEN = os.getenv("HUBSPOT_ACCESS_TOKEN")
S3_BUCKET = os.getenv("S3_BUCKET_NAME")
S3_PREFIX = "bronze/source=hubspot"

# Safety Check
if not HUBSPOT_ACCESS_TOKEN:
    raise ValueError("❌ Missing HubSpot Token! Check your .env file.")

def ingest_hubspot_deals():
    print("⏳ Connecting to HubSpot API...")
    
    url = "https://api.hubapi.com/crm/v3/objects/deals"
    headers = {
        'Authorization': f'Bearer {HUBSPOT_ACCESS_TOKEN}',
        'Content-Type': 'application/json'
    }
    
    all_deals = []
    has_more = True
    after_cursor = None
    
    while has_more:
        # Define parameters for this specific "page" of data
        params = {
            'limit': 100, # Max allowed by HubSpot
            'properties': 'dealname,amount,dealstage,closedate,pipeline'
        }
        
        # If we have a cursor from the last loop, add it to params
        if after_cursor:
            params['after'] = after_cursor
            
        print(f"   ... Fetching next batch (Cursor: {after_cursor})")
        response = requests.get(url, headers=headers, params=params)
        
        if response.status_code == 200:
            data = response.json()
            results = data['results']
            
            # Add this batch to our main list
            all_deals.extend(results)
            
            # Check if there is a next page
            if 'paging' in data and 'next' in data['paging']:
                after_cursor = data['paging']['next']['after']
                has_more = True
                # Sleep briefly to be nice to the API limits
                time.sleep(0.5) 
            else:
                has_more = False
                after_cursor = None
                
        else:
            print(f"❌ Error: {response.status_code} - {response.text}")
            break
    
    print(f"📦 Total deals fetched: {len(all_deals)}")
    
    # ------------------------------------------------------
    # UPLOAD TO S3
    # ------------------------------------------------------
    if all_deals:
        # Add ingestion metadata
        ingestion_time = datetime.now().isoformat()
        for record in all_deals:
            record['ingestion_time'] = ingestion_time
            
        # File Naming
        file_date = datetime.now().strftime("%Y-%m-%d")
        file_name = f"{S3_PREFIX}/date={file_date}/hubspot_deals_full.json"
        
        s3 = boto3.client('s3')
        print(f"🚀 Uploading {len(all_deals)} deals to S3...")
        
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=file_name,
            Body=json.dumps(all_deals)
        )
        print(f"✅ Success! Data landed at: s3://{S3_BUCKET}/{file_name}")

if __name__ == "__main__":
    ingest_hubspot_deals()

Code Walkthrough: Why This Design Wins 

You might look at the script above and ask, “Why not just a simple 5-line script?” 

A 5-line script works for a hobby project. It fails in production. Here is a breakdown of the specific architectural decisions embedded in this code: 

Security by Design 

You should never hardcode API keys, passwords, or bucket names in a script. By using os.getenv, we force the application to read credentials from a secure environment file or AWS Secrets Manager. Keeping secrets out of source code is a baseline requirement for SOC 2 and ISO 27001 audits. 

Smart Pagination Logic 

HubSpot, like most APIs, limits how much data you can fetch at once (100 records per request for this endpoint). If you have 5,000 deals, a naive script will retrieve the first 100 and silently ignore the rest. Our while loop checks for a “next page” cursor and keeps asking HubSpot for more data until every single record is safely inside your system. This guarantees 100% data completeness. 
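The cursor-following loop can be distilled into a small, reusable generator. A minimal sketch, with an in-memory fake pager standing in for the HubSpot endpoint (the function names here are ours, purely illustrative):

```python
def paginate(fetch_page):
    """Yield every record by following 'after' cursors until none remain."""
    after = None
    while True:
        page = fetch_page(after)
        yield from page["results"]
        nxt = page.get("paging", {}).get("next")
        if not nxt:
            break
        after = nxt["after"]

# Fake three-page API standing in for HubSpot's response shape
def fake_fetch(after):
    pages = {
        None: {"results": [1, 2], "paging": {"next": {"after": "a"}}},
        "a": {"results": [3, 4], "paging": {"next": {"after": "b"}}},
        "b": {"results": [5]},
    }
    return pages[after]

print(list(paginate(fake_fetch)))  # [1, 2, 3, 4, 5]
```

In the real script, fetch_page would wrap requests.get with the headers and params shown earlier.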

Cost-Optimized Partitioning 

We don’t just dump files randomly. We organize them into folders by date (/date=2026-02-06/).  

Why this matters: When you query this data later using AWS Athena, you can ask for just “February 2026” data. Athena bills by the amount of data it scans, so skipping every other folder can cut query costs dramatically. 

Step 2: The Transformation (The Silver Layer) 

Now that our raw data is safely in S3, we face a new problem: JSON is messy. 

  • Dates are strings. 
  • Amounts might be null. 
  • Analyzing raw JSON is slow and expensive. 

To fix this, we build the Silver Layer. We will use AWS Glue (Serverless Spark) to convert that messy JSON into Apache Iceberg tables. 

Why Iceberg? Iceberg acts like a database table sitting on top of your S3 files. It allows “Time Travel” (querying data as it looked yesterday) and prevents “Dirty Reads” (ensuring nobody sees half-written data while the pipeline is running). 
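Once the Silver table exists, time travel is a one-line query change. A sketch of what that looks like in Athena SQL (the table name matches the Glue script below; the timestamp is illustrative):

```sql
-- Query the table as it looked at a past point in time (Iceberg time travel)
SELECT deal_id, amount, stage
FROM marketing_data_lake_db.silver_hubspot_deals
FOR TIMESTAMP AS OF TIMESTAMP '2026-01-15 00:00:00 UTC';
```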

The Glue Transformation Script 

Here is the PySpark code that turns raw JSON into a polished, queryable Iceberg table. 

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col

# 1. SETUP ENV
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# CONFIGURATION (Update Bucket Name Here)
# NOTE: the Glue job must have Iceberg enabled (e.g. job parameter
# --datalake-formats=iceberg) and the glue_catalog Spark catalog configured.
BUCKET_NAME = "bucketname-goes-here"
BRONZE_PATH = f"s3://{BUCKET_NAME}/bronze/source=hubspot"
DATABASE_NAME = "marketing_data_lake_db"
TABLE_NAME = "glue_catalog.marketing_data_lake_db.silver_hubspot_deals"

# 2. READ RAW DATA
# We let Spark infer schema from the JSON files
print("⏳ Reading raw JSON data...")
raw_df = spark.read.option("multiline", "true").json(BRONZE_PATH)

# 3. TRANSFORM & FLATTEN
# HubSpot data usually comes inside a "properties" struct.
# We verify if 'properties' exists, then flatten it.
if "properties" in raw_df.columns:
    silver_df = raw_df.select(
        col("id").alias("deal_id"),
        col("properties.dealname").alias("deal_name"),
        col("properties.amount").cast("double").alias("amount"),
        col("properties.dealstage").alias("stage"),
        col("properties.closedate").cast("timestamp").alias("close_date"),
        col("properties.pipeline").alias("pipeline"),
        col("ingestion_time").cast("timestamp")
    )
else:
    # Fallback if structure is flat (depends on ingestion script version)
    silver_df = raw_df

# 4. WRITE TO ICEBERG
# This creates (or appends to) the Iceberg table
print(f"🚀 Writing to Iceberg table: {TABLE_NAME}")

silver_df.writeTo(TABLE_NAME) \
    .tableProperty("format-version", "2") \
    .createOrReplace()  # Idempotent full reload; for incremental production loads, append or MERGE instead

print("✅ Job Complete!")
job.commit()

Code Walkthrough: Turning Raw Data into Business Intelligence 

Most data projects fail during the “Transformation” phase. Scripts break when data formats change, or they become too slow as data volumes grow. Our AWS Glue script is designed to bypass these common pitfalls. 

Here is exactly what the code above is doing to protect your data pipeline: 

Serverless Scalability 

We are initializing Apache Spark in a serverless environment. Why this matters: If you used a standard Python script on a server, a sudden spike in HubSpot data (like Black Friday sales) could crash the system due to lack of memory. Spark distributes the workload across multiple nodes automatically. You get the power of a supercomputer that turns off when you’re done, keeping costs minimal. 

Automated Discovery 

We point Spark at the Bronze prefix, and it discovers every dated folder underneath (/date=2026-01-15/, /date=2026-01-16/, and so on). Why this matters: You don’t need to manually tell the script which dates to process. Whether you have one day of data or five years of history, the script automatically finds and ingests everything in the Bronze layer. 

Flattening the Hierarchy 

HubSpot returns data in a deeply nested JSON format that is hostile to Excel, Power BI, or Tableau. Here, we programmatically “flatten” that structure. 

Why this matters: We transform technical gibberish into business-friendly columns like Deal Name and Amount. This ensures that when your analyst opens Tableau, they see a clean table, not a mess of brackets and code. 

The Iceberg Guarantee 

This is the most important step. We are not just saving a file; we are committing a database transaction. 

Why this matters: In older data lakes, if a job failed halfway through, you would end up with corrupt or partial files that broke dashboards. Apache Iceberg ensures ACID compliance: either the data is saved perfectly, or it isn’t saved at all. This means your CEO never sees a “half-loaded” dashboard. 
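One caveat on idempotency: the Silver job above uses createOrReplace, which rebuilds the table each run. If you switch to incremental appends, re-ingested deals would create duplicates. Iceberg’s answer is MERGE INTO, which upserts by key. A sketch in Spark SQL (the updates view is a hypothetical temp view holding the new batch):

```sql
-- Upsert the new batch by deal_id: update existing rows, insert new ones
MERGE INTO glue_catalog.marketing_data_lake_db.silver_hubspot_deals AS t
USING updates AS s
  ON t.deal_id = s.deal_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```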

Step 3: The Payoff (The Gold Layer) 

We have ingested the data securely (Bronze). We have cleaned and structured it (Silver). Now, we reach the finish line: Analytics. 

Because we used Apache Iceberg, we don’t need to copy this data into a separate data warehouse to read it. We can query it directly where it sits in S3 using AWS Athena. 

The Gold Aggregation Job 

Here is the PySpark Glue job that rolls your Silver deals up into the daily sales KPIs your leadership actually wants to see. 

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, to_date, sum as _sum, countDistinct, round, avg

# 1. SETUP
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

DATABASE_NAME = "marketing_data_lake_db"
# We are creating a Gold table specifically for Sales KPIs now
GOLD_TABLE_NAME = "glue_catalog.marketing_data_lake_db.gold_sales_daily"

# 2. LOAD SILVER TABLES (Reading from Iceberg)
# We read from the Cleaned/Silver layer
print("⏳ Loading Silver HubSpot Table...")
df_hubspot = spark.table(f"glue_catalog.{DATABASE_NAME}.silver_hubspot_deals")

# 3. AGGREGATE SALES (The "Gold" Transformation)
# Business Logic: 
# - Filter for 'closedwon' deals only (Revenue)
# - Group by Date
# - Calculate Total Revenue, Count of Deals, and Average Deal Size

gold_df = df_hubspot \
    .filter(col("stage") == 'closedwon') \
    .withColumn("date", to_date(col("close_date"))) \
    .groupBy("date") \
    .agg(
        _sum("amount").alias("total_revenue"), 
        countDistinct("deal_id").alias("deals_closed"),
        round(avg("amount"), 2).alias("avg_deal_size")
    ) \
    .orderBy("date") # Sort by date for clean reporting

# 4. WRITE TO GOLD ICEBERG TABLE
print(f"🚀 Writing Gold Sales KPI Data to {GOLD_TABLE_NAME}...")

gold_df.writeTo(GOLD_TABLE_NAME) \
    .tableProperty("format-version", "2") \
    .createOrReplace()

print("✅ Gold Sales Job Complete!")
job.commit()

The Result: Once this job runs, your Gold table holds a clean, daily view of your sales pipeline, and Athena can query it in seconds. No 24-hour delays. No complex exports. Just accurate data, ready for decision-making. 
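Once the Gold table exists, any BI tool can reach it through Athena. A sketch of the query (table and column names match the Gold job above; the date filter is illustrative):

```sql
-- Daily sales KPIs, straight from S3 via Athena
SELECT "date", total_revenue, deals_closed, avg_deal_size
FROM marketing_data_lake_db.gold_sales_daily
WHERE "date" >= DATE '2026-02-01'
ORDER BY "date";
```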

Conclusion: Build for the Future 

Building a data pipeline isn’t just about moving data from A to B. It’s about building a foundation that your business can trust. 

By choosing AWS S3 for storage, Glue for computation, and Iceberg for reliability, you have built a Lakehouse that is: 

  1. Auditable: Every raw record is preserved. 
  2. Scalable: It handles 10 records or 10 million records with equal ease. 
  3. Cost-Effective: You pay pennies for storage and only for the seconds of compute you use. 

Don’t let your data become a swamp. If you are ready to implement a production-grade data architecture but need a partner to guide the way, Primedsoft is here to help. We specialize in turning messy data into clear, actionable insights.