使用 Linux 执行筛选器

Linux 计算节点上的 HPC Pack 执行筛选器简介

LINUX 计算节点上的执行筛选器在 HPC 6 月 QFE 版本中引入。 它允许群集管理员在不同的作业/任务执行阶段(根目录下)在 Linux 计算节点上执行自定义脚本。

使用执行筛选器的两种典型方案:

  • 在集成 Active Directory 的 Linux 节点上,用户名格式的格式不同。 使用执行筛选器,管理员可以在执行作业/任务之前自定义用户名格式。 例如,将用户名从“domain\user”格式转换为“domain.user”格式

  • 通常每个用户在 SMB 共享上都有自己的主文件夹。 使用执行筛选器,管理员可以提供一个脚本,以便使用用户的域帐户装载 SMB 共享,以便作业/任务可以访问共享中自己的数据,并在作业结束之前进行清理。

启用执行筛选器

可以通过在 /opt/hpcnodemanager/filters下添加脚本文件来启用 Linux 节点上的执行筛选器。 目前 HPC Pack 支持:

  • OnJobTaskStart.sh:将第一个任务调度到 Linux 节点时,将立即执行此脚本。 从当前实现开始,此脚本可能会在任务为参数扫描任务的情况下多次执行。 可能需要处理这种情况。 管理员可以利用此脚本执行节点准备,例如为用户装载共享。

  • OnTaskStart.sh:将后续任务调度到 Linux 节点时,将执行此脚本。 管理员可以利用此脚本自定义任务执行环境,例如 WorkDir。

  • OnJobEnd.sh:在 Linux 节点上结束作业时,将执行此脚本。 管理员可以利用此脚本进行一些清理。

请注意,由于计划策略/网络问题/重试,可能会为同一作业或任务多次触发筛选器。

自定义执行筛选器

执行筛选器的输入

所有执行筛选器的输入都通过 JSON 数据格式的标准输入。 下面是示例:

  1. OnJobTaskStart.sh的示例输入:

    {
        "m_Item1": {
            "JobId": 9299,
            "ResIds": [
                141
            ],
            "TaskId": 170355
        },
        "m_Item2": {
            "affinity": [
                1
            ],
            "commandLine": "echo specialword1",
            "environmentVariables": {
                "CCP_CLUSTER_NAME": "LN11-RH71-HN1",
                "CCP_COREIDS": "0",
                "CCP_EXCLUSIVE": "False",
                "CCP_ISADMIN": "1",
                "CCP_JOBID": "9299",
                "CCP_JOBNAME": "Test Basic OnJobTaskStart",
                "CCP_JOBTYPE": "Batch",
                "CCP_MPI_NETMASK": "10.156.60.0/255.255.252.0",
                "CCP_NODES": "1 LN11-RH71-LN2 1",
                "CCP_NODES_CORES": "1 LN11-RH71-LN2 1",
                "CCP_NUMCPUS": "1",
                "CCP_OWNER_SID": "S-1-5-21-1645912939-3214980066-801894016-500",
                "CCP_REQUIREDNODES": "",
                "CCP_RERUNNABLE": "False",
                "CCP_RETRY_COUNT": "0",
                "CCP_RUNTIME": "2147483647",
                "CCP_SERVICEREGISTRATION_PATH": "\\\\LN11-RH71-HN1\\HpcServiceRegistration",
                "CCP_TASKID": "1",
                "CCP_TASKINSTANCEID": "0",
                "CCP_TASKSYSTEMID": "170355",
                "HPC_RUNTIMESHARE": "\\\\LN11-RH71-HN1\\Runtime$"
            },
            "stderr": null,
            "stdin": null,
            "stdout": null,
            "taskRequeueCount": 0,
            "workingDirectory": null
        },
        "m_Item3": "hpclnpr11\\Administrator",
        "m_Item4": "Password",
        "m_Item5": null,
        "m_Item6": null
    }
    
    • m_Item1 的内容包含基本作业和任务 ID 信息。

    • m_item2 的内容包含更详细的作业信息,描述如何执行作业以及使用哪些参数。 environmentVariables 块还包含在作业/任务中定义的用户定义的 Environment Variables

    Envrs

    • m_Item3m_Item4 的内容是作业 Runas 用户的用户和密码。 请在执行筛选器使用这些信息(尤其是密码)时小心。
  2. OnTaskStart.sh的示例输入:

    {
        "m_Item1": {
            "JobId": 11274,
            "ResIds": [
                205
            ],
            "TaskId": 206059
        },
        "m_Item2": {
            "affinity": [
                1
            ],
            "commandLine": "echo specialword1",
            "environmentVariables": {
                "CCP_CLUSTER_NAME": "LN11-RH71-HN1",
                "CCP_COREIDS": "0",
                "CCP_EXCLUSIVE": "False",
                "CCP_JOBID": "11274",
                "CCP_JOBNAME": "",
                "CCP_JOBTYPE": "Batch",
                "CCP_MPI_NETMASK": "10.156.60.0/255.255.252.0",
                "CCP_NODES": "1 LN11-RH71-LN1 1",
                "CCP_NODES_CORES": "1 LN11-RH71-LN1 1",
                "CCP_NUMCPUS": "1",
                "CCP_OWNER_SID": "S-1-5-21-1645912939-3214980066-801894016-500",
                "CCP_REQUIREDNODES": "",
                "CCP_RERUNNABLE": "False",
                "CCP_RETRY_COUNT": "0",
                "CCP_RUNTIME": "2147483647",
                "CCP_SERVICEREGISTRATION_PATH": "\\\\LN11-RH71-HN1\\HpcServiceRegistration",
                "CCP_TASKID": "2",
                "CCP_TASKINSTANCEID": "0",
                "CCP_TASKSYSTEMID": "206059",
                "HPC_RUNTIMESHARE": "\\\\LN11-RH71-HN1\\Runtime$"
            },
            "stderr": null,
            "stdin": null,
            "stdout": null,
            "taskRequeueCount": 0,
            "workingDirectory": null
        }
    }
    
  3. OnJobEnd.sh的示例输入:

    {
        "JobId": 9299,
        "JobInfo": null,
        "ResIds": [
            141
        ]
    }
    

执行筛选器的输出

对于 OnJobTaskStart.shOnTaskStart.sh 筛选器,输出应通过标准输出传递回采用正式 JSON 格式的 nodemanager。 具有新作业信息的输出会影响作业行为。 OnJobEnd.sh的标准输出不会影响作业行为。 除了标准输出之外,OnJobTaskStart.shOnTaskStart.sh 筛选器应退出并退出代码 0 以指示成功运行,否则,nodemanager 将跳过启动作业/任务并将错误返回到计划程序。 可以在日志文件 nodemanager.txt中看到错误代码,有关详细信息,请参阅 排查执行筛选器故障 部分。

自定义执行筛选器

若要自定义执行筛选器,请获取和分析从脚本中的标准输入传输的 JSON 数据,使用数据实现某些函数,然后通过标准输出编写输出 JSON,以便 OnJobTaskStart.shOnTaskStart.sh 筛选器。 例如,此处我们设置了一个包含命令 OnJobTaskStart.sh 筛选器,sed –s 's/specialword/OnJobTaskStart/g'命令将采用 stdin JSON 输入数据,将字符串 specialword 替换为 OnJobTaskStart 并通过 stdout 输出:

OnJobTaskStart

然后提交作业以验证筛选器是否正常工作:

纽乔布

纽乔布

Viewjob

排查执行筛选器失败问题

作业失败时,请检查特定节点上的 nodemanager 日志。 日志将为失败的执行筛选器提供退出代码。 例如,在以下快照中,JobStartFilter 失败并显示退出代码 127。

日志

在执行筛选器中添加日志记录,并向文件提供输出。

使用执行筛选器以域用户身份运行作业,并为不同的域用户装载 SMB 共享

在 HPC 6 月 QFE 版本中,我们引入了适用于 Linux 节点的执行筛选器(此功能未公开发布)。 可以通过在 HPC Linux nodemanager 的安装文件夹 filters中添加名为“/opt/hpcnodemanager”的文件夹,并在 OnJobTaskStart.sh 文件夹中添加命名 OnTaskStart.shOnJobEnd.shfilters 来启用执行筛选器。 有关执行筛选器的详细信息,请参阅适用于 Linux的 HPC Pack 执行筛选器。

在此示例中,我们实现了以下方案:

  • HPC 管理员具有具有 Active Directory 集成 Linux 节点的 HPC 群集。
  • 为了支持某些应用程序,它们以自定义格式设置用户名,例如,在使用 SSSD 时使用 winbind 或 sssd.conf 时,通过 smb.conf 在 Linux 节点上设置“domain.username”,因此希望 HPC 在这些 Linux 节点上以正确的域用户身份运行作业。
  • 对于每个域用户,管理员已创建专用 SMB 共享,并希望这些共享可以相应装载到每个用户的主目录中的共享文件夹,因此用户可以使用保存在共享中的数据。

共享权限控制知识:

  • 有关如何在 Windows 中设置 SMB 共享的权限控制,请参阅 管理共享文件夹的权限
  • 有关如何在 Linux 端控制 SMB 共享权限,请参阅 man 8 mountman mount.cifs

配置步骤:

  1. 按照上述方案建立 SMB 共享,在此示例中,我们在 LN11-RH71-HN1的服务器上创建一个共享文件夹,并将其命名为 SmbShareDemo。 然后在文件夹 SmbShareDemo 中为不同的域用户创建子目录,例如,我们为域用户创建文件夹 mpiuser1mpiuser2mpiuser3hpclnpr11\mpiuser1hpclnpr11\mpiuser2hpclnpr11\mpiuser3 等。我们授予域用户 read/write 相应访问共享文件夹的权限,例如授予对文件夹 hpclnpr11\mpiuser1read\writempiuser1 权限:

    股票

  2. 准备执行筛选器:

    2.1 在 Linux 节点下,创建筛选器文件夹:

    FilterFolder

    2.2 请使用 chmod 700 /opt/hpcnodemanager/filters 来限制执行筛选器的权限,并确保只有管理员(root 或 sudoers)可以查看和修改执行筛选器。

    2.3 复制示例脚本 ResolveUserName.py 和 ResolveUserNameAndDoMount.py(请参阅本文末尾),并将其复制到文件夹 /opt/hpcnodemanager/filters/

    2.4 创建 OnJobTaskStart 筛选器 OnJobTaskStart.sh 以调用 python 脚本 ResolveUserNameAndDoMount.py

    OnJobTaskStart

    该脚本 ResolveUserNameAndDoMount.py 重复使用 ResolveUserName.py的逻辑来撰写所需的用户名,同时,如果尚未装载共享,该脚本会将共享命名 //[SmbShareSever]/SmbShareDemo/[UserName] 装载到非管理员用户的主目录中的共享文件夹。 (此处 [SmbShareSever] 是共享服务器,即在本示例中 LN11-RH71-HN1[UserName] 是作业运行的名称,这是提交作业时提供的用户。

    2.5 修改 ResolveUserName.py,并确保它组成正确的用户名:

    ResolveUserName

    2.6 修改 ResolveUserNameAndDoMount.py

    • 确保它可以正确导入 ResolveUserName.py,默认情况下,它们需要位于同一文件夹中

    • [SmbShareSever] 替换为环境中的 SMB 服务器名称:

      SmbShareSever

    • 默认情况下,目录将装载file_mode 0755和dir_mode 0755,请进行修改,这符合你的要求:

      MountSmbShare

    • 日志记录设施可用于故障排除,如脚本中的注释所示:

      LogWithPrefix

  3. 尝试使用 HPC 作业执行筛选器:

    3.1 检查 Linux 节点的状态、是否为 mpiuser1装载共享,以及是否可以使用域用户运行 bash 命令。

    用户

    3.2 提交具有任务 whoami的作业,然后选择执行筛选器集的节点的资源:

    SubmitJob

    SubmitJob

    3.3 检查运行是否按预期运行,并正确装载共享:

    结果

    结果

  4. 若要使筛选器可用于所有 Linux 节点,请将筛选器复制到共享,并使用 clusrun 将筛选器部署到共享中的所有 Linux 节点:

    PS > clusrun /nodegroup:LinuxNodes cp -rf <SmbSharePath>/filters  /opt/hpcnodemanager/
    

脚本作为附录

  • ResolveUserName.py

    #!/usr/bin/env python
    # Hpc Execution Filter Sample - Compose Customized Active Directory User Name
    # Introduction:
    # When it's in an Active Directly integrated Linux environment,
    # it's necessary to compose right RunAs user with different settings,
    # such as: 'winbind seperator' set in /etc/samba/smb.conf for Winbind
    # or 're_expression' set in /etc/sssd/sssd.conf for SSSD.
    # to ensure right user is used when HPC run jobs.
    #
    # In this case, we compose RunAs user, for example:
    # composedUserName = "{0}.{1}".format(domainName, userName) when Winbind Seperator set to . delimiter
    # or In SSSD, when set re_expression = ((?P<domain>.+)\.(?P<name>[^\\\.@]+$))
    #
    # Return codes:
    # 0      success
    # 1      incorrect invocation 
    
    import json
    import sys
    
    def ComposeAdUserName(domainName, userName):
        """
        Examples:
        composedUserName = "{0}@{1}".format(userName, domainName), when using userName@domainName
        """
        composedUserName = "{0}.{1}".format(domainName, userName)
        return composedUserName
    
    def Main():
        """The input is job execution context in json format."""
        jsonData = json.loads(sys.stdin.readline())
    
        """Get and compose user name, by default it's in domain\username format."""
        composedUserName = jsonData["m_Item3"]
        runAsUserInfo = composedUserName.split('\\')
        if len(runAsUserInfo) == 2:
            domainName = runAsUserInfo[0]
            userName = runAsUserInfo[1]
            composedUserName = ComposeAdUserName(domainName, userName)
    
        """Set composedUserName."""
        jsonData["m_Item3"] = composedUserName
    
        """Return the result through stdout"""
        print json.dumps(jsonData)
    
        sys.exit(0)
    
    if __name__ == '__main__':
        Main()
    
  • ResolveUserNameAndDoMount.py

    #!/usr/bin/env python
    # Hpc Execution Filter Sample - Compose Customized Active Directory User Name and Do Mount For Non-Admin Users
    # This script reuse ResolveUserName.py to compose right Active Directory user name,
    # and do mount for Non-Admin users. 
    #
    # The sample fulfils the following scenario:
    # Administrators use HPC Linux Support with Active Directory Integrated,
    # and provide SMB Shares with pattern //SmbShareBasePath/UserName for each Active Directory User to do data movement.  
    # Administrators wish to ensure the shares can be mounted for different users. 
    # In this script, the specific SMB share will be mounted to the share folder in each users' home directory, 
    # with uid and gid set correspondingly, and file_mode and dir_mode both set to 755.
    #
    # Please notice this sample script will parse the password of users for mounting, 
    # and be sure this aligns security policies before using it in production environments. 
    # 
    # Return codes:
    # This script follow command mount's return codes:
    # 0      success
    # 1      incorrect invocation or permissions
    # 2      system error (out of memory, cannot fork, no more loop devices)
    # 4      internal mount bug
    # 8      user interrupt
    # 16     problems writing or locking /etc/mtab
    # 32     mount failure
    # 64     some mount succeeded
    # For more about mount's return codes, please refer man 8 mount.
    
    import json
    import os
    import pwd
    import string
    import subprocess
    import sys
    import time
    import ResolveUserName
    
    """Define the constants."""
    SmbShareBasePath = "//[SmbShareSever]/SmbShareDemo"
    
    def MountSmbShare(smbSharePath, targetPath, domainName, userName, password, uid, gid, fileMode="0755", dirMode="0755"):
        retCode = 0
        if os.path.ismount(targetPath) == False:
            maxRetry = 3
            while(maxRetry > 0):
                retCode = Run("mount -t cifs {0} {1} -o domain={2},username={3},password='{4}',uid={5},gid={6},file_mode={7},dir_mode={8}".format(smbSharePath, targetPath, domainName, userName, password, uid, gid, fileMode, dirMode))
                """Check if succeeded, and skip the case when another process successfully mount the share."""
                if retCode == 0 or os.path.ismount(targetPath):
                    retCode = 0
                    break
                maxRetry = maxRetry - 1     
                time.sleep(1)
        return retCode
    
    """Run command facilities."""
    if not hasattr(subprocess,'check_output'):
        def check_output(*popenargs, **kwargs):
            r"""Backport from subprocess module from python 2.7"""
            if 'stdout' in kwargs:
                raise ValueError('stdout argument not allowed, it will be overridden.')
            process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
            output, unused_err = process.communicate()
            retcode = process.poll()
            if retcode:
                cmd = kwargs.get("args")
                if cmd is None:
                    cmd = popenargs[0]
                raise subprocess.CalledProcessError(retcode, cmd, output=output)
            return output
    
        # Exception classes used by this module.
        class CalledProcessError(Exception):
            def __init__(self, returncode, cmd, output=None):
                self.returncode = returncode
                self.cmd = cmd
                self.output = output
            def __str__(self):
                return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
    
        subprocess.check_output=check_output
        subprocess.CalledProcessError=CalledProcessError
    
    def Run(cmd,chk_err=True):
        retcode,out=RunGetOutput(cmd,chk_err)
        return retcode
    
    def RunGetOutput(cmd,chk_err=True):
        try:
            output=subprocess.check_output(cmd,stderr=subprocess.STDOUT,shell=True)
        except subprocess.CalledProcessError,e :
            if chk_err :
                Error('CalledProcessError.  Error Code is ' + str(e.returncode)  )
                Error('CalledProcessError.  Command result was ' + (e.output[:-1]).decode('latin-1'))
            return e.returncode,e.output.decode('latin-1')
        return 0,output.decode('latin-1')
    """End of run command facilities."""
    
    """
    Logging facilities can be removed from the script.
    Log can be used for trouble shooting, and remember to comment them out when performance is considered more important.
    """
    LocalTime = time.localtime()
    ExecutionFilterSampleLogFile = "./ExecutionFilter_ResolveUserAndMount_%04u%02u%02u-%02u%02u%02u.log" % (LocalTime.tm_year, LocalTime.tm_mon, LocalTime.tm_mday, LocalTime.tm_hour, LocalTime.tm_min, LocalTime.tm_sec)
    def LogWithPrefix(prefix, message):
        t = time.localtime()
        t = "%04u/%02u/%02u %02u:%02u:%02u " % (t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec)
        t += prefix
        for line in message.split('\n'):
            line = t + line
            line = filter(lambda x : x in string.printable, line)
            try:
                with open(ExecutionFilterSampleLogFile, "a") as F :
                    F.write(line.encode('ascii','ignore') + "\n")
            except IOError, e:
                pass
    
    def Log(message):
        LogWithPrefix("INFO: ", message)
    
    def Error(message):
        LogWithPrefix("ERROR: ", message)
    
    def Warn(message):
        LogWithPrefix("WARNING: ", message)
    """End of logging facilities."""
    
    def Main():
        retCode = 0
    
        """The input is job execution context in json format."""
        jsonData = json.loads(sys.stdin.readline())
        try:
            """Get user name, by default it's in domain\username format."""
            composedUserName = jsonData["m_Item3"]
            runAsUserInfo = composedUserName.split('\\')
            if len(runAsUserInfo) < 2:
                Error("Illegal input runAsUser: {0}, be sure the input is hpc job context in json format.".format(composedUserName))
                sys.exit(1)
            domainName = runAsUserInfo[0]
            userName = runAsUserInfo[1]
    
            """Resolve right Active Directory user name."""
            composedUserName = ResolveUserName.ComposeAdUserName(domainName, userName)
    
            """Query if the user is admin, and mount for Non-Admin users."""
            isAdmin = "0"
            try:
                isAdmin = jsonData["m_Item2"]["environmentVariables"]["CCP_ISADMIN"]
            except KeyError:
                pass
    
            if isAdmin == "0":
                """Check whether user exists, touch user's home dir, and get user information."""
                retCode = Run("mkhomedir_helper {0}".format(composedUserName))
                if retCode != 0:
                    Error("No such user: {0}, or home directory for this user cannot be used or generated properly.".format(composedUserName))
                    sys.exit(1)
    
                pwdInfo = pwd.getpwnam(composedUserName)
                uid = pwdInfo.pw_uid
                gid = pwdInfo.pw_gid
                homeDir = pwdInfo.pw_dir
    
                """Get password, please note the risk here."""
                password = jsonData["m_Item4"]
    
                """Do mount for Non-Admin users."""
                smbSharePath = "{0}/{1}".format(SmbShareBasePath, userName)
                targetPath = "{0}/share".format(homeDir)
                retCode = Run("mkdir -p {0}".format(targetPath))
                if retCode != 0:
                    Error("Cannot find and create mount target path: {0}".format(targetPath))
                    sys.exit(1)
    
                retCode = MountSmbShare(smbSharePath, targetPath, domainName, userName, password, uid, gid)
    
            """Set composedUserName."""
            jsonData["m_Item3"] = composedUserName
    
        except KeyError:
            """Please check whether the script is used correctly."""
            Error("Please check whether the script is used correctly, and ensure it get right format job context json.")
            retCode = 1
    
        """Return the result through stdout."""
        print json.dumps(jsonData)
    
        #Log("ExecutionFitler finished with retCode:{0}".format(retCode))
        sys.exit(retCode)
    
    if __name__ == '__main__':
        Main()