client.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803
  1. package main
  2. /*
  3. #cgo CFLAGS: -O2
  4. #cgo LDFLAGS: -lcrypto -lm
  5. #include "../c/dpf.h"
  6. #include "../c/okvClient.h"
  7. #include "../c/dpf.c"
  8. #include "../c/okvClient.c"
  9. */
  10. import "C"
  11. //sssssssssssss
  12. import (
  13. "2PPS/lib"
  14. "bufio"
  15. "bytes"
  16. "crypto/rand"
  17. "crypto/sha256"
  18. "crypto/tls"
  19. "encoding/json"
  20. "fmt"
  21. "math/big"
  22. mr "math/rand"
  23. "net"
  24. "os"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "golang.org/x/crypto/nacl/box"
  32. )
  33. type tweet struct {
  34. Topics []string
  35. Text string
  36. }
  37. const leader string = "127.0.0.1:4441"
  38. //needs to be changed at leader/follower/client at the same time
  39. const numClients = 4000
  40. const dataLength int = 256
  41. const numThreads int = 12
  42. //Maximum Transport Unit
  43. const mtu int = 1100
  44. var dbWriteSize int
  45. var round int
  46. var topicList []string
  47. var archiveTopicList []string
  48. var neededSubscriptions int
  49. var maxTimePerRound time.Duration = 100 * time.Second
  50. var startTime int
  51. //todo! expand this for multiple clients
  52. var archiveInterests = make([]int, 1)
  53. var sharedSecret [numClients][2][32]byte = createSharedSecret()
  54. var wantsArchive = make([]byte, 1)
  55. var leaderPublicKey *[32]byte
  56. var followerPublicKey *[32]byte
  57. var clientPrivateKey [numClients]*[32]byte
  58. var clientPublicKey [numClients]*[32]byte
  59. func main() {
  60. wg := &sync.WaitGroup{}
  61. for i := 0; i < numClients; i++ {
  62. wg.Add(1)
  63. go client(i)
  64. time.Sleep(10 * time.Millisecond)
  65. }
  66. wg.Wait()
  67. }
  68. func client(clientNumber int) {
  69. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  70. if err != nil {
  71. panic(err)
  72. }
  73. clientPrivateKey[clientNumber] = generatedPrivateKey
  74. clientPublicKey[clientNumber] = generatedPublicKey
  75. C.initializeCipher()
  76. //initializes the connection to the leader
  77. conf := &tls.Config{
  78. InsecureSkipVerify: true,
  79. }
  80. leaderConn, err := tls.Dial("tcp", leader, conf)
  81. if err != nil {
  82. panic(err)
  83. }
  84. leaderConn.SetDeadline(time.Time{})
  85. //receives topics first so client can participate asap
  86. receiveTopicLists(leaderConn)
  87. //gets the public keys of both servers
  88. var tmpLeaderPubKey [32]byte
  89. _, err = leaderConn.Read(tmpLeaderPubKey[:])
  90. if err != nil {
  91. panic(err)
  92. }
  93. leaderPublicKey = &tmpLeaderPubKey
  94. var tmpFollowerPubKey [32]byte
  95. _, err = leaderConn.Read(tmpFollowerPubKey[:])
  96. if err != nil {
  97. panic(err)
  98. }
  99. followerPublicKey = &tmpFollowerPubKey
  100. //sends own public key
  101. writeTo(leaderConn, clientPublicKey[clientNumber][:])
  102. neededSubscriptionsBytes := readFrom(leaderConn, 4)
  103. neededSubscriptions = byteToInt(neededSubscriptionsBytes)
  104. startTimeBytes := readFrom(leaderConn, 4)
  105. startTime = byteToInt(startTimeBytes)
  106. //setup ends above
  107. //while client is active he is always connected and has to participate
  108. for {
  109. //gets current phase
  110. phase := readFrom(leaderConn, 1)
  111. if phase[0] == 1 {
  112. //gets current dbWriteSize from leader
  113. dbWriteSizeBytes := readFrom(leaderConn, 4)
  114. dbWriteSize = byteToInt(dbWriteSizeBytes)
  115. //todo! put into tweet creation
  116. //roundAsBytes := readFrom(leaderConn, 4)
  117. roundAsBytes := make([]byte, 4)
  118. _, err = leaderConn.Read(roundAsBytes)
  119. if err != nil {
  120. panic(err)
  121. }
  122. round = byteToInt(roundAsBytes)
  123. if clientNumber == 0 {
  124. fmt.Println("Round ", round)
  125. }
  126. //request virtualAddress from leader via pirQuery
  127. encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
  128. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  129. pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
  130. tweet := getRandomTweet(clientNumber)
  131. //prep the query
  132. dataSize := len(tweet)
  133. querySize := make([]byte, 4)
  134. cQuerySize := C.int(byteToInt(querySize))
  135. var dpfQueryA *C.uchar
  136. var dpfQueryB *C.uchar
  137. C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
  138. intQuerySize := int(cQuerySize) //byteToInt(querySize)
  139. //write the query
  140. queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize))
  141. //encrypts queryA and appends it to message
  142. var nonce [24]byte
  143. //fill nonce with randomness
  144. _, err = rand.Read(nonce[:])
  145. if err != nil {
  146. panic("couldn't get randomness for nonce!")
  147. }
  148. dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  149. //encrypts queryB and appends it to message
  150. queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize))
  151. //fill nonce with randomness
  152. _, err = rand.Read(nonce[:])
  153. if err != nil {
  154. panic("couldn't get randomness for nonce!")
  155. }
  156. dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  157. //writes the dpfQuery to the leader
  158. dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
  159. writeTo(leaderConn, dpfLengthBytes)
  160. writeTo(leaderConn, dpfQueryAEncrypted)
  161. writeTo(leaderConn, dpfQueryBEncrypted)
  162. C.free(unsafe.Pointer(dpfQueryA))
  163. C.free(unsafe.Pointer(dpfQueryB))
  164. } else if phase[0] == 3 {
  165. /*
  166. possible Values
  167. 0 : new client
  168. leader expects sharedSecrets, expects pirQuery
  169. 1 : update needed
  170. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  171. 2 : no update needed
  172. nothing
  173. */
  174. subPhase := readFrom(leaderConn, 1)
  175. var encryptedQueryLeader, encryptedQueryFollower []byte
  176. //first time participating
  177. if subPhase[0] == 0 {
  178. receiveTopicLists(leaderConn)
  179. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  180. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  181. }
  182. //updates the topic list and what client is interested in
  183. if subPhase[0] == 1 {
  184. receiveTopicLists(leaderConn)
  185. //updates local secret
  186. for index := 0; index < 2; index++ {
  187. sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:])
  188. }
  189. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  190. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  191. }
  192. receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
  193. if len(archiveTopicList) > 0 {
  194. wantsArchive[0] = 0 //archive test
  195. } else {
  196. wantsArchive[0] = 0
  197. }
  198. writeTo(leaderConn, wantsArchive)
  199. if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
  200. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
  201. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
  202. receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber)
  203. }
  204. } else {
  205. fmt.Println("Phase", phase)
  206. panic("somethin went wrong")
  207. }
  208. }
  209. }
  210. //creates and sends the pirQuerys for each server
  211. func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
  212. //later this will be taken from gui, this is only for testing
  213. topicsOFInterest := make([]int, 10)
  214. topicsOFInterest[0] = 0
  215. topicsOFInterest[1] = 1
  216. topicsOFInterest[9] = 1
  217. archiveInterests[0] = 1
  218. //todo! repeat for archive
  219. tmpNeededSubscriptions := neededSubscriptions
  220. if tmpNeededSubscriptions > len(topicList) {
  221. tmpNeededSubscriptions = len(topicList)
  222. }
  223. tmptopicsOfInterest := make([]int, len(topicList))
  224. copy(tmptopicsOfInterest, topicsOFInterest)
  225. tmpTopicList := make([]string, len(topicList))
  226. copy(tmpTopicList, topicList)
  227. if wantsArchive[0] == 1 && subPhase == -1 {
  228. tmpNeededSubscriptions = len(archiveInterests)
  229. if tmpNeededSubscriptions > len(archiveTopicList) {
  230. tmpNeededSubscriptions = len(archiveTopicList)
  231. }
  232. copy(tmptopicsOfInterest, archiveInterests) //archiveInterests from gui
  233. copy(tmpTopicList, archiveTopicList)
  234. }
  235. //creates fake topicsOfInterest if client is boooring
  236. if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
  237. tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
  238. }
  239. //pirQuery [topicsofinterest][serverAmount][topicAmount]byte
  240. pirQuerys := make([][][]byte, len(tmptopicsOfInterest))
  241. for i := range pirQuerys {
  242. pirQuerys[i] = make([][]byte, 2)
  243. for j := range pirQuerys[i] {
  244. pirQuerys[i][j] = make([]byte, len(tmpTopicList))
  245. }
  246. }
  247. //for leader
  248. //pirQuery will be filled with random bits
  249. for topic := range tmptopicsOfInterest {
  250. for index := range tmpTopicList {
  251. bit, err := rand.Int(rand.Reader, big.NewInt(2))
  252. if err != nil {
  253. panic(err)
  254. }
  255. pirQuerys[topic][0][index] = byte(bit.Int64())
  256. }
  257. }
  258. tmptopicsOfInterestBytes := make([]byte, tmpNeededSubscriptions)
  259. for index := range tmptopicsOfInterestBytes {
  260. if tmptopicsOfInterest[index] == 1 {
  261. tmptopicsOfInterestBytes[index] = 1
  262. }
  263. }
  264. for topic := range tmptopicsOfInterest {
  265. for index := range tmpTopicList {
  266. if topic == index {
  267. if pirQuerys[topic][0][index] == 1 {
  268. pirQuerys[topic][1][index] = 0
  269. } else {
  270. pirQuerys[topic][1][index] = 1
  271. }
  272. } else {
  273. if pirQuerys[topic][0][index] == 0 {
  274. pirQuerys[topic][1][index] = 0
  275. } else {
  276. pirQuerys[topic][1][index] = 1
  277. }
  278. }
  279. }
  280. }
  281. //flattens the querys to be able to send them more efficently
  282. messagesFlattened := make([][]byte, 2)
  283. //adds the sharedSecret to the first pirQuery when first time participating
  284. if subPhase == 0 {
  285. for server := 0; server < 2; server++ {
  286. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  287. }
  288. }
  289. for server := range messagesFlattened {
  290. for topic := range pirQuerys {
  291. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...)
  292. }
  293. }
  294. var nonce [24]byte
  295. _, err := rand.Read(nonce[:])
  296. if err != nil {
  297. panic("couldn't get randomness for nonce!")
  298. }
  299. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  300. _, err = rand.Read(nonce[:])
  301. if err != nil {
  302. panic("couldn't get randomness for nonce!")
  303. }
  304. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  305. return encryptedQueryLeader, encryptedQueryFollower
  306. }
  307. func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) {
  308. encryptedLength := len(encryptedQueryLeader)
  309. //sends the pirQuerysLength to the leader
  310. writeTo(leaderConn, intToByte(encryptedLength))
  311. //sends the pirQuerys to the leader
  312. writeTo(leaderConn, encryptedQueryLeader)
  313. writeTo(leaderConn, encryptedQueryFollower)
  314. if getArchive {
  315. writeTo(leaderConn, intToByte(len(archiveInterests)))
  316. }
  317. }
  318. func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
  319. virtualAddressByte := readFrom(leaderConn, 4)
  320. //xores the sharedSecret
  321. for h := 0; h < 2; h++ {
  322. for i := 0; i < 4; i++ {
  323. virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i]
  324. }
  325. }
  326. return byteToInt(virtualAddressByte)
  327. }
  328. func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) {
  329. tmpNeededSubscriptions := neededSubscriptions
  330. if tmpNeededSubscriptions > len(topicList) {
  331. tmpNeededSubscriptions = len(topicList)
  332. }
  333. if getArchive {
  334. tmpNeededSubscriptions = len(archiveInterests)
  335. if tmpNeededSubscriptions > len(archiveTopicList) {
  336. tmpNeededSubscriptions = len(archiveTopicList)
  337. }
  338. }
  339. for i := 0; i < tmpNeededSubscriptions; i++ {
  340. //client receives tweets
  341. tweetsLengthBytes := readFrom(leaderConn, 4)
  342. tweetsLength := byteToInt(tweetsLengthBytes)
  343. tweets := readFrom(leaderConn, tweetsLength)
  344. //fmt.Println(tweets[:10])
  345. //expand sharedSecret so it is of right length
  346. expandBy := len(tweets) / 32
  347. expandedSharedSecrets := make([][]byte, 2)
  348. for i := 0; i < 2; i++ {
  349. for j := 0; j < expandBy; j++ {
  350. expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
  351. }
  352. }
  353. //xors the received messge into the message to display
  354. for i := 0; i < 2; i++ {
  355. lib.Xor(expandedSharedSecrets[i][:], tweets)
  356. }
  357. //fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
  358. index := strings.Index(string(tweets), ";;;")
  359. if index != -1 {
  360. //fmt.Println("Round", round, "Tweets length", len(tweets))
  361. /*
  362. fmt.Println("Correct")
  363. fmt.Println("PubKey", clientPublicKey[clientNumber])
  364. text := string(tweets)[:index]
  365. fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets))
  366. */
  367. } else {
  368. fmt.Println("error")
  369. fmt.Println("pubKey", clientPublicKey[clientNumber])
  370. fmt.Println("round", round, "text:", string(tweets[:5]), "length", len(tweets))
  371. return
  372. //panic("received text not of correct format")
  373. }
  374. }
  375. }
  376. //creates a shared secret for each server
  377. func createSharedSecret() [numClients][2][32]byte {
  378. var tmpSharedSecret [numClients][2][32]byte
  379. for i := 0; i < numClients; i++ {
  380. for j := 0; j < 2; j++ {
  381. _, err := rand.Read(tmpSharedSecret[i][j][:])
  382. if err != nil {
  383. panic("couldn't get randomness for sharedSecret!")
  384. }
  385. }
  386. }
  387. return tmpSharedSecret
  388. }
  389. func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
  390. //pirQuery [serverAmount][dbWriteSize]byte
  391. pirQuerys := make([][]byte, 2)
  392. for i := range pirQuerys {
  393. pirQuerys[i] = make([]byte, dbWriteSize)
  394. }
  395. //for leader
  396. //pirQuery will be filled with random bits
  397. for index := range pirQuerys[0] {
  398. bit := mr.Intn(2)
  399. pirQuerys[0][index] = byte(bit)
  400. }
  401. copy(pirQuerys[1], pirQuerys[0])
  402. //the positon the virtual address will be taken from
  403. pos := mr.Intn(dbWriteSize)
  404. pirQuerys[0][pos] = 1
  405. pirQuerys[1][pos] = 0
  406. //flattens the querys to be able to send them more efficently
  407. messagesFlattened := make([][]byte, 2)
  408. //adds the sharedSecret to the pirQuery
  409. for server := 0; server < 2; server++ {
  410. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  411. }
  412. for server := 0; server < 2; server++ {
  413. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...)
  414. }
  415. var nonce [24]byte
  416. _, err := rand.Read(nonce[:])
  417. if err != nil {
  418. panic("couldn't get randomness for nonce!")
  419. }
  420. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  421. _, err = rand.Read(nonce[:])
  422. if err != nil {
  423. panic("couldn't get randomness for nonce!")
  424. }
  425. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  426. return encryptedQueryLeader, encryptedQueryFollower
  427. }
  428. //generates a topicOfInterest array with random values
  429. func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
  430. tmpNeededSubscriptions := neededSubscriptions
  431. if tmpNeededSubscriptions > len(topicList) {
  432. tmpNeededSubscriptions = len(topicList)
  433. }
  434. fakeTopicsOfInterest := make([]int, tmpNeededSubscriptions)
  435. maxInt := max
  436. //fills the array with unique random ascending values ranging from 0 to max
  437. for i := 0; i < tmpNeededSubscriptions; i++ {
  438. fakeTopicsOfInterest[i] = mr.Intn(maxInt)
  439. for j := 0; j < i; j++ {
  440. if fakeTopicsOfInterest[i] == fakeTopicsOfInterest[j] {
  441. i--
  442. break
  443. }
  444. }
  445. }
  446. if doAuditing {
  447. sort.Ints(fakeTopicsOfInterest)
  448. return fakeTopicsOfInterest
  449. }
  450. //adds unique and new random numbers to topicOfInterests until length is satisfied
  451. for _, number := range fakeTopicsOfInterest {
  452. if !inList(number, topicsOfInterest) {
  453. topicsOfInterest = append(topicsOfInterest, number)
  454. }
  455. if len(topicsOfInterest) == tmpNeededSubscriptions {
  456. break
  457. }
  458. }
  459. sort.Ints(topicsOfInterest)
  460. return topicsOfInterest
  461. }
  462. func inList(number int, list []int) bool {
  463. for _, element := range list {
  464. if element == number {
  465. return true
  466. }
  467. }
  468. return false
  469. }
  470. func receiveTopicLists(leaderConn net.Conn) {
  471. for i := 0; i < 2; i++ {
  472. topicListLength := readFrom(leaderConn, 4)
  473. recTopicList := readFrom(leaderConn, byteToInt(topicListLength))
  474. var tmpTopicList []string
  475. arrayReader := bytes.NewReader(recTopicList[:])
  476. json.NewDecoder(arrayReader).Decode(&tmpTopicList)
  477. if i == 0 {
  478. topicList = tmpTopicList
  479. } else {
  480. archiveTopicList = tmpTopicList
  481. }
  482. }
  483. }
  484. func getRealTweet(clientNumber int) []byte {
  485. fUserList, err := os.Create("tweets/userList")
  486. if err != nil {
  487. panic(err)
  488. }
  489. defer fUserList.Close()
  490. currentLine := 0
  491. scanner := bufio.NewScanner(fUserList)
  492. userID := ""
  493. for scanner.Scan() {
  494. if currentLine == clientNumber {
  495. userID = scanner.Text()
  496. break
  497. }
  498. currentLine++
  499. }
  500. if userID == "" {
  501. panic("no userID picked")
  502. }
  503. fTweets, err := os.Open("tweets/userTweets/" + userID)
  504. if err != nil {
  505. panic(err)
  506. }
  507. defer fTweets.Close()
  508. scanner = bufio.NewScanner(fTweets)
  509. lowerBound := time.Now().Second()
  510. upperBound := lowerBound + int(3*maxTimePerRound.Seconds()) + 10
  511. var tweet []byte
  512. for scanner.Scan() {
  513. lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
  514. lineArr = strings.Split(lineArr[0], ": ")
  515. timestamp, _ := strconv.Atoi(lineArr[len(lineArr)])
  516. //transforms timestamp to current time
  517. timestamp -= 1351742400
  518. timestamp += startTime
  519. if timestamp > lowerBound && timestamp < upperBound {
  520. lineArr = strings.Split(scanner.Text(), "[\"")
  521. line := lineArr[1]
  522. lineArr = strings.Split(line, "\"]")
  523. line = lineArr[0]
  524. lineArr = strings.Split(line, ",")
  525. var topics []byte
  526. for _, topic := range lineArr {
  527. topicLine := strings.Split(topic, "\"")
  528. topics = append(topics, []byte(topicLine[1])[:]...)
  529. topics = append(topics, []byte(",")[0])
  530. }
  531. topics = topics[:len(topics)-1]
  532. tweet = append(tweet, topics...)
  533. tweet = append(tweet, []byte(";")[0])
  534. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  535. num := r.Intn(10000)
  536. if num == 0 {
  537. num = 1
  538. }
  539. tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
  540. }
  541. }
  542. //adds padding
  543. length := dataLength - len(tweet)
  544. padding := make([]byte, length)
  545. rand.Read(padding)
  546. tweet = append(tweet, padding...)
  547. return tweet
  548. }
  549. func getRandomTweet(clientNumber int) []byte {
  550. var tweet []byte
  551. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  552. maxTopics := r.Intn(6)
  553. if maxTopics == 0 {
  554. maxTopics = 1
  555. }
  556. maxInt := numClients * 5
  557. topicNumbers := make([]int, maxTopics)
  558. //fills the array with unique random ascending values ranging from 0 to maxInt
  559. for i := 0; i < maxTopics; i++ {
  560. topicNumbers[i] = mr.Intn(maxInt)
  561. for j := 0; j < i; j++ {
  562. if topicNumbers[i] == topicNumbers[j] {
  563. i--
  564. break
  565. }
  566. }
  567. }
  568. sort.Ints(topicNumbers)
  569. //fmt.Println("topicNumbers", topicNumbers)
  570. var topics []byte
  571. topicIndex := 0
  572. for i := 0; i < len(topicNumbers)*2; i++ {
  573. if i%2 == 0 {
  574. topics = append(topics, byte(topicNumbers[topicIndex]))
  575. topicIndex++
  576. } else if i != (len(topicNumbers)*2)-1 {
  577. topics = append(topics, []byte(",")[0])
  578. }
  579. }
  580. topics = append(topics, []byte(";")[0])
  581. num := r.Intn(100)
  582. if num == 0 {
  583. num = 1
  584. }
  585. text := []byte(strconv.Itoa(num) + ";")
  586. tweet = append(tweet, topics...)
  587. tweet = append(tweet, text...)
  588. tweet = append(tweet, []byte(";")[0])
  589. //fmt.Println("writing", string(text))
  590. //fmt.Println(topicNumbers)
  591. if len(tweet) > 32 {
  592. fmt.Println("lenlen", len(tweet))
  593. }
  594. //adds padding
  595. length := dataLength - len(tweet)
  596. padding := make([]byte, length)
  597. rand.Read(padding)
  598. tweet = append(tweet, padding...)
  599. return tweet
  600. }
  601. //sends the array to the connection
  602. func writeTo(connection net.Conn, array []byte) {
  603. remainingLength := len(array)
  604. for remainingLength > 0 {
  605. if remainingLength >= mtu {
  606. _, err := connection.Write(array[:mtu])
  607. if err != nil {
  608. panic(err)
  609. }
  610. array = array[mtu:]
  611. remainingLength -= mtu
  612. } else {
  613. _, err := connection.Write(array)
  614. if err != nil {
  615. panic(err)
  616. }
  617. remainingLength = 0
  618. }
  619. }
  620. }
  621. //reads an array which is returned and of size "size" from the connection
  622. func readFrom(connection net.Conn, size int) []byte {
  623. var array []byte
  624. remainingSize := size
  625. for remainingSize > 0 {
  626. var err error
  627. toAppend := make([]byte, mtu)
  628. if remainingSize > mtu {
  629. _, err = connection.Read(toAppend)
  630. array = append(array, toAppend...)
  631. remainingSize -= mtu
  632. } else {
  633. _, err = connection.Read(toAppend[:remainingSize])
  634. array = append(array, toAppend[:remainingSize]...)
  635. remainingSize = 0
  636. }
  637. if err != nil {
  638. panic(err)
  639. }
  640. }
  641. return array
  642. }
  643. func intToByte(myInt int) (retBytes []byte) {
  644. retBytes = make([]byte, 4)
  645. retBytes[3] = byte((myInt >> 24) & 0xff)
  646. retBytes[2] = byte((myInt >> 16) & 0xff)
  647. retBytes[1] = byte((myInt >> 8) & 0xff)
  648. retBytes[0] = byte(myInt & 0xff)
  649. return
  650. }
  651. func byteToInt(myBytes []byte) (x int) {
  652. x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0])
  653. return
  654. }