By membersound


2018-05-16 15:02:09 8 Comments

I want to read a large file, process each line, and insert the results into a database. My goal is to parallelize the processing of the lines, as processing each line is a long-running task. Therefore I want one thread to keep reading, multiple threads to keep processing, and one thread to keep inserting into the database in chunks.

I broke it down as follows:

1) read a file line by line sequentially (easy)

2) send each line to a threadpool (3 threads), as the processing is the long-running task. block further line reading while threadpool is busy.

3) write each processed line from the threadpool to a StringBuffer

4) monitor that buffer size, and write the results in chunks to a database (e.g. every 1000 entries)

ExecutorService executor = Executors.newFixedThreadPool(3);

StringBuffer sb = new StringBuffer();

String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
    count.getAndIncrement();
    final String current = line; // a lambda can only capture an effectively final variable
    Future<String> future = executor.submit(() -> {
        return processor.process(current);
    });

    //PROBLEM: this blocks until the future returns
    sb.append(future.get());

    if (count.get() == 100) {
        StringBuffer bufferChunk = sb;
        count.set(0);
        sb = new StringBuffer();

        databaseService.batchInsert(bufferChunk.toString());
    }
}

Problems:

  • future.get() will always block the reader until one future returns a result

  • the buffer "monitoring" is probably not done right

Probably I'm not doing this the right way. But how can I achieve this?

Sidenote: filesize is about 10GB, so I cannot first read the entire file into memory to prepare the parallel tasks.

3 comments

@membersound 2018-05-17 11:42:11

After deeper research, I found the BlockingExecutor presented in this answer comes closest to what I'm trying to achieve:

https://stackoverflow.com/a/43109689/1194415

It basically extends ThreadPoolExecutor combined with a Semaphore lock.
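A minimal sketch of that idea, written from the one-line description above rather than copied from the linked answer. The class name BlockingSubmitExecutor and the maxInFlight parameter are assumptions made for illustration:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical class: a fixed pool whose execute()/submit() block when too many tasks are in flight. */
class BlockingSubmitExecutor extends ThreadPoolExecutor {
    // One permit per task that is queued or running.
    private final Semaphore permits;

    BlockingSubmitExecutor(int threads, int maxInFlight) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.permits = new Semaphore(maxInFlight);
    }

    @Override
    public void execute(Runnable task) {
        try {
            permits.acquire(); // blocks the submitting (reader) thread when the limit is reached
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for a permit", e);
        }
        try {
            super.execute(task);
        } catch (RuntimeException | Error e) {
            permits.release(); // the task was never accepted, so give the permit back
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        permits.release(); // runs after every task, whether it succeeded or threw
    }
}
```

Since submit() delegates to execute(), the reader loop from the question would block on executor.submit(...) instead of on future.get(). With maxInFlight somewhat larger than the thread count, the workers always have a line waiting for them.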

@Oleg Sklyar 2018-05-17 11:51:45

Cool, the idea seems to be similar to my last effort :) Happy you found your solution!

@Oleg Sklyar 2018-05-16 15:40:58

I find the following solution elegant. It is only one of many possible approaches, but it is conceptually simple and it:

  • throttles the reads,
  • accumulates just the minimum amount of state needed to report completion at the end,
  • does not require explicit handling of threads.

I am only showing the actual test method here; the complete test setup and auxiliary data structures are available in a dedicated GitHub repo:

private final AtomicInteger count = new AtomicInteger();

private final Consumer<String> processor = (value) -> {
    count.incrementAndGet();
};

@Test
public void onlyReadWhenExecutorAvailable() throws Exception {

    Executor executor = Executors.newCachedThreadPool();

    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    for (Semaphore semaphore = new Semaphore(CONCURRENCY_LEVEL); ; ) {
        String value = reader.read();
        if (value == null) {
            break;
        }

        semaphore.acquire();

        CompletableFuture<Void> future = CompletableFuture.completedFuture(value)
            .thenAcceptAsync(v -> {
                try {
                    processor.accept(v);
                } finally {
                    // release the permit even if processing throws, so the reader cannot stall
                    semaphore.release();
                }
            }, executor);

        done = done.thenCompose($ -> future);
    }
    done.get();

    assertEquals(ENTRIES, count.get());
}
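The test above only counts processed values and leaves out the chunked database write from the question. A minimal sketch of that last stage, assuming the workers hand their results to a shared BlockingQueue and one dedicated thread runs the writer. ChunkedWriter, END and the batchInsert callback are hypothetical names, not part of the repo or of any library:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical single-threaded writer that flushes results in fixed-size chunks. */
class ChunkedWriter implements Runnable {
    // Sentinel compared by identity; put it on the queue once all results are in.
    static final String END = new String("<end>");

    private final BlockingQueue<String> results;
    private final int chunkSize;
    private final Consumer<List<String>> batchInsert; // stands in for the database call

    ChunkedWriter(BlockingQueue<String> results, int chunkSize,
                  Consumer<List<String>> batchInsert) {
        this.results = results;
        this.chunkSize = chunkSize;
        this.batchInsert = batchInsert;
    }

    @Override
    public void run() {
        List<String> chunk = new ArrayList<>(chunkSize);
        try {
            while (true) {
                String item = results.take(); // waits until a worker delivers a result
                if (item == END) {
                    break;
                }
                chunk.add(item);
                if (chunk.size() == chunkSize) {
                    batchInsert.accept(new ArrayList<>(chunk)); // hand over a copy
                    chunk.clear();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!chunk.isEmpty()) {
            batchInsert.accept(chunk); // flush the final, partially filled chunk
        }
    }
}
```

Each worker would call results.put(processed) instead of incrementing a counter, and the reader would put ChunkedWriter.END on the queue after done.get() returns. Giving the queue a bounded capacity would also slow the workers down when the database falls behind.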

@membersound 2018-05-16 15:46:07

So there is no ExecutorService that just blocks the main thread from submitting if the queue is full?

@Oleg Sklyar 2018-05-16 15:47:21

Not that I know of. It queues, and that is what does not work for you.
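For reference, a close approximation is possible with stock JDK classes: a ThreadPoolExecutor with a bounded queue and the built-in CallerRunsPolicy. It does not block the submitter in the strict sense. When the queue is full, the submitting thread runs the task itself, which keeps it from reading further lines in the meantime. A minimal sketch; ThrottledPool and its parameters are names chosen for illustration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical factory for a fixed pool that pushes overflow work back onto the caller. */
class ThrottledPool {
    static ThreadPoolExecutor create(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,                        // fixed number of worker threads
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), // bounded, so memory use stays flat
                new ThreadPoolExecutor.CallerRunsPolicy()); // full queue: caller runs the task
    }
}
```

The trade-off is that the reader thread is tied up for the duration of one task, so the workers can run out of queued lines while it is busy.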

@Oleg Sklyar 2018-05-16 21:10:45

@membersound I have updated my answer to be really straightforward and avoid loops for thread blocking. Really like it now

@gioni_go 2018-05-16 15:09:43

  1. Read the file size (File.length() method) and divide it by your number of threads to get approximate split indexes.
  2. Use RandomAccessFile to search for the nearest newline character preceding each index found in step 1. https://docs.oracle.com/javase/7/docs/api/java/io/RandomAccessFile.html
  3. Give each thread its new start/end offsets plus its own RandomAccessFile opened for reading.
  4. Subclass InputStream to create a new InputStream on top of RandomAccessFile and start reading.
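Steps 1 and 2 can be sketched as follows. This is a variant under stated assumptions: it scans forward to the next newline instead of backward, which aligns the chunks on line boundaries just the same, and it assumes lines end in '\n'. ChunkOffsets and split are hypothetical names:

```java
import java.io.IOException;
import java.io.RandomAccessFile;

/** Hypothetical helper that cuts a file into line-aligned byte ranges. */
class ChunkOffsets {
    /** Returns parts + 1 offsets; chunk i covers the bytes [offsets[i], offsets[i + 1]). */
    static long[] split(RandomAccessFile file, int parts) throws IOException {
        long length = file.length();
        long[] offsets = new long[parts + 1];
        offsets[parts] = length;
        for (int i = 1; i < parts; i++) {
            // Start at the even split point, but never before the previous boundary.
            long pos = Math.max(offsets[i - 1], length * i / parts);
            file.seek(pos);
            int b;
            while ((b = file.read()) != -1 && b != '\n') {
                // skip the remainder of the line that straddles the split point
            }
            offsets[i] = file.getFilePointer(); // first byte after the newline, or EOF
        }
        return offsets;
    }
}
```

Each thread would then open its own RandomAccessFile, seek to offsets[i] and stop reading at offsets[i + 1]. The byte-by-byte scan is unbuffered, which is acceptable here because it only runs once per boundary.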
