
The code here is pasted directly from this project. Quick summary: it is related to a Stack Overflow question.

So, basically, all the code below aims to implement CharSequence over a large text file, within the limitations of CharSequence itself (that is, up to Integer.MAX_VALUE chars). All of it is extracted from the same package.

The process is as follows:

  • a TextDecoder is created for one file which (supposedly) contains text; it decodes byte sequences into char sequences;
  • it instantiates one DecodingStatus; this class keeps track of how many characters have been successfully decoded so far and, if necessary, reports an error instead;
  • callers "waiting" for a given number of characters create a CharWaiter instance; it is queued into the DecodingStatus only if its requirement (character offset) has not yet been met;
  • should a decoding error happen, the DecodingStatus instance wakes up all pending CharWaiters and hands them the error.

OK; so, first things first: the code works and is mostly tested; my problem is that it is ugly. In particular, in DecodingStatus, all methods except one are synchronized. Also, error "recovery" code is duplicated in several places.

How can I improve upon this code? In particular, can I avoid making (nearly) all methods in DecodingStatus synchronized? Can I avoid duplicating error checking?

First, the CharWaiter:

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.PriorityQueue;
import java.util.concurrent.CountDownLatch;

/**
 * A waiter on a number of available characters in a {@link TextDecoder}
 *
 * <p>When it is woken up, it will check for the status of the operation; it
 * will throw a {@link RuntimeException} if the decoding operation fails, or if
 * it has waited for more characters than are actually available.</p>
 *
 * <p>It implements {@link Comparable} since instances of this class are used in
 * a {@link PriorityQueue}.</p>
 *
 * <p>Inspired by <a href="https://stackoverflow.com/a/22055231/1093528">this
 * StackOverflow answer</a>.</p>
 *
 * @see DecodingStatus
 * @see TextDecoder#needChars(int)
 */
final class CharWaiter
    implements Comparable<CharWaiter>
{
    private final int required;
    private final CountDownLatch latch = new CountDownLatch(1);
    private int nrChars = 0;
    private IOException exception = null;

    CharWaiter(final int required)
    {
        if (required < 0)
            throw new ArrayIndexOutOfBoundsException(required);
        this.required = required;
    }

    void setNrChars(final int nrChars)
    {
        this.nrChars = nrChars;
    }

    void setException(final IOException exception)
    {
        this.exception = exception;
    }

    int getRequired()
    {
        return required;
    }

    void await()
        throws InterruptedException
    {
        latch.await();
        if (exception != null)
            throw new RuntimeException("decoding error", exception);
        if (nrChars < required)
            throw new ArrayIndexOutOfBoundsException(required);
    }

    void wakeUp()
    {
        latch.countDown();
    }

    @Override
    public int compareTo(@Nonnull final CharWaiter o)
    {
        return Integer.compare(required, o.required);
    }

    @Override
    public String toString()
    {
        return "waiting for " + required + " character(s)";
    }
}

Then, the DecodingStatus:

import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

/**
 * The watchdog class for a text decoding operation
 *
 * <p>This class takes care of {@link CharWaiter}s and callers to {@link
 * TextDecoder#getTotalChars()}.</p>
 *
 * <p>The decoding process in {@link TextDecoder} will update the internal
 * status of this object when the decoding operation makes progress; on an
 * update, this class will wake up the relevant waiters.</p>
 *
 * <p>In the event of an error, all waiters are woken up.</p>
 *
 * @see CharWaiter
 */
@ThreadSafe
final class DecodingStatus
{
    private boolean finished = false;
    private int nrChars = -1;
    private IOException exception = null;
    private final Queue<CharWaiter> waiters = new PriorityQueue<>();
    private final CountDownLatch endLatch = new CountDownLatch(1);

    synchronized boolean addWaiter(final CharWaiter waiter)
    {
        if (exception != null)
            throw new RuntimeException("decoding error", exception);
        final int required = waiter.getRequired();
        if (required <= nrChars)
            return false;
        if (!finished) {
            waiters.add(waiter);
            return true;
        }
        if (required > nrChars)
            throw new ArrayIndexOutOfBoundsException(required);
        return false;
    }

    synchronized void setNrChars(final int nrChars)
    {
        this.nrChars = nrChars;
        CharWaiter waiter;
        while (!waiters.isEmpty()) {
            waiter = waiters.peek();
            if (waiter.getRequired() > nrChars)
                break;
            waiter.setNrChars(nrChars);
            waiters.remove().wakeUp();
        }
    }

    synchronized void setFailed(final IOException exception)
    {
        this.exception = exception;
        final List<CharWaiter> list = new ArrayList<>(waiters);
        waiters.clear();
        for (final CharWaiter waiter: list) {
            waiter.setException(exception);
            waiter.wakeUp();
        }
        endLatch.countDown();
    }

    synchronized void setFinished(final int nrChars)
    {
        finished = true;
        this.nrChars = nrChars;
        final List<CharWaiter> list = new ArrayList<>(waiters);
        waiters.clear();
        for (final CharWaiter waiter: list) {
            waiter.setNrChars(nrChars);
            waiter.wakeUp();
        }
        endLatch.countDown();
    }

    int getTotalSize()
    {
        try {
            endLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted", e);
        }
        if (exception != null)
            throw new RuntimeException("decoding error", exception);
        return nrChars;
    }

    @Override
    public synchronized String toString()
    {
        if (exception != null)
            return "decoding error after reading " + nrChars + " character(s)";
        return "currently decoded: " + nrChars + " character(s); finished: "
            + finished;
    }
}

And finally, the TextDecoder:

import com.github.fge.largetext.LargeText;
import com.github.fge.largetext.LargeTextFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import java.io.Closeable;
import java.io.IOException;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Text file decoder
 *
 * <p>This is the first core class of this package (the second is {@link
 * TextLoader}). Its role is to decode a text file chunk by chunk. The size of
 * chunks to use is determined when you build your {@link LargeTextFactory}.</p>
 *
 * <p>{@link LargeText} will call upon this class to obtain a {@link TextRange}
 * (or a list of them) containing the character at a given index (or the range
 * of characters), by using the methods {@link #getRange(int)} and {@link
 * #getRanges(com.google.common.collect.Range)} respectively.</p>
 *
 * <p>These methods are blocking, but they <em>do not</em> throw {@link
 * InterruptedException}; if an interruption occurs, these methods reset the
 * thread interruption status and throw the appropriate {@link
 * RuntimeException} (for instance, an {@link ArrayIndexOutOfBoundsException} if
 * the requested offset exceeds the number of characters in the file).</p>
 *
 * <p>Implementation note: this class uses a <em>single threaded</em> {@link
 * ExecutorService} to perform the decoding operation. Decoding is not done in
 * parallel, and cannot be, since it is not guaranteed that a byte mapping can
 * be decoded exactly to a character sequence (for instance, using UTF-8, the
 * end of the mapping may contain one byte only of a three-byte sequence).</p>
 *
 * @see DecodingStatus
 */
@ThreadSafe
public final class TextDecoder
    implements Closeable
{
    private static final ThreadFactory THREAD_FACTORY
        = new ThreadFactoryBuilder().setDaemon(true).build();

    private final ExecutorService executor
        = Executors.newSingleThreadExecutor(THREAD_FACTORY);

    private final DecodingStatus status = new DecodingStatus();

    @GuardedBy("ranges")
    private final RangeMap<Integer, TextRange> ranges = TreeRangeMap.create();

    private final FileChannel channel;
    private final Charset charset;
    private final long fileSize;
    private final long targetMapSize;

    /**
     * Constructor; don't use directly!
     *
     * @param channel the {@link FileChannel} to the target file
     * @param charset the character encoding to use
     * @param targetMapSize the target byte mapping size
     * @throws IOException error obtaining information on the channel
     */
    public TextDecoder(final FileChannel channel, final Charset charset,
        final long targetMapSize)
        throws IOException
    {
        this.channel = channel;
        fileSize = channel.size();
        this.targetMapSize = targetMapSize;
        this.charset = charset;
        executor.submit(decodingTask());
    }

    /**
     * Return the appropriate text range containing the character at the given
     * offset
     *
     * @param charOffset the offset
     * @return the appropriate {@link TextRange}
     * @throws RuntimeException method has been interrupted, or a decoding error
     * has occurred
     * @throws ArrayIndexOutOfBoundsException offset requested is out of range
     */
    public TextRange getRange(final int charOffset)
    {
        try {
            needChars(charOffset + 1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
        synchronized (ranges) {
            return ranges.get(charOffset);
        }
    }

    /**
     * Return an ordered iterable of text ranges covering the requested range
     *
     * @param range the range
     * @return the appropriate list of text ranges
     * @throws RuntimeException method has been interrupted, or a decoding error
     * has occurred
     * @throws ArrayIndexOutOfBoundsException range is out of bounds for this
     * decoder
     */
    public List<TextRange> getRanges(final Range<Integer> range)
    {
        try {
            needChars(range.upperEndpoint());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
        final Collection<TextRange> ret;
        synchronized (ranges) {
            ret = ranges.subRangeMap(range).asMapOfRanges().values();
        }
        return ImmutableList.copyOf(ret);
    }

    /**
     * Return the total number of characters in this decoder
     *
     * <p>This method sleeps until the decoding operation finishes (either
     * successfully or with an error).</p>
     *
     * @return the total number of characters
     * @throws RuntimeException method has been interrupted, or a decoding error
     * has occurred
     *
     * @see DecodingStatus#getTotalSize()
     */
    public int getTotalChars()
    {
        return status.getTotalSize();
    }

    @Override
    public void close()
        throws IOException
    {
        executor.shutdown();
    }

    private void needChars(final int needed)
        throws InterruptedException
    {
        final CharWaiter waiter = new CharWaiter(needed);
        if (status.addWaiter(waiter))
            waiter.await();
    }

    // TODO: move to another class?
    private Runnable decodingTask()
    {
        return new Runnable()
        {
            @Override
            public void run()
            {
                final CharsetDecoder decoder = charset.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT);
                final CharBuffer charMap
                    = CharBuffer.allocate((int) targetMapSize);
                long byteOffset = 0L;
                int charOffset = 0;
                TextRange range;
                while (byteOffset < fileSize) {
                    try {
                        range = nextRange(byteOffset, charOffset, decoder,
                            charMap);
                        if (range.getByteRange().isEmpty())
                            throw new IOException("unable to read file as text "
                                + "starting from byte offset " + byteOffset);
                    } catch (IOException e) {
                        status.setFailed(e);
                        break;
                    }
                    byteOffset = range.getByteRange().upperEndpoint();
                    charOffset = range.getCharRange().upperEndpoint();
                    status.setNrChars(charOffset);
                    synchronized (ranges) {
                        ranges.put(range.getCharRange(), range);
                    }
                }
                status.setFinished(charOffset);
            }
        };
    }

    private TextRange nextRange(final long byteOffset, final int charOffset,
        final CharsetDecoder decoder, final CharBuffer charMap)
        throws IOException
    {
        long nrBytes = Math.min(targetMapSize, fileSize - byteOffset);
        final MappedByteBuffer byteMap
            = channel.map(FileChannel.MapMode.READ_ONLY, byteOffset, nrBytes);
        charMap.rewind();
        decoder.reset();
        final CoderResult result = decoder.decode(byteMap, charMap, true);
        // FIXME
        if (result.isUnmappable())
            result.throwException();
        /*
         * Incomplete byte sequence: in this case, the mapping position reflects
         * what was actually read; change the mapping size
         */
        if (result.isMalformed())
            nrBytes = (long) byteMap.position();
        return new TextRange(byteOffset, nrBytes, charOffset,
            charMap.position());
    }
}
asked Mar 30, 2014 at 2:32

1 Answer

  1. The range variable (a TextRange) could be declared inside the loop and could be final:

    TextRange range;
    while (byteOffset < fileSize) {

    The same is true for waiter here:

    CharWaiter waiter;
    while (!waiters.isEmpty()) {
        waiter = waiters.peek();


    (Effective Java, Second Edition, Item 45: Minimize the scope of local variables)

  2. I'd rename this to createDecodingTask:

    // TODO: move to another class?
    private Runnable decodingTask()
    

    From Clean Code, Chapter 2: Meaningful Names:

    Method Names

    Methods should have verb or verb phrase names like postPayment, deletePage, or save. [...]

  3. As for the TODO comment above: I usually work with much smaller classes than 240 lines, so I'd move it to another class. It makes testing easier.

  4. I'd wait a little here with executor.awaitTermination and check its return value:

    @Override
    public void close()
        throws IOException
    {
        executor.shutdown();
    }


    I see that the code uses daemon threads, but if the application doesn't exit and only closes the TextDecoder, threads may keep running in the background needlessly and without any notice. Failing early usually helps prevent bigger bugs/shutdowns.

  5. Moving this code to another class and passing an Executor to the constructor would also help testing (probably with a current-thread executor):

    private static final ThreadFactory THREAD_FACTORY
        = new ThreadFactoryBuilder().setDaemon(true).build();
    private final ExecutorService executor
        = Executors.newSingleThreadExecutor(THREAD_FACTORY);
    

    It would separate the concerns, you could test the parts isolated.

  6. Starting another thread/Runnable from the constructor smells a little. It looks safe, as it's the last call in the constructor, but I'd still try to avoid it.

  7. You could move this logic to a checkException method, since it's used twice:

    if (exception != null)
        throw new RuntimeException("decoding error", exception);
    
  8. The code already uses Guava, so I'd consider using Preconditions.checkArgument here (although it throws IllegalArgumentException):

    if (required < 0)
     throw new ArrayIndexOutOfBoundsException(required);
    
  9. A more descriptive name than nrChars (loadedChars, readChars, decodedChars?) and a named constant for -1 (with a descriptive name) would help readers here:

    private int nrChars = -1;
    
  10. I would be a little more defensive here and check that the value of the nrChars parameter is greater than the field's value:

    synchronized void setNrChars(final int nrChars)
    {
        this.nrChars = nrChars;
    

    (The Pragmatic Programmer: From Journeyman to Master by Andrew Hunt and David Thomas: Dead Programs Tell No Lies.)

  11. The following is used more than once, I'd create a method for it:

    final List<CharWaiter> list = new ArrayList<>(waiters);
    waiters.clear();
    
  12. I don't think that DecodingStatus is thread-safe. Reading nrChars in getTotalSize() should also happen in a synchronized block.

    [...] synchronization has no effect unless both read and write operations are synchronized.

    Source: Effective Java, 2nd Edition, Item 66: Synchronize access to shared mutable data

    Locking is not just about mutual exclusion; it is also about memory visibility. To ensure that all threads see the most up-to-date values of shared mutable variables, the reading and writing threads must synchronize on a common lock.

    Source: Java Concurrency in Practice, 3.1.3. Locking and Visibility.
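    A minimal sketch of point 4, assuming a hypothetical ClosableWorker class that owns a single-thread executor the way TextDecoder does. The grace period is a constructor parameter chosen by the caller, and throwing an IOException on timeout is just one way to fail early:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TextDecoder: a Closeable owning a single-thread executor.
final class ClosableWorker implements Closeable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final long timeoutMillis; // grace period granted to the running task in close()

    ClosableWorker(final long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void submit(final Runnable task) {
        executor.submit(task);
    }

    @Override
    public void close() throws IOException {
        executor.shutdown(); // accept no new tasks; the running one may finish
        try {
            if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow(); // interrupt the worker thread
                throw new IOException("task still running after " + timeoutMillis + " ms");
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // restore the caller's interrupt status
            throw new IOException("interrupted while closing", e);
        }
    }
}
```

    Note that shutdownNow() only interrupts the worker thread; whether the running task actually stops depends on how it reacts to interruption.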
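    For points 5 and 6, here is a sketch under these assumptions. InjectableDecoder is a hypothetical stand-in whose decode() only counts its runs. The Executor is injected through a static factory, so the constructor itself starts nothing. The method reference Runnable::run needs Java 8 or later; on Java 7, Guava's MoreExecutors.sameThreadExecutor() serves the same purpose:

```java
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical decoder skeleton; the real decoding loop is replaced by a counter.
final class InjectableDecoder {
    private final Executor executor;
    private final AtomicInteger decodeRuns = new AtomicInteger();

    // Point 6: the constructor only stores state and starts nothing.
    private InjectableDecoder(final Executor executor) {
        this.executor = Objects.requireNonNull(executor, "executor");
    }

    // Point 5: the Executor is injected, and the task is submitted only
    // after the object is fully constructed.
    static InjectableDecoder start(final Executor executor) {
        final InjectableDecoder decoder = new InjectableDecoder(executor);
        decoder.executor.execute(decoder::decode);
        return decoder;
    }

    private void decode() {
        decodeRuns.incrementAndGet(); // stand-in for the chunk-by-chunk loop
    }

    int getDecodeRuns() {
        return decodeRuns.get();
    }
}
```

    In production you would pass Executors.newSingleThreadExecutor(THREAD_FACTORY). In a unit test, InjectableDecoder.start(Runnable::run) runs decode() on the calling thread, so the test needs no latches or sleeps.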
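    Points 1, 7, 9, 10, 11 and 12 could be combined roughly like this. It is a sketch, not a drop-in replacement. SimpleWaiter is a hypothetical, stripped-down CharWaiter with no await(), and NOTHING_DECODED_YET, checkException() and drainWaiters() are illustrative names. The important detail for point 12 is that getTotalSize() waits on the latch outside the lock, because setFinished() and setFailed() need the lock to count it down. Only after the wait does it read the fields under the lock:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

// Hypothetical, stripped-down CharWaiter: only the state DecodingStatus touches.
final class SimpleWaiter implements Comparable<SimpleWaiter> {
    final int required;
    final CountDownLatch latch = new CountDownLatch(1);
    // Written before latch.countDown(), hence visible after latch.await() returns.
    int decodedChars = 0;
    IOException exception = null;

    SimpleWaiter(final int required) { this.required = required; }

    @Override
    public int compareTo(final SimpleWaiter o) { return Integer.compare(required, o.required); }
}

final class DecodingStatusSketch {
    // Point 9: a named constant instead of a bare -1.
    private static final int NOTHING_DECODED_YET = -1;

    private boolean finished = false;
    private int decodedChars = NOTHING_DECODED_YET;
    private IOException exception = null;
    private final Queue<SimpleWaiter> waiters = new PriorityQueue<>();
    private final CountDownLatch endLatch = new CountDownLatch(1);

    // Point 7: the "decoding error" check lives in one place (caller holds the lock).
    private void checkException() {
        if (exception != null)
            throw new RuntimeException("decoding error", exception);
    }

    // Point 11: empty the queue and return its former contents (caller holds the lock).
    private List<SimpleWaiter> drainWaiters() {
        final List<SimpleWaiter> list = new ArrayList<>(waiters);
        waiters.clear();
        return list;
    }

    synchronized boolean addWaiter(final SimpleWaiter waiter) {
        checkException();
        if (waiter.required <= decodedChars)
            return false; // already available, no need to wait
        if (finished)
            throw new ArrayIndexOutOfBoundsException(waiter.required);
        waiters.add(waiter);
        return true;
    }

    synchronized void setDecodedChars(final int newCount) {
        // Point 10: fail fast if the count ever goes backwards.
        if (newCount < decodedChars)
            throw new IllegalStateException("count went from " + decodedChars + " to " + newCount);
        decodedChars = newCount;
        while (!waiters.isEmpty() && waiters.peek().required <= newCount) {
            // Point 1: the local is scoped to the loop body and final.
            final SimpleWaiter waiter = waiters.remove();
            waiter.decodedChars = newCount;
            waiter.latch.countDown();
        }
    }

    synchronized void setFailed(final IOException e) {
        exception = e;
        for (final SimpleWaiter waiter : drainWaiters()) {
            waiter.exception = e;
            waiter.latch.countDown();
        }
        endLatch.countDown();
    }

    synchronized void setFinished(final int total) {
        finished = true;
        decodedChars = total;
        for (final SimpleWaiter waiter : drainWaiters()) {
            waiter.decodedChars = total;
            waiter.latch.countDown();
        }
        endLatch.countDown();
    }

    int getTotalSize() {
        try {
            endLatch.await(); // outside the lock, or the writers could never count down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted", e);
        }
        synchronized (this) { // point 12: read under the same lock the writers use
            checkException();
            return decodedChars;
        }
    }
}
```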

answered Mar 30, 2014 at 11:11
  • As to point 1, I prefer it this way... Minimizing the scope is all well and good, but I prefer to declare my variables outside the loop in this case. Commented Mar 30, 2014 at 11:24
  • Point 8: no! I implement CharSequence here, so I need to throw this exception. Commented Mar 30, 2014 at 11:31