Long-running jobs with a jobqueue

By introducing simulations and predictions, we have created a job type that does not coexist well with the interactive website. These jobs must be outsourced to another machine (or a pool of machines) and executed as batch jobs, i.e. they do not need user interaction.

Some requirements for this design:

  • Jobs must be self-contained. A job should describe where to get its input data (fully qualified URL) and where the result data should be published (also a fully qualified URL); a sketch of such a job document follows after this list.
  • The lingua franca of the mySmartGrid ecosystem is JSON. Most visualization is done directly using JSON, and the (new) data submission API also employs JSON. Therefore, result data (such as a predicted timeseries) must be represented as JSON and published via HTTP.
  • The simulations must run on a different system than the webserver.
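
To make this concrete, a self-contained job document might look as follows. The two-part structure (a header with uuid and type, followed by the input data) matches the work package printed by the producer below; the hostnames, the "prediction" type and the input_url/result_url field names are only illustrative and not fixed by this design yet.

[
  { "uuid": "bc3a925099b9012e0f06549a20077664", "type": "prediction" },
  { "input_url":  "http://api.mysmartgrid.de/sensor/1/data.json",
    "result_url": "http://worker1.mysmartgrid.de/bc3a925099b9012e0f06549a20077664" }
]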

Beanstalkd

Beanstalkd is a simple job queue designed for minimal overhead: http://kr.github.com/beanstalkd/

The beanstalk protocol is very simple: https://github.com/kr/beanstalkd/blob/v1.3/doc/protocol.txt. This is a very good thing(TM), because it leads to a bunch of client implementations that are ready to use.
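
As an illustration of how little code a client needs, here is a minimal put/reserve/delete cycle using the Ruby beanstalk-client gem (whether qworker builds on exactly this gem is an assumption on my part):

require 'rubygems'
require 'beanstalk-client'

# Connect to a beanstalkd instance (11300 is the default port).
beanstalk = Beanstalk::Pool.new(['localhost:11300'])

# Producer side: enqueue a JSON-formatted job.
beanstalk.put('{"uuid":"abc","type":"drunken_sailor"}')

# Worker side: block until a job is available, then process and delete it.
job = beanstalk.reserve
puts job.body   # the JSON payload
job.delete      # done, remove the job from the queue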

The architecture for mySmartGrid will look as follows:

[Super-sketchy sketch: webserver, beanstalkd queue and worker nodes]

The architecture consists of a webserver, several worker nodes and a beanstalkd instance. The webserver submits jobs to the beanstalkd queue. A job is a JSON-formatted document containing the desired result URL and $foo as input data for the job. A worker node retrieves the job from the queue and dispatches a process locally. As soon as the process finishes, the job is deleted from the queue. The results of the job are published using the worker's local HTTP server; again, the data is JSON-formatted. In order to be able to detect outdated information, a timestamp MUST be included in the result file.
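
A published result file could then look like this. The timestamp field is required by the design above; the remaining field names and the timeseries layout are merely illustrative:

{
  "timestamp": 1311687617,
  "values": [ [1311684000, 230.1], [1311684060, 231.4] ]
}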

Right now, there is no redundancy in the system. I assume that the webserver decides which worker node should run the job. The results are then accessible at a URL like http://worker1.mysmartgrid.de/<UUID>. This makes it easy for the webserver to render pages that include a link to the data, and the users' web browsers can then access the data seamlessly.

The disadvantage of this solution is that there is one queue per worker. If a worker fails, the data published on that worker is no longer available, and new jobs are not processed. This can be mitigated in future releases:

  1. A loadbalancer makes a group of worker nodes accessible. All results are available using the base url of the loadbalancer.
  2. Worker nodes work on a common queue (a protocol-level sketch of this follows below).
  3. The results are rsync'd across all worker nodes.
  4. If one worker fails, it is no longer used by the loadbalancer. The other workers run the additional jobs.

In the long term, this setup should be much more stable. Right now, however, it seems to be overkill.
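
Item 2, for example, maps directly onto beanstalkd's tube mechanism: the producer puts jobs into a shared tube, and every worker watches that same tube, so whichever worker is idle picks the job up. At the protocol level this looks as follows (the tube name is made up):

use simulations          producer: select the shared tube for put
put 1337 0 3 <bytes>     enqueue the JSON payload
watch simulations        worker: subscribe to the shared tube
reserve                  blocks until this worker wins a job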

QWorker: A Ruby Library for Processing Beanstalkd Queues

A first implementation of a queue worker is now available at:

https://github.com/mysmartgrid/msg-qworker

The gem can be installed using

$ gem install msg-qworker

The library consists of a producer, a qworker and the drunken sailor.

  • The producer puts jobs in the default tube. It should be used as a template for how to put JSON-formatted jobs into the tube.
  • The qworker deals with the execution of jobs in a reliable way. It is capable of forking jobs, monitoring them and reacting to failure conditions.
  • The drunken sailor is a test program that simulates all kinds of misbehaving jobs, e.g. non-terminating jobs.

Smaller tasks, such as sending emails, can be implemented directly in the Ruby library. TODO: Provide a framework of Jobs for this.
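
Such a framework is entirely hypothetical at this point, but an in-library job might look roughly like this (the class name, the interface and the idea that qworker calls run and inspects the return value are all assumptions):

require 'net/smtp'

# Hypothetical in-library job: the work is done directly in Ruby,
# no external process is forked.
class MailJob
  def initialize(params)
    @to   = params['to']
    @body = params['body']
  end

  # Returns true on success so the caller can delete the job,
  # false so it can be left in the queue for a retry.
  def run
    Net::SMTP.start('localhost', 25) do |smtp|
      smtp.send_message(@body, 'noreply@mysmartgrid.de', @to)
    end
    true
  rescue StandardError
    false
  end
end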

An example run looks like this:

$ beanstalkd

First, start beanstalkd. By default it listens on localhost:11300 (the -l and -p options change the listen address and port). Once the queue is running, we can put a job into the default tube:

$ ruby bin/producer.rb -v -c etc/qproducerrc -a drunken_sailor

This tells the producer to use the config file packaged within the gem (adjust the path to your installation) and to execute the drunken_sailor algorithm. The output of this command looks like this:

Using this configuration:
--- 
BEANSTALK_PORT: "11300"
BEANSTALK_SERVER: localhost
---
Starting producer, connecting to localhost:11300
generated work package: [{"uuid":"bc3a925099b9012e0f06549a20077664","type":"drunken_sailor"},{"23":"42","foo":"bar"}]

Then, you can start the qworker in order to run the algorithm:

$ ruby qworker.rb -c ../etc/qworkerrc -v
Using this configuration:
--- 
GRACE_TIME: 2
RESULT_BASEDIR: /var/tmp/qworker
BEANSTALK_PORT: "11300"
BEANSTALK_SERVER: localhost
---
Starting producer, connecting to localhost:11300
### Dequeued job bc3a925099b9012e0f06549a20077664, algorithm drunken_sailor
 + result directory: /var/tmp/qworker/bc/bc3a925099b9012e0f06549a20077664
 + input data: 2342foobar
 + time left for processing: 2
 + job queue priority: 1337
Drunken sailor PID: 2403
Drunken Sailor: Infinite loop. 
Algorithm did not terminate within expected timeframe.
Attempting to kill algorithm PID 2403
Job not found, cannot release it. Ignoring.
### Dequeued job bc3a925099b9012e0f06549a20077664, algorithm drunken_sailor
 + result directory: /var/tmp/qworker/bc/bc3a925099b9012e0f06549a20077664
 + input data: 2342foobar
 + time left for processing: 2  
 + job queue priority: 1337
Drunken sailor PID: 2408
Drunken Sailor: Faulty execution.
Algorithm terminated within expected time constraints.
Algorithm terminated with exitstatus 10
Job failed, leaving job in queue.
### Dequeued job bc3a925099b9012e0f06549a20077664, algorithm drunken_sailor
 + result directory: /var/tmp/qworker/bc/bc3a925099b9012e0f06549a20077664
 + input data: 2342foobar
 + time left for processing: 2
 + job queue priority: 1337
Drunken sailor PID: 2409
Drunken Sailor: Normal execution.
Algorithm terminated within expected time constraints.
Algorithm terminated with exitstatus 0
Job completed, removing from queue.

As you can see, the job was executed three times; only the last run was successful. Stray processes are killed, so the behavior of an algorithm does not affect the stability of the system. You might notice that a queue priority and an expected runtime show up on the qworker side. Actually, these values are defined by the producer; the code looks like this:

beanstalk.put(
    json_payload,   # Content of the job.
    pri=1337,       # Priority of this job. <1024 is considered urgent.
    delay=0,        # Delay in seconds before the job becomes available.
    ttr=3           # Time-to-run: seconds a worker may hold the job before it is re-queued.
)

These values are not exposed to the user, but a real producer application should of course choose adequate settings. The result directory /var/tmp/qworker/bc/bc3a925099b9012e0f06549a20077664 is created per job; note how the path component below the base directory repeats the first two characters of the UUID.
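
For instance, a real producer might derive the TTR from the expected runtime of the algorithm plus a safety margin (the numbers below are made up):

expected_runtime = 600   # seconds a typical simulation needs (made up)
grace            = 60    # safety margin before beanstalkd re-queues the job

beanstalk.put(json_payload, 1337, 0, expected_runtime + grace)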

References

http://tomayko.com/writings/things-caches-do Note for the future: Check out Varnish.
