In this blog post, we'll build a real-world application that combines Spring StateMachine, Apache Kafka, and CSV-based document ingestion to manage complex document lifecycles in a scalable, event-driven way.
Use Case Overview
You have a CSV file that contains many documents. Each row defines a document and an event to apply (e.g., START, COMPLETE). The system should:
- Read the CSV file
- Send a Kafka message for each row
- Consume the Kafka message
- Trigger a Spring StateMachine transition for the related document
- Persist the updated document state
Sample CSV Format
documentId,title,state,event
doc-001,Contract A,NEW,START
doc-002,Contract B,NEW,START
doc-003,Report C,PROCESSING,COMPLETE
Technologies Used
- Java 17
- Spring Boot 3.x
- Spring StateMachine
- Spring Kafka
- Apache Commons CSV
- H2 Database
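In Maven terms these roughly map to spring-boot-starter-web, spring-boot-starter-data-jpa, spring-kafka, spring-statemachine-starter, commons-csv, and the h2 driver; check the Spring Boot BOM you are on for the exact versions.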
Enum Definitions
public enum DocumentState {
    NEW, PROCESSING, COMPLETED, ERROR
}

public enum DocumentEvent {
    START, COMPLETE, FAIL
}
StateMachine Configuration
@Configuration
@EnableStateMachineFactory
public class DocumentStateMachineConfig extends StateMachineConfigurerAdapter<DocumentState, DocumentEvent> {

    @Override
    public void configure(StateMachineStateConfigurer<DocumentState, DocumentEvent> states) throws Exception {
        // Declare all states and mark NEW as the initial state; without this
        // block the factory has no state to start a machine in.
        states
            .withStates()
            .initial(DocumentState.NEW)
            .states(EnumSet.allOf(DocumentState.class));
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<DocumentState, DocumentEvent> transitions) throws Exception {
        transitions
            .withExternal().source(DocumentState.NEW).target(DocumentState.PROCESSING).event(DocumentEvent.START)
            .and()
            .withExternal().source(DocumentState.PROCESSING).target(DocumentState.COMPLETED).event(DocumentEvent.COMPLETE)
            .and()
            .withExternal().source(DocumentState.NEW).target(DocumentState.ERROR).event(DocumentEvent.FAIL);
    }
}
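With the factory in place, the transitions can be sanity-checked without Kafka at all. Here is a minimal sketch (the test class and method names are illustrative, and it assumes JUnit 5 plus AssertJ from spring-boot-starter-test):
@SpringBootTest
class DocumentStateMachineConfigTest {

    @Autowired
    private StateMachineFactory<DocumentState, DocumentEvent> stateMachineFactory;

    @Test
    void startEventMovesNewDocumentToProcessing() {
        // A fresh machine begins in the configured initial state, NEW.
        StateMachine<DocumentState, DocumentEvent> sm = stateMachineFactory.getStateMachine();
        sm.start();
        sm.sendEvent(DocumentEvent.START);
        assertThat(sm.getState().getId()).isEqualTo(DocumentState.PROCESSING);
    }
}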
Document Entity
@Entity
public class Document {

    @Id
    private String id;

    private String title;

    @Enumerated(EnumType.STRING)
    private DocumentState state;

    // Getters and Setters
}
Kafka Producer and CSV Processing
@Component
public class CsvProcessor {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void processCSV(InputStream inputStream) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
             CSVParser parser = CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(reader)) {
            for (CSVRecord record : parser) {
                String documentId = record.get("documentId");
                String event = record.get("event");
                kafkaTemplate.send("document-events", documentId, event);
            }
        }
    }
}
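One gap worth calling out: the Kafka listener further down loads each document with findById(...).orElseThrow(), so a document row must already exist before its event is consumed, and the title and state columns of the CSV are otherwise never used. A simple option is to upsert the entity while parsing. Here is a minimal sketch of the loop body, assuming a DocumentRepository is injected into CsvProcessor alongside the KafkaTemplate:
for (CSVRecord record : parser) {
    String documentId = record.get("documentId");

    // Upsert the document so the listener can find and update it later.
    Document doc = documentRepository.findById(documentId).orElseGet(Document::new);
    doc.setId(documentId);
    doc.setTitle(record.get("title"));
    doc.setState(DocumentState.valueOf(record.get("state")));
    documentRepository.save(doc);

    kafkaTemplate.send("document-events", documentId, record.get("event"));
}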
REST Upload Endpoint
@RestController
@RequestMapping("/api/documents")
public class DocumentUploadController {

    @Autowired
    private CsvProcessor csvProcessor;

    @PostMapping("/upload")
    public ResponseEntity<String> upload(@RequestParam("file") MultipartFile file) throws IOException {
        csvProcessor.processCSV(file.getInputStream());
        return ResponseEntity.ok("CSV processed successfully");
    }
}
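Once the application is up, the endpoint can be exercised with a plain multipart upload, for example: curl -F "file=@documents.csv" http://localhost:8080/api/documents/upload (the file name and port are illustrative, assuming the default server port).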
Kafka Listener and State Transition
@Component
public class DocumentEventListener {

    @Autowired
    private StateMachineFactory<DocumentState, DocumentEvent> stateMachineFactory;

    @Autowired
    private DocumentRepository documentRepository;

    @KafkaListener(topics = "document-events")
    public void onMessage(ConsumerRecord<String, String> record) {
        String docId = record.key();
        DocumentEvent event = DocumentEvent.valueOf(record.value());

        Document doc = documentRepository.findById(docId).orElseThrow();

        // A machine fresh from the factory always starts in the initial state (NEW),
        // so rehydrate it with the document's persisted state before sending the event.
        StateMachine<DocumentState, DocumentEvent> sm = stateMachineFactory.getStateMachine(docId);
        sm.getStateMachineAccessor().doWithAllRegions(access ->
            access.resetStateMachine(new DefaultStateMachineContext<>(doc.getState(), null, null, null)));
        sm.start();
        sm.sendEvent(event);

        doc.setState(sm.getState().getId());
        documentRepository.save(doc);
    }
}
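Because each record is keyed by the documentId, Kafka's default partitioner routes all events for a given document to the same partition, so within a single consumer group they are consumed in order even when you scale out to multiple listener instances; one document's lifecycle is never processed concurrently by two consumers.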
Document Repository
public interface DocumentRepository extends JpaRepository<Document, String> {}
Final Thoughts
This architecture provides:
- Decoupled, event-driven state management
- Easily testable document lifecycles
- A scalable pattern for batch processing from CSVs
You can extend this with:
- Retry transitions
- Error handling (a minimal sketch follows this list)
- Audit logging
- UI feedback via WebSockets or REST polling
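As a starting point for retries and error handling on the consumer side, Spring Kafka's DefaultErrorHandler can re-deliver a failed record a few times and then skip it. A minimal sketch, assuming Spring Boot's auto-configured listener container factory picks up the CommonErrorHandler bean (worth verifying against your Boot version); the class and bean names are illustrative:
@Configuration
public class KafkaErrorHandlingConfig {

    // Retry each failed record up to 3 more times, one second apart, then skip it.
    @Bean
    public DefaultErrorHandler kafkaErrorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(1000L, 3L));
    }
}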
Let me know if you'd like the full GitHub repo, Docker setup, or integration with a frontend uploader!