Understanding GenStage in Elixir
GenServers are an amazing part of the OTP ecosystem, helping solve a lot of complicated tasks easily in Elixir. But working with multiple sets of GenServers together and exchanging data between them has always been a hassle. GenEvent solved these problems to some extent, but it did not provide support for backpressure or concurrency. This is where GenStage comes in.
Announced last year on the official Elixir blog, GenStage is a new behaviour in the Elixir ecosystem, built on top of GenServer. It is described as:
“GenStage is a specification for exchanging events with back-pressure between Elixir producers and consumers”
but that doesn’t convey its full power. The name hides the real meaning: instead of simple producers and consumers, imagine concurrent stages in a pipeline, where each stage takes an arbitrary number of input events, performs some operations on them and sends them to the next stage. Each stage can consume and produce many events at once, with multiple instances of each stage running at the same time. Consumers tell producers how many events they are ready to handle, producers emit events to match that demand, and the behaviour handles the bulk of the logic.
A simple use case of GenStage looks like a straightforward linear pipeline, with each stage feeding its events to the next.
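As a rough illustration of such a linear pipeline, here is a minimal sketch with one producer, one producer-consumer and one consumer. The module names (`Counter`, `Doubler`, `Collector`), the `max_demand` values and the `Mix.install` line with its version requirement are all assumptions made for this example. The callbacks themselves (`init/1`, `handle_demand/2`, `handle_events/3`) are the ones GenStage defines.

```elixir
# Assumption: run as a standalone script, so the dependency is fetched here.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Counter do
  # Producer: emits consecutive integers, but only as many as were asked for.
  use GenStage

  def start_link(initial), do: GenStage.start_link(__MODULE__, initial, name: __MODULE__)

  @impl true
  def init(counter), do: {:producer, counter}

  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Doubler do
  # Producer-consumer: subscribes to Counter in init/1 and doubles each event.
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:producer_consumer, :ok, subscribe_to: [{Counter, max_demand: 10}]}

  @impl true
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &(&1 * 2)), state}
  end
end

defmodule Collector do
  # Consumer: forwards every batch it receives to a (hypothetical) receiver process.
  use GenStage

  def start_link(receiver), do: GenStage.start_link(__MODULE__, receiver)

  @impl true
  def init(receiver), do: {:consumer, receiver, subscribe_to: [{Doubler, max_demand: 10}]}

  @impl true
  def handle_events(events, _from, receiver) do
    send(receiver, {:events, events})
    # Consumers never emit events, so the event list is always empty.
    {:noreply, [], receiver}
  end
end

# Start the stages in order, from the producer down to the consumer.
{:ok, _counter} = Counter.start_link(0)
{:ok, _doubler} = Doubler.start_link([])
{:ok, _collector} = Collector.start_link(self())
```

Each downstream stage subscribes to the stage before it from its own `init/1`, so events start flowing as soon as the consumer is up. In a real application these stages would normally be children of a supervisor rather than started by hand.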
But that doesn’t do it justice. Instead, you should think of each stage as having many concurrent instances, all running side by side.
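One way to get that kind of fan-out is the library's ConsumerSupervisor behaviour, which starts one supervised process per event. The sketch below is only illustrative: `Worker`, `WorkerPool`, the `reply_to` argument and the `max_demand: 4` limit are hypothetical names and values chosen for the example, and the producer is just an endless stream of integers wrapped with `GenStage.from_enumerable/1`.

```elixir
# Assumption: run as a standalone script, so the dependency is fetched here.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule Worker do
  # Hypothetical per-event worker. ConsumerSupervisor appends the event to the
  # arguments listed in the child spec, hence start_link(reply_to, event).
  def start_link(reply_to, event) do
    Task.start_link(fn ->
      send(reply_to, {:processed, event, self()})
    end)
  end
end

defmodule WorkerPool do
  use ConsumerSupervisor

  def start_link({producer, reply_to}) do
    ConsumerSupervisor.start_link(__MODULE__, {producer, reply_to})
  end

  @impl true
  def init({producer, reply_to}) do
    children = [
      # Children of a ConsumerSupervisor must be :temporary or :transient.
      %{id: Worker, start: {Worker, :start_link, [reply_to]}, restart: :transient}
    ]

    # max_demand caps how many workers run at the same time.
    opts = [strategy: :one_for_one, subscribe_to: [{producer, max_demand: 4}]]
    ConsumerSupervisor.init(children, opts)
  end
end

# An endless producer of integers, used here only to feed the pool.
{:ok, producer} = GenStage.from_enumerable(Stream.iterate(1, &(&1 + 1)))
{:ok, _pool} = WorkerPool.start_link({producer, self()})
```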
This is possible due to another behaviour shipped with the library, called ConsumerSupervisor. A ConsumerSupervisor acts as a consumer itself and, well, supervises the processes that handle its events: it starts a supervised child process for each event it receives, which gives you better reliability and fault tolerance. For code examples, I would suggest you check out the project on GitHub, the excellent documentation on HexDocs, or the Flow library (which is an amazing library built on top of GenStage!). But there are some things you should take care of when working with GenStage in Elixir (that I had to learn the hard way):
- Don’t excessively produce events when there is no demand
- If a long-running producer yields zero events while there is consumer demand, you have to explicitly retry producing events later. Otherwise the pipeline simply stalls, because consumers will not ask again until they receive events. A common case where this happens is integration with external message queues
- Use Supervisor for simple pipelines, and ConsumerSupervisor when dealing with concurrent instances of a GenStage
- Try to define explicit demands for consumers
- Start the stages in order, and immediately subscribe to the previous stage for consumers and producer_consumers, preferably in the init/1 callback
- Maintaining overall demand in a producer’s internal state is a good idea
Here are some other excellent posts on the topic: