GoでAthenaのクエリを実行する

golangawsetl

segmentio/go-athenaを使う。database/sqlのドライバーとして提供されていて、 StartQueryExecution()stateのポーリング値のキャストを行う。

package main

import (
	"database/sql"
	"errors"
	"fmt"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sts"
	_ "github.com/segmentio/go-athena"
)

func outputLocation() (string, error) {
	svc := sts.New(session.Must(session.NewSession()))
	result, err := svc.GetCallerIdentity(&sts.GetCallerIdentityInput{})
	if err != nil {
		return "", err
	}
	if result.Account == nil || svc.Config.Region == nil {
		return "", errors.New("account or region is nil")
	}
	return fmt.Sprintf("s3://aws-athena-query-results-%s-%s", *result.Account, *svc.Config.Region), nil
}

func execute(query string) (*sql.Rows, error) {
	loc, err := outputLocation()
	if err != nil {
		return nil, err
	}
	db, err := sql.Open("athena", fmt.Sprintf("db=default&output_location=%s", loc))
	if err != nil {
		return nil, err
	}
	return db.Query(query)
}

func main() {
	rows, err := execute("SELECT 1 as v, 2 as w FROM hoge.fuga")
	if err != nil {
		panic(err)
	}
	for rows.Next() {
		var v, w int
		rows.Scan(&v, &w)
		fmt.Println(v, w)
	}
}