Package edu.isi.pegasus.planner.cluster
Class Horizontal
- java.lang.Object
  - edu.isi.pegasus.planner.cluster.Horizontal
Nested Class Summary
private static class Horizontal.JobComparator
A job comparator that allows jobs to be compared according to their transformation names.
-
Field Summary
static int DEFAULT_COLLAPSE_FACTOR
The default collapse factor for collapsing jobs with the same logical name scheduled onto the same execution pool.
static java.lang.String DESCRIPTION
A short description about the partitioner.
private java.util.Map mCollapseMap
Map to hold the collapse values for the various execution pools.
protected JobAggregatorInstanceFactory mJobAggregatorFactory
The handle to the job aggregator factory.
private static java.util.Comparator mJobComparator
A singleton access to the job comparator.
private java.util.Map mJobMap
Map to hold the jobs sorted by the label of the jobs in the DAX.
protected LogManager mLogger
The handle to the logger object.
private PPS mPPS
The handle to the provenance store implementation.
protected PegasusProperties mProps
The handle to the properties object holding all the properties.
private java.util.Map mReplacementTable
Replacement table that identifies the corresponding fat job for a job.
private ADag mScheduledDAG
ADag object containing the jobs that have been scheduled by the site selector.
private java.util.Map mSubInfoMap
A Map to store all the job (Job) objects indexed by their logical ID found in the DAX.
private XMLProducer mXMLStore
The XML Producer object that records the actions.
-
Constructor Summary
Horizontal()
The default constructor.
-
Method Summary
protected void appendAttribute(java.lang.StringBuffer xmlFeed, java.lang.String key, java.lang.String value)
Appends an XML attribute to the XML feed.
private void assimilateJobs()
Puts the jobs in the abstract workflow into the map that is indexed by the logical name of the jobs.
private java.util.List<java.util.List<Job>> bestFitBinPack(java.util.List<Job> jobs, double maxTime)
Performs best fit bin packing.
private java.util.List<java.util.List<Job>> bestFitBinPack(java.util.List<Job> jobs, int maxBins)
Performs best fit bin packing.
private void collapseJobs(java.lang.String name, java.util.List jobs, java.lang.String partitionID)
Collapses the jobs having the same logical name according to the sites where they are scheduled.
java.lang.String constructID(java.lang.String partitionID, int id)
Given an integer id, returns a string id that is used for the clustered job.
private java.util.Map constructMap(java.lang.String propValue)
Constructs a map with the numbers/values for the collapsing factors to collapse the nodes of the same type.
java.lang.String description()
Returns a textual description of the clustering implementation.
void determineClusters(Partition partition)
Determines the clusters for a partition.
private java.util.Comparator<Job> getBinPackingComparator()
Returns a comparator used to sort a collection of jobs in decreasing order of their run times, as specified by the Pegasus.JOB_RUN_TIME property.
ADag getClusteredDAG()
Returns the clustered workflow.
int[] getCollapseFactor(java.lang.String pool, Job job, int size)
Returns the collapse factor that is used to chunk up the jobs of a particular type on a pool.
private java.lang.String getRunTime(Job job)
ADag getWorkflow()
Returns a reference to the workflow that is being refined by the refiner.
XMLProducer getXMLProducer()
Returns a reference to the XMLProducer that generates the XML fragment capturing the actions of the refiner.
void initialize(ADag dag, PegasusBag bag)
Initializes the Clusterer implementation.
private java.util.Comparator jobComparator()
Singleton access to the job comparator.
protected void logRefinerAction(AggregatedJob clusteredJob, JobAggregator aggregator)
Records the refiner action into the Provenance Store as an XML fragment.
void parents(java.lang.String partitionID, java.util.List parents)
An empty implementation of the callout, as state is maintained internally to determine the relations between the jobs.
private void printList(java.util.List l)
A utility method to print a short description of the jobs in a list.
private void replaceJobs()
The relations/edges are changed in the local graph structure.
private void updateReplacementTable(java.util.List jobs, Job mergedJob)
Updates the replacement table.
-
-
-
Field Detail
-
DEFAULT_COLLAPSE_FACTOR
public static final int DEFAULT_COLLAPSE_FACTOR
The default collapse factor for collapsing jobs with the same logical name scheduled onto the same execution pool.
- See Also:
- Constant Field Values
-
DESCRIPTION
public static final java.lang.String DESCRIPTION
A short description about the partitioner.
- See Also:
- Constant Field Values
-
mJobComparator
private static java.util.Comparator mJobComparator
A singleton access to the job comparator.
-
mLogger
protected LogManager mLogger
The handle to the logger object.
-
mProps
protected PegasusProperties mProps
The handle to the properties object holding all the properties.
-
mJobAggregatorFactory
protected JobAggregatorInstanceFactory mJobAggregatorFactory
The handle to the job aggregator factory.
-
mScheduledDAG
private ADag mScheduledDAG
ADag object containing the jobs that have been scheduled by the site selector.
-
mJobMap
private java.util.Map mJobMap
Map to hold the jobs sorted by the label of the jobs in the DAX. The key is the logical job name and the value is the list of jobs with that logical name. This is no longer used, and will be removed later.
-
mSubInfoMap
private java.util.Map mSubInfoMap
A Map to store all the job (Job) objects indexed by their logical ID found in the DAX. This should actually be in the ADag structure.
-
mCollapseMap
private java.util.Map mCollapseMap
Map to hold the collapse values for the various execution pools. The values are obtained from the properties file, or can be obtained from the resource information catalog, a.k.a. MDS.
-
mReplacementTable
private java.util.Map mReplacementTable
Replacement table that identifies the corresponding fat job for a job.
-
mXMLStore
private XMLProducer mXMLStore
The XML Producer object that records the actions.
-
mPPS
private PPS mPPS
The handle to the provenance store implementation.
-
-
Method Detail
-
jobComparator
private java.util.Comparator jobComparator()
Singleton access to the job comparator.
- Returns:
- the job comparator.
-
getWorkflow
public ADag getWorkflow()
Returns a reference to the workflow that is being refined by the refiner.
- Specified by:
getWorkflow in interface Refiner
- Returns:
- the ADag object.
-
getXMLProducer
public XMLProducer getXMLProducer()
Returns a reference to the XMLProducer that generates the XML fragment capturing the actions of the refiner. This is used for provenance purposes.
- Specified by:
getXMLProducer in interface Refiner
- Returns:
- XMLProducer
-
initialize
public void initialize(ADag dag, PegasusBag bag) throws ClustererException
Initializes the Clusterer implementation.
- Specified by:
initialize in interface Clusterer
- Parameters:
dag - the workflow that is being clustered.
bag - the bag of objects that is useful for initialization.
- Throws:
ClustererException
- in case of error.
-
determineClusters
public void determineClusters(Partition partition) throws ClustererException
Determines the clusters for a partition. The partition is assumed to contain independent jobs, and multiple clusters may be created for the partition. Internally, the jobs are grouped according to transformation name and then according to the execution site. Each group (having the same transformation name and scheduled on the same site) is then clustered. The number of clustered jobs created for each group depends on the following Pegasus profiles that can be associated with the jobs:
1) bundle (dictates the number of clustered jobs that are created)
2) collapse (the number of jobs that make up a single clustered job)
If both parameters are associated with the jobs in a group, the bundle parameter overrides the collapse parameter.
- Specified by:
determineClusters in interface Clusterer
- Parameters:
partition - the partition for which the clusters need to be determined.
- Throws:
ClustererException - in case of error.
- See Also:
Pegasus.BUNDLE_KEY, Pegasus.COLLAPSE_KEY
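The grouping step described above (same transformation name, same execution site) can be sketched as follows. The Job record, its field names, and the "@"-joined grouping key are illustrative assumptions for this sketch, not the actual Pegasus classes:

```java
import java.util.*;

public class GroupBySketch {
    // Hypothetical minimal stand-in for a job; the real Pegasus Job class has many more fields.
    record Job(String id, String transformation, String site) {}

    // Group jobs by (transformation name, execution site), mirroring the grouping
    // that determineClusters() performs before clustering each group.
    static Map<String, List<Job>> groupForClustering(List<Job> jobs) {
        Map<String, List<Job>> groups = new LinkedHashMap<>();
        for (Job j : jobs) {
            String key = j.transformation() + "@" + j.site();
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(j);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Job> jobs = List.of(
            new Job("j1", "fft", "siteA"),
            new Job("j2", "fft", "siteA"),
            new Job("j3", "fft", "siteB"),
            new Job("j4", "blast", "siteA"));
        // prints [fft@siteA, fft@siteB, blast@siteA]
        System.out.println(groupForClustering(jobs).keySet());
    }
}
```

Each resulting group would then be chunked according to the bundle or collapse profile value associated with its jobs.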
-
parents
public void parents(java.lang.String partitionID, java.util.List parents) throws ClustererException
An empty implementation of the callout, as state is maintained internally to determine the relations between the jobs.
- Specified by:
parents in interface Clusterer
- Parameters:
partitionID - the id of a partition.
parents - the list of String objects that contain the ids of the parents of the partition.
- Throws:
ClustererException
- in case of error.
-
collapseJobs
private void collapseJobs(java.lang.String name, java.util.List jobs, java.lang.String partitionID)
Collapses the jobs having the same logical name according to the sites where they are scheduled.
- Parameters:
name - the logical name of the jobs in the list passed to this function.
jobs - the list of Job objects corresponding to the jobs that have the same logical name.
partitionID - the ID of the partition to which the jobs belong.
-
bestFitBinPack
private java.util.List<java.util.List<Job>> bestFitBinPack(java.util.List<Job> jobs, double maxTime)
Performs best fit bin packing.
- Parameters:
jobs - the list of jobs sorted in decreasing order of job runtime.
maxTime - the maximum time for which the clustered job should run.
- Returns:
- a List of Lists of Jobs, where each inner List is the set of jobs that should be clustered together so as to run in under maxTime.
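A minimal sketch of the best-fit heuristic this method names, assuming jobs are reduced to their runtimes in seconds and arrive sorted in decreasing order, as the method contract requires. This is an illustration of the technique, not the Pegasus implementation:

```java
import java.util.*;

public class BestFitSketch {
    // Place each runtime into the open bin with the least remaining slack that
    // still fits it; open a new bin when none fits within maxTime.
    static List<List<Double>> bestFitBinPack(List<Double> runtimes, double maxTime) {
        List<List<Double>> bins = new ArrayList<>();
        List<Double> loads = new ArrayList<>(); // current total runtime per bin
        for (double t : runtimes) {
            int best = -1;
            double bestSlack = Double.MAX_VALUE;
            for (int i = 0; i < bins.size(); i++) {
                double slack = maxTime - loads.get(i) - t;
                if (slack >= 0 && slack < bestSlack) { // tightest bin that still fits
                    bestSlack = slack;
                    best = i;
                }
            }
            if (best < 0) { // no existing bin fits; open a new one
                bins.add(new ArrayList<>());
                loads.add(0.0);
                best = bins.size() - 1;
            }
            bins.get(best).add(t);
            loads.set(best, loads.get(best) + t);
        }
        return bins;
    }

    public static void main(String[] args) {
        // prints [[8.0, 2.0], [5.0, 4.0, 1.0]]
        System.out.println(bestFitBinPack(List.of(8.0, 5.0, 4.0, 2.0, 1.0), 10.0));
    }
}
```

Sorting in decreasing order first (best-fit decreasing) is what makes the greedy placement effective; that is why the method requires pre-sorted input.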
-
bestFitBinPack
private java.util.List<java.util.List<Job>> bestFitBinPack(java.util.List<Job> jobs, int maxBins)
Performs best fit bin packing.
- Parameters:
jobs - the list of jobs sorted in decreasing order of job runtime.
maxBins - the fixed number of bins that should be created.
- Returns:
- a List of Lists of Jobs, where each inner List is the set of jobs that should be clustered together.
-
getRunTime
private java.lang.String getRunTime(Job job)
-
getBinPackingComparator
private java.util.Comparator<Job> getBinPackingComparator()
The comparator is used to sort a collection of jobs in decreasing order of their run times, as specified by the Pegasus.JOB_RUN_TIME property.
- Returns:
- the comparator.
-
getClusteredDAG
public ADag getClusteredDAG() throws ClustererException
Returns the clustered workflow.
- Specified by:
getClusteredDAG in interface Clusterer
- Returns:
- the ADag object corresponding to the clustered workflow.
- Throws:
ClustererException
- in case of error.
-
description
public java.lang.String description()
Returns a textual description of the clustering implementation.
- Specified by:
description in interface Clusterer
- Returns:
- a short textual description
-
logRefinerAction
protected void logRefinerAction(AggregatedJob clusteredJob, JobAggregator aggregator)
Records the refiner action into the Provenance Store as an XML fragment.
- Parameters:
clusteredJob - the clustered job.
aggregator - the aggregator that was used to create this clustered job.
-
appendAttribute
protected void appendAttribute(java.lang.StringBuffer xmlFeed, java.lang.String key, java.lang.String value)
Appends an XML attribute to the XML feed.
- Parameters:
xmlFeed - the xmlFeed to which XML is being written.
key - the attribute key.
value - the attribute value.
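A sketch of what an attribute-appending helper of this shape typically does; the exact formatting and escaping Pegasus emits are assumptions here:

```java
public class AppendAttributeSketch {
    // Append key="value" to the feed, with a leading space so the attribute
    // can follow a tag name directly. No escaping is modeled in this sketch.
    static void appendAttribute(StringBuffer xmlFeed, String key, String value) {
        xmlFeed.append(" ").append(key).append("=\"").append(value).append("\"");
    }

    public static void main(String[] args) {
        StringBuffer buf = new StringBuffer("<clustered");
        appendAttribute(buf, "site", "isi");
        // prints <clustered site="isi">
        System.out.println(buf + ">");
    }
}
```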
-
getCollapseFactor
public int[] getCollapseFactor(java.lang.String pool, Job job, int size)
Returns the collapse factor that is used to chunk up the jobs of a particular type on a pool. The collapse factor is determined by getting the collapse key in the Pegasus namespace/profile associated with the job in the transformation catalog. Right now the transformation catalog overrides the per-pool value specified in the properties file. There are two orthogonal notions of bundling and collapsing. In case the bundle key is specified, it ends up overriding the collapse key, and the bundle value is used to generate the collapse values.
- Parameters:
pool - the pool where the chunking up is occurring.
job - the Job object containing the job that is to be chunked up together.
size - the number of jobs that refer to the same logical transformation and are scheduled on the same execution pool.
- Returns:
- an int array of size 4, where int[0] is the collapse factor, int[1] is the number of jobs for which the collapse factor is int[0] + 1, int[2] is the maximum time for which the clustered job should run, and int[3] is the time for which a single job would run.
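The bundle-to-collapse-factor arithmetic implied by the return contract can be sketched as follows. The helper name is hypothetical, and int[2]/int[3] are left at zero since they derive from runtime profiles not modeled in this sketch:

```java
public class CollapseFactorSketch {
    // Given `size` jobs and a bundle value (desired number of clustered jobs),
    // derive the int[] contract described above: result[0] is the base number
    // of jobs per clustered job, and result[1] is how many clustered jobs
    // receive result[0] + 1 jobs to absorb the remainder.
    static int[] collapseFactorFromBundle(int size, int bundle) {
        int[] result = new int[4];
        result[0] = size / bundle; // base collapse factor
        result[1] = size % bundle; // this many clustered jobs take one extra job
        return result;
    }

    public static void main(String[] args) {
        // 10 jobs bundled into 3 clustered jobs: factor 3, one cluster of size 4
        int[] r = collapseFactorFromBundle(10, 3);
        System.out.println(r[0] + " " + r[1]); // prints 3 1
    }
}
```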
-
constructID
public java.lang.String constructID(java.lang.String partitionID, int id)
Given an integer id, returns a string id that is used for the clustered job.
- Parameters:
partitionID - the id of the partition.
id - the integer id from which the string id has to be constructed. The id should be unique for all the clustered jobs that are formed for a particular partition.
- Returns:
- the id of the clustered job
-
updateReplacementTable
private void updateReplacementTable(java.util.List jobs, Job mergedJob)
Updates the replacement table.
- Parameters:
jobs - the List of jobs that is being replaced.
mergedJob - the merged job that is replacing the jobs in the list.
-
assimilateJobs
private void assimilateJobs()
Puts the jobs in the abstract workflow into the map that is indexed by the logical name of the jobs.
-
constructMap
private java.util.Map constructMap(java.lang.String propValue)
Constructs a map with the numbers/values for the collapsing factors to collapse the nodes of the same type. The user ends up specifying these through the properties file. The value of the property is of the form poolname1=value,poolname2=value....
- Parameters:
propValue - the value of the property obtained from the properties file.
- Returns:
- the constructed map.
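Parsing a property value of the poolname1=value,poolname2=value form can be sketched as below; the helper name is hypothetical and values are kept as strings for simplicity:

```java
import java.util.*;

public class ConstructMapSketch {
    // Split the property value on commas, then each entry on the first '='.
    // Malformed entries (no '=') are skipped in this sketch.
    static Map<String, String> parseCollapseProperty(String propValue) {
        Map<String, String> map = new LinkedHashMap<>();
        if (propValue == null || propValue.isBlank()) return map;
        for (String entry : propValue.split(",")) {
            String[] kv = entry.split("=", 2);
            if (kv.length == 2) map.put(kv[0].trim(), kv[1].trim());
        }
        return map;
    }

    public static void main(String[] args) {
        // prints {isi=5, teragrid=10}
        System.out.println(parseCollapseProperty("isi=5,teragrid=10"));
    }
}
```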
-
replaceJobs
private void replaceJobs()
The relations/edges are changed in the local graph structure.
-
printList
private void printList(java.util.List l)
A utility method to print a short description of the jobs in a list.
- Parameters:
l - the list of Job objects.
-
-