BigQuery から Cloud Run functions を呼び出して任意のライブラリやサービスを用いた処理を行う

gcpterraform

BigQuery には SQL または JavaScript で定義できる User-defined functions やパラメータを受け取れる view のようにはたらく Table functions (TVF) のほかに、Athena の UDF のように Cloud Run や Cloud Run functions (旧 Cloud Functions 2nd gen) を呼び出す remote functions があって、任意のライブラリやサービスを用いて処理を行うことができる。

Athenaのデータソースコネクタとユーザー定義関数(UDF)を実装する - sambaiz-net

CREATE FUNCTION (project).(dataset).add(x FLOAT64, y FLOAT64)
RETURNS FLOAT64
LANGUAGE js
AS r"""
  return x+y;
""";

CREATE FUNCTION (project).(dataset).remote_add(x INT64, y INT64)
RETURNS INT64
REMOTE WITH (project).us.my_connection
OPTIONS (
  endpoint = 'ENDPOINT_URL'
)

calls で入力を受け取って replies で値を返す API を実装する。

package remote_add

import (
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
)

func init() {
	functions.HTTP("RemoteAdd", RemoteAdd)
}

func RemoteAdd(w http.ResponseWriter, r *http.Request) {
	var requestData struct {
		Calls [][]int64 `json:"calls"`
	}

	if err := json.NewDecoder(r.Body).Decode(&requestData); err != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}

	replies := make([]int64, 0, len(requestData.Calls))

	for _, call := range requestData.Calls {
		if len(call) != 2 {
			http.Error(w, "Invalid num of args", http.StatusBadRequest)
			return
		}

		replies = append(replies, call[0]+call[1])
	}

	responseData := map[string]interface{}{
		"replies": replies,
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(responseData); err != nil {
		http.Error(w, "Failed to encode response", http.StatusInternalServerError)
		return
	}
}

BigQueryと同じリージョンデプロイする。

 $ gcloud functions deploy remote-add-function \
    --gen2 \
    --runtime=go122 \
    --region=us-central1 \
    --source=. \
    --entry-point=RemoteAdd \
    --trigger-http \
    --ingress-settings=internal-only
 
...
url: https://us-central1-*****.cloudfunctions.net/remote-add-function

google_bigquery_connection を作成し、google_bigquery_routine で関数を定義する。

variable "project_id" {
  type = string
}

variable "region" {
  type = string
}

variable "dataset_id" {
  type = string
}

variable "function_endpoint" {
  type = string
}

provider "google" {
  project = var.project_id
  region  = var.region
}

resource "google_bigquery_connection" "my_connection" {
  connection_id = "my_connection"
  location      = "US"
  cloud_resource {}
}

resource "google_bigquery_routine" "remote_function" {
  dataset_id      = var.dataset_id
  routine_id      = "remote_add"
  routine_type    = "SCALAR_FUNCTION"
  definition_body = ""

  arguments {
    name      = "X"
    data_type = "{\"typeKind\": \"INT64\"}"
  }

  arguments {
    name      = "Y"
    data_type = "{\"typeKind\": \"INT64\"}"
  }

  return_type = "{\"typeKind\" :  \"INT64\"}"

  remote_function_options {
    endpoint          = var.function_endpoint
    connection        = google_bigquery_connection.my_connection.name
    max_batching_rows = "10"
  }
}

関数を呼び出す。