使用 Workflows 运行批处理作业


Batch 是一项全代管式服务,可让您在 Compute Engine 虚拟机 (VM) 实例上安排批处理工作负载、将批处理工作负载排入队列并执行批处理工作负载。Batch 为您的集群预配资源和管理容量, 让您可以大规模运行批量工作负载。

借助Workflows,您可以执行自己希望 您需要按照您定义的订单 Workflows 语法

在本教程中,您将使用 适用于批处理的作业流连接器安排和运行一个批处理作业,该作业将在两个 Compute Engine 虚拟机上并行执行六项任务。同时使用批处理和工作流可以让您结合使用它们提供的优势,并高效地预配和编排整个流程。

目标

在此教程中,您将学习以下操作:

  1. 为 Docker 容器映像创建 Artifact Registry 制品库。
  2. 从 GitHub 获取批处理工作负载的代码:一个示例程序,以 1 万个批量生成素数。
  3. 为工作负载构建 Docker 映像。
  4. 部署并执行执行以下操作的工作流:
    1. 创建一个 Cloud Storage 存储桶来存储素数生成器的结果。
    2. 安排和运行一个批处理作业,该作业会在两个 Compute Engine 虚拟机上将 Docker 容器作为六个任务并行运行。
    3. (可选)在批处理作业完成后将其删除。
  5. 确认结果符合预期,并且生成的大量素数已存储在 Cloud Storage 中。

您可以在 Google Cloud 控制台中运行以下大部分命令,也可以运行所有命令 在终端或 Cloud Shell 中使用 Google Cloud CLI 运行命令。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. 在 Google Cloud 控制台的“项目选择器”页面上,选择或创建 Google Cloud 项目

    前往项目选择器

  2. 确保您的 Google Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  3. 启用 Artifact Registry、Batch、Cloud Build Compute Engine、工作流执行 API 和 Workflows API。

    启用 API

  4. 为您的工作流创建用于身份验证的服务账号 其他 Google Cloud 服务并为其授予适当的角色:

    1. 在 Google Cloud 控制台中,进入创建服务账号页面。

      转到“创建服务账号”

    2. 选择您的项目。

    3. 服务账号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务账号 ID 字段。

      服务账号说明字段中,输入说明。例如 Service account for tutorial

    4. 点击创建并继续

    5. 选择角色列表中,过滤出要授予您在上一步中创建的用户管理型服务账号的以下角色:

      • 批量作业编辑器:用于修改批量作业。
      • Logs Writer:用于写入日志。
      • Storage Admin:用于控制 Cloud Storage 资源。

      如需添加其他角色,请点击 添加其他角色,然后添加其他各个角色。

    6. 点击继续

    7. 如需完成账号的创建过程,请点击完成

  5. 授予默认 IAM Service Account User 角色 与上一步骤中创建的用户代管式服务账号相关联 操作。启用 Compute Engine API 后,默认服务账号是 Compute Engine 默认服务账号 (PROJECT_NUMBER-compute@developer.gserviceaccount.com),并且权限通常通过 roles/iam.serviceAccountUser 角色分配。

    1. 服务账号页面上,点击 默认服务账号 (PROJECT_NUMBER-compute@developer.gserviceaccount.com).

    2. 点击权限标签页。

    3. 点击 授予访问权限按钮。

    4. 如需添加新的主账号,请输入您的服务账号的电子邮件地址 (SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com).

    5. 选择角色列表中,依次选择服务账号 > Service Account User 角色。

    6. 点击保存

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 确保您的 Google Cloud 项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 启用 Artifact Registry API、Batch API、Cloud Build API、Compute Engine Workflow Executions API 和 Workflows API。

    gcloud services enable artifactregistry.googleapis.com \
      batch.googleapis.com \
      cloudbuild.googleapis.com \
      compute.googleapis.com \
      workflowexecutions.googleapis.com \
      workflows.googleapis.com
  4. 为您的工作流创建一个服务账号,供其向其他 Google Cloud 服务进行身份验证,并向其授予适当的角色。

    1. 创建服务账号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务账号的名称。

    2. 将角色授予您在 上一步。针对以下各项运行一次命令 IAM 角色:

      • roles/batch.jobsEditor:用于修改批量作业。
      • roles/logging.logWriter:用于写入日志。
      • roles/storage.admin:用于控制 Cloud Storage 资源。
      gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE

      替换以下内容:

      • PROJECT_ID:您创建的项目 ID 服务账号
      • ROLE:要授予的角色
  5. 将默认服务账号的 IAM Service Account User 角色授予您在上一步中创建的用户管理的服务账号。启用 Compute Engine API 后,默认服务账号是 Compute Engine 默认服务账号 (PROJECT_NUMBER-compute@developer.gserviceaccount.com),并且权限通常通过 roles/iam.serviceAccountUser 角色分配。

    PROJECT_NUMBER=$(gcloud projects describe PROJECT_ID --format='value(projectNumber)')
    gcloud iam service-accounts add-iam-policy-binding \
      $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
      --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --role=roles/iam.serviceAccountUser

创建 Artifact Registry 仓库

创建一个仓库来存储您的 Docker 容器映像。

控制台

  1. 在 Google Cloud 控制台中,前往代码库页面。

    前往制品库

  2. 点击 创建代码库

  3. 输入 containers 作为代码库名称。

  4. 格式字段中,选择 Docker

  5. 对于位置类型,选择区域

  6. 区域列表中,选择 us-central1

  7. 点击创建

gcloud

运行以下命令:

  gcloud artifacts repositories create containers \
    --repository-format=docker \
    --location=us-central1

您已在 us-central1 区域中创建了一个名为 containers 的 Artifact Registry 代码库。如需详细了解支持的区域,请参阅 Artifact Registry 位置

获取代码示例

Google Cloud 将本教程的应用源代码存储在 GitHub。您可以克隆该代码库或下载示例。

  1. 将示例应用代码库克隆到本地机器:

    git clone https://github.com/GoogleCloudPlatform/batch-samples.git
    

    或者,您也可以下载 main.zip 文件中的示例并将其解压缩。

  2. 转到包含示例代码的目录:

    cd batch-samples/primegen
    

现在,您已拥有开发环境中应用的源代码。

使用 Cloud Build 构建 Docker 映像

Dockerfile 包含构建 Docker 映像所需的信息 Cloud Build运行以下命令进行构建:

gcloud builds submit \
  -t us-central1-docker.pkg.dev/PROJECT_ID/containers/primegen-service:v1 PrimeGenService/

PROJECT_ID 替换为您的 Google Cloud 项目 ID。

构建完成后,您应该会看到如下所示的输出:

DONE
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ID: a54818cc-5d14-467b-bfda-5fc9590af68c
CREATE_TIME: 2022-07-29T01:48:50+00:00
DURATION: 48S
SOURCE: gs://project-name_cloudbuild/source/1659059329.705219-17aee3a424a94679937a7200fab15bcf.tgz
IMAGES: us-central1-docker.pkg.dev/project-name/containers/primegen-service:v1
STATUS: SUCCESS

您使用 Dockerfile 构建了一个名为 primegen-service 的 Docker 映像,并将该映像推送到了名为 containers 的 Artifact Registry 代码库。

部署用于安排和运行批量作业的工作流

以下工作流安排并运行了 作为 6 个任务在两个 Compute Engine 虚拟机上并行运行。通过 会生成六个质数,这些质数存储在 Cloud Storage 存储桶。

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 batch-workflow

  4. 区域列表中,选择 us-central1

  5. 选择您之前创建的服务账号

  6. 点击下一步

  7. 在工作流编辑器中,为工作流输入以下定义:

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  8. 点击部署

gcloud

  1. 为您的工作流创建源代码文件:

    touch batch-workflow.JSON_OR_YAML

    JSON_OR_YAML 替换为 yamljson,具体取决于工作流的格式。

  2. 在文本编辑器中,将以下工作流复制到源代码文件中:

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy batch-workflow \
      --source=batch-workflow.yaml \
      --location=us-central1 \
      --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME 替换为 之前创建的服务账号

执行工作流

执行某个工作流会运行与该工作流关联的当前工作流定义。

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. Workflows(工作流)页面上,点击 batch-workflow(批处理工作流)工作流以转到其详情页面。

  3. 工作流详情页面上,点击 执行

  4. 再次点击执行

    工作流执行过程应该需要几分钟时间。

  5. 输出窗格中查看工作流的结果。

    结果应如下所示:

    {
      "bucket": "project-name-job-primegen-TIMESTAMP",
      "jobId": "job-primegen-TIMESTAMP"
    }
    

gcloud

  1. 执行工作流:

    gcloud workflows run batch-workflow \
      --location=us-central1

    工作流执行过程应该需要几分钟时间。

  2. 您可以检查长时间运行的执行操作的状态

  3. 如需获取最后一次完成的执行的状态,请运行以下命令:

    gcloud workflows executions describe-last

    结果应������如下所示:

    name: projects/PROJECT_NUMBER/locations/us-central1/workflows/batch-workflow/executions/EXECUTION_ID
    result: '{"bucket":"project-name-job-primegen-TIMESTAMP","jobId":"job-primegen-TIMESTAMP"}'
    startTime: '2022-07-29T16:08:39.725306421Z'
    state: SUCCEEDED
    status:
      currentSteps:
      - routine: main
        step: returnResult
    workflowRevisionId: 000001-9ba
    

列出输出存储桶中的对象

要确认结果是否符合预期,您可以在 Cloud Storage 输出存储桶。

控制台

  1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

    进入“存储桶”

  2. 在存储分区列表中,点击您要查看其内容的存储分区的名称。

    结果应类似于以下内容,总共有 6 个文件,每个文件列出了 1 万个质数:

    primes-1-10000.txt
    primes-10001-20000.txt
    primes-20001-30000.txt
    primes-30001-40000.txt
    primes-40001-50000.txt
    primes-50001-60000.txt
    

gcloud

  1. 检索输出存储桶名称:

    gcloud storage ls

    输出类似于以下内容:

    gs://PROJECT_ID-job-primegen-TIMESTAMP/

  2. 列出输出存储桶中的对象:

    gcloud storage ls gs://PROJECT_ID-job-primegen-TIMESTAMP/** --recursive

    TIMESTAMP 替换为 上一个命令。

    输出应类似于以下内容,总共有 6 个文件,每个文件列出一批 10,000 个素数:

    gs://project-name-job-primegen-TIMESTAMP/primes-1-10000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-10001-20000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-20001-30000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-30001-40000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-40001-50000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-50001-60000.txt
    

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除本教程中创建的资源

  1. 删除批处理作业:

    1. 首先检索作业名称:

      gcloud batch jobs list --location=us-central1

      输出应类似如下所示:

      NAME: projects/project-name/locations/us-central1/jobs/job-primegen-TIMESTAMP
      STATE: SUCCEEDED
      

      其中 job-primegen-TIMESTAMP 是批处理作业的名称。

    2. 删除作业:

      gcloud batch jobs delete BATCH_JOB_NAME --location us-central1
  2. 删除工作流:

    gcloud workflows delete WORKFLOW_NAME
  3. 删除容器代码库:

    gcloud artifacts repositories delete REPOSITORY_NAME --location=us-central1
  4. Cloud Build 使用 Cloud Storage 存储构建资源。如需删除 Cloud Storage 存储桶,请参阅删除存储桶

后续步骤