'FluentNetting' is fluent forwarding message received server that is based on Fluent forward protocol v1 specification.
This library is supported for both fluentd and fluent-bit.
- Message Mode
- Forward Mode
- PackedForward Mode
- CompressedPackedForward Mode
- Security forwarding authorization,
HELO
,PING
andPONG
(not tls, not username/password, only self_hostname/shared_key). - Udp heartbeat.
(data flow)
client ---(forward)---> fluend or fluent-bit ---(forward)---> server used by FluentNetting
(sync)
public class ExampleCallback : IFnCallback
{
public void Receive(string tag, List<FnMessageEntry> entries)
{
Console.WriteLine($"tag:{tag}, entries:[{string.Join(", ", entries.Select(e => $"{{{e}}}"))}]");
}
}
(async)
public class ExampleCallback : IFnCallback
{
public Task ReceiveAsync(string tag, List<FnMessageEntry> entries)
{
Console.WriteLine($"tag:{tag}, entries:[{string.Join(", ", entries.Select(e => $"{{{e}}}"))}]");
return Task.CompletedTask;
}
}
'Receive' method is disallow async override.
If you use 'async', override 'ReceiveAsync'.
'ReceiveAsync' method's contents is below at default.
public Task ReceiveAsync(string tag, List<FnMessageEntry> entries)
{
Receive(tag, entries);
return Task.CompletedTask;
}
If 'Receive' and 'ReceiveAsync' are both overriden,
Call only 'ReceiveAsync'.
var server = new FnServer(new ExampleCallback())
{
// default - not authrization, enable 'RequireAck', disable 'Keepalive'
Config = new FnConfig(),
// default - tcp timeout 65 sec, udp timeout 15 sec
SettingClient = new FnSettingClient(),
// default - listening on tcp://0.0.0.0:8710, udp://0.0.0.0:8710
SettingServer = new FnSettingServer()
};
server.Start();
server.WaitFor();
(fluentd)
<source>
@type forward
port 24224
</source>
<match **>
@type forward
send_timeout 60s
recover_wait 10s
heartbeat_type udp
heartbeat_interval 5s
phi_threshold 16
hard_timeout 60s
require_ack_response
<server>
host {{ your server address }}
port 8710
</server>
buffer_type file
buffer_path /fluentd/log/buffer
buffer_chunk_limit 1m
retry_limit 3
flush_interval 1m
</match>
(fluent-bit)
[SERVICE]
Flush 5
Daemon off
Log_Level info
storage.path /fluent-bit/log
[INPUT]
Name forward
Listen 0.0.0.0
Port 24224
storage.type filesystem
Buffer_Chunk_Size 1M
Buffer_Max_Size 6M
[OUTPUT]
Name forward
Match *
Host {{ your server address }}
Port 8710
Require_ack_response true
Send_options true
var config = new PigeonConfig("localhost", 24224);
var clientPigeon = new PigeonClient(config);
await clientPigeon.SendAsync(
"tag.example2",
new Dictionary<string, object> {["hello"] = "world2"}
);
var options = new FluentdSinkOptions(
"localhost", 24224, "tag.example1");
var log = new LoggerConfiguration().WriteTo.Fluentd(options).CreateLogger();
log.Information("hello {0}!", "world1");
using (var client = new FluentdClient.Sharp.FluentdClient(
"localhost", 24224, new CustomMessagePackSerializer()))
{
await client.ConnectAsync();
await client.SendAsync("tag.example3",
new Dictionary<string, object> {["hello"] = "world3"});
}
In detail, confirm example.
- influent - java fluentd forward server.
- MessagePack-CSharp - csharp fastest message pack parser.