Category Archives: JVM

Beware: java.nio.file.WatchService is subtly broken on Linux

This blog describes a bug that I reported to Oracle a month or so ago, but which still doesn't seem to have made its way through to the official tracker.

The problem is that on Linux, file system events that should be delivered by a WatchService can be silently discarded or delivered against the wrong WatchKey. For example, it's possible to register two directories, A and B, with a WatchService watching for ENTRY_CREATE events, then create a file A/C but receive an event carrying the WatchKey for B and WatchEvent.context C.
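To make that concrete, here is a minimal sketch of the API usage involved. The /tmp paths are hypothetical, and since the misdelivery requires a registration to race with an incoming event, a linear program like this illustrates the shape of the failure rather than reproducing it deterministically:

import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;

public class WatchServiceRepro {
    public static void main(String[] args) throws Exception {
        WatchService ws = FileSystems.getDefault().newWatchService();
        Path a = Paths.get("/tmp/A");   // hypothetical: must already exist
        Path b = Paths.get("/tmp/B");   // hypothetical: must already exist
        WatchKey keyA = a.register(ws, ENTRY_CREATE);
        WatchKey keyB = b.register(ws, ENTRY_CREATE);

        Files.createFile(a.resolve("C"));

        // Expected: the returned key is keyA. Under the race described below,
        // it can instead be keyB, with WatchEvent.context still reporting C.
        WatchKey key = ws.take();
        System.out.println((key == keyA ? "A" : "B") + " -> " + key.pollEvents());
    }
}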

The reason for this is a bug in the JDK's LinuxWatchService. This class wraps an inotify instance, plus a background thread that spins in poll, waiting for either:

  • A file system event to be delivered on the inotify FD, or
  • A byte to arrive on a FD corresponding to a pipe which is owned by the LinuxWatchService

Whenever a registration request is made by the user of the LinuxWatchService, the request is enqueued and then a single byte is written to the other end of this pipe to wake up the background thread, which will then make the actual registration with the kernel.

The core loop of this background thread is where the bug lies. The loop body looks like this:

// wait for close or inotify event
nReady = poll(ifd, socketpair[0]);
 
// read from inotify
try {
    bytesRead = read(ifd, address, BUFFER_SIZE);
} catch (UnixException x) {
    if (x.errno() != EAGAIN)
        throw x;
    bytesRead = 0;
}
 
// process any pending requests
if ((nReady > 1) || (nReady == 1 && bytesRead == 0)) {
    try {
        read(socketpair[0], address, BUFFER_SIZE);
        boolean shutdown = processRequests();
        if (shutdown)
            break;
    } catch (UnixException x) {
        if (x.errno() != UnixConstants.EAGAIN)
            throw x;
    }
}
 
// iterate over buffer to decode events
int offset = 0;
while (offset < bytesRead) {
    long event = address + offset;
    int wd = unsafe.getInt(event + OFFSETOF_WD);
    int mask = unsafe.getInt(event + OFFSETOF_MASK);
    int len = unsafe.getInt(event + OFFSETOF_LEN);
 
    // Omitted: the code that actually does something with the inotify event
}

The issue is that two read calls are made in this loop body: one on the inotify FD ifd, and one on the pipe FD socketpair[0]. Crucially, both calls read into the same buffer, address. If data happens to be available both via the pipe and via inotify, then the read from the pipe will overwrite the first few bytes of the inotify event stream that has just been read into that buffer! As it happens, the first few bytes of an event denote which watch descriptor the event is for, and so the issue usually manifests as an event being delivered against the wrong directory (or, if the resulting watch descriptor is not actually valid, as the event being ignored entirely).
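One way the clobbering could be avoided is to drain the pipe into its own scratch buffer, leaving the inotify records at address intact. The following is only a sketch of that idea against the loop above (pipeAddress and PIPE_BUFFER_SIZE are hypothetical names, and this is not necessarily how the JDK ultimately fixed it):

// process any pending requests -- now reading into a dedicated scratch
// buffer, so the inotify records sitting at 'address' are not overwritten
if ((nReady > 1) || (nReady == 1 && bytesRead == 0)) {
    try {
        read(socketpair[0], pipeAddress, PIPE_BUFFER_SIZE);
        boolean shutdown = processRequests();
        if (shutdown)
            break;
    } catch (UnixException x) {
        if (x.errno() != UnixConstants.EAGAIN)
            throw x;
    }
}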

Note that this issue can only occur if you are registering watches while simultaneously receiving events. If your program just sets up some watches at startup and then never registers/cancels watches again you probably won't be affected. This, plus the fact that it is only triggered by registration requests and events arriving very close together, is probably why this bug has gone undetected since the very first release of the WatchService code.

I've worked around this myself by using the inotify API directly via JNA (see the sketch at the end of this post). This reimplementation also let me fix an unrelated WatchService "feature", which is that WatchKey.watchable can point to the wrong path in the event that a directory is renamed. So if you create a directory A, start watching it for ENTRY_CREATE events, rename the directory to B, and then create a file B/C, the WatchKey.watchable you get from the WatchService will be A rather than B, so naive code will derive the incorrect full path A/C for the new file.

In my implementation, a WatchKey is invalidated if the directory it watches is renamed, so a user of the class has the opportunity to reregister the new path with the correct WatchKey.watchable if they so desire. I think this is much saner behaviour!
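For reference, here is roughly what driving inotify directly from Java via JNA looks like. This is a minimal sketch assuming JNA 5.x is on the classpath; the bindings mirror the inotify man pages, the IN_CREATE value comes from <sys/inotify.h>, and the /tmp path is again hypothetical:

import com.sun.jna.Library;
import com.sun.jna.Native;

public class InotifyViaJna {
    // Raw libc bindings; the signatures mirror the man pages
    public interface LibC extends Library {
        LibC INSTANCE = Native.load("c", LibC.class);

        int inotify_init();
        int inotify_add_watch(int fd, String pathname, int mask);
        int read(int fd, byte[] buf, int count);
    }

    private static final int IN_CREATE = 0x00000100; // from <sys/inotify.h>

    public static void main(String[] args) {
        int fd = LibC.INSTANCE.inotify_init();
        int wd = LibC.INSTANCE.inotify_add_watch(fd, "/tmp", IN_CREATE);

        // Each record is: int wd, uint32_t mask, uint32_t cookie, uint32_t len,
        // then len bytes of NUL-padded name. Since we decode the buffer ourselves,
        // nothing else can clobber the wd field out from under us.
        byte[] buf = new byte[4096];
        int bytesRead = LibC.INSTANCE.read(fd, buf, buf.length);
        System.out.println("Read " + bytesRead + " bytes of events for wd " + wd);
    }
}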

Asynchronous and non-blocking IO

This post aims to explain the difference between asynchronous and non-blocking IO, with particular reference to their implementation in Java. These two styles of IO API are closely related but have a number of important differences, especially when it comes to OS support.

Asynchronous IO

Asynchronous IO refers to an interface where you supply a callback to an IO operation, which is invoked when the operation completes. This invocation often happens on an entirely different thread from the one that originally made the request, but this is not necessarily the case. Asynchronous IO is a manifestation of the "proactor" pattern.

One common way to implement asynchronous IO is to have a thread pool whose threads are used to make the normal blocking IO requests, and execute the appropriate callbacks when these return. The less common implementation approach is to avoid a thread pool, and just push the actual asynchronous operations down into the kernel. This alternative solution obviously has the disadvantage that it depends on operating system specific support for making async operations, but has the following advantages:

  • The maximum number of in-flight requests is not bounded by the size of your thread pool
  • The overhead of creating thread pool threads is avoided (e.g. you need not reserve any memory for the thread stacks, and you don't pay the extra context switching cost associated with having more schedulable entities)
  • You expose more information to the kernel, which it can potentially use to make good choices about how to do the IO operations — e.g. by minimizing the distance that the disk head needs to travel to satisfy your requests, or by using native command queueing.

Operating system support for asynchronous IO is mixed:

  • Linux has at least two implementations of async IO:
    • POSIX AIO (aio_read et al). This is implemented on Linux by glibc, but other POSIX systems (Solaris, OS X etc) have their own implementations. The glibc implementation is simply a thread pool based one — I'm not sure about the other systems.
    • Linux kernel AIO (io_submit et al). No thread pool is used here, but it has quite a few limitations (e.g. it only works for files, not sockets, and has alignment restrictions on file reads) and does not seem to be used much in practice.

    There is a good discussion of the *nix AIO situation on the libtorrent blog, summarised by the same writer on Stack Overflow here. The experience of this author was that the limitations and poor implementation quality of the various *nix AIO implementations are such that you are much better off just using your own thread pool to issue blocking operations.

  • Windows provides a mechanism called completion ports for performing asynchronous IO. With this system:
    1. You start up a thread pool and arrange for each thread to spin calling GetQueuedCompletionStatus
    2. You make IO requests using the normal Windows APIs (e.g. ReadFile and WSARecv), with the small added twist that you supply a special LPOVERLAPPED parameter indicating that the calls should be non-blocking and the result should be reported to the thread pool
    3. As IO completes, thread pool threads blocked on GetQueuedCompletionStatus are woken up as necessary to process completion events

    Windows intelligently schedules how it delivers GetQueuedCompletionStatus wakeups, such that it tries to roughly keep the same number of threads active at any time. This avoids excessive context switching and scheduler transitions — things are arranged so that a thread which has just processed a completion event will likely be able to immediately grab a new work item. With this arrangement, your pool can be much smaller than the number of IO operations you want to have in-flight: you only need to have as many threads as are required to process completion events.

In Java, support for asynchronous IO was added as part of the NIO2 work in JDK7, and the appropriate APIs are exposed by the AsynchronousChannel class. On *nix, AsynchronousFileChannel and AsynchronousSocketChannel are implemented using the standard thread pool approach (the pools are owned by an AsynchronousChannelGroup). On Windows, completion ports are used: in this case, the threads of the AsynchronousChannelGroup pool act as the GetQueuedCompletionStatus listeners.
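For example, a positioned asynchronous file read in this style looks roughly like the following (foo.txt is a placeholder; error handling kept minimal):

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class AsyncReadExample {
    public static void main(String[] args) throws Exception {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                Paths.get("foo.txt"), StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(4096);

        // The handler typically runs on a thread from the channel group's
        // pool, not on the thread that issued the read
        channel.read(buffer, 0L, null, new CompletionHandler<Integer, Void>() {
            public void completed(Integer bytesRead, Void attachment) {
                System.out.println("Read " + bytesRead + " bytes");
            }

            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });

        Thread.sleep(1000); // crude: keep the JVM alive until the callback fires
    }
}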

If you happen to be stuck on JDK6, your only option is to ignore completion ports and roll your own thread pool to dispatch operations on e.g. standard synchronous FileChannels. However, if you do this you may find that you don't actually get much concurrency on Windows. This happens because FileChannel.read(ByteBuffer, long) is needlessly crippled by taking a lock on the whole FileChannel. This lock is needless because FileChannel is otherwise a thread-safe class, and in order to make sure your positioned read isn't interfering with the other readers you don't need to lock — you simply need to issue a ReadFile call with a custom position by using one of the fields of the LPOVERLAPPED struct parameter. Note that the *nix implementation of FileChannel.read does the right thing and simply issues a pread call without locking.
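For concreteness, a roll-your-own dispatcher along those lines might look like this sketch (kept JDK6-compatible, so no lambdas; the class name and pool size are illustrative). On Windows, the FileChannel lock just described will serialize the reads despite the pool:

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoorMansAsyncFile {
    private final ExecutorService pool = Executors.newFixedThreadPool(8);

    // Dispatch a positioned read on the pool; the Future completes when the
    // underlying blocking read returns
    public Future<ByteBuffer> read(final FileChannel channel, final long position, final int size) {
        return pool.submit(new Callable<ByteBuffer>() {
            public ByteBuffer call() throws Exception {
                ByteBuffer buffer = ByteBuffer.allocate(size);
                channel.read(buffer, position); // positioned read: no shared file pointer
                buffer.flip();
                return buffer;
            }
        });
    }
}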

Non-blocking IO

Non-blocking IO refers to an interface where IO operations will return immediately with a special error code if called when they are in a state that would otherwise cause them to block. So for example, a non-blocking recv will return immediately with an EAGAIN or EWOULDBLOCK error code if no data is available on the socket, and likewise send will return immediately with an error if the OS send buffers are full. Generally, APIs providing non-blocking IO also provide some sort of interface that lets you efficiently wait for certain operations to enter a state where invoking the non-blocking operation will actually make progress rather than returning immediately. APIs in this style are implementations of the "reactor" pattern.

No OS that I know of implements non-blocking IO for file IO, but support for socket IO is generally reasonable:

  • Non-blocking read and writes are available via the POSIX O_NONBLOCK operating mode, which can be set on file descriptors (FDs) representing sockets and FIFOs.

  • POSIX provides select and poll which let you wait for reads and writes to be ready on several FDs. (The difference between these two is pretty much just that select lets you wait for a number of FDs up to FD_SETSIZE, while poll can wait for as many FDs as you are allowed to create.)

    Select and poll have the major disadvantage that when the kernel returns from one of these calls, you only know the number of FDs that got triggered — not which specific FDs have become unblocked. This means you later have to do a linear time scan across each of the FDs you supplied to figure out which one you actually need to use.

  • This limitation motivated the development of several successor interfaces. BSD & OS X got kqueue, Solaris got /dev/poll, and Linux got epoll. Roughly speaking, these interfaces let you build up a set of FDs you are interested in watching, and then make a call that returns a list of those FDs in the set that were actually triggered.

    There's lots of good info about these mechanisms at the classic C10K page. If you like hearing someone who clearly knows what he is talking about rant for 10 minutes about syscalls, this Bryan Cantrill bit about epoll is quite amusing.

  • Unfortunately, Windows never got one of these successor mechanisms: only select is supported. It is possible to do an epoll-like thing by kicking off an operation that would normally block (e.g. WSARecv) with a specially prepared LPOVERLAPPED parameter, such that you can wait for it to complete using WSAWaitForMultipleEvents. Like epoll, when this wait returns it tells you which of the sockets of interest caused the wakeup. Unfortunately, this API won't let you wait for more than 64 events: if you want to wait for more you need to create child threads that recursively call WSAWaitForMultipleEvents, and then wait on those threads!

  • The reason that Windows support is a bit lacking here is that they seem to expect you to use an asynchronous IO mechanism instead: either completion ports, or completion handlers. (Completion handlers are implemented using the Windows APC mechanism and are a form of callback that doesn't require a thread pool: instead, they are executed in the spare CPU time when the thread that issued the IO operation is otherwise suspended, e.g. in a call to WaitForMultipleObjectsEx.)

In Java, non-blocking IO has been exposed via SelectableChannel since JDK4. As I mentioned above, OS support for non-blocking IO on files is nonexistent; correspondingly, Java's SocketChannel extends SelectableChannel, but FileChannel does not.
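A minimal reactor-style accept/read loop over these APIs looks something like this (port 8080 is a placeholder):

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectorExample {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false); // must be non-blocking before registering
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select(); // blocks until at least one registered channel is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // guaranteed not to block: returns 0 if there is no data after all
                    ((SocketChannel) key.channel()).read(ByteBuffer.allocate(4096));
                }
            }
        }
    }
}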

The JDK implements SelectableChannel using whatever the platform-appropriate API is (i.e. epoll, kqueue, /dev/poll, poll or select). The Windows implementation is based on select — to ameliorate the fact that select requires a linear scan, the JDK creates a new thread for every 1024 sockets being waited on.

Conclusions

Let's say that you want to do Java IO in a non-synchronous way. The bottom line is:

  • If you want to do IO against files, your only option is asynchronous IO. You'll need to roll it yourself on JDK6 and below (and the resulting implementation won't be as concurrent as you might expect on Windows). On the other hand, with Java 7 and up you can just use the built-in mechanisms, and what you'll get is basically as good as the state of the art.

  • If you want to do IO against sockets, an ideal solution would use non-blocking IO on *nix and asynchronous IO on Windows. This is obviously a bit awkward to do, since it involves working with two rather different APIs. There might be some project akin to libuv that wraps these two mechanisms up into a single API you can write against, but I don't know of it if so.

    The Netty project is an interesting data point. This high-performance Java networking framework is based principally on non-blocking IO, but its developers did make an abortive attempt to use async IO instead at one point: it was backed out because there was no performance advantage to using async IO over non-blocking IO on Linux. Some users report that the now-removed asynchronous IO code drastically reduced CPU usage on Windows, but others report that Java's dubious select-based implementation of non-blocking IO on Windows is good enough.

C#-Style Events In Scala

As you may know, C# has a rather lovely little feature whereby you can setup an event on a class, like so:

public event EventHandler MyEvent;

Where "EventHandler" is a delegate (i.e. function pointer) type. The built-in EventHandler delegate just has the signature (Object, EventArgs) -> Void, so from within the class we can invoke the event like so:

if (MyEvent != null)
{ MyEvent(this, EventArgs.Empty); }

And this will cause any anonymous delegates or methods which have been attached to the event to be invoked with those arguments. This attachment and detachment is done like so:

myInstance.MyEvent += this.myInstance_MyEvent;
myInstance.MyEvent -= this.myInstance_MyEvent;

Where we have created a method on the subscribing instance called myInstance_MyEvent with the appropriate signature. We could also have supplied an anonymous delegate to the event, but would need to keep a reference to it around in order to remove it later:

EventHandler myHandler = delegate(object sender, EventArgs e) { Console.WriteLine("MyEvent triggered from " + sender); };

myInstance.MyEvent += myHandler;
myInstance.MyEvent -= myHandler;

Right, so far so ordinary. As part of my ongoing quest to reimplement C# in Scala (see also here), I'll try to code up an equivalent system in Scala, which does not have a built-in event system. The approach I took was to define a class, Event:

class Event[T]() {

  private var invocationList : List[T => Unit] = Nil

  def apply(args : T) {
    for (invoker <- invocationList) {
      invoker(args)
    }
  }

  def +=(invoker : T => Unit) {
    invocationList = invoker :: invocationList
  }

  def -=(invoker : T => Unit) {
    invocationList = invocationList filter ((x : T => Unit) => (x != invoker))
  }

}

This is a pretty simple bit of code. It just defines two methods with the same names as the operators of C# which add or remove the functions they receive as arguments to and from an invocation list, and a method with the special name "apply" which will be invoked when the event is supplied with an argument list by the usual juxtaposition convention. Now, let's take a look at some example code that uses this:

val myEvent = new Event[Int]()

val functionValue = ((x : Int) => println("Function value called with " + x))
myEvent += functionValue
myEvent(4)
myEvent -= functionValue
myEvent(5)

This will simply print "Function value called with 4" (the subscriber has been removed again by the time we raise the event with 5). Great! Looks like a very direct equivalent of C#'s mechanism. But alas, there is a bit of a subtlety. What if we have a method such as this:

def method(x : Int) = println("Class method called with " + x)

And we then try and do something like this:

myEvent += method
myEvent(2)
myEvent -= method
myEvent(3)

What you actually find is that you get notification on the console of both the 2 and the 3 that were passed to the event! Why is this? Well, if you look at the generated code in a Java decompiler you will find that Scala internally creates a class to coerce the class method into its internal representation of a function. The problem is that this coercion is done separately for each usage of "method", which leads to two function objects that are not reference equal, and hence the invocation list never has its item removed!

Sidenote: actually, interestingly the Scala compiler creates entirely different CLASSES for each coercion, even though these classes have identical functionality!

The only sensible way this code could be fixed is for a more intuitive implementation of equality to be given to the adapter classes generated in this way (obviously we cannot compare functions for equality in general!).

Happily, there is a workaround, but it smells a bit. Instead of declaring a method like that, declare it like this:

val memberFunctionValue = (x : Int) => {
  println("Member function value called with " + x)
}

This creates a member function value instead of a method, which looks the same to its callers in Scala code (but not from Java). This means that reference equality does the right thing when we try to remove the "method" from the event.

Another irritating limitation of the approach is that anyone is allowed to raise the event. This is in contrast to C#, where only the class that declared the event can raise it: a much better design principle. Given that Scala supports syntax such as the following for property access:

private var _foo : Int = 0

def foo = _foo

def foo_=(value : Int) {
  _foo = value
}

I tried to define a class like this:

class MyClass {

    private val _myEvent = new Event[Int]()

    def myEvent_+=(function : Int => Unit) {
        _myEvent += function
    }

    def myEvent_-=(function : Int => Unit) {
        _myEvent -= function
    }

    def doSomethingCausingMyEventToRaise() {
        // ...
        _myEvent(1)
        // ...
    }

}

This encapsulates the event, allowing us to control exactly who can invoke it. Unfortunately, code of the form:

val myInstance = new MyClass
myInstance.myEvent += method
myInstance.doSomethingCausingMyEventToRaise
myInstance.myEvent -= method

does not compile: the myEvent_ style method names I added to the class are simply not used, and that code instead tries to find a (non-existent) field called myEvent on MyClass. Of course, calling the myEvent_ methods of MyClass directly does work, but we lose the nice feature of the syntax whereby events feel like they are built into the language.

So overall things didn't work out great for replicating C# events in Scala, but it was still an interesting voyage of discovery. Furthermore, if we could convince the language designers to support desugaring for arbitrary methods of the form "foo_[some symbols]" rather than just the special case of properties we could begin to do some rather interesting things...

Today's Java Atrocity

class SomeLibraryClass<T> {

    public SomeLibraryClass(Class<T> klass) {
        // Use klass to overcome Java generics limitations
    }

}

public class Program {

    public static void main(String[] args) {
        // Cannot convert from Class<List> to Class<List<Integer>>: fair enough
        Class<List<Integer>> klass1 = List.class;
        // Integer and List cannot be resolved: huh, that's an odd error
        Class<List<Integer>> klass2 = List<Integer>.class;
        // Cannot cast from Class<List> to Class<List<Integer>>: they should be equivalent after erasure, but fine
        Class<List<Integer>> klass3 = (Class<List<Integer>>)List.class;
        // This works! Awesome! But it's dead ugly (and unchecked of course), and why should I have to cast
        // to Class<?> before casting to what I want? This means the castable-to relationship is >not transitive<!
        Class<List<Integer>> klass4 = (Class<List<Integer>>)(Class<?>)List.class;
    }

}

Implementing The Disposable Pattern In Scala

One excellent feature of the CLR in general and C# in particular is deterministic finalization: the ability to control when resources are released. In C# this is realized with the IDisposable interface and the "using" keyword. Effectively, the statements:

using (File myFile = File.Open("foo.txt"))
{ /* Do something with myFile */ }

Are equivalent to the following

File myFile = File.Open("foo.txt");
try
{ /* Do something with myFile */ }
finally
{ myFile.Dispose(); }

The usual use of the Dispose method, which as you can see above is called when "myFile" goes out of scope, is to perform general cleanup. In this case it would release any open handles to the file: doing so in this deterministic manner is in most situations a much better idea than waiting until the garbage collector gets around to finalizing your object and doing the cleanup there, although File will of course also support that paradigm to ensure that its open handles are eventually released.

There are actually more interesting uses than resource management that are made of the using keyword in C#, such as transaction management (the scope of the transaction is the scope of the using block), mock playback (mocks are automatically verified by Dispose) and many more of the class of scope management problems encountered during everyday programming (e.g. calling Begin/EndUpdate on a particular Control while you do lots of setup work on it). Hopefully this litany of examples has convinced you that we simply must have "using" added to Java - but alas that particular behemoth is unlikely to acquire this rather nice feature any time soon. Instead I have elected to show you how we can add this feature to a JVM based language, Scala, using the advanced features of its type system and functional programming paradigm.

Right, let's get started! First, let's have an interface for all things we want to consider Disposable, just as in C#:

trait Disposable[S] {
  def dispose()
}

Don't let the word "trait" throw you: although you can do some cool stuff with traits, for our purposes they are just like Java's interfaces. We have simply defined a method, dispose, on the Disposable interface that will be called to do cleanup.

Now, we could just have all things we use "using" with implement this interface, but this feels kind of limiting. This is all well and good for our own classes, but what about interfacing with Java code that doesn't know a thing about our interface? In particular, it would be nice to be able to use our new functionality with JDBC and SWT, which have a common pattern of making use of close and dispose methods respectively to perform cleanup. So, let's define two implicit conversions, known as "views" in Scala-land, from objects that define methods with those signatures to a Disposable!

  implicit def close2disposable[S](what : S { def close() }) : Disposable[S] =
    new Disposable[S] {
      def dispose() {
        try {
          what.close
        }
        catch {
          case t => ()
        }
      }
    }

  implicit def dispose2disposable[S](what : S { def dispose() }) : Disposable[S] =
    new Disposable[S] {
      def dispose() {
        what.dispose
      }
    }

This is cool stuff. These functions make use of a very nice feature of Scala's type system, structural subtyping, to operate on arbitrary objects that define particular methods. For example, the type

S { def close() }

defines a type of class S which must contain the method close, which takes no arguments and returns void (aka Unit, in Scala). With this, it is statically valid to call close() on any variable of type S within the body of our implicit conversion, and I make use of this by returning a new anonymous inner class which implements Disposable and calls the close method of the implicitly converted variable when dispose is invoked (wrapping it in appropriate exception handling, since e.g. the "close" method in JDBC can throw SQLException, and we don't want that masking exceptions thrown in the body of the using block). We define the view on things with a dispose method similarly, but this time we needn't handle exceptions since we assume anything with a dispose method is already going to be well behaved.

Notice that this is effectively statically safe duck typing, so you can have all the flexibility of a dynamic language like Python in exactly the places you need it, but keep the compile time guarantees of Scala in the majority of your code base, where you don't need that power.

Now all that remains to be done is to define the using "keyword" itself. There isn't a way to perfectly emulate the C# syntax in Scala, to the best of my knowledge, but we can come pretty damn close by defining a curried function that takes the thing to dispose as its first argument and the block to run against it as its second:

  def using[S <% Disposable[S], T](what : S)(block : S => T) : T = {
    try {
      block(what)
    }
    finally {
      what.dispose
    }
  }

To those uninitiated in the ways of Scala, that first line can look a little scary, so let's look at it in more detail.

def using[S <% Disposable[S], T]

tells us that we are declaring a method called "using" which is parameterized on two type arguments: S, which must have a view of type Disposable[S], and T. Next,

(what : S)(block : S => T)

says that the method takes two arguments: "what" of type S (which we specified must be Disposable in the type constraint earlier) and a block which takes one of those S objects and returns a T. The fact that brackets, rather than a comma, separate the two arguments shows that the method is curried, i.e. it may be partially applied by its callers: this makes for some nicer code later on. The final T just tells us that the method returns something of type T.

Armed with this understanding, it should be very easy to see that the body of the function is statically safe and does just the same thing as the C# example I showed at the start of this post. That's all pretty neat! Let's see some of the code you might write using this function, assuming that the method definitions I've just given have been added to an object called Disposable which accompanies the trait of the same name:

import com.omegaprime.Disposable._

class Closable {
  System.out.println("Constructed")

  def close() {
    System.out.println("Closing")
  }

  def doSomething() {
    System.out.println("Doing something")
  }
}

object Program {
  def main(args : Array[String]) {
    using (new Closable()) (closable => {
      closable.doSomething
    })
  }
}

This will print:

Constructed
Doing something
Closing

Lovely stuff! The sharp-eyed reader will have noticed that the implicit conversions I defined above can both apply to some objects (i.e. those which have both close and dispose methods): they might then be tempted to ask which conversion to Disposable gets used by the compiler for such an object. For me it seems that the close method is always picked as the one to be called by Disposable.dispose, but I suspect this is an implementation detail, and will probably even vary from version to version: from the compiler's perspective these two choices must be equally good.

I hope this article has gone some way to showing you some of the advanced features of Scala and how they can be used to solve real problems in software engineering. Note, however, that if you are going to play along at home you need at least version 2.6 of the language, as structural types were only added recently.