BigQuery には SQL または JavaScript で定義できる User-defined functions やパラメータを受け取れる view のようにはたらく Table functions (TVF) のほかに、Athena の UDF のように Cloud Run や Cloud Run functions (旧 Cloud Functions 2nd gen) を呼び出す remote functions があって、任意のライブラリやサービスを用いて処理を行うことができる。
Athenaのデータソースコネクタとユーザー定義関数(UDF)を実装する - sambaiz-net
CREATE FUNCTION (project).(dataset).add(x FLOAT64, y FLOAT64)
RETURNS FLOAT64
LANGUAGE js
AS r"""
return x+y;
""";
CREATE FUNCTION (project).(dataset).remote_add(x INT64, y INT64)
RETURNS INT64
REMOTE WITH (project).us.my_connection
OPTIONS (
endpoint = 'ENDPOINT_URL'
)
calls で入力を受け取って replies で値を返す API を実装する。
package remote_add
import (
"encoding/json"
"fmt"
"net/http"
"github.com/GoogleCloudPlatform/functions-framework-go/functions"
)
func init() {
functions.HTTP("RemoteAdd", RemoteAdd)
}
func RemoteAdd(w http.ResponseWriter, r *http.Request) {
var requestData struct {
Calls [][]int64 `json:"calls"`
}
if err := json.NewDecoder(r.Body).Decode(&requestData); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
replies := make([]int64, 0, len(requestData.Calls))
for _, call := range requestData.Calls {
if len(call) != 2 {
http.Error(w, "Invalid num of args", http.StatusBadRequest)
return
}
replies = append(replies, call[0]+call[1])
}
responseData := map[string]interface{}{
"replies": replies,
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(responseData); err != nil {
http.Error(w, "Failed to encode response", http.StatusInternalServerError)
return
}
}
$ gcloud functions deploy remote-add-function \
--gen2 \
--runtime=go122 \
--region=us-central1 \
--source=. \
--entry-point=RemoteAdd \
--trigger-http \
--ingress-settings=internal-only
...
url: https://us-central1-*****.cloudfunctions.net/remote-add-function
google_bigquery_connection を作成し、google_bigquery_routine で関数を定義する。
variable "project_id" {
type = string
}
variable "region" {
type = string
}
variable "dataset_id" {
type = string
}
variable "function_endpoint" {
type = string
}
provider "google" {
project = var.project_id
region = var.region
}
resource "google_bigquery_connection" "my_connection" {
connection_id = "my_connection"
location = "US"
cloud_resource {}
}
resource "google_bigquery_routine" "remote_function" {
dataset_id = var.dataset_id
routine_id = "remote_add"
routine_type = "SCALAR_FUNCTION"
definition_body = ""
arguments {
name = "X"
data_type = "{\"typeKind\": \"INT64\"}"
}
arguments {
name = "Y"
data_type = "{\"typeKind\": \"INT64\"}"
}
return_type = "{\"typeKind\" : \"INT64\"}"
remote_function_options {
endpoint = var.function_endpoint
connection = google_bigquery_connection.my_connection.name
max_batching_rows = "10"
}
}
関数を呼び出す。