winton_kafka_streams.processor package

Submodules

winton_kafka_streams.processor.extract_timestamp module

Timestamp extractor that takes the time from the message being processed

class winton_kafka_streams.processor.extract_timestamp.RecordTimeStampExtractor

Bases: winton_kafka_streams.processor._timestamp.TimeStampExtractor

Time stamp extractor that returns a time taken from the message itself

This is an abstract class: the on_error function must be implemented by a subclass before this extractor can be used.

extract(record, previous_timestamp)

Returns the Kafka timestamp of the message

Parameters: record : Kafka record
New record to which a timestamp should be assigned
previous_timestamp : long
Last extracted timestamp (seconds since the epoch)
Returns: time : long
Time in seconds since the epoch
on_error(record, timestamp, previous_timestamp)

Called when an invalid timestamp is found in a record

Parameters: record : Kafka record

The current record being processed
timestamp : long
The (invalid) timestamp that was processed
previous_timestamp : long
Last extracted timestamp (seconds since the epoch)
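The extract/on_error contract above can be illustrated with a minimal, self-contained Python sketch. The base class below is a stand-in that mirrors the documented behaviour rather than the library's source; the FakeRecord type, its timestamp attribute, and the rule that a negative timestamp counts as invalid are all assumptions made for illustration. The hypothetical PreviousTimeStampOnError subclass shows one possible on_error policy: reuse the last good timestamp.

```python
from collections import namedtuple

# Hypothetical stand-in for a Kafka record; the real record type differs.
FakeRecord = namedtuple("FakeRecord", ["key", "value", "timestamp"])


class RecordTimeStampExtractor:
    """Stand-in mirroring the documented extract/on_error contract (not the library code)."""

    def extract(self, record, previous_timestamp):
        timestamp = record.timestamp
        # ASSUMPTION: a negative timestamp is treated as invalid.
        if timestamp < 0:
            return self.on_error(record, timestamp, previous_timestamp)
        return timestamp

    def on_error(self, record, timestamp, previous_timestamp):
        # Abstract: concrete extractors decide how to handle invalid timestamps.
        raise NotImplementedError("on_error must be implemented by a subclass")


class PreviousTimeStampOnError(RecordTimeStampExtractor):
    """Hypothetical subclass: reuse the last good timestamp for invalid records."""

    def on_error(self, record, timestamp, previous_timestamp):
        return previous_timestamp


extractor = PreviousTimeStampOnError()
print(extractor.extract(FakeRecord("k", "v", 1500000000), 1499999999))  # 1500000000
print(extractor.extract(FakeRecord("k", "v", -1), 1499999999))          # 1499999999
```

Other policies (raising, logging and skipping, or substituting wall clock time) fit the same hook; which one is appropriate depends on how late or malformed records should affect downstream time-based logic.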

winton_kafka_streams.processor.processor module

Base definitions for all processors

class winton_kafka_streams.processor.processor.BaseProcessor

Bases: object

initialise(_name, _context)
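A custom processor is written by deriving from BaseProcessor and implementing process (and punctuate). The sketch below is self-contained: BaseProcessor is a stand-in that assumes initialise simply stores the node name and context, and DoubleProcessor and ListContext are hypothetical names used only to show a value being transformed and passed on through context.forward.

```python
class BaseProcessor:
    """Stand-in for winton_kafka_streams.processor.processor.BaseProcessor."""

    def initialise(self, _name, _context):
        # ASSUMPTION: the base class just remembers its node name and shared context.
        self.name = _name
        self.context = _context


class DoubleProcessor(BaseProcessor):
    """Hypothetical processor that doubles each value and forwards it downstream."""

    def process(self, key, value):
        self.context.forward(key, value * 2)

    def punctuate(self, timestamp):
        pass  # nothing is buffered, so there is nothing to flush


class ListContext:
    """Hypothetical context that records forwarded pairs instead of routing them."""

    def __init__(self):
        self.forwarded = []

    def forward(self, key, value):
        self.forwarded.append((key, value))


context = ListContext()
processor = DoubleProcessor()
processor.initialise("double", context)
processor.process("a", 21)
print(context.forwarded)  # [('a', 42)]
```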
class winton_kafka_streams.processor.processor.SinkProcessor(_topic)

Bases: winton_kafka_streams.processor.processor.BaseProcessor

Forwards values from processor nodes to the record collector, from which they are written to a Kafka topic

process(key, value)
punctuate(timestamp)
class winton_kafka_streams.processor.processor.SourceProcessor(topics)

Bases: winton_kafka_streams.processor.processor.BaseProcessor

Fetches values from one or more Kafka topics and forwards them to child nodes for processing

process(key, value)
punctuate(timestamp)
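The division of labour between the two built-in processors can be sketched as follows: a source passes each consumed key/value pair unchanged to its children, and a sink hands pairs to the record collector tagged with its output topic. Everything here is a stand-in; in particular RecordCollector.send, the context's record_collector attribute, and DirectContext are hypothetical and do not reproduce the library's actual wiring.

```python
class RecordCollector:
    """Hypothetical collector: buffers (topic, key, value) instead of producing to Kafka."""

    def __init__(self):
        self.records = []

    def send(self, topic, key, value):
        self.records.append((topic, key, value))


class SinkProcessor:
    """Stand-in sink: hands each pair to the collector together with its topic."""

    def __init__(self, _topic):
        self.topic = _topic

    def initialise(self, _name, _context):
        self.name, self.context = _name, _context

    def process(self, key, value):
        # ASSUMPTION: the context exposes the collector as `record_collector`.
        self.context.record_collector.send(self.topic, key, value)

    def punctuate(self, timestamp):
        pass  # sinks hold no state to flush


class SourceProcessor:
    """Stand-in source: passes consumed pairs unchanged to its children."""

    def __init__(self, topics):
        self.topics = topics

    def initialise(self, _name, _context):
        self.name, self.context = _name, _context

    def process(self, key, value):
        self.context.forward(key, value)

    def punctuate(self, timestamp):
        pass


class DirectContext:
    """Hypothetical context wiring exactly one source straight to one sink."""

    def __init__(self, sink, record_collector):
        self.sink = sink
        self.record_collector = record_collector

    def forward(self, key, value):
        self.sink.process(key, value)


collector = RecordCollector()
sink = SinkProcessor("output-topic")
context = DirectContext(sink, collector)
source = SourceProcessor(["input-topic"])
source.initialise("source", context)
sink.initialise("sink", context)
source.process("k", 7)
print(collector.records)  # [('output-topic', 'k', 7)]
```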

winton_kafka_streams.processor.processor_context module

Default context passed to every processor

class winton_kafka_streams.processor.processor_context.ProcessorContext(_task_id, _task, _record_collector, _state_record_collector, _state_stores)

Bases: winton_kafka_streams.processor._context.Context

The same processor context is shared between all nodes in a single topology instance. It takes care of forwarding values to downstream processors.

commit()

Request a commit

Returns: None
forward(key, value)

Forward the key/value pair to the downstream node(s) in the topology

schedule(timestamp)

Schedule a call to the punctuate function
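One way a shared context can route forward() calls to downstream processors is to track the node currently being processed and hand the pair to each of its children in turn. The sketch below is a hypothetical stand-in for ProcessorContext, not the library's implementation: Node, SketchContext, Upper and Collect are illustrative names, and commit() only sets a flag on the assumption that the owning task performs the actual commit later.

```python
class Node:
    """Hypothetical topology node: a processor plus the names of its children."""

    def __init__(self, name, processor, children=()):
        self.name = name
        self.processor = processor
        self.children = list(children)


class SketchContext:
    """Hypothetical stand-in for ProcessorContext that routes forward() calls."""

    def __init__(self):
        self.nodes = {}              # node name -> Node
        self.current_node = None     # node whose processor is currently running
        self.commit_requested = False

    def forward(self, key, value):
        parent = self.current_node
        for child_name in parent.children:
            child = self.nodes[child_name]
            self.current_node = child          # a child forwards from itself
            child.processor.process(key, value)
        self.current_node = parent             # restore after visiting all children

    def commit(self):
        # ASSUMPTION: only flag the request; the owning task commits later.
        self.commit_requested = True


class Upper:
    """Hypothetical processor: upper-cases the value and forwards it."""

    def __init__(self, context):
        self.context = context

    def process(self, key, value):
        self.context.forward(key, value.upper())


class Collect:
    """Hypothetical terminal processor that keeps everything it receives."""

    def __init__(self):
        self.seen = []

    def process(self, key, value):
        self.seen.append((key, value))


context = SketchContext()
collect = Collect()
context.nodes = {
    "source": Node("source", None, ["upper"]),
    "upper": Node("upper", Upper(context), ["sink"]),
    "sink": Node("sink", collect),
}

# Pretend a record has just arrived at the source node.
context.current_node = context.nodes["source"]
context.forward("k", "hello")
context.commit()
print(collect.seen)  # [('k', 'HELLO')]
```

Because every node shares one context, the current-node pointer is what lets a single forward(key, value) signature reach the right children; restoring it afterwards keeps sibling branches correct.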

winton_kafka_streams.processor.topology module

Classes for building a graph topology composed of processor-derived nodes

class winton_kafka_streams.processor.topology.ProcessorNode(name, processor)

Bases: object

initialise(_context)
process(key, value)
punctuate(timestamp)
class winton_kafka_streams.processor.topology.Topology(sources, processors, sinks, store_suppliers)

Bases: object

A realised instance of a topology

class winton_kafka_streams.processor.topology.TopologyBuilder

Bases: object

Convenience class for building a graph topology

build()
processor(name, processor_type, *parents)

Add a processor to the topology

Parameters: name : str
The name of the node
processor_type : winton_kafka_streams.processor.processor.BaseProcessor
Processor (derived from BaseProcessor) that will process the key/value pairs passed to it
*parents
Parent nodes supplying inputs
Returns: topology : TopologyBuilder

Raises: KafkaStreamsError

  • If no inputs are specified
sink(name, topic, *parents)
sinks
source(name, topics)

Add a source to the topology

Parameters: name : str
The name of the node
topics : str
Source topic
Returns: topology : TopologyBuilder

Raises: KafkaStreamsError

  • If a node with the same name already exists
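The checks documented for processor() and source() can be illustrated with a small builder sketch. TopologyBuilderSketch and the local KafkaStreamsError class are stand-ins, not the library's classes, and applying the duplicate-name check to every node type (rather than only to sources) is an assumption. Each method returns the builder, matching the documented TopologyBuilder return value and allowing calls to be chained.

```python
class KafkaStreamsError(Exception):
    """Stand-in for the library's KafkaStreamsError."""


class TopologyBuilderSketch:
    """Hypothetical stand-in mirroring the documented TopologyBuilder checks."""

    def __init__(self):
        self.nodes = {}  # node name -> (kind, payload, parent names)

    def _add(self, name, kind, payload, parents):
        # ASSUMPTION: duplicate names are rejected for every node type.
        if name in self.nodes:
            raise KafkaStreamsError(f"node '{name}' already exists")
        self.nodes[name] = (kind, payload, parents)
        return self  # returning the builder allows chained calls

    def source(self, name, topics):
        return self._add(name, "source", topics, ())

    def processor(self, name, processor_type, *parents):
        if not parents:
            raise KafkaStreamsError(f"processor '{name}' has no inputs")
        return self._add(name, "processor", processor_type, parents)

    def sink(self, name, topic, *parents):
        return self._add(name, "sink", topic, parents)


class Double:
    """Placeholder processor type for the example."""


builder = (
    TopologyBuilderSketch()
    .source("input", "input-topic")
    .processor("double", Double, "input")
    .sink("output", "output-topic", "double")
)
print(list(builder.nodes))  # ['input', 'double', 'output']

for bad_call in (lambda: builder.processor("orphan", Double),
                 lambda: builder.source("input", "other-topic")):
    try:
        bad_call()
    except KafkaStreamsError as exc:
        print("rejected:", exc)
```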
sources
state_store(store_supplier, *processors)

Add a state store and connect it to processors

Parameters: store_supplier : winton_kafka_streams.state.StateStoreSupplier
*processors : names of the processors to which the store should be attached

Raises: KafkaStreamsError

  • If store_supplier is None
  • If store_supplier already exists
state_stores
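The two failure cases listed for state_store() amount to simple bookkeeping, sketched below with stand-in classes. StoreSupplier and StateStoreRegistry are hypothetical, and judging whether a supplier "already exists" by its name is an assumption; the real StateStoreSupplier interface may differ.

```python
class KafkaStreamsError(Exception):
    """Stand-in for the library's KafkaStreamsError."""


class StoreSupplier:
    """Hypothetical store supplier identified only by a name."""

    def __init__(self, name):
        self.name = name


class StateStoreRegistry:
    """Hypothetical stand-in for TopologyBuilder.state_store bookkeeping."""

    def __init__(self):
        self.state_stores = {}  # store name -> (supplier, attached processor names)

    def state_store(self, store_supplier, *processors):
        if store_supplier is None:
            raise KafkaStreamsError("store_supplier must not be None")
        # ASSUMPTION: "already exists" is judged by the supplier's name.
        if store_supplier.name in self.state_stores:
            raise KafkaStreamsError(f"store '{store_supplier.name}' already exists")
        self.state_stores[store_supplier.name] = (store_supplier, processors)
        return self


registry = StateStoreRegistry()
registry.state_store(StoreSupplier("counts"), "double")
print(registry.state_stores["counts"][1])  # ('double',)
```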

winton_kafka_streams.processor.wallclock_timestamp module

Wall clock time extractor

class winton_kafka_streams.processor.wallclock_timestamp.WallClockTimeStampExtractor

Bases: winton_kafka_streams.processor._timestamp.TimeStampExtractor

Time stamp extractor that returns wall clock time at the point a record is processed

extract(record, previous_timestamp)

Returns wall clock time for every message

Parameters: record : Kafka record
New record to which a timestamp should be assigned
previous_timestamp : long
Last extracted timestamp (seconds since the epoch)
Returns: time : long
Time in seconds since the epoch
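Because the wall clock extractor ignores the record entirely, its behaviour reduces to reading the current time. The stand-in below is a sketch rather than the library's code; returning whole seconds via int(time.time()) is an assumption based on the documented long return type.

```python
import time


class WallClockTimeStampExtractor:
    """Stand-in sketch: ignores the record and reports the current wall clock time."""

    def extract(self, record, previous_timestamp):
        # The record and previous timestamp are deliberately unused.
        # ASSUMPTION: whole seconds since the epoch, per the documented 'long' type.
        return int(time.time())


extractor = WallClockTimeStampExtractor()
print(extractor.extract(record=None, previous_timestamp=0))
```

Unlike a record-based extractor, this never needs an on_error hook, at the cost of making results depend on when records are processed rather than when they were produced.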

Module contents

Processor generating functions