Using Linux Execution Filter

Introduction to HPC Pack Execution Filter on Linux Compute Nodes

Execution filters on Linux compute nodes were introduced in the June QFE release of HPC Pack. They allow a cluster admin to plug in custom scripts that run as root on a Linux compute node at different stages of job/task execution.

Two typical scenarios for execution filters:

  • On Linux nodes integrated with Active Directory, the username may need a different format. With an execution filter, the admin can rewrite the username before the job/task is executed, for example from "domain\user" to "domain.user".

  • Usually every user has a home folder on an SMB share. With an execution filter, the admin can provide a script that mounts the SMB share with the user's domain account so that the job/task can access the user's data in the share, and that cleans up before the job ends.

Enable Execution Filters

Execution filters on Linux nodes are enabled by adding script files under /opt/hpcnodemanager/filters. Currently HPC Pack supports:

  • OnJobTaskStart.sh: This script is executed when the first task of a job is dispatched to the Linux node. In the current implementation it might be executed multiple times if the task is a parametric sweep task, so the script may need to handle that case. Admins can use this script for node preparation, such as mounting a share for the user.

  • OnTaskStart.sh: This script is executed when a follow-up task is dispatched to the Linux node. Admins can use it to customize the task execution environment, such as the WorkDir.

  • OnJobEnd.sh: This script is executed when the job ends on the Linux node. Admins can use it to do cleanup.

Note that a filter may be triggered multiple times for the same job or task because of scheduling policy, network issues, or retries.
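Because a filter can run more than once for the same job, node preparation is easier to reason about when it is idempotent. The following is a minimal sketch of one way to do that, assuming a marker file per job in a state directory. The function names, the marker naming scheme, and the state directory are hypothetical and are not part of HPC Pack.

```python
import errno
import os


def claim_once(state_dir, job_id, stage):
    """Return True only for the first caller for this (job, stage) pair.

    os.O_CREAT | os.O_EXCL makes the creation atomic, so two filter
    invocations racing on the same node cannot both get True.
    """
    marker = os.path.join(state_dir, "job-{0}.{1}.done".format(job_id, stage))
    try:
        fd = os.open(marker, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
    except OSError as e:
        if e.errno == errno.EEXIST:
            return False  # another invocation already did the work
        raise
    os.close(fd)
    return True


def release(state_dir, job_id, stage):
    """Remove the marker (for example at job end); a missing marker is fine."""
    marker = os.path.join(state_dir, "job-{0}.{1}.done".format(job_id, stage))
    try:
        os.remove(marker)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
```

A filter would call claim_once before its one-time preparation and skip the work when it returns False; a cleanup step could call release so that a later job with a reused ID starts fresh.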

Customize Execution Filters

Input for the execution filters

All execution filters receive their input through standard input as JSON. The following are examples:

  1. Sample input for OnJobTaskStart.sh:

    {
        "m_Item1": {
            "JobId": 9299,
            "ResIds": [
                141
            ],
            "TaskId": 170355
        },
        "m_Item2": {
            "affinity": [
                1
            ],
            "commandLine": "echo specialword1",
            "environmentVariables": {
                "CCP_CLUSTER_NAME": "LN11-RH71-HN1",
                "CCP_COREIDS": "0",
                "CCP_EXCLUSIVE": "False",
                "CCP_ISADMIN": "1",
                "CCP_JOBID": "9299",
                "CCP_JOBNAME": "Test Basic OnJobTaskStart",
                "CCP_JOBTYPE": "Batch",
                "CCP_MPI_NETMASK": "10.156.60.0/255.255.252.0",
                "CCP_NODES": "1 LN11-RH71-LN2 1",
                "CCP_NODES_CORES": "1 LN11-RH71-LN2 1",
                "CCP_NUMCPUS": "1",
                "CCP_OWNER_SID": "S-1-5-21-1645912939-3214980066-801894016-500",
                "CCP_REQUIREDNODES": "",
                "CCP_RERUNNABLE": "False",
                "CCP_RETRY_COUNT": "0",
                "CCP_RUNTIME": "2147483647",
                "CCP_SERVICEREGISTRATION_PATH": "\\\\LN11-RH71-HN1\\HpcServiceRegistration",
                "CCP_TASKID": "1",
                "CCP_TASKINSTANCEID": "0",
                "CCP_TASKSYSTEMID": "170355",
                "HPC_RUNTIMESHARE": "\\\\LN11-RH71-HN1\\Runtime$"
            },
            "stderr": null,
            "stdin": null,
            "stdout": null,
            "taskRequeueCount": 0,
            "workingDirectory": null
        },
        "m_Item3": "hpclnpr11\\Administrator",
        "m_Item4": "Password",
        "m_Item5": null,
        "m_Item6": null
    }
    
    • m_Item1 contains the basic job and task ID information.

    • m_Item2 contains more detailed job information describing how the job will be executed and with which parameters. The environmentVariables block also contains the user-defined environment variables set in the job/task:

    [Image: Envrs]

    • m_Item3 and m_Item4 are the username and password of the job's run-as user. Take care when your execution filter uses this information, especially the password.

  2. Sample input for OnTaskStart.sh:

    {
        "m_Item1": {
            "JobId": 11274,
            "ResIds": [
                205
            ],
            "TaskId": 206059
        },
        "m_Item2": {
            "affinity": [
                1
            ],
            "commandLine": "echo specialword1",
            "environmentVariables": {
                "CCP_CLUSTER_NAME": "LN11-RH71-HN1",
                "CCP_COREIDS": "0",
                "CCP_EXCLUSIVE": "False",
                "CCP_JOBID": "11274",
                "CCP_JOBNAME": "",
                "CCP_JOBTYPE": "Batch",
                "CCP_MPI_NETMASK": "10.156.60.0/255.255.252.0",
                "CCP_NODES": "1 LN11-RH71-LN1 1",
                "CCP_NODES_CORES": "1 LN11-RH71-LN1 1",
                "CCP_NUMCPUS": "1",
                "CCP_OWNER_SID": "S-1-5-21-1645912939-3214980066-801894016-500",
                "CCP_REQUIREDNODES": "",
                "CCP_RERUNNABLE": "False",
                "CCP_RETRY_COUNT": "0",
                "CCP_RUNTIME": "2147483647",
                "CCP_SERVICEREGISTRATION_PATH": "\\\\LN11-RH71-HN1\\HpcServiceRegistration",
                "CCP_TASKID": "2",
                "CCP_TASKINSTANCEID": "0",
                "CCP_TASKSYSTEMID": "206059",
                "HPC_RUNTIMESHARE": "\\\\LN11-RH71-HN1\\Runtime$"
            },
            "stderr": null,
            "stdin": null,
            "stdout": null,
            "taskRequeueCount": 0,
            "workingDirectory": null
        }
    }
    
  3. Sample input for OnJobEnd.sh:

    {
        "JobId": 9299,
        "JobInfo": null,
        "ResIds": [
            141
        ]
    }
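
The three sample inputs above have different shapes, so a filter has to read them differently. The following is a minimal sketch of pulling out the commonly used fields, based only on the field names visible in the samples. The function names and the returned dictionary keys are illustrative assumptions, and the password in m_Item4 is deliberately not copied into the result.

```python
import json


def summarize_start_input(raw):
    """Extract fields from OnJobTaskStart.sh / OnTaskStart.sh input text."""
    data = json.loads(raw)
    ids = data["m_Item1"]
    task = data["m_Item2"]
    return {
        "job_id": ids["JobId"],
        "task_id": ids["TaskId"],
        "command_line": task["commandLine"],
        "env": task.get("environmentVariables") or {},
        # m_Item3 appears only in the OnJobTaskStart.sh sample, so use .get()
        "run_as_user": data.get("m_Item3"),
    }


def summarize_job_end_input(raw):
    """Extract fields from OnJobEnd.sh input text."""
    data = json.loads(raw)
    return {"job_id": data["JobId"], "res_ids": data.get("ResIds") or []}


# Trimmed-down versions of the samples above.
start_sample = r'''{"m_Item1": {"JobId": 9299, "ResIds": [141], "TaskId": 170355},
 "m_Item2": {"commandLine": "echo specialword1",
             "environmentVariables": {"CCP_JOBID": "9299"}},
 "m_Item3": "hpclnpr11\\Administrator", "m_Item4": "Password"}'''
end_sample = '{"JobId": 9299, "JobInfo": null, "ResIds": [141]}'

print(summarize_start_input(start_sample)["job_id"])
print(summarize_job_end_input(end_sample)["res_ids"])
```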
    

Output of execution filter

For the OnJobTaskStart.sh and OnTaskStart.sh filters, the output must be passed back to the nodemanager through standard output as well-formed JSON. The job information in that output influences how the job/task is then executed. The standard output of OnJobEnd.sh does not affect job behavior. In addition, OnJobTaskStart.sh and OnTaskStart.sh must exit with code 0 to indicate success; otherwise the nodemanager skips starting the job/task and returns an error to the scheduler. The exit code is recorded in the log file nodemanager.txt; see the Troubleshoot Execution Filters failure section for details.
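
The output contract described above (JSON on stdout plus exit code 0 on success) can be sketched as a small wrapper around a transform function. This is an illustration under assumptions: the names run_filter and set_working_directory and the "/tmp" default are hypothetical, and whether the nodemanager honors a changed workingDirectory is inferred from the OnTaskStart.sh description rather than confirmed here.

```python
import json
import sys


def set_working_directory(data):
    """Example transform: fill in a working directory if none was set."""
    task = data["m_Item2"]
    if task.get("workingDirectory") is None:
        task["workingDirectory"] = "/tmp"  # hypothetical default
    return data


def run_filter(transform, raw, err=sys.stderr):
    """Apply transform to the JSON text; return (exit_code, stdout_text).

    On any parsing or lookup problem nothing is emitted for stdout and a
    non-zero code is returned, so the task is not started with bad data.
    """
    try:
        return 0, json.dumps(transform(json.loads(raw)))
    except (ValueError, KeyError, TypeError) as exc:
        # Diagnostics go to stderr; stdout is reserved for the JSON result.
        err.write("filter failed: {0!r}\n".format(exc))
        return 1, ""
```

In a real filter script the last lines would be along the lines of: code, out = run_filter(set_working_directory, sys.stdin.readline()), then sys.stdout.write(out) and sys.exit(code).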

Customize execution filter

To customize an execution filter, read and parse the JSON data from standard input in the script, use the data as needed, and, for the OnJobTaskStart.sh and OnTaskStart.sh filters, write the resulting JSON to standard output. For example, here the OnJobTaskStart.sh filter is set to the command sed -s 's/specialword/OnJobTaskStart/g', which takes the JSON input from stdin, replaces the string specialword with OnJobTaskStart, and writes the result to stdout:

[Image: OnJobTaskStart]
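
For comparison, the same substitution can be done on the parsed JSON instead of on the raw text. This is a sketch, not the article's method: unlike the sed command, which replaces the string anywhere in the input, it touches only the commandLine field, and the function name is hypothetical.

```python
import json


def replace_in_command_line(raw, old="specialword", new="OnJobTaskStart"):
    """Return the input JSON text with `old` replaced in m_Item2.commandLine."""
    data = json.loads(raw)
    task = data["m_Item2"]
    task["commandLine"] = task["commandLine"].replace(old, new)
    return json.dumps(data)


sample = '{"m_Item1": {"JobId": 9299}, "m_Item2": {"commandLine": "echo specialword1"}}'
print(replace_in_command_line(sample))
```

Working on the parsed structure avoids accidentally rewriting other fields, such as a job name or an environment variable that happens to contain the same string.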

Then submit a job to validate that the filter works:

[Images: Newjob (2), Viewjob]

Troubleshoot Execution Filters failure

When a job fails, check the nodemanager log on the affected node. The log gives the exit code of the failed execution filter. For example, in the following snapshot, JobStartFilter failed with exit code 127.

[Image: Log]

To investigate further, add logging to the execution filter and write the output to a file.
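
One way to add such logging from a Python filter is the standard logging module with a file handler. The sketch below assumes a caller-chosen log path and a hypothetical helper name; the important points are that log output never goes to stdout (which carries the JSON result) and that the run-as password is never logged.

```python
import logging


def make_filter_logger(log_path, name="execution_filter"):
    """Return a logger that appends timestamped lines to log_path."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.FileHandler(log_path)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s: %(message)s"))
        logger.addHandler(handler)
    # Do not pass records to the root logger, which might print to a console.
    logger.propagate = False
    return logger
```

A filter could then call log = make_filter_logger(path) once and use log.info or log.error around each step, which makes the exit code reported in the nodemanager log easier to trace back to a cause.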

Using Execution Filter to Run Job as Domain Users and Mount SMB Shares for Different Domain Users

The June QFE release of HPC Pack introduced execution filters for Linux nodes (this feature is not publicly available). An execution filter is enabled by adding a folder named filters under the HPC Linux nodemanager installation folder /opt/hpcnodemanager and placing hook scripts named OnJobTaskStart.sh, OnTaskStart.sh, or OnJobEnd.sh in it. For more information about execution filters, refer to HPC Pack Execution Filter for Linux.

This sample covers the following scenario:

  • An HPC administrator has HPC clusters with Active Directory integrated Linux nodes.
  • To support some applications, the username on the Linux nodes uses a customized format, for example "domain.username", configured through smb.conf when using Winbind or sssd.conf when using SSSD. HPC Pack must therefore run jobs as the correctly named domain user on these Linux nodes.
  • The administrator has created a dedicated SMB share for each domain user and wants each share mounted to the share folder in that user's home directory, so that users can work with the data they saved in the share.

Background on share permission control:

  • For how to set permissions on an SMB share in Windows, refer to Managing Permissions for Shared Folders.
  • For how SMB share permissions are controlled on the Linux side, refer to man 8 mount or man mount.cifs.

Configuration Steps:

  1. Set up the SMB share described in the scenario. In this sample we create a shared folder named SmbShareDemo on the server LN11-RH71-HN1, then create a subdirectory in SmbShareDemo for each domain user, for example mpiuser1, mpiuser2, and mpiuser3 for the domain users hpclnpr11\mpiuser1, hpclnpr11\mpiuser2, and hpclnpr11\mpiuser3. Grant each domain user read/write permission on the corresponding folder, for example hpclnpr11\mpiuser1 on folder mpiuser1:

    [Image: Shares]

  2. Prepare the Execution Filters:

    2.1 On the Linux node, create the filters folder:

    [Image: FilterFolder]

    2.2 Run chmod 700 /opt/hpcnodemanager/filters to restrict the permissions on the execution filters, so that only admins (root or sudoers) can view and modify them.

    2.3 Copy the sample scripts ResolveUserName.py and ResolveUserNameAndDoMount.py (see the end of this article) into the folder /opt/hpcnodemanager/filters/.

    2.4 Create the OnJobTaskStart filter OnJobTaskStart.sh so that it calls the Python script ResolveUserNameAndDoMount.py:

    [Image: OnJobTaskStart]

    The script ResolveUserNameAndDoMount.py reuses the logic in ResolveUserName.py to compose the desired username. For non-administrator users it also mounts the share //[SmbShareSever]/SmbShareDemo/[UserName] to the share folder in the user's home directory, if the share is not mounted yet. (Here [SmbShareSever] is the share server, LN11-RH71-HN1 in this example, and [UserName] is the name of the run-as user provided when the job is submitted.)

    2.5 Modify ResolveUserName.py and make sure it composes the right username:

    [Image: ResolveUserName]

    2.6 Modify ResolveUserNameAndDoMount.py:

    • Make sure it can import ResolveUserName.py correctly; by default the two scripts need to be in the same folder.

    • Replace [SmbShareSever] with the SMB server name in your environment:

      [Image: SmbShareSever]

    • By default the directory is mounted with file_mode 0755 and dir_mode 0755; change these to match your requirements:

      [Image: MountSmbShare]

    • The logging facilities can be used for troubleshooting, as described in the comments in the script:

      [Image: LogWithPrefix]

  3. Try the execution filters with an HPC job:

    3.1 Check the state of the Linux node: whether the share is mounted for mpiuser1, and whether bash commands can be run as the domain users.

    [Image: User]

    3.2 Submit a job with the task whoami, and use Select Resource to target the node where the execution filter is set:

    [Images: SubmitJob (2)]

    3.3 Check that the run-as user is as expected and that the share is mounted properly:

    [Images: Result (2)]

  4. To make the filters available on all Linux nodes, copy the filters to the share and deploy them from the share to all the Linux nodes using clusrun:

    PS > clusrun /nodegroup:LinuxNodes cp -rf <SmbSharePath>/filters  /opt/hpcnodemanager/
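
Step 2.6 mentions adjusting file_mode and dir_mode for the mount. As a sketch of how those options fit together, the function below builds a mount argument list. It is an alternative to the sample script's approach, not a copy of it: it uses the credentials= option of mount.cifs (a file with username=, password=, and domain= lines) so that the password does not appear on the command line, and it returns a list so that no shell quoting is involved. The function name and all paths in the usage are hypothetical.

```python
def build_cifs_mount_args(share, target, uid, gid, credentials_file,
                          file_mode="0755", dir_mode="0755"):
    """Return the argv list for mounting an SMB share with mount -t cifs."""
    options = ",".join([
        "credentials={0}".format(credentials_file),
        "uid={0}".format(uid),
        "gid={0}".format(gid),
        "file_mode={0}".format(file_mode),
        "dir_mode={0}".format(dir_mode),
    ])
    return ["mount", "-t", "cifs", share, target, "-o", options]


# Example with made-up values; tighter modes keep the share private to the user.
args = build_cifs_mount_args(
    "//LN11-RH71-HN1/SmbShareDemo/mpiuser1",
    "/home/hpclnpr11.mpiuser1/share",
    uid=1001, gid=1001,
    credentials_file="/root/.smbcred-mpiuser1",
    file_mode="0700", dir_mode="0700")
print(" ".join(args))
```

The resulting list can be passed to subprocess.call(args) without shell=True. The credentials file would need to be created by the filter and readable by root only.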
    

Scripts as appendix

  • ResolveUserName.py

    #!/usr/bin/env python
    # HPC Execution Filter Sample - Compose Customized Active Directory User Name
    # Introduction:
    # In an Active Directory integrated Linux environment,
    # the RunAs user must be composed to match settings
    # such as 'winbind separator' in /etc/samba/smb.conf for Winbind
    # or 're_expression' in /etc/sssd/sssd.conf for SSSD,
    # to ensure the right user is used when HPC runs jobs.
    #
    # In this case, we compose the RunAs user, for example:
    # composedUserName = "{0}.{1}".format(domainName, userName) when 'winbind separator' is set to the . delimiter,
    # or in SSSD, when re_expression = ((?P<domain>.+)\.(?P<name>[^\\\.@]+$))
    #
    # Return codes:
    # 0      success
    # 1      incorrect invocation 
    
    import json
    import sys
    
    def ComposeAdUserName(domainName, userName):
        """
        Examples:
        composedUserName = "{0}@{1}".format(userName, domainName), when using userName@domainName
        """
        composedUserName = "{0}.{1}".format(domainName, userName)
        return composedUserName
    
    def Main():
        """The input is job execution context in json format."""
        jsonData = json.loads(sys.stdin.readline())
    
        """Get and compose user name, by default it's in domain\username format."""
        composedUserName = jsonData["m_Item3"]
        runAsUserInfo = composedUserName.split('\\')
        if len(runAsUserInfo) == 2:
            domainName = runAsUserInfo[0]
            userName = runAsUserInfo[1]
            composedUserName = ComposeAdUserName(domainName, userName)
    
        """Set composedUserName."""
        jsonData["m_Item3"] = composedUserName
    
        """Return the result through stdout"""
        print(json.dumps(jsonData))
    
        sys.exit(0)
    
    if __name__ == '__main__':
        Main()
    
  • ResolveUserNameAndDoMount.py

    #!/usr/bin/env python
    # HPC Execution Filter Sample - Compose Customized Active Directory User Name and Do Mount For Non-Admin Users
    # This script reuses ResolveUserName.py to compose the right Active Directory user name,
    # and does the mount for Non-Admin users.
    #
    # The sample fulfils the following scenario:
    # Administrators use HPC Linux Support with Active Directory integration,
    # and provide SMB shares with the pattern //SmbShareBasePath/UserName for each Active Directory user to do data movement.
    # Administrators wish to ensure the shares can be mounted for different users.
    # In this script, the specific SMB share is mounted to the share folder in each user's home directory,
    # with uid and gid set correspondingly, and file_mode and dir_mode both set to 755.
    #
    # Please note that this sample script reads the user's password for mounting;
    # make sure this aligns with your security policies before using it in production environments.
    # 
    # Return codes:
    # This script follow command mount's return codes:
    # 0      success
    # 1      incorrect invocation or permissions
    # 2      system error (out of memory, cannot fork, no more loop devices)
    # 4      internal mount bug
    # 8      user interrupt
    # 16     problems writing or locking /etc/mtab
    # 32     mount failure
    # 64     some mount succeeded
    # For more about mount's return codes, please refer man 8 mount.
    
    import json
    import os
    import pwd
    import string
    import subprocess
    import sys
    import time
    import ResolveUserName
    
    """Define the constants."""
    SmbShareBasePath = "//[SmbShareSever]/SmbShareDemo"
    
    def MountSmbShare(smbSharePath, targetPath, domainName, userName, password, uid, gid, fileMode="0755", dirMode="0755"):
        retCode = 0
        if not os.path.ismount(targetPath):
            maxRetry = 3
            while maxRetry > 0:
                retCode = Run("mount -t cifs {0} {1} -o domain={2},username={3},password='{4}',uid={5},gid={6},file_mode={7},dir_mode={8}".format(smbSharePath, targetPath, domainName, userName, password, uid, gid, fileMode, dirMode))
                """Check if succeeded, and treat it as success when another process has already mounted the share."""
                if retCode == 0 or os.path.ismount(targetPath):
                    retCode = 0
                    break
                maxRetry = maxRetry - 1
                time.sleep(1)
        return retCode
    
    """Run command facilities."""
    if not hasattr(subprocess,'check_output'):
        def check_output(*popenargs, **kwargs):
            r"""Backport from subprocess module from python 2.7"""
            if 'stdout' in kwargs:
                raise ValueError('stdout argument not allowed, it will be overridden.')
            process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
            output, unused_err = process.communicate()
            retcode = process.poll()
            if retcode:
                cmd = kwargs.get("args")
                if cmd is None:
                    cmd = popenargs[0]
                raise subprocess.CalledProcessError(retcode, cmd, output=output)
            return output
    
        # Exception classes used by this module.
        class CalledProcessError(Exception):
            def __init__(self, returncode, cmd, output=None):
                self.returncode = returncode
                self.cmd = cmd
                self.output = output
            def __str__(self):
                return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
    
        subprocess.check_output=check_output
        subprocess.CalledProcessError=CalledProcessError
    
    def Run(cmd,chk_err=True):
        retcode,out=RunGetOutput(cmd,chk_err)
        return retcode
    
    def RunGetOutput(cmd,chk_err=True):
        try:
            output=subprocess.check_output(cmd,stderr=subprocess.STDOUT,shell=True)
        except subprocess.CalledProcessError as e:
            if chk_err:
                Error('CalledProcessError.  Error Code is ' + str(e.returncode))
                Error('CalledProcessError.  Command result was ' + (e.output[:-1]).decode('latin-1'))
            return e.returncode, e.output.decode('latin-1')
        return 0, output.decode('latin-1')
    """End of run command facilities."""
    
    """
    Logging facilities can be removed from the script.
    Log can be used for trouble shooting, and remember to comment them out when performance is considered more important.
    """
    LocalTime = time.localtime()
    ExecutionFilterSampleLogFile = "./ExecutionFilter_ResolveUserAndMount_%04u%02u%02u-%02u%02u%02u.log" % (LocalTime.tm_year, LocalTime.tm_mon, LocalTime.tm_mday, LocalTime.tm_hour, LocalTime.tm_min, LocalTime.tm_sec)
    def LogWithPrefix(prefix, message):
        t = time.localtime()
        t = "%04u/%02u/%02u %02u:%02u:%02u " % (t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec)
        t += prefix
        for line in message.split('\n'):
            line = t + line
            # Keep printable ASCII characters only, so the line can be written as-is.
            line = "".join(ch for ch in line if ch in string.printable)
            try:
                with open(ExecutionFilterSampleLogFile, "a") as F:
                    F.write(line + "\n")
            except IOError:
                pass
    
    def Log(message):
        LogWithPrefix("INFO: ", message)
    
    def Error(message):
        LogWithPrefix("ERROR: ", message)
    
    def Warn(message):
        LogWithPrefix("WARNING: ", message)
    """End of logging facilities."""
    
    def Main():
        retCode = 0
    
        """The input is job execution context in json format."""
        jsonData = json.loads(sys.stdin.readline())
        try:
            """Get user name, by default it's in domain\username format."""
            composedUserName = jsonData["m_Item3"]
            runAsUserInfo = composedUserName.split('\\')
            if len(runAsUserInfo) < 2:
                Error("Illegal input runAsUser: {0}, be sure the input is hpc job context in json format.".format(composedUserName))
                sys.exit(1)
            domainName = runAsUserInfo[0]
            userName = runAsUserInfo[1]
    
            """Resolve right Active Directory user name."""
            composedUserName = ResolveUserName.ComposeAdUserName(domainName, userName)
    
            """Query if the user is admin, and mount for Non-Admin users."""
            isAdmin = "0"
            try:
                isAdmin = jsonData["m_Item2"]["environmentVariables"]["CCP_ISADMIN"]
            except KeyError:
                pass
    
            if isAdmin == "0":
                """Check whether user exists, touch user's home dir, and get user information."""
                retCode = Run("mkhomedir_helper {0}".format(composedUserName))
                if retCode != 0:
                    Error("No such user: {0}, or home directory for this user cannot be used or generated properly.".format(composedUserName))
                    sys.exit(1)
    
                pwdInfo = pwd.getpwnam(composedUserName)
                uid = pwdInfo.pw_uid
                gid = pwdInfo.pw_gid
                homeDir = pwdInfo.pw_dir
    
                """Get password, please note the risk here."""
                password = jsonData["m_Item4"]
    
                """Do mount for Non-Admin users."""
                smbSharePath = "{0}/{1}".format(SmbShareBasePath, userName)
                targetPath = "{0}/share".format(homeDir)
                retCode = Run("mkdir -p {0}".format(targetPath))
                if retCode != 0:
                    Error("Cannot find and create mount target path: {0}".format(targetPath))
                    sys.exit(1)
    
                retCode = MountSmbShare(smbSharePath, targetPath, domainName, userName, password, uid, gid)
    
            """Set composedUserName."""
            jsonData["m_Item3"] = composedUserName
    
        except KeyError:
            """Please check whether the script is used correctly."""
            Error("Please check whether the script is used correctly, and ensure it get right format job context json.")
            retCode = 1
    
        """Return the result through stdout."""
        print(json.dumps(jsonData))
    
        #Log("ExecutionFilter finished with retCode:{0}".format(retCode))
        sys.exit(retCode)
    
    if __name__ == '__main__':
        Main()
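
Before deploying a filter to a node, it can help to exercise it locally by piping a sample job context through it and checking the exit code and output. The harness below is a sketch under assumptions: run_filter_command and STAND_IN_FILTER are hypothetical names, and the stand-in filter is a tiny inline script that only mimics the username translation, not the sample scripts above.

```python
import json
import subprocess
import sys


def run_filter_command(argv, payload):
    """Send payload as one JSON line on stdin.

    Returns (exit_code, parsed_stdout), with parsed_stdout set to None
    when the filter exits with a non-zero code.
    """
    proc = subprocess.Popen(argv, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, _err = proc.communicate((json.dumps(payload) + "\n").encode("utf-8"))
    if proc.returncode != 0:
        return proc.returncode, None
    return 0, json.loads(out.decode("utf-8"))


# Inline stand-in for a filter script: turns domain\user into domain.user.
STAND_IN_FILTER = (
    "import json, sys\n"
    "d = json.loads(sys.stdin.readline())\n"
    "d['m_Item3'] = d['m_Item3'].replace('\\\\', '.')\n"
    "print(json.dumps(d))\n"
)

code, result = run_filter_command(
    [sys.executable, "-c", STAND_IN_FILTER],
    {"m_Item3": "hpclnpr11\\mpiuser1"})
print(code, result)
```

To test a real filter, the argument list would instead point at the script, for example ["/opt/hpcnodemanager/filters/OnJobTaskStart.sh"], run with the same privileges the nodemanager uses.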