Currently, support for developing a Heron topology in Python is still experimental. It is compatible with the Streamparse API, so Python topologies written for the Streamparse can be deployed on Heron with ease. This page describes how to write and launch a topology in Python, as well as how to convert a Streamparse topology to a PyHeron topology.
Note that a Python topology is known to be approximately 20-40 times slower than a topology written in Java. This performance issue will be resolved in later releases.
You need to first download PyHeron
library and include it in your project.
Writing your own topology in Python
Spouts and Bolts discuss how to implement spouts and bolts in Python, respectively.
After defining the spouts and bolts, a topology can be composed by two ways:
- Using
TopologyBuilder
(not compatible with the Streamparse API) - Subclassing
Topology
class (compatible with the Streamparse API)
Defining a topology using a TopologyBuilder
This way of defining a topology is similar to defining a topology in Java, and is not compatible with the Streamparse API.
The TopologyBuilder
has two major methods to specify the components:
add_spout(self, name, spout_cls, par, config=None)
name
isstr
specifying the unique identifier that is assigned to this spout.spout_cls
is a subclass ofSpout
that defines this spout.par
isint
specifying the number of instances of this spout.config
isdict
specifying this spout-specific configuration.
add_bolt(self, name, bolt_cls, par, inputs, config=None)
name
isstr
specifying the unique identifier that is assigned to this bolt.bolt_cls
is a subclass ofBolt
that defines this bolt.par
isint
specifying the number of instances of this bolt.inputs
is eitherdict
mapping fromHeronComponentSpec
toGrouping
; orlist
ofHeronComponentSpec
, in which case the shuffle grouping is used.config
isdict
specifying this bolt-specific configuration.
Each method returns the corresponding HeronComponentSpec
object.
The following is an example implementation of WordCountTopology in Python.
from pyheron import TopologyBuilder
from your_spout import WordSpout
from your_bolt import CountBolt
if __name__ == "__main__":
builder = TopologyBuilder("WordCountTopology")
word_spout = builder.add_spout("word_spout", WordSpout, par=2)
count_bolt = builder.add_bolt("count_bolt", CountBolt, par=2,
inputs={word_spout: Grouping.fields('word')})
builder.build_and_submit()
Note that arguments to the main method can be passed by providing them in the
heron submit
command.
Defining a topology by subclassing Topology class
This way of defining a topology is compatible with the Streamparse API.
All you need to do is to place HeronComponentSpec
as the class attributes
of your topology class, which are returned by the spec()
method of
your spout or bolt class.
Spout.spec(cls, name=None, par=1, config=None)
name
is eitherstr
specifying the unique identifier that is assigned to this spout, orNone
if you want to use the variable name of the returnedHeronComponentSpec
as the unique identifier for this spout.par
isint
specifying the number of instances of this spout.config
isdict
specifying this spout-specific configuration.
Bolt.spec(cls, name=None, inputs=None, par=1, config=None)
name
is eitherstr
specifying the unique identifier that is assigned to this bolt; orNone
if you want to use the variable name of the returnedHeronComponentSpec
as the unique identifier for this bolt.inputs
is eitherdict
mapping fromHeronComponentSpec
toGrouping
; orlist
ofHeronComponentSpec
, in which case the shuffle grouping is used.par
isint
specifying the number of instances of this bolt.config
isdict
specifying this bolt-specific configuration.
The same WordCountTopology is defined in the following manner.
from pyheron import Topology
from your_spout import WordSpout
from your_bolt import CountBolt
class WordCount(Topology):
word_spout = WordSpout.spec(par=2)
count_bolt = CountBolt.spec(par=2, inputs={word_spout: Grouping.fields('word')})
Topology-wide configuration
Topology-wide configuration can be specified by using set_config()
method if
you are using TopologyBuilder
, or by placing config
containing dict
as the class attribute of your topology. Note that these configuration will be
overriden by component-specific configuration at runtime
Multiple streams
To specify that a component has multiple output streams, instead of using a list of
strings for outputs
, you can specify a list of Stream
objects, in the following manner.
class MultiStreamSpout(Spout):
outputs = [Stream(fields=['normal', 'fields'], name='default'),
Stream(fields=['error_message'], name='error_stream')]
To select one of these streams as the input for your bolt, you can simply
use []
to specify the stream you want. Without any stream specified, the default
stream will be used.
class MultiStreamTopology(Topology):
spout = MultiStreamSpout.spec()
error_bolt = ErrorBolt.spec(inputs={spout['error_stream']: Grouping.LOWEST})
consume_bolt = ConsumeBolt.spec(inputs={spout: Grouping.SHUFFLE})
For further information about the API, refer to the Streamparse API documentation, although there are some methods and functionalities that are not supported or are invalid in Heron.
Declaring output fields from the spec() method
In Python topologies, the declareOutputFields()
method doesn’t exist, so
the output fields of your spout and bolt need to be declared by placing
outputs
class attributes. This is compatible with the Streamparse API, but
dynamically declaring output fields is more complicated in this way.
So, PyHeron provides a way to dynamically declare output fields via the
optional_outputs
argument in the spec()
method.
This is useful in a situation like below.
class IdentityBolt(Bolt):
# can't statically declare output fields
class process(self, tup):
emit([tup.values])
class DynamicOutputField(Topology):
spout = WordSpout.spec()
bolt = IdentityBolt.spec(inputs={spout: Grouping.ALL},
optional_outputs=['word'])
You can also declare outputs in the add_spout()
and the add_bolt()
method for the TopologyBuilder
in the same way.
Launching your python topology
You need to first package your Python topology project to a PEX file.
If you defined your topology using TopologyBuilder
, your topology
definition python file should have if __name__ = "__main__"
method.
The following shows the submission command of an example WordCountTopology, where its
pex file is located in ~/project/word_count.pex
.
$ heron submit local ~/project/word_count.pex - WordCountTopology
If you defined your topology by subclassing Topology
, your topology
definition python file should not contain main method.
The following shows the submission command of an example WordCountTopology, where
its pex file is located in ~/project/word_count.pex
, inside which your WordCount
class resides under topology.word_count_topology.WordCount
.
$ heron submit local \
~/project/word_count.pex \
topology.word_count_topology.WordCount \
WordCountTopology