fluentdのmonitor_agent
メトリクスをjsonで返すAPIを提供する。
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>
$ curl localhost:24220/api/plugins.json | jq
{
"plugins": [
{
"plugin_id": "object:3f8590d8c250",
"plugin_category": "input",
"type": "forward",
"config": {
"@type": "forward",
"port": "24222",
"bind": "0.0.0.0"
},
"output_plugin": false,
"retry_count": null
},
{
"plugin_id": "object:3f8590d894c4",
"plugin_category": "input",
"type": "monitor_agent",
"config": {
"@type": "monitor_agent",
"bind": "0.0.0.0",
"port": "24220"
},
"output_plugin": false,
"retry_count": null
},
{
"plugin_id": "object:3f8590dc1f2c",
"plugin_category": "output",
"type": "file",
"config": {
"@type": "file",
"path": "/var/log/td-agent/hoge.log",
"buffer_path": "/var/log/td-agent/hoge.log.*"
},
"output_plugin": true,
"buffer_queue_length": 0,
"buffer_total_queued_size": 0,
"retry_count": 0
}
]
}
これをもとにStackdriverで異常を検知できるようにする。
Google Stackdriver
GoogleがStackdriverを買収して改造したもの。GCPだけではなくAWSのリソースも監視できる。 まだBeta。
EC2インスタンスを監視する
GCPのメニューのSTACKDRIVER -> モニタリングで、プロジェクトを指定してStackdriverアカウントを作成する。
今回はEC2で動いているfluentdを監視するので指示に従ってクロスアカウントアクセスのロールを作成、 Role ARNを入力してAWSアカウントと接続すると、 StackdriverのResouces->InstancesでCPUの使用率などは確認できるが、 EC2にAgentを入れると詳細な情報を取得できる。
GCPのメニューのサービスアカウントから接続したAWSアカウントを選択し、
Project->編集者とLogging->ログ書き込みロールのサービスアカウントを作成する。
新しい秘密鍵の提供にチェックを入れて、JSONのキーをダウンロードする。
これをEC2の/etc/google/auth/application_default_credentials.json
に置いて
chown root:root
、chmod 400
する。
Monitoring AgentとLogging Agentをインストールし、
stackdriver-collectd
とgoogle-fluentd
のプロセスがあれば正常。
curl -O https://repo.stackdriver.com/stack-install.sh
sudo bash stack-install.sh --write-gcm
curl -sSO https://dl.google.com/cloudagents/install-logging-agent.sh
sudo bash install-logging-agent.sh
メモリの使用量やTCPコネクション数などがとれていることを確認する。
Googleのドキュメントには見つからなかったが、
旧Stackdriverと同様、stackdriver_monitor: false
のタグを付けると
監視対象から外れる
っぽい。
カスタムメトリクスを送る
MetricDescriptorを作成し、これにTimeSeriesデータを書き込んでいく。
MetricDescriptorの作成
typeはcustom.googleapis.com/
から始める。
metricKind にはGAUGEのほかに変化量をとるDELTA、累積するCUMULATIVEを指定できる。
labelはフィルタリングのためのもの。
{
"name": "",
"description": "fluentd buffer_queue_length",
"displayName": "fluentd-buffer_queue_length",
"type": "custom.googleapis.com/fluentd/buffer_queue_length",
"metricKind": "GAUGE",
"valueType": "INT64",
"labels": [
{
"key": "plugin_type",
"valueType": "STRING",
"description": "The type of the plugin"
},
],
}
これをGoで登録する。
gcpのほうのprojectでProject->編集者のサービスアカウントを作成してパスを
環境変数GOOGLE_APPLICATION_CREDENTIALS
に入れて
Default Credential
にする。
必要なパッケージをgo get。
$ go get google.golang.org/api/monitoring/v3
$ go get golang.org/x/oauth2/google
func main() {
ctx := context.Background()
httpClient, err := google.DefaultClient(ctx, monitoring.CloudPlatformScope)
if err != nil {
panic(err)
}
client, err := monitoring.New(httpClient)
if err != nil {
panic(err)
}
var (
// The project on which to execute the request. The format is `"projects/{project_id_or_number}"`.
name = "projects/*****"
requestBody = &monitoring.MetricDescriptor{
Description: "fluentd buffer_queue_length",
DisplayName: "fluentd-buffer_queue_length",
Type: "custom.googleapis.com/fluentd/buffer_queue_length",
MetricKind: "GAUGE",
ValueType: "INT64",
Labels: []*monitoring.LabelDescriptor{
&monitoring.LabelDescriptor{
Key: "plugin_type",
ValueType: "STRING",
Description: "The type of the plugin",
},
},
}
)
response, err := client.Projects.MetricDescriptors.Create(name, requestBody).Context(ctx).Do()
if err != nil {
panic(err)
}
fmt.Println("done")
}
登録されたことをlistで確認する。
response, err := client.Projects.MetricDescriptors.List(name).Context(ctx).Do()
if err != nil {
panic(err)
}
for _, v := range response.MetricDescriptors {
fmt.Println(v.DisplayName)
}
API Request Count
Agent Memory Usage
Stream Space Used
...
fluentd-buffer_queue_length
...
TimeSeriesの書き込み
metricのtypeはMetricDescriptorのtypeと対応する。 pointsのendTimeはRFC3339のUTC文字列で渡す。
{
"timeSeries": [
{
"metric": {
"type": "custom.googleapis.com/fluentd/buffer_queue_length",
"labels": {
"plugin_type": "file"
}
},
"resource": {
"type": "aws_ec2_instance",
"labels": {
"project_id": "*****",
"instance_id": "*****",
"region": "aws:ap-northeast-1",
"aws_account": "*****"
}
},
"points": [
{
"interval": {
"endTime": "2016-06-01T10:00:00-04:00"
},
"value": {
"int64Value": 0
}
}
]
}
]
}
resourceのtypeは MonitoredResourceDescriptor と対応していて、 listで確認できる。
{
"resourceDescriptors": [
{
"type": "aws_ec2_instance",
"displayName": "Amazon EC2 Instance",
"description": "A VM instance in Amazon EC2.",
"labels": [
{
"key": "project_id",
"description": "The identifier of the GCP project under which data is stored for the AWS account specified in the aws_account label (e.g., my-project)."
},
{
"key": "instance_id",
"description": "The VM instance identifier assigned by AWS."
},
{
"key": "region",
"description": "The AWS region in which the VM is running. Supported AWS region values are listed by service at http://docs.aws.amazon.com/general/latest/gr/rande.html. The value supplied for this label must be prefixed with 'aws:' (for example, 'aws:us-east-1' is a valid value while 'us-east-1' is not)."
},
{
"key": "aws_account",
"description": "The AWS account number under which the VM is running."
}
]
},
...
]
}
書くコード。
func writeFluentdBufferQueueLength() error {
ctx := context.Background()
httpClient, err := google.DefaultClient(ctx, monitoring.CloudPlatformScope)
if err != nil {
return err
}
client, err := monitoring.New(httpClient)
if err != nil {
return err
}
now := time.Now().UTC().Format(time.RFC3339)
resource := &monitoring.MonitoredResource{
Type: "aws_ec2_instance",
Labels: map[string]string{
"project_id": "*****",
"instance_id": "*****",
"region": "aws:ap-northeast-1",
"aws_account": "*****",
},
}
metrics, err := fetchFluentdMetrics()
if err != nil {
return err
}
timeSeries := []*monitoring.TimeSeries{}
for _, v := range metrics.Plugins {
if v.OutputPlugin {
fmt.Printf("send %s\n", v.Type)
timeSeries = append(
timeSeries,
&monitoring.TimeSeries{
Metric: &monitoring.Metric{
Type: "custom.googleapis.com/fluentd/buffer_queue_length",
Labels: map[string]string{
"plugin_type": v.Type,
},
},
Resource: resource,
Points: []*monitoring.Point{
&monitoring.Point{
Interval: &monitoring.TimeInterval{
EndTime: now,
},
Value: &monitoring.TypedValue{
Int64Value: int64p(v.BufferQueueLength),
},
},
},
},
)
}
}
var (
// The project on which to execute the request. The format is `"projects/{project_id_or_number}"`.
name = "projects/try-stackdriver-159110"
requestBody = &monitoring.CreateTimeSeriesRequest{
TimeSeries: timeSeries,
}
)
_, err = client.Projects.TimeSeries.Create(name, requestBody).Context(ctx).Do()
if err != nil {
return err
}
fmt.Println("done")
return nil
}
const fluentdMonitorEndpoint = "http://localhost:24220/api/plugins.json"
type fluentdMetrics struct {
Plugins []fluentdMetricsPlugin `json:"plugins"`
}
type fluentdMetricsPlugin struct {
Type string `json:"type"`
OutputPlugin bool `json:"output_plugin"`
BufferQueueLength int64 `json:"buffer_queue_length"`
}
// monitor_agentからfluentdのメトリクスを取得する
func fetchFluentdMetrics() (*fluentdMetrics, error) {
resp, err := http.Get(fluentdMonitorEndpoint)
if err != nil {
return nil, err
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var ret fluentdMetrics
if err := json.Unmarshal(body, &ret); err != nil {
return nil, err
}
return &ret, nil
}
// int64 -> *int64
func int64p(n int64) *int64 {
return &n
}
これをgocronなどで定期的に実行させる。
読むコード。確認用。
func readFluentdBufferQueueLength() error {
ctx := context.Background()
httpClient, err := google.DefaultClient(ctx, monitoring.CloudPlatformScope)
if err != nil {
return err
}
client, err := monitoring.New(httpClient)
if err != nil {
return err
}
var (
// The project on which to execute the request. The format is `"projects/{project_id_or_number}"`.
name = "projects/*****"
)
start := time.Now().Add(time.Hour * -3).UTC().Format(time.RFC3339)
now := time.Now().UTC().Format(time.RFC3339)
filter := "metric.type = \"custom.googleapis.com/fluentd/buffer_queue_length\""
resp, err := client.Projects.TimeSeries.List(name).
IntervalStartTime(start).
IntervalEndTime(now).
Filter(filter).Context(ctx).Do()
if err != nil {
return err
}
for _, v := range resp.TimeSeries {
fmt.Println(v.Metric.Type)
for _, p := range v.Points {
fmt.Println(*(p.Value.Int64Value))
}
}
return nil
}
ちゃんと届いていれば Resource->Metrics Explorerでもcustom/fluentd/buffer_queue_lengthを確認できる。
これでAlertを設定できるようになった。TargetのResource TypeはCustom Metrics。