public class SUBDAXGenerator
extends java.lang.Object
Modifier and Type | Field and Description |
---|---|
private static java.lang.String | CACHE_FILE_SUFFIX: Suffix to be applied for cache file generation. |
static java.lang.String | CONDOR_DAGMAN_LOGICAL_NAME: The logical name with which to query the transformation catalog for the condor_dagman executable, which ends up running the mini dag as one job. |
static java.lang.String | CONDOR_DAGMAN_NAMESPACE: The namespace to use for condor dagman. |
static java.lang.String | CPLANNER_LOGICAL_NAME: The logical name with which to query the transformation catalog for the cPlanner executable. |
static java.lang.String[][] | DAGMAN_KNOBS: The dagman knobs controlled through properties. |
static java.lang.String | DEFAULT_SUBDAX_CATEGORY_KEY: The default category for the sub dax jobs. |
static boolean | GENERATE_SUBDAG_KEYWORD: Whether to generate the SUBDAG keyword or not. |
private PegasusBag | mBag: Bag of Pegasus objects. |
private PegasusProperties.CLEANUP_SCOPE | mCleanupScope: The cleanup scope for the workflows. |
private long | mCondorVersion: The long value of the condor version. |
private java.lang.String | mCurrentDAGCacheFile: Cache file for the current DAG. |
private ADag | mDAG |
private java.io.PrintWriter | mDAGWriter: The print writer handle to the DAG file being written out. |
private java.util.Map<java.lang.String,java.lang.String> | mDAXJobIDToSubmitDirectoryCacheFile: Maps a sub dax job id to its submit directory. |
private LogManager | mLogger: Handle to the logging manager. |
private java.text.NumberFormat | mNumFormatter: The number formatter to format the run submit dir entries. |
private PlannerOptions | mPegasusPlanOptions: The object containing all the options passed to the Concrete Planner. |
private PegasusProperties | mProps: The handle to Pegasus Properties. |
private SiteStore | mSiteStore |
private TransformationCatalog | mTCHandle: The handle to the transformation catalog. |
private java.lang.String | mUser: The username of the user running the program. |
static java.lang.String | NAMESPACE: The namespace to which the jobs in the MEGA DAG being created refer. |
static java.lang.String | RETRY_LOGICAL_NAME: The planner utility that needs to be called as a prescript. |
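
DAGMAN_KNOBS is typed as a two-dimensional string array and described as the dagman knobs controlled through properties, while constructDAGManKnobs(Job job) is described as building extra dagman arguments from the properties file. One plausible reading is a table of (property key, command-line flag) pairs. The sketch below illustrates that reading only: the class name, the property keys, the flags, and the fallback value of -1 are hypothetical placeholders, not values taken from this class, and a plain java.util.Properties stands in for PegasusProperties.

```java
import java.util.Properties;

/** Hypothetical sketch: turns (property key, flag) pairs into an argument string. */
final class DagmanKnobsSketch {

    // Assumed layout: column 0 is a property key, column 1 is the flag to emit.
    // These keys and flags are placeholders, not the real DAGMAN_KNOBS contents.
    static final String[][] KNOBS = {
        {"example.dagman.maxpre", "-MaxPre"},
        {"example.dagman.maxjobs", "-MaxJobs"},
    };

    /** Appends " flag value" for every knob whose property holds a positive integer. */
    static String constructKnobs(Properties props) {
        StringBuilder sb = new StringBuilder();
        for (String[] knob : KNOBS) {
            int value = parseInt(props.getProperty(knob[0]));
            if (value > 0) {
                sb.append(' ').append(knob[1]).append(' ').append(value);
            }
        }
        return sb.toString();
    }

    /** Parses an integer, returning -1 for null or malformed input (an assumed convention). */
    static int parseInt(String s) {
        if (s == null) {
            return -1;
        }
        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("example.dagman.maxjobs", "20");
        System.out.println(constructKnobs(props));
    }
}
```

Keeping the pairs in one array means a new knob is a one-line addition, and unset or malformed properties simply contribute nothing to the argument string.
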
Constructor and Description |
---|
SUBDAXGenerator(): The default constructor. |
Modifier and Type | Method and Description |
---|---|
protected Job | constructDAGJob(Job subdaxJob, java.io.File directory, java.io.File subdaxDirectory, java.lang.String basenamePrefix): Constructs a job that plans and submits the partitioned workflow, referred to by a Partition. |
java.lang.String | constructDAGManKnobs(Job job): Constructs any extra arguments that need to be passed to dagman, as determined from the properties file. |
Job | constructPegasusPlanPrescript(Job job, PlannerOptions options, java.lang.String rootUUID, java.lang.String properties, java.lang.String log): Constructs the pegasus plan prescript for the subdax. |
protected java.io.File | constructPlannerPrescriptWrapper(Job dagJob, java.io.File directory, java.lang.String executable, java.lang.String arguments): Constructs a pegasus plan wrapper script that changes the directory in which pegasus-plan is launched. |
private TransformationCatalogEntry | constructTCEntryFromEnvironment(): Returns a transformation catalog entry object constructed from the environment. An entry is constructed if either of the following environment variables is defined: 1) CONDOR_HOME 2) CONDOR_LOCATION. CONDOR_HOME takes precedence over CONDOR_LOCATION. |
private TransformationCatalogEntry | constructTCEntryFromEnvProfiles(ENV env): Returns a transformation catalog entry object constructed from the environment. An entry is constructed if either of the following environment variables is defined: 1) CONDOR_HOME 2) CONDOR_LOCATION. CONDOR_HOME takes precedence over CONDOR_LOCATION. |
private TransformationCatalogEntry | constructTCEntryFromPath(): Returns a transformation catalog entry object constructed from the path environment variable. |
private TransformationCatalogEntry | constructTransformationCatalogEntryForDAGMan(java.lang.String path): Constructs TransformationCatalogEntry for DAGMan. |
protected java.lang.String | createSubmitDirectory(ADag dag, java.lang.String dir, java.lang.String user, java.lang.String vogroup, boolean timestampBased): Creates the submit directory for the workflow. |
protected java.lang.String | createSubmitDirectory(java.lang.String label, java.lang.String dir, java.lang.String user, java.lang.String vogroup, boolean timestampBased): Creates the submit directory for the workflow. |
protected boolean | createSymbolicLink(java.lang.String source, java.lang.String destination): This method generates a symlink between two files. |
protected boolean | createSymbolicLink(java.lang.String source, java.lang.String destination, boolean logErrorToDebug): This method generates a symlink between two files. |
boolean | createSymbolicLinktoCacheFile(PlannerOptions options, java.lang.String label, java.lang.String index): Creates a symbolic link to the DAX file in a dax sub directory in the submit directory. |
java.lang.String | createSymbolicLinktoDAX(java.lang.String submitDirectory, java.lang.String dax): Creates a symbolic link to the DAX file in a dax sub directory in the submit directory. |
private TransformationCatalogEntry | defaultTCEntry(java.lang.String site): Returns a default TC entry to be used in case an entry is not found in the transformation catalog. |
Job | generateCode(Job job): Generates code for a job. |
protected java.lang.String | getBasename(java.lang.String prefix, java.lang.String suffix): Returns the basename of a dagman (usually) related file for a particular partition. |
protected java.lang.String | getCacheFile(PlannerOptions options, java.lang.String label, java.lang.String index): Returns the path to the cache file in a workflow's submit directory. |
protected java.lang.String | getCacheFileName(PlannerOptions options, java.lang.String label, java.lang.String index): Constructs the basename to the cache file that is to be used to log the transient files. |
java.util.Set<java.lang.String> | getParentsTransientRC(Job job): Returns a set containing the paths to the parent dax jobs' transient replica catalogs. |
protected java.lang.String | getWorkflowFileBasenamePrefix(PlannerOptions options, java.lang.String label, java.lang.String index) |
protected java.lang.String | getWorkflowFileName(PlannerOptions options, java.lang.String label, java.lang.String index, java.lang.String suffix): Constructs the basename to a workflow file that. |
void | initialize(PegasusBag bag, ADag dag, java.io.PrintWriter dagWriter): Initializes the class. |
protected static int | parseInt(java.lang.String s): Parses a string into an integer. |
protected static void | sanityCheck(java.io.File dir): Checks the destination location for existence, if it can be created, if it is writable etc. |
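
The sanityCheck(java.io.File dir) summary lists three concerns: whether the destination exists, whether it can be created, and whether it is writable. The following is a minimal sketch of such a check using only java.io.File; the class name, the order of the checks, and the exception messages are assumptions for illustration, not the behaviour of the real method.

```java
import java.io.File;
import java.io.IOException;

/** Hypothetical sketch of a directory sanity check. */
final class SanityCheckSketch {

    /** Ensures dir exists (creating it if needed), is a directory, and is writable. */
    static void sanityCheck(File dir) throws IOException {
        if (dir.exists()) {
            // An existing path must be a directory we can write into.
            if (!dir.isDirectory()) {
                throw new IOException("Destination " + dir.getPath() + " is not a directory.");
            }
            if (!dir.canWrite()) {
                throw new IOException("Cannot write to existing directory " + dir.getPath());
            }
        } else if (!dir.mkdirs()) {
            // mkdirs() also creates any missing parent directories.
            throw new IOException("Unable to create directory " + dir.getPath());
        }
    }

    public static void main(String[] args) throws IOException {
        File base = new File(System.getProperty("java.io.tmpdir"), "subdax-sketch");
        sanityCheck(new File(base, "run0001"));
        System.out.println("Directory is usable: " + base.getPath());
    }
}
```

Throwing java.io.IOException matches the declared signature, so a caller such as createSubmitDirectory could let the failure propagate unchanged.
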
public static final java.lang.String DEFAULT_SUBDAX_CATEGORY_KEY
public static final boolean GENERATE_SUBDAG_KEYWORD
private static final java.lang.String CACHE_FILE_SUFFIX
public static final java.lang.String CPLANNER_LOGICAL_NAME
public static final java.lang.String CONDOR_DAGMAN_NAMESPACE
public static final java.lang.String CONDOR_DAGMAN_LOGICAL_NAME
public static final java.lang.String NAMESPACE
public static final java.lang.String RETRY_LOGICAL_NAME
public static final java.lang.String[][] DAGMAN_KNOBS
private java.lang.String mUser
private java.text.NumberFormat mNumFormatter
private PlannerOptions mPegasusPlanOptions
private PegasusProperties mProps
private LogManager mLogger
private PegasusBag mBag
private java.io.PrintWriter mDAGWriter
private TransformationCatalog mTCHandle
private PegasusProperties.CLEANUP_SCOPE mCleanupScope
private long mCondorVersion
private java.util.Map<java.lang.String,java.lang.String> mDAXJobIDToSubmitDirectoryCacheFile
private ADag mDAG
private SiteStore mSiteStore
private java.lang.String mCurrentDAGCacheFile
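
The mNumFormatter field is a java.text.NumberFormat described as formatting the run submit directory entries. A minimal sketch of how such a formatter could produce zero-padded directory names follows; the "0000" pattern, the "run" prefix, and the class name are assumptions chosen for illustration and are not confirmed by this page.

```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.util.Locale;

/** Hypothetical sketch: zero-padded names for numbered run directories. */
final class RunDirNameSketch {

    /** Formats a run number with an assumed four-digit, zero-padded pattern. */
    static String runDirName(int run) {
        // Locale.ROOT keeps the digits ASCII regardless of the default locale.
        // A new formatter per call avoids sharing a non-thread-safe DecimalFormat.
        NumberFormat formatter =
                new DecimalFormat("0000", DecimalFormatSymbols.getInstance(Locale.ROOT));
        return "run" + formatter.format(run);
    }

    public static void main(String[] args) {
        System.out.println(runDirName(7));
    }
}
```

Zero padding keeps directory listings in numeric order when sorted lexicographically, at least until the run number outgrows the pattern width.
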
public void initialize(PegasusBag bag, ADag dag, java.io.PrintWriter dagWriter)
Parameters:
bag - the bag of objects required for initialization
dag - the dag for which code is being generated
daxReplicaStore - the dax replica store.
dagWriter - handle to the dag writer
public Job generateCode(Job job)
Parameters:
job - the job for which code has to be generated.
Returns:
Job if a submit file needs to be generated for the job. Else return null.
protected java.io.File constructPlannerPrescriptWrapper(Job dagJob, java.io.File directory, java.lang.String executable, java.lang.String arguments)
Parameters:
dagJob - the DAG job with which the prescript is associated.
directory - the directory where the submit file for the dagman job has to be written out to.
executable - the path to the planner that needs to be called in the prescript
arguments - the arguments with which the planner is called.
protected Job constructDAGJob(Job subdaxJob, java.io.File directory, java.io.File subdaxDirectory, java.lang.String basenamePrefix)
Parameters:
subdaxJob - the original subdax job.
directory - the directory where the submit file for dagman job has to be written out to.
subdaxDirectory - the submit directory where the submit files for the subdag reside.
basenamePrefix - the basename to be assigned to the files associated with DAGMan
public java.lang.String constructDAGManKnobs(Job job)
Parameters:
job - the job
protected static int parseInt(java.lang.String s)
Parameters:
s - the String to be parsed as integer
protected java.lang.String getBasename(java.lang.String prefix, java.lang.String suffix)
Parameters:
prefix - the prefix.
suffix - the suffix for the file basename.
protected java.lang.String getCacheFile(PlannerOptions options, java.lang.String label, java.lang.String index)
Parameters:
options - the options for the workflow.
label - the label for the workflow.
index - the index for the workflow.
protected java.lang.String getCacheFileName(PlannerOptions options, java.lang.String label, java.lang.String index)
Parameters:
options - the options for the sub workflow.
label - the label for the workflow.
index - the index for the workflow.
protected java.lang.String getWorkflowFileName(PlannerOptions options, java.lang.String label, java.lang.String index, java.lang.String suffix)
Parameters:
options - the options for the sub workflow.
label - the label for the workflow.
index - the index for the workflow.
suffix - the suffix for the workflow file.
protected java.lang.String getWorkflowFileBasenamePrefix(PlannerOptions options, java.lang.String label, java.lang.String index)
private TransformationCatalogEntry defaultTCEntry(java.lang.String site)
Parameters:
site - the site for which the default entry is required.
private TransformationCatalogEntry constructTCEntryFromEnvironment()
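
The summary for constructTCEntryFromEnvironment() states that an entry is constructed when CONDOR_HOME or CONDOR_LOCATION is defined, with CONDOR_HOME taking precedence. That lookup order can be sketched as below. The class and method names are hypothetical, the environment is passed in as a map so the logic can be exercised without real variables, and the bin/condor_dagman layout under the install directory is an assumption rather than something this page states.

```java
import java.io.File;
import java.util.Map;

/** Hypothetical sketch of the CONDOR_HOME / CONDOR_LOCATION precedence rule. */
final class CondorHomeSketch {

    /** Returns an assumed condor_dagman path, or null if neither variable is set. */
    static String dagmanPath(Map<String, String> env) {
        // CONDOR_HOME wins when both variables are defined.
        String home = env.get("CONDOR_HOME");
        if (home == null || home.isEmpty()) {
            home = env.get("CONDOR_LOCATION");
        }
        if (home == null || home.isEmpty()) {
            return null;
        }
        // Assumption: the executable lives under <home>/bin/condor_dagman.
        return new File(new File(home, "bin"), "condor_dagman").getPath();
    }

    public static void main(String[] args) {
        // System.getenv() returns the process environment as a Map<String, String>.
        System.out.println(dagmanPath(System.getenv()));
    }
}
```

Returning null when neither variable is set leaves room for a caller to fall back to another source, such as the path-based or default entries listed in the method summary.
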
private TransformationCatalogEntry constructTCEntryFromPath()
Parameters:
env - the environment profiles.
private TransformationCatalogEntry constructTCEntryFromEnvProfiles(ENV env)
Parameters:
env - the environment profiles.
private TransformationCatalogEntry constructTransformationCatalogEntryForDAGMan(java.lang.String path)
Parameters:
path - path to dagman
public Job constructPegasusPlanPrescript(Job job, PlannerOptions options, java.lang.String rootUUID, java.lang.String properties, java.lang.String log)
Parameters:
job - the subdax job
options - the planner options with which subdax has to be invoked
rootUUID - the root workflow uuid
properties - the properties file.
log - the log for the prescript output
public boolean createSymbolicLinktoCacheFile(PlannerOptions options, java.lang.String label, java.lang.String index)
Parameters:
options - the options for the sub workflow.
label - the label for the workflow.
index - the index for the workflow.
public java.lang.String createSymbolicLinktoDAX(java.lang.String submitDirectory, java.lang.String dax)
Parameters:
submitDirectory - the submit directory for the sub workflow.
dax - the dax file to which the symbolic link has to be created.
protected java.lang.String createSubmitDirectory(ADag dag, java.lang.String dir, java.lang.String user, java.lang.String vogroup, boolean timestampBased) throws java.io.IOException
Parameters:
dag - the workflow being worked upon.
dir - the base directory specified by the user.
user - the username of the user.
vogroup - the vogroup to which the user belongs.
timestampBased - boolean indicating whether to have a timestamp based dir or not
Throws:
java.io.IOException - if the submit directory cannot be created.
protected java.lang.String createSubmitDirectory(java.lang.String label, java.lang.String dir, java.lang.String user, java.lang.String vogroup, boolean timestampBased) throws java.io.IOException
Parameters:
label - the label of the workflow
dir - the base directory specified by the user.
user - the username of the user.
vogroup - the vogroup to which the user belongs.
timestampBased - boolean indicating whether to have a timestamp based dir or not
Throws:
java.io.IOException - if the submit directory cannot be created.
protected static void sanityCheck(java.io.File dir) throws java.io.IOException
Parameters:
dir - is the new base directory to optionally create.
Throws:
java.io.IOException - in case of error while writing out files.
protected boolean createSymbolicLink(java.lang.String source, java.lang.String destination)
Parameters:
source - the file that has to be symlinked
destination - the destination of the symlink
protected boolean createSymbolicLink(java.lang.String source, java.lang.String destination, boolean logErrorToDebug)
Parameters:
source - the file that has to be symlinked
destination - the destination of the symlink
logErrorToDebug - whether to log the message to debug or not
public java.util.Set<java.lang.String> getParentsTransientRC(Job job)
Parameters:
job - the job
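
The two createSymbolicLink overloads documented above return a boolean and take an optional logErrorToDebug flag, which suggests that a failed link is reported rather than thrown. The sketch below shows one way to get that shape with java.nio.file.Files.createSymbolicLink. It is an illustration only: the class name is hypothetical, the "[debug]" and "[error]" prefixes written to standard error stand in for the real logging, and the page does not say how the actual method creates the link.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical sketch of a boolean-returning symlink helper. */
final class SymlinkSketch {

    /** Two-argument form: failures are reported at the assumed error level. */
    static boolean createSymbolicLink(String source, String destination) {
        return createSymbolicLink(source, destination, false);
    }

    /** Creates destination as a link to source; returns false instead of throwing. */
    static boolean createSymbolicLink(String source, String destination,
                                      boolean logErrorToDebug) {
        Path target = Paths.get(source);
        Path link = Paths.get(destination);
        try {
            // Files.createSymbolicLink takes the link first, then its target.
            Files.createSymbolicLink(link, target);
            return true;
        } catch (IOException | UnsupportedOperationException | SecurityException e) {
            String level = logErrorToDebug ? "[debug] " : "[error] ";
            System.err.println(level + "Unable to link " + link + " -> " + target + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("symlink-sketch");
        Path dax = Files.createFile(dir.resolve("workflow.dax"));
        Path link = dir.resolve("link.dax");
        System.out.println(createSymbolicLink(dax.toString(), link.toString()));
    }
}
```

In this sketch an existing destination makes the call return false, because Files.createSymbolicLink throws FileAlreadyExistsException, a subclass of IOException. Whether the real method overwrites existing links is not stated here.
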