winton_kafka_streams.processor package¶
Subpackages¶
Submodules¶
winton_kafka_streams.processor.extract_timestamp module¶
Time extractor from the message being processed
-
class
winton_kafka_streams.processor.extract_timestamp.
RecordTimeStampExtractor
¶ Bases:
winton_kafka_streams.processor._timestamp.TimeStampExtractor
Time stamp extractor that returns a time taken from the message itself
This is an abstract class, the on_error function must be implemented to use this extractor.
-
extract
(record, previous_timestamp)¶ Returns kafka timestamp for message
- record : Kafka record
- New record from which time should be assigned
- previous_timestamp : long
- Last extracted timestamp (seconds since the epoch)
- time : long
- Time in seconds since the epoch
-
on_error
(record, timestamp, previous_timestamp)¶ Called when an invalid timestamp is found in a record
Parameters: record : Kafka record
The current record being processed- timestamp : long
- The (invalid) timestamp that was processed
- previous_timestamp : long
- Last extracted timestamp (seconds since the epoch)
-
winton_kafka_streams.processor.processor module¶
Base definitions for all processors
-
class
winton_kafka_streams.processor.processor.
BaseProcessor
¶ Bases:
object
-
initialise
(_name, _context)¶
-
-
class
winton_kafka_streams.processor.processor.
SinkProcessor
(_topic)¶ Bases:
winton_kafka_streams.processor.processor.BaseProcessor
Forward values from processor nodes to the record collector from where they will be written to a Kafka topic
-
process
(key, value)¶
-
punctuate
(timestamp)¶
-
-
class
winton_kafka_streams.processor.processor.
SourceProcessor
(topics)¶ Bases:
winton_kafka_streams.processor.processor.BaseProcessor
Fetches values from a kafka topic(s)and forwards them to child node for processing
-
process
(key, value)¶
-
punctuate
(timestamp)¶
-
winton_kafka_streams.processor.processor_context module¶
Default context passed to every processor
-
class
winton_kafka_streams.processor.processor_context.
ProcessorContext
(_task_id, _task, _record_collector, _state_record_collector, _state_stores)¶ Bases:
winton_kafka_streams.processor._context.Context
The same processor context is shared betwen all nodes in a single topology instance. It takes care of forwarding values to downstream processors.
-
commit
()¶ Request a commit
- None
-
forward
(key, value)¶ Forward the key/value to the next node in the topology
-
schedule
(timestamp)¶ Schedule the punctuation function call
-
winton_kafka_streams.processor.topology module¶
Classes for building a graph topology comprising processor derived nodes
-
class
winton_kafka_streams.processor.topology.
ProcessorNode
(name, processor)¶ Bases:
object
-
initialise
(_context)¶
-
process
(key, value)¶
-
punctuate
(timestamp)¶
-
-
class
winton_kafka_streams.processor.topology.
Topology
(sources, processors, sinks, store_suppliers)¶ Bases:
object
A realised instance of a topology
-
class
winton_kafka_streams.processor.topology.
TopologyBuilder
¶ Bases:
object
Convenience class for building a graph topology
-
build
()¶
-
processor
(name, processor_type, *parents)¶ Add a processor to the topology
- name : str
- The name of the node
- processor : winton_kafka_streams.processor.base.BaseProcessor
- Processor object that will process the key, value pair passed
- *parents:
- Parent nodes supplying inputs
topology : TopologyBuilder
Raises: KafkaStreamsError
- If no inputs are specified
-
sink
(name, topic, *parents)¶
-
sinks
¶
-
source
(name, topics)¶ Add a source to the topology
- name : str
- The name of the node
- topics : str
- Source topic
topology : TopologyBuilder
Raises: KafkaStreamsError
- If node with same name exists already
-
sources
¶
-
state_store
(store_supplier, *processors)¶ Add a store and connect to processors
store_supplier : winton_kafka_streams.state.StateStoreSupplier *processors : processor names to which store should be attached
Raises: KafkaStreamsError
- If store_supplier is None
- If store_supplier already exists
-
state_stores
¶
-
winton_kafka_streams.processor.wallclock_timestamp module¶
Wall clock time extractor
-
class
winton_kafka_streams.processor.wallclock_timestamp.
WallClockTimeStampExtractor
¶ Bases:
winton_kafka_streams.processor._timestamp.TimeStampExtractor
Time stamp extractor that returns wall clock time at the point a record is processed
-
extract
(record, previous_timestamp)¶ Returns wall clock time for every message
- record : Kafka record
- New record from which time should be assigned
- previous_timestamp : long
- Last extracted timestamp (seconds since the epoch)
- time : long
- Time in seconds since the epoch
-
Module contents¶
Processor generating functions