[erlang-questions] Considering a Generic Transaction System in Erlang

Jörgen Brandt brandjoe@REDACTED
Fri Oct 9 00:07:55 CEST 2015


Hello,

is there an Erlang library for transactional message
passing, using patterns in communication and error handling
to improve fault tolerance?

This (or a similar) question may have been asked before and,
of course, there is plenty of research on fault tolerance
and failure transparency. Nevertheless, in my work with
scientific workflows it seems that certain patterns in error
handling exist. In this mail I'm trying to motivate a way to
externalize common error handling in a standardized service
(a transaction server) but I'm unsure whether such a thing
already exists, whether I'm missing an important feature,
and whether it's a good idea anyway.

Large distributed systems are composed of many services.
They process many tasks concurrently and need fault
tolerance to yield correct results and maintain
availability. Erlang seemed a good choice because it
provides facilities to automatically improve availability,
e.g., by using supervisors. In addition, it encourages a
programming style that separates processing logic from
error handling. In general, each service has its own
requirements, implying that a general approach to error
handling (beyond restarting) is infeasible. However, if an
application exhibits recurring patterns in the way error
handling corresponds to the messages passed between
services, we can abstract these patterns to reuse them.


Fault tolerance is important because it directly translates
to scalability.

Consider an application with transient software faults that
processes user queries and reports errors back to the user
as they appear. If a query is a long-running job (hours,
days) and the number of subtasks created from it
(thousands), the number of services needed to process one
subtask, and the number of machines involved are all large,
then an error somewhere is nearly certain. Quietly
restarting the application and rerunning the query may
reduce the failure probability, but even if the application
eventually succeeds, the number of retries and, thus, the
time elapsed until success may be prohibitive. What is
needed is a system that restarts not the whole application
but only the service that failed, and reissues only the
unfinished requests that this service had received before
failing. Consequently, the
finer the granularity at which errors are handled, the less
work has to be redone when errors occur, allowing a system
to host longer-running jobs, containing more subtasks,
involving more services for each subtask, and running on
more machines in feasible time.
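
A back-of-the-envelope sketch of this argument in Erlang. It
assumes that every step (subtask, service hop, machine)
fails independently with the same probability P, which is an
idealization. The module and function names are made up for
illustration.

```erlang
-module(failure_odds).
-export([p_any_failure/2, expected_full_runs/2]).

%% Probability that at least one of N steps fails, assuming each step
%% fails independently with probability P (an idealization).
p_any_failure(P, N) when P >= 0.0, P =< 1.0, is_integer(N), N >= 0 ->
    1.0 - math:pow(1.0 - P, N).

%% Expected number of complete runs until one run finishes without any
%% failure, if every failure forces the whole job to start over
%% (mean of a geometric distribution). Requires P < 1; for extreme
%% inputs the division may fail with badarith.
expected_full_runs(P, N) when P >= 0.0, P < 1.0, is_integer(N), N >= 0 ->
    1.0 / math:pow(1.0 - P, N).
```

Both values grow quickly with N, which is why restarting the
whole application scales poorly, while restarting one service
and replaying only its open requests keeps the redone work
small.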


Scientific workflows are a good example of a large
distributed application exhibiting patterns in communication
and error handling.

A scientific workflow system consumes an input query in the
form of an expression in the workflow language. On
evaluation of this expression it identifies subtasks that
can be executed in any order. E.g., a variant calling
workflow from bioinformatics unfolds into several hundred
to a thousand subtasks, each of which is handed down in the
form of requests through a number of services: Upon
identification of the subtask in (i) the query interpreter,
a request is sent to (ii) a cache service. This service
keeps track of all previously run subtasks and returns the
cached result if available. If not, a request is sent to
(iii) a scheduling service. This service determines the
machine on which to run the subtask. The scheduler tries both
to distribute the workload adequately among workers (load
balancing) and to minimize data transfers among nodes (data
locality). Having decided where to run the subtask, a
request is sent to (iv) the corresponding worker which
executes the subtask and returns the result up the chain of
services. Every subtask goes through this life cycle.
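
The life cycle above can be sketched as a single pure
function. This is only an illustration under simplifying
assumptions: the cache is a map, the scheduler does load
balancing only (data locality is left out), the load counter
is never decremented, and the worker is a fun passed in by
the caller. All names are hypothetical.

```erlang
-module(subtask_flow).
-export([submit/3]).

%% State :: #{cache := #{Subtask => Result},
%%            load  := #{Worker => DispatchedCount}}
%% Exec  :: fun((Worker, Subtask) -> Result), standing in for a worker.
submit(Subtask, Exec, #{cache := Cache, load := Load} = State) ->
    case maps:find(Subtask, Cache) of
        {ok, Result} ->
            %% (ii) cache hit: nothing is scheduled or executed
            {Result, State};
        error ->
            %% (iii) scheduler: choose the worker with the lowest load
            Worker = schedule(Load),
            %% (iv) worker: execute the subtask, pass the result back up
            Result = Exec(Worker, Subtask),
            Load1 = maps:update_with(Worker, fun(N) -> N + 1 end, Load),
            {Result, State#{cache := Cache#{Subtask => Result},
                            load := Load1}}
    end.

%% Least-loaded worker; crashes on an empty worker map.
schedule(Load) ->
    [{Worker, _Count} | _] = lists:keysort(2, maps:to_list(Load)),
    Worker.
```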

Apart from the interplay of the aforementioned services we
want the workflow system to behave in a particular way when
one of these services dies:

- Each workflow is evaluated inside its own interpreter
  process. A workflow potentially runs for a long time and
  at some point we might want to kill the interpreter
  process. When this happens, the system has to identify all
  open requests originating from that interpreter and cancel
  them.

- When an important service (say the scheduler) dies, a
  supervisor will restart it, thereby securing the
  availability of the application. After a fresh start, none
  of the messages this service had received will be there
  anymore. Instead of notifying the client of this service
  (in this case the cache) so that it can repair the damage,
  we want all messages that were sent to the service
  (scheduler) and have not yet been acknowledged to be resent
  to the freshly started service.

- When a worker dies from a hardware fault, we cannot
  expect a supervisor to restart it (on the same machine).
  In this case we want to notify the scheduler not to expect
  a reply to its request anymore. We also want to reissue
  the original request to the scheduler to give it the
  chance to choose a different machine to run the subtask
  on.

- When a request is canceled at a high level (say at the
  cache level because the interpreter died), all subsequent
  requests (to the scheduler and to the worker)
  corresponding to the original request should already have
  been canceled before the high-level service (cache) is
  notified, thereby relieving it of the duty to cancel them
  itself.


Since there is no shared memory in Erlang, the state of a
process is defined only by the messages received (and its
init parameters, which are assumed constant). To reestablish
the state of a process after failure we propose three
different ways to send messages to a process and their
corresponding informal error handling semantics:

tsend( Dest, Req, replay ) -> TransId
when Dest    :: atom(),
     Req     :: term(),
     TransId :: reference().

Upon calling tsend/3, a transaction server creates a record
of the request to be sent and relays it to the destination
(which must be a registered process). At the same time it
creates a monitor on both the request's source and its
destination. When the source dies, the transaction server
sends an abort message to the destination. When the
destination dies, nothing happens initially. When the
supervisor restarts the destination, the transaction server
replays all unfinished requests to it.
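
A minimal sketch of the bookkeeping this implies, written as
a pure model of the transaction server's table. Monitoring
and actual message passing are left out; each function just
returns the messages that would have to be sent. The module
name, the explicit Source and Table arguments, and the
message shapes {recv, TransId, Req} and {abort, TransId} are
assumptions made for illustration.

```erlang
-module(tx_replay).
-export([new/0, tsend/4, source_down/2, dest_restarted/2]).

%% The table maps TransId => {Source, Dest, Req}.
new() -> #{}.

%% Record the request. A real server would now relay
%% {recv, TransId, Req} to Dest and monitor Source and Dest.
tsend(Source, Dest, Req, Table) when is_atom(Dest) ->
    TransId = make_ref(),
    {TransId, Table#{TransId => {Source, Dest, Req}}}.

%% The source died: forget its requests and return the abort
%% messages owed to the destinations.
source_down(Source, Table) ->
    Dead = maps:filter(fun(_Id, {S, _, _}) -> S =:= Source end, Table),
    Aborts = [{Dest, {abort, Id}} || {Id, {_, Dest, _}} <- maps:to_list(Dead)],
    {Aborts, maps:without(maps:keys(Dead), Table)}.

%% The destination was restarted: the table is unchanged, and every
%% unfinished request addressed to it is replayed.
dest_restarted(Dest, Table) ->
    [{Dest, {recv, Id, Req}}
     || {Id, {_, D, Req}} <- maps:to_list(Table), D =:= Dest].
```

How the server learns that a destination has been restarted
(polling the registered name, or having the service announce
itself) is left open here.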

tsend( Dest, Req, replay, Precond ) -> TransId
when Dest      :: atom(),
     Req       :: term(),
     Precond   :: reference(),
     TransId   :: reference().

The error handling for tsend/4 with replay works just as it
does for tsend/3. Additionally, when the request with the id
Precond is canceled, this request is canceled as well.
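
One way to picture the cascade is the following sketch. It
assumes the table stores each request's destination and its
Precond (or the atom none for tsend/3), and it returns the
abort messages ordered so that dependent requests are
canceled before the request they depend on. Module name,
table layout and message shape are hypothetical.

```erlang
-module(tx_cancel).
-export([cancel/2]).

%% The table maps TransId => #{dest := Dest, precond := Precond | none}.
%% Returns {Aborts, NewTable}. Aborts for dependent requests come first,
%% so lower-level services are told before higher-level ones.
cancel(TransId, Table) ->
    case maps:find(TransId, Table) of
        error ->
            {[], Table};
        {ok, #{dest := Dest}} ->
            Rest = maps:remove(TransId, Table),
            Dependants = [Id || {Id, #{precond := P}} <- maps:to_list(Rest),
                                P =:= TransId],
            {Aborts, Rest1} =
                lists:foldl(fun cancel_dependant/2, {[], Rest}, Dependants),
            {Aborts ++ [{Dest, {abort, TransId}}], Rest1}
    end.

%% Cancel one dependant (and, recursively, its own dependants).
cancel_dependant(Id, {Aborts, Table}) ->
    {More, Table1} = cancel(Id, Table),
    {Aborts ++ More, Table1}.
```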

tsend( Dest, Req, reschedule, Precond ) -> TransId
when Dest    :: atom() | pid(),
     Req     :: term(),
     Precond :: reference(),
     TransId :: reference().

Upon calling tsend/4 with reschedule, a transaction server
creates a record of the request and monitors both source and
destination, as before. When the destination dies, instead
of waiting for a supervisor to restart it, the original
request identified by Precond is first canceled at the
source and then replayed to the source. Since we do not rely
on the destination being a permanent process, we can also
identify it by pid, whereas replay error handling requires a
registered service.
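
A sketch of what the transaction server might do when a
destination under reschedule handling dies. It assumes that
the failed request itself is dropped from the table (the
source is expected to issue a fresh one after the replay)
and that nothing is sent if the Precond request is already
gone. Both points, as well as the table layout and message
shapes, are assumptions for illustration.

```erlang
-module(tx_reschedule).
-export([dest_down/2]).

%% The table maps TransId =>
%%   #{source, dest, req, mode := replay | reschedule, precond}.
%% Returns {Messages, NewTable}.
dest_down(Dest, Table) ->
    %% Entries with mode replay do not match the pattern and are skipped.
    Lost = [{Id, Tx}
            || {Id, #{dest := D, mode := reschedule} = Tx} <- maps:to_list(Table),
               D =:= Dest],
    lists:foldl(fun reschedule/2, {[], Table}, Lost).

reschedule({Id, #{source := Source, precond := Precond}}, {Msgs, Table}) ->
    Table1 = maps:remove(Id, Table),
    case maps:find(Precond, Table1) of
        {ok, #{req := OrigReq}} ->
            %% First cancel the original request at the source, then
            %% replay it so the source can pick another destination.
            {Msgs ++ [{Source, {abort, Precond}},
                      {Source, {recv, Precond, OrigReq}}], Table1};
        error ->
            {Msgs, Table1}
    end.
```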

commit( TransId, Reply ) -> ok
when TransId :: reference(),
     Reply   :: term().

When a service is done working on a request, it sends a
commit, which relays the reply to the transaction source and
removes the record for this request from the transaction
server.
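
In the same pure-model style, commit could look roughly like
this. The table is threaded explicitly, so the sketch has
arity 3 rather than the commit/2 shown above. Ignoring a
commit for an unknown (for example already aborted)
transaction is an assumption, as is the {reply, TransId,
Reply} message shape.

```erlang
-module(tx_commit).
-export([commit/3]).

%% The table maps TransId => {Source, Dest, Req}.
%% Returns {Messages, NewTable}.
commit(TransId, Reply, Table) ->
    case maps:find(TransId, Table) of
        {ok, {Source, _Dest, _Req}} ->
            %% Relay the reply to the source and forget the request,
            %% so it is neither replayed nor aborted later.
            {[{Source, {reply, TransId, Reply}}], maps:remove(TransId, Table)};
        error ->
            %% Unknown or already finished: nothing to relay.
            {[], Table}
    end.
```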

A service participating in transaction handling has to
provide the following two callbacks:

handle_recv( TransId::reference(), Req::_, State::_ ) ->
  {noreply, NewState::_}.

handle_abort( TransId::reference(), State::_ ) ->
  {noreply, NewState::_}.
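
For illustration, a service that does nothing but keep track
of its open requests could implement the two callbacks as
follows. The module name, the init/0 helper and the choice of
a map as state are hypothetical; a real service would start
working on the request in handle_recv/3 and call commit/2
once it is done.

```erlang
-module(pending_service).
-export([init/0, handle_recv/3, handle_abort/2]).

%% The state maps TransId => Req for every request not yet committed.
init() -> #{}.

%% A new (or replayed) request arrives: remember it. Receiving the
%% same TransId twice leaves a single entry.
handle_recv(TransId, Req, State) ->
    {noreply, State#{TransId => Req}}.

%% The request was canceled: forget it. Aborting an unknown TransId
%% is a no-op.
handle_abort(TransId, State) ->
    {noreply, maps:remove(TransId, State)}.
```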

While the transaction protocol defined above can satisfy
the requirements introduced for the workflow system example,
the question remains whether it is general enough to be
applicable in other applications as well.


This approach has its limitations.

The proposed transaction protocol may be suited to dealing
with transient software faults (Heisenbugs), but its
effectiveness in mitigating hardware faults or deterministic
software faults (Bohrbugs) is limited. In addition, by
introducing the transaction server we create a single
point of failure.


In conclusion, having a supervisor restart a service is
sufficient to secure the availability of that service in the
presence of software faults, but large-scale distributed
systems require a more fine-grained approach to error
handling. Identifying patterns in message passing and error
handling gives us the opportunity to reduce error handling
code and, thereby, to avoid introducing bugs into error
handling. The proposed transaction protocol may be suitable
to achieve this goal.


I hope to get some feedback on the concept, in order to
get an idea of whether I am on the right track. If a similar
library is already around and I just couldn't find it, or if
I am missing an obvious feature or a pattern that is
important but just doesn't appear in the context of
scientific workflows, it would be helpful to know about it.
Thanks in advance.

Cheers
Jörgen



