3 posts tagged with "StateMachine"

· 4 min read
Byju Luckose

State machines are widely used to model workflows, business processes, and system behavior in software engineering. But when they are used across large systems or microservices, the lack of transformation standards often leads to inconsistent transitions, duplicated logic, and hard-to-debug failures.

In this blog post, we'll explore how to define State Machine Transformation Standards – a practical approach to improving structure, traceability, and maintainability in systems using state machines.

Why Do We Need Transformation Standards?

State machines are inherently about transitions — from one state to another based on an event. However, in real-world systems:

  • Events come from multiple sources (UI, APIs, Kafka, batch jobs)

  • The same state machine can be triggered in different contexts

  • Transition logic often mixes with transformation logic

Without standards:
  • Event-to-state mapping becomes inconsistent

  • Error handling differs across modules

  • Reuse becomes difficult

With standards:
  • All transitions follow a common contract

  • Developers know where to find transformation logic

  • Testing becomes deterministic

Core Concepts of State Machine Transformation Standards

We'll define a layered architecture that separates:

  • External Events (e.g., JSON messages, HTTP requests)

  • Transformation Layer (mapping input to internal events)

  • Internal State Machine (defined states, events, guards, actions)

  • Post-Processing (e.g., publishing, notifications, logging)

+----------------------+
| External Event (JSON)|
+----------+-----------+
           |
           v
+----------------------+
|   EventTransformer   |
|   Converts to        |
|   Internal Event Enum|
+----------+-----------+
           |
           v
+----------------------+
| StateMachineService  |
| Applies transition   |
| Logs state change    |
+----------+-----------+
           |
           v
+----------------------+
|    Post Processor    |
| (Notify, Log, Save)  |
+----------------------+


Naming Conventions for States and Events

State Naming (Enum):

Use ALL_CAPS with action-driven semantics:
  • WAITING_FOR_VALIDATION

  • VALIDATED

  • REJECTED

  • PROCESSING

Event Naming (Enum):

Use PAST_TENSE or clear action phrases:
  • VALIDATION_SUCCEEDED

  • VALIDATION_FAILED

  • DOCUMENT_RECEIVED

  • PROCESS_COMPLETED
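
Following these conventions, a minimal pair of enums for the document-validation scenario used throughout this post could look like this (a sketch; the names simply collect the states and events listed above):

public enum ScenarioState {
    WAITING_FOR_VALIDATION,
    VALIDATED,
    REJECTED,
    PROCESSING
}

public enum ScenarioEvent {
    VALIDATION_SUCCEEDED,
    VALIDATION_FAILED,
    DOCUMENT_RECEIVED,
    PROCESS_COMPLETED
}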

The Event Transformer Standard

This is a key part of the pattern. The EventTransformer receives raw input (e.g., from Kafka or REST) and maps it to a known event enum:

public interface EventTransformer {
    ScenarioEvent transform(Object externalEvent);
}

public class KafkaDocumentEventTransformer implements EventTransformer {

    @Override
    public ScenarioEvent transform(Object externalEvent) {
        if (externalEvent instanceof ValidationSuccessMessage) {
            return ScenarioEvent.VALIDATION_SUCCEEDED;
        }
        throw new IllegalArgumentException("Unknown event");
    }
}

Handling Standard Events: NEXT, FAIL, SKIP, RETRY

In standardized workflows, having a common set of generic events improves clarity and reusability across different state machines. Four particularly useful event types are:

  • NEXT: A neutral transition to the next logical state (often from a task completion)

  • FAIL: Indicates a failure that should move the process to a failure or error state

  • SKIP: Skips a task or validation step and moves to a later state

  • RETRY: Retries the current action or state without progressing

These events should be defined in a shared enum or interface and respected across all state machine configurations.

public enum CommonEvent {
    NEXT,
    FAIL,
    SKIP,
    RETRY
}

When combined with guards and actions, these events make workflows predictable and debuggable.
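
As a small illustration, a guard can cap how often RETRY is accepted before the process has to FAIL. This is a sketch, assuming a machine typed over CommonEvent; the RetryGuard class and the "retryCount" key in the extended state are illustrative names, not part of any shared library:

// Illustrative guard: RETRY is only accepted while fewer than
// MAX_RETRIES attempts are recorded in the machine's extended state.
public class RetryGuard implements Guard<ScenarioState, CommonEvent> {

    private static final int MAX_RETRIES = 3;

    @Override
    public boolean evaluate(StateContext<ScenarioState, CommonEvent> context) {
        Integer retries = context.getExtendedState().get("retryCount", Integer.class);
        return retries == null || retries < MAX_RETRIES;
    }
}

// Wiring it into a transition, e.g. an internal RETRY transition on PROCESSING:
// .withInternal()
//     .source(ScenarioState.PROCESSING)
//     .event(CommonEvent.RETRY)
//     .guard(new RetryGuard())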

State Machine Configuration Example (Spring StateMachine)

builder.configureStates()
    .withStates()
        .initial(ScenarioState.WAITING_FOR_VALIDATION)
        .state(ScenarioState.VALIDATED)
        .state(ScenarioState.REJECTED);

builder.configureTransitions()
    .withExternal()
        .source(ScenarioState.WAITING_FOR_VALIDATION)
        .target(ScenarioState.VALIDATED)
        .event(ScenarioEvent.VALIDATION_SUCCEEDED);
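
Tying the layers together, a thin service can apply the transformer and forward the resulting internal event to the machine. This is only a sketch of the StateMachineService box from the diagram above; the class and method names are assumptions:

// Sketch of the StateMachineService layer: transform the external input,
// then drive the state machine with the resulting internal event.
public class StateMachineService {

    private final StateMachine<ScenarioState, ScenarioEvent> stateMachine;
    private final EventTransformer transformer;

    public StateMachineService(StateMachine<ScenarioState, ScenarioEvent> stateMachine,
                               EventTransformer transformer) {
        this.stateMachine = stateMachine;
        this.transformer = transformer;
    }

    public ScenarioState handle(Object externalEvent) {
        ScenarioEvent event = transformer.transform(externalEvent);
        stateMachine.sendEvent(event);
        return stateMachine.getState().getId();
    }
}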

Logging and Auditing Standard

Every transition should be logged with:

  • Previous State

  • Triggering Event

  • New State

  • Timestamp

  • Correlation ID (e.g., documentId or userId)

log.info("Transitioned from {} to {} on event {} [docId: {}]",
        previousState, newState, event, docId);
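
One place to centralize this is a state machine listener. Below is a minimal sketch using Spring StateMachine's StateMachineListenerAdapter; the correlation ID would come from your own context (for example the extended state), so it is omitted here:

// Minimal audit listener: logs every state change with a timestamp.
public class AuditStateMachineListener
        extends StateMachineListenerAdapter<ScenarioState, ScenarioEvent> {

    private static final Logger log = LoggerFactory.getLogger(AuditStateMachineListener.class);

    @Override
    public void stateChanged(State<ScenarioState, ScenarioEvent> from,
                             State<ScenarioState, ScenarioEvent> to) {
        // 'from' is null for the initial transition into the start state
        log.info("Transitioned from {} to {} at {}",
                from != null ? from.getId() : "NONE", to.getId(), Instant.now());
    }
}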

Testing Transformation and Transitions

Unit test the EventTransformer separately from the StateMachine:

@Test
void testKafkaToEventMapping() {
    ScenarioEvent event = transformer.transform(new ValidationSuccessMessage());
    assertEquals(ScenarioEvent.VALIDATION_SUCCEEDED, event);
}

Also test transitions:


@Test
void testValidationTransition() {
    stateMachine.sendEvent(ScenarioEvent.VALIDATION_SUCCEEDED);
    assertEquals(ScenarioState.VALIDATED, stateMachine.getState().getId());
}

Real-World Use Case – Document Workflow Engine

At oohm.io, we use this standard to model document processing workflows. Documents pass through states like UPLOADED, VALIDATING, VALIDATED, FAILED_VALIDATION, and ARCHIVED. Each incoming Kafka message is transformed into an internal event, which triggers transitions.

The benefits:

  • Simplified debugging of failures

  • Easier onboarding for new developers

  • Predictable behavior across microservices

Conclusion

Defining clear State Machine Transformation Standards allows teams to build complex workflows without chaos. By separating concerns, using naming conventions, and implementing a structured transformer layer, you create a predictable and maintainable system.

Whether you're working on document pipelines, payment systems, or approval flows — standards will keep your state machines under control.

· 4 min read
Byju Luckose

In the age of microservices and polyglot development, it's common for teams to use different languages for different tasks: Java for orchestration, Python for AI, and C# for enterprise system integration. To tie all this together, Apache Kafka shines as a powerful messaging backbone. In this blog post, we'll explore how to build a multi-language worker architecture using Spring Boot and Kafka, with workers written in Java, Python, and C#.

Why Use Kafka with Multiple Language Workers?

Kafka is a distributed event streaming platform designed for high-throughput, decoupled communication. Using Kafka with multi-language workers allows you to:

  • Scale task execution independently per language.

  • Use the best language for each task.

  • Decouple orchestration logic from implementation details.

  • Add or remove workers without restarting the system.

Architecture Overview


+-----------------------------+      Kafka Topics       +-------------------------+
|       Spring Boot App       |                         |       Java Worker       |
|        (Orchestrator)       | ----------------------> |  - Parses DOCX          |
|                             |    [task-submission]    |  - Converts to PDF      |
|  - Accepts job via REST     |                         +-------------------------+
|  - Sends JSON tasks to Kafka| <------------------------
|  - Collects results         |       [task-result]     +-------------------------+
+-----------------------------+                         |      Python Worker      |
                                                        |  - Runs ML Inference    |
                                                        |  - Extracts Text        |
                                                        +-------------------------+

                                                        +-------------------------+
                                                        |    C# (.NET) Worker     |
                                                        |  - Legacy System API    |
                                                        |  - Data Enrichment      |
                                                        +-------------------------+



Topics

  • task-submission: Receives tasks from orchestrator

  • task-result: Publishes results from workers

Common Message Format

All communication uses a shared JSON message schema:


{
  "jobId": "123e4567-e89b-12d3-a456-426614174000",
  "taskType": "DOC_CONVERT",
  "payload": {
    "source": "http://example.com/sample.docx",
    "outputFormat": "pdf"
  }
}
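
On the JVM side it can help to model this schema explicitly instead of passing raw maps around. A possible sketch (the TaskMessage record is an illustrative name, not something the code below depends on):

// Illustrative Java model of the shared message schema.
public record TaskMessage(
        String jobId,
        String taskType,
        Map<String, Object> payload) {
}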


Spring Boot Orchestrator

Dependencies (Maven)

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

REST + Kafka Integration

@RestController
@RequestMapping("/jobs")
public class JobController {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public JobController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping
    public ResponseEntity<String> submitJob(@RequestBody Map<String, Object> job) throws JsonProcessingException {
        String jobId = UUID.randomUUID().toString();
        job.put("jobId", jobId);
        String json = objectMapper.writeValueAsString(job);
        kafkaTemplate.send("task-submission", jobId, json);
        return ResponseEntity.ok("Job submitted: " + jobId);
    }

    @KafkaListener(topics = "task-result", groupId = "orchestrator")
    public void receiveResult(String message) {
        System.out.println("Received result: " + message);
    }
}


Java Worker Example


@KafkaListener(topics = "task-submission", groupId = "java-worker")
public void consume(String message) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> task = mapper.readValue(message, new TypeReference<>() {});
// ... Process ...
Map<String, Object> result = Map.of(
"jobId", task.get("jobId"),
"status", "done",
"worker", "java"
);
kafkaTemplate.send("task-result", mapper.writeValueAsString(result));
}

Python Worker Example


from kafka import KafkaConsumer, KafkaProducer
import json

consumer = KafkaConsumer('task-submission', bootstrap_servers='localhost:9092', group_id='py-worker')
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode())

for msg in consumer:
    task = json.loads(msg.value.decode())
    print("Python Worker got task:", task)

    result = {
        "jobId": task["jobId"],
        "status": "completed",
        "worker": "python"
    }
    producer.send("task-result", result)


C# Worker Example (.NET Core)


using Confluent.Kafka;
using System.Text.Json;

var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "csharp-worker" };
var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
var producer = new ProducerBuilder<Null, string>(new ProducerConfig { BootstrapServers = "localhost:9092" }).Build();

consumer.Subscribe("task-submission");

while (true)
{
    var consumeResult = consumer.Consume();
    var task = JsonSerializer.Deserialize<Dictionary<string, object>>(consumeResult.Message.Value);

    var result = new {
        jobId = task["jobId"],
        status = "done",
        worker = "csharp"
    };

    producer.Produce("task-result", new Message<Null, string> {
        Value = JsonSerializer.Serialize(result)
    });
}

Monitoring & Logging

  • Use Prometheus + Grafana to monitor worker throughput and failures.

  • Add structured logs with jobId for end-to-end traceability (see the sketch below).
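
On the JVM workers, one way to carry the jobId through every log line is SLF4J's MDC. This is only a sketch of the idea, using the task map from the Java worker above:

// Sketch: attach the jobId to all log output emitted while handling the task.
MDC.put("jobId", (String) task.get("jobId"));
try {
    log.info("Processing task of type {}", task.get("taskType"));
    // ... task handling ...
} finally {
    MDC.remove("jobId");
}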

Local Testing Tips

  • Use Docker to spin up Kafka quickly (e.g., Bitnami Kafka).

  • Use test producers/consumers (kafka-console-producer, kafka-console-consumer) to verify topics.

  • Use Postman or cURL to submit jobs via Spring Boot.

Benefits of This Architecture

Feature                  | Benefit
-------------------------|--------------------------------
Kafka decoupling         | Workers can scale independently
Multi-language support   | Best language per use case
Spring Boot Orchestrator | Central control and REST API
Standard JSON format     | Easy integration and testing

Conclusion

This architecture empowers teams to build distributed, language-agnostic workflows powered by Kafka. By combining the orchestration strength of Spring Boot with the flexibility of multi-language workers, you can build scalable, fault-tolerant systems that grow with your needs.

· 3 min read
Byju Luckose

In this blog post, we'll build a real-world application that combines Spring StateMachine, Apache Kafka, and CSV-based document ingestion to manage complex document lifecycles in a scalable and reactive way.

Use Case Overview

You have a CSV file that contains many documents. Each row defines a document and an event to apply (e.g., START, COMPLETE). The system should:

  1. Read the CSV file

  2. Send a Kafka message for each row

  3. Consume the Kafka message

  4. Trigger a Spring StateMachine transition for the related document

  5. Persist the updated document state

Sample CSV Format


documentId,title,state,event
doc-001,Contract A,NEW,START
doc-002,Contract B,NEW,START
doc-003,Report C,PROCESSING,COMPLETE

Technologies Used

  • Java 17

  • Spring Boot 3.x

  • Spring StateMachine

  • Spring Kafka

  • Apache Commons CSV

  • H2 Database

Enum Definitions


public enum DocumentState {
    NEW, PROCESSING, COMPLETED, ERROR
}

public enum DocumentEvent {
    START, COMPLETE, FAIL
}

StateMachine Configuration

@Configuration
@EnableStateMachineFactory
public class DocumentStateMachineConfig extends StateMachineConfigurerAdapter<DocumentState, DocumentEvent> {

    @Override
    public void configure(StateMachineStateConfigurer<DocumentState, DocumentEvent> states) throws Exception {
        // The states, including the initial one, must be registered as well,
        // otherwise the transitions below have nothing to operate on.
        states
            .withStates()
            .initial(DocumentState.NEW)
            .states(EnumSet.allOf(DocumentState.class));
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<DocumentState, DocumentEvent> transitions) throws Exception {
        transitions
            .withExternal().source(DocumentState.NEW).target(DocumentState.PROCESSING).event(DocumentEvent.START)
            .and()
            .withExternal().source(DocumentState.PROCESSING).target(DocumentState.COMPLETED).event(DocumentEvent.COMPLETE)
            .and()
            .withExternal().source(DocumentState.NEW).target(DocumentState.ERROR).event(DocumentEvent.FAIL);
    }
}

Document Entity

@Entity
public class Document {

    @Id
    private String id;

    private String title;

    @Enumerated(EnumType.STRING)
    private DocumentState state;

    // Getters and Setters
}

Kafka Producer and CSV Processing


@Component
public class CsvProcessor {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void processCSV(InputStream inputStream) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
             CSVParser parser = CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(reader)) {

            for (CSVRecord record : parser) {
                String documentId = record.get("documentId");
                String event = record.get("event");
                kafkaTemplate.send("document-events", documentId, event);
            }
        }
    }
}

REST Upload Endpoint

@RestController
@RequestMapping("/api/documents")
public class DocumentUploadController {

    @Autowired
    private CsvProcessor csvProcessor;

    @PostMapping("/upload")
    public ResponseEntity<String> upload(@RequestParam("file") MultipartFile file) throws IOException {
        csvProcessor.processCSV(file.getInputStream());
        return ResponseEntity.ok("CSV processed successfully");
    }
}

Kafka Listener and State Transition


@Component
public class DocumentEventListener {

    @Autowired
    private StateMachineFactory<DocumentState, DocumentEvent> stateMachineFactory;

    @Autowired
    private DocumentRepository documentRepository;

    @KafkaListener(topics = "document-events")
    public void onMessage(ConsumerRecord<String, String> record) {
        String docId = record.key();
        DocumentEvent event = DocumentEvent.valueOf(record.value());

        StateMachine<DocumentState, DocumentEvent> sm = stateMachineFactory.getStateMachine(docId);
        sm.start();
        sm.sendEvent(event);

        Document doc = documentRepository.findById(docId).orElseThrow();
        doc.setState(sm.getState().getId());
        documentRepository.save(doc);
    }
}
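
Note that the listener above always starts a fresh machine in its initial state (NEW), so the incoming event is evaluated against that initial state rather than the document's stored state. If you want the machine to resume from the persisted state first, one possible approach is to reset it through the state machine accessor before sending the event. A sketch, assuming the same repository and factory as above:

// Sketch: rehydrate the machine from the document's persisted state
// before applying the incoming event.
Document doc = documentRepository.findById(docId).orElseThrow();
StateMachine<DocumentState, DocumentEvent> sm = stateMachineFactory.getStateMachine(docId);

sm.stop();
sm.getStateMachineAccessor().doWithAllRegions(access ->
        access.resetStateMachine(new DefaultStateMachineContext<>(doc.getState(), null, null, null)));
sm.start();
sm.sendEvent(event);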

Document Repository


public interface DocumentRepository extends JpaRepository<Document, String> {}

Final Thoughts

This architecture provides:

  • Decoupled, event-driven state management

  • Easily testable document lifecycles

  • A scalable pattern for batch processing from CSVs

You can extend this with:

  • Retry transitions

  • Error handling

  • Audit logging

  • UI feedback via WebSockets or REST polling

Let me know if you'd like the full GitHub repo, Docker setup, or integration with a frontend uploader!