Jump to content

Race Condition Woes in Multi-threaded Data Processing?

Posted

I'm encountering an intermittent bug in my Java application that processes large datasets using multiple threads. It seems to be a race condition, but I'm struggling to pinpoint the exact cause and implement a robust solution. Sometimes, a record is processed twice, or completely skipped, even with what I believed were proper synchronization mechanisms.

Here's a simplified code snippet demonstrating the core logic:

public class DataProcessor {

private List<String> sharedQueue = new Collections.synchronizedList(new ArrayList<>());

private Set<String> processedItems = Collections.synchronizedSet(new HashSet<>()); // To track processed items

public void addItem(String item) {

sharedQueue.add(item);

}

public void processData() {

// Assume multiple worker threads call this concurrently

while (!sharedQueue.isEmpty()) {

String data = null;

synchronized (sharedQueue) { // Synchronize on sharedQueue

if (!sharedQueue.isEmpty()) {

data = sharedQueue.remove(0);

}

}

if (data != null) {

if (!processedItems.contains(data)) { // Check if already processed

// Simulate complex processing

try {

Thread.sleep(50); // Simulate some work

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

System.out.println("Processing: " + data);

processedItems.add(data);

} else {

System.out.println("Skipping already processed: " + data);

}

}

}

}

public static void main(String[] args) {

DataProcessor processor = new DataProcessor();

for (int i = 0; i < 100; i++) {

processor.addItem("Item_" + i);

}

// Create and start multiple worker threads

for (int i = 0; i < 5; i++) {

new Thread(processor::processData).start();

}

}

}

Featured Replies

You're hitting a race condition between checking processedItems.contains(data) and processedItems.add(data) — they're not atomic together. Two threads can process the same item before it's added to the set.

Fix: Wrap that check-add block in a synchronized section or use a thread-safe structure like ConcurrentHashMap with putIfAbsent() for atomicity:

ConcurrentMap<String, Boolean> processedItems = new ConcurrentHashMap<>();

if (processedItems.putIfAbsent(data, true) == null) {
    // process safely
}

Join the conversation

You can post now and register later. If you have an account, sign in now to post with your account.

Guest
Reply to this topic...