List Dataflow jobs filtered by name and date in Python

What is the Python API equivalent of the following CLI command to list jobs?

 

gcloud dataflow jobs list --filter="$job_name" --region="${region}" --format=json --created-after=-p1d --sort-by="~stateTime"

The equivalent Python code, using the Google API Python client to call the Dataflow v1b3 API with filtering options similar to the gcloud dataflow jobs list command, would look something like this:

 

from googleapiclient.discovery import build
from datetime import datetime, timedelta
import logging

def list_dataflow_jobs(project_id, region, job_name=None, job_filter='ACTIVE'):
    """
    Lists Dataflow jobs in the given project and region, optionally restricted
    to an exact job name and to jobs created within the last day.
    """

    # Build the Dataflow v1b3 API client (uses Application Default Credentials)
    dataflow = build('dataflow', 'v1b3')

    # Calculate the creation time filter (1 day ago)
    created_after = (datetime.utcnow() - timedelta(days=1)).isoformat() + 'Z'

    try:
        request = dataflow.projects().locations().jobs().list(
            projectId=project_id,
            location=region,
            filter=job_filter  # Filter for job states (e.g., ACTIVE, TERMINATED)
        )

        jobs = []
        while request is not None:
            response = request.execute()

            # Filter jobs by name (if provided) and by creation time; RFC 3339
            # UTC timestamps compare correctly as plain strings
            filtered_jobs = [
                job for job in response.get('jobs', [])
                if (job_name is None or job['name'] == job_name)
                and job.get('createTime', '') >= created_after
            ]
            jobs.extend(filtered_jobs)

            # Handle pagination if necessary
            request = dataflow.projects().locations().jobs().list_next(previous_request=request, previous_response=response)

        # Sort by currentStateTime in descending order
        # (equivalent to --sort-by="~stateTime"); jobs without it sort last
        jobs.sort(key=lambda x: x.get('currentStateTime', ''), reverse=True)

        return jobs

    except Exception as e:
        logging.error(f"An error occurred: {e}")
        return []

# Example usage
project_id = 'your-project-id'
region = 'your-region'
job_name = 'your-job-name'

jobs = list_dataflow_jobs(project_id, region, job_name)

for job in jobs:
    print(job)
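 

As an alternative to the discovery-based client above, you can also use the dedicated google-cloud-dataflow-client package, which exposes the same v1b3 API as google.cloud.dataflow_v1beta3 and handles pagination for you through a pager. The snippet below is only an untested sketch: it assumes that package is installed, that Application Default Credentials are configured, and that the generated field names (create_time, current_state_time) and the ListJobsRequest.Filter enum behave as described in the library reference.

 

from datetime import datetime, timedelta, timezone

from google.cloud import dataflow_v1beta3


def list_recent_jobs(project_id, region, job_name=None):
    """Lists jobs created in the last day, newest state change first (sketch)."""
    client = dataflow_v1beta3.JobsV1Beta3Client()

    request = dataflow_v1beta3.ListJobsRequest(
        project_id=project_id,
        location=region,
        filter=dataflow_v1beta3.ListJobsRequest.Filter.ALL,
    )

    created_after = datetime.now(timezone.utc) - timedelta(days=1)

    # The returned pager iterates across pages transparently
    jobs = [
        job
        for job in client.list_jobs(request=request)
        if (job_name is None or job.name == job_name)
        and job.create_time >= created_after  # create_time is a tz-aware datetime
    ]

    # Newest state change first, matching --sort-by="~stateTime"
    jobs.sort(key=lambda j: j.current_state_time, reverse=True)
    return jobs


for job in list_recent_jobs('your-project-id', 'your-region', 'your-job-name'):
    print(job.name, job.current_state, job.create_time)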

 

Hi @ramkrishnamI,

In addition to @ms4446's reply, you can also refer to the following documentation:

I hope the above information is helpful.