san.is

Enterprise HubSpot implementations rarely exist in isolation. They connect to ERP systems for order data, data warehouses for analytics, legacy CRMs for migration, and custom applications for industry-specific workflows.

Building these integrations directly - point-to-point from each system to HubSpot - creates a maintenance nightmare. What starts as three simple integrations becomes a web of dependencies that no one fully understands.

Middleware solves this. A well-designed middleware layer becomes your integration hub, handling data transformation, error recovery, and synchronisation logic in one place.

This article documents the architectural patterns I’ve implemented across enterprise HubSpot deployments.

Why Middleware?

Point-to-Point Problems

Direct integrations seem simpler initially:

Azure Synapse ──────► HubSpot
ERP System ─────────► HubSpot
Legacy CRM ─────────► HubSpot
Partner API ────────► HubSpot

But they accumulate problems:

  • No centralised logging: Debugging requires checking each integration
  • Duplicated logic: Each integration handles date formatting, error retry, rate limiting independently
  • No orchestration: If Azure sync fails, ERP sync doesn’t know to wait
  • Schema drift: Each integration evolves differently, creating inconsistencies

Middleware Benefits

A middleware layer provides:

Azure Synapse ──┐
ERP System ─────┼──► Middleware ──► HubSpot
Legacy CRM ─────┤         │
Partner API ────┘         ▼
                    [Logging]
                    [Transformation]
                    [Error Handling]
                    [Orchestration]
  • Single source of truth for integration logic
  • Centralised monitoring and alerting
  • Consistent data transformation rules
  • Coordinated sync scheduling
  • Unified error recovery

Core Middleware Components

1. API Handlers

Each external system gets a dedicated handler that abstracts its specific API quirks:

import time

import hubspot
from hubspot.crm.contacts import ApiException

class HubSpotHandler:
    def __init__(self, access_token):
        self.client = hubspot.Client.create(access_token=access_token)
        self.rate_limiter = RateLimiter(requests_per_second=9)

    def upsert_contact(self, email, properties):
        self.rate_limiter.wait()
        # Ensure the id_property value is present in the payload
        properties = {**properties, "email": email}
        try:
            return self.client.crm.contacts.basic_api.create_or_update(
                simple_public_object_input_for_create={
                    "properties": properties
                },
                id_property="email"
            )
        except ApiException as e:
            if e.status == 429:
                self.handle_rate_limit(e)
            raise

    def handle_rate_limit(self, error):
        retry_after = int(error.headers.get('Retry-After', 10))
        time.sleep(retry_after)
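The handler assumes a RateLimiter helper that isn't shown. A minimal sketch that spaces calls out by a fixed interval (the class name and constructor argument mirror the usage above; this is an illustration, not part of HubSpot's client):

```python
import time


class RateLimiter:
    """Enforce a minimum interval between requests.

    Hypothetical helper matching the RateLimiter(requests_per_second=9)
    usage in HubSpotHandler above.
    """

    def __init__(self, requests_per_second):
        self.min_interval = 1.0 / requests_per_second
        self.last_request = 0.0

    def wait(self):
        # Sleep just long enough to stay under the configured rate
        elapsed = time.monotonic() - self.last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request = time.monotonic()
```

A token-bucket variant would allow short bursts; fixed spacing is the simplest thing that keeps a single worker under HubSpot's per-second limit.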

2. Data Transformation Layer

Transform data between source and destination schemas:

class ContactTransformer:
    """Transform contacts between Azure schema and HubSpot schema"""

    FIELD_MAPPING = {
        'EmailAddress': 'email',
        'FirstName': 'firstname',
        'LastName': 'lastname',
        'CompanyName': 'company',
        'Phone': 'phone',
        'CreatedDate': 'hs_object_createdate'
    }

    def to_hubspot(self, azure_record):
        hubspot_properties = {}
        for azure_field, hubspot_field in self.FIELD_MAPPING.items():
            value = azure_record.get(azure_field)
            if value is not None:
                hubspot_properties[hubspot_field] = self.transform_value(
                    hubspot_field, value
                )
        return hubspot_properties

    def transform_value(self, field, value):
        if field == 'hs_object_createdate':
            return self.format_date(value)
        if field == 'phone':
            return self.normalize_phone(value)
        return value
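The transformer calls format_date and normalize_phone, which aren't shown. One plausible implementation, written as standalone functions for brevity: HubSpot datetime properties expect Unix epoch milliseconds, and the assumption that source dates arrive as naive ISO-8601 UTC strings is mine:

```python
import re
from datetime import datetime, timezone


def format_date(value):
    """Convert a naive ISO-8601 UTC string to the epoch-millisecond
    string that HubSpot datetime properties expect."""
    dt = datetime.fromisoformat(value).replace(tzinfo=timezone.utc)
    return str(int(dt.timestamp() * 1000))


def normalize_phone(value):
    """Strip everything except digits and a leading '+'."""
    return re.sub(r'[^\d+]', '', value)
```

In the class above these would be methods; the logic is identical.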

3. Sync Orchestrator

Coordinate multiple sync processes with dependencies:

class SyncOrchestrator:
    def __init__(self):
        self.jobs = {}
        self.dependencies = {}

    def register_job(self, name, handler, depends_on=None):
        self.jobs[name] = handler
        self.dependencies[name] = depends_on or []

    async def run_sync(self, job_name):
        # Check dependencies first
        for dep in self.dependencies[job_name]:
            if not self.is_complete(dep):
                await self.run_sync(dep)

        # Run the job
        handler = self.jobs[job_name]
        try:
            result = await handler.execute()
            self.mark_complete(job_name, result)
            return result
        except Exception as e:
            self.mark_failed(job_name, e)
            raise

# Usage
orchestrator = SyncOrchestrator()
orchestrator.register_job('companies', CompanySyncHandler())
orchestrator.register_job('contacts', ContactSyncHandler(), depends_on=['companies'])
orchestrator.register_job('deals', DealSyncHandler(), depends_on=['contacts', 'companies'])
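run_sync relies on is_complete, mark_complete, and mark_failed, which aren't shown. A minimal in-memory sketch of that state tracking (hypothetical; a production middleware would persist this in Redis or Postgres so restarts don't re-run finished jobs):

```python
class SyncState:
    """In-memory job state matching the is_complete / mark_complete /
    mark_failed calls made by SyncOrchestrator above."""

    def __init__(self):
        self.completed = {}  # job_name -> result
        self.failed = {}     # job_name -> exception

    def is_complete(self, job_name):
        return job_name in self.completed

    def mark_complete(self, job_name, result):
        self.completed[job_name] = result
        self.failed.pop(job_name, None)

    def mark_failed(self, job_name, error):
        self.failed[job_name] = error
```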

4. Error Handler and Retry Logic

Centralised error handling with exponential backoff:

import asyncio
import logging

logger = logging.getLogger(__name__)

class RetryHandler:
    def __init__(self, max_retries=3, base_delay=1):
        self.max_retries = max_retries
        self.base_delay = base_delay

    async def execute_with_retry(self, operation, *args, **kwargs):
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                return await operation(*args, **kwargs)
            except TransientError as e:
                last_exception = e
                delay = self.base_delay * (2 ** attempt)
                logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                await asyncio.sleep(delay)
            except PermanentError as e:
                # Don't retry permanent failures
                logger.error(f"Permanent error: {e}")
                raise
        raise MaxRetriesExceeded(last_exception)
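This assumes TransientError and PermanentError exist and that something sorts failures into the two buckets. One plausible taxonomy, treating rate limits and server errors as retryable (classify_http_error is my own illustration, not from any library):

```python
class TransientError(Exception):
    """Retryable failure, e.g. a timeout, HTTP 429, or a 5xx response."""


class PermanentError(Exception):
    """Non-retryable failure, e.g. a validation error (most 4xx responses)."""


def classify_http_error(status_code):
    # Retrying a 400 just repeats the same rejected payload;
    # retrying a 429 or 5xx gives the remote side time to recover.
    if status_code == 429 or status_code >= 500:
        return TransientError(f"HTTP {status_code}")
    return PermanentError(f"HTTP {status_code}")
```

API handlers raise the classified error, and RetryHandler's two except clauses do the rest.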

5. Webhook Processor

Handle incoming webhooks from HubSpot:

class WebhookProcessor:
    def __init__(self, client_secret):
        self.client_secret = client_secret
        self.handlers = {}

    def register_handler(self, event_type, handler):
        self.handlers[event_type] = handler

    def process(self, request):
        # Validate signature
        if not self.validate_signature(request):
            raise InvalidSignatureError()

        events = request.json()
        results = []
        for event in events:
            event_type = event['subscriptionType']
            handler = self.handlers.get(event_type)
            if handler:
                result = handler.process(event)
                results.append(result)
            else:
                logger.warning(f"No handler for event type: {event_type}")
        return results
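validate_signature is left abstract above. For HubSpot's v1 signature scheme, the expected header value is the SHA-256 hex digest of the app's client secret concatenated with the raw request body; a sketch (check HubSpot's current docs for the v2/v3 variants, which also hash the method, URI, and a timestamp):

```python
import hashlib
import hmac


def validate_v1_signature(client_secret, raw_body, signature_header):
    """Compare the X-HubSpot-Signature header (v1 scheme) against
    sha256(client_secret + raw request body), in constant time."""
    expected = hashlib.sha256(
        client_secret.encode('utf-8') + raw_body
    ).hexdigest()
    return hmac.compare_digest(expected, signature_header)
```

Always hash the raw bytes of the body, not the re-serialised JSON; any reordering or whitespace change breaks the digest.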

Real-World Architecture: Master Customer Record

One of the most common enterprise patterns is the Master Customer Record (MCR) - consolidating customer data from multiple sources into HubSpot.

Use Case

A global organisation has customer data in:

  • Azure Data Warehouse: Historical transaction data
  • IELTS System: Test registration data
  • English Online: Subscription data
  • Impact Partners: Partner-referred customers

HubSpot becomes the unified customer view.

Architecture

┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐
│  Azure Synapse  │   │  IELTS System   │   │ English Online  │
└────────┬────────┘   └────────┬────────┘   └────────┬────────┘
         │                     │                     │
         │ Daily Batch         │ Daily Batch         │ Daily Batch
         │                     │                     │
         ▼                     ▼                     ▼
┌─────────────────────────────────────────────────────────────────┐
│                           MIDDLEWARE                            │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐     │
│  │   Azure   │  │   IELTS   │  │    EOL    │  │  Impact   │     │
│  │  Adapter  │  │  Adapter  │  │  Adapter  │  │  Adapter  │     │
│  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘     │
│        │              │              │              │           │
│        ▼              ▼              ▼              ▼           │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                  TRANSFORMATION ENGINE                  │    │
│  │  - Field Mapping                                        │    │
│  │  - Deduplication Rules                                  │    │
│  │  - Merge Logic                                          │    │
│  └─────────────────────────────┬───────────────────────────┘    │
│                                │                                │
│  ┌─────────────────────────────▼───────────────────────────┐    │
│  │                   HUBSPOT SYNC ENGINE                   │    │
│  │  - Rate Limiting                                        │    │
│  │  - Batch Operations                                     │    │
│  │  - Conflict Resolution                                  │    │
│  └─────────────────────────────┬───────────────────────────┘    │
└────────────────────────────────┼────────────────────────────────┘
                                 │
                                 ▼
                          ┌─────────────┐
                          │   HubSpot   │
                          │     CRM     │
                          └─────────────┘

Key Design Decisions

1. Batch vs Real-Time

Most syncs run as daily batches at 2 AM. Exceptions:

  • Impact Partners: Every 30 minutes (high-value leads need faster response)
  • Webhooks: Real-time for critical status changes

2. Conflict Resolution

When the same contact exists in multiple sources:

PRECEDENCE = ['azure', 'ielts', 'eol', 'impact']

def resolve_conflict(field, values_by_source):
    """Return the value from the highest-precedence source that has it"""
    for source in PRECEDENCE:
        if source in values_by_source and values_by_source[source]:
            return values_by_source[source]
    return None
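Applied field by field, the same rule merges whole records. A runnable sketch (resolve_conflict repeated here so the snippet stands alone; merge_records is my own illustration):

```python
PRECEDENCE = ['azure', 'ielts', 'eol', 'impact']


def resolve_conflict(field, values_by_source):
    """Return the value from the highest-precedence source that has it."""
    for source in PRECEDENCE:
        if source in values_by_source and values_by_source[source]:
            return values_by_source[source]
    return None


def merge_records(records_by_source, fields):
    """Build one merged record, choosing each field independently by
    source precedence."""
    merged = {}
    for field in fields:
        values = {
            source: record.get(field)
            for source, record in records_by_source.items()
        }
        merged[field] = resolve_conflict(field, values)
    return merged
```

Note that precedence is per field: a lower-precedence source still wins any field the higher-precedence sources left empty.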

3. Deduplication

Identify duplicates before syncing:

def find_matching_contact(email, phone, name):
    # Priority 1: Email match
    if email:
        match = hubspot.search_by_email(email)
        if match:
            return match

    # Priority 2: Phone + Name match
    if phone and name:
        matches = hubspot.search_by_phone(phone)
        for match in matches:
            if fuzzy_name_match(match.name, name) > 0.9:
                return match

    return None  # Create new contact
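fuzzy_name_match isn't defined above. A minimal stand-in using difflib from the standard library (real deployments often reach for a dedicated library such as rapidfuzz; the 0.9 threshold comes from the snippet above):

```python
from difflib import SequenceMatcher


def fuzzy_name_match(name_a, name_b):
    """Similarity score in [0, 1] after case and whitespace normalisation.
    Hypothetical stand-in for the matcher used in find_matching_contact."""
    a = name_a.strip().lower()
    b = name_b.strip().lower()
    return SequenceMatcher(None, a, b).ratio()
```

SequenceMatcher handles typos and case differences but not transposed first/last names; splitting and comparing name parts separately is a common refinement.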

Monitoring and Observability

Structured Logging

import structlog

logger = structlog.get_logger()

def sync_contact(source, contact_data):
    logger.info("sync_started",
        source=source,
        email=contact_data.get('email'),
        record_id=contact_data.get('id')
    )
    try:
        result = hubspot.upsert_contact(contact_data)
        logger.info("sync_completed",
            source=source,
            hubspot_id=result.id,
            action="created" if result.new else "updated"
        )
    except Exception as e:
        logger.error("sync_failed",
            source=source,
            error=str(e),
            record_id=contact_data.get('id')
        )
        raise

Metrics

Track operational health:

from prometheus_client import Counter, Histogram, Gauge

sync_records_total = Counter(
    'middleware_sync_records_total',
    'Total records synced',
    ['source', 'object_type', 'status']
)

sync_duration_seconds = Histogram(
    'middleware_sync_duration_seconds',
    'Time spent syncing',
    ['source', 'object_type']
)

sync_queue_depth = Gauge(
    'middleware_sync_queue_depth',
    'Number of records waiting to sync',
    ['source']
)

Alerting

Critical alerts for:

  • Sync job failures (any source)
  • Error rate exceeds 5%
  • Queue depth exceeds threshold
  • HubSpot API rate limit hit
  • Stale data (no updates in expected window)
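With the metrics above, most of these conditions can be expressed as Prometheus alerting rules. A sketch of two of them (thresholds, evaluation windows, and severities are assumptions to adjust for your volumes):

```yaml
groups:
  - name: middleware-alerts
    rules:
      - alert: SyncErrorRateHigh
        expr: |
          sum(rate(middleware_sync_records_total{status="error"}[15m]))
            / sum(rate(middleware_sync_records_total[15m])) > 0.05
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Middleware sync error rate above 5%"
      - alert: SyncQueueBacklog
        expr: middleware_sync_queue_depth > 10000
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Sync queue depth above threshold"
```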

Deployment Patterns

Containerised Deployment

# docker-compose.yml
services:
  middleware:
    build: ./middleware
    environment:
      - HUBSPOT_ACCESS_TOKEN=${HUBSPOT_ACCESS_TOKEN}
      - AZURE_CONNECTION_STRING=${AZURE_CONNECTION_STRING}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - postgres
  scheduler:
    build: ./scheduler
    environment:
      - MIDDLEWARE_URL=http://middleware:8000
  redis:
    image: redis:7-alpine
  postgres:
    image: postgres:15-alpine
    volumes:
      - pgdata:/var/lib/postgresql/data

volumes:
  pgdata:

Admin Interface

Build a simple admin UI for:

  • Triggering manual syncs
  • Viewing sync history
  • Monitoring error logs
  • Managing configuration

from flask import Flask, jsonify
from flask_admin import Admin

app = Flask(__name__)
admin = Admin(app, name='Middleware Admin')

@app.route('/api/sync/trigger/<source>')
def trigger_sync(source):
    job = sync_queue.enqueue(f'sync_{source}')
    return jsonify({'job_id': job.id, 'status': 'queued'})

@app.route('/api/sync/status/<job_id>')
def sync_status(job_id):
    job = sync_queue.fetch_job(job_id)
    return jsonify({
        'job_id': job_id,
        'status': job.get_status(),
        'result': job.result
    })

Key Takeaways

  1. Middleware beats point-to-point for anything beyond a trivial number of integrations
  2. Centralise transformation logic - don’t scatter it across integrations
  3. Design for failure - retries, error handling, and recovery are not optional
  4. Coordinate dependent syncs - company records before contacts, contacts before deals
  5. Monitor everything - structured logs, metrics, and alerts
  6. Build admin tools - manual triggers and visibility save hours of debugging

Good middleware is invisible when it works and invaluable when problems arise. Invest in the architecture upfront, and your enterprise HubSpot implementation will scale smoothly.


Designing enterprise HubSpot integrations? Connect with me on LinkedIn to discuss architecture patterns.