Architecture · November 2024

CSV Import & Data Processing Pipeline

A resilient async CSV import platform processing large files in chunks with row-level validation, staging, and full auditability — never parsing the full file in the request path.

Async chunked processing · Row-level error reporting · Full audit trail · Replay & DLQ support
C# · ASP.NET Core · Azure · Azure SQL · Azure Service Bus · Blob Storage · Redis

CSV Import & Data Processing Pipeline - Solution Architecture

Mission

Build a resilient CSV import and data processing platform that allows users to upload large CSV files, validates and transforms them asynchronously, applies valid records into the system safely, surfaces row-level errors clearly, and provides progress, auditability, and replay tools without ever processing the full file in the synchronous request path.

CSV Pipeline Architecture Overview

Constraints

  • Backend stack: C# / ASP.NET Core
  • Cloud: Azure
  • Primary database: Azure SQL Database
  • File storage: Azure Blob Storage
  • Queueing: Azure Service Bus
  • Optional cache: Azure Cache for Redis
  • Processing must be asynchronous
  • Large files must not be loaded fully into memory
  • Partial row failures must not fail the whole import
  • Users must be able to see progress and download error reports
  • Failed jobs must be retryable and auditable

Core Design

  • Never parse the full CSV inside the initial HTTP request.
  • Upload raw files directly to Blob Storage using SAS URLs.
  • Create an import job as the control-plane record of the workflow.
  • Process the file asynchronously in chunks.
  • Separate:
    • upload
    • validation
    • transformation
    • staging
    • merge
    • reporting
  • Use Azure SQL as the source of truth for job state, staging state, results, and audit state.
  • Use Service Bus for decoupled job execution and retries.
  • Use Redis for hot progress/status reads where needed.
  • Generate downloadable error reports for invalid rows.
  • Make every stage idempotent and replayable.

Azure Services

  • Azure App Service or Azure Container Apps
    • import-api
    • admin-api
  • Azure Functions
    • import-orchestrator
    • chunk-processor
    • merge-processor
    • report-generator
  • Azure Blob Storage
    • raw uploads
    • manifests
    • chunk files if needed
    • error reports
  • Azure Service Bus
    • import request queue
    • chunk work queue
    • completion/failure topics
    • DLQs
  • Azure SQL Database
  • Azure Cache for Redis
  • Azure Key Vault
  • Application Insights
  • Azure Monitor / Log Analytics
  • Azure Event Grid or Azure SignalR Service for optional progress push

High-Level Architecture

```mermaid
flowchart LR
    U[User / Admin UI] --> API[Import API - ASP.NET Core]
    API --> AUTH[Auth Provider / Entra ID]
    API --> SQL[(Azure SQL)]
    API --> REDIS[Azure Cache for Redis]
    API --> BLOB[Blob Storage]
    API --> SB[Azure Service Bus]
    U --> BLOB
    SB --> ORCH[Import Orchestrator Function]
    ORCH --> SQL
    ORCH --> BLOB
    ORCH --> SB
    SB --> CHUNK[Chunk Processor Function]
    CHUNK --> SQL
    CHUNK --> BLOB
    CHUNK --> SB
    SB --> MERGE[Merge Processor Function]
    MERGE --> SQL
    MERGE --> SB
    SB --> REPORT[Report Generator Function]
    REPORT --> BLOB
    REPORT --> SQL
    API --> APPINS[Application Insights]
    ORCH --> APPINS
    CHUNK --> APPINS
    MERGE --> APPINS
    REPORT --> APPINS
    ADMIN[Admin Dashboard] --> API
```

Service Responsibilities

import-api (ASP.NET Core)

  • Authenticates the user
  • Creates upload sessions
  • Generates SAS URLs for direct blob upload
  • Creates import_jobs
  • Finalizes uploads
  • Enqueues import requests to Service Bus
  • Serves:
    • import status
    • progress
    • validation summaries
    • error report links
    • admin actions

import-orchestrator (Azure Function)

  • Starts when a file upload is finalized
  • Loads import template/schema
  • Inspects file metadata
  • Validates file-level properties:
    • size
    • extension
    • delimiter/header assumptions
    • required columns
  • Creates chunk plan
  • Writes import_chunks
  • Enqueues chunk processing messages

chunk-processor (Azure Function)

  • Processes a single chunk of rows
  • Streams CSV rows rather than loading the full file into memory
  • Parses and validates each row
  • Applies transformations and normalization
  • Writes valid rows to staging tables using bulk operations
  • Writes invalid rows to import_row_errors
  • Emits chunk completion events

merge-processor (Azure Function)

  • Runs when all chunks are complete
  • Applies staged valid records into final domain tables
  • Uses transactional merge/upsert logic
  • Protects against duplicate application
  • Updates import result metrics

report-generator (Azure Function)

  • Builds downloadable error reports
  • Writes summary artifacts to blob
  • Marks import as completed / failed / completed_with_errors

admin-api

  • Lists jobs, chunks, failures, DLQs
  • Supports replay, cancel, re-run, and audit views

End-to-End Workflow

Upload and Job Creation

```mermaid
sequenceDiagram
    participant U as User UI
    participant API as Import API
    participant SQL as Azure SQL
    participant B as Blob Storage
    participant SB as Service Bus
    U->>API: POST /imports/sessions
    API->>SQL: Create import_job(status=upload_pending)
    SQL-->>API: import_job_id
    API->>B: Generate SAS URL
    API-->>U: upload URL + import_job_id
    U->>B: Upload CSV directly
    U->>API: POST /imports/{id}/complete
    API->>SQL: Update job(status=queued)
    API->>SB: Enqueue imports.requested
    API-->>U: 202 Accepted
```

Processing Flow

```mermaid
flowchart TD
    A[Import Requested] --> B[Load Import Template]
    B --> C[Inspect Blob Metadata]
    C --> D[Validate File-Level Rules]
    D --> E[Create Chunk Plan]
    E --> F[Enqueue Chunk Messages]
    F --> G[Process Chunks in Parallel]
    G --> H[Write Valid Rows to Staging]
    G --> I[Write Invalid Rows to Error Store]
    H --> J[Track Progress]
    I --> J
    J --> K{All Chunks Complete?}
    K -- No --> G
    K -- Yes --> L[Merge Staged Data]
    L --> M[Generate Error Report]
    M --> N[Mark Job Completed]
```

Progress Read Flow

```mermaid
sequenceDiagram
    participant U as User UI
    participant API as Import API
    participant R as Redis
    participant SQL as Azure SQL
    U->>API: GET /imports/{id}
    API->>R: Get hot progress snapshot
    alt cache hit
        R-->>API: progress payload
    else cache miss
        API->>SQL: Query import_jobs + chunks + metrics
        SQL-->>API: job status and metrics
        API->>R: Cache snapshot
    end
    API-->>U: status, counts, links
```

Processing Strategy

Rule

  • The request path creates work.
  • The worker path performs work.
  • Large files are streamed and chunked.
  • Database writes happen in batches.
  • Final application into domain tables happens only after validation/staging succeeds.

Chunking Strategy

  • Split work by row ranges or logical chunk size
  • Example chunk dimensions:
    • 5,000 to 25,000 rows per chunk
    • tuned by row width and downstream cost
  • Each chunk has:
    • job_id
    • chunk_index
    • start_row
    • end_row
    • status
    • attempt_count
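The chunk plan reduces to a few lines of arithmetic. A Python sketch (illustrative only; the orchestrator itself is C#) that emits one record per chunk with half-open row ranges matching the fields above:

```python
def build_chunk_plan(total_rows, chunk_size):
    """Split total_rows into half-open [start_row, end_row) ranges,
    one record per chunk, mirroring the import_chunks columns."""
    plan = []
    for chunk_index, start in enumerate(range(1, total_rows + 1, chunk_size)):
        plan.append({
            "chunk_index": chunk_index,
            "start_row": start,
            "end_row": min(start + chunk_size, total_rows + 1),
            "status": "queued",
            "attempt_count": 0,
        })
    return plan
```

Half-open ranges make the boundaries unambiguous: every row belongs to exactly one chunk, so a redelivered chunk message can never overlap its neighbour.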

Validation Layers

File-Level Validation

  • file extension
  • MIME/content-type sanity check
  • size limits
  • encoding detection
  • delimiter detection
  • header presence
  • duplicate column detection
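As a sketch of these checks, Python's `csv.Sniffer` can drive delimiter detection on the first few kilobytes of the upload. The `REQUIRED_COLUMNS` set is a hypothetical template; in the real system the columns come from `import_templates`, and the check runs in C#:

```python
import csv

REQUIRED_COLUMNS = {"id", "email", "amount"}   # hypothetical template columns

def validate_file_head(sample_text):
    """File-level checks against the head of the upload: delimiter
    detection, duplicate columns, and required-column presence.
    Returns (problems, detected_delimiter)."""
    try:
        dialect = csv.Sniffer().sniff(sample_text)
    except csv.Error:
        return ["delimiter_undetectable"], None
    problems = []
    header = next(csv.reader(sample_text.splitlines(), dialect))
    if len(header) != len(set(header)):
        problems.append("duplicate_columns")
    missing = REQUIRED_COLUMNS - set(header)
    if missing:
        problems.append("missing_columns:" + ",".join(sorted(missing)))
    return problems, dialect.delimiter
```

Running these checks on a small sample, not the whole blob, keeps the orchestrator cheap: a file that fails here is rejected before any chunk message is ever enqueued.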

Schema Validation

  • required columns present
  • unsupported columns flagged
  • column type expectations
  • template version compatibility

Row-Level Validation

  • required values present
  • type conversion success
  • string length checks
  • date/decimal parsing
  • enum/domain constraints
  • duplicate row detection within file
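A row validator that records errors instead of raising is what makes "one bad row never kills the chunk" hold. This Python sketch uses hypothetical columns (`id`, `amount`, `posted_on`, `note`); the real rules would be driven by `import_template_columns`:

```python
from datetime import date
from decimal import Decimal, InvalidOperation

def validate_row(row):
    """Per-row checks: required values, type conversion, length limits.
    Returns (normalized_values, errors); parse failures never raise."""
    errors = []
    normalized = {}
    if not row.get("id"):
        errors.append(("id", "required_value_missing"))
    try:
        normalized["amount"] = Decimal(row.get("amount", ""))
    except InvalidOperation:
        errors.append(("amount", "decimal_parse_failed"))
    try:
        normalized["posted_on"] = date.fromisoformat(row.get("posted_on", ""))
    except ValueError:
        errors.append(("posted_on", "date_parse_failed"))
    if len(row.get("note", "")) > 200:
        errors.append(("note", "max_length_exceeded"))
    return normalized, errors
```

Each `(field_name, error_code)` pair maps directly onto an `import_row_errors` record, so the downstream error report falls out of the validator for free.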

Business Validation

  • uniqueness against existing records
  • referential integrity checks
  • status transition rules
  • tenant ownership constraints
  • temporal/business rules

Storage Layout

Blob Storage Containers

  • raw-imports/
    • original uploaded CSVs
  • import-manifests/
    • inferred schema / chunk manifests
  • import-artifacts/
    • normalized outputs if needed
  • import-error-reports/
    • generated CSV error files
  • import-audit/
    • optional raw trace artifacts

Blob Naming

  • raw-imports/{tenant}/{job_id}/source.csv
  • import-manifests/{tenant}/{job_id}/manifest.json
  • import-error-reports/{tenant}/{job_id}/errors.csv

Data Model

Core Tables

| Table | Purpose |
| --- | --- |
| import_jobs | Top-level import lifecycle |
| import_files | Blob metadata, checksums, file attributes |
| import_templates | Schema definitions by import type |
| import_template_columns | Expected columns and rules |
| import_chunks | Chunk plan and per-chunk execution state |
| staging_rows | Valid normalized rows before merge |
| import_row_errors | Invalid rows with error details |
| import_metrics | Counters and summary metrics |
| import_results | Final applied counts and output links |
| deduplication_keys | Idempotency and duplicate protection |
| audit_logs | Operational and security audit trail |
| dead_letters | Persisted failed messages and replay state |

import_jobs

Suggested columns:

  • id
  • tenant_id
  • import_type
  • status
  • requested_by_user_id
  • file_id
  • template_id
  • total_rows
  • valid_rows
  • invalid_rows
  • processed_rows
  • started_at
  • completed_at
  • failure_reason
  • created_at
  • updated_at

import_chunks

Suggested columns:

  • id
  • job_id
  • chunk_index
  • start_row
  • end_row
  • status
  • attempt_count
  • worker_id
  • started_at
  • completed_at
  • valid_count
  • invalid_count
  • error_count

import_row_errors

Suggested columns:

  • id
  • job_id
  • chunk_id
  • row_number
  • raw_payload
  • field_name
  • error_code
  • error_message
  • created_at
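Turning those records into the downloadable `errors.csv` is a plain serialization step. A minimal sketch (Python for brevity; the artifact is returned as a string here rather than streamed to blob storage as the real generator would):

```python
import csv
import io

FIELDS = ["row_number", "field_name", "error_code", "error_message", "raw_payload"]

def write_error_report(errors):
    """Render import_row_errors records as errors.csv, ordered by row
    number so users can fix their file top to bottom."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=FIELDS)
    writer.writeheader()
    for err in sorted(errors, key=lambda e: e["row_number"]):
        writer.writerow(err)
    return buf.getvalue()   # the real generator streams this to blob storage
```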

SQL Strategy

Staging and Merge

  • Use SqlBulkCopy to write valid transformed rows into staging_rows
  • Keep staging tables append-only per job
  • Use stored procedures or set-based MERGE logic to apply staged rows into final domain tables
  • Wrap merge step in controlled batches
  • Record applied keys to prevent duplicate application
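The merge semantics can be illustrated with SQLite standing in for Azure SQL: a set-based upsert applies one job's staged rows, and replaying the merge converges on the same end state instead of duplicating data. The `customers` table and its columns are hypothetical; the production step would be a T-SQL `MERGE` inside a stored procedure:

```python
import sqlite3

# SQLite stands in for Azure SQL; `customers` is a hypothetical domain table.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, email TEXT)")
con.execute("CREATE TABLE staging_rows (job_id TEXT, id INTEGER, email TEXT)")

def merge_job(con, job_id):
    """Apply one job's staged rows set-based; replaying is harmless
    because the upsert produces the same final rows."""
    con.execute(
        """INSERT INTO customers (id, email)
           SELECT id, email FROM staging_rows WHERE job_id = ?
           ON CONFLICT(id) DO UPDATE SET email = excluded.email""",
        (job_id,),
    )
    con.commit()

con.executemany("INSERT INTO staging_rows VALUES (?, ?, ?)",
                [("job-1", 1, "a@x.com"), ("job-1", 2, "b@x.com")])
merge_job(con, "job-1")
merge_job(con, "job-1")   # replay of the merge message: no duplicates
```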

Why Stage First

  • avoids partial corruption of domain tables
  • allows full validation before final apply
  • simplifies replay
  • supports downloadable row-level error reporting
  • makes operations auditable

Service Bus Topology

| Queue / Topic | Purpose |
| --- | --- |
| imports.requested | New import jobs |
| imports.chunks | Chunk processing work |
| imports.chunk-completed | Chunk completion notifications |
| imports.merge-requested | Trigger final merge |
| imports.report-requested | Trigger report generation |
| audit.events | Audit trail events |
| imports.failed | Hard failures requiring review |

Message Design

Each message should include:

  • message_type
  • job_id
  • tenant_id
  • chunk_id where applicable
  • attempt
  • correlation_id
  • causation_id
  • timestamp
  • schema_version
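As a sketch, the envelope might look like this (a Python dataclass for illustration; the field names mirror the list above, and the defaults are assumptions):

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Optional

@dataclass
class ImportMessage:
    """Envelope shared by every queue/topic message in the pipeline."""
    message_type: str
    job_id: str
    tenant_id: str
    chunk_id: Optional[str] = None        # only set on chunk-level messages
    attempt: int = 1
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    causation_id: Optional[str] = None    # id of the message that caused this one
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat())
    schema_version: int = 1

    def to_json(self) -> str:
        return json.dumps(asdict(self))
```

Carrying `correlation_id` end to end is what lets Application Insights stitch one import's API call, orchestration, chunks, and merge into a single trace; `schema_version` lets old in-flight messages survive a deploy.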

Sessioning

Use SessionId = job_id where ordered job-level transitions matter, especially for:

  • merge trigger
  • report generation
  • final completion

Idempotency, Retries, and DLQ

Idempotency

  • Upload idempotency:
    • based on job_id
  • File dedupe:
    • optional checksum on file content + tenant + import type
  • Chunk idempotency:
    • unique key on (job_id, chunk_index)
  • Merge idempotency:
    • record final apply token per job
  • Row idempotency:
    • optional business key dedupe in target tables
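Chunk idempotency can be demonstrated with the unique key alone: the first delivery claims the chunk, and any redelivery silently no-ops. SQLite stands in for Azure SQL here, and `chunk_claims` is an illustrative table name:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""CREATE TABLE chunk_claims (
    job_id TEXT NOT NULL,
    chunk_index INTEGER NOT NULL,
    status TEXT NOT NULL,
    UNIQUE (job_id, chunk_index))""")

def claim_chunk(con, job_id, chunk_index):
    """True only for the first delivery of a chunk message; a Service Bus
    redelivery hits the unique key and is skipped instead of reprocessed."""
    cur = con.execute(
        "INSERT OR IGNORE INTO chunk_claims VALUES (?, ?, 'processing')",
        (job_id, chunk_index),
    )
    con.commit()
    return cur.rowcount == 1
```

Letting the database enforce uniqueness, rather than a check-then-insert in application code, closes the race where two workers receive the same chunk message at once.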

Retries

  • transient worker failures:
    • rely on Service Bus retry delivery
    • exponential backoff
  • transient SQL/blob failures:
    • retry with jitter
  • parse failures at row level:
    • do not retry endlessly; record row error instead
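The backoff policy is worth pinning down. A common choice, assumed here on top of Service Bus's own delivery-count handling, is exponential growth with full jitter and a cap:

```python
import random

def backoff_delay(attempt, base=2.0, cap=300.0, rng=random.random):
    """Exponential backoff with full jitter: the ceiling doubles per
    attempt up to `cap` seconds, and a random fraction of it is used so
    retrying workers do not stampede the database together."""
    return rng() * min(cap, base * (2 ** attempt))
```

The delay would typically be applied by re-enqueueing the message with a scheduled enqueue time rather than by sleeping inside the Function.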

Dead-Letter Handling

  • Messages exceeding retry threshold go to DLQ
  • DLQ entries are surfaced in admin UI
  • Admin can:
    • inspect payload
    • replay message
    • mark ignored
    • attach operator notes

Redis Strategy

Use Redis for:

  • import progress snapshots:
    • import:{job_id}:progress
  • active dashboard counters
  • recent import summaries
  • operator hot views

Invalidation

  • update progress on chunk completion
  • expire completed jobs after a short TTL in cache
  • database remains the source of truth
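The read path is classic cache-aside. This sketch swaps Redis for an in-memory dict purely to show the shape: serve the snapshot while its TTL is live, otherwise reload from SQL (the source of truth) and re-cache:

```python
import time

class ProgressCache:
    """Dict-backed stand-in for Redis illustrating the cache-aside read
    used by GET /imports/{id}."""

    def __init__(self, ttl_seconds=30, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._store = {}   # keyed like import:{job_id}:progress

    def get_progress(self, job_id, load_from_sql):
        key = f"import:{job_id}:progress"
        entry = self._store.get(key)
        if entry and entry[1] > self._clock():
            return entry[0]                       # cache hit
        snapshot = load_from_sql(job_id)          # cache miss: go to SQL
        self._store[key] = (snapshot, self._clock() + self._ttl)
        return snapshot
```

Because the snapshot is only a read-side accelerator, a stale or evicted entry costs one SQL query and nothing more; correctness never depends on the cache.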

Audit Logging

Audit:

  • upload session creation
  • import creation
  • file completion
  • chunk start/end
  • merge start/end
  • report generation
  • replay actions
  • cancellation actions
  • DLQ actions
  • admin interventions

Persist audit logs in Azure SQL and send traces to Application Insights.

Observability

Track:

  • queue depth
  • chunk throughput
  • average chunk duration
  • rows processed per minute
  • validation error rate
  • merge latency
  • report generation latency
  • DLQ growth
  • API latency
  • progress endpoint latency
  • SQL bulk insert duration

Error Handling Model

Import Statuses

  • upload_pending
  • queued
  • validating
  • processing
  • merging
  • generating_report
  • completed
  • completed_with_errors
  • failed
  • cancelled
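Because messages can arrive late or twice, status updates should pass through an explicit transition table rather than blind writes. The exact edges below are an assumption derived from the lifecycle above:

```python
# Allowed import_jobs transitions (assumed edges); guarding writes with
# this table means a duplicate or out-of-order message cannot move a
# job backwards or resurrect a cancelled one.
ALLOWED_TRANSITIONS = {
    "upload_pending": {"queued", "cancelled"},
    "queued": {"validating", "cancelled"},
    "validating": {"processing", "failed", "cancelled"},
    "processing": {"merging", "failed", "cancelled"},
    "merging": {"generating_report", "failed"},
    "generating_report": {"completed", "completed_with_errors", "failed"},
}

def transition(current, target):
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current} -> {target}")
    return target
```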

Chunk Statuses

  • queued
  • processing
  • completed
  • failed
  • dead_lettered

Failure Boundaries

  • One bad row should not kill the whole chunk
  • One failed chunk should not corrupt the whole job
  • One failed merge should not partially apply silently
  • One failed report generation should not erase a successful import

Admin Dashboard

Views

  • job list with status filters
  • chunk breakdown per job
  • row error counts and samples
  • downloadable error reports
  • DLQ entries
  • replay history
  • operator audit trail
  • throughput and failure metrics

Actions

  • replay failed chunk
  • replay failed merge
  • regenerate error report
  • cancel queued job
  • re-run full job
  • download original file
  • inspect staging summary

API Shape

User / App APIs

  • POST /imports/sessions
  • POST /imports/{id}/complete
  • GET /imports/{id}
  • GET /imports/{id}/progress
  • GET /imports/{id}/errors
  • GET /imports/{id}/error-report
  • GET /imports

Admin APIs

  • GET /admin/imports
  • GET /admin/imports/{id}/chunks
  • GET /admin/imports/{id}/audit
  • GET /admin/imports/{id}/dead-letters
  • POST /admin/imports/{id}/replay
  • POST /admin/imports/{id}/cancel
  • POST /admin/imports/{id}/regenerate-report

Deployment

```mermaid
flowchart TB
    subgraph Azure
        APP[App Service / Container Apps - Import API]
        FUNC1[Azure Function - Orchestrator]
        FUNC2[Azure Function - Chunk Processor]
        FUNC3[Azure Function - Merge Processor]
        FUNC4[Azure Function - Report Generator]
        SQL[(Azure SQL Database)]
        BLOB[Blob Storage]
        SB[Azure Service Bus]
        REDIS[Azure Cache for Redis]
        KV[Key Vault]
        AI[Application Insights]
        MON[Azure Monitor / Log Analytics]
    end
    APP --> SQL
    APP --> BLOB
    APP --> SB
    APP --> REDIS
    APP --> KV
    APP --> AI
    FUNC1 --> SQL
    FUNC1 --> BLOB
    FUNC1 --> SB
    FUNC1 --> KV
    FUNC1 --> AI
    FUNC2 --> SQL
    FUNC2 --> BLOB
    FUNC2 --> SB
    FUNC2 --> KV
    FUNC2 --> AI
    FUNC3 --> SQL
    FUNC3 --> SB
    FUNC3 --> KV
    FUNC3 --> AI
    FUNC4 --> SQL
    FUNC4 --> BLOB
    FUNC4 --> KV
    FUNC4 --> AI
    AI --> MON
```

Why This Meets the Requirements

  • Large files do not block API requests:
    • upload is direct to blob
    • processing is async
  • Performance is predictable:
    • work is chunked
    • hot progress can be served from Redis
  • Reliability is strong:
    • queue-based execution
    • retries
    • DLQ
    • replay
  • Data safety is strong:
    • validate first
    • stage next
    • merge last
  • Partial failure is supported:
    • invalid rows become error records, not silent drops
  • Operations are visible:
    • status tables
    • audit logs
    • dashboard
  • Scale is horizontal:
    • chunk processors can scale independently

Main Trade-Offs

  • More moving parts than a naive synchronous upload-and-parse API
  • Requires careful chunk sizing
  • Staging tables add storage cost
  • Replay and idempotency logic add complexity
  • Redis is optional, but useful for high-frequency dashboard reads

Non-Negotiable Rule

  • The API request that starts the import must never parse and process the full CSV synchronously.
  • The request path creates jobs.
  • The worker path executes jobs.

Suggested Build Order

  1. import_jobs + import_files schema
  2. SAS-based direct upload flow
  3. imports.requested queue + orchestrator
  4. chunk model + chunk processor
  5. row validation + import_row_errors
  6. staging writes with SqlBulkCopy
  7. merge processor
  8. report generation
  9. progress endpoints
  10. admin replay / DLQ tooling
  11. Redis acceleration
  12. full observability and dashboard
