CSV Import & Data Processing Pipeline - Solution Architecture
Mission
Build a resilient CSV import and data processing platform that allows users to upload large CSV files, validates and transforms them asynchronously, applies valid records into the system safely, surfaces row-level errors clearly, and provides progress, auditability, and replay tools without ever processing the full file in the synchronous request path.

Constraints
- Backend stack: C#/ASP.NET Core
- Cloud: Azure
- Primary database: Azure SQL Database
- File storage: Azure Blob Storage
- Queueing: Azure Service Bus
- Optional cache: Azure Cache for Redis
- Processing must be asynchronous
- Large files must not be loaded fully into memory
- Partial row failures must not fail the whole import
- Users must be able to see progress and download error reports
- Failed jobs must be retryable and auditable
Core Design
- Never parse the full CSV inside the initial HTTP request.
- Upload raw files directly to Blob Storage using SAS URLs.
- Create an import job as the control-plane record of the workflow.
- Process the file asynchronously in chunks.
- Separate the stages:
  - upload
  - validation
  - transformation
  - staging
  - merge
  - reporting
- Use Azure SQL as the source of truth for job state, staging state, results, and audit state.
- Use Service Bus for decoupled job execution and retries.
- Use Redis for hot progress/status reads where needed.
- Generate downloadable error reports for invalid rows.
- Make every stage idempotent and replayable.
Azure Services
- Azure App Service or Azure Container Apps: `import-api`, `admin-api`
- Azure Functions: `import-orchestrator`, `chunk-processor`, `merge-processor`, `report-generator`
- Azure Blob Storage:
  - raw uploads
  - manifests
  - chunk files if needed
  - error reports
- Azure Service Bus:
  - import request queue
  - chunk work queue
  - completion/failure topics
  - DLQs
- Azure SQL Database
- Azure Cache for Redis
- Azure Key Vault
- Application Insights
- Azure Monitor / Log Analytics
- Azure Event Grid or Azure SignalR Service for optional progress push
High-Level Architecture
Service Responsibilities
import-api (ASP.NET Core)
- Authenticates the user
- Creates upload sessions
- Generates SAS URLs for direct blob upload
- Creates `import_jobs`
- Finalises uploads
- Enqueues import requests to Service Bus
- Serves:
  - import status
  - progress
  - validation summaries
  - error report links
  - admin actions
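The SAS-based upload step can be sketched roughly as follows. The container name, blob layout, and 30-minute expiry are assumptions, and `GenerateSasUri` requires the `BlobServiceClient` to be constructed with a credential that can sign SAS tokens (e.g. a shared key):

```csharp
using System;
using Azure.Storage.Blobs;
using Azure.Storage.Sas;

public class UploadSessionService
{
    private readonly BlobServiceClient _blobService;

    public UploadSessionService(BlobServiceClient blobService) => _blobService = blobService;

    // Issues a short-lived, write-only SAS URL scoped to a single blob, so the
    // client uploads directly to Blob Storage and the API never sees the bytes.
    public Uri CreateUploadSasUrl(string tenantId, Guid jobId)
    {
        var container = _blobService.GetBlobContainerClient("raw-imports");
        var blob = container.GetBlobClient($"{tenantId}/{jobId}/source.csv");

        var sasBuilder = new BlobSasBuilder
        {
            BlobContainerName = blob.BlobContainerName,
            BlobName = blob.Name,
            Resource = "b", // "b" = blob-level SAS, not container-level
            ExpiresOn = DateTimeOffset.UtcNow.AddMinutes(30),
        };
        sasBuilder.SetPermissions(BlobSasPermissions.Create | BlobSasPermissions.Write);

        return blob.GenerateSasUri(sasBuilder);
    }
}
```

The API returns only this URL; the browser or client uploads the file straight to blob storage with it.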
import-orchestrator (Azure Function)
- Starts when a file upload is finalised
- Loads import template/schema
- Inspects file metadata
- Validates file-level properties:
  - size
  - extension
  - delimiter/header assumptions
  - required columns
- Creates chunk plan
- Writes `import_chunks`
- Enqueues chunk processing messages
chunk-processor (Azure Function)
- Processes a single chunk of rows
- Streams CSV rows rather than loading the full file into memory
- Parses and validates each row
- Applies transformations and normalization
- Writes valid rows to staging tables using bulk operations
- Writes invalid rows to `import_row_errors`
- Emits chunk completion events
merge-processor (Azure Function)
- Runs when all chunks are complete
- Applies staged valid records into final domain tables
- Uses transactional merge/upsert logic
- Protects against duplicate application
- Updates import result metrics
report-generator (Azure Function)
- Builds downloadable error reports
- Writes summary artifacts to blob
- Marks import as completed / failed / completed_with_errors
admin-api
- Lists jobs, chunks, failures, DLQs
- Supports replay, cancel, re-run, and audit views
End-to-End Workflow
Upload and Job Creation
Processing Flow
Progress Read Flow
Processing Strategy
Rule
- The request path creates work.
- The worker path performs work.
- Large files are streamed and chunked.
- Database writes happen in batches.
- Final application into domain tables happens only after validation/staging succeeds.
Chunking Strategy
- Split work by row ranges or logical chunk size
- Example chunk size: 5,000 to 25,000 rows per chunk, tuned by row width and downstream cost
- Each chunk has: `job_id`, `chunk_index`, `start_row`, `end_row`, `status`, `attempt_count`
Validation Layers
File-Level Validation
- file extension
- MIME/content-type sanity check
- size limits
- encoding detection
- delimiter detection
- header presence
- duplicate column detection
Schema Validation
- required columns present
- unsupported columns flagged
- column type expectations
- template version compatibility
Row-Level Validation
- required values present
- type conversion success
- string length checks
- date/decimal parsing
- enum/domain constraints
- duplicate row detection within file
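A row validator in this spirit collects all errors for a row rather than failing fast, so the downloadable report is complete. The `email`, `quantity`, and `order_date` columns here are purely illustrative; real rules would come from `import_template_columns`:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public record RowError(long RowNumber, string Field, string ErrorCode, string Message);

public static class RowValidator
{
    // Validates one parsed row and returns every violation found.
    public static List<RowError> Validate(long rowNumber, IReadOnlyDictionary<string, string> row)
    {
        var errors = new List<RowError>();

        // required value present
        if (!row.TryGetValue("email", out var email) || string.IsNullOrWhiteSpace(email))
            errors.Add(new RowError(rowNumber, "email", "required", "email is required"));

        // type conversion success
        if (row.TryGetValue("quantity", out var qty) &&
            !int.TryParse(qty, NumberStyles.Integer, CultureInfo.InvariantCulture, out _))
            errors.Add(new RowError(rowNumber, "quantity", "invalid_type", $"'{qty}' is not an integer"));

        // date parsing against an assumed yyyy-MM-dd format
        if (row.TryGetValue("order_date", out var date) &&
            !DateTime.TryParseExact(date, "yyyy-MM-dd", CultureInfo.InvariantCulture,
                                    DateTimeStyles.None, out _))
            errors.Add(new RowError(rowNumber, "order_date", "invalid_date", $"'{date}' is not yyyy-MM-dd"));

        return errors;
    }
}
```

Each returned `RowError` maps directly onto a row in `import_row_errors`.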
Business Validation
- uniqueness against existing records
- referential integrity checks
- status transition rules
- tenant ownership constraints
- temporal/business rules
Storage Layout
Blob Storage Containers
- `raw-imports/` - original uploaded CSVs
- `import-manifests/` - inferred schema / chunk manifests
- `import-artifacts/` - normalized outputs if needed
- `import-error-reports/` - generated CSV error files
- `import-audit/` - optional raw trace artifacts
Blob Naming
- `raw-imports/{tenant}/{job_id}/source.csv`
- `import-manifests/{tenant}/{job_id}/manifest.json`
- `import-error-reports/{tenant}/{job_id}/errors.csv`
Data Model
Core Tables
| Table | Purpose |
|---|---|
| `import_jobs` | Top-level import lifecycle |
| `import_files` | Blob metadata, checksums, file attributes |
| `import_templates` | Schema definitions by import type |
| `import_template_columns` | Expected columns and rules |
| `import_chunks` | Chunk plan and per-chunk execution state |
| `staging_rows` | Valid normalized rows before merge |
| `import_row_errors` | Invalid rows with error details |
| `import_metrics` | Counters and summary metrics |
| `import_results` | Final applied counts and output links |
| `deduplication_keys` | Idempotency and duplicate protection |
| `audit_logs` | Operational and security audit trail |
| `dead_letters` | Persisted failed messages and replay state |
import_jobs
Suggested columns: `id`, `tenant_id`, `import_type`, `status`, `requested_by_user_id`, `file_id`, `template_id`, `total_rows`, `valid_rows`, `invalid_rows`, `processed_rows`, `started_at`, `completed_at`, `failure_reason`, `created_at`, `updated_at`
import_chunks
Suggested columns: `id`, `job_id`, `chunk_index`, `start_row`, `end_row`, `status`, `attempt_count`, `worker_id`, `started_at`, `completed_at`, `valid_count`, `invalid_count`, `error_count`
import_row_errors
Suggested columns: `id`, `job_id`, `chunk_id`, `row_number`, `raw_payload`, `field_name`, `error_code`, `error_message`, `created_at`
SQL Strategy
Staging and Merge
- Use `SqlBulkCopy` to write valid transformed rows into `staging_rows`
- Keep staging tables append-only per job
- Use stored procedures or set-based `MERGE` logic to apply staged rows into final domain tables
- Wrap the merge step in controlled batches
- Record applied keys to prevent duplicate application
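The set-based apply step might look like the following T-SQL sketch. The `Products` target table, its columns, and the business key `sku` are illustrative only; the real merge would be generated from the import template:

```sql
-- Apply staged rows for one job into the domain table in a single set-based
-- statement; run per-job (and optionally per key range) to keep batches bounded.
MERGE INTO dbo.Products AS target
USING (
    SELECT sku, name, price
    FROM dbo.staging_rows
    WHERE job_id = @JobId
) AS source
ON target.sku = source.sku
WHEN MATCHED THEN
    UPDATE SET target.name  = source.name,
               target.price = source.price
WHEN NOT MATCHED BY TARGET THEN
    INSERT (sku, name, price)
    VALUES (source.sku, source.name, source.price);
```

Because the source is filtered by `job_id`, re-running the merge for the same job updates rows to the same values rather than duplicating them, which is what makes the step replayable.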
Why Stage First
- avoids partial corruption of domain tables
- allows full validation before final apply
- simplifies replay
- supports downloadable row-level error reporting
- makes operations auditable
Service Bus Topology
| Queue / Topic | Purpose |
|---|---|
| `imports.requested` | New import jobs |
| `imports.chunks` | Chunk processing work |
| `imports.chunk-completed` | Chunk completion notifications |
| `imports.merge-requested` | Trigger final merge |
| `imports.report-requested` | Trigger report generation |
| `audit.events` | Audit trail events |
| `imports.failed` | Hard failures requiring review |
Message Design
Each message should include:
- `message_type`
- `job_id`
- `tenant_id`
- `chunk_id` (where applicable)
- `attempt`
- `correlation_id`
- `causation_id`
- `timestamp`
- `schema_version`
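As a sketch, the envelope can be a plain serializable record; the use of GUIDs and the exact field shapes are assumptions:

```csharp
using System;
using System.Text.Json;

// Minimal message envelope carrying the fields listed above.
public record ImportMessage(
    string MessageType,
    Guid JobId,
    string TenantId,
    Guid? ChunkId,       // only set for chunk-level messages
    int Attempt,
    Guid CorrelationId,  // ties all messages of one import together
    Guid CausationId,    // the message that caused this one
    DateTimeOffset Timestamp,
    int SchemaVersion);

public static class Envelope
{
    public static string Serialize(ImportMessage m) => JsonSerializer.Serialize(m);
    public static ImportMessage? Deserialize(string json) =>
        JsonSerializer.Deserialize<ImportMessage>(json);
}
```

Carrying `schema_version` in every message lets old in-flight messages be handled correctly after a deployment changes the payload shape.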
Sessioning
Use `SessionId = job_id` where ordered job-level transitions matter, especially for:
- merge trigger
- report generation
- final completion
Idempotency, Retries, and DLQ
Idempotency
- Upload idempotency:
  - based on `job_id`
- File dedupe:
  - optional checksum on file content + tenant + import type
- Chunk idempotency:
  - unique key on `(job_id, chunk_index)`
- Merge idempotency:
  - record a final apply token per job
- Row idempotency:
  - optional business-key dedupe in target tables
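The optional file-dedupe key above can be derived deterministically, for example as follows (the `tenant|type|content` layout is an assumption):

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class FileDedupe
{
    // Derives a deterministic dedupe key from file content plus tenant and
    // import type, so re-uploads of the same file by the same tenant for the
    // same import type map to the same key in deduplication_keys.
    public static string ComputeKey(string tenantId, string importType, byte[] fileContent)
    {
        using var sha = SHA256.Create();
        var prefix = Encoding.UTF8.GetBytes($"{tenantId}|{importType}|");
        var buffer = new byte[prefix.Length + fileContent.Length];
        prefix.CopyTo(buffer, 0);
        fileContent.CopyTo(buffer, prefix.Length);
        return Convert.ToHexString(sha.ComputeHash(buffer)); // 64 hex chars
    }
}
```

A unique index on this key turns duplicate-upload detection into a cheap insert-conflict check.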
Retries
- transient worker failures:
  - rely on Service Bus retry delivery
  - exponential backoff
- transient SQL/blob failures:
  - retry with jitter
- parse failures at row level:
  - do not retry endlessly; record a row error instead
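Retry-with-jitter can follow the common "full jitter" scheme; the base and cap values here are placeholders to tune:

```csharp
using System;

public static class Backoff
{
    // Exponential backoff with full jitter: the delay is drawn uniformly from
    // [0, min(cap, base * 2^attempt)], which spreads retries out in time and
    // avoids thundering-herd retry storms against SQL or blob storage.
    public static TimeSpan NextDelay(int attempt, Random rng,
        double baseSeconds = 2, double capSeconds = 300)
    {
        double ceiling = Math.Min(capSeconds, baseSeconds * Math.Pow(2, attempt));
        return TimeSpan.FromSeconds(rng.NextDouble() * ceiling);
    }
}
```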
Dead-Letter Handling
- Messages exceeding retry threshold go to DLQ
- DLQ entries are surfaced in admin UI
- Admin can:
  - inspect payload
  - replay message
  - mark ignored
  - attach operator notes
Redis Strategy
Use Redis for:
- import progress snapshots: `import:{job_id}:progress`
- active dashboard counters
- recent import summaries
- operator hot views
Invalidation
- update progress on chunk completion
- expire completed jobs after a short TTL in cache
- database remains the source of truth
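On each chunk completion the handler can recompute the cached snapshot from the counts held in SQL and write it under `import:{job_id}:progress`. This pure helper shows the shape; the field set is an assumption:

```csharp
using System;

public record ProgressSnapshot(long TotalRows, long ProcessedRows, long ValidRows,
                               long InvalidRows, int PercentComplete);

public static class Progress
{
    // Builds the snapshot cached in Redis; SQL remains the source of truth,
    // Redis only serves hot reads of this precomputed view.
    public static ProgressSnapshot FromCounts(long total, long processed, long valid, long invalid)
    {
        int percent = total == 0 ? 100 : (int)(processed * 100 / total);
        return new ProgressSnapshot(total, processed, valid, invalid, Math.Min(percent, 100));
    }
}
```

The progress endpoint reads this key first and falls back to SQL on a cache miss.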
Audit Logging
Audit:
- upload session creation
- import creation
- file completion
- chunk start/end
- merge start/end
- report generation
- replay actions
- cancellation actions
- DLQ actions
- admin interventions
Persist audit logs in Azure SQL and send traces to Application Insights.
Observability
Track:
- queue depth
- chunk throughput
- average chunk duration
- rows processed per minute
- validation error rate
- merge latency
- report generation latency
- DLQ growth
- API latency
- progress endpoint latency
- SQL bulk insert duration
Error Handling Model
Import Statuses
`upload_pending`, `queued`, `validating`, `processing`, `merging`, `generating_report`, `completed`, `completed_with_errors`, `failed`, `cancelled`
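One way to keep these statuses honest under retries and replays is an explicit transition table, so a duplicate or late message can never move a job backwards. The allowed edges below are a plausible reading of the lifecycle, not a specification:

```csharp
using System;
using System.Collections.Generic;

public static class ImportStatus
{
    // Allowed lifecycle transitions; anything not listed is rejected.
    private static readonly Dictionary<string, string[]> Allowed = new()
    {
        ["upload_pending"]    = new[] { "queued", "cancelled" },
        ["queued"]            = new[] { "validating", "cancelled" },
        ["validating"]        = new[] { "processing", "failed", "cancelled" },
        ["processing"]        = new[] { "merging", "failed", "cancelled" },
        ["merging"]           = new[] { "generating_report", "failed" },
        ["generating_report"] = new[] { "completed", "completed_with_errors", "failed" },
    };

    public static bool CanTransition(string from, string to) =>
        Allowed.TryGetValue(from, out var next) && Array.IndexOf(next, to) >= 0;
}
```

The status update statement would enforce the same rule in SQL (`UPDATE ... WHERE status = @expected`) so concurrent workers cannot race past it.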
Chunk Statuses
`queued`, `processing`, `completed`, `failed`, `dead_lettered`
Failure Boundaries
- One bad row should not kill the whole chunk
- One failed chunk should not corrupt the whole job
- One failed merge should not partially apply silently
- One failed report generation should not erase a successful import
Admin Dashboard
Views
- job list with status filters
- chunk breakdown per job
- row error counts and samples
- downloadable error reports
- DLQ entries
- replay history
- operator audit trail
- throughput and failure metrics
Actions
- replay failed chunk
- replay failed merge
- regenerate error report
- cancel queued job
- re-run full job
- download original file
- inspect staging summary
API Shape
User / App APIs
- `POST /imports/sessions`
- `POST /imports/{id}/complete`
- `GET /imports/{id}`
- `GET /imports/{id}/progress`
- `GET /imports/{id}/errors`
- `GET /imports/{id}/error-report`
- `GET /imports`
Admin APIs
- `GET /admin/imports`
- `GET /admin/imports/{id}/chunks`
- `GET /admin/imports/{id}/audit`
- `GET /admin/imports/{id}/dead-letters`
- `POST /admin/imports/{id}/replay`
- `POST /admin/imports/{id}/cancel`
- `POST /admin/imports/{id}/regenerate-report`
Deployment
Why This Meets the Requirements
- Large files do not block API requests:
  - upload is direct to blob
  - processing is async
- Performance is predictable:
  - work is chunked
  - hot progress can be served from Redis
- Reliability is strong:
  - queue-based execution
  - retries
  - DLQ
  - replay
- Data safety is strong:
  - validate first
  - stage next
  - merge last
- Partial failure is supported:
  - invalid rows become error records, not silent drops
- Operations are visible:
  - status tables
  - audit logs
  - dashboard
- Scale is horizontal:
  - chunk processors can scale independently
Main Trade-Offs
- More moving parts than a naive synchronous upload-and-parse API
- Requires careful chunk sizing
- Staging tables add storage cost
- Replay and idempotency logic add complexity
- Redis is optional, but useful for high-frequency dashboard reads
Non-Negotiable Rule
- The API request that starts the import must never parse and process the full CSV synchronously.
- The request path creates jobs.
- The worker path executes jobs.
Suggested Build Order
1. `import_jobs` + `import_files` schema
2. SAS-based direct upload flow
3. `imports.requested` queue + orchestrator
4. chunk model + chunk processor
5. row validation + `import_row_errors`
6. staging writes with `SqlBulkCopy`
7. merge processor
8. report generation
9. progress endpoints
10. admin replay / DLQ tooling
11. Redis acceleration
12. full observability and dashboard