Distributed executor framework

Hi,

In OpenGamma, I have read somewhere about the concept of running executors remotely, something like in a distributed system.

If I am not mistaken about this, kindly pass on the literature.


Yes, it’s one of the key features of the engine to be able to have remote calculation nodes. However, we haven’t yet updated the documentation - the 0.9.0 docs are here, but they are rather Spring-focused. When we’re back in the office (Tuesday) I will get someone on the case of updating these docs and adding some scripts and config files to support the new component system, to make it easier to fire up new calc nodes.

It’s in JIRA under PLAT-2244 and assigned to an engineer if you want to monitor progress.

@jim - Just to be clear on what I need:

  • A distributed executor framework where all remote and local executors listen to a queue.
  • What work an executor does should be programmable.
  • When a job comes into the queue, it’s taken by a free executor.
  • There should be a unique ID associated with each job.
  • I should be able to tell the status of a job: Queued, Processing, Failure or Success.

It would be great if you could add the above to the documentation as well.
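For concreteness, here is a rough sketch of the kind of API I am imagining (all class and method names here are my own invention for illustration, not anything in OpenGamma):

```java
import java.util.UUID;
import java.util.concurrent.*;

// Hypothetical sketch of the requested executor framework; not OpenGamma API.
public class JobQueue {
    public enum Status { QUEUED, PROCESSING, SUCCESS, FAILURE }

    private final ExecutorService workers;
    private final ConcurrentMap<String, Status> statuses = new ConcurrentHashMap<>();

    public JobQueue(int workerCount) {
        this.workers = Executors.newFixedThreadPool(workerCount);
    }

    // Submit any programmable unit of work; returns a unique job id.
    public String submit(Runnable work) {
        String jobId = UUID.randomUUID().toString();
        statuses.put(jobId, Status.QUEUED);
        workers.submit(() -> {
            statuses.put(jobId, Status.PROCESSING);
            try {
                work.run();
                statuses.put(jobId, Status.SUCCESS);
            } catch (RuntimeException e) {
                statuses.put(jobId, Status.FAILURE);
            }
        });
        return jobId;
    }

    // Query the current lifecycle state of a previously submitted job.
    public Status status(String jobId) {
        return statuses.get(jobId);
    }

    public void shutdown() {
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A real distributed version would back the queue and status map with shared infrastructure rather than a local thread pool, but the contract is the same.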

Hi @vineeth,

I think you should bear in mind that our compute distribution system is not meant to be a general-purpose ‘grid’, but rather a specifically optimised default implementation of a distributed compute processor for executing our dependency graphs. It’s really designed to be possible to implement on top of a conventional distributed compute grid framework rather than to replace one.

While there are aspects of a more general purpose mechanism available, and we may well farm out other types of job to the calculation infrastructure in the future, it’s really intended to be controlled by the view processor directly rather than directed by an external agent.

So, if you’ve got calculations you want to perform outside the framework of our dependency graph based calculation engine (i.e. you don’t want to implement our FunctionDefinition/Invoker/AbstractFunction interfaces), I’d probably suggest you try using a more traditional distributed compute solution such as GridGain/Hadoop, Infinispan, or RedHat MRG or one of the many commercial options available.


Thanks for the clarification, @jim.

It would be quite helpful if you could fill in more information on the kind of job that is executed over this distributed framework.
I am also not quite able to picture what you mean by “it’s really intended to be controlled by the view processor”.


What I mean is that if it’s calculated in our view processor - i.e. it uses the functions in the OG-Financial package that extend AbstractFunction - then the execution of those functions is automatically distributed to the calculation nodes (a ‘job item’ is typically the execution of a single function). The results are placed into a shared ‘computation cache’ visible to all nodes (although with locality optimisations), and top-level results are fed back to the view processor and eventually back to the view client (typically a web browser running the analytics viewer).

This distribution is completely transparent and automatic. Each individual function should generally be written as single-threaded code with no or very limited side-effects and allow the distribution mechanism to parallelise the computations. Various optimisations are used to reduce inter-node communications such as placing members of the same dependency sub-graph on the same node, although care is needed to prevent the cost of optimisation strategies outweighing the benefits.
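As a toy illustration of that last point (nothing here is OpenGamma code; the names are invented), a job item boils down to a pure, single-threaded function that the surrounding framework is free to parallelise:

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: a pure, side-effect-free, single-threaded function
// can be parallelised safely by the surrounding framework.
public class PureFunctionDemo {
    // Stand-in for a single "job item": compute one value from its inputs.
    static double presentValueOf(double cashflow, double discountFactor) {
        return cashflow * discountFactor;
    }

    // The framework, not the function, decides on parallelism.
    static List<Double> computeAll(List<double[]> inputs) {
        return inputs.parallelStream()
                     .map(in -> presentValueOf(in[0], in[1]))
                     .collect(Collectors.toList());
    }
}
```

Because `presentValueOf` touches no shared state, it makes no difference whether the items run on one thread, many threads, or many machines.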

I hope that helps.


Thanks @jim.

This is exactly what I needed :slight_smile:
Can you please explain the following:

  1. How do we submit a job to this distributed system or grid?
  2. I believe submission would be asynchronous (correct me if I am wrong) and we will get to know the status of each job we submitted. How can we see this?
  3. Is there a unique identification ID assigned to each job submitted (that is, if submission is asynchronous)?
  4. And last, how do we get the result from the cache?


Hi @vineeth,

So, the way the system works is that you write functions that extend AbstractFunction. Currently writing and debugging these requires a fair amount of expertise and isn’t trivial.

These functions are declarative in style (think Prolog): they have methods to establish what inputs they require, by returning a set of ‘Value Requirements’. They also have methods to establish what types of outputs they produce, by returning a set of ‘Value Specifications’ (which are very similar to value requirements). All functions are then registered in a FunctionRepository (the default for the examples is called DemoStandardFunctionConfiguration).
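A heavily simplified model of that declarative shape might look like the following. The strings here are a stand-in for the real ValueRequirement/ValueSpecification types, and the class and method names are my own invention, not the AbstractFunction API:

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model of the declarative style; real
// OpenGamma functions extend AbstractFunction and use richer types.
public class PresentValueFunction {
    // "Value specifications": what this function can produce.
    public Set<String> getResults() {
        return Set.of("PresentValue");
    }

    // "Value requirements": what inputs the engine must satisfy first,
    // possibly by finding other functions that produce them.
    public Set<String> getRequirements() {
        return Set.of("DiscountFactor", "Cashflow");
    }

    // Called by a calc node once all requirements have been resolved.
    public Map<String, Double> execute(Map<String, Double> inputs) {
        double pv = inputs.get("Cashflow") * inputs.get("DiscountFactor");
        return Map.of("PresentValue", pv);
    }
}
```

The key point is that requirements and results are declared separately from execution, which is what lets the engine wire functions into a dependency graph before any of them run.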

At the top level, the user builds a View Definition using either the web ui or some external tool (using code), and requests the engine (View Processor) to start a view process running that view with a given set of Execution Options. The Execution Options specify things like where to source data (e.g. live, snapshot or from historical time series) and how often to run a cycle of the view. Alternatively it can run for a specific set of cycles, each with a different data set for e.g. full revaluation Historical VaR.
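Reading the paragraph above, the Execution Options amount to a small value object. This sketch uses invented names purely to summarise it; it is not the real OpenGamma API:

```java
// Illustrative stand-in for the engine's "Execution Options"; all field and
// type names are invented for this sketch, not the OpenGamma API.
public class ExecutionOptionsSketch {
    public enum MarketDataSource { LIVE, SNAPSHOT, HISTORICAL }

    public final MarketDataSource source;   // where to source data
    public final long cycleIntervalMillis;  // how often to run a cycle
    public final int numCycles;             // e.g. one per historical date
                                            // for full-revaluation VaR

    public ExecutionOptionsSketch(MarketDataSource source,
                                  long cycleIntervalMillis,
                                  int numCycles) {
        this.source = source;
        this.cycleIntervalMillis = cycleIntervalMillis;
        this.numCycles = numCycles;
    }
}
```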

The view processor then goes through the outputs that have been asked for by the user (e.g. Present Value on Swaps) for each item in the portfolio specified by the view definition (that will include PortfolioNodes as well as positions and trades), and offers it to each function, asking it whether it can produce the required output. When it finds a function that can produce the correct output, it adds that function as a node in the ‘dependency graph’ required to compute that value. It then looks at any required inputs of that function and recursively tries to satisfy those requirements using functions from the function repository. Eventually, the input requirements will all be input market data and the system adds those to a list of data subscriptions (in the case of live) it will have to make when starting the view.
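The recursive resolution described above can be sketched as a toy compiler. Again, the names are invented and strings stand in for the real requirement objects; the actual engine also handles constraints, multiple candidate functions, and backtracking:

```java
import java.util.*;

// Toy sketch of "dependency graph compilation": for each requested output,
// find a function that produces it, then recurse into that function's own
// requirements until only raw market data remains.
public class GraphCompiler {
    // producer map: output name -> inputs needed to produce it.
    private final Map<String, List<String>> functions;

    public GraphCompiler(Map<String, List<String>> functions) {
        this.functions = functions;
    }

    // Returns the leaf-level market data subscriptions needed for `output`.
    public Set<String> resolve(String output) {
        Set<String> marketData = new LinkedHashSet<>();
        resolve(output, marketData);
        return marketData;
    }

    private void resolve(String requirement, Set<String> marketData) {
        List<String> inputs = functions.get(requirement);
        if (inputs == null) {
            // No function produces this: treat it as raw market data.
            marketData.add(requirement);
            return;
        }
        for (String input : inputs) {
            resolve(input, marketData);
        }
    }
}
```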

Once it’s built the whole dependency graph (which is called ‘dependency graph compilation’), it moves on to an execution phase. This is when it walks up the graphs, starting at the market data leaves, places the appropriate market data in a shared value cache that all compute nodes can access and then places functions into jobs that are farmed out to the compute nodes. Those compute nodes then call the execute() methods on each function, passing in the inputs it’s declared it requires and then being given a set of outputs that should correspond to the value specifications it gave during the compilation phase. Those outputs are then placed back into the cache (locally if possible) so they’re accessible by the next function up the graph as inputs. Eventually all the results required by the user are present in the value cache and the view processor can pull the results out and organise them into a result set data structure which is passed back to the user via a call-back (possibly over a remote connection).
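A toy model of that execution phase, with a plain map standing in for the shared value cache (all names invented; the real cache is distributed across calc nodes and jobs run concurrently):

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.ToDoubleFunction;

// Toy sketch of the execution phase: seed the shared value cache with market
// data, then evaluate each graph node once its inputs are present, writing
// results back into the cache for nodes further up the graph.
public class ExecutionDemo {
    public static class Node {
        public final String output;
        public final List<String> inputs;
        public final ToDoubleFunction<double[]> fn;
        public Node(String output, List<String> inputs, ToDoubleFunction<double[]> fn) {
            this.output = output; this.inputs = inputs; this.fn = fn;
        }
    }

    // In the real engine this cache is shared across calc nodes.
    public static Map<String, Double> execute(Map<String, Double> marketData,
                                              List<Node> nodesInOrder) {
        Map<String, Double> cache = new ConcurrentHashMap<>(marketData);
        for (Node n : nodesInOrder) {  // assumes topological (leaves-first) order
            double[] in = n.inputs.stream().mapToDouble(cache::get).toArray();
            cache.put(n.output, n.fn.applyAsDouble(in));
        }
        return cache;
    }
}
```

After the loop, the requested top-level results can simply be read out of the cache, which mirrors how the view processor assembles the result set.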

Things are actually a bit more complicated than the picture I’ve painted here because we have a constraint system too. This allows you to say things like ‘use curve X, currency should be Y, use calculation method Z’. The system then uses a back-tracking algorithm to best meet the specified constraints. The upside of this is that it makes it very easy for the user to parameterise what they want with reasonable defaults when they don’t care. The downside is that currently writing and debugging these functions can be quite complex and difficult (we’re aiming to make it simpler).

So, to summarise and answer your questions directly:

  1. You don't explicitly submit jobs at all, it's all done for you.
  2. There is currently no real API for monitoring jobs, although we're aware that this is something users want.
  3. Because the user does not have direct access to jobs presently, there is no ID.
  4. The results are extracted from the cache for you, there is no direct access required.
A basic guide to writing functions is available and a fuller example is given there.

I hope that clarifies things.



That’s a very useful summary, Jim. I am very interested in this area and hope to have a look at any further work that can be done on optimisation, monitoring, data proximity to processing, and failover recovery. I have been looking into solving a similar problem with an actor-based system (Akka) in an elastic cloud of compute nodes (so that a bank can easily assign a set of resources from a private cloud for a time period overnight, run many scenarios, save the results, and easily free the resources for the morning). If there is any current direction planned for improved distribution I would love to help on this, and I have the time as well.

Thanks for the interest Neil! There are a few things that it might be interesting to look at tackling: for example, the stock ‘distributed’ value cache uses a BerkeleyDB backing and isn’t really distributed at the moment. It is really an example implementation rather than something that can seriously scale, so we’ll need to replace it with something that can. It basically just needs to be an eventually consistent distributed map, and there are quite a few existing projects that could provide much better scalability: memcached is just one option.
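In contract terms, the value cache then just needs to look something like the sketch below (my assumption, with invented names); any backing store that satisfies it could be swapped in:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the contract the value cache needs: essentially a distributed
// map. Any backing (BerkeleyDB today; memcached, Infinispan, etc. later)
// just has to implement something like this. Names are illustrative.
public interface ValueCache {
    void put(String valueSpec, Object value);
    Object get(String valueSpec);

    // Trivial single-node implementation; a distributed one would replicate
    // or partition entries across nodes with eventual consistency.
    static ValueCache inMemory() {
        Map<String, Object> map = new ConcurrentHashMap<>();
        return new ValueCache() {
            public void put(String k, Object v) { map.put(k, v); }
            public Object get(String k) { return map.get(k); }
        };
    }
}
```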

Another interesting project would be to implement the whole job distribution system on top of another existing grid platform. The APIs were designed to allow this, but we’ve never actually tried it, although several of our customers have expressed an interest in using their existing grid resources.

If you’re interested in doing something like that or have a proposal of your own, then definitely get in touch!

Thanks again,

There are a number of different ideas for scaling out the value cache that we’ve run through internally:

  • Keep it a pure Java affair but use Infinispan/Terracotta/etc. to automatically handle the increased RAM requirements
  • Work with a distributed KV store like Riak/Voldemort
  • Use an in-memory store like memcached/Redis

As the original culprit behind the BerkeleyDB inclusion, I should explain that choice. In general, the Value Cache won’t flow to disk, particularly with remote calc nodes (where the subgraph-shipping optimization means that the View Processor version of the Value Cache doesn’t have every element). If it does, you’re going to start to have performance issues at runtime.

However, I went with it because it exhibited extremely good performance on an in-memory basis, and the fact that it works with a filesystem type of abstraction means that you avoid a lot of the complexity and GC churn of a massive heap. Plus it was wicked fast to code up and seems to be handling some pretty good performance numbers thus far.

Kirk highlights that I’m actually being rather harsh on what we currently have as our value cache. Actually, it’s not shown up as a performance bottleneck thus far, probably largely because of the effort we’ve put into NOT going off-node. It’s for that reason that we’ve not focussed engineering effort on replacing it thus far.

The wider point still stands that when we start going much bigger in terms of nodes we’ll need to scale it out, and I do think it would make an interesting outside project.

I am looking around for some literature on how to

  • how to create or configure a View Processor
  • how to submit a view definition to it

from the OG-Example code.

The closest literature I could find was http://docs.opengamma.com/display/DOC/Writing+a+Java+Client

It would be really helpful if we could get more code examples on the same.

Thanks Jim and Kirk. I will have a look at the current implementation; this is right in my field of interest. I am very interested in the optimisations that can be done for running many scenarios by partitioning and/or replicating data. Obviously many machines can add to compute resources, but network traffic slows things down considerably. I really like the approach of having a dependency tree and trying to keep as much data local as possible in an intelligent manner. One field I have been looking at is optimisations for a particular set of risk calculations and scenarios (Basel III calcs, which is my business area of expertise). I will look further into how things currently work in OpenGamma, and then get back to you when I am more informed on the current implementation and its pros and cons.

The other thing I have been looking into a lot is sharing data between scenarios to run them efficiently. For example, if you calculate the required regulatory capital for a portfolio (which requires a number of intermediate calculations), then run a scenario where you shift a yield curve or sell out of a position, you don’t necessarily need to redo ALL of the intermediate calculations. OpenGamma’s dependency graph may well help with this (I will look through the implementation and documentation further). Is this optimal scenario running something that has been considered? I consider it to be quite powerful when the number of scenarios to run is very large. There are probably interesting trade-offs and challenges to consider in distributing this. By the way, my main focus is on optimising batch processing as opposed to real time.
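The reuse idea above can be sketched as memoisation: cache an expensive intermediate result keyed by the scenario inputs it depends on, so unshifted scenarios share it and only shifted ones recompute (all names here are invented for illustration, nothing to do with OpenGamma's actual caching):

```java
import java.util.*;

// Toy sketch of scenario reuse: cache intermediate results keyed by their
// inputs, so shifting one curve only recomputes the part of the graph that
// actually depends on it.
public class IncrementalDemo {
    public int discountCurveBuilds = 0;  // counts expensive intermediate calcs
    private final Map<Double, double[]> curveCache = new HashMap<>();

    public double[] buildDiscountCurve(double parallelShift) {
        return curveCache.computeIfAbsent(parallelShift, s -> {
            discountCurveBuilds++;
            return new double[] {0.99 - s, 0.95 - s};  // stand-in "curve"
        });
    }

    public double presentValue(double cashflow, double shift) {
        double[] curve = buildDiscountCurve(shift);
        return cashflow * curve[1];
    }
}
```

In a dependency-graph setting the cache key is effectively the subgraph of inputs a node depends on, which is what makes running many near-identical scenarios much cheaper than full recomputation.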

@jim - I figured out most of the workings but am not able to obtain the ViewProcessor instance used in OG-Example.
Can you please let me know how I can achieve this?

When you run the example, the web server that you create is also running a view processor. The easiest way to submit a view definition to it is by choosing a view definition to run from the ‘analytics’ tab.

@jim - Thanks for your answer, but what I was looking for was how to execute my ViewDefinition from code.
Say I am writing a command-line tool to read a view definition and portfolio from a file - how can I submit the job to the ViewProcessor?

@jim - Sorry to bother you again, I have one more question.
I am trying to write my own function and invoke it through the ViewProcessor.
Writing my own function: with your help I managed to get a working example of my own.
Secondly, I am trying to get the ViewProcessor instance from OG-Web so that I can submit or attach a ViewDefinition for processing. I am trying to create a new tab which will take a ViewDefinition name as input and output its results.
Unlike the other tabs, I am not able to map the code of the analytics tab as it is using the comet framework.

Any pointers or help would be greatly appreciated.