client.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844
  1. package main
  2. /*
  3. #cgo CFLAGS: -O2
  4. #cgo LDFLAGS: -lcrypto -lm
  5. #include "../c/dpf.h"
  6. #include "../c/okvClient.h"
  7. #include "../c/dpf.c"
  8. #include "../c/okvClient.c"
  9. */
  10. import "C"
  11. //sssssssssssss
  12. import (
  13. "2PPS/lib"
  14. "bufio"
  15. "bytes"
  16. "crypto/rand"
  17. "crypto/sha256"
  18. "crypto/tls"
  19. "encoding/json"
  20. "fmt"
  21. "math/big"
  22. mr "math/rand"
  23. "net"
  24. "os"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "golang.org/x/crypto/nacl/box"
  32. )
  33. type tweet struct {
  34. Topics []string
  35. Text string
  36. }
  37. const leader string = "127.0.0.1:4441"
  38. //needs to be changed at leader/follower/client at the same time
  39. const numClients = 500
  40. //mylimit=8000
  41. //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
  42. const dataLength int = 256
  43. const numThreads int = 12
  44. //Maximum Transport Unit
  45. const mtu int = 1100
  46. var dbWriteSize int
  47. var round int
  48. var topicList []string
  49. var archiveTopicList []string
  50. var neededSubscriptions int
  51. var publisherAmount int
  52. var timeBounds []int
  53. var speedUp int = 50
  54. var maxTimePerRound time.Duration = 25 * time.Second
  55. var startTime int
  56. //todo! expand this for multiple clients
  57. var archiveInterests = make([]int, 1)
  58. var sharedSecret [numClients][2][32]byte = createSharedSecret()
  59. var wantsArchive = make([]byte, 1)
  60. var leaderPublicKey *[32]byte
  61. var followerPublicKey *[32]byte
  62. var clientPrivateKey [numClients]*[32]byte
  63. var clientPublicKey [numClients]*[32]byte
  64. func main() {
  65. wg := &sync.WaitGroup{}
  66. getTimeBounds()
  67. for i := 0; i < numClients; i++ {
  68. wg.Add(1)
  69. go client(i)
  70. time.Sleep(1 * time.Millisecond)
  71. }
  72. wg.Wait()
  73. }
  74. func client(clientNumber int) {
  75. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  76. if err != nil {
  77. panic(err)
  78. }
  79. clientPrivateKey[clientNumber] = generatedPrivateKey
  80. clientPublicKey[clientNumber] = generatedPublicKey
  81. C.initializeCipher()
  82. //initializes the connection to the leader
  83. conf := &tls.Config{
  84. InsecureSkipVerify: true,
  85. }
  86. leaderConn, err := tls.Dial("tcp", leader, conf)
  87. if err != nil {
  88. fmt.Println("clientNumber", clientNumber)
  89. panic(err)
  90. }
  91. leaderConn.SetDeadline(time.Time{})
  92. //receives topics first so client can participate asap
  93. receiveTopicLists(leaderConn)
  94. //gets the public keys of both servers
  95. var tmpLeaderPubKey [32]byte
  96. _, err = leaderConn.Read(tmpLeaderPubKey[:])
  97. if err != nil {
  98. panic(err)
  99. }
  100. leaderPublicKey = &tmpLeaderPubKey
  101. var tmpFollowerPubKey [32]byte
  102. _, err = leaderConn.Read(tmpFollowerPubKey[:])
  103. if err != nil {
  104. panic(err)
  105. }
  106. followerPublicKey = &tmpFollowerPubKey
  107. //sends own public key
  108. writeTo(leaderConn, clientPublicKey[clientNumber][:])
  109. neededSubscriptionsBytes := readFrom(leaderConn, 4)
  110. neededSubscriptions = byteToInt(neededSubscriptionsBytes)
  111. startTimeBytes := readFrom(leaderConn, 4)
  112. startTime = byteToInt(startTimeBytes)
  113. //setup ends above
  114. //while client is active he is always connected and has to participate
  115. for {
  116. //gets current phase
  117. phase := readFrom(leaderConn, 1)
  118. if phase[0] == 1 {
  119. //gets current dbWriteSize from leader
  120. dbWriteSizeBytes := readFrom(leaderConn, 4)
  121. dbWriteSize = byteToInt(dbWriteSizeBytes)
  122. //todo! put into tweet creation
  123. //roundAsBytes := readFrom(leaderConn, 4)
  124. roundAsBytes := make([]byte, 4)
  125. _, err = leaderConn.Read(roundAsBytes)
  126. if err != nil {
  127. panic(err)
  128. }
  129. round = byteToInt(roundAsBytes)
  130. if clientNumber == 0 {
  131. fmt.Println("Round ", round)
  132. }
  133. //request virtualAddress from leader via pirQuery
  134. encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
  135. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  136. pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
  137. tweet := getRealTweet(clientNumber)
  138. if clientNumber == numClients-1 {
  139. fmt.Println("publisherAmount", publisherAmount)
  140. }
  141. //prep the query
  142. dataSize := len(tweet)
  143. querySize := make([]byte, 4)
  144. cQuerySize := C.int(byteToInt(querySize))
  145. var dpfQueryA *C.uchar
  146. var dpfQueryB *C.uchar
  147. C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
  148. intQuerySize := int(cQuerySize) //byteToInt(querySize)
  149. //write the query
  150. queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize))
  151. //encrypts queryA and appends it to message
  152. var nonce [24]byte
  153. //fill nonce with randomness
  154. _, err = rand.Read(nonce[:])
  155. if err != nil {
  156. panic("couldn't get randomness for nonce!")
  157. }
  158. dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  159. //encrypts queryB and appends it to message
  160. queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize))
  161. //fill nonce with randomness
  162. _, err = rand.Read(nonce[:])
  163. if err != nil {
  164. panic("couldn't get randomness for nonce!")
  165. }
  166. dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  167. //writes the dpfQuery to the leader
  168. dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
  169. writeTo(leaderConn, dpfLengthBytes)
  170. writeTo(leaderConn, dpfQueryAEncrypted)
  171. writeTo(leaderConn, dpfQueryBEncrypted)
  172. C.free(unsafe.Pointer(dpfQueryA))
  173. C.free(unsafe.Pointer(dpfQueryB))
  174. } else if phase[0] == 3 {
  175. /*
  176. possible Values
  177. 0 : new client
  178. leader expects sharedSecrets, expects pirQuery
  179. 1 : update needed
  180. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  181. 2 : no update needed
  182. nothing
  183. */
  184. subPhase := readFrom(leaderConn, 1)
  185. var encryptedQueryLeader, encryptedQueryFollower []byte
  186. //first time participating
  187. if subPhase[0] == 0 {
  188. receiveTopicLists(leaderConn)
  189. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  190. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  191. }
  192. //updates the topic list and what client is interested in
  193. if subPhase[0] == 1 {
  194. receiveTopicLists(leaderConn)
  195. //updates local secret
  196. for index := 0; index < 2; index++ {
  197. sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:])
  198. }
  199. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  200. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  201. }
  202. receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
  203. if len(archiveTopicList) > 0 {
  204. wantsArchive[0] = 0 //archive test
  205. } else {
  206. wantsArchive[0] = 0
  207. }
  208. writeTo(leaderConn, wantsArchive)
  209. if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
  210. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
  211. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
  212. receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber)
  213. }
  214. } else {
  215. fmt.Println("Phase", phase)
  216. panic("somethin went wrong")
  217. }
  218. }
  219. }
  220. //creates and sends the pirQuerys for each server
  221. func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
  222. //later this will be taken from gui, this is only for testing
  223. topicsOFInterest := make([]int, 10)
  224. topicsOFInterest[0] = 0
  225. topicsOFInterest[1] = 1
  226. topicsOFInterest[9] = 1
  227. archiveInterests[0] = 1
  228. //todo! repeat for archive
  229. tmpNeededSubscriptions := neededSubscriptions
  230. if tmpNeededSubscriptions > len(topicList) {
  231. tmpNeededSubscriptions = len(topicList)
  232. }
  233. tmptopicsOfInterest := make([]int, len(topicList))
  234. copy(tmptopicsOfInterest, topicsOFInterest)
  235. tmpTopicList := make([]string, len(topicList))
  236. copy(tmpTopicList, topicList)
  237. if wantsArchive[0] == 1 && subPhase == -1 {
  238. tmpNeededSubscriptions = len(archiveInterests)
  239. if tmpNeededSubscriptions > len(archiveTopicList) {
  240. tmpNeededSubscriptions = len(archiveTopicList)
  241. }
  242. copy(tmptopicsOfInterest, archiveInterests) //archiveInterests from gui
  243. copy(tmpTopicList, archiveTopicList)
  244. }
  245. //creates fake topicsOfInterest if client is boooring
  246. if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
  247. tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
  248. }
  249. //pirQuery [topicsofinterest][serverAmount][topicAmount]byte
  250. pirQuerys := make([][][]byte, len(tmptopicsOfInterest))
  251. for i := range pirQuerys {
  252. pirQuerys[i] = make([][]byte, 2)
  253. for j := range pirQuerys[i] {
  254. pirQuerys[i][j] = make([]byte, len(tmpTopicList))
  255. }
  256. }
  257. //for leader
  258. //pirQuery will be filled with random bits
  259. for topic := range tmptopicsOfInterest {
  260. for index := range tmpTopicList {
  261. bit, err := rand.Int(rand.Reader, big.NewInt(2))
  262. if err != nil {
  263. panic(err)
  264. }
  265. pirQuerys[topic][0][index] = byte(bit.Int64())
  266. }
  267. }
  268. tmptopicsOfInterestBytes := make([]byte, tmpNeededSubscriptions)
  269. for index := range tmptopicsOfInterestBytes {
  270. if tmptopicsOfInterest[index] == 1 {
  271. tmptopicsOfInterestBytes[index] = 1
  272. }
  273. }
  274. for topic := range tmptopicsOfInterest {
  275. for index := range tmpTopicList {
  276. if topic == index {
  277. if pirQuerys[topic][0][index] == 1 {
  278. pirQuerys[topic][1][index] = 0
  279. } else {
  280. pirQuerys[topic][1][index] = 1
  281. }
  282. } else {
  283. if pirQuerys[topic][0][index] == 0 {
  284. pirQuerys[topic][1][index] = 0
  285. } else {
  286. pirQuerys[topic][1][index] = 1
  287. }
  288. }
  289. }
  290. }
  291. //flattens the querys to be able to send them more efficently
  292. messagesFlattened := make([][]byte, 2)
  293. //adds the sharedSecret to the first pirQuery when first time participating
  294. if subPhase == 0 {
  295. for server := 0; server < 2; server++ {
  296. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  297. }
  298. }
  299. for server := range messagesFlattened {
  300. for topic := range pirQuerys {
  301. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...)
  302. }
  303. }
  304. var nonce [24]byte
  305. _, err := rand.Read(nonce[:])
  306. if err != nil {
  307. panic("couldn't get randomness for nonce!")
  308. }
  309. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  310. _, err = rand.Read(nonce[:])
  311. if err != nil {
  312. panic("couldn't get randomness for nonce!")
  313. }
  314. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  315. return encryptedQueryLeader, encryptedQueryFollower
  316. }
  317. func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) {
  318. encryptedLength := len(encryptedQueryLeader)
  319. //sends the pirQuerysLength to the leader
  320. writeTo(leaderConn, intToByte(encryptedLength))
  321. //sends the pirQuerys to the leader
  322. writeTo(leaderConn, encryptedQueryLeader)
  323. writeTo(leaderConn, encryptedQueryFollower)
  324. if getArchive {
  325. writeTo(leaderConn, intToByte(len(archiveInterests)))
  326. }
  327. }
  328. func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
  329. virtualAddressByte := readFrom(leaderConn, 4)
  330. //xores the sharedSecret
  331. for h := 0; h < 2; h++ {
  332. for i := 0; i < 4; i++ {
  333. virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i]
  334. }
  335. }
  336. return byteToInt(virtualAddressByte)
  337. }
  338. func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) {
  339. tmpNeededSubscriptions := neededSubscriptions
  340. if tmpNeededSubscriptions > len(topicList) {
  341. tmpNeededSubscriptions = len(topicList)
  342. }
  343. if getArchive {
  344. tmpNeededSubscriptions = len(archiveInterests)
  345. if tmpNeededSubscriptions > len(archiveTopicList) {
  346. tmpNeededSubscriptions = len(archiveTopicList)
  347. }
  348. }
  349. for i := 0; i < tmpNeededSubscriptions; i++ {
  350. //client receives tweets
  351. tweetsLengthBytes := readFrom(leaderConn, 4)
  352. tweetsLength := byteToInt(tweetsLengthBytes)
  353. tweets := readFrom(leaderConn, tweetsLength)
  354. //fmt.Println(tweets[:10])
  355. //expand sharedSecret so it is of right length
  356. expandBy := len(tweets) / 32
  357. expandedSharedSecrets := make([][]byte, 2)
  358. for i := 0; i < 2; i++ {
  359. for j := 0; j < expandBy; j++ {
  360. expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
  361. }
  362. }
  363. //xors the received messge into the message to display
  364. for i := 0; i < 2; i++ {
  365. lib.Xor(expandedSharedSecrets[i][:], tweets)
  366. }
  367. //fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
  368. index := strings.Index(string(tweets), ";;")
  369. if index != -1 && clientNumber == 0 {
  370. /*
  371. fmt.Println("Correct")
  372. textArr := strings.Split(string(tweets), ";;;;;;;;")
  373. text := string(tweets)[:len(textArr)-1]
  374. fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets))
  375. */
  376. } else if index == -1 {
  377. fmt.Println("error")
  378. fmt.Println("round", round, "text:", string(tweets), "length", len(tweets))
  379. return
  380. //panic("received text not of correct format")
  381. }
  382. }
  383. }
  384. //creates a shared secret for each server
  385. func createSharedSecret() [numClients][2][32]byte {
  386. var tmpSharedSecret [numClients][2][32]byte
  387. for i := 0; i < numClients; i++ {
  388. for j := 0; j < 2; j++ {
  389. _, err := rand.Read(tmpSharedSecret[i][j][:])
  390. if err != nil {
  391. panic("couldn't get randomness for sharedSecret!")
  392. }
  393. }
  394. }
  395. return tmpSharedSecret
  396. }
  397. func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
  398. //pirQuery [serverAmount][dbWriteSize]byte
  399. pirQuerys := make([][]byte, 2)
  400. for i := range pirQuerys {
  401. pirQuerys[i] = make([]byte, dbWriteSize)
  402. }
  403. //for leader
  404. //pirQuery will be filled with random bits
  405. for index := range pirQuerys[0] {
  406. bit := mr.Intn(2)
  407. pirQuerys[0][index] = byte(bit)
  408. }
  409. copy(pirQuerys[1], pirQuerys[0])
  410. //the positon the virtual address will be taken from
  411. pos := mr.Intn(dbWriteSize)
  412. pirQuerys[0][pos] = 1
  413. pirQuerys[1][pos] = 0
  414. //flattens the querys to be able to send them more efficently
  415. messagesFlattened := make([][]byte, 2)
  416. //adds the sharedSecret to the pirQuery
  417. for server := 0; server < 2; server++ {
  418. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  419. }
  420. for server := 0; server < 2; server++ {
  421. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...)
  422. }
  423. var nonce [24]byte
  424. _, err := rand.Read(nonce[:])
  425. if err != nil {
  426. panic("couldn't get randomness for nonce!")
  427. }
  428. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  429. _, err = rand.Read(nonce[:])
  430. if err != nil {
  431. panic("couldn't get randomness for nonce!")
  432. }
  433. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  434. return encryptedQueryLeader, encryptedQueryFollower
  435. }
  436. //generates a topicOfInterest array with random values
  437. func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
  438. tmpNeededSubscriptions := neededSubscriptions
  439. if tmpNeededSubscriptions > len(topicList) {
  440. tmpNeededSubscriptions = len(topicList)
  441. }
  442. fakeTopicsOfInterest := make([]int, tmpNeededSubscriptions)
  443. maxInt := max
  444. //fills the array with unique random ascending values ranging from 0 to max
  445. for i := 0; i < tmpNeededSubscriptions; i++ {
  446. fakeTopicsOfInterest[i] = mr.Intn(maxInt)
  447. for j := 0; j < i; j++ {
  448. if fakeTopicsOfInterest[i] == fakeTopicsOfInterest[j] {
  449. i--
  450. break
  451. }
  452. }
  453. }
  454. if doAuditing {
  455. sort.Ints(fakeTopicsOfInterest)
  456. return fakeTopicsOfInterest
  457. }
  458. //adds unique and new random numbers to topicOfInterests until length is satisfied
  459. for _, number := range fakeTopicsOfInterest {
  460. if !inList(number, topicsOfInterest) {
  461. topicsOfInterest = append(topicsOfInterest, number)
  462. }
  463. if len(topicsOfInterest) == tmpNeededSubscriptions {
  464. break
  465. }
  466. }
  467. sort.Ints(topicsOfInterest)
  468. return topicsOfInterest
  469. }
  470. func inList(number int, list []int) bool {
  471. for _, element := range list {
  472. if element == number {
  473. return true
  474. }
  475. }
  476. return false
  477. }
  478. func receiveTopicLists(leaderConn net.Conn) {
  479. for i := 0; i < 2; i++ {
  480. topicListLength := readFrom(leaderConn, 4)
  481. recTopicList := readFrom(leaderConn, byteToInt(topicListLength))
  482. var tmpTopicList []string
  483. arrayReader := bytes.NewReader(recTopicList[:])
  484. json.NewDecoder(arrayReader).Decode(&tmpTopicList)
  485. if i == 0 {
  486. topicList = tmpTopicList
  487. } else {
  488. archiveTopicList = tmpTopicList
  489. }
  490. }
  491. }
  492. func getRealTweet(clientNumber int) []byte {
  493. fUserList, err := os.Open("/home/simon/goCode/tweets/userList")
  494. if err != nil {
  495. panic(err)
  496. }
  497. defer fUserList.Close()
  498. currentLine := 0
  499. scanner := bufio.NewScanner(fUserList)
  500. userID := ""
  501. for scanner.Scan() {
  502. if currentLine == clientNumber {
  503. userID = scanner.Text()
  504. break
  505. }
  506. currentLine++
  507. }
  508. if userID == "" {
  509. panic("no userID picked")
  510. }
  511. fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID)
  512. if err != nil {
  513. panic(err)
  514. }
  515. defer fTweets.Close()
  516. scanner = bufio.NewScanner(fTweets)
  517. lowerBound := timeBounds[round-1]
  518. upperBound := timeBounds[round]
  519. var tweet []byte
  520. for scanner.Scan() {
  521. lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
  522. lineArr = strings.Split(lineArr[0], ": ")
  523. lineArr = strings.Split(lineArr[1], " \"")
  524. timestamp, _ := strconv.Atoi(lineArr[0])
  525. //transforms timestamp to current time
  526. timestamp -= 1351742400
  527. timestamp += startTime
  528. if timestamp > lowerBound && timestamp < upperBound {
  529. lineArr = strings.Split(scanner.Text(), "[\"")
  530. line := lineArr[1]
  531. lineArr = strings.Split(line, "\"]")
  532. line = lineArr[0]
  533. lineArr = strings.Split(line, ",")
  534. line = strings.Join(lineArr, "")
  535. topicLine := strings.Split(line, "\"")
  536. var topics []byte
  537. for index, topic := range topicLine {
  538. if index%2 == 1 {
  539. continue
  540. }
  541. if len(topics)+len(topic) > dataLength-10 {
  542. break
  543. }
  544. topics = append(topics, []byte(topic)[:]...)
  545. topics = append(topics, []byte(",")[0])
  546. }
  547. topics = topics[:len(topics)-1]
  548. //fmt.Println(string(topics))
  549. tweet = append(tweet, topics...)
  550. tweet = append(tweet, []byte(";")[0])
  551. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  552. num := r.Intn(10000)
  553. if num == 0 {
  554. num = 1
  555. }
  556. tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
  557. //fmt.Println("tweet", string(tweet))
  558. //adds padding
  559. length := dataLength - len(tweet)
  560. padding := make([]byte, length)
  561. rand.Read(padding)
  562. tweet = append(tweet, padding...)
  563. publisherAmount++
  564. return tweet
  565. }
  566. }
  567. tweet = make([]byte, dataLength)
  568. return tweet
  569. }
  570. func getTimeBounds() {
  571. timeBounds = make([]int, 10000)
  572. timeBounds[0] = int(time.Now().Unix())
  573. for index := range timeBounds {
  574. if index == 0 {
  575. continue
  576. }
  577. timeBounds[index] = timeBounds[index-1] + speedUp*(int(3*maxTimePerRound.Seconds())+2)
  578. }
  579. }
  580. func getRandomTweet(clientNumber int) []byte {
  581. var tweet []byte
  582. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  583. maxTopics := r.Intn(6)
  584. if maxTopics == 0 {
  585. maxTopics = 1
  586. }
  587. maxInt := 10000
  588. topicNumbers := make([]int, maxTopics)
  589. //fills the array with unique random ascending values ranging from 0 to maxInt
  590. for i := 0; i < maxTopics; i++ {
  591. topicNumbers[i] = mr.Intn(maxInt)
  592. for j := 0; j < i; j++ {
  593. if topicNumbers[i] == topicNumbers[j] {
  594. i--
  595. break
  596. }
  597. }
  598. }
  599. sort.Ints(topicNumbers)
  600. //fmt.Println("topicNumbers", topicNumbers)
  601. var topics []byte
  602. topicIndex := 0
  603. for i := 0; i < len(topicNumbers)*2; i++ {
  604. if i%2 == 0 {
  605. topics = append(topics, byte(topicNumbers[topicIndex]))
  606. topicIndex++
  607. } else if i != (len(topicNumbers)*2)-1 {
  608. topics = append(topics, []byte(",")[0])
  609. }
  610. }
  611. topics = append(topics, []byte(";")[0])
  612. num := r.Intn(100)
  613. if num == 0 {
  614. num = 1
  615. }
  616. text := []byte(strconv.Itoa(num) + ";")
  617. tweet = append(tweet, topics...)
  618. tweet = append(tweet, text...)
  619. tweet = append(tweet, []byte(";")[0])
  620. //fmt.Println("writing", string(text))
  621. //fmt.Println(topicNumbers)
  622. if len(tweet) > 32 {
  623. fmt.Println("lenlen", len(tweet))
  624. }
  625. //adds padding
  626. length := dataLength - len(tweet)
  627. padding := make([]byte, length)
  628. rand.Read(padding)
  629. tweet = append(tweet, padding...)
  630. return tweet
  631. }
  632. //sends the array to the connection
  633. func writeTo(connection net.Conn, array []byte) {
  634. remainingLength := len(array)
  635. for remainingLength > 0 {
  636. if remainingLength >= mtu {
  637. _, err := connection.Write(array[:mtu])
  638. if err != nil {
  639. panic(err)
  640. }
  641. array = array[mtu:]
  642. remainingLength -= mtu
  643. } else {
  644. _, err := connection.Write(array)
  645. if err != nil {
  646. panic(err)
  647. }
  648. remainingLength = 0
  649. }
  650. }
  651. }
  652. //reads an array which is returned and of size "size" from the connection
  653. func readFrom(connection net.Conn, size int) []byte {
  654. var array []byte
  655. remainingSize := size
  656. for remainingSize > 0 {
  657. var err error
  658. toAppend := make([]byte, mtu)
  659. if remainingSize > mtu {
  660. _, err = connection.Read(toAppend)
  661. array = append(array, toAppend...)
  662. remainingSize -= mtu
  663. } else {
  664. _, err = connection.Read(toAppend[:remainingSize])
  665. array = append(array, toAppend[:remainingSize]...)
  666. remainingSize = 0
  667. }
  668. if err != nil {
  669. panic(err)
  670. }
  671. }
  672. return array
  673. }
  674. func intToByte(myInt int) (retBytes []byte) {
  675. retBytes = make([]byte, 4)
  676. retBytes[3] = byte((myInt >> 24) & 0xff)
  677. retBytes[2] = byte((myInt >> 16) & 0xff)
  678. retBytes[1] = byte((myInt >> 8) & 0xff)
  679. retBytes[0] = byte(myInt & 0xff)
  680. return
  681. }
  682. func byteToInt(myBytes []byte) (x int) {
  683. x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0])
  684. return
  685. }