Sunday, May 15, 2011

In Async Programming, an Ounce of Opportunism is worth a Pound of Plan

I've recently found myself tackling some moderately complex asynchronous programming projects. This type of programming can, of course, be very error-prone and hard to maintain. This post describes a design pattern I've developed over the years that can make asynchronous programming much more tractable. I call this pattern "continuous re-evaluation".

What is continuous re-evaluation good for?

Programming tasks suitable for this design pattern generally involve the following requirements:
  • Asynchronous operations (e.g. I/O or RPCs)
  • Multiple asynchronous steps, which may overlap
  • The choice and timing of the asynchronous operations varies according to external events and the outcome of earlier operations.
These requirements arise frequently: for example, in RPC batching, or in joining data from heterogeneous servers. Here are three real-world problems I've had to solve over the years:

1. Document synchronization in the original Writely online document editor. This was the process by which edits were saved to the server, and collaborators' edits brought down to the browser. The task is complex, due to the possibility of simultaneous (and perhaps conflicting) changes; out-of-order message delivery; and the need to balance update speed against server load.

2. Implementing a responsive browser-based photo gallery. The gallery displayed one full-sized image at a time, plus a scrolling list of thumbnails for navigation. It was important to optimize response times for the initial page load, navigation to a new photo, and scrolling the thumbnail list. To implement this, the gallery used a careful prioritization scheme whereby images were prefetched according to the probability that they would soon be needed. The prioritization scheme covered both full-sized images and thumbnails for use in the scrolling list. Outside events (e.g. the user selecting a different image, or resizing the window) would cause priorities to change on the fly.

3. Data replication in the Megastore data store. A typical transaction commit involves multiple types of RPC, such as Paxos prepare and accept operations, cache ("coordinator") invalidation, database updates, and log cleanup. Often, multiple instances of each RPC type are in flight simultaneously, alongside a variety of time-based triggers. Latency is a critical design parameter, requiring these RPCs to be very carefully orchestrated.

State machines vs. re-evaluation: to plan, or not to plan?

The classic approach to this sort of problem is to model it as a state machine. The machine begins in some initial state in which it initiates one or more asynchronous operations. As these operations complete (or fail), the machine transitions to other states; the transitions may trigger additional operations. The logic for chaining operations together is embedded in the transitions of the state machine.

This textbook approach has an appealing sense of rigor. However, in practice I've rarely found it to work very well. The primary drawback is that designing a state machine leads you to think in terms of an imperative, planned sequence of operations. In a (simplified) version of the Megastore data replication system, that planned sequence might be "create an entry in the write-ahead log; update the coordinator for any replica where the log couldn't be written; update the data tables; clean up the log". Unfortunately, such plans rarely survive contact with reality. There are always error paths, retry loops, and edge case handlers complicating your logic. It may be possible for operations to complete in several different sequences, each of which requires its own chain of state transitions. Your simple, linear plan becomes a tangle of overlapping, contingent plans: hard to write, harder to maintain, sometimes impossible to truly understand.

(State machines have other, more tactical problems as well. They are not directly supported by mainstream programming languages, forcing awkward workarounds. They force all state to be "unrolled" into a single variable, when it might have been better expressed as a collection of orthogonal variables. And, by associating code with each unrolled state, they encourage code duplication.)

In the continuous re-evaluation pattern, we abandon any notion of a plan. Instead, we ask the following question: in any given situation, what is the one thing we should do next? We don't attempt to plan beyond the immediate moment; we simply decide what action to take right now. (The choices are generally "initiate some new asynchronous operation", "set a timer", or "do nothing and await further events".) Whenever any event occurs -- an operation completes, a timer fires, one of our input variables changes, etc. -- we always do the same thing: evaluate the current situation and decide which one action, if any, to take.

I had initially thought of calling this pattern "stateless machines". However, it's not really accurate to call it stateless; any interesting algorithm requires some state to be held somewhere. So I settled on "continuous re-evaluation". In a continuous re-evaluation machine, we do maintain state, but this state does not reflect a plan; it is merely a direct description of the state of the world.
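To make the shape of this concrete, here is a minimal Python sketch of the event-handling skeleton, assuming a single-threaded event loop. The class `ReevaluationLoop`, its `on_event` method, and the example policy `keep_one_in_flight` are hypothetical names invented for illustration: handlers only record facts, and one `decide` function answers "what should we do next?" after every event.

```python
from typing import Callable, List, Optional


class ReevaluationLoop:
    """Minimal sketch of continuous re-evaluation; all names are hypothetical.

    Event handlers only record facts about the world. After every event, the
    same decide() function inspects the whole state and returns at most one
    action to take now, or None meaning "await further events".
    """

    def __init__(self, state: dict, decide: Callable[[dict], Optional[str]]):
        self.state = state
        self.decide = decide
        self.actions: List[str] = []  # actions taken so far, for inspection

    def on_event(self, record: Callable[[dict], None]) -> Optional[str]:
        record(self.state)                # 1. record what happened
        action = self.decide(self.state)  # 2. ask the same question as always
        if action is not None:
            self.actions.append(action)
        return action


# Example policy: keep one request in flight until some request succeeds.
def keep_one_in_flight(state: dict) -> Optional[str]:
    if state["succeeded"] or state["in_flight"] > 0:
        return None
    state["in_flight"] += 1  # bookkeeping for the request we are starting
    return "send_request"
```

Note that the policy never needs to know which event just happened: the initial start and a failed request look the same to it, because both leave no request in flight.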

All this is best explained through examples. I'll do that in the next two sections.

Example: RPC failover

A classic case is issuing an RPC to a server pool, with failover. The idea is that any server in the pool can handle the request, and we don't care which one we use, but if our chosen server doesn't respond, we should retry on ("fail over to") another server.

Let's start with a very simple version of the problem. Suppose there are precisely two servers; we send each request to the first server; if a request fails, we resend it to the second server. Here is pseudocode for the state machine approach:

start:
  send_request(server[0]);         // try server 0 first
  on rpc_succeeded: exit(success); // if the request succeeded, we're done
  on rpc_failed:    goto failover; // otherwise, try server 1

failover:
  send_request(server[1]);         // try server 1
  on rpc_succeeded: exit(success); // if it succeeds, great
  on rpc_failed:    exit(failure); // otherwise, give up

All well and good. However, in the real world, we usually have additional requirements. For example, if the first server doesn't respond quickly, we might want to try the second server in parallel. A simple specification for this is "after 50ms, initiate a request to the second server. Accept whichever response arrives first." Our state machine gets a little more complex now -- we need a timer to trigger the second request, and we need to keep track of whether the timer has triggered yet, and which requests are still in flight:

start:
  send_request(server[0]);
  start_timer(50);                 // kick off a 50ms timer
  on rpc_succeeded: exit(success);
  on rpc_failed:    goto failover;
  on timer:         goto timer;

failover: // server 0 failed, so try server 1 immediately (don't wait 50ms)
  send_request(server[1]);
  goto 0failed;

timer: // try server 1 while still hoping for a response from server 0
  send_request(server[1]);
  on rpc_succeeded: exit(success);
  on rpc_failed(0): goto 0failed;
  on rpc_failed(1): goto 1failed;

0failed: // server 0 failed; wait for response from server 1
  on rpc_succeeded(1): exit(success);
  on rpc_failed(1):    exit(failure);

1failed: // server 1 failed; wait for response from server 0
  on rpc_succeeded(0): exit(success);
  on rpc_failed(0):    exit(failure);

Now suppose we want to support an arbitrary number of servers, failing over repeatedly from one to the next. A pure state machine approach no longer works, because the information we need to track (which servers we have sent requests to, and which ones have responded) is too complex to encode in a fixed set of states. The usual solution is to keep some state outside the state machine proper. Suppose that our spec is "always maintain one in-flight request, or two in-flight requests if 50ms have elapsed since our current request was initiated; try each server at most once". We might wind up with something like this:

// Define an array that keeps track of which servers we've sent requests
// to, and the status of each request.  N is the number of servers.
enum Status { untried, pending, succeeded, failed };
Status[] statusArray = new Status[N]{untried};

start:
  goto launchRequest;

// No requests are in flight, we need to launch one.
launchRequest:
  // Choose a not-yet-tried server, and send a request to it.
  int i = <some server for which the status is "untried">;
  statusArray[i] = pending;
  send_request(server[i]);

  // Start a failover timer.
  start_timer(50);

  on rpc_succeeded: exit(success);
  on rpc_failed(j): goto failure(j);
  on timer:         goto timer;

// We had one request in flight, to server j; the request just failed.
failure(j): 
  statusArray[j] = failed;
  if <at least one server has status "untried">
    goto launchRequest; // launch a request to some other server
  goto wait;

// We've sent requests to every server, so now we're just waiting
// for responses.
wait:
  if <no server has status "pending">
    exit(failure);
  on rpc_succeeded: exit(success);
  on rpc_failed(j): goto failure(j);

// We had one request in flight, and our timer fired, so we'll kick off
// a parallel request to another server.
timer:
  if <no server has status "untried">
    goto wait;

  int i = <some server for which the status is "untried">;
  statusArray[i] = pending;
  send_request(server[i]);
  on rpc_succeeded: exit(success);
  on rpc_failed(j): goto parallelFailure(j);

// We had two requests in flight, and one of them (to server j) has
// just failed.
parallelFailure(j):
  statusArray[j] = failed;

  // Start a timer: if the other request doesn't complete soon,
  // we'll kick off another redundant request.  (Note, technically
  // we should reduce the timer interval by the amount of time since
  // the remaining in-flight request was launched.)
  start_timer(50);

  on rpc_succeeded: exit(success);
  on rpc_failed(j): goto failure(j);
  on timer:         goto timer;

This can be made to work, but it's quite complex, especially considering that the problem specification fits in a single sentence. The state machine style is not really helping. For instance, observe how we need different failure handlers for the "one RPC in flight" and "two RPCs in flight" states, and how the parallelFailure state is nearly a copy-and-paste of the launchRequest state.

In the continuous re-evaluation style, we discard the state machine, and keep all state in conventional programming variables. The state we do maintain is relatively simple; we do not keep track of what has just changed (e.g. there's no equivalent to the "timer" or "parallelFailure" states), or where we are in some plan (e.g. the "launchRequest" state). We merely record the objective state of the world -- in this example, the status of each RPC we've launched.  Then, we write a single method that is invoked whenever any event occurs (e.g. a timer fires, or a response arrives). That method looks at the state of the world and decides what to do next. You can think of it as a conventional state machine with a single state. The code for our N-server failover logic might look like this:

// Keep track of the requests we've sent, and their status.
enum Status { untried, pending, succeeded, failed };
Status[] statusArray = new Status[N]{untried};

// Also keep track of when each request was sent.
Date[] requestTimes = new Date[N];

start:
  if <all servers have status "failed">
    exit(failure);

  // Make sure we have at least one in-flight request.
  int k = <number of servers with status "pending">;
  if (k < 1)
    sendRequest();

  // If we have only one in-flight request, and it's stale, start
  // another.
  if (k == 1)
    if (requestTimes[statusArray.indexOf(pending)] <= new Date().addMillis(-50))
      sendRequest();

  on rpc_succeeded: exit(success);
  on rpc_failed(j): statusArray[j] = failed; goto start;
  on timer:         goto start;

// If there are any untried servers, choose one and send a request.
function sendRequest() {
  int i = <some server for which the status is "untried", or -1 if none>;
  if (i >= 0) {
    statusArray[i] = pending;
    requestTimes[i] = new Date();
    send_request(server[i]);
    start_timer(50);
  }
}

This is a big improvement on the state machine. Not only is it shorter and easier to follow, its structure directly reflects that of the specification. It should be easy to maintain and adapt this code when the specification changes. As a side bonus, it handles the 50ms retry correctly, unlike the previous version (which waited 50ms after failure, instead of 50ms after the time when the still-outstanding request was sent).
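For readers who want to experiment, here is a runnable Python sketch of the same N-server failover logic. It is an illustration under stated assumptions, not production code: times are integer milliseconds passed in by the caller, and instead of real I/O the hypothetical `_send_request` helper records the chosen server in `sent` and the wake-up time in `timers`, where a real implementation would issue the RPC and arm a timer that calls `on_timer`.

```python
from enum import Enum
from typing import List, Optional

HEDGE_MS = 50  # the 50ms staleness threshold from the spec


class Status(Enum):
    UNTRIED = "untried"
    PENDING = "pending"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


class Failover:
    """Sketch of N-server failover using continuous re-evaluation.

    Times are integer milliseconds supplied by the caller. Instead of doing
    real I/O, _send_request() records the server index in self.sent and the
    wake-up time in self.timers (hypothetical stand-ins for RPCs and timers).
    """

    def __init__(self, n_servers: int):
        self.status = [Status.UNTRIED] * n_servers
        self.sent_at: List[Optional[int]] = [None] * n_servers
        self.result: Optional[str] = None  # "success" or "failure" when done
        self.sent: List[int] = []
        self.timers: List[int] = []

    def _send_request(self, now: int) -> None:
        # If there are any untried servers, choose one and send a request.
        for i, s in enumerate(self.status):
            if s is Status.UNTRIED:
                self.status[i] = Status.PENDING
                self.sent_at[i] = now
                self.sent.append(i)
                self.timers.append(now + HEDGE_MS)
                return

    def reevaluate(self, now: int) -> None:
        if self.result is not None:
            return
        if Status.SUCCEEDED in self.status:
            self.result = "success"
            return
        if all(s is Status.FAILED for s in self.status):
            self.result = "failure"
            return
        pending = [i for i, s in enumerate(self.status) if s is Status.PENDING]
        if not pending:
            # Make sure we have at least one in-flight request.
            self._send_request(now)
        elif len(pending) == 1 and now - self.sent_at[pending[0]] >= HEDGE_MS:
            # Only one request in flight, and it's stale: start another.
            self._send_request(now)

    # Every event records a fact, then re-evaluates.
    def on_response(self, i: int, ok: bool, now: int) -> None:
        self.status[i] = Status.SUCCEEDED if ok else Status.FAILED
        self.reevaluate(now)

    def on_timer(self, now: int) -> None:
        self.reevaluate(now)
```

For example, with three servers: `reevaluate(0)` sends to server 0; `on_timer(50)` finds that request stale and sends to server 1; if server 1 then fails at 60ms, that same re-evaluation sends to server 2 immediately, because the request to server 0 is already at least 50ms old. The correct timing falls out of the recorded send times rather than being planned.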

The specification for a real-world, industrial-strength RPC implementation might be considerably more complex than the one I've used so far. For instance, we might want to:
  • Allow servers to be added or removed on the fly. (This might be triggered by a manual configuration change, or by a monitoring system that removes servers from the list when they appear unhealthy.)
  • Allow the caller to specify that K servers be tried in parallel (without waiting for a 50ms staleness threshold to be reached). This would be an option, enabled for latency-sensitive requests.
  • Specify a timeout for the overall operation -- after a specified interval, we abandon any in-flight requests and report failure.
With each new feature, the traditional state machine would become that much more convoluted. But for the continuous re-evaluation version, the changes are trivial.
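As a sketch of how small those changes can be, here is a hypothetical Python extension of the failover logic covering all three features. The class name, the `parallel`, `hedge_ms`, and `timeout_ms` parameters, and the hedging rule for K parallel requests (add one more request once every in-flight request is at least 50ms old) are assumptions made for illustration. Servers are keyed by name so the pool can change, and a late response from a removed server is simply ignored. The caller is assumed to call `on_timer` at each hedging threshold and at the deadline.

```python
from typing import Dict, List, Optional


class ExtendedFailover:
    """Hypothetical sketch: failover with a changeable pool, K-way parallelism,
    and an overall deadline. All times are integer milliseconds."""

    def __init__(self, servers: List[str], parallel: int = 1,
                 hedge_ms: int = 50, timeout_ms: Optional[int] = None,
                 now: int = 0):
        self.status: Dict[str, str] = {s: "untried" for s in servers}
        self.sent_at: Dict[str, int] = {}
        self.parallel = parallel
        self.hedge_ms = hedge_ms
        self.deadline = None if timeout_ms is None else now + timeout_ms
        self.result: Optional[str] = None
        self.sent: List[str] = []  # stands in for real RPCs

    def _send(self, name: str, now: int) -> None:
        self.status[name] = "pending"
        self.sent_at[name] = now
        self.sent.append(name)

    def reevaluate(self, now: int) -> None:
        if self.result is not None:
            return
        if "succeeded" in self.status.values():
            self.result = "success"
            return
        if self.deadline is not None and now >= self.deadline:
            self.result = "failure"  # abandon whatever is still in flight
            return
        pending = [s for s, v in self.status.items() if v == "pending"]
        untried = [s for s, v in self.status.items() if v == "untried"]
        if not pending and not untried:
            self.result = "failure"
            return
        # Keep `parallel` requests in flight; add one more once all are stale.
        want = self.parallel
        if len(pending) >= self.parallel and all(
                now - self.sent_at[s] >= self.hedge_ms for s in pending):
            want = self.parallel + 1
        for name in untried[: max(0, want - len(pending))]:
            self._send(name, now)

    # Events: record the new fact, then re-evaluate.
    def on_response(self, name: str, ok: bool, now: int) -> None:
        if name in self.status:  # ignore servers removed from the pool
            self.status[name] = "succeeded" if ok else "failed"
        self.reevaluate(now)

    def add_server(self, name: str, now: int) -> None:
        self.status.setdefault(name, "untried")
        self.reevaluate(now)

    def remove_server(self, name: str, now: int) -> None:
        self.status.pop(name, None)
        self.sent_at.pop(name, None)
        self.reevaluate(now)

    def on_timer(self, now: int) -> None:
        self.reevaluate(now)
```

Each feature touches only the state (a dictionary instead of an array, a deadline, a parallelism setting) and a line or two of `reevaluate`; no new states or transitions are needed.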

Example: Writely document sync

The problem of document synchronization was mentioned briefly at the beginning of this post. In the original Writely implementation, synchronization occurred through a single RPC method. The browser sent its latest changes (if any) to the server, and the server responded with any fresh changes made by collaborators. The browser would send RPCs even if no changes were occurring locally, as a way of polling for collaborators' changes.

Some of the considerations that went into the decision of when to send a sync RPC:
  • If our user has modified the document, we want to sync soon, to avoid data loss and so that any collaborators who are currently online can see the change.
  • If a collaborator is modifying the document, we want to sync frequently, so our user can see the changes promptly.
  • Sending a request and receiving a response use enough CPU to cause a noticeable hiccup in typing responsiveness. So, if the user is actively typing, it's preferable to wait until they pause before syncing. (This might be less of an issue today than it was in 2005, when Writely was originally developed.)
  • If a window is inactive, it's less important to sync that window frequently. 
  • Syncing too frequently can put excessive load on the server.
  • If a sync request fails (e.g. due to server or network problems), we should retry, but not right away.
As a state machine, this would be very complex. We'd need states to track the cross product of <document is modified locally>, <collaborators are online>, <collaborators have made recent edits>, <window is active>, and <last sync request failed>, among other things. We'd need to define state transitions for "user has made an edit", "sync succeeded (with remote changes)", "sync succeeded (without remote changes, but with collaborators present)", "sync succeeded (with no collaborators present)", "sync failed", "window activated", "window deactivated", and more.
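Counting only the conditions and events named above gives a sense of the scale. Treating each of the five conditions as a boolean:

```latex
\underbrace{2^{5}}_{\text{five boolean conditions}} = 32 \text{ states},
\qquad
32 \times \underbrace{7}_{\text{named events}} = 224 \text{ (state, event) pairs}
```

Not every pair needs a distinct handler, and some combinations may be impossible, but each is a case the designer must at least consider, and every additional boolean condition doubles the count.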

Using the continuous re-evaluation pattern, the problem decomposes much more nicely. We'll need a handful of state variables:

Date timeLastSyncRequestWasSent;
Date timeOfLastLocalChange;
Date timeOfLastRemoteChange;
boolean anyUnsavedChanges;
boolean anyCollaboratorsOnline;
boolean windowIsActive;
int syncFailuresInARow;

Maintaining these state variables is trivial. Then we just need a re-evaluation method:

// Elapsed times in seconds (subtracting Dates yields milliseconds).
Date now = new Date();
double timeSinceLastRequest  = (now - timeLastSyncRequestWasSent) / 1000;
double timeSinceLocalChange  = (now - timeOfLastLocalChange) / 1000;
double timeSinceRemoteChange = (now - timeOfLastRemoteChange) / 1000;

// Based on the current state, decide how often we should be syncing.
double timeBetweenSyncs;
if (anyUnsavedChanges || timeSinceRemoteChange < 60)
  timeBetweenSyncs = 10;
else if (anyCollaboratorsOnline)
  timeBetweenSyncs = 30;
else
  timeBetweenSyncs = 60;

if (!windowIsActive)
  timeBetweenSyncs *= 2;

// Back off if we've been getting server errors.
if (syncFailuresInARow > 0) {
  double backoffTime = Math.min(10 * Math.pow(2, syncFailuresInARow), 300);
  timeBetweenSyncs = Math.max(timeBetweenSyncs, backoffTime);
}

// If the last sync request is too old, sync now.
if (timeSinceLastRequest >= timeBetweenSyncs) {
  // But, if the user is actively typing, wait a bit in hopes that
  // they will pause.  Concretely, we sync only if it has been 2 seconds
  // since the last change, or we're at least 10 seconds "overdue" to
  // sync.
  if (timeSinceLocalChange > 2 ||
      timeSinceLastRequest >= timeBetweenSyncs + 10)
    sendSyncRequest();
}

Really not bad at all. As in the RPC failover case, the re-evaluation code directly reflects the original specification. In Writely, I believe we simply invoked this method once per second.
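A self-contained Python translation of the method above follows, with a driver that re-evaluates once per simulated second. It is a sketch rather than the Writely code: field names are snake_case versions of the state variables above, all times are assumed to be in seconds on a single clock, and the hypothetical `run_ticks` driver only records when a sync would be sent, updating `time_last_sync_request_was_sent` as a real sender would. Handling the response (clearing unsaved changes, counting failures) is left out.

```python
from dataclasses import dataclass


@dataclass
class SyncState:
    # All times are in seconds on one clock (e.g. time.monotonic()).
    time_last_sync_request_was_sent: float
    time_of_last_local_change: float
    time_of_last_remote_change: float
    any_unsaved_changes: bool
    any_collaborators_online: bool
    window_is_active: bool
    sync_failures_in_a_row: int


def time_between_syncs(s: SyncState, now: float) -> float:
    # Based on the current state, decide how often we should be syncing.
    if s.any_unsaved_changes or now - s.time_of_last_remote_change < 60:
        interval = 10.0
    elif s.any_collaborators_online:
        interval = 30.0
    else:
        interval = 60.0
    if not s.window_is_active:
        interval *= 2
    # Back off if we've been getting server errors.
    if s.sync_failures_in_a_row > 0:
        backoff = min(10 * 2 ** s.sync_failures_in_a_row, 300)
        interval = max(interval, backoff)
    return interval


def should_sync(s: SyncState, now: float) -> bool:
    interval = time_between_syncs(s, now)
    since_request = now - s.time_last_sync_request_was_sent
    if since_request < interval:
        return False
    # Wait for a 2-second typing pause, unless we're 10 seconds overdue.
    return (now - s.time_of_last_local_change > 2
            or since_request >= interval + 10)


def run_ticks(s: SyncState, start: int, end: int) -> list:
    """Re-evaluate once per simulated second; return the times we synced."""
    synced = []
    for now in range(start, end + 1):
        if should_sync(s, now):
            synced.append(now)
            s.time_last_sync_request_was_sent = now  # record the new fact
    return synced
```

For an idle document with no collaborators and an active window, the interval is 60 seconds, so ticking from second 1 to second 130 syncs at seconds 60 and 120.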

A related problem is when and how to notify the user of sync failures. For failures that may be transient (e.g. request timeout, or "server busy"), we don't necessarily need to bother the user; but we should notify them if there are multiple such failures in a row, especially if the document contains unsaved changes. For permanent failures (e.g. the user's login credentials are no longer valid), we should notify immediately. If failures continue to occur, we should not bombard the user with error messages, but we probably should remind them occasionally that their work is not being saved. The continuous re-evaluation pattern handles all this without difficulty. The state variables would be something like this:

// Number of consecutive sync requests that have failed with a transient
// error; reset to 0 after each success.
int transientErrorsInARow;

// True if the most recent sync request failed because the user needs
// to log in again.
boolean needLogin;

boolean documentContainsUnsavedChanges;

// Time when we last displayed an error dialog.
Date timeOfLastTransientErrorWarning;
Date timeOfLastNeedLoginWarning;

And the re-evaluation method is straightforward enough that I won't bother spelling it out.
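For completeness, here is one possible shape for that method, as a Python sketch. The post gives no concrete numbers, so the thresholds `TRANSIENT_LIMIT_WITH_UNSAVED` and `TRANSIENT_LIMIT_OTHERWISE`, the five-minute `REMINDER_INTERVAL`, and the function name `warning_to_show` are all hypothetical. Times are in seconds; the function returns at most one warning and records when it was shown, so repeated re-evaluation does not bombard the user.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical thresholds: the post does not specify concrete values.
TRANSIENT_LIMIT_WITH_UNSAVED = 2   # warn sooner if work could be lost
TRANSIENT_LIMIT_OTHERWISE = 5
REMINDER_INTERVAL = 300.0          # seconds between repeated warnings


@dataclass
class ErrorState:
    transient_errors_in_a_row: int = 0
    need_login: bool = False
    document_contains_unsaved_changes: bool = False
    time_of_last_transient_error_warning: Optional[float] = None
    time_of_last_need_login_warning: Optional[float] = None


def _due(last: Optional[float], now: float) -> bool:
    # The first warning is always due; later ones only after the interval.
    return last is None or now - last >= REMINDER_INTERVAL


def warning_to_show(s: ErrorState, now: float) -> Optional[str]:
    """Return which warning (if any) to display right now, and record it."""
    if s.need_login:
        # Permanent failure: warn immediately, then remind occasionally.
        if _due(s.time_of_last_need_login_warning, now):
            s.time_of_last_need_login_warning = now
            return "need_login"
        return None
    limit = (TRANSIENT_LIMIT_WITH_UNSAVED if s.document_contains_unsaved_changes
             else TRANSIENT_LIMIT_OTHERWISE)
    if s.transient_errors_in_a_row >= limit and _due(
            s.time_of_last_transient_error_warning, now):
        s.time_of_last_transient_error_warning = now
        return "transient"
    return None
```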

Wrapping up

...not much to wrap up, actually. The continuous re-evaluation pattern has solved many real-world problems for me. I hope it can do the same for you. As always, questions, feedback, or other comments are welcomed.

2 comments:

  1. For an attempt at a general solution to the problem of orchestrating async calls, please see the mesh.js library at https://github.com/tatumizer/mesh

    It's a JavaScript library, but all the concepts apply to any evented runtime. There's a very detailed readme file, but the main idea is already clear from page 1. At the end, you can find an example of a "reliable service", where I consider exactly the same problem (there's even a link to the above article as a source of inspiration).

  2. Well, it makes me think of the approach you take when writing "recovery" code for a database system. Any previous action can fail unless you've enforced a specific sequence of events, but even so, less is known in distributed systems; thus the recovery algorithm needs to reason about what is known, what is unknown, and what the correct action is to get to a "better state", and eventually be "complete".

    (hmmm, pretty funny, commenting a blog can only? be done in the name of my own blog?)
