Vertex AI에서 Ray 애플리케이션을 실행할 때 BigQuery를 클라우드 데이터베이스로 사용할 수 있습니다. 이 섹션에서는 Vertex AI의 Ray 클러스터에서 BigQuery 데이터베이스를 읽고 쓰는 방법을 설명합니다. 이 섹션의 단계에서는 Vertex AI SDK for Python을 사용한다고 가정합니다.
BigQuery 데이터 세트에서 읽으려면 새 BigQuery 데이터 세트를 만들거나 기존 데이터 세트를 사용해야 합니다.
Vertex AI 클라이언트에서 Ray 가져오기 및 초기화
Vertex AI의 Ray 클러스터에 이미 연결되어 있으면 커널을 다시 시작하고 다음 코드를 실행합니다. runtime_env
변수는 연결 시 BigQuery 명령어를 실행하기 위해 필요합니다.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.33.0"] } ray.init(address=address, runtime_env=runtime_env)
BigQuery에서 데이터 읽기
BigQuery 데이터 세트에서 데이터를 읽습니다. 읽기는 Ray Task에서 수행해야 합니다.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
각 항목의 의미는 다음과 같습니다.
PROJECT_ID: Google Cloud 프로젝트 ID입니다. Google Cloud 콘솔 시작 페이지에서 프로젝트 ID를 찾을 수 있습니다.
LOCATION:
Dataset
가 저장된 위치. 예를 들면us-central1
입니다.DATASET: BigQuery 데이터 세트.
dataset.table
형식이어야 합니다. 쿼리를 제공하려면None
으로 설정합니다.PARALLELISM: 동시에 생성되는 읽기 태스크 수에 영향을 미치는 정수. 읽기 스트림이 요청한 것보다 적게 생성될 수 있습니다.
QUERY: BigQuery 데이터베이스에서 읽을 SQL 쿼리가 포함된 문자열. 쿼리가 필요하지 않으면
None
으로 설정합니다.
데이터 변환
pyarrow
또는 pandas
를 사용하여 BigQuery 테이블에서 행과 열을 업데이트하고 삭제합니다. pandas
변환을 사용하려면 입력 유형을 pyarrow로 유지하고 사용자 정의 함수(UDF) 내에서 pandas
변환 유형 오류를 포착할 수 있도록 UDF 내에서 pandas
로 변환하는 것이 좋습니다. 변환은 Ray Task에서 수행해야 합니다.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
BigQuery에 데이터 쓰기
BigQuery 데이터 세트에 데이터를 삽입합니다. 쓰기는 Ray Task에서 수행해야 합니다.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
각 항목의 의미는 다음과 같습니다.
- DATASET: BigQuery 데이터 세트.
dataset.table
형식이어야 합니다.