Elixir
Support for Elixir is provided through fluvio-ex, a community-supported project by @viniarck.
This will create a topic lobby with 1 partition and 1 replica.
alias Fluvio.Admin

# Admin.start_link/0 returns the handle that the admin calls below operate on.
{:ok, admin} = Admin.start_link()

# Create the "lobby" topic with one partition and a replication factor of one.
# Matching on {:ok, _} makes the snippet fail loudly if creation is rejected.
topic_opts = %{partitions: 1, replication: 1}
{:ok, _} = Admin.create_topic(admin, "lobby", topic_opts)
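In this example, a Producer for the topic lobby is created. Then the message hello is sent to the topic and flushed. Also, twenty values (1 to 20) are sent asynchronously in chunks of 10, with a flush after each chunk. Any error results from the sends or flushes are collected, and the final match on [] asserts that there were none.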
alias Fluvio.Producer

# Start a producer bound to the "lobby" topic.
{:ok, producer} = Producer.start_link(%{topic: "lobby"})

# Send a single record and flush; both calls must return {:ok, _}.
{:ok, _} = Producer.send(producer, "hello")
{:ok, _} = Producer.flush(producer)

# Keeps only the {result, tag} pairs whose result is an {:error, _} tuple.
only_errors = fn tagged_results ->
  Enum.filter(tagged_results, &match?({{:error, _msg}, _value}, &1))
end

# Send the values 1..20 in chunks of 10. Within a chunk every value is sent
# from its own Task; once all tasks of the chunk have been awaited, the
# producer is flushed before the next chunk starts.
# Each step yields the error entries it saw, so matching the flattened result
# against [] asserts that no send and no flush returned an {:error, _} tuple.
[] =
  1..20
  |> Stream.chunk_every(10)
  |> Stream.flat_map(fn chunk ->
    send_errors =
      chunk
      |> Enum.map(fn value ->
        # Tag each send result with the value it belongs to.
        Task.async(fn -> {Producer.send(producer, to_string(value)), value} end)
      end)
      |> Task.await_many()
      |> only_errors.()

    # The flush result is tagged with :flush so it has the same shape as the
    # send results and can go through the same filter.
    flush_errors = only_errors.([{Producer.flush(producer), :flush}])

    [send_errors, flush_errors]
  end)
  |> Stream.concat()
  |> Enum.to_list()
In this example, a Consumer for the topic lobby is created, starting from offset 0. As records are received, the record contents are printed with IO.inspect; if an error is received instead, its message is printed with an "Error: " prefix.
alias Fluvio.Consumer

# Start a consumer on the "lobby" topic with the offset option
# [from_beginning: 0].
consumer_opts = %{topic: "lobby", offset: [from_beginning: 0]}
{:ok, consumer} = Consumer.start_link(consumer_opts)

# Callback applied to every streamed result: records are inspected as-is,
# errors are inspected as an "Error: ..." string.
print_result = fn result ->
  case result do
    {:ok, record} -> IO.inspect(record)
    {:error, msg} -> IO.inspect("Error: #{msg}")
  end
end

Consumer.stream_each(consumer, print_result)