BigQuery와 함께 Vertex AI에서 Ray 사용

Vertex AI에서 Ray 애플리케이션을 실행할 때 BigQuery를 클라우드 데이터베이스로 사용할 수 있습니다. 이 섹션에서는 Vertex AI의 Ray 클러스터에서 BigQuery 데이터베이스를 읽고 쓰는 방법을 설명합니다. 이 섹션의 단계에서는 Vertex AI SDK for Python을 사용한다고 가정합니다.

BigQuery 데이터 세트에서 읽으려면 새 BigQuery 데이터 세트를 만들거나 기존 데이터 세트를 사용해야 합니다.

Vertex AI 클라이언트에서 Ray 가져오기 및 초기화

Vertex AI의 Ray 클러스터에 이미 연결되어 있으면 커널을 다시 시작하고 다음 코드를 실행합니다. runtime_env 변수는 연결 시 BigQuery 명령어를 실행하기 위해 필요합니다.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.33.0"]
  }

ray.init(address=address, runtime_env=runtime_env)

BigQuery에서 데이터 읽기

BigQuery 데이터 세트에서 데이터를 읽습니다. 읽기는 Ray Task에서 수행해야 합니다.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

각 항목의 의미는 다음과 같습니다.

  • PROJECT_ID: Google Cloud 프로젝트 ID입니다. Google Cloud 콘솔 시작 페이지에서 프로젝트 ID를 찾을 수 있습니다.

  • LOCATION: Dataset가 저장된 위치. 예를 들면 us-central1입니다.

  • DATASET: BigQuery 데이터 세트. dataset.table 형식이어야 합니다. 쿼리를 제공하려면 None으로 설정합니다.

  • PARALLELISM: 동시에 생성되는 읽기 태스크 수에 영향을 미치는 정수. 읽기 스트림이 요청한 것보다 적게 생성될 수 있습니다.

  • QUERY: BigQuery 데이터베이스에서 읽을 SQL 쿼리가 포함된 문자열. 쿼리가 필요하지 않으면 None으로 설정합니다.

데이터 변환

pyarrow 또는 pandas를 사용하여 BigQuery 테이블에서 행과 열을 업데이트하고 삭제합니다. pandas 변환을 사용하려면 입력 유형을 pyarrow로 유지하고 사용자 정의 함수(UDF) 내에서 pandas 변환 유형 오류를 포착할 수 있도록 UDF 내에서 pandas로 변환하는 것이 좋습니다. 변환은 Ray Task에서 수행해야 합니다.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

BigQuery에 데이터 쓰기

BigQuery 데이터 세트에 데이터를 삽입합니다. 쓰기는 Ray Task에서 수행해야 합니다.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

각 항목의 의미는 다음과 같습니다.

  • DATASET: BigQuery 데이터 세트. dataset.table 형식이어야 합니다.

다음 단계