#!/usr/bin/env python
"""Connection Stress Test - test concurrent connections to a server.

Author: HT

Usage: SCRIPT HOST PORT TIMEOUT CONNECTIONS

    HOST         host name or address to connect to
    PORT         TCP port to connect to (integer)
    TIMEOUT      socket timeout in seconds (integer); 0 means no timeout
    CONNECTIONS  number of concurrent connections to open (integer)

Each connection runs in its own thread: it connects, prints up to 1024
bytes received from the server, and then closes the socket.
"""

import threading
import socket
import sys


class threadedConnect(threading.Thread):
    """Thread that opens one TCP connection and prints the first reply."""

    def __init__(self, host, port, timeout):
        """Store the connection parameters.

        host    -- host to connect to
        port    -- port to connect to
        timeout -- socket timeout in seconds; 0 for no timeout
        """
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.timeout = timeout
        self.sock = None

    def run(self):
        """Connect, print up to 1024 received bytes, then close the socket."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # settimeout(0) would switch the socket to non-blocking mode, which
        # makes connect() fail immediately.  "0 for no timeout" therefore has
        # to be passed on as None (blocking, wait forever).
        self.sock.settimeout(self.timeout or None)
        try:
            try:
                self.sock.connect((self.host, self.port))
            except socket.error:
                # Covers refused/unreachable hosts as well as connect
                # timeouts (socket.timeout is a subclass of socket.error).
                print("Error Connecting")
                # Nothing to read from a socket that never connected.
                return

            try:
                print(self.sock.recv(1024))
            except socket.timeout:
                print("Connection Timed Out")
            except socket.error:
                # e.g. connection reset by the server under load.
                print("Error Receiving")
        finally:
            # Always release the file descriptor, whatever happened above.
            self.sock.close()


def usage():
    """Print the help text and terminate the program."""
    print("Connection Stress Test - Test concurrent connections to a server")
    print("Author: HT ")
    print("Usage: " + sys.argv[0] + " HOST PORT TIMEOUT CONNECTIONS")
    sys.exit()


def parse_args(argv):
    """Extract (host, port, timeout, connCount) from an argv-style list.

    argv[0] is the program name; argv[1:5] are the four arguments.  Any
    additional arguments are ignored.  Returns None when an argument is
    missing or when port, timeout or connCount is not an integer.
    """
    try:
        host = argv[1]
        port = int(argv[2])
        timeout = int(argv[3])
        connCount = int(argv[4])
    except (IndexError, ValueError):
        return None
    return host, port, timeout, connCount


def main(argv=None):
    """Parse the command line and start one thread per connection."""
    if argv is None:
        argv = sys.argv

    parsed = parse_args(argv)
    if parsed is None:
        usage()
    host, port, timeout, connCount = parsed

    # Number of connections to make; the threads are non-daemon, so the
    # interpreter stays alive until every one of them has finished.
    for _ in range(connCount):
        threadedConnect(host, port, timeout).start()


if __name__ == "__main__":
    main()