Fluentdはv1.0から標準でマルチプロセス対応され、 指定した数のWorkerで処理が行われるようになった。
<system>
workers 4
</system>
どのように実行しているのかv1.12.3
の実装を見ていく。
Supervisor
Multi Process Workersのドキュメントにもあるように、 Workerと通信するSupervisorというのが存在する。 具体的に何をしているかというとWorkerプロセスを作成するWorkerModuleを渡してサーバーを立てたり、SIGNALを送ったりしている。
# https://github.com/fluent/fluentd/blob/master/lib/fluent/supervisor.rb#L365-L377
module WorkerModule
def spawn(process_manager)
main_cmd = config[:main_cmd]
env = {
'SERVERENGINE_WORKER_ID' => @worker_id.to_i.to_s,
}
@pm = process_manager.spawn(env, *main_cmd)
end
def after_start
(config[:worker_pid] ||= {})[@worker_id] = @pm.pid
end
end
# https://github.com/fluent/fluentd/blob/v1.12.3/lib/fluent/supervisor.rb#L808-L811
se = ServerEngine.create(ServerModule, WorkerModule){
Fluent::Supervisor.load_config(@config_path, params)
}
se.run
# https://github.com/fluent/fluentd/blob/master/lib/fluent/supervisor.rb#L305-L317
def kill_worker
if config[:worker_pid]
pids = config[:worker_pid].clone
config[:worker_pid].clear
pids.each_value do |pid|
if Fluent.windows?
Process.kill :KILL, pid
else
Process.kill :TERM, pid
end
end
end
end
treasure-data/serverengine
ロバストなマルチプロセスのサーバーを実装するためのフレームワーク。
worker_type
をspawnにしているので、
MultiSpawnServerが使われる。
# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/supervisor.rb#L90-L113
def self.create_server_proc(server_module, worker_module, config)
wt = config[:worker_type] || 'embedded'
case wt
when 'embedded'
server_class = EmbeddedServer
when 'process'
server_class = MultiProcessServer
when 'thread'
server_class = MultiThreadServer
when 'spawn'
server_class = MultiSpawnServer
else
raise ArgumentError, "unexpected :worker_type option #{wt}"
end
...
end
MultiSpawnServerはMultiWorkerServerを継承していて、常に設定した数になるようWorkerを起動し、その際にWorkerModule.spawn
が呼ばれる。
# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/multi_worker_server.rb#L56-L62
def run
while true
num_alive = keepalive_workers
break if num_alive == 0
wait_tick
end
end
# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/multi_worker_server.rb#L116-L121
elsif wid < @num_workers
# scale up or reboot
unless @stop
@monitors[wid] = delayed_start_worker(wid)
num_alive += 1
end
in_forward plugin
どうもWorkerに処理を割り振っている様子がないので、プラグインの方も見てみる。
in_forward
pluginはWorker数が1より大きいとき、環境変数SERVERENGINE_SOCKETMANAGER_PATH
を渡してSocketManager::Client
というのを作成し、listenしている。
# https://github.com/fluent/fluentd/blob/v1.12.3/lib/fluent/plugin/in_forward.rb#L169-L181
shared_socket = system_config.workers > 1
log.info "listening port", port: @port, bind: @bind
server_create_connection(
:in_forward_server, @port,
bind: @bind,
shared: shared_socket,
resolve_name: @resolve_hostname,
linger_timeout: @linger_timeout,
send_keepalive_packet: @send_keepalive_packet,
backlog: @backlog,
&method(:handle_connection)
)
# https://github.com/fluent/fluentd/blob/v1.12.3/lib/fluent/plugin_helper/server.rb#L343-L353
def server_socket_manager_client
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
ServerEngine::SocketManager::Client.new(socket_manager_path)
end
def server_create_tcp_socket(shared, bind, port)
sock = if shared
server_socket_manager_client.listen_tcp(bind, port)
SocketManager
SocketManagerは複数のClientから同じポートを動的にlistenできるようにするものらしい。 ClientがあるならServerもあってSupervisorで作られている。 Clientを作るときに渡していた環境変数はここで代入されてspawn時に共有される。
# https://github.com/fluent/fluentd/blob/v1.12.3/lib/fluent/supervisor.rb#L71-L73
socket_manager_path = ServerEngine::SocketManager::Server.generate_path
ServerEngine::SocketManager::Server.open(socket_manager_path)
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
UNIXの場合、ClientはServerとUNIXドメインソケットで通信して、 外部からのリクエストを受けるソケットのファイルディスクリプタを取得する。
# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/socket_manager.rb#L43-L60
listen_method = case proto
when :tcp then :listen_tcp
when :udp then :listen_udp
else
raise ArgumentError, "unknown protocol: #{proto}"
end
peer = connect_peer(@path)
begin
SocketManager.send_peer(peer, [Process.pid, listen_method, bind, port])
res = SocketManager.recv_peer(peer)
if res.is_a?(Exception)
raise res
else
return send(:recv, family, proto, peer, res)
end
ensure
peer.close
end
# https://github.com/treasure-data/serverengine/blob/8bdd8ef83fa9fc932614a3c5a07c7c41591ead7d/lib/serverengine/socket_manager_unix.rb#L30-L38
def recv(family, proto, peer, sent)
server_class = case proto
when :tcp then TCPServer
when :udp then UDPSocket
else
raise ArgumentError, "invalid protocol: #{proto}"
end
peer.recv_io(server_class)
end
Serverはすでにソケットが作られていればそれを、作られていなければ新たに作って返す。
# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/socket_manager.rb#L113-L128
def listen(proto, bind, port)
sockets, new_method = case proto
when :tcp then [@tcp_sockets, :listen_tcp_new]
when :udp then [@udp_sockets, :listen_udp_new]
else
raise ArgumentError, "invalid protocol: #{proto}"
end
key, bind_ip = resolve_bind_key(bind, port)
@mutex.synchronize do
unless sockets.has_key?(key)
sockets[key] = send(new_method, bind_ip, port)
end
return sockets[key]
end
end
挙動確認
次の設定で動かす。
$ cat fluent.conf
<system>
workers 4
</system>
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
$ docker run -it --rm --name fluentd -v $(pwd)/fluent.conf:/fluentd/etc/fluent.conf fluent/fluentd:v1.12
...
2021-04-26 13:31:18 +0000 [info]: #2 starting fluentd worker pid=18 ppid=7 worker=2
2021-04-26 13:31:18 +0000 [info]: #2 listening port port=24224 bind="0.0.0.0"
2021-04-26 13:31:18 +0000 [info]: #2 fluentd worker is now running worker=2
2021-04-26 13:31:18 +0000 [info]: #3 starting fluentd worker pid=19 ppid=7 worker=3
2021-04-26 13:31:18 +0000 [info]: #3 listening port port=24224 bind="0.0.0.0"
2021-04-26 13:31:18 +0000 [info]: #3 fluentd worker is now running worker=3
2021-04-26 13:31:18 +0000 [info]: adding source type="forward"
2021-04-26 13:31:18 +0000 [info]: #1 starting fluentd worker pid=17 ppid=7 worker=1
2021-04-26 13:31:18 +0000 [info]: #1 listening port port=24224 bind="0.0.0.0"
2021-04-26 13:31:18 +0000 [info]: #1 fluentd worker is now running worker=1
2021-04-26 13:31:18 +0000 [info]: #0 starting fluentd worker pid=16 ppid=7 worker=0
2021-04-26 13:31:18 +0000 [info]: #0 listening port port=24224 bind="0.0.0.0"
2021-04-26 13:31:18 +0000 [info]: #0 fluentd worker is now running worker=0
netstat
の後継であるss
コマンドをインストールして実行すると、複数のプロセスが同じfdでlistenしていることが確認できる。
$ docker exec --user 0 fluentd apk add iproute2
$ docker exec fluentd ss -antp
State Recv-Q Send-Q Local Address:Port Peer Address:PortProcess
LISTEN 0 128 0.0.0.0:24224 0.0.0.0:* users:(("ruby",pid=19,fd=7),("ruby",pid=18,fd=7),("ruby",pid=17,fd=7),("ruby",pid=16,fd=7),("fluentd",pid=7,fd=13))
何度かログを送ってみたところ、全てのWorkerで処理が行われてはいるが、少なくともラウンドロビンはしていない。
$ docker exec -it fluentd watch -n 1 "echo '{\"message\":\"hello\"}' | fluent-cat debug.log"
2021-04-26 16:27:54 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:27:56 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:27:57 +0000 [warn]: #2 no patterns matched tag="debug.log"
2021-04-26 16:28:00 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:28:06 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:28:07 +0000 [warn]: #3 no patterns matched tag="debug.log"
2021-04-26 16:28:09 +0000 [warn]: #3 no patterns matched tag="debug.log"
2021-04-26 16:28:10 +0000 [warn]: #2 no patterns matched tag="debug.log"
2021-04-26 16:28:13 +0000 [warn]: #1 no patterns matched tag="debug.log"
2021-04-26 16:28:24 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:28:25 +0000 [warn]: #3 no patterns matched tag="debug.log"
現状Fluentdはロードバランシングの仕組みを持たず、先にaccept()
したWorkerが処理するようになっていて、
そのためにLinuxでは偏りが発生する問題があるそうだ。