0

I want to aggregate the values of my DataStream in tumbling windows of 10 seconds.

Unfortunately is the documentation in Bytewax very limited and I also don't find any other source where an average of the data is performed.

I have the following script and my reduce_window function is adding up the values, but all my attempts to divide to get the average failed..

import json
import time
from datetime import datetime, timedelta, timezone
from bytewax.connectors.kafka import KafkaSource
from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax.operators.windowing import (
 EventClock,
 TumblingWindower,
 reduce_window,
)
time.sleep(10)
# Define the Bytewax dataflow
flow = Dataflow("Exmaple-Flow")
# Kafka Source (consume messages from topic)
kafka_source = KafkaSource(
 brokers=["kafka:9093"],
 topics=["factory_001"],
)
def extract_value(msg):
 """Extract JSON data from KafkaSourceMessage."""
 try:
 # Decode byte string to a normal string
 message_str = msg.value.decode("utf-8")
 # Convert JSON string to Python dictionary
 message_dict = json.loads(json.loads(message_str))
 return message_dict
 except Exception as e:
 print(f"Error parsing Kafka message: {e}")
 return None # Return None if there's an error
def extract_timestamp(msg):
 """Extract and convert Kafka timestamp"""
 return datetime.strptime(msg["timestamp"], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
kinp = op.input("kafka-in", flow, kafka_source)
mapped = op.map("extract_string", kinp, lambda x: extract_value(x))
keyed_stream = op.key_on("key_on_engine_id", mapped, lambda e: e["engine_id"])
clock = EventClock(
 ts_getter=extract_timestamp,
 wait_for_system_duration=timedelta(seconds=10)
)
windower = TumblingWindower(
 length=timedelta(seconds=10),
 align_to=datetime(2024, 8, 29, 8, 0, 0, tzinfo=timezone.utc)
)
def add(acc, x):
 acc["temp_air"] += x["temp_air"]
 return acc
windowed_avg = reduce_window(
 step_id="average_temp_air",
 up=keyed_stream,
 clock=clock,
 windower=windower,
 reducer=add
)
op.output("out", windowed_avg.down, StdOutSink())

Stream example returned by the output:

('engine_001', (1756558, {'factory_id': 'factory_001', 'engine_id': 'engine_001', 'timestamp': '2025-03-20 15:19:45', 'temp_air': 499.29999999999995, 'temp_oil': 89.23, 'temp_exhaust': 759.54, 'vibration': 3.15, 'pressure_1': 149.81, 'pressure_2': 150.94, 'rpm': 2999}))
('engine_002', (1756558, {'factory_id': 'factory_001', 'engine_id': 'engine_002', 'timestamp': '2025-03-20 15:19:45', 'temp_air': 499.27000000000004, 'temp_oil': 89.24, 'temp_exhaust': 759.39, 'vibration': 2.54, 'pressure_1': 149.04, 'pressure_2': 151.2, 'rpm': 2998}))
('engine_001', (1756559, {'factory_id': 'factory_001', 'engine_id': 'engine_001', 'timestamp': '2025-03-20 15:19:50', 'temp_air': 1000.6199999999999, 'temp_oil': 88.94, 'temp_exhaust': 759.77, 'vibration': 2.78, 'pressure_1': 150.19, 'pressure_2': 149.25, 'rpm': 2999}))
('engine_002', (1756559, {'factory_id': 'factory_001', 'engine_id': 'engine_002', 'timestamp': '2025-03-20 15:19:50', 'temp_air': 1004.8700000000001, 'temp_oil': 88.81, 'temp_exhaust': 759.67, 'vibration': 1.88, 'pressure_1': 149.33, 'pressure_2': 149.04, 'rpm': 3000}))
marc_s
760k186 gold badges1.4k silver badges1.5k bronze badges
asked Mar 20, 2025 at 15:23

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.