Extending Metapipe¶
Metapipe provides 2 extension points for developers to extend it’s functionality: custom Queues and custom Job Types. In most cases, custom queues are an advanced feature that most users and developers will not need to worry about, but if you must, it is there.
To add support for a queue system not included with metapipe, all you need to do is add a job type.
Custom Job types¶
All job types are subclasses of the metapipe.models.Job
class. The base job class implements a lot of the functionality that is common between all job types, and has method stubs for the required functionality that needs to be implemented by any subclass. This section will cover what duty job subclasses have, how to subclass the main Job
and what to fill in.
The Root Job Class¶
The code for the main job class can be found here. To create your own job type, simply subclass this as follows:
from metapipe.models import Job
class MyCustomJob(Job):
def __repr__(self):
return '<MyCustomJob: {}>'.format(self.cmd)
There are 6 methods you need to fill in to have a complete job class. Your full job subclass should have the following form:
class MyCustomJob(Job):
def __repr__(self):
return '<MyCustomJob: {}>'.format(self.cmd)
# Override these...
@property
def cmd(self):
""" Returns the command needed to submit the calculations.
Normally, this would be just running the command, however if
using a queue system, then this should return the command to
submit the command to the queue.
"""
pass
def submit(self):
""" Submits the job to be run. If an external queue system is used,
this method submits itself to that queue. Else it runs the job itself.
:see: call
"""
pass
def is_running(self):
""" Returns whether the job is running or not. """
pass
def is_queued(self):
""" Returns whether the job is queued or not.
This function is only used if jobs are submitted to an external queue.
"""
pass
def is_complete(self):
""" Returns whether the job is complete or not. """
pass
def is_error(self):
""" Checks to see if the job errored out. """
pass
The duty of the job types is to submit the jobs when asked by the queue, and to inform the queue about the status of jobs. The queue needs to know when a job is running, queued, complete, or when an error has occurred.
Each of the is_*
callbacks should return a boolean value, and the cmd property should return the bash command (as an array of strings) that can be called to run the job. The job class has an attribute filename
that contains the value of the bash script containing the job command (i.e. ['bash', self.filename]
).
IMPORTANT: All of the above handlers are required for custom job types to function properly.
Here is the code for the cmd
property of the PBSJob
class:
class PBSJob(Job):
#...
@property
def cmd(self):
return ['qsub', self.filename]
#...
The submit
call should do any logic pertaining to submitting the job or tracking the number of total submissions. For example, here is the code for submitting a job to the PBS queue:
class PBSJob(Job):
#...
def submit(self, job):
if self.attempts == 0:
job.make()
self.attempts += 1
out = call(job.cmd)
self.waiting = False
self.id = out[:out.index('.')]
#...
As you can see, it keeps track of the number of times the job was submitted, and then calls the call
function, provided in the root job module, to execute the job. Since PBS assigns job ids to each job at submission-time, it also captures that information and saves it for later use.
Custom Queues¶
In the event that your analysis requires more control over the submission process for jobs, the metapipe module also allows for the customization of queue logic by subclassing metapipe.models.Queue
. This section will cover how to subclass the root queue, but it is left to the reader to determine why you might want to do this. From personal experience, customizing the queue should be a very rare requirement.
The Root Queue class¶
As is the case for custom job types, all queues inherit from the root Queue in metapipe.models.Queue
, including the main JobQueue
that is used by the metapipe command line tool.
To customize the response of the queue to various types of events subclass it and fill in the following methods, all the methods are optional so just omit any handlers that you don’t need.
class MyCustomQueue(object):
def __repr__(self):
return '<MyCustomQueue: jobs=%s>' % len(self.queue)
# Callbacks...
def on_start(self):
""" Called when the queue is starting up. """
pass
def on_end(self):
""" Called when the queue is shutting down. """
pass
def on_locked(self):
""" Called when the queue is locked and no jobs can proceed.
If this callback returns True, then the queue will be restarted,
else it will be terminated.
"""
return True
def on_tick(self):
""" Called when a tick of the queue is complete. """
pass
def on_ready(self, job):
""" Called when a job is ready to be submitted.
:param job: The given job that is ready.
"""
pass
def on_submit(self, job):
""" Called when a job has been submitted.
:param job: The given job that has been submitted.
"""
pass
def on_complete(self, job):
""" Called when a job has completed.
:param job: The given job that has completed.
"""
pass
def on_error(self, job):
""" Called when a job has errored.
:param job: The given job that has errored.
"""
pass
Using Your Custom Code¶
Once you have subclassed and filled in the required code for your custom job type or queue, it is time to use your code. If your code adapts metapipe to work on a common computing platform, or system then please consider contributing to the metapipe project. This helps the rest of the community use a broader range of hardware to solve our problems!
Building your custom pipeline¶
Use the following code to build your pipeline. This code is taken directly from [metapipe’s app.py][app] tool which is the command line tool that metapipe uses to build pipelines.
import MyCustomJob
JOB_TYPES = {
'my_custom_job_type': MyCustomJob
}
parser = Parser(config)
try:
command_templates = parser.consume()
except ValueError as e:
raise SyntaxError('Invalid config file. \n%s' % e)
pipeline = Runtime(command_templates, JOB_TYPES, 'my_custom_job_type')
IMPORTANT: Adding custom queues is coming soon!
For more information on how to script metapipe once you have custom jobs, see Scripting Metapipe