Middleware Architecture for Enterprise HubSpot
Enterprise HubSpot implementations rarely exist in isolation. They connect to ERP systems for order data, data warehouses for analytics, legacy CRMs for migration, and custom applications for industry-specific workflows.
Building these integrations directly - point-to-point from each system to HubSpot - creates a maintenance nightmare. What starts as three simple integrations becomes a web of dependencies that no one fully understands.
Middleware solves this. A well-designed middleware layer becomes your integration hub, handling data transformation, error recovery, and synchronisation logic in one place.
This article documents the architectural patterns I’ve implemented across enterprise HubSpot deployments.
Why Middleware?
Point-to-Point Problems
Direct integrations seem simpler initially:
```
Azure Synapse ──────► HubSpot
ERP System ─────────► HubSpot
Legacy CRM ─────────► HubSpot
Partner API ────────► HubSpot
```
But they accumulate problems:
- No centralised logging: Debugging requires checking each integration
- Duplicated logic: Each integration handles date formatting, error retry, rate limiting independently
- No orchestration: If Azure sync fails, ERP sync doesn’t know to wait
- Schema drift: Each integration evolves differently, creating inconsistencies
Middleware Benefits
A middleware layer provides:
```
Azure Synapse ──┐
                │
ERP System ─────┼──► Middleware ──► HubSpot
                │         │
Legacy CRM ─────┤         ▼
                │    [Logging]
Partner API ────┘    [Transformation]
                     [Error Handling]
                     [Orchestration]
```
- Single source of truth for integration logic
- Centralised monitoring and alerting
- Consistent data transformation rules
- Coordinated sync scheduling
- Unified error recovery
Core Middleware Components
1. API Handlers
Each external system gets a dedicated handler that abstracts its specific API quirks:
```python
import time

import hubspot
from hubspot.crm.contacts import ApiException

class HubSpotHandler:
    def __init__(self, access_token):
        self.client = hubspot.Client.create(access_token=access_token)
        self.rate_limiter = RateLimiter(requests_per_second=9)

    def upsert_contact(self, email, properties):
        self.rate_limiter.wait()
        try:
            return self.client.crm.contacts.basic_api.create_or_update(
                simple_public_object_input_for_create={
                    "properties": properties
                },
                id_property="email"
            )
        except ApiException as e:
            if e.status == 429:
                self.handle_rate_limit(e)
            raise

    def handle_rate_limit(self, error):
        retry_after = int(error.headers.get('Retry-After', 10))
        time.sleep(retry_after)
```
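The RateLimiter the handler relies on isn't shown. A minimal sketch, assuming a simple fixed-interval throttle is sufficient (the class and approach here are illustrative, not a specific library):

```python
import threading
import time

class RateLimiter:
    """Spread calls evenly so we stay under the configured request rate."""

    def __init__(self, requests_per_second):
        self.min_interval = 1.0 / requests_per_second
        self.lock = threading.Lock()
        self.last_call = 0.0

    def wait(self):
        # Block until at least min_interval has passed since the last call
        with self.lock:
            now = time.monotonic()
            sleep_for = self.min_interval - (now - self.last_call)
            if sleep_for > 0:
                time.sleep(sleep_for)
            self.last_call = time.monotonic()
```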
2. Data Transformation Layer
Transform data between source and destination schemas:
```python
class ContactTransformer:
    """Transform contacts between Azure schema and HubSpot schema"""

    FIELD_MAPPING = {
        'EmailAddress': 'email',
        'FirstName': 'firstname',
        'LastName': 'lastname',
        'CompanyName': 'company',
        'Phone': 'phone',
        'CreatedDate': 'hs_object_createdate'
    }

    def to_hubspot(self, azure_record):
        hubspot_properties = {}

        for azure_field, hubspot_field in self.FIELD_MAPPING.items():
            value = azure_record.get(azure_field)
            if value is not None:
                hubspot_properties[hubspot_field] = self.transform_value(
                    hubspot_field, value
                )

        return hubspot_properties

    def transform_value(self, field, value):
        if field == 'hs_object_createdate':
            return self.format_date(value)
        if field == 'phone':
            return self.normalize_phone(value)
        return value
```
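format_date and normalize_phone are elided above. A minimal sketch of what they might do, shown as standalone functions for brevity (on the class they'd take self), assuming Azure sends naive ISO-8601 UTC timestamps and the target HubSpot properties are datetime-typed, which HubSpot represents as epoch milliseconds:

```python
import re
from datetime import datetime, timezone

def format_date(value):
    # Assumption: naive ISO-8601 strings in UTC, e.g. '2024-03-01T09:30:00';
    # HubSpot datetime properties take milliseconds since the Unix epoch
    dt = datetime.fromisoformat(value).replace(tzinfo=timezone.utc)
    return str(int(dt.timestamp() * 1000))

def normalize_phone(value):
    # Keep a leading '+' and the digits; drop spaces, dashes, and brackets
    digits = re.sub(r'\D', '', value)
    return ('+' + digits) if value.strip().startswith('+') else digits
```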
3. Sync Orchestrator
Coordinate multiple sync processes with dependencies:
```python
class SyncOrchestrator:
    def __init__(self):
        self.jobs = {}
        self.dependencies = {}

    def register_job(self, name, handler, depends_on=None):
        self.jobs[name] = handler
        self.dependencies[name] = depends_on or []

    async def run_sync(self, job_name):
        # Check dependencies first
        for dep in self.dependencies[job_name]:
            if not self.is_complete(dep):
                await self.run_sync(dep)

        # Run the job
        handler = self.jobs[job_name]
        try:
            result = await handler.execute()
            self.mark_complete(job_name, result)
            return result
        except Exception as e:
            self.mark_failed(job_name, e)
            raise

# Usage
orchestrator = SyncOrchestrator()
orchestrator.register_job('companies', CompanySyncHandler())
orchestrator.register_job('contacts', ContactSyncHandler(), depends_on=['companies'])
orchestrator.register_job('deals', DealSyncHandler(), depends_on=['contacts', 'companies'])
```
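The state-tracking methods (is_complete, mark_complete, mark_failed) are left out above. A minimal in-memory sketch, assuming __init__ also sets self.results = {}; a production version would persist this state so a restart doesn't rerun completed jobs:

```python
    def is_complete(self, job_name):
        return self.results.get(job_name, {}).get('status') == 'complete'

    def mark_complete(self, job_name, result):
        self.results[job_name] = {'status': 'complete', 'result': result}

    def mark_failed(self, job_name, error):
        self.results[job_name] = {'status': 'failed', 'error': error}
```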
4. Error Handler and Retry Logic
Centralised error handling with exponential backoff:
```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class RetryHandler:
    def __init__(self, max_retries=3, base_delay=1):
        self.max_retries = max_retries
        self.base_delay = base_delay

    async def execute_with_retry(self, operation, *args, **kwargs):
        last_exception = None

        for attempt in range(self.max_retries):
            try:
                return await operation(*args, **kwargs)
            except TransientError as e:
                last_exception = e
                delay = self.base_delay * (2 ** attempt)
                logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                await asyncio.sleep(delay)
            except PermanentError as e:
                # Don't retry permanent failures
                logger.error(f"Permanent error: {e}")
                raise

        raise MaxRetriesExceeded(last_exception)
```
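The three exception types are the middleware's own; a sketch of how they might be declared:

```python
class TransientError(Exception):
    """Recoverable failure (timeout, 429, brief outage) - worth retrying."""

class PermanentError(Exception):
    """Non-recoverable failure (bad credentials, validation) - fail fast."""

class MaxRetriesExceeded(Exception):
    """All attempts exhausted; wraps the last transient error."""
```

Each API handler then translates raw HTTP errors into one of the first two buckets, so the retry logic never needs to know which system it is talking to.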
5. Webhook Processor
Handle incoming webhooks from HubSpot:
```python
import hashlib
import hmac

class WebhookProcessor:
    def __init__(self, client_secret):
        self.client_secret = client_secret
        self.handlers = {}

    def register_handler(self, event_type, handler):
        self.handlers[event_type] = handler
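    def validate_signature(self, request):
        # Sketch of HubSpot's v1 signature scheme: SHA-256 of the app secret
        # concatenated with the raw request body, compared against the
        # X-HubSpot-Signature header. Assumptions: a Flask-style request
        # object, and an app not yet on the newer v3 HMAC signatures.
        expected = hashlib.sha256(
            (self.client_secret + request.get_data(as_text=True)).encode('utf-8')
        ).hexdigest()
        provided = request.headers.get('X-HubSpot-Signature', '')
        return hmac.compare_digest(expected, provided)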
    def process(self, request):
        # Validate signature
        if not self.validate_signature(request):
            raise InvalidSignatureError()

        events = request.json()
        results = []

        for event in events:
            event_type = event['subscriptionType']
            handler = self.handlers.get(event_type)

            if handler:
                result = handler.process(event)
                results.append(result)
            else:
                logger.warning(f"No handler for event type: {event_type}")

        return results
```
Real-World Architecture: Master Customer Record
One of the most common enterprise patterns is the Master Customer Record (MCR) - consolidating customer data from multiple sources into HubSpot.
Use Case
A global organisation has customer data in:
- Azure Data Warehouse: Historical transaction data
- IELTS System: Test registration data
- English Online: Subscription data
- Impact Partners: Partner-referred customers
HubSpot becomes the unified customer view.
Architecture
```
┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐
│  Azure Synapse  │   │  IELTS System   │   │ English Online  │
└────────┬────────┘   └────────┬────────┘   └────────┬────────┘
         │                     │                     │
         │ Daily Batch         │ Daily Batch         │ Daily Batch
         │                     │                     │
         ▼                     ▼                     ▼
┌─────────────────────────────────────────────────────────────────┐
│                           MIDDLEWARE                            │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐     │
│  │   Azure   │  │   IELTS   │  │    EOL    │  │  Impact   │     │
│  │  Adapter  │  │  Adapter  │  │  Adapter  │  │  Adapter  │     │
│  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘     │
│        │              │              │              │           │
│        ▼              ▼              ▼              ▼           │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                  TRANSFORMATION ENGINE                  │   │
│  │  - Field Mapping                                        │   │
│  │  - Deduplication Rules                                  │   │
│  │  - Merge Logic                                          │   │
│  └─────────────────────────────┬───────────────────────────┘   │
│                                │                                │
│  ┌─────────────────────────────▼───────────────────────────┐   │
│  │                   HUBSPOT SYNC ENGINE                    │   │
│  │  - Rate Limiting                                         │   │
│  │  - Batch Operations                                      │   │
│  │  - Conflict Resolution                                   │   │
│  └─────────────────────────────┬───────────────────────────┘   │
└────────────────────────────────┼────────────────────────────────┘
                                 │
                                 ▼
                          ┌─────────────┐
                          │   HubSpot   │
                          │     CRM     │
                          └─────────────┘
```
Key Design Decisions
1. Batch vs Real-Time
Most syncs run as daily batches at 2 AM. Exceptions (schedules sketched after this list):
- Impact Partners: Every 30 minutes (high-value leads need faster response)
- Webhooks: Real-time for critical status changes
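Declared as data, those schedules might look like this (cron-style expressions; the source keys here are illustrative):

```python
SYNC_SCHEDULES = {
    'azure':  {'cron': '0 2 * * *'},    # daily batch at 2 AM
    'ielts':  {'cron': '0 2 * * *'},
    'eol':    {'cron': '0 2 * * *'},
    'impact': {'cron': '*/30 * * * *'}, # every 30 minutes for partner leads
}
```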
2. Conflict Resolution
When the same contact exists in multiple sources:
```python
PRECEDENCE = ['azure', 'ielts', 'eol', 'impact']

def resolve_conflict(field, values_by_source):
    """Return the value from the highest-precedence source that has it"""
    for source in PRECEDENCE:
        if source in values_by_source and values_by_source[source]:
            return values_by_source[source]
    return None
```
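For example, if a phone number exists in both IELTS and English Online but not in Azure, the IELTS value wins:

```python
values = {
    'ielts': '+442079460000',  # hypothetical values for illustration
    'eol': '+442079460123',
}
resolve_conflict('phone', values)  # -> '+442079460000' (ielts outranks eol)
```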
3. Deduplication
Identify duplicates before syncing:
```python
def find_matching_contact(email, phone, name):
    # Priority 1: Email match
    if email:
        match = hubspot.search_by_email(email)
        if match:
            return match

    # Priority 2: Phone + Name match
    if phone and name:
        matches = hubspot.search_by_phone(phone)
        for match in matches:
            if fuzzy_name_match(match.name, name) > 0.9:
                return match

    return None  # Create new contact
```
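fuzzy_name_match is left undefined above; a minimal sketch using only the standard library (difflib's similarity ratio, so the 0.9 threshold tolerates only minor spelling differences):

```python
from difflib import SequenceMatcher

def fuzzy_name_match(name_a, name_b):
    # Case-insensitive similarity score in [0, 1]
    return SequenceMatcher(None, name_a.lower().strip(),
                           name_b.lower().strip()).ratio()
```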
Monitoring and Observability
Structured Logging
```python
import structlog

logger = structlog.get_logger()

def sync_contact(source, contact_data):
    logger.info("sync_started",
        source=source,
        email=contact_data.get('email'),
        record_id=contact_data.get('id')
    )

    try:
        result = hubspot.upsert_contact(contact_data)
        logger.info("sync_completed",
            source=source,
            hubspot_id=result.id,
            action="created" if result.new else "updated"
        )
    except Exception as e:
        logger.error("sync_failed",
            source=source,
            error=str(e),
            record_id=contact_data.get('id')
        )
        raise
```
Metrics
Track operational health:
```python
from prometheus_client import Counter, Histogram, Gauge

sync_records_total = Counter(
    'middleware_sync_records_total',
    'Total records synced',
    ['source', 'object_type', 'status']
)

sync_duration_seconds = Histogram(
    'middleware_sync_duration_seconds',
    'Time spent syncing',
    ['source', 'object_type']
)

sync_queue_depth = Gauge(
    'middleware_sync_queue_depth',
    'Number of records waiting to sync',
    ['source']
)
```
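Wiring these into a sync loop, using prometheus_client's labels().inc() counter pattern and the histogram's time() context manager (sync_one here is a hypothetical per-record upsert):

```python
def sync_batch(source, object_type, records):
    # Record wall-clock time for the whole batch
    with sync_duration_seconds.labels(source=source, object_type=object_type).time():
        for record in records:
            try:
                sync_one(record)  # hypothetical per-record upsert
                status = 'success'
            except Exception:
                status = 'error'
            sync_records_total.labels(
                source=source, object_type=object_type, status=status
            ).inc()
```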
Alerting
Critical alerts for:
- Sync job failures (any source)
- Error rate exceeds 5%
- Queue depth exceeds threshold
- HubSpot API rate limit hit
- Stale data (no updates in expected window)
Deployment Patterns
Containerised Deployment
```yaml
services:
  middleware:
    build: ./middleware
    environment:
      - HUBSPOT_ACCESS_TOKEN=${HUBSPOT_ACCESS_TOKEN}
      - AZURE_CONNECTION_STRING=${AZURE_CONNECTION_STRING}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - postgres

  scheduler:
    build: ./scheduler
    environment:
      - MIDDLEWARE_URL=http://middleware:8000

  redis:
    image: redis:7-alpine

  postgres:
    image: postgres:15-alpine
    volumes:
      - pgdata:/var/lib/postgresql/data

# Declare the named volume used by postgres
volumes:
  pgdata:
```
Admin Interface
Build a simple admin UI for:
- Triggering manual syncs
- Viewing sync history
- Monitoring error logs
- Managing configuration
```python
from flask import Flask, jsonify
from flask_admin import Admin

app = Flask(__name__)
admin = Admin(app, name='Middleware Admin')

@app.route('/api/sync/trigger/<source>')
def trigger_sync(source):
    job = sync_queue.enqueue(f'sync_{source}')
    return jsonify({'job_id': job.id, 'status': 'queued'})

@app.route('/api/sync/status/<job_id>')
def sync_status(job_id):
    job = sync_queue.fetch_job(job_id)
    return jsonify({
        'job_id': job_id,
        'status': job.get_status(),
        'result': job.result
    })
```
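The sync_queue used above is assumed to be an RQ queue backed by the Redis container from the compose file; a minimal sketch:

```python
from redis import Redis
from rq import Queue

redis_conn = Redis.from_url('redis://redis:6379')
sync_queue = Queue('sync', connection=redis_conn)
```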
Key Takeaways
- Middleware beats point-to-point for anything beyond trivial integrations
- Centralise transformation logic - don’t scatter it across integrations
- Design for failure - retries, error handling, and recovery are not optional
- Coordinate dependent syncs - company records before contacts, contacts before deals
- Monitor everything - structured logs, metrics, and alerts
- Build admin tools - manual triggers and visibility save hours of debugging
Good middleware is invisible when it works and invaluable when problems arise. Invest in the architecture upfront, and your enterprise HubSpot implementation will scale smoothly.
Designing enterprise HubSpot integrations? Connect with me on LinkedIn to discuss architecture patterns.