Each Heron container has its own centralized Metrics Manager (MM), which collects metrics from all instances in the container. You can define how the MM processes metrics by implementing a metrics sink, which specifies how the MM handles incoming MetricsRecord objects.
Java is currently the only supported language for custom metrics sinks. This may change in the future.
Currently Supported Sinks
Heron ships with three metrics sinks out of the box that you can apply to a specific topology. The code for these sinks may be a helpful reference when implementing your own.
- GraphiteSink: Sends each MetricsRecord object to a Graphite instance according to a Graphite prefix.
- ScribeSink: Sends each MetricsRecord object to a Scribe instance according to a Scribe category and namespace.
- FileSink: Writes each MetricsRecord object to a JSON file at a specified path.
More on using these sinks in a Heron cluster can be found in the Metrics Manager documentation.
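To show what a Graphite-bound sink has to produce, the sketch below formats one metric using Graphite's plaintext protocol. That protocol accepts one line per data point in the form `<metric path> <value> <timestamp>`, with the timestamp in Unix epoch seconds. This is not GraphiteSink's actual code. The class `GraphiteLine`, its methods, and the `prefix.source.metric` path layout are hypothetical choices for illustration.

```java
// Hypothetical helper (not GraphiteSink's code): formats one data point as a
// line of Graphite's plaintext protocol, "<metric path> <value> <timestamp>\n".
final class GraphiteLine {
  private GraphiteLine() {}

  // Whitespace would break the line format, so runs of whitespace (and, as an
  // extra precaution in this sketch, slashes) become a single underscore.
  static String sanitize(String s) {
    return s.replaceAll("[\\s/]+", "_");
  }

  // The "prefix.source.metric" layout is an assumption for this sketch.
  static String path(String prefix, String source, String metricName) {
    return sanitize(prefix) + "." + sanitize(source) + "." + sanitize(metricName);
  }

  // Graphite expects the timestamp in Unix epoch seconds, not milliseconds.
  static String format(String prefix, String source, String metricName,
                       String value, long timestampMillis) {
    return path(prefix, source, metricName) + " " + value + " "
        + (timestampMillis / 1000L) + "\n";
  }
}
```

For example, `GraphiteLine.format("heron.topo", "container_1/instance", "emit-count", "42", 1500000000123L)` yields `heron.topo.container_1_instance.emit-count 42 1500000000` followed by a newline.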
Java Setup
To create a custom metrics sink, you need to import the heron-spi library into your project.
Maven
<dependency>
  <groupId>com.twitter.heron</groupId>
  <artifactId>heron-spi</artifactId>
  <version>0.14.7</version>
</dependency>
Gradle
dependencies {
  compile group: "com.twitter.heron", name: "heron-spi", version: "0.14.7"
}
The IMetricsSink Interface
Each metrics sink must implement the IMetricsSink interface, which requires you to implement the following methods:
- void init(Map<String, Object> conf, SinkContext context): Defines the initialization behavior of the sink. The conf map is the configuration passed to the sink from the .yaml configuration file at heron/config/src/yaml/conf/${CLUSTER}/metrics_sinks.yaml; the SinkContext object gives you access to values from the sink’s runtime context (the ID of the Metrics Manager, the ID of the sink, and the name of the topology).
- void processRecord(MetricsRecord record): Defines how each MetricsRecord that passes through the sink is processed.
- void flush(): Flushes any buffered metrics. This method is called at the interval specified by the sink's flush-frequency-ms parameter. More info can be found in the Stream Manager document.
- void close(): Closes the sink and releases any system resources associated with it. If the sink is already closed, invoking close() has no effect.
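The calling order implied by the method descriptions above can be sketched with a stand-in interface. `SketchSink`, `RecordingSink`, and `LifecycleDemo` are hypothetical names. The stand-in drops `SinkContext` and uses `String` in place of `MetricsRecord` so the example compiles without heron-spi. In the real Metrics Manager, `flush()` is called at the `flush-frequency-ms` interval. The driver here calls it once for simplicity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for IMetricsSink so the sketch compiles without
// heron-spi: SinkContext is omitted and records are plain Strings.
interface SketchSink {
  void init(Map<String, Object> conf);
  void processRecord(String record);
  void flush();
  void close();
}

// Logs each lifecycle call so the calling order can be inspected.
final class RecordingSink implements SketchSink {
  final List<String> calls = new ArrayList<>();
  private boolean closed = false;

  @Override
  public void init(Map<String, Object> conf) {
    calls.add("init");
  }

  @Override
  public void processRecord(String record) {
    calls.add("process:" + record);
  }

  @Override
  public void flush() {
    calls.add("flush");
  }

  @Override
  public void close() {
    // Closing an already-closed sink has no effect.
    if (closed) {
      return;
    }
    closed = true;
    calls.add("close");
  }
}

// Hypothetical driver: init once, process each record, flush, then close.
final class LifecycleDemo {
  private LifecycleDemo() {}

  static List<String> run(RecordingSink sink, List<String> records) {
    sink.init(Collections.<String, Object>emptyMap());
    for (String record : records) {
      sink.processRecord(record);
    }
    sink.flush();  // the real Metrics Manager calls this every flush-frequency-ms
    sink.close();
    sink.close();  // second call is a no-op
    return sink.calls;
  }
}
```

Running `LifecycleDemo.run(new RecordingSink(), Arrays.asList("a", "b"))` returns `[init, process:a, process:b, flush, close]`. The second `close()` call adds nothing, which matches the contract for closing an already-closed sink.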
Your implementation of this interface will need to be packaged into a JAR file and distributed to the heron-core/lib/metricsmgr folder of your Heron release.
Example Implementation
Below is an example implementation that simply prints the contents of each metrics record as it passes through:
import com.twitter.heron.metricsmgr.api.metrics.MetricsInfo;
import com.twitter.heron.metricsmgr.api.metrics.MetricsRecord;
import com.twitter.heron.metricsmgr.api.sink.IMetricsSink;
import com.twitter.heron.metricsmgr.api.sink.SinkContext;
import java.util.Map;
public class PrintSink implements IMetricsSink {
    @Override
    public void init(Map<String, Object> conf, SinkContext context) {
        System.out.println("Sink configuration:");
        // This will print out each config in the supplied configuration
        for (Map.Entry<String, Object> config : conf.entrySet()) {
            System.out.println(String.format("%s: %s", config.getKey(), config.getValue()));
        }
        System.out.println(String.format("Topology name: %s", context.getTopologyName()));
        System.out.println(String.format("Sink ID: %s", context.getSinkId()));
    }
    @Override
    public void processRecord(MetricsRecord record) {
        String recordString = String.format("Record received: %s", record.toString());
        System.out.println(recordString);
    }
    @Override
    public void flush() {
        // Since we're just printing to stdout in this sink, we don't need to
        // specify any flush() behavior
    }
    @Override
    public void close() {
        // Since we're just printing to stdout in this sink, we don't need to
        // specify any close() behavior
    }
}
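PrintSink writes every record immediately, so its flush() and close() methods are empty. When output is expensive, a sink can instead buffer records in processRecord() and write them in flush(). The sketch below follows that design. `BufferedPrintSink` is a hypothetical class that uses `String` records and a `java.io.Writer` instead of implementing `IMetricsSink` directly, so it compiles without heron-spi. A real version would declare `implements IMetricsSink` and take `MetricsRecord` objects.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical buffering sink: records are held in memory and written out in
// flush(). Strings and a Writer stand in for MetricsRecord and real output.
final class BufferedPrintSink {
  private final List<String> buffer = new ArrayList<>();
  private final Writer out;
  private boolean closed = false;

  BufferedPrintSink(Writer out) {
    this.out = out;
  }

  public void init(Map<String, Object> conf) {
    // No configuration is needed for this sketch.
  }

  public void processRecord(String record) {
    // Cheap: only appends to memory; all I/O is deferred to flush().
    buffer.add(record);
  }

  public void flush() {
    try {
      for (String record : buffer) {
        out.write("Record received: " + record + "\n");
      }
      out.flush();
      buffer.clear();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public void close() {
    if (closed) {
      return;  // close() is a no-op after the first call
    }
    flush();   // write records buffered since the last scheduled flush
    closed = true;
    try {
      out.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because close() performs a final flush, records that arrive between the last scheduled flush and shutdown are written instead of being silently dropped.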
Configuring Your Custom Sink
The configuration for your sink needs to be provided in the YAML file at heron/config/src/yaml/conf/${CLUSTER}/metrics_sinks.yaml. At the top of that file there’s a sinks parameter that lists each available sink by name. You should add your sink to that list. Here’s an example:
sinks:
  - file-sink
  - scribe-sink
  - tmaster-sink
  - print-sink
For each sink, specify the following:
- class: The Java class name of your custom implementation of the IMetricsSink interface, e.g. biz.acme.heron.metrics.PrintSink.
- flush-frequency-ms: The frequency (in milliseconds) at which the flush() method is called in your implementation of IMetricsSink.
- sink-restart-attempts: The number of times a sink will attempt to restart if it throws an exception and dies. If you do not set this, the default is 0; if you set it to -1, the sink will attempt to restart forever.
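The rules for sink-restart-attempts can be expressed as a small predicate. `RestartPolicy` is a hypothetical helper, not Heron's implementation. It encodes only what the list above states: unset means 0, -1 means restart forever, and a positive N allows N restarts. It treats other negative values like 0, which is an assumption.

```java
// Hypothetical helper, not Heron's implementation: encodes the documented
// meaning of sink-restart-attempts.
final class RestartPolicy {
  static final int DEFAULT_ATTEMPTS = 0;  // documented default when unset
  static final int RESTART_FOREVER = -1;  // documented "restart forever" value

  private final int maxAttempts;

  // conf.get() returns null for a missing key, so fall back to the default.
  RestartPolicy(Integer configured) {
    this.maxAttempts = (configured == null) ? DEFAULT_ATTEMPTS : configured;
  }

  // restartsSoFar is the number of restarts already performed for this sink.
  // Negative values other than -1 never allow a restart here (an assumption).
  boolean shouldRestart(int restartsSoFar) {
    if (maxAttempts == RESTART_FOREVER) {
      return true;
    }
    return restartsSoFar < maxAttempts;
  }
}
```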
Below is an example metrics_sinks.yaml configuration:
sinks:
  - print-sink
print-sink:
  class: "biz.acme.heron.metrics.PrintSink"
  flush-frequency-ms: 60000 # One minute
  sink-restart-attempts: -1 # Attempt to restart forever
Adding other configuration parameters for the sink is optional. All of a sink's configuration is constructed as an unmodifiable Map<String, Object> conf and passed to init(conf, context).
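Inside init(), optional settings can be read from that map with fallbacks for missing keys. In this sketch, `print-prefix` and `max-records-per-flush` are hypothetical keys you might add under `print-sink` in metrics_sinks.yaml, and `SinkConfig` is a hypothetical helper. `demoConf()` builds an unmodifiable map to mimic what init() receives. Numbers are read through `Number` because, depending on the YAML parser, a value may arrive as an `Integer` or a `Long`. That is an assumption about the parser, not something stated here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for reading optional, sink-specific settings in init().
final class SinkConfig {
  final String prefix;
  final int maxRecordsPerFlush;

  SinkConfig(Map<String, Object> conf) {
    // Fall back to a default when the optional key is absent.
    Object p = conf.get("print-prefix");
    this.prefix = (p == null) ? "Record received" : p.toString();

    // Accept Integer or Long by going through Number; default to 1000.
    Object m = conf.get("max-records-per-flush");
    this.maxRecordsPerFlush = (m instanceof Number) ? ((Number) m).intValue() : 1000;
  }

  // Builds an unmodifiable conf map, mimicking what init() receives.
  static Map<String, Object> demoConf() {
    Map<String, Object> raw = new HashMap<>();
    raw.put("class", "biz.acme.heron.metrics.PrintSink");
    raw.put("flush-frequency-ms", 60000);
    raw.put("print-prefix", "[print-sink]");
    return Collections.unmodifiableMap(raw);
  }
}
```

Because the map is unmodifiable, calling put() on it throws UnsupportedOperationException. Copy it first if the sink needs to derive new settings.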
Using Your Custom Sink
Once you’ve made a JAR for your custom Java sink, distributed that JAR to the heron-core/lib/metricsmgr folder, and changed the configuration in heron/config/src/yaml/conf/${CLUSTER}/metrics_sinks.yaml, any topology submitted using that configuration will include the custom sink. You must recompile Heron if you want to include the configuration in a new heron-cli distribution.