pct-speedtest.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #
  4. # pct-speedtest.py: Speed test for the Python Cryptography Toolkit
  5. #
  6. # Written in 2009 by Dwayne C. Litzenberger <dlitz@dlitz.net>
  7. #
  8. # ===================================================================
  9. # The contents of this file are dedicated to the public domain. To
  10. # the extent that dedication to the public domain is not available,
  11. # everyone is granted a worldwide, perpetual, royalty-free,
  12. # non-exclusive license to exercise all rights associated with the
  13. # contents of this file for any purpose whatsoever.
  14. # No rights are reserved.
  15. #
  16. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  17. # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  18. # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  19. # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  20. # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  21. # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  22. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23. # SOFTWARE.
  24. # ===================================================================
  25. import time
  26. import os
  27. import sys
  28. from tls.Crypto.PublicKey import RSA
  29. from tls.Crypto.Cipher import PKCS1_OAEP, PKCS1_v1_5 as RSAES_PKCS1_v1_5
  30. from tls.Crypto.Signature import PKCS1_PSS, PKCS1_v1_5 as RSASSA_PKCS1_v1_5
  31. from tls.Crypto.Cipher import (AES, ARC2, ARC4, Blowfish, CAST, DES3, DES,
  32. Salsa20, ChaCha20)
  33. from tls.Crypto.Hash import (HMAC, MD2, MD4, MD5, SHA224, SHA256, SHA384, SHA512,
  34. CMAC, SHA3_224, SHA3_256, SHA3_384, SHA3_512,
  35. BLAKE2b, BLAKE2s)
  36. from tls.Crypto.Random import get_random_bytes
  37. import Crypto.Util.Counter
  38. from tls.Crypto.Util.number import bytes_to_long
  39. try:
  40. from tls.Crypto.Hash import SHA1
  41. except ImportError:
  42. # Maybe it's called SHA
  43. from tls.Crypto.Hash import SHA as SHA1
  44. try:
  45. from tls.Crypto.Hash import RIPEMD160
  46. except ImportError:
  47. # Maybe it's called RIPEMD
  48. try:
  49. from tls.Crypto.Hash import RIPEMD as RIPEMD160
  50. except ImportError:
  51. # Some builds of PyCrypto don't have the RIPEMD module
  52. RIPEMD160 = None
  53. try:
  54. import hashlib
  55. import hmac
  56. except ImportError: # Some builds/versions of Python don't have a hashlib module
  57. hashlib = hmac = None
  58. from tls.Crypto.Random import random as pycrypto_random
  59. import random as stdlib_random
  60. class BLAKE2b_512(object):
  61. digest_size = 512
  62. @staticmethod
  63. def new(data=None):
  64. return BLAKE2b.new(digest_bits=512, data=data)
  65. class BLAKE2s_256(object):
  66. digest_size = 256
  67. @staticmethod
  68. def new(data=None):
  69. return BLAKE2s.new(digest_bits=256, data=data)
  70. class ChaCha20_old_style(object):
  71. @staticmethod
  72. def new(key, nonce):
  73. return ChaCha20.new(key=key, nonce=nonce)
  74. class ModeNotAvailable(ValueError):
  75. pass
  76. rng = get_random_bytes
  77. class Benchmark:
  78. def __init__(self):
  79. self.__random_data = None
  80. def random_keys(self, bytes, n=10**5):
  81. """Return random keys of the specified number of bytes.
  82. If this function has been called before with the same number of bytes,
  83. cached keys are used instead of randomly generating new ones.
  84. """
  85. return self.random_blocks(bytes, n)
  86. def random_blocks(self, bytes_per_block, blocks):
  87. bytes = bytes_per_block * blocks
  88. data = self.random_data(bytes)
  89. retval = []
  90. for i in range(blocks):
  91. p = i * bytes_per_block
  92. retval.append(data[p:p+bytes_per_block])
  93. return retval
  94. def random_data(self, bytes):
  95. if self.__random_data is None:
  96. self.__random_data = self._random_bytes(bytes)
  97. return self.__random_data
  98. elif bytes == len(self.__random_data):
  99. return self.__random_data
  100. elif bytes < len(self.__random_data):
  101. return self.__random_data[:bytes]
  102. else:
  103. self.__random_data += self._random_bytes(bytes - len(self.__random_data))
  104. return self.__random_data
  105. def _random_bytes(self, b):
  106. return os.urandom(b)
  107. def announce_start(self, test_name):
  108. sys.stdout.write("%s: " % (test_name,))
  109. sys.stdout.flush()
  110. def announce_result(self, value, units):
  111. sys.stdout.write("%.2f %s\n" % (value, units))
  112. sys.stdout.flush()
  113. def test_random_module(self, module_name, module):
  114. self.announce_start("%s.choice" % (module_name,))
  115. alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
  116. t0 = time.time()
  117. for i in range(5000):
  118. module.choice(alphabet)
  119. t = time.time()
  120. invocations_per_second = 5000 / (t - t0)
  121. self.announce_result(invocations_per_second, "invocations/sec")
  122. def test_pubkey_setup(self, pubkey_name, module, key_bytes):
  123. self.announce_start("%s pubkey setup" % (pubkey_name,))
  124. keys = self.random_keys(key_bytes)[:5]
  125. t0 = time.time()
  126. for k in keys:
  127. module.generate(key_bytes*8)
  128. t = time.time()
  129. pubkey_setups_per_second = len(keys) / (t - t0)
  130. self.announce_result(pubkey_setups_per_second, "Keys/sec")
  131. def test_key_setup(self, cipher_name, module, key_bytes, params):
  132. self.generate_cipher(module, key_bytes, params)
  133. self.announce_start("%s key setup" % (cipher_name,))
  134. for x in xrange(5000):
  135. t0 = time.time()
  136. self.generate_cipher(module, key_bytes, params)
  137. t = time.time()
  138. key_setups_per_second = 5000 / (t - t0)
  139. self.announce_result(key_setups_per_second/1000, "kKeys/sec")
  140. def test_encryption(self, cipher_name, module, key_bytes, params):
  141. self.announce_start("%s encryption" % (cipher_name,))
  142. pt_size = 16384000L
  143. pt = rng(pt_size)
  144. cipher = self.generate_cipher(module, key_bytes, params)
  145. # Perform encryption
  146. t0 = time.time()
  147. cipher.encrypt(pt)
  148. t = time.time()
  149. encryption_speed = pt_size / (t - t0)
  150. self.announce_result(encryption_speed / 10**6, "MBps")
  151. def test_hash_small(self, hash_name, hash_constructor, digest_size):
  152. self.announce_start("%s (%d-byte inputs)" % (hash_name, digest_size))
  153. blocks = self.random_blocks(digest_size, 10000)
  154. # Initialize hashes
  155. t0 = time.time()
  156. for b in blocks:
  157. hash_constructor(b).digest()
  158. t = time.time()
  159. hashes_per_second = len(blocks) / (t - t0)
  160. self.announce_result(hashes_per_second / 1000, "kHashes/sec")
  161. def test_hash_large(self, hash_name, hash_constructor, digest_size):
  162. self.announce_start("%s (single large input)" % (hash_name,))
  163. blocks = self.random_blocks(16384, 10000)
  164. # Perform hashing
  165. t0 = time.time()
  166. h = hash_constructor()
  167. for b in blocks:
  168. h.update(b)
  169. h.digest()
  170. t = time.time()
  171. hash_speed = len(blocks) * len(blocks[0]) / (t - t0)
  172. self.announce_result(hash_speed / 10**6, "MBps")
  173. def test_hmac_small(self, mac_name, hmac_constructor, digestmod, digest_size):
  174. keys = iter(self.random_keys(digest_size))
  175. if sys.version_info[0] == 2:
  176. mac_constructor = lambda data=None: hmac_constructor(keys.next(), data, digestmod)
  177. else:
  178. mac_constructor = lambda data=None: hmac_constructor(keys.__next__(), data, digestmod)
  179. self.test_hash_small(mac_name, mac_constructor, digest_size)
  180. def test_hmac_large(self, mac_name, hmac_constructor, digestmod, digest_size):
  181. key = self.random_keys(digest_size)[0]
  182. mac_constructor = lambda data=None: hmac_constructor(key, data, digestmod)
  183. self.test_hash_large(mac_name, mac_constructor, digest_size)
  184. def test_cmac_small(self, mac_name, cmac_constructor, ciphermod, key_size):
  185. keys = iter(self.random_keys(key_size))
  186. if sys.version_info[0] == 2:
  187. mac_constructor = lambda data=None: cmac_constructor(keys.next(), data, ciphermod)
  188. else:
  189. mac_constructor = lambda data=None: cmac_constructor(keys.__next__(), data, ciphermod)
  190. self.test_hash_small(mac_name, mac_constructor, ciphermod.block_size)
  191. def test_cmac_large(self, mac_name, cmac_constructor, ciphermod, key_size):
  192. key = self.random_keys(key_size)[0]
  193. mac_constructor = lambda data=None: cmac_constructor(key, data, ciphermod)
  194. self.test_hash_large(mac_name, mac_constructor, ciphermod.block_size)
  195. def test_pkcs1_sign(self, scheme_name, scheme_constructor, hash_name, hash_constructor, digest_size):
  196. self.announce_start("%s signing %s (%d-byte inputs)" % (scheme_name, hash_name, digest_size))
  197. # Make a key
  198. k = RSA.generate(2048)
  199. sigscheme = scheme_constructor(k)
  200. # Make some hashes
  201. blocks = self.random_blocks(digest_size, 50)
  202. hashes = []
  203. for b in blocks:
  204. hashes.append(hash_constructor(b))
  205. # Perform signing
  206. t0 = time.time()
  207. for h in hashes:
  208. sigscheme.sign(h)
  209. t = time.time()
  210. speed = len(hashes) / (t - t0)
  211. self.announce_result(speed, "sigs/sec")
  212. def test_pkcs1_verify(self, scheme_name, scheme_constructor, hash_name, hash_constructor, digest_size):
  213. self.announce_start("%s verification %s (%d-byte inputs)" % (scheme_name, hash_name, digest_size))
  214. # Make a key
  215. k = RSA.generate(2048)
  216. sigscheme = scheme_constructor(k)
  217. # Make some hashes
  218. blocks = self.random_blocks(digest_size, 50)
  219. hashes = []
  220. for b in blocks:
  221. hashes.append(hash_constructor(b))
  222. # Make some signatures
  223. signatures = []
  224. for h in hashes:
  225. signatures.append(sigscheme.sign(h))
  226. # Double the list, to make timing better
  227. hashes = hashes + hashes
  228. signatures = signatures + signatures
  229. # Perform verification
  230. t0 = time.time()
  231. for h, s in zip(hashes, signatures):
  232. sigscheme.verify(h, s)
  233. t = time.time()
  234. speed = len(hashes) / (t - t0)
  235. self.announce_result(speed, "sigs/sec")
  236. def generate_cipher(self, module, key_size, params):
  237. params_dict = {}
  238. if params:
  239. params_dict = dict([x.split("=") for x in params.split(" ")])
  240. gen_tuple = []
  241. gen_dict = {}
  242. # 1st parameter (mandatory): key
  243. if params_dict.get('ks') == "x2":
  244. key = rng(2 * key_size)
  245. else:
  246. key = rng(key_size)
  247. gen_tuple.append(key)
  248. # 2nd parameter: mode
  249. mode = params_dict.get("mode")
  250. if mode:
  251. mode_value = getattr(module, mode, None)
  252. if mode_value is None:
  253. # Mode not available for this cipher
  254. raise ModeNotAvailable()
  255. gen_tuple.append(getattr(module, mode))
  256. # 3rd parameter: IV/nonce
  257. iv_length = params_dict.get("iv")
  258. if iv_length is None:
  259. iv_length = params_dict.get("nonce")
  260. if iv_length:
  261. if iv_length == "bs":
  262. iv_length = module.block_size
  263. iv = rng(int(iv_length))
  264. gen_tuple.append(iv)
  265. # Specific to CTR mode
  266. le = params_dict.get("little_endian")
  267. if le:
  268. if le == "True":
  269. le = True
  270. else:
  271. le = False
  272. # Remove iv from parameters
  273. gen_tuple = gen_tuple[:-1]
  274. ctr = Crypto.Util.Counter.new(module.block_size*8,
  275. initial_value=bytes_to_long(iv),
  276. little_endian=le,
  277. allow_wraparound=True)
  278. gen_dict['counter'] = ctr
  279. # Generate cipher
  280. return module.new(*gen_tuple, **gen_dict)
  281. def run(self):
  282. pubkey_specs = [
  283. ("RSA(1024)", RSA, int(1024/8)),
  284. ("RSA(2048)", RSA, int(2048/8)),
  285. ("RSA(4096)", RSA, int(4096/8)),
  286. ]
  287. block_cipher_modes = [
  288. # Mode name, key setup, parameters
  289. ("CBC", True, "mode=MODE_CBC iv=bs"),
  290. ("CFB-8", False, "mode=MODE_CFB iv=bs"),
  291. ("OFB", False, "mode=MODE_OFB iv=bs"),
  292. ("ECB", False, "mode=MODE_ECB"),
  293. ("CTR-LE", True, "mode=MODE_CTR iv=bs little_endian=True"),
  294. ("CTR-BE", False, "mode=MODE_CTR iv=bs little_endian=False"),
  295. ("OPENPGP", False, "mode=MODE_OPENPGP iv=bs"),
  296. ("CCM", True, "mode=MODE_CCM nonce=12"),
  297. ("GCM", True, "mode=MODE_GCM nonce=16"),
  298. ("EAX", True, "mode=MODE_EAX nonce=16"),
  299. ("SIV", True, "mode=MODE_SIV ks=x2 nonce=16"),
  300. ("OCB", True, "mode=MODE_OCB nonce=15"),
  301. ]
  302. block_specs = [
  303. # Cipher name, module, key size
  304. ("DES", DES, 8),
  305. ("DES3", DES3, 24),
  306. ("AES128", AES, 16),
  307. ("AES192", AES, 24),
  308. ("AES256", AES, 32),
  309. ("Blowfish(256)", Blowfish, 32),
  310. ("CAST(128)", CAST, 16),
  311. ("ARC2(128)", ARC2, 16),
  312. ]
  313. stream_specs = [
  314. # Cipher name, module, key size, nonce size
  315. ("ARC4(128)", ARC4, 16, 0),
  316. ("Salsa20(16)", Salsa20, 16, 8),
  317. ("Salsa20(32)", Salsa20, 32, 8),
  318. ("ChaCha20", ChaCha20_old_style, 32, 8),
  319. ]
  320. hash_specs = [
  321. ("MD2", MD2),
  322. ("MD4", MD4),
  323. ("MD5", MD5),
  324. ("SHA1", SHA1),
  325. ("SHA224", SHA224),
  326. ("SHA256", SHA256),
  327. ("SHA384", SHA384),
  328. ("SHA512", SHA512),
  329. ("SHA3_224", SHA3_224),
  330. ("SHA3_256", SHA3_256),
  331. ("SHA3_384", SHA3_384),
  332. ("SHA3_512", SHA3_512),
  333. ("BLAKE2b", BLAKE2b_512),
  334. ("BLAKE2s", BLAKE2s_256),
  335. ]
  336. if RIPEMD160 is not None:
  337. hash_specs += [("RIPEMD160", RIPEMD160)]
  338. hashlib_specs = []
  339. if hashlib is not None:
  340. if hasattr(hashlib, 'md5'): hashlib_specs.append(("hashlib.md5", hashlib.md5))
  341. if hasattr(hashlib, 'sha1'): hashlib_specs.append(("hashlib.sha1", hashlib.sha1))
  342. if hasattr(hashlib, 'sha224'): hashlib_specs.append(("hashlib.sha224", hashlib.sha224))
  343. if hasattr(hashlib, 'sha256'): hashlib_specs.append(("hashlib.sha256", hashlib.sha256))
  344. if hasattr(hashlib, 'sha384'): hashlib_specs.append(("hashlib.sha384", hashlib.sha384))
  345. if hasattr(hashlib, 'sha512'): hashlib_specs.append(("hashlib.sha512", hashlib.sha512))
  346. # stdlib random
  347. self.test_random_module("stdlib random", stdlib_random)
  348. # Crypto.Random.random
  349. self.test_random_module("Crypto.Random.random", pycrypto_random)
  350. # Crypto.PublicKey
  351. for pubkey_name, module, key_bytes in pubkey_specs:
  352. self.test_pubkey_setup(pubkey_name, module, key_bytes)
  353. # Crypto.Cipher (block ciphers)
  354. for cipher_name, module, key_bytes in block_specs:
  355. # Benchmark each cipher in each of the various modes (CBC, etc)
  356. for mode_name, test_ks, params in block_cipher_modes:
  357. mode_text = "%s-%s" % (cipher_name, mode_name)
  358. try:
  359. if test_ks:
  360. self.test_key_setup(mode_text, module, key_bytes, params)
  361. self.test_encryption(mode_text, module, key_bytes, params)
  362. except ModeNotAvailable as e:
  363. pass
  364. # Crypto.Cipher (stream ciphers)
  365. for cipher_name, module, key_bytes, nonce_bytes in stream_specs:
  366. params = ""
  367. if nonce_bytes:
  368. params = "nonce=" + str(nonce_bytes)
  369. self.test_key_setup(cipher_name, module, key_bytes, params)
  370. self.test_encryption(cipher_name, module, key_bytes, params)
  371. # Crypto.Hash
  372. for hash_name, module in hash_specs:
  373. self.test_hash_small(hash_name, module.new, module.digest_size)
  374. self.test_hash_large(hash_name, module.new, module.digest_size)
  375. # standard hashlib
  376. for hash_name, func in hashlib_specs:
  377. self.test_hash_small(hash_name, func, func().digest_size)
  378. self.test_hash_large(hash_name, func, func().digest_size)
  379. # PyCrypto HMAC
  380. for hash_name, module in hash_specs:
  381. if not hasattr(module, "block_size"):
  382. continue
  383. self.test_hmac_small("HMAC-"+hash_name, HMAC.new, module, module.digest_size)
  384. self.test_hmac_large("HMAC-"+hash_name, HMAC.new, module, module.digest_size)
  385. # standard hmac + hashlib
  386. for hash_name, func in hashlib_specs:
  387. if not hasattr(module, "block_size"):
  388. continue
  389. self.test_hmac_small("hmac+"+hash_name, hmac.HMAC, func, func().digest_size)
  390. self.test_hmac_large("hmac+"+hash_name, hmac.HMAC, func, func().digest_size)
  391. # CMAC
  392. for cipher_name, module, key_size in (("AES128", AES, 16),):
  393. self.test_cmac_small(cipher_name+"-CMAC", CMAC.new, module, key_size)
  394. self.test_cmac_large(cipher_name+"-CMAC", CMAC.new, module, key_size)
  395. # PKCS1_v1_5 (sign) + Crypto.Hash
  396. for hash_name, module in hash_specs:
  397. self.test_pkcs1_sign("PKCS#1-v1.5", RSASSA_PKCS1_v1_5.new, hash_name, module.new, module.digest_size)
  398. # PKCS1_PSS (sign) + Crypto.Hash
  399. for hash_name, module in hash_specs:
  400. self.test_pkcs1_sign("PKCS#1-PSS", PKCS1_PSS.new, hash_name, module.new, module.digest_size)
  401. # PKCS1_v1_5 (verify) + Crypto.Hash
  402. for hash_name, module in hash_specs:
  403. self.test_pkcs1_verify("PKCS#1-v1.5", RSASSA_PKCS1_v1_5.new, hash_name, module.new, module.digest_size)
  404. # PKCS1_PSS (verify) + Crypto.Hash
  405. for hash_name, module in hash_specs:
  406. self.test_pkcs1_verify("PKCS#1-PSS", PKCS1_PSS.new, hash_name, module.new, module.digest_size)
  407. if __name__ == '__main__':
  408. Benchmark().run()
  409. # vim:set ts=4 sw=4 sts=4 expandtab: