Class TransferEngine


  • public class TransferEngine
    extends Engine
    The transfer engine, which, on the basis of the pools on which the jobs are to run, adds nodes to transfer the data products.
    Version:
    $Revision$
    Author:
    Karan Vahi, Gaurang Mehta
    • Field Detail

      • DELETED_JOBS_LEVEL

        public static final int DELETED_JOBS_LEVEL
        The maximum level, assigned as the level for deleted jobs. It could be set to Integer.MAX_VALUE, but it is rare for the number of levels in a workflow to exceed 1000.
        See Also:
        Constant Field Values
      • WORKFLOW_CACHE_FILE_IMPLEMENTOR

        public static final java.lang.String WORKFLOW_CACHE_FILE_IMPLEMENTOR
        The name of the Replica Catalog Implementer that is used to write out the workflow cache file in the submit directory.
        See Also:
        Constant Field Values
      • WORKFLOW_CACHE_REPLICA_CATALOG_KEY

        public static final java.lang.String WORKFLOW_CACHE_REPLICA_CATALOG_KEY
        The name of the source key for the Replica Catalog Implementer that serves as the cache.
        See Also:
        Constant Field Values
      • SRM_PROPERTIES_PREFIX

        public static final java.lang.String SRM_PROPERTIES_PREFIX
        The property prefix for retrieving SRM properties.
        See Also:
        Constant Field Values
      • SRM_SERVICE_URL_PROPERTIES_SUFFIX

        public static final java.lang.String SRM_SERVICE_URL_PROPERTIES_SUFFIX
        The suffix used to retrieve the service URL for an SRM server.
        See Also:
        Constant Field Values
      • SRM_MOUNT_POINT_PROPERTIES_SUFFIX

        public static final java.lang.String SRM_MOUNT_POINT_PROPERTIES_SUFFIX
        The suffix used to retrieve the mount point for an SRM server.
        See Also:
        Constant Field Values
      • REFINER_NAME

        public static final java.lang.String REFINER_NAME
        The name of the refiner, for purposes of error logging.
        See Also:
        Constant Field Values
      • mSRMServiceURLToMountPointMap

        private java.util.Map<java.lang.String,​NameValue> mSRMServiceURLToMountPointMap
        A map that associates the site name with the SRM server url and mount point.
      • mDag

        private ADag mDag
        The DAG object to which the transfer nodes are to be added. This is the reduced DAG, obtained from the Reduction Engine.
      • mReplicaSelector

        private ReplicaSelector mReplicaSelector
        The handle to the replica selector that is used to select the various replicas.
      • mTXRefiner

        private Refiner mTXRefiner
        The handle to the transfer refiner that adds the transfer nodes into the workflow.
      • mDeletedJobs

        private java.util.List<Job> mDeletedJobs
        Holds all the jobs deleted by the reduction algorithm.
      • mPlannerCache

        private PlannerCache mPlannerCache
        A SimpleFile Replica Catalog that tracks all the files being materialized as part of workflow execution.
      • mWorkflowCache

        private ReplicaCatalog mWorkflowCache
        A Replica Catalog that tracks all the GET URLs for the files on the staging sites.
      • mFactory

        private org.griphyn.vdl.euryale.FileFactory mFactory
        The handle to the file factory, that is used to create the top level directories for each of the partitions.
      • mOutputMapper

        private OutputMapper mOutputMapper
        Handle to an OutputMapper that determines where the output files are to be placed.
      • mWorkDir

        protected java.lang.String mWorkDir
        The working directory relative to the mount point of the execution pool. It is populated from the pegasus.dir.exec property in the properties file. If not specified, the working directory defaults to the exec mount point of the execution pool.
      • mUseSymLinks

        protected boolean mUseSymLinks
        If set, this member variable causes the destination URL for symlink jobs to use the symlink:// scheme when the pool attribute associated with the PFN is the same as a particular job's execution pool.
      • mWorkerNodeExecution

        private boolean mWorkerNodeExecution
        A boolean indicating whether we are doing worker node execution or not.
      • mPlannerOptions

        private PlannerOptions mPlannerOptions
        The planner options passed to the planner
      • mBypassStagingForInputs

        private boolean mBypassStagingForInputs
        A boolean indicating whether to bypass first level staging for inputs
      • mSetupForCondorIO

        private final boolean mSetupForCondorIO
        A boolean to track whether condor file io is used for the workflow or not.
      • mOutputSite

        private final java.lang.String mOutputSite
        The output site where files need to be staged to.
    • Constructor Detail

      • TransferEngine

        public TransferEngine​(ADag reducedDag,
                              PegasusBag bag,
                              java.util.List<Job> deletedJobs,
                              java.util.List<Job> deletedLeafJobs)
        Overloaded constructor.
        Parameters:
        reducedDag - the reduced workflow.
        bag - bag of initialization objects
        deletedJobs - list of all jobs deleted by reduction algorithm.
        deletedLeafJobs - list of deleted leaf jobs by reduction algorithm.
    • Method Detail

      • runTransferOnLocalSite

        public boolean runTransferOnLocalSite​(java.lang.String site,
                                              java.lang.String destinationURL,
                                              int type)
        Returns whether to run a transfer job on local site or not.
        Parameters:
        site - the site handle associated with the destination URL.
        destinationURL - the destination URL
        type - the type of transfer job for which the URL is being constructed.
        Returns:
        true if the associated transfer job should run on the local site, false otherwise.
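        The actual decision logic is not shown in this reference, but a minimal sketch of the kind of check such a method performs might look as follows. The rule used here (run locally when the site handle is "local" or the destination is a file:// URL) is an illustrative assumption, not the real TransferEngine policy, which also consults the site catalog and profiles.

```java
// Hypothetical sketch of a "run transfer on local site?" decision.
// The rule below is an assumption for illustration only.
class LocalTransferDecision {
    static boolean runTransferOnLocalSite(String site, String destinationURL) {
        if ("local".equals(site)) {
            // the destination site is the local site itself
            return true;
        }
        // a file:// URL can only be written by a job running on the local site
        return destinationURL != null && destinationURL.startsWith("file://");
    }

    public static void main(String[] args) {
        System.out.println(runTransferOnLocalSite("local", "gsiftp://host/data"));  // true
        System.out.println(runTransferOnLocalSite("isi", "file:///scratch/f.txt")); // true
        System.out.println(runTransferOnLocalSite("isi", "gsiftp://host/f.txt"));   // false
    }
}
```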
      • addTransferNodes

        public void addTransferNodes​(ReplicaCatalogBridge rcb,
                                     PlannerCache plannerCache)
        Adds the transfer nodes to the workflow.
        Parameters:
        rcb - the bridge to the ReplicaCatalog.
        plannerCache - an instance of the replica catalog that will store the locations of the files on the remote sites.
      • getStagingSite

        public java.lang.String getStagingSite​(Job job)
        Returns the staging site to be used for a job. If a staging site is not determined from the options, it is set to the execution site for the job.
        Parameters:
        job - the job for which to determine the staging site
        Returns:
        the staging site
      • getDeletedFileTX

        private java.util.Vector getDeletedFileTX​(java.lang.String pool,
                                                  Job job)
        Gets the file transfer objects corresponding to the locations of files found in the replica mechanism, for transfer to the output pool specified by the user. If the output pool path matches the one returned by the replica mechanism, that object is not transferred.
        Parameters:
        pool - the output pool which the user specifies at runtime.
        job - the Job object corresponding to the leaf job which was deleted by the Reduction algorithm.
        Returns:
        Vector of FileTransfer objects
      • processParents

        private void processParents​(Job job,
                                    java.util.Collection<GraphNode> parents)
        Processes a node's parents and determines whether transfer nodes are to be added. All the input files for the job are searched for in the output files of the parent nodes and the Replica Mechanism.
        Parameters:
        job - the Job object containing all the details of the job.
        parents - list of GraphNode objects corresponding to the parent jobs of the job.
      • getFileTX

        private java.util.Vector getFileTX​(java.lang.String destPool,
                                           Job job,
                                           boolean localTransfer)
        Gets the Vector of FileTransfer objects for the files which have to be transferred to one destination pool. It checks the transient flags for the files. If the transfer transient flag is set, the file does not have to be transferred to the destination pool.
        Parameters:
        destPool - the pool to which the files are to be transferred.
        job - the Job object of the job whose output files are needed at the destination pool.
        localTransfer - boolean indicating that associated transfer job will run on local site.
        Returns:
        Vector of FileTransfer objects
      • constructFileTX

        private FileTransfer constructFileTX​(PegasusFile pf,
                                             Job job,
                                             java.lang.String destSiteHandle,
                                             java.lang.String path,
                                             boolean localTransfer)
        Constructs the FileTransfer object on the basis of the transiency information. If the transient flag for transfer is set, the destPutURL for the FileTransfer object would be the execution directory, as this is the entry that has to be registered in the ReplicaMechanism.
        Parameters:
        pf - the PegasusFile for which the transfer has to be done.
        job - the associated job.
        destSiteHandle - the output pool to which the file should be transferred.
        path - the path that a user specifies in the profile for key remote_initialdir that results in the workdir being changed for a job on an execution pool.
        localTransfer - boolean indicating that the associated transfer job will run on the local site.
        Returns:
        the corresponding FileTransfer object
      • constructRegistrationURL

        private java.lang.String constructRegistrationURL​(java.lang.String site,
                                                          java.lang.String lfn)
        Constructs a Registration URL for an LFN.
        Parameters:
        site - the site handle
        lfn - the LFN for which the URL needs to be constructed
        Returns:
        the URL
      • poolNotFoundMsg

        private java.lang.String poolNotFoundMsg​(java.lang.String poolName,
                                                 java.lang.String universe)
        Generates an error message for a pool that is not found in the pool config file.
        Parameters:
        poolName - the name of pool that is not found.
        universe - the condor universe
        Returns:
        the message.
      • getInterpoolFileTX

        private java.util.Collection<FileTransfer>[] getInterpoolFileTX​(Job job,
                                                                        java.util.Collection<GraphNode> parents)
        Gets the collections of FileTransfer objects for all the files which have to be transferred to the destination pool in the case of inter-pool transfers. Each FileTransfer object has the source and the destination URLs; the source URI is determined from the pool on which the jobs are executed.
        Parameters:
        job - the job with reference to which interpool file transfers need to be determined.
        parents - Collection of GraphNode objects corresponding to the parent jobs of the job.
        Returns:
        array of Collection of FileTransfer objects
      • getFilesFromRC

        private void getFilesFromRC​(DAGJob job,
                                    java.util.Collection searchFiles)
        Special Handling for a DAGJob for retrieving files from the Replica Catalog.
        Parameters:
        job - the DAGJob
        searchFiles - files that need to be looked up in the Replica Catalog.
      • getFilesFromRC

        private void getFilesFromRC​(DAXJob job,
                                    java.util.Collection searchFiles)
        Special Handling for a DAXJob for retrieving files from the Replica Catalog.
        Parameters:
        job - the DAXJob
        searchFiles - files that need to be looked up in the Replica Catalog.
      • getFilesFromRC

        private void getFilesFromRC​(Job job,
                                    java.util.Collection searchFiles)
        Looks up the RCEngine Hashtable for the locations of the files and adds nodes to transfer them. If a file is not found in the Replica Catalog, the Transfer Engine flags an error and exits.
        Parameters:
        job - the Job object whose input files have to be searched for in the Replica Mechanism.
        searchFiles - Vector containing the PegasusFile objects corresponding to the files whose mappings need to be looked up in the Replica Mechanism.
      • replaceSourceProtocolFromURL

        protected ReplicaCatalogEntry replaceSourceProtocolFromURL​(ReplicaCatalogEntry rce)
        Replaces the SRM URL scheme in the URL with the file URL scheme, returning a new object if a replacement happens. The original object passed as a parameter remains unchanged.
        Parameters:
        rce - the ReplicaCatalogEntry object whose URL needs to be replaced.
        Returns:
        the object with the URL replaced.
      • replaceProtocolFromURL

        protected java.lang.String replaceProtocolFromURL​(java.lang.String pfn)
        Replaces the gsiftp URL scheme in the URL with the symlink URL scheme and returns the result as a new object. The original object passed as a parameter remains unchanged.
        Parameters:
        pfn - the pfn that needs to be replaced
        Returns:
        the replaced PFN
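        The two scheme-replacement helpers above can be illustrated with a minimal sketch. The prefix-swap approach shown here is an assumption about how the rewrite works; the helper name mirrors the method above but is a standalone stand-in.

```java
// Hedged sketch of a scheme rewrite like replaceProtocolFromURL: swap the
// gsiftp:// scheme for symlink://, returning a new string and leaving the
// input untouched (Java strings are immutable in any case).
class ProtocolReplacer {
    static String replaceProtocolFromURL(String pfn) {
        if (pfn != null && pfn.startsWith("gsiftp://")) {
            // keep everything after the scheme, swap only the prefix
            return "symlink://" + pfn.substring("gsiftp://".length());
        }
        return pfn; // unchanged if the scheme does not match
    }

    public static void main(String[] args) {
        System.out.println(replaceProtocolFromURL("gsiftp://se.example.org/data/f.dat"));
        // symlink://se.example.org/data/f.dat
    }
}
```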
      • constructSiteToSRMServerMap

        private java.util.Map<java.lang.String,​NameValue> constructSiteToSRMServerMap​(PegasusProperties props)
        Constructs a Map by parsing the relevant SRM Pegasus properties. For example, if users have the following specified in the properties file
         pegasus.transfer.srm.ligo-cit.service.url          srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop
         pegasus.transfer.srm.ligo-cit.service.mountpoint   /mnt/hadoop
         
        then a Map is created that associates ligo-cit with a NameValue object containing the service URL and mount point.
        Parameters:
        props - the PegasusProperties object
        Returns:
        Map that maps a site name to a NameValue object that has the URL prefix and the mount point
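        A minimal sketch of this kind of property folding is shown below. SrmEntry is a stand-in for the NameValue type used by the real code, and the parsing rule (split keys on the .service.url / .service.mountpoint suffixes) is inferred from the example above rather than taken from the implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hedged sketch: fold properties of the form
//   pegasus.transfer.srm.<site>.service.url
//   pegasus.transfer.srm.<site>.service.mountpoint
// into a site -> (service URL, mount point) map.
class SrmPropertyParser {
    static final String PREFIX = "pegasus.transfer.srm.";

    // simple stand-in for Pegasus' NameValue type
    static class SrmEntry {
        final String serviceURL;
        final String mountPoint;
        SrmEntry(String serviceURL, String mountPoint) {
            this.serviceURL = serviceURL;
            this.mountPoint = mountPoint;
        }
    }

    static Map<String, SrmEntry> constructSiteToSRMServerMap(Properties props) {
        Map<String, String> urls = new HashMap<>();
        Map<String, String> mounts = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) continue;
            String rest = key.substring(PREFIX.length());
            if (rest.endsWith(".service.url")) {
                String site = rest.substring(0, rest.length() - ".service.url".length());
                urls.put(site, props.getProperty(key));
            } else if (rest.endsWith(".service.mountpoint")) {
                String site = rest.substring(0, rest.length() - ".service.mountpoint".length());
                mounts.put(site, props.getProperty(key));
            }
        }
        // pair up the URL and mount point discovered for each site
        Map<String, SrmEntry> result = new HashMap<>();
        for (Map.Entry<String, String> e : urls.entrySet()) {
            result.put(e.getKey(), new SrmEntry(e.getValue(), mounts.get(e.getKey())));
        }
        return result;
    }
}
```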
      • getOutputFiles

        private java.util.Set<PegasusFile> getOutputFiles​(java.util.Collection<GraphNode> nodes)
        Gets the output files for all the nodes passed in.
        Parameters:
        nodes - List containing the jobs
        Returns:
        Set of PegasusFile objects
      • trackInCaches

        private void trackInCaches​(Job job)
        Tracks the files created by a job in both the planner and workflow caches. The planner cache stores the PUT URLs, while the GET URLs are stored in the workflow cache.
        Parameters:
        job - the job whose files need to be tracked.
      • trackInPlannerCache

        private void trackInPlannerCache​(java.lang.String lfn,
                                         java.lang.String pfn,
                                         java.lang.String site)
        Inserts an entry into the planner cache as a put URL.
        Parameters:
        lfn - the logical name of the file.
        pfn - the pfn
        site - the site handle
      • trackInPlannerCache

        private void trackInPlannerCache​(java.lang.String lfn,
                                         java.lang.String pfn,
                                         java.lang.String site,
                                         FileServerType.OPERATION type)
        Inserts an entry into the planner cache as a put URL.
        Parameters:
        lfn - the logical name of the file.
        pfn - the pfn
        site - the site handle
        type - the type of url
      • trackInWorkflowCache

        private void trackInWorkflowCache​(java.lang.String lfn,
                                          java.lang.String pfn,
                                          java.lang.String site)
        Inserts an entry into the workflow cache that is to be written out to the submit directory.
        Parameters:
        lfn - the logical name of the file.
        pfn - the pfn
        site - the site handle
      • getURLOnSharedScratch

        private java.lang.String getURLOnSharedScratch​(SiteCatalogEntry entry,
                                                       Job job,
                                                       FileServerType.OPERATION operation,
                                                       java.lang.String lfn)
        Returns a URL on the shared scratch of the staging site.
        Parameters:
        entry - the SiteCatalogEntry for the associated staging site
        job - the job
        operation - the FileServer operation for which we need the URL
        lfn - the LFN; can be null to get the path to the directory
        Returns:
        the URL
      • complainForScratchFileServer

        private void complainForScratchFileServer​(Job job,
                                                  FileServerType.OPERATION operation,
                                                  java.lang.String site)
        Complains for a missing head node file server on a site for a job
        Parameters:
        job - the job
        operation - the operation
        site - the site
      • complainForScratchFileServer

        private void complainForScratchFileServer​(java.lang.String jobname,
                                                  FileServerType.OPERATION operation,
                                                  java.lang.String site)
        Complains for a missing head node file server on a site for a job
        Parameters:
        jobname - the name of the job
        operation - the file server operation
        site - the site
      • initializeWorkflowCacheFile

        private ReplicaCatalog initializeWorkflowCacheFile​(ADag dag)
        Initializes a Replica Catalog instance that is used to store the GET URLs for all files on the staging site ( inputs staged and outputs created ).
        Parameters:
        dag - the workflow being planned
        Returns:
        handle to transient catalog
      • getCacheFileName

        private java.lang.String getCacheFileName​(ADag adag)
        Constructs the basename of the cache file that is used to log the transient files. The basename is dependent on whether a basename prefix has been specified at runtime or not.
        Parameters:
        adag - the ADag object containing the workflow that is being concretized.
        Returns:
        the name of the cache file
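        A minimal sketch of that basename rule follows. The fallback of combining the workflow label and index, and the ".cache" suffix, are assumptions for illustration; the real method derives these from the ADag object.

```java
// Hedged sketch of cache-file basename construction: use the basename
// prefix when one was supplied at planning time, otherwise fall back to
// a label-index combination (an assumed convention).
class CacheFileName {
    static String getCacheFileName(String basenamePrefix, String label, String index) {
        StringBuilder sb = new StringBuilder();
        if (basenamePrefix != null && !basenamePrefix.isEmpty()) {
            sb.append(basenamePrefix);
        } else {
            sb.append(label).append('-').append(index);
        }
        return sb.append(".cache").toString();
    }
}
```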
      • bypassStagingForInputFile

        private boolean bypassStagingForInputFile​(ReplicaCatalogEntry entry,
                                                  PegasusFile file,
                                                  java.lang.String computeSite)
        Returns a boolean indicating whether to bypass first-level staging for a file or not.
        Parameters:
        entry - a ReplicaCatalogEntry matching the selected replica location.
        file - the corresponding PegasusFile object.
        computeSite - the compute site where the associated job will run.
        Returns:
        boolean indicating whether we need to enable bypass or not
      • logRemoval

        private void logRemoval​(Job job,
                                PegasusFile file,
                                java.lang.String prefix,
                                boolean removed)
        Helper method for logging a removal message. If removed is true, the message is logged at debug level; otherwise it is logged as a warning.
        Parameters:
        job - the job
        file - the file to be removed
        prefix - prefix for log message
        removed - whether removal was successful or not.