Stateful Document Processing with Spring StateMachine, Kafka, and CSV

· 3 min read
Byju Luckose

In this blog post, we'll build a real-world application that combines Spring StateMachine, Apache Kafka, and CSV-based document ingestion to manage document lifecycles in a scalable, event-driven way.

Use Case Overview

You have a CSV file that contains many documents. Each row defines a document and an event to apply (e.g., START, COMPLETE). The system should:

  1. Read the CSV file

  2. Send a Kafka message for each row

  3. Consume the Kafka message

  4. Trigger a Spring StateMachine transition for the related document

  5. Persist the updated document state

Sample CSV Format


documentId,title,state,event
doc-001,Contract A,NEW,START
doc-002,Contract B,NEW,START
doc-003,Report C,PROCESSING,COMPLETE
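
Before bringing in Commons CSV, the row format can be sketched with plain JDK string handling. This is only an illustration of the four columns above (the `Row` record is a hypothetical name, not part of the application); the real parser later in the post also handles quoting and escapes:

```java
public class CsvRowSketch {

    // Hypothetical record mirroring the four CSV columns above.
    record Row(String documentId, String title, String state, String event) {}

    static Row parse(String line) {
        // Naive split on commas; fine for this sample data,
        // but Commons CSV is used in the real application.
        String[] cols = line.split(",", -1);
        return new Row(cols[0], cols[1], cols[2], cols[3]);
    }

    public static void main(String[] args) {
        Row row = parse("doc-001,Contract A,NEW,START");
        System.out.println(row.documentId() + " -> " + row.event());
    }
}
```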

Technologies Used

  • Java 17

  • Spring Boot 3.x

  • Spring StateMachine

  • Spring Kafka

  • Apache Commons CSV

  • H2 Database

Enum Definitions


public enum DocumentState {
    NEW, PROCESSING, COMPLETED, ERROR
}

public enum DocumentEvent {
    START, COMPLETE, FAIL
}

StateMachine Configuration

Note that the states must be registered with an initial state; without the states block, the factory cannot build a working machine.

@Configuration
@EnableStateMachineFactory
public class DocumentStateMachineConfig extends StateMachineConfigurerAdapter<DocumentState, DocumentEvent> {

    @Override
    public void configure(StateMachineStateConfigurer<DocumentState, DocumentEvent> states) throws Exception {
        states
            .withStates()
            .initial(DocumentState.NEW)
            .states(EnumSet.allOf(DocumentState.class));
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<DocumentState, DocumentEvent> transitions) throws Exception {
        transitions
            .withExternal().source(DocumentState.NEW).target(DocumentState.PROCESSING).event(DocumentEvent.START)
            .and()
            .withExternal().source(DocumentState.PROCESSING).target(DocumentState.COMPLETED).event(DocumentEvent.COMPLETE)
            .and()
            .withExternal().source(DocumentState.NEW).target(DocumentState.ERROR).event(DocumentEvent.FAIL);
    }
}

Document Entity

@Entity
public class Document {

    @Id
    private String id;
    private String title;

    @Enumerated(EnumType.STRING)
    private DocumentState state;

    // Getters and Setters
}

Kafka Producer and CSV Processing


@Component
public class CsvProcessor {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void processCSV(InputStream inputStream) throws IOException {
        // Specify the charset explicitly rather than relying on the platform default
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
             CSVParser parser = CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(reader)) {

            for (CSVRecord record : parser) {
                String documentId = record.get("documentId");
                String event = record.get("event");
                // Use the document id as the Kafka key so all events for one
                // document land on the same partition and stay ordered
                kafkaTemplate.send("document-events", documentId, event);
            }
        }
    }
}

REST Upload Endpoint

@RestController
@RequestMapping("/api/documents")
public class DocumentUploadController {

    @Autowired
    private CsvProcessor csvProcessor;

    @PostMapping("/upload")
    public ResponseEntity<String> upload(@RequestParam("file") MultipartFile file) throws IOException {
        csvProcessor.processCSV(file.getInputStream());
        return ResponseEntity.ok("CSV processed successfully");
    }
}

Kafka Listener and State Transition


@Component
public class DocumentEventListener {

    @Autowired
    private StateMachineFactory<DocumentState, DocumentEvent> stateMachineFactory;

    @Autowired
    private DocumentRepository documentRepository;

    @KafkaListener(topics = "document-events")
    public void onMessage(ConsumerRecord<String, String> record) {
        String docId = record.key();
        DocumentEvent event = DocumentEvent.valueOf(record.value());

        Document doc = documentRepository.findById(docId).orElseThrow();

        // Rehydrate the machine with the document's persisted state: a fresh
        // machine from the factory always starts in the initial state (NEW)
        StateMachine<DocumentState, DocumentEvent> sm = stateMachineFactory.getStateMachine(docId);
        sm.stopReactively().block();
        sm.getStateMachineAccessor().doWithAllRegions(access ->
            access.resetStateMachineReactively(
                new DefaultStateMachineContext<>(doc.getState(), null, null, null)).block());
        sm.startReactively().block();

        // sendEvent(E) is deprecated in Spring StateMachine 3.x; use the reactive API
        sm.sendEvent(Mono.just(MessageBuilder.withPayload(event).build())).blockLast();

        doc.setState(sm.getState().getId());
        documentRepository.save(doc);
    }
}
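
For completeness, a minimal Kafka configuration sketch. The property names are standard Spring Boot keys; the broker address and group id are assumptions for local development:

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: document-events-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```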

Document Repository


public interface DocumentRepository extends JpaRepository<Document, String> {}

Final Thoughts

This architecture provides:

  • Decoupled, event-driven state management

  • Easily testable document lifecycles

  • A scalable pattern for batch processing of CSV files

You can extend this with:

  • Retry transitions

  • Error handling

  • Audit logging

  • UI feedback via WebSockets or REST polling

Let me know if you'd like the full GitHub repo, Docker setup, or integration with a frontend uploader!