"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
"""
import asyncio
import logging
import random
import uuid
from datetime import datetime
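# The stream_manager package is provided by the AWS IoT Greengrass Stream Manager SDK for Python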
from stream_manager import MessageStreamDefinition, Persistence, StrategyOnFull, StreamManagerClient


def print_progress_bar(iteration, total, prefix="", suffix="", decimals=1, length=100, fill="█"):
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filled_length = int(length * iteration // total)
    bar = fill * filled_length + "-" * (length - filled_length)
    print("\r%s |%s| %s%% %s" % (prefix, bar, percent, suffix), end="")
    # Print a new line once complete
    if iteration == total:
        print()


def main():
    client = None
    try:
        client = StreamManagerClient()
        stream_name = str(uuid.uuid4())

        # Benchmark configuration
        num_messages = 100
        num_bytes_per_message = 512
        persistence = Persistence.File
        flush_on_write = False

        stream_def = MessageStreamDefinition(
            name=stream_name,
            max_size=1024 * 1024 * 1024,  # 1 GB max stream size
            stream_segment_size=16 * 1024 * 1024,  # 16 MB segment size (the default)
            strategy_on_full=StrategyOnFull.OverwriteOldestData,
            persistence=persistence,
            flush_on_write=flush_on_write,
        )
        client.create_message_stream(stream_def)
        messages = []
        # Pre-generate at most 100 random messages to send. We don't generate all num_messages
        # up front because that could be far too much data to hold in RAM.
        messages_to_generate = min(100, num_messages)
        print("Pre-generating {} messages of {} bytes each".format(messages_to_generate, num_bytes_per_message))
        for _ in range(messages_to_generate):
            messages.append(bytes(random.getrandbits(8) for _ in range(num_bytes_per_message)))
        print("Pre-generated messages to send, starting to send to the server now...")

        print_progress_bar(0, num_messages)

        # Start the benchmark timer
        start = datetime.utcnow()
        for i in range(num_messages):
            print_progress_bar(i + 1, num_messages)
            # Cycle through the pre-generated messages rather than generating random
            # data here, since random generation would slow down the send loop
            client.append_message(stream_name, messages[i % messages_to_generate])
        # Stop the benchmark timer
        end = datetime.utcnow()
        total_time = end - start
print("\n\nResults")
print(
"Completed sending {} messages of {} bytes each in {} using {} persistence and flushOnWrite is {}.".format(
num_messages, num_bytes_per_message, total_time, stream_def.persistence.name, stream_def.flush_on_write
)
)
print(
"This represents an average of {} TPS with a throughput of {} KBps".format(
num_messages / total_time.total_seconds(),
((num_messages * num_bytes_per_message) / 1024) / total_time.total_seconds(),
)
)
client.delete_message_stream(stream_name)
    except asyncio.TimeoutError:
        print("Timed out")
    except Exception as e:
        print(e)
        print(repr(e))
    finally:
        if client:
            client.close()
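

# Optional, for illustration only (not called by the benchmark above): a minimal sketch of
# how appended messages could be read back for verification. It assumes the stream_manager
# SDK's read_messages()/ReadMessagesOptions API; the helper name and defaults here are
# hypothetical, not part of the original sample.
def read_back_messages(client, stream_name, min_count=1, timeout_millis=5000):
    # Imported locally so the benchmark above has no dependency on this optional helper
    from stream_manager import ReadMessagesOptions

    # Read messages from the stream, waiting up to timeout_millis for at least
    # min_count messages to become available
    return client.read_messages(
        stream_name,
        ReadMessagesOptions(min_message_count=min_count, read_timeout_millis=timeout_millis),
    )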


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()