client.go 21 KB


  1. package main
  2. /*
  3. #cgo CFLAGS: -O2
  4. #cgo LDFLAGS: -lcrypto -lm
  5. #include "../c/dpf.h"
  6. #include "../c/okvClient.h"
  7. #include "../c/dpf.c"
  8. #include "../c/okvClient.c"
  9. */
  10. import "C"
  11. //sssssssssssss
  12. import (
  13. "2PPS/lib"
  14. "bufio"
  15. "bytes"
  16. "crypto/rand"
  17. "crypto/sha256"
  18. "crypto/tls"
  19. "encoding/json"
  20. "fmt"
  21. "math/big"
  22. mr "math/rand"
  23. "net"
  24. "os"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "golang.org/x/crypto/nacl/box"
  32. )
  // tweet pairs a list of topics with a text body.
  type tweet struct {
  	// Topics holds the topic names attached to the tweet.
  	Topics []string
  	// Text holds the tweet's text.
  	Text string
  }
  37. const leader string = "127.0.0.1:4441"
  38. //needs to be changed at leader/follower/client at the same time
  39. const numClients = 10000
  40. //mylimit=8000
  41. //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
  42. const dataLength int = 64
  43. const numThreads int = 12
  44. //Maximum Transport Unit
  45. const mtu int = 1100
  46. var dbWriteSize int
  47. var round int
  48. var topicList []string
  49. var archiveTopicList []string
  50. var neededSubscriptions int
  51. var maxTimePerRound time.Duration = 10 * time.Second
  52. var startTime int
  53. //todo! expand this for multiple clients
  54. var archiveInterests = make([]int, 1)
  55. var sharedSecret [numClients][2][32]byte = createSharedSecret()
  56. var wantsArchive = make([]byte, 1)
  57. var leaderPublicKey *[32]byte
  58. var followerPublicKey *[32]byte
  59. var clientPrivateKey [numClients]*[32]byte
  60. var clientPublicKey [numClients]*[32]byte
  61. func main() {
  62. wg := &sync.WaitGroup{}
  63. for i := 0; i < numClients; i++ {
  64. wg.Add(1)
  65. go client(i)
  66. time.Sleep(1 * time.Millisecond)
  67. }
  68. wg.Wait()
  69. }
  70. func client(clientNumber int) {
  71. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  72. if err != nil {
  73. panic(err)
  74. }
  75. clientPrivateKey[clientNumber] = generatedPrivateKey
  76. clientPublicKey[clientNumber] = generatedPublicKey
  77. C.initializeCipher()
  78. //initializes the connection to the leader
  79. conf := &tls.Config{
  80. InsecureSkipVerify: true,
  81. }
  82. leaderConn, err := tls.Dial("tcp", leader, conf)
  83. if err != nil {
  84. fmt.Println("clientNumber", clientNumber)
  85. panic(err)
  86. }
  87. leaderConn.SetDeadline(time.Time{})
  88. //receives topics first so client can participate asap
  89. receiveTopicLists(leaderConn)
  90. //gets the public keys of both servers
  91. var tmpLeaderPubKey [32]byte
  92. _, err = leaderConn.Read(tmpLeaderPubKey[:])
  93. if err != nil {
  94. panic(err)
  95. }
  96. leaderPublicKey = &tmpLeaderPubKey
  97. var tmpFollowerPubKey [32]byte
  98. _, err = leaderConn.Read(tmpFollowerPubKey[:])
  99. if err != nil {
  100. panic(err)
  101. }
  102. followerPublicKey = &tmpFollowerPubKey
  103. //sends own public key
  104. writeTo(leaderConn, clientPublicKey[clientNumber][:])
  105. neededSubscriptionsBytes := readFrom(leaderConn, 4)
  106. neededSubscriptions = byteToInt(neededSubscriptionsBytes)
  107. startTimeBytes := readFrom(leaderConn, 4)
  108. startTime = byteToInt(startTimeBytes)
  109. //setup ends above
  110. //while client is active he is always connected and has to participate
  111. for {
  112. //gets current phase
  113. phase := readFrom(leaderConn, 1)
  114. if phase[0] == 1 {
  115. //gets current dbWriteSize from leader
  116. dbWriteSizeBytes := readFrom(leaderConn, 4)
  117. dbWriteSize = byteToInt(dbWriteSizeBytes)
  118. //todo! put into tweet creation
  119. //roundAsBytes := readFrom(leaderConn, 4)
  120. roundAsBytes := make([]byte, 4)
  121. _, err = leaderConn.Read(roundAsBytes)
  122. if err != nil {
  123. panic(err)
  124. }
  125. round = byteToInt(roundAsBytes)
  126. if clientNumber == 0 {
  127. fmt.Println("Round ", round)
  128. }
  129. //request virtualAddress from leader via pirQuery
  130. encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
  131. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  132. pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
  133. tweet := getRealTweet(clientNumber)
  134. //prep the query
  135. dataSize := len(tweet)
  136. querySize := make([]byte, 4)
  137. cQuerySize := C.int(byteToInt(querySize))
  138. var dpfQueryA *C.uchar
  139. var dpfQueryB *C.uchar
  140. C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
  141. intQuerySize := int(cQuerySize) //byteToInt(querySize)
  142. //write the query
  143. queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize))
  144. //encrypts queryA and appends it to message
  145. var nonce [24]byte
  146. //fill nonce with randomness
  147. _, err = rand.Read(nonce[:])
  148. if err != nil {
  149. panic("couldn't get randomness for nonce!")
  150. }
  151. dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  152. //encrypts queryB and appends it to message
  153. queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize))
  154. //fill nonce with randomness
  155. _, err = rand.Read(nonce[:])
  156. if err != nil {
  157. panic("couldn't get randomness for nonce!")
  158. }
  159. dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  160. //writes the dpfQuery to the leader
  161. dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
  162. writeTo(leaderConn, dpfLengthBytes)
  163. writeTo(leaderConn, dpfQueryAEncrypted)
  164. writeTo(leaderConn, dpfQueryBEncrypted)
  165. C.free(unsafe.Pointer(dpfQueryA))
  166. C.free(unsafe.Pointer(dpfQueryB))
  167. } else if phase[0] == 3 {
  168. /*
  169. possible Values
  170. 0 : new client
  171. leader expects sharedSecrets, expects pirQuery
  172. 1 : update needed
  173. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  174. 2 : no update needed
  175. nothing
  176. */
  177. subPhase := readFrom(leaderConn, 1)
  178. var encryptedQueryLeader, encryptedQueryFollower []byte
  179. //first time participating
  180. if subPhase[0] == 0 {
  181. receiveTopicLists(leaderConn)
  182. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  183. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  184. }
  185. //updates the topic list and what client is interested in
  186. if subPhase[0] == 1 {
  187. receiveTopicLists(leaderConn)
  188. //updates local secret
  189. for index := 0; index < 2; index++ {
  190. sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:])
  191. }
  192. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  193. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  194. }
  195. receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
  196. if len(archiveTopicList) > 0 {
  197. wantsArchive[0] = 0 //archive test
  198. } else {
  199. wantsArchive[0] = 0
  200. }
  201. writeTo(leaderConn, wantsArchive)
  202. if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
  203. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
  204. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
  205. receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber)
  206. }
  207. } else {
  208. fmt.Println("Phase", phase)
  209. panic("somethin went wrong")
  210. }
  211. }
  212. }
  213. //creates and sends the pirQuerys for each server
  214. func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
  215. //later this will be taken from gui, this is only for testing
  216. topicsOFInterest := make([]int, 10)
  217. topicsOFInterest[0] = 0
  218. topicsOFInterest[1] = 1
  219. topicsOFInterest[9] = 1
  220. archiveInterests[0] = 1
  221. //todo! repeat for archive
  222. tmpNeededSubscriptions := neededSubscriptions
  223. if tmpNeededSubscriptions > len(topicList) {
  224. tmpNeededSubscriptions = len(topicList)
  225. }
  226. tmptopicsOfInterest := make([]int, len(topicList))
  227. copy(tmptopicsOfInterest, topicsOFInterest)
  228. tmpTopicList := make([]string, len(topicList))
  229. copy(tmpTopicList, topicList)
  230. if wantsArchive[0] == 1 && subPhase == -1 {
  231. tmpNeededSubscriptions = len(archiveInterests)
  232. if tmpNeededSubscriptions > len(archiveTopicList) {
  233. tmpNeededSubscriptions = len(archiveTopicList)
  234. }
  235. copy(tmptopicsOfInterest, archiveInterests) //archiveInterests from gui
  236. copy(tmpTopicList, archiveTopicList)
  237. }
  238. //creates fake topicsOfInterest if client is boooring
  239. if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
  240. tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
  241. }
  242. //pirQuery [topicsofinterest][serverAmount][topicAmount]byte
  243. pirQuerys := make([][][]byte, len(tmptopicsOfInterest))
  244. for i := range pirQuerys {
  245. pirQuerys[i] = make([][]byte, 2)
  246. for j := range pirQuerys[i] {
  247. pirQuerys[i][j] = make([]byte, len(tmpTopicList))
  248. }
  249. }
  250. //for leader
  251. //pirQuery will be filled with random bits
  252. for topic := range tmptopicsOfInterest {
  253. for index := range tmpTopicList {
  254. bit, err := rand.Int(rand.Reader, big.NewInt(2))
  255. if err != nil {
  256. panic(err)
  257. }
  258. pirQuerys[topic][0][index] = byte(bit.Int64())
  259. }
  260. }
  261. tmptopicsOfInterestBytes := make([]byte, tmpNeededSubscriptions)
  262. for index := range tmptopicsOfInterestBytes {
  263. if tmptopicsOfInterest[index] == 1 {
  264. tmptopicsOfInterestBytes[index] = 1
  265. }
  266. }
  267. for topic := range tmptopicsOfInterest {
  268. for index := range tmpTopicList {
  269. if topic == index {
  270. if pirQuerys[topic][0][index] == 1 {
  271. pirQuerys[topic][1][index] = 0
  272. } else {
  273. pirQuerys[topic][1][index] = 1
  274. }
  275. } else {
  276. if pirQuerys[topic][0][index] == 0 {
  277. pirQuerys[topic][1][index] = 0
  278. } else {
  279. pirQuerys[topic][1][index] = 1
  280. }
  281. }
  282. }
  283. }
  284. //flattens the querys to be able to send them more efficently
  285. messagesFlattened := make([][]byte, 2)
  286. //adds the sharedSecret to the first pirQuery when first time participating
  287. if subPhase == 0 {
  288. for server := 0; server < 2; server++ {
  289. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  290. }
  291. }
  292. for server := range messagesFlattened {
  293. for topic := range pirQuerys {
  294. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...)
  295. }
  296. }
  297. var nonce [24]byte
  298. _, err := rand.Read(nonce[:])
  299. if err != nil {
  300. panic("couldn't get randomness for nonce!")
  301. }
  302. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  303. _, err = rand.Read(nonce[:])
  304. if err != nil {
  305. panic("couldn't get randomness for nonce!")
  306. }
  307. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  308. return encryptedQueryLeader, encryptedQueryFollower
  309. }
  310. func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) {
  311. encryptedLength := len(encryptedQueryLeader)
  312. //sends the pirQuerysLength to the leader
  313. writeTo(leaderConn, intToByte(encryptedLength))
  314. //sends the pirQuerys to the leader
  315. writeTo(leaderConn, encryptedQueryLeader)
  316. writeTo(leaderConn, encryptedQueryFollower)
  317. if getArchive {
  318. writeTo(leaderConn, intToByte(len(archiveInterests)))
  319. }
  320. }
  321. func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
  322. virtualAddressByte := readFrom(leaderConn, 4)
  323. //xores the sharedSecret
  324. for h := 0; h < 2; h++ {
  325. for i := 0; i < 4; i++ {
  326. virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i]
  327. }
  328. }
  329. return byteToInt(virtualAddressByte)
  330. }
  331. func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) {
  332. tmpNeededSubscriptions := neededSubscriptions
  333. if tmpNeededSubscriptions > len(topicList) {
  334. tmpNeededSubscriptions = len(topicList)
  335. }
  336. if getArchive {
  337. tmpNeededSubscriptions = len(archiveInterests)
  338. if tmpNeededSubscriptions > len(archiveTopicList) {
  339. tmpNeededSubscriptions = len(archiveTopicList)
  340. }
  341. }
  342. for i := 0; i < tmpNeededSubscriptions; i++ {
  343. //client receives tweets
  344. tweetsLengthBytes := readFrom(leaderConn, 4)
  345. tweetsLength := byteToInt(tweetsLengthBytes)
  346. tweets := readFrom(leaderConn, tweetsLength)
  347. //fmt.Println(tweets[:10])
  348. //expand sharedSecret so it is of right length
  349. expandBy := len(tweets) / 32
  350. expandedSharedSecrets := make([][]byte, 2)
  351. for i := 0; i < 2; i++ {
  352. for j := 0; j < expandBy; j++ {
  353. expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
  354. }
  355. }
  356. //xors the received messge into the message to display
  357. for i := 0; i < 2; i++ {
  358. lib.Xor(expandedSharedSecrets[i][:], tweets)
  359. }
  360. //fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
  361. index := strings.Index(string(tweets), ";;;")
  362. if index != -1 && clientNumber == 0 {
  363. //fmt.Println("Round", round, "Tweets length", len(tweets))
  364. fmt.Println("Correct")
  365. text := string(tweets)[:index]
  366. fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets))
  367. } else {
  368. fmt.Println("error")
  369. fmt.Println("round", round, "text:", string(tweets), "length", len(tweets))
  370. return
  371. //panic("received text not of correct format")
  372. }
  373. }
  374. }
  375. //creates a shared secret for each server
  376. func createSharedSecret() [numClients][2][32]byte {
  377. var tmpSharedSecret [numClients][2][32]byte
  378. for i := 0; i < numClients; i++ {
  379. for j := 0; j < 2; j++ {
  380. _, err := rand.Read(tmpSharedSecret[i][j][:])
  381. if err != nil {
  382. panic("couldn't get randomness for sharedSecret!")
  383. }
  384. }
  385. }
  386. return tmpSharedSecret
  387. }
  388. func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
  389. //pirQuery [serverAmount][dbWriteSize]byte
  390. pirQuerys := make([][]byte, 2)
  391. for i := range pirQuerys {
  392. pirQuerys[i] = make([]byte, dbWriteSize)
  393. }
  394. //for leader
  395. //pirQuery will be filled with random bits
  396. for index := range pirQuerys[0] {
  397. bit := mr.Intn(2)
  398. pirQuerys[0][index] = byte(bit)
  399. }
  400. copy(pirQuerys[1], pirQuerys[0])
  401. //the positon the virtual address will be taken from
  402. pos := mr.Intn(dbWriteSize)
  403. pirQuerys[0][pos] = 1
  404. pirQuerys[1][pos] = 0
  405. //flattens the querys to be able to send them more efficently
  406. messagesFlattened := make([][]byte, 2)
  407. //adds the sharedSecret to the pirQuery
  408. for server := 0; server < 2; server++ {
  409. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  410. }
  411. for server := 0; server < 2; server++ {
  412. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...)
  413. }
  414. var nonce [24]byte
  415. _, err := rand.Read(nonce[:])
  416. if err != nil {
  417. panic("couldn't get randomness for nonce!")
  418. }
  419. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  420. _, err = rand.Read(nonce[:])
  421. if err != nil {
  422. panic("couldn't get randomness for nonce!")
  423. }
  424. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  425. return encryptedQueryLeader, encryptedQueryFollower
  426. }
  427. //generates a topicOfInterest array with random values
  428. func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
  429. tmpNeededSubscriptions := neededSubscriptions
  430. if tmpNeededSubscriptions > len(topicList) {
  431. tmpNeededSubscriptions = len(topicList)
  432. }
  433. fakeTopicsOfInterest := make([]int, tmpNeededSubscriptions)
  434. maxInt := max
  435. //fills the array with unique random ascending values ranging from 0 to max
  436. for i := 0; i < tmpNeededSubscriptions; i++ {
  437. fakeTopicsOfInterest[i] = mr.Intn(maxInt)
  438. for j := 0; j < i; j++ {
  439. if fakeTopicsOfInterest[i] == fakeTopicsOfInterest[j] {
  440. i--
  441. break
  442. }
  443. }
  444. }
  445. if doAuditing {
  446. sort.Ints(fakeTopicsOfInterest)
  447. return fakeTopicsOfInterest
  448. }
  449. //adds unique and new random numbers to topicOfInterests until length is satisfied
  450. for _, number := range fakeTopicsOfInterest {
  451. if !inList(number, topicsOfInterest) {
  452. topicsOfInterest = append(topicsOfInterest, number)
  453. }
  454. if len(topicsOfInterest) == tmpNeededSubscriptions {
  455. break
  456. }
  457. }
  458. sort.Ints(topicsOfInterest)
  459. return topicsOfInterest
  460. }
  // inList reports whether number occurs in list.
  func inList(number int, list []int) bool {
  	found := false
  	for i := 0; i < len(list) && !found; i++ {
  		found = list[i] == number
  	}
  	return found
  }
  469. func receiveTopicLists(leaderConn net.Conn) {
  470. for i := 0; i < 2; i++ {
  471. topicListLength := readFrom(leaderConn, 4)
  472. recTopicList := readFrom(leaderConn, byteToInt(topicListLength))
  473. var tmpTopicList []string
  474. arrayReader := bytes.NewReader(recTopicList[:])
  475. json.NewDecoder(arrayReader).Decode(&tmpTopicList)
  476. if i == 0 {
  477. topicList = tmpTopicList
  478. } else {
  479. archiveTopicList = tmpTopicList
  480. }
  481. }
  482. }
  483. func getRealTweet(clientNumber int) []byte {
  484. fUserList, err := os.Open("/home/simon/goCode/tweets/userList")
  485. if err != nil {
  486. panic(err)
  487. }
  488. defer fUserList.Close()
  489. currentLine := 0
  490. scanner := bufio.NewScanner(fUserList)
  491. userID := ""
  492. for scanner.Scan() {
  493. if currentLine-1 == clientNumber {
  494. userID = scanner.Text()
  495. break
  496. }
  497. currentLine++
  498. }
  499. if userID == "" {
  500. panic("no userID picked")
  501. }
  502. fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID)
  503. if err != nil {
  504. panic(err)
  505. }
  506. defer fTweets.Close()
  507. scanner = bufio.NewScanner(fTweets)
  508. lowerBound := time.Now().Second()
  509. upperBound := lowerBound + int(3*maxTimePerRound.Seconds()) + 10
  510. var tweet []byte
  511. for scanner.Scan() {
  512. lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
  513. lineArr = strings.Split(lineArr[0], ": ")
  514. timestamp, _ := strconv.Atoi(lineArr[len(lineArr)-1])
  515. //transforms timestamp to current time
  516. timestamp -= 1351742400
  517. timestamp += startTime
  518. if timestamp > lowerBound && timestamp < upperBound {
  519. lineArr = strings.Split(scanner.Text(), "[\"")
  520. line := lineArr[1]
  521. lineArr = strings.Split(line, "\"]")
  522. line = lineArr[0]
  523. lineArr = strings.Split(line, ",")
  524. var topics []byte
  525. for _, topic := range lineArr {
  526. topicLine := strings.Split(topic, "\"")
  527. topics = append(topics, []byte(topicLine[1])[:]...)
  528. topics = append(topics, []byte(",")[0])
  529. }
  530. topics = topics[:len(topics)-1]
  531. tweet = append(tweet, topics...)
  532. tweet = append(tweet, []byte(";")[0])
  533. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  534. num := r.Intn(10000)
  535. if num == 0 {
  536. num = 1
  537. }
  538. tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
  539. break
  540. }
  541. }
  542. //todo! doesnt work
  543. fmt.Println("tweet", string(tweet))
  544. //adds padding
  545. length := dataLength - len(tweet)
  546. padding := make([]byte, length)
  547. rand.Read(padding)
  548. tweet = append(tweet, padding...)
  549. return tweet
  550. }
  551. func getRandomTweet(clientNumber int) []byte {
  552. var tweet []byte
  553. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  554. maxTopics := r.Intn(6)
  555. if maxTopics == 0 {
  556. maxTopics = 1
  557. }
  558. maxInt := 10000
  559. topicNumbers := make([]int, maxTopics)
  560. //fills the array with unique random ascending values ranging from 0 to maxInt
  561. for i := 0; i < maxTopics; i++ {
  562. topicNumbers[i] = mr.Intn(maxInt)
  563. for j := 0; j < i; j++ {
  564. if topicNumbers[i] == topicNumbers[j] {
  565. i--
  566. break
  567. }
  568. }
  569. }
  570. sort.Ints(topicNumbers)
  571. //fmt.Println("topicNumbers", topicNumbers)
  572. var topics []byte
  573. topicIndex := 0
  574. for i := 0; i < len(topicNumbers)*2; i++ {
  575. if i%2 == 0 {
  576. topics = append(topics, byte(topicNumbers[topicIndex]))
  577. topicIndex++
  578. } else if i != (len(topicNumbers)*2)-1 {
  579. topics = append(topics, []byte(",")[0])
  580. }
  581. }
  582. topics = append(topics, []byte(";")[0])
  583. num := r.Intn(100)
  584. if num == 0 {
  585. num = 1
  586. }
  587. text := []byte(strconv.Itoa(num) + ";")
  588. tweet = append(tweet, topics...)
  589. tweet = append(tweet, text...)
  590. tweet = append(tweet, []byte(";")[0])
  591. //fmt.Println("writing", string(text))
  592. //fmt.Println(topicNumbers)
  593. if len(tweet) > 32 {
  594. fmt.Println("lenlen", len(tweet))
  595. }
  596. //adds padding
  597. length := dataLength - len(tweet)
  598. padding := make([]byte, length)
  599. rand.Read(padding)
  600. tweet = append(tweet, padding...)
  601. return tweet
  602. }
  603. //sends the array to the connection
  604. func writeTo(connection net.Conn, array []byte) {
  605. remainingLength := len(array)
  606. for remainingLength > 0 {
  607. if remainingLength >= mtu {
  608. _, err := connection.Write(array[:mtu])
  609. if err != nil {
  610. panic(err)
  611. }
  612. array = array[mtu:]
  613. remainingLength -= mtu
  614. } else {
  615. _, err := connection.Write(array)
  616. if err != nil {
  617. panic(err)
  618. }
  619. remainingLength = 0
  620. }
  621. }
  622. }
  // readFrom reads exactly size bytes from connection and returns them. It
  // returns nil when size is not positive and panics if the connection fails
  // before size bytes have arrived.
  //
  // A single Read may deliver fewer bytes than requested, so the byte count of
  // every call is honoured and reading continues until the buffer is full.
  // Ignoring the count would return a buffer padded with zeros and leave the
  // rest of the message on the stream.
  func readFrom(connection net.Conn, size int) []byte {
  	if size <= 0 {
  		return nil
  	}

  	array := make([]byte, size)
  	for filled := 0; filled < size; {
  		n, err := connection.Read(array[filled:])
  		filled += n
  		// An error that arrives together with the last bytes is not a failure.
  		if err != nil && filled < size {
  			panic(err)
  		}
  	}
  	return array
  }
  // intToByte encodes the low 32 bits of myInt as four bytes, least
  // significant byte first.
  func intToByte(myInt int) (retBytes []byte) {
  	retBytes = make([]byte, 4)
  	for i := range retBytes {
  		retBytes[i] = byte(myInt >> uint(8*i))
  	}
  	return retBytes
  }
  // byteToInt decodes the first four bytes of myBytes, least significant byte
  // first, into an int. Bytes beyond the fourth are ignored; fewer than four
  // bytes cause an index-out-of-range panic.
  func byteToInt(myBytes []byte) (x int) {
  	for i := 3; i >= 0; i-- {
  		x = x<<8 | int(myBytes[i])
  	}
  	return x
  }