test_sharding.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. import sh
  2. import unittest
  3. from sh import cut, grep, cat, wc, uniq, mv
  4. zmap_std_args = [ "-b",
  5. "configs/blacklist_shard.conf",
  6. "--seed=1234",
  7. "192.168.1.0/24",
  8. "--dryrun",
  9. "-c",
  10. "1"
  11. ]
  12. zmap = sh.Command("../src/zmap").bake(*zmap_std_args)
  13. def shard_file_name(shards, threads):
  14. # Use naming convertion <shards>-t<threads>
  15. return ''.join([str(shards), '-t', str(threads)])
  16. def output_file_name(shards, shard, threads):
  17. # Use naming convention: <shards>.<shard>-t<threads>
  18. return ''.join([str(shards), '.', str(shard), '-t', str(threads)])
  19. def parse(filename, **kwargs):
  20. # cat outfile | grep ip | cut -d '|' -f 2 | cut -d ' ' -f 3 | cut -d '.' -f 4 | sort -n | wc -l
  21. return sh.sort(cut(cut(cut(grep(cat(filename), "ip"), d="|", f=2), d=" ", f=3), d=".", f=4), "-n", _out=kwargs.get("_out"))
  22. class TestSharding(unittest.TestCase):
  23. NUM_IPS = 256
  24. def setUp(self):
  25. pass
  26. def takeDown(self):
  27. pass
  28. def _runTest(self, shards, max_threads):
  29. for threads in range(1, max_threads + 1):
  30. for shard in range(0, shards):
  31. with sh.sudo:
  32. outfile = output_file_name(shards, shard, threads)
  33. zmap(p=80, T=threads, shards=shards, shard=shard, _out="tempfile")
  34. parse("tempfile", _out=outfile)
  35. dup_lines = int(wc(uniq(cat(outfile), "-d"), "-l"))
  36. self.assertEqual(dup_lines, 0)
  37. shard_file = shard_file_name(shards, threads)
  38. if shard == 0:
  39. cat(outfile, _out=shard_file)
  40. else:
  41. cat(shard_file, outfile, _out="tempfile")
  42. mv("tempfile", shard_file)
  43. for threads in range(1, max_threads + 1):
  44. shard_file = shard_file_name(shards, threads)
  45. num_lines = int(wc(cat(shard_file), "-l"))
  46. self.assertEqual(num_lines, TestSharding.NUM_IPS)
  47. dup_lines = int(wc(uniq(sh.sort(cat(shard_file), "-n"), "-d"), "-l"))
  48. self.assertEqual(dup_lines, 0)
  49. def testOneShard(self):
  50. # Test with one shard
  51. self._runTest(1, 4)
  52. def testTwoShards(self):
  53. self._runTest(2, 4)
  54. if __name__ == '__main__':
  55. unittest.main()