Support for developing Heron topologies in Python is currently experimental. The Python API is compatible with the Streamparse API, so Python topologies written for Streamparse can be deployed on Heron with little effort. This page describes how to write and launch a topology in Python, as well as how to convert a Streamparse topology to a PyHeron topology.
Note that a Python topology is known to be approximately 20-40 times slower than a topology written in Java. This performance issue will be resolved in later releases.
You first need to download the PyHeron library and include it in your project.
Writing your own topology in Python
Spouts and Bolts discuss how to implement spouts and bolts in Python, respectively.
After defining the spouts and bolts, a topology can be composed in two ways:
- Using TopologyBuilder (not compatible with the Streamparse API)
- Subclassing the Topology class (compatible with the Streamparse API)
Defining a topology using a TopologyBuilder
This way of defining a topology is similar to defining a topology in Java, and is not compatible with the Streamparse API.
The TopologyBuilder has two major methods to specify the components:
- add_spout(self, name, spout_cls, par, config=None)
  - name is str specifying the unique identifier that is assigned to this spout.
  - spout_cls is a subclass of Spout that defines this spout.
  - par is int specifying the number of instances of this spout.
  - config is dict specifying this spout-specific configuration.
- add_bolt(self, name, bolt_cls, par, inputs, config=None)
  - name is str specifying the unique identifier that is assigned to this bolt.
  - bolt_cls is a subclass of Bolt that defines this bolt.
  - par is int specifying the number of instances of this bolt.
  - inputs is either dict mapping from HeronComponentSpec to Grouping; or list of HeronComponentSpec, in which case the shuffle grouping is used.
  - config is dict specifying this bolt-specific configuration.
Each method returns the corresponding HeronComponentSpec object.
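To make the two accepted forms of inputs concrete, the following sketch normalizes either form into a mapping from component to grouping. It is not PyHeron's implementation: normalize_inputs is a hypothetical helper, and plain strings stand in for HeronComponentSpec objects and Grouping values.

```python
def normalize_inputs(inputs, default_grouping="SHUFFLE"):
    """Return a dict mapping each upstream component to its grouping.

    Stand-in sketch: strings play the role of HeronComponentSpec objects
    and Grouping values. A list means "use the default (shuffle) grouping
    for every listed component".
    """
    if inputs is None:
        return {}
    if isinstance(inputs, dict):
        return dict(inputs)
    if isinstance(inputs, (list, tuple)):
        return {component: default_grouping for component in inputs}
    raise TypeError("inputs must be a dict or a list")


# dict form: groupings are given explicitly
explicit = normalize_inputs({"word_spout": "FIELDS(word)"})
# list form: every listed component falls back to the shuffle grouping
implicit = normalize_inputs(["word_spout", "other_spout"])
```

The list form is simply shorthand for the dict form with the same grouping applied to every upstream component.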
The following is an example implementation of WordCountTopology in Python.
# Grouping is used below; it is assumed to be exported by pyheron as well
from pyheron import Grouping, TopologyBuilder
from your_spout import WordSpout
from your_bolt import CountBolt
if __name__ == "__main__":
  builder = TopologyBuilder("WordCountTopology")
  word_spout = builder.add_spout("word_spout", WordSpout, par=2)
  count_bolt = builder.add_bolt("count_bolt", CountBolt, par=2,
                                inputs={word_spout: Grouping.fields('word')})
  builder.build_and_submit()
Note that arguments to the main method can be passed by providing them in the
heron submit command.
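As a minimal sketch of how such arguments might be consumed, the snippet below assumes that extra arguments given to heron submit reach the script through sys.argv like ordinary command-line arguments, with the topology name first. The helper name topology_name_from_args and the fallback name are hypothetical and chosen only for illustration.

```python
import sys


def topology_name_from_args(argv, default="WordCountTopology"):
    """Return the topology name from command-line style arguments.

    Assumption: argv[0] is the script name and argv[1], when present,
    is the topology name given on the `heron submit` command line.
    """
    if len(argv) > 1 and argv[1]:
        return argv[1]
    return default


if __name__ == "__main__":
    name = topology_name_from_args(sys.argv)
    # The name would then be handed to TopologyBuilder(name).
    print(name)
```

Falling back to a default keeps the script usable when no extra argument is supplied.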
Defining a topology by subclassing the Topology class
This way of defining a topology is compatible with the Streamparse API.
All you need to do is assign HeronComponentSpec objects, which are returned
by the spec() method of your spout or bolt class, to class attributes
of your topology class.
- Spout.spec(cls, name=None, par=1, config=None)
  - name is either str specifying the unique identifier that is assigned to this spout, or None if you want to use the variable name of the returned HeronComponentSpec as the unique identifier for this spout.
  - par is int specifying the number of instances of this spout.
  - config is dict specifying this spout-specific configuration.
- Bolt.spec(cls, name=None, inputs=None, par=1, config=None)
  - name is either str specifying the unique identifier that is assigned to this bolt; or None if you want to use the variable name of the returned HeronComponentSpec as the unique identifier for this bolt.
  - inputs is either dict mapping from HeronComponentSpec to Grouping; or list of HeronComponentSpec, in which case the shuffle grouping is used.
  - par is int specifying the number of instances of this bolt.
  - config is dict specifying this bolt-specific configuration.
The same WordCountTopology is defined in the following manner.
# Grouping is used below; it is assumed to be exported by pyheron as well
from pyheron import Grouping, Topology
from your_spout import WordSpout
from your_bolt import CountBolt
class WordCount(Topology):
  word_spout = WordSpout.spec(par=2)
  count_bolt = CountBolt.spec(par=2, inputs={word_spout: Grouping.fields('word')})
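The name=None behaviour can be pictured with a small stand-in. The sketch below is not PyHeron code: ComponentSpec and collect_specs are hypothetical names, and scanning class attributes to fill in missing names is only one plausible way to obtain that behaviour.

```python
class ComponentSpec:
    """Hypothetical stand-in for HeronComponentSpec."""

    def __init__(self, name=None, par=1):
        self.name = name
        self.par = par


def collect_specs(topology_cls):
    """Map component name -> spec for every ComponentSpec class attribute.

    A spec created with name=None takes the attribute (variable) name.
    """
    specs = {}
    for attr_name, value in vars(topology_cls).items():
        if isinstance(value, ComponentSpec):
            if value.name is None:
                value.name = attr_name
            specs[value.name] = value
    return specs


class WordCountSketch:
    word_spout = ComponentSpec(par=2)                  # becomes "word_spout"
    count_bolt = ComponentSpec(name="counter", par=2)  # keeps "counter"


specs = collect_specs(WordCountSketch)
```

An explicit name always wins; the attribute name is used only when no name was given.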
Topology-wide configuration
Topology-wide configuration can be specified by using the set_config() method if
you are using TopologyBuilder, or by placing a config class attribute containing a dict
on your topology class. Note that this configuration will be
overridden by component-specific configuration at runtime.
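The precedence between the two levels of configuration can be sketched with plain dictionaries. This only illustrates which value survives a conflict and does not show how Heron merges configuration internally; effective_config and the configuration keys are hypothetical.

```python
def effective_config(topology_config, component_config=None):
    """Merge configs so that component-specific values win on conflicts."""
    merged = dict(topology_config)
    merged.update(component_config or {})
    return merged


# Hypothetical keys, used only to show which value survives.
topology_wide = {"log.level": "INFO", "batch.size": 100}
spout_specific = {"batch.size": 10}

result = effective_config(topology_wide, spout_specific)
```

Keys set only at the topology level still reach the component, while keys set at both levels take the component-specific value.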
Multiple streams
To specify that a component has multiple output streams, instead of using a list of
strings for outputs, you can specify a list of Stream objects, in the following manner.
class MultiStreamSpout(Spout):
  outputs = [Stream(fields=['normal', 'fields'], name='default'),
             Stream(fields=['error_message'], name='error_stream')]
To select one of these streams as the input for your bolt, you can simply
use [] to specify the stream you want. Without any stream specified, the default
stream will be used.
class MultiStreamTopology(Topology):
  spout = MultiStreamSpout.spec()
  error_bolt = ErrorBolt.spec(inputs={spout['error_stream']: Grouping.LOWEST})
  consume_bolt = ConsumeBolt.spec(inputs={spout: Grouping.SHUFFLE})
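The [] selection and the fallback to the default stream can be modelled with a small stand-in. The sketch below is not PyHeron's implementation: SpecSketch and resolve_input are hypothetical names, and a (component, stream) tuple stands in for whatever object PyHeron actually returns.

```python
class SpecSketch:
    """Hypothetical stand-in for a component spec with named output streams."""

    def __init__(self, name, streams):
        self.name = name
        # Every component is assumed to have a stream called "default".
        self.streams = set(streams) | {"default"}

    def __getitem__(self, stream):
        if stream not in self.streams:
            raise KeyError(f"{self.name} has no stream named {stream!r}")
        return (self.name, stream)


def resolve_input(source):
    """Return (component, stream); a bare spec means its default stream."""
    if isinstance(source, SpecSketch):
        return source["default"]
    return source


spout = SpecSketch("spout", ["default", "error_stream"])
default_input = resolve_input(spout)
error_input = resolve_input(spout["error_stream"])
```

Rejecting unknown stream names at selection time surfaces typos when the topology is defined rather than when it runs.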
For further information about the API, refer to the Streamparse API documentation. Note, however, that some Streamparse methods and features are not supported or are invalid in Heron.
Declaring output fields from the spec() method
In Python topologies, the declareOutputFields() method doesn’t exist, so
the output fields of your spouts and bolts need to be declared through the
outputs class attribute. This is compatible with the Streamparse API, but
it makes dynamically declaring output fields more complicated.
For this reason, PyHeron provides a way to dynamically declare output fields via the
optional_outputs argument of the spec() method.
This is useful in situations like the following.
class IdentityBolt(Bolt):
  # can't statically declare output fields
  def process(self, tup):
    # re-emit the incoming values unchanged
    self.emit(tup.values)
class DynamicOutputField(Topology):
  spout = WordSpout.spec()
  bolt = IdentityBolt.spec(inputs={spout: Grouping.ALL},
                           optional_outputs=['word'])
You can also declare outputs in the add_spout() and add_bolt()
methods of TopologyBuilder in the same way.
Launching your Python topology
You first need to package your Python topology project into a PEX file.
If you defined your topology using TopologyBuilder, your topology
definition Python file should have an if __name__ == "__main__" block.
The following shows the submission command for an example WordCountTopology whose
PEX file is located at ~/project/word_count.pex.
$ heron submit local ~/project/word_count.pex - WordCountTopology
If you defined your topology by subclassing Topology, your topology
definition Python file should not contain a main method.
The following shows the submission command for an example WordCountTopology whose
PEX file is located at ~/project/word_count.pex, inside which your WordCount
class resides under topology.word_count_topology.WordCount.
$ heron submit local \
~/project/word_count.pex \
topology.word_count_topology.WordCount \
WordCountTopology
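For scripted deployments, the two command shapes shown on this page can be assembled programmatically. The helper below is a hypothetical sketch: submit_command is not part of Heron, and the "-" placeholder for TopologyBuilder-based topologies is copied from this page's example rather than checked against the CLI reference.

```python
def submit_command(cluster, pex_path, topology_name, topology_class=None):
    """Build the argument list for `heron submit`.

    topology_class is None for TopologyBuilder-based topologies, where
    this page's example passes "-" in the class position.
    """
    class_arg = topology_class if topology_class else "-"
    return ["heron", "submit", cluster, pex_path, class_arg, topology_name]


builder_cmd = submit_command(
    "local", "~/project/word_count.pex", "WordCountTopology"
)
subclass_cmd = submit_command(
    "local",
    "~/project/word_count.pex",
    "WordCountTopology",
    topology_class="topology.word_count_topology.WordCount",
)
print(" ".join(builder_cmd))
print(" ".join(subclass_cmd))
```

If the list is passed to subprocess without a shell, the ~ in the path is not expanded, so an absolute path or os.path.expanduser would be needed in that case.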