Avoid synchronized(this) in Java?

Mutable state is evil, especially when it is shared between threads. Every access to shared mutable state needs to be synchronized, otherwise our program is broken (see Java Concurrency in Practice, Ch. 2).

We will look at a non-trivial example and see why it may not be such a good idea to synchronize on this.

Future

“A Future is an object holding a value which may become available at some point. This value is usually the result of some other computation.”

Taken from Scala Docs

Let me sketch out a possible Javaslang Future implementation.

interface Future<T> {

    static <T> Future<T> of(CheckedSupplier<T> computation) {
        // ...
    }

    Option<Try<T>> getValue();

    boolean isComplete();

    void onComplete(Consumer<? super Try<T>> action);

}

We create a Future with the static factory method of. We may then ask whether the computation has already completed (isComplete), get the value if it is present (getValue), and register an asynchronous callback via onComplete.

This simple interface has many implications for the internal state of a Future implementation. For example, callbacks may be registered before or after the result has been computed concurrently by another thread.
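To make the two registration cases concrete, here is a minimal sketch. It uses the JDK's java.util.concurrent.CompletableFuture rather than the Javaslang Future discussed here, and the class name CallbackTiming is only an illustration. The future is completed by hand on the calling thread, so the order of the callbacks is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustration only: JDK CompletableFuture, not Javaslang's Future.
class CallbackTiming {

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        final CompletableFuture<String> future = new CompletableFuture<>();

        // Case 1: registered before completion. The callback has to be
        // stored somewhere and is run once the value arrives.
        future.thenAccept(v -> log.add("early: " + v));

        future.complete("Hello");

        // Case 2: registered after completion. Nothing needs to be stored,
        // the callback can be run right away.
        future.thenAccept(v -> log.add("late: " + v));

        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [early: Hello, late: Hello]
    }
}
```

An implementation has to handle both cases correctly even when registration and completion race on different threads, which is where the locking comes in.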

Without going further into detail, I want to show the implementation. Please note that Java's CompletableFuture has more than 2300 lines of code and uses sun.misc.Unsafe.

class FutureImpl<T> implements Future<T> {

    private final Executor executor;

    // Once the Future is completed, the value is defined
    // and the actions are performed
    private volatile Option<Try<T>> value = None.instance();
    private Queue<Consumer<? super Try<T>>> actions = Queue.empty();

    FutureImpl(Executor executor) {
        this.executor = executor;
    }

    @Override
    public Option<Try<T>> getValue() {
        return value;
    }

    @Override
    public boolean isComplete() {
        return value.isDefined();
    }

    @Override
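    public void onComplete(Consumer<? super Try<T>> action) {
        // Fast path: value is volatile, so no lock is needed once it is set
        if (value.isDefined()) {
            perform(action);
        } else {
            synchronized (this) {
                // Check again under the lock: the future may have completed
                // between the first check and acquiring the lock
                if (value.isDefined()) {
                    perform(action);
                } else {
                    actions = actions.enqueue(action);
                }
            }
        }
    }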

    // Running outside the constructor in order to have 'this'
    // fully initialized
    void run(CheckedSupplier<T> computation) {
        executor.execute(() -> {

            final Try<T> value = Try.of(computation);
            final Queue<Consumer<? super Try<T>>> actions;

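            // Set the value and take over the pending actions in one
            // atomic step, so that no concurrently registered action is lost
            synchronized (this) {
                this.value = new Some<>(value);
                actions = this.actions;
                this.actions = null;
            }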

            actions.forEach(this::perform);
        });
    }

    private void perform(Consumer<? super Try<T>> action) {
        executor.execute(() -> action.accept(value.get()));
    }
}

This is the missing factory method:

interface Future<T> {

    static <T> Future<T> of(CheckedSupplier<T> computation) {
        final FutureImpl<T> future = new FutureImpl<>(commonPool());
        future.run(computation);
        return future;
    }

    // ...
}

Pretty straightforward, right? The Future is initialized with an Executor, which provides the threads. The computation is described by a CheckedSupplier, which runs on a separate thread. The result type Try denotes that the computation may fail. If it fails, the result is a Failure (containing the exception), otherwise it is a Success (containing the value).
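The Success/Failure idea can be sketched in a few lines. MiniTry below is a hypothetical stand-in written for this post, not Javaslang's Try. It takes a java.util.concurrent.Callable instead of a CheckedSupplier and only catches Exception, both of which are simplifying assumptions.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for Javaslang's Try, for illustration only.
final class MiniTry<T> {

    private final T value;          // set when the computation succeeded
    private final Throwable error;  // set when the computation failed

    private MiniTry(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    // Runs the computation and captures either its result or its exception.
    static <T> MiniTry<T> of(Callable<T> computation) {
        try {
            return new MiniTry<>(computation.call(), null);
        } catch (Exception e) {
            return new MiniTry<>(null, e);
        }
    }

    boolean isSuccess() {
        return error == null;
    }

    @Override
    public String toString() {
        return isSuccess() ? "Success(" + value + ")" : "Failure(" + error + ")";
    }
}
```

With this sketch, MiniTry.of(() -> "Hello") prints as Success(Hello), while a computation that throws ends up as a Failure carrying the exception instead of propagating it to the caller.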

Let's try it out.

public static void main(String[] args) {

    final Future<String> future = Future.of(() -> {
        Thread.sleep(1000);
        return "Hello";
    });

    future.onComplete(System.out::println);

    while (!future.isComplete()) {
        Try.run(() -> {
            System.out.println("Waiting...");
            Thread.sleep(500);
        });
    }
}

The output is

Waiting...  
Waiting...  
Success(Hello)  

Let it crash

A simple change to our example makes it run forever: we wrap the waiting loop in an additional synchronized block.

synchronized (future) {
    while (!future.isComplete()) {
        Try.run(() -> {
            System.out.println("Waiting...");
            Thread.sleep(500);
        });
    }
}

The output now is

Waiting...  
Waiting...  
Waiting...  
Waiting...  
...

“Do not synchronize on objects that may be reused.”

Software Engineering Institute, CERT Oracle Coding Standard for Java, LCK01-J

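The reason for the infinite loop is that the main method holds the lock of the object referenced by the local variable future while it waits. To complete, the future needs the lock on this, which is the very same object. The worker thread therefore blocks at synchronized (this) in run, the value is never set and isComplete never returns true. The main method would release the lock only after the future has completed, which can't happen. Boom.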
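The effect can be reproduced without Javaslang. The following is a minimal sketch in plain Java: ThisLocked and MonitorClash are hypothetical names, and a bounded join of 300 ms replaces the endless loop so that the program terminates.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical class that, like FutureImpl, guards its state with synchronized (this).
class ThisLocked {

    private volatile boolean complete = false;

    void complete() {
        synchronized (this) { // the same monitor any caller can grab from outside
            complete = true;
        }
    }

    boolean isComplete() {
        return complete;
    }
}

class MonitorClash {

    // Reports whether the worker could complete while the caller held the monitor.
    static boolean completedWhileCallerHoldsLock() {
        final ThisLocked target = new ThisLocked();
        final CountDownLatch started = new CountDownLatch(1);

        final Thread worker = new Thread(() -> {
            started.countDown();
            target.complete(); // blocks until the monitor of target is free
        });
        worker.setDaemon(true);

        final boolean result;
        synchronized (target) { // the caller's extra synchronized block
            worker.start();
            try {
                started.await();
                worker.join(300); // bounded wait instead of an endless loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            result = target.isComplete();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(completedWhileCallerHoldsLock()); // prints false
    }
}
```

The worker can only enter its synchronized block once the caller has left its own, so from inside the caller's block the object never appears complete.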

The solution is simple: FutureImpl needs to synchronize on an additional private instance variable instead of this, so that no code outside the class can acquire the same lock.

// usage: synchronized (lock) { ... } instead of synchronized (this) { ... }
private final Object lock = new Object();
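As a rough check of this idea, here is a self-contained sketch in plain Java. PrivatelyLocked and PrivateLockDemo are hypothetical names, and the 2 second join is just a generous upper bound for the worker to finish. The caller still synchronizes on the object itself, but the object now uses its own private lock internally.

```java
// Hypothetical class that guards its state with a private lock object.
class PrivatelyLocked {

    // Not reachable from outside, so no caller can hold this lock
    private final Object lock = new Object();
    private volatile boolean complete = false;

    void complete() {
        synchronized (lock) {
            complete = true;
        }
    }

    boolean isComplete() {
        return complete;
    }
}

class PrivateLockDemo {

    // Reports whether the worker could complete while the caller held the
    // monitor of the object itself.
    static boolean completedWhileCallerHoldsLock() {
        final PrivatelyLocked target = new PrivatelyLocked();
        final Thread worker = new Thread(target::complete);
        worker.setDaemon(true);

        synchronized (target) { // a different monitor than the private lock
            worker.start();
            try {
                worker.join(2000); // returns as soon as the worker has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return target.isComplete();
        }
    }

    public static void main(String[] args) {
        System.out.println(completedWhileCallerHoldsLock());
    }
}
```

Because the two synchronized blocks use different monitors, the worker is no longer blocked by the caller, and the method is expected to report true.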

The source code of this example can be found here.

I hope this brought some insight into the question of why it may not be a good idea to synchronize on this.

Daniel