Source code for example.benchmarker

"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
"""

import asyncio
import logging
import random
import uuid
from datetime import datetime

from stream_manager import MessageStreamDefinition, Persistence, StrategyOnFull, StreamManagerClient





[docs]def main(): try: client = StreamManagerClient() stream_name = str(uuid.uuid4()) # Benchmark configuration num_messages = 100 num_bytes_per_message = 512 persistence = Persistence.File flushOnWrite = False stream_def = MessageStreamDefinition( name=stream_name, max_size=1024 * 1024 * 1024, # 1GB max stream_segment_size=16 * 1024 * 1024, # 16MB segment size is default strategy_on_full=StrategyOnFull.OverwriteOldestData, persistence=persistence, flush_on_write=flushOnWrite, ) client.create_message_stream(stream_def) messages = [] # Generate at most 100 random messages to send to the server. We don't generate all num_messages because # it could be a massive amount of data too big for RAM messages_to_generate = min(100, num_messages) print("Pre-generating {} messages of {} bytes each".format(messages_to_generate, num_bytes_per_message)) for _ in range(messages_to_generate): messages.append(bytes(random.getrandbits(8) for _ in range(num_bytes_per_message))) print("Pre-generated messages to send, starting to send to server now...") print_progress_bar(0, num_messages) # Start benchmark timer start = datetime.utcnow() for i in range(num_messages): print_progress_bar(i + 1, num_messages) # Cycle through all generated messages, sending each one # Don't use random because random is very slow client.append_message(stream_name, messages[i % messages_to_generate]) # End benchmark timer end = datetime.utcnow() total_time = end - start print("\n\nResults") print( "Completed sending {} messages of {} bytes each in {} using {} persistence and flushOnWrite is {}.".format( num_messages, num_bytes_per_message, total_time, stream_def.persistence.name, stream_def.flush_on_write ) ) print( "This represents an average of {} TPS with a throughput of {} KBps".format( num_messages / total_time.total_seconds(), ((num_messages * num_bytes_per_message) / 1024) / total_time.total_seconds(), ) ) client.delete_message_stream(stream_name) except asyncio.TimeoutError: print("Timed out") except Exception as e: print(e) print(repr(e)) finally: if client: client.close()
if __name__ == "__main__": logging.basicConfig(level=logging.INFO) main()