FluentdがどのようにMulti Process Workersで処理を実行しているのか実装を追う

fluentdlinux

Fluentdはv1.0から標準でマルチプロセス対応され、 指定した数のWorkerで処理が行われるようになった。

<system>
  workers 4
</system>

どのように実行しているのかv1.12.3の実装を見ていく。

Supervisor

Multi Process Workersのドキュメントにもあるように、 Workerと通信するSupervisorというのが存在する。 具体的に何をしているかというとWorkerプロセスを作成するWorkerModuleを渡してサーバーを立てたり、SIGNALを送ったりしている。

# https://github.com/fluent/fluentd/blob/master/lib/fluent/supervisor.rb#L365-L377
module WorkerModule
  def spawn(process_manager)
    main_cmd = config[:main_cmd]
    env = {
      'SERVERENGINE_WORKER_ID' => @worker_id.to_i.to_s,
    }
    @pm = process_manager.spawn(env, *main_cmd)
  end

  def after_start
    (config[:worker_pid] ||= {})[@worker_id] = @pm.pid
  end
end
# https://github.com/fluent/fluentd/blob/v1.12.3/lib/fluent/supervisor.rb#L808-L811
se = ServerEngine.create(ServerModule, WorkerModule){
  Fluent::Supervisor.load_config(@config_path, params)
}
se.run
# https://github.com/fluent/fluentd/blob/master/lib/fluent/supervisor.rb#L305-L317
def kill_worker
  if config[:worker_pid]
    pids = config[:worker_pid].clone
    config[:worker_pid].clear
    pids.each_value do |pid|
      if Fluent.windows?
        Process.kill :KILL, pid
      else
        Process.kill :TERM, pid
      end
    end
  end
end

treasure-data/serverengine

ロバストなマルチプロセスのサーバーを実装するためのフレームワーク。

worker_typespawnにしているので、 MultiSpawnServerが使われる。

# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/supervisor.rb#L90-L113
def self.create_server_proc(server_module, worker_module, config)
  wt = config[:worker_type] || 'embedded'
  case wt
  when 'embedded'
    server_class = EmbeddedServer
  when 'process'
    server_class = MultiProcessServer
  when 'thread'
    server_class = MultiThreadServer
  when 'spawn'
    server_class = MultiSpawnServer
  else
    raise ArgumentError, "unexpected :worker_type option #{wt}"
  end
  ...
end

MultiSpawnServerはMultiWorkerServerを継承していて、常に設定した数になるようWorkerを起動し、その際にWorkerModule.spawnが呼ばれる。

# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/multi_worker_server.rb#L56-L62
def run
  while true
    num_alive = keepalive_workers
    break if num_alive == 0
    wait_tick
  end
end
# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/multi_worker_server.rb#L116-L121
elsif wid < @num_workers
  # scale up or reboot
  unless @stop
    @monitors[wid] = delayed_start_worker(wid)
    num_alive += 1
  end

in_forward plugin

どうもWorkerに処理を割り振っている様子がないので、プラグインの方も見てみる。

in_forward pluginはWorker数が1より大きいとき、環境変数SERVERENGINE_SOCKETMANAGER_PATHを渡してSocketManager::Clientというのを作成し、listenしている。

# https://github.com/fluent/fluentd/blob/v1.12.3/lib/fluent/plugin/in_forward.rb#L169-L181
shared_socket = system_config.workers > 1

log.info "listening port", port: @port, bind: @bind
server_create_connection(
  :in_forward_server, @port,
  bind: @bind,
  shared: shared_socket,
  resolve_name: @resolve_hostname,
  linger_timeout: @linger_timeout,
  send_keepalive_packet: @send_keepalive_packet,
  backlog: @backlog,
  &method(:handle_connection)
)
# https://github.com/fluent/fluentd/blob/v1.12.3/lib/fluent/plugin_helper/server.rb#L343-L353
def server_socket_manager_client
  socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  if Fluent.windows?
    socket_manager_path = socket_manager_path.to_i
  end
  ServerEngine::SocketManager::Client.new(socket_manager_path)
end

def server_create_tcp_socket(shared, bind, port)
  sock = if shared
            server_socket_manager_client.listen_tcp(bind, port)

SocketManager

SocketManagerは複数のClientから同じポートを動的にlistenできるようにするものらしい。 ClientがあるならServerもあってSupervisorで作られている。 Clientを作るときに渡していた環境変数はここで代入されてspawn時に共有される。

# https://github.com/fluent/fluentd/blob/v1.12.3/lib/fluent/supervisor.rb#L71-L73
socket_manager_path = ServerEngine::SocketManager::Server.generate_path
ServerEngine::SocketManager::Server.open(socket_manager_path)
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s

UNIXの場合、ClientはServerとUNIXドメインソケットで通信して、 外部からのリクエストを受けるソケットのファイルディスクリプタを取得する

# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/socket_manager.rb#L43-L60
listen_method = case proto
                when :tcp then :listen_tcp
                when :udp then :listen_udp
                else
                  raise ArgumentError, "unknown protocol: #{proto}"
                end
peer = connect_peer(@path)
begin
  SocketManager.send_peer(peer, [Process.pid, listen_method, bind, port])
  res = SocketManager.recv_peer(peer)
  if res.is_a?(Exception)
    raise res
  else
    return send(:recv, family, proto, peer, res)
  end
ensure
  peer.close
end
# https://github.com/treasure-data/serverengine/blob/8bdd8ef83fa9fc932614a3c5a07c7c41591ead7d/lib/serverengine/socket_manager_unix.rb#L30-L38
 def recv(family, proto, peer, sent)
  server_class = case proto
                  when :tcp then TCPServer
                  when :udp then UDPSocket
                  else
                    raise ArgumentError, "invalid protocol: #{proto}"
                  end
  peer.recv_io(server_class)
end

Serverはすでにソケットが作られていればそれを、作られていなければ新たに作って返す。

# https://github.com/treasure-data/serverengine/blob/v2.2.3/lib/serverengine/socket_manager.rb#L113-L128
def listen(proto, bind, port)
  sockets, new_method = case proto
                        when :tcp then [@tcp_sockets, :listen_tcp_new]
                        when :udp then [@udp_sockets, :listen_udp_new]
                        else
                          raise ArgumentError, "invalid protocol: #{proto}"
                        end
  key, bind_ip = resolve_bind_key(bind, port)

  @mutex.synchronize do
    unless sockets.has_key?(key)
      sockets[key] = send(new_method, bind_ip, port)
    end
    return sockets[key]
  end
end

挙動確認

次の設定で動かす。

$ cat fluent.conf
<system>
  workers 4
</system>

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

$ docker run -it --rm --name fluentd -v $(pwd)/fluent.conf:/fluentd/etc/fluent.conf fluent/fluentd:v1.12
...
2021-04-26 13:31:18 +0000 [info]: #2 starting fluentd worker pid=18 ppid=7 worker=2
2021-04-26 13:31:18 +0000 [info]: #2 listening port port=24224 bind="0.0.0.0"
2021-04-26 13:31:18 +0000 [info]: #2 fluentd worker is now running worker=2
2021-04-26 13:31:18 +0000 [info]: #3 starting fluentd worker pid=19 ppid=7 worker=3
2021-04-26 13:31:18 +0000 [info]: #3 listening port port=24224 bind="0.0.0.0"
2021-04-26 13:31:18 +0000 [info]: #3 fluentd worker is now running worker=3
2021-04-26 13:31:18 +0000 [info]: adding source type="forward"
2021-04-26 13:31:18 +0000 [info]: #1 starting fluentd worker pid=17 ppid=7 worker=1
2021-04-26 13:31:18 +0000 [info]: #1 listening port port=24224 bind="0.0.0.0"
2021-04-26 13:31:18 +0000 [info]: #1 fluentd worker is now running worker=1
2021-04-26 13:31:18 +0000 [info]: #0 starting fluentd worker pid=16 ppid=7 worker=0
2021-04-26 13:31:18 +0000 [info]: #0 listening port port=24224 bind="0.0.0.0"
2021-04-26 13:31:18 +0000 [info]: #0 fluentd worker is now running worker=0

netstatの後継であるssコマンドをインストールして実行すると、複数のプロセスが同じfdでlistenしていることが確認できる。

$ docker exec --user 0 fluentd apk add iproute2 
$ docker exec fluentd ss -antp
State  Recv-Q Send-Q Local Address:Port  Peer Address:PortProcess
LISTEN 0      128          0.0.0.0:24224      0.0.0.0:*    users:(("ruby",pid=19,fd=7),("ruby",pid=18,fd=7),("ruby",pid=17,fd=7),("ruby",pid=16,fd=7),("fluentd",pid=7,fd=13))

何度かログを送ってみたところ、全てのWorkerで処理が行われてはいるが、少なくともラウンドロビンはしていない。

$ docker exec -it fluentd watch -n 1 "echo '{\"message\":\"hello\"}' | fluent-cat debug.log"

2021-04-26 16:27:54 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:27:56 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:27:57 +0000 [warn]: #2 no patterns matched tag="debug.log"
2021-04-26 16:28:00 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:28:06 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:28:07 +0000 [warn]: #3 no patterns matched tag="debug.log"
2021-04-26 16:28:09 +0000 [warn]: #3 no patterns matched tag="debug.log"
2021-04-26 16:28:10 +0000 [warn]: #2 no patterns matched tag="debug.log"
2021-04-26 16:28:13 +0000 [warn]: #1 no patterns matched tag="debug.log"
2021-04-26 16:28:24 +0000 [warn]: #0 no patterns matched tag="debug.log"
2021-04-26 16:28:25 +0000 [warn]: #3 no patterns matched tag="debug.log"

現状Fluentdはロードバランシングの仕組みを持たず、先にaccept()したWorkerが処理するようになっていて、 そのためにLinuxでは偏りが発生する問題があるそうだ

参考

Fluentd v1 and Roadmap