leader.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416
  1. package main
  2. //#cgo CFLAGS: -fopenmp -O2
  3. //#cgo LDFLAGS: -lcrypto -lm -fopenmp
  4. //#include "../c/dpf.h"
  5. //#include "../c/okv.h"
  6. //#include "../c/dpf.c"
  7. //#include "../c/okv.c"
  8. import "C"
  9. //ssssssssssssssss
  10. import (
  11. "crypto/rand"
  12. "crypto/rsa"
  13. "crypto/sha256"
  14. "crypto/tls"
  15. "crypto/x509"
  16. "crypto/x509/pkix"
  17. "encoding/pem"
  18. "fmt"
  19. "math"
  20. "math/big"
  21. mr "math/rand"
  22. "net"
  23. "sort"
  24. "strconv"
  25. "strings"
  26. "sync"
  27. "time"
  28. lib "2PPS/lib"
  29. "unsafe"
  30. "golang.org/x/crypto/nacl/box"
  31. )
  32. //this stores all neccessary information for each client
  33. type clientKeys struct {
  34. roundsParticipating int
  35. PublicKey *[32]byte
  36. SharedSecret [32]byte
  37. PirQuery [][]byte
  38. }
  39. var clientData = make(map[net.Addr]clientKeys)
  40. const follower string = "127.0.0.1:4442"
  41. //Maximum Transport Unit
  42. const mtu int = 1100
  43. var leaderPrivateKey *[32]byte
  44. var leaderPublicKey *[32]byte
  45. var followerPublicKey *[32]byte
  46. const maxNumberOfClients = 10000000
  47. var topicList []byte
  48. var topicAmount int
  49. var archiveTopicAmount int
  50. // every roundsBeforeUpdate the client updates his pirQuery
  51. const roundsBeforeUpdate = 1
  52. const neededSubscriptions = 3
  53. const numThreads = 12
  54. const dataLength = 256
  55. const minDBWriteSize = 10
  56. var dbWriteSize float64 = 20000
  57. var collisionCounter []float64
  58. var clientsConnected int
  59. var maxTimePerRound time.Duration = 10 * time.Second
  60. var clientsServedPhase1 []int
  61. var clientsServedPhase3 []int
  62. var startPhase1 time.Time
  63. var startPhase2 time.Time
  64. var startPhase3 time.Time
  65. //counts the number of rounds
  66. var round int = 0
  67. var startTime time.Time
  68. var startTimeRound time.Time
  69. //channel for goroutine communication with clients
  70. var phase1Channel = make(chan net.Conn, maxNumberOfClients)
  71. var phase3Channel = make(chan net.Conn, maxNumberOfClients)
  72. //variables for calculating the dbWrite size
  73. const publisherRounds int = 10
  74. var publisherAmount float64
  75. var publisherHistory [publisherRounds]int
  76. //todo! handle client dc during phase1/3
  77. func main() {
  78. //prevents race conditions for wrtiting
  79. m := &sync.RWMutex{}
  80. startTime = time.Now()
  81. clientsServedPhase1 = make([]int, 1000)
  82. clientsServedPhase3 = make([]int, 1000)
  83. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  84. if err != nil {
  85. panic(err)
  86. }
  87. //why is this neccessary?
  88. leaderPrivateKey = generatedPrivateKey
  89. leaderPublicKey = generatedPublicKey
  90. C.initializeServer(C.int(numThreads))
  91. //calls follower for setup
  92. conf := &tls.Config{
  93. InsecureSkipVerify: true,
  94. }
  95. followerConnection, err := tls.Dial("tcp", follower, conf)
  96. if err != nil {
  97. panic(err)
  98. }
  99. followerConnection.SetDeadline(time.Time{})
  100. //receives follower publicKey
  101. var tmpFollowerPubKey [32]byte
  102. _, err = followerConnection.Read(tmpFollowerPubKey[:])
  103. if err != nil {
  104. panic(err)
  105. }
  106. followerPublicKey = &tmpFollowerPubKey
  107. //send publicKey to follower
  108. writeTo(followerConnection, leaderPublicKey[:])
  109. writeTo(followerConnection, intToByte(neededSubscriptions))
  110. //goroutine for accepting new clients
  111. go func() {
  112. leaderConnectionPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048)
  113. if err != nil {
  114. panic(err)
  115. }
  116. // Generate a pem block with the private key
  117. keyPem := pem.EncodeToMemory(&pem.Block{
  118. Type: "RSA PRIVATE KEY",
  119. Bytes: x509.MarshalPKCS1PrivateKey(leaderConnectionPrivateKey),
  120. })
  121. tml := x509.Certificate{
  122. // you can add any attr that you need
  123. NotBefore: time.Now(),
  124. NotAfter: time.Now().AddDate(5, 0, 0),
  125. // you have to generate a different serial number each execution
  126. SerialNumber: big.NewInt(123123),
  127. Subject: pkix.Name{
  128. CommonName: "New Name",
  129. Organization: []string{"New Org."},
  130. },
  131. BasicConstraintsValid: true,
  132. }
  133. cert, err := x509.CreateCertificate(rand.Reader, &tml, &tml, &leaderConnectionPrivateKey.PublicKey, leaderConnectionPrivateKey)
  134. if err != nil {
  135. panic(err)
  136. }
  137. // Generate a pem block with the certificate
  138. certPem := pem.EncodeToMemory(&pem.Block{
  139. Type: "CERTIFICATE",
  140. Bytes: cert,
  141. })
  142. tlsCert, err := tls.X509KeyPair(certPem, keyPem)
  143. if err != nil {
  144. panic(err)
  145. }
  146. config := &tls.Config{Certificates: []tls.Certificate{tlsCert}}
  147. //listens for clients
  148. lnClients, err := tls.Listen("tcp", ":4441", config)
  149. if err != nil {
  150. panic(err)
  151. }
  152. defer lnClients.Close()
  153. for {
  154. clientConnection, err := lnClients.Accept()
  155. if err != nil {
  156. fmt.Println("Client connection error 1", err)
  157. clientConnection.Close()
  158. break
  159. }
  160. clientConnection.SetDeadline(time.Time{})
  161. //sends topicList so client can participate in phase 3 asap
  162. errorBool := sendTopicLists(clientConnection, followerConnection, true)
  163. if errorBool {
  164. break
  165. }
  166. //send leader publicKey
  167. _, err = clientConnection.Write(leaderPublicKey[:])
  168. if err != nil {
  169. fmt.Println("Client connection error 2", err)
  170. clientConnection.Close()
  171. break
  172. }
  173. //send follower publicKey
  174. _, err = clientConnection.Write(followerPublicKey[:])
  175. if err != nil {
  176. fmt.Println("Client connection error 3", err)
  177. clientConnection.Close()
  178. break
  179. }
  180. var clientPublicKey *[32]byte
  181. var tmpClientPublicKey [32]byte
  182. //gets publicKey from client
  183. _, err = clientConnection.Read(tmpClientPublicKey[:])
  184. if err != nil {
  185. fmt.Println("Client connection error 4", err)
  186. clientConnection.Close()
  187. break
  188. }
  189. _, err = clientConnection.Write(intToByte(neededSubscriptions))
  190. if err != nil {
  191. fmt.Println("Client connection error 5", err)
  192. clientConnection.Close()
  193. break
  194. }
  195. _, err = clientConnection.Write(intToByte(int(startTime.Unix())))
  196. if err != nil {
  197. fmt.Println("Client connection error 6", err)
  198. clientConnection.Close()
  199. break
  200. }
  201. clientPublicKey = &tmpClientPublicKey
  202. //this is the key for map(client data)
  203. remoteAddress := clientConnection.RemoteAddr()
  204. //pirQuery will be added in phase 3
  205. //bs! only want to set roundsParticipating and answerAmount to 0, mb there is a better way
  206. //will work for now
  207. var emptyArray [32]byte
  208. var emptyByteArray [][]byte
  209. keys := clientKeys{0, clientPublicKey, emptyArray, emptyByteArray}
  210. m.Lock()
  211. clientData[remoteAddress] = keys
  212. m.Unlock()
  213. phase1Channel <- clientConnection
  214. clientsConnected++
  215. if clientsConnected%1000 == 0 {
  216. fmt.Println("clientsConnected", clientsConnected)
  217. }
  218. }
  219. }()
  220. wg := &sync.WaitGroup{}
  221. //the current phase
  222. phase := make([]byte, 1)
  223. for {
  224. startPhase1 = time.Now()
  225. startTimeRound = time.Now()
  226. phase[0] = 1
  227. round++
  228. fmt.Println("clientsServedPhase1", clientsServedPhase1[round-1])
  229. fmt.Println("clientsServedPhase3", clientsServedPhase3[round-1])
  230. fmt.Println("dbWriteSize", dbWriteSize)
  231. fmt.Println("Phase 1 Round", round)
  232. //creates a new write Db for this round
  233. for i := 0; i < int(dbWriteSize); i++ {
  234. C.createDb(C.int(1), C.int(dataLength))
  235. }
  236. //creates a new db containing virtual addresses for auditing
  237. virtualAddresses := createVirtualAddresses()
  238. //send all virtualAddresses to follower
  239. for i := 0; i <= int(dbWriteSize); i++ {
  240. writeTo(followerConnection, intToByte(virtualAddresses[i]))
  241. }
  242. //moves all clients to phase1
  243. if len(phase3Channel) > 0 {
  244. for client := range phase3Channel {
  245. phase1Channel <- client
  246. if len(phase3Channel) == 0 {
  247. break
  248. }
  249. }
  250. }
  251. for id := 0; id < numThreads; id++ {
  252. wg.Add(1)
  253. followerConnection, err := tls.Dial("tcp", follower, conf)
  254. if err != nil {
  255. panic(err)
  256. }
  257. followerConnection.SetDeadline(time.Time{})
  258. go phase1(id, phase, followerConnection, wg, m, virtualAddresses)
  259. }
  260. wg.Wait()
  261. fmt.Println("fullDurationPhase1", time.Since(startPhase1).Seconds())
  262. //Phase 2
  263. startPhase2 = time.Now()
  264. followerConnection, err := tls.Dial("tcp", follower, conf)
  265. if err != nil {
  266. panic(err)
  267. }
  268. followerConnection.SetDeadline(time.Time{})
  269. phase2(followerConnection)
  270. fmt.Println("fullDurationPhase2", time.Since(startPhase2).Seconds())
  271. //Phase 3
  272. //moves all clients to phase3
  273. if len(phase1Channel) > 0 {
  274. for client := range phase1Channel {
  275. phase3Channel <- client
  276. if len(phase1Channel) == 0 {
  277. break
  278. }
  279. }
  280. }
  281. startPhase3 = time.Now()
  282. //no tweets -> continue to phase 1 and mb get tweets
  283. topicList, topicAmount = lib.GetTopicList(0)
  284. if len(topicList) == 0 {
  285. continue
  286. }
  287. phase[0] = 3
  288. startTimeRound = time.Now()
  289. for id := 0; id < numThreads; id++ {
  290. wg.Add(1)
  291. followerConnection, err := tls.Dial("tcp", follower, conf)
  292. if err != nil {
  293. panic(err)
  294. }
  295. followerConnection.SetDeadline(time.Time{})
  296. go phase3(id, phase, followerConnection, wg, m)
  297. }
  298. wg.Wait()
  299. fmt.Println("fullDurationPhase3", time.Since(startPhase3).Seconds())
  300. lib.CleanUpdbR(round)
  301. }
  302. }
  303. func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex, virtualAddresses []int) {
  304. roundAsBytes := intToByte(round)
  305. gotClient := make([]byte, 1)
  306. gotClient[0] = 0
  307. //wait until time is up
  308. for len(phase1Channel) == 0 {
  309. if time.Since(startTimeRound) > maxTimePerRound {
  310. //tells follower that this worker is done
  311. writeTo(followerConnection, gotClient)
  312. wg.Done()
  313. return
  314. }
  315. time.Sleep(1 * time.Second)
  316. }
  317. for clientConnection := range phase1Channel {
  318. clientsServedPhase1[round] = clientsServedPhase1[round] + 1
  319. if clientsServedPhase1[round]%1000 == 0 {
  320. fmt.Println("clientsServedPhase1", clientsServedPhase1[round])
  321. fmt.Println("timeTaken", time.Since(startPhase1))
  322. }
  323. gotClient[0] = 1
  324. //tells follower that this worker got a clientConnection
  325. writeTo(followerConnection, gotClient)
  326. //sends clients publicKey to follower
  327. m.RLock()
  328. clientPublicKey := clientData[clientConnection.RemoteAddr()].PublicKey
  329. m.RUnlock()
  330. writeTo(followerConnection, clientPublicKey[:])
  331. //setup the worker-specific db
  332. dbSize := int(C.dbSize)
  333. db := make([][]byte, dbSize)
  334. for i := 0; i < dbSize; i++ {
  335. db[i] = make([]byte, int(C.db[i].dataSize))
  336. }
  337. //tells client that phase 1 has begun
  338. errorBool := writeToWError(clientConnection, phase, followerConnection, 5)
  339. if errorBool {
  340. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  341. if contBool {
  342. continue
  343. } else {
  344. return
  345. }
  346. }
  347. //tells client current dbWriteSize
  348. errorBool = writeToWError(clientConnection, intToByte(int(dbWriteSize)), followerConnection, 5)
  349. if errorBool {
  350. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  351. if contBool {
  352. continue
  353. } else {
  354. return
  355. }
  356. }
  357. //tells client current round
  358. errorBool = writeToWError(clientConnection, roundAsBytes, followerConnection, 5)
  359. if errorBool {
  360. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  361. if contBool {
  362. continue
  363. } else {
  364. return
  365. }
  366. }
  367. //begin auditing
  368. //auditingStart := time.Now()
  369. m.RLock()
  370. var clientKeys = clientData[clientConnection.RemoteAddr()]
  371. m.RUnlock()
  372. clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
  373. if errorBool {
  374. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  375. if contBool {
  376. continue
  377. } else {
  378. return
  379. }
  380. }
  381. errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
  382. if errorBool {
  383. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  384. if contBool {
  385. continue
  386. } else {
  387. return
  388. }
  389. }
  390. if id == 0 {
  391. //fmt.Println("Auditing duration", time.Since(auditingStart).Seconds())
  392. }
  393. m.Lock()
  394. clientData[clientConnection.RemoteAddr()] = clientKeys
  395. m.Unlock()
  396. //accept dpfQuery from client
  397. dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  398. if errorBool {
  399. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  400. if contBool {
  401. continue
  402. } else {
  403. return
  404. }
  405. }
  406. dpfLength := byteToInt(dpfLengthBytes)
  407. dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
  408. if errorBool {
  409. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  410. if contBool {
  411. continue
  412. } else {
  413. return
  414. }
  415. }
  416. dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
  417. if errorBool {
  418. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  419. if contBool {
  420. continue
  421. } else {
  422. return
  423. }
  424. }
  425. writeToWError(followerConnection, dpfLengthBytes, nil, 0)
  426. writeToWError(followerConnection, dpfQueryBEncrypted, nil, 0)
  427. //decrypt dpfQueryA for sorting into db
  428. var decryptNonce [24]byte
  429. copy(decryptNonce[:], dpfQueryAEncrypted[:24])
  430. dpfQueryA, ok := box.Open(nil, dpfQueryAEncrypted[24:], &decryptNonce, clientPublicKey, leaderPrivateKey)
  431. if !ok {
  432. panic("dpfQueryA decryption not ok")
  433. }
  434. ds := int(C.db[0].dataSize)
  435. dataShareLeader := make([]byte, ds)
  436. pos := C.getUint128_t(C.int(virtualAddresses[int(dbWriteSize)]))
  437. C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShareLeader[0]))
  438. dataShareFollower, _ := readFrom(followerConnection, ds, nil, 0)
  439. writeToWError(followerConnection, dataShareLeader, nil, 0)
  440. auditXOR := make([]byte, ds)
  441. passedAudit := true
  442. for i := 0; i < ds; i++ {
  443. auditXOR[i] = dataShareLeader[i] ^ dataShareFollower[i]
  444. //client tried to write to a position that is not a virtuallAddress
  445. if auditXOR[i] != 0 {
  446. clientConnection.Close()
  447. passedAudit = false
  448. }
  449. }
  450. if passedAudit {
  451. //run dpf, xor into local db
  452. for i := 0; i < dbSize; i++ {
  453. ds := int(C.db[i].dataSize)
  454. dataShare := make([]byte, ds)
  455. pos := C.getUint128_t(C.int(virtualAddresses[i]))
  456. C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
  457. for j := 0; j < ds; j++ {
  458. db[i][j] = db[i][j] ^ dataShare[j]
  459. }
  460. }
  461. //xor the worker's DB into the main DB
  462. for i := 0; i < dbSize; i++ {
  463. m.Lock()
  464. C.xorIn(C.int(i), (*C.uchar)(&db[i][0]))
  465. m.Unlock()
  466. }
  467. phase3Channel <- clientConnection
  468. }
  469. //loop that waits for new client or leaves phase1 if time is up
  470. for {
  471. if time.Since(startTimeRound) < maxTimePerRound {
  472. //this worker handles the next client
  473. if len(phase1Channel) > 0 {
  474. break
  475. //this worker waits for next client
  476. } else {
  477. time.Sleep(1 * time.Second)
  478. }
  479. //times up
  480. } else {
  481. //tells follower that this worker is done
  482. gotClient[0] = 0
  483. writeTo(followerConnection, gotClient)
  484. wg.Done()
  485. return
  486. }
  487. }
  488. }
  489. }
  490. func phase2(followerConnection net.Conn) {
  491. //gets current seed
  492. seedLeader := make([]byte, 16)
  493. C.readSeed((*C.uchar)(&seedLeader[0]))
  494. //get data
  495. dbSize := int(C.dbSize)
  496. tmpdbLeader := make([][]byte, dbSize)
  497. for i := range tmpdbLeader {
  498. tmpdbLeader[i] = make([]byte, dataLength)
  499. }
  500. for i := 0; i < dbSize; i++ {
  501. C.readData(C.int(i), (*C.uchar)(&tmpdbLeader[i][0]))
  502. }
  503. //writes seed to follower
  504. writeTo(followerConnection, seedLeader)
  505. //write data to follower
  506. //this is surely inefficent
  507. for i := 0; i < dbSize; i++ {
  508. writeTo(followerConnection, tmpdbLeader[i])
  509. }
  510. //receive seed from follower
  511. seedFollower, _ := readFrom(followerConnection, 16, nil, 0)
  512. //receive data from follower
  513. tmpdbFollower := make([][]byte, dbSize)
  514. for i := range tmpdbFollower {
  515. tmpdbFollower[i] = make([]byte, dataLength)
  516. }
  517. for i := 0; i < dbSize; i++ {
  518. tmpdbFollower[i], _ = readFrom(followerConnection, dataLength, nil, 0)
  519. }
  520. //put together the db
  521. tmpdb := make([][]byte, dbSize)
  522. for i := range tmpdb {
  523. tmpdb[i] = make([]byte, dataLength)
  524. }
  525. //get own Ciphers
  526. ciphersLeader := make([]*C.uchar, dbSize)
  527. for i := 0; i < dbSize; i++ {
  528. ciphersLeader[i] = (*C.uchar)(C.malloc(16))
  529. }
  530. for i := 0; i < dbSize; i++ {
  531. C.getCipher(1, C.int(i), ciphersLeader[i])
  532. }
  533. //send own Ciphers to follower
  534. for i := 0; i < dbSize; i++ {
  535. writeTo(followerConnection, C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16))
  536. }
  537. //receive ciphers from follower
  538. ciphersFollower := make([]byte, dbSize*16)
  539. for i := 0; i < dbSize; i++ {
  540. _, err := followerConnection.Read(ciphersFollower[i*16:])
  541. if err != nil {
  542. panic(err)
  543. }
  544. }
  545. //put in ciphers from follower
  546. for i := 0; i < dbSize; i++ {
  547. C.putCipher(1, C.int(i), (*C.uchar)(&ciphersFollower[i*16]))
  548. }
  549. //decrypt each row
  550. for i := 0; i < dbSize; i++ {
  551. C.decryptRow(C.int(i), (*C.uchar)(&tmpdb[i][0]), (*C.uchar)(&tmpdbLeader[i][0]), (*C.uchar)(&tmpdbFollower[i][0]), (*C.uchar)(&seedLeader[0]), (*C.uchar)(&seedFollower[0]))
  552. }
  553. var tweets []lib.Tweet
  554. var currentPublisherAmount int = 0
  555. var collisions float64
  556. for i := 0; i < dbSize; i++ {
  557. //discard cover message
  558. if tmpdb[i][1] == 0 {
  559. continue
  560. //collision
  561. } else if -1 == strings.Index(string(tmpdb[i]), ";;") {
  562. currentPublisherAmount++
  563. currentPublisherAmount++
  564. collisions++
  565. continue
  566. } else {
  567. currentPublisherAmount++
  568. //reconstruct tweet
  569. var position int = 0
  570. var topics []string
  571. var topic string
  572. var text string
  573. for _, letter := range tmpdb[i] {
  574. if string(letter) == ";" {
  575. if topic != "" {
  576. topics = append(topics, topic)
  577. topic = ""
  578. }
  579. position++
  580. } else {
  581. if position == 0 {
  582. if string(letter) == "," {
  583. topics = append(topics, topic)
  584. topic = ""
  585. } else {
  586. //if topics are
  587. //int
  588. //topic = topic + fmt.Sprint(int(letter))
  589. //string
  590. topic = topic + string(letter)
  591. }
  592. } else if position == 1 {
  593. text = text + string(letter)
  594. }
  595. }
  596. }
  597. tweet := lib.Tweet{"", -1, topics, text, round}
  598. if text != "" {
  599. tweets = append(tweets, tweet)
  600. } else {
  601. //this is a odd(number) way collisions
  602. collisions++
  603. }
  604. }
  605. }
  606. collisionCounter = append(collisionCounter, collisions)
  607. if collisions/dbWriteSize > 0.05 {
  608. fmt.Println("Collisions this round", collisions, "dbWriteSize", dbWriteSize)
  609. }
  610. //fmt.Println("tweets recovered: ", tweets)
  611. //sort into read db
  612. //fmt.Println("newTweets", tweets)
  613. lib.NewEntries(tweets, 0)
  614. C.resetDb()
  615. //calculates the publisherAverage over the last publisherRounds rounds
  616. index := round % publisherRounds
  617. publisherHistory[index] = currentPublisherAmount
  618. fmt.Println("currentPublisherAmount", currentPublisherAmount)
  619. var publisherAmount int
  620. for _, num := range publisherHistory {
  621. publisherAmount += num
  622. }
  623. publisherAverage := 0
  624. if round < publisherRounds {
  625. publisherAverage = publisherAmount / round
  626. } else {
  627. publisherAverage = publisherAmount / publisherRounds
  628. }
  629. //calculates the dbWriteSize for this round
  630. dbWriteSize = math.Ceil(19.5 * float64(publisherAverage))
  631. if dbWriteSize < minDBWriteSize {
  632. dbWriteSize = minDBWriteSize
  633. }
  634. //writes dbWriteSize of current round to follower
  635. writeTo(followerConnection, intToByte(int(dbWriteSize)))
  636. lib.CleanUpdbR(round)
  637. }
  638. func addTestTweets() {
  639. //creates test tweets
  640. tweets := make([]lib.Tweet, 5)
  641. for i := range tweets {
  642. j := i
  643. if i == 1 {
  644. j = 0
  645. }
  646. text := "Text " + strconv.Itoa(i)
  647. var topics []string
  648. topics = append(topics, "Topic "+strconv.Itoa(j))
  649. tweets[i] = lib.Tweet{"", -1, topics, text, i}
  650. }
  651. lib.NewEntries(tweets, 0)
  652. }
  653. //opti! mb it is quicker to send updated topicLists to clients first so pirQuerys are ready
  654. func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex) {
  655. gotClient := make([]byte, 1)
  656. gotClient[0] = 0
  657. //wait until time is up
  658. for len(phase3Channel) == 0 {
  659. if time.Since(startTimeRound) > maxTimePerRound*2 {
  660. //tells follower that this worker is done
  661. writeToWError(followerConnection, gotClient, nil, 0)
  662. wg.Done()
  663. return
  664. }
  665. time.Sleep(1 * time.Second)
  666. }
  667. for clientConnection := range phase3Channel {
  668. clientsServedPhase3[round] = clientsServedPhase3[round] + 1
  669. if clientsServedPhase3[round]%1000 == 0 {
  670. fmt.Println("clientsServedPhase3", clientsServedPhase3[round])
  671. fmt.Println("timeTaken", time.Since(startPhase3).Seconds())
  672. }
  673. gotClient[0] = 1
  674. //tells follower that this worker got a clientConnection
  675. writeToWError(followerConnection, gotClient, nil, 0)
  676. //tells client current phase
  677. errorBool := writeToWError(clientConnection, phase, followerConnection, 2)
  678. if errorBool {
  679. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  680. if contBool {
  681. continue
  682. } else {
  683. return
  684. }
  685. }
  686. /*
  687. possible Values
  688. 0 : new client
  689. leader expects sharedSecrets, expects pirQuery
  690. 1 : update needed
  691. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  692. 2 : no update needed
  693. nothing
  694. */
  695. subPhase := make([]byte, 1)
  696. //gets the data for the current client
  697. m.RLock()
  698. var clientKeys = clientData[clientConnection.RemoteAddr()]
  699. m.RUnlock()
  700. var roundsParticipating = clientKeys.roundsParticipating
  701. //client participates for the first time
  702. if roundsParticipating == 0 {
  703. subPhase[0] = 0
  704. } else if roundsParticipating%roundsBeforeUpdate == 0 {
  705. subPhase[0] = 1
  706. } else {
  707. subPhase[0] = 2
  708. }
  709. //tells client what leader expects
  710. errorBool = writeToWError(clientConnection, subPhase, followerConnection, 2)
  711. if errorBool {
  712. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  713. if contBool {
  714. continue
  715. } else {
  716. return
  717. }
  718. }
  719. //tells follower what will happen
  720. writeToWError(followerConnection, subPhase, nil, 0)
  721. //sends clients publicKey so follower knows which client is being served
  722. writeTo(followerConnection, clientKeys.PublicKey[:])
  723. //increases rounds participating for client
  724. clientKeys.roundsParticipating = roundsParticipating + 1
  725. //declaring variables here to prevent dupclicates later
  726. m.RLock()
  727. var sharedSecret [32]byte = clientData[clientConnection.RemoteAddr()].SharedSecret
  728. m.RUnlock()
  729. if subPhase[0] == 0 {
  730. errorBool := sendTopicLists(clientConnection, followerConnection, false)
  731. if errorBool {
  732. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  733. if contBool {
  734. continue
  735. } else {
  736. return
  737. }
  738. }
  739. clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
  740. if errorBool {
  741. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  742. if contBool {
  743. continue
  744. } else {
  745. return
  746. }
  747. }
  748. } else if subPhase[0] == 1 {
  749. errorBool := sendTopicLists(clientConnection, followerConnection, false)
  750. if errorBool {
  751. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  752. if contBool {
  753. continue
  754. } else {
  755. return
  756. }
  757. }
  758. //updates sharedSecret
  759. sharedSecret = sha256.Sum256(sharedSecret[:])
  760. clientKeys.SharedSecret = sharedSecret
  761. clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
  762. if errorBool {
  763. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  764. if contBool {
  765. continue
  766. } else {
  767. return
  768. }
  769. }
  770. }
  771. errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection, m)
  772. if errorBool {
  773. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  774. if contBool {
  775. continue
  776. } else {
  777. return
  778. }
  779. }
  780. wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2)
  781. if errorBool {
  782. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  783. if contBool {
  784. continue
  785. } else {
  786. return
  787. }
  788. }
  789. writeToWError(followerConnection, wantsArchive, nil, 0)
  790. if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
  791. _, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
  792. if errorBool {
  793. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  794. if contBool {
  795. continue
  796. } else {
  797. return
  798. }
  799. }
  800. errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection, m)
  801. if errorBool {
  802. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  803. if contBool {
  804. continue
  805. } else {
  806. return
  807. }
  808. }
  809. }
  810. //saves all changes for client
  811. m.Lock()
  812. clientData[clientConnection.RemoteAddr()] = clientKeys
  813. m.Unlock()
  814. phase1Channel <- clientConnection
  815. for {
  816. if time.Since(startTimeRound) < 2*maxTimePerRound {
  817. //this worker handles the next client
  818. if len(phase3Channel) > 0 {
  819. break
  820. //this worker waits for next client
  821. } else {
  822. time.Sleep(1 * time.Second)
  823. }
  824. //times up
  825. } else {
  826. //tells follower that this worker is done
  827. gotClient[0] = 0
  828. writeToWError(followerConnection, gotClient, nil, 0)
  829. wg.Done()
  830. return
  831. }
  832. }
  833. }
  834. }
  835. //returns true if there is another client
  836. func handleClientDC(wg *sync.WaitGroup, followerConnection net.Conn, channel chan net.Conn) bool {
  837. //loop that waits for new client or leaves phase1 if time is up
  838. for {
  839. if time.Since(startTimeRound) < maxTimePerRound {
  840. //this worker handles the next client
  841. if len(channel) > 0 {
  842. return true
  843. //this worker waits for next client
  844. } else {
  845. time.Sleep(1 * time.Second)
  846. }
  847. //times up
  848. } else {
  849. //tells follower that this worker is done
  850. gotClient := make([]byte, 1)
  851. gotClient[0] = 0
  852. writeTo(followerConnection, gotClient)
  853. wg.Done()
  854. return false
  855. }
  856. }
  857. }
  858. func createVirtualAddresses() []int {
  859. //array will be filled with unique random ascending values
  860. //adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers
  861. //+1 to have a position to evaluate each received message
  862. arraySize := int(dbWriteSize) + 1
  863. var maxInt int = int(math.Pow(2, 31))
  864. virtualAddresses := make([]int, arraySize)
  865. for i := 0; i < arraySize; i++ {
  866. virtualAddresses[i] = mr.Intn(maxInt)
  867. for j := 0; j < i; j++ {
  868. if virtualAddresses[i] == virtualAddresses[j] {
  869. i--
  870. break
  871. }
  872. }
  873. }
  874. sort.Ints(virtualAddresses)
  875. return virtualAddresses
  876. }
  877. func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, clientConnection, followerConnection net.Conn) bool {
  878. //xores all requested addresses into virtuallAddress
  879. virtualAddress := make([]byte, 4)
  880. for index, num := range pirQuery {
  881. if num == 1 {
  882. currentAddress := intToByte(virtualAddresses[index])
  883. for i := 0; i < 4; i++ {
  884. virtualAddress[i] = virtualAddress[i] ^ currentAddress[i]
  885. }
  886. }
  887. }
  888. //xores the sharedSecret
  889. for i := 0; i < 4; i++ {
  890. virtualAddress[i] = virtualAddress[i] ^ sharedSecret[i]
  891. }
  892. virtualAddressFollower, _ := readFrom(followerConnection, 4, nil, 0)
  893. //xores the data from follower
  894. for i := 0; i < 4; i++ {
  895. virtualAddress[i] = virtualAddress[i] ^ virtualAddressFollower[i]
  896. }
  897. errorBool := writeToWError(clientConnection, virtualAddress, followerConnection, 5)
  898. return errorBool
  899. }
  900. func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn, m *sync.RWMutex) bool {
  901. //todo! repeat for archive
  902. tmpNeededSubscriptions := neededSubscriptions
  903. if tmpNeededSubscriptions > topicAmount {
  904. tmpNeededSubscriptions = topicAmount
  905. }
  906. if archiveQuerys != nil {
  907. tmpNeededSubscriptions = len(archiveQuerys)
  908. }
  909. //fmt.Println("tmpNeededSubscriptions", tmpNeededSubscriptions)
  910. tweets := make([][]byte, tmpNeededSubscriptions)
  911. for i := 0; i < tmpNeededSubscriptions; i++ {
  912. //gets all requested tweets
  913. if archiveQuerys == nil {
  914. tweets[i] = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0, *clientKeys.PublicKey)
  915. } else {
  916. tweets[i] = lib.GetTweets(archiveQuerys[i], dataLength, 1, *clientKeys.PublicKey)
  917. }
  918. //expand sharedSecret so it is of right length
  919. expandBy := len(tweets[i]) / 32
  920. var expandedSharedSecret []byte
  921. for i := 0; i < expandBy; i++ {
  922. expandedSharedSecret = append(expandedSharedSecret, clientKeys.SharedSecret[:]...)
  923. }
  924. //Xor's sharedSecret with all tweets
  925. lib.Xor(expandedSharedSecret[:], tweets[i])
  926. //fmt.Println(tweets[0])
  927. blockLength := len(tweets[i])
  928. receivedTweets, _ := readFrom(followerConnection, blockLength, nil, 0)
  929. //fmt.Println("receivedTweets", blockLength, len(receivedTweets))
  930. //fmt.Println("pubKey", *clientKeys.PublicKey, "Bytes", tweets, "follower", receivedTweets)
  931. lib.Xor(receivedTweets, tweets[i])
  932. }
  933. //sends tweets to client
  934. for i := 0; i < tmpNeededSubscriptions; i++ {
  935. tweetsLengthBytes := intToByte(len(tweets[i]))
  936. errorBool := writeToWError(clientConnection, tweetsLengthBytes, followerConnection, 2)
  937. if errorBool {
  938. return true
  939. }
  940. errorBool = writeToWError(clientConnection, tweets[i], followerConnection, 2)
  941. if errorBool {
  942. return true
  943. }
  944. }
  945. return false
  946. }
  947. func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int, doAuditing bool) (clientKeys, [][]byte, bool) {
  948. clientPublicKey := clientKeys.PublicKey
  949. //gets the msg length
  950. msgLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  951. if errorBool {
  952. return clientKeys, nil, true
  953. }
  954. msgLength := byteToInt(msgLengthBytes)
  955. //gets the leader box
  956. leaderBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
  957. if errorBool {
  958. return clientKeys, nil, true
  959. }
  960. //gets the follower box
  961. followerBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
  962. if errorBool {
  963. return clientKeys, nil, true
  964. }
  965. //todo! repeat for archive
  966. tmpNeededSubscriptions := neededSubscriptions
  967. if tmpNeededSubscriptions > topicAmount {
  968. tmpNeededSubscriptions = topicAmount
  969. }
  970. tmpTopicAmount := topicAmount
  971. if subPhase == -1 {
  972. archiveNeededSubscriptions, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  973. if errorBool {
  974. return clientKeys, nil, true
  975. }
  976. writeToWError(followerConnection, archiveNeededSubscriptions, nil, 0)
  977. tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions)
  978. tmpTopicAmount = archiveTopicAmount
  979. }
  980. if doAuditing {
  981. tmpNeededSubscriptions = 1
  982. tmpTopicAmount = int(dbWriteSize)
  983. }
  984. //send length to follower
  985. writeToWError(followerConnection, msgLengthBytes, nil, 0)
  986. //send box to follower
  987. writeToWError(followerConnection, followerBox, nil, 0)
  988. var decryptNonce [24]byte
  989. copy(decryptNonce[:], leaderBox[:24])
  990. decrypted, ok := box.Open(nil, leaderBox[24:], &decryptNonce, clientPublicKey, leaderPrivateKey)
  991. if !ok {
  992. fmt.Println("pirQuery decryption not ok")
  993. return clientKeys, nil, true
  994. }
  995. //if sharedSecret is send
  996. if subPhase == 0 {
  997. var tmpSharedSecret [32]byte
  998. for index := 0; index < 32; index++ {
  999. tmpSharedSecret[index] = decrypted[index]
  1000. }
  1001. clientKeys.SharedSecret = tmpSharedSecret
  1002. decrypted = decrypted[32:]
  1003. }
  1004. if doAuditing {
  1005. result := make([][]byte, 1)
  1006. result[0] = decrypted
  1007. return clientKeys, result, false
  1008. }
  1009. //transforms byteArray to ints of wanted topics
  1010. pirQueryFlattened := decrypted
  1011. pirQuerys := make([][]byte, tmpNeededSubscriptions)
  1012. for i := range pirQuerys {
  1013. pirQuerys[i] = make([]byte, tmpTopicAmount)
  1014. }
  1015. for i := 0; i < tmpNeededSubscriptions; i++ {
  1016. pirQuerys[i] = pirQueryFlattened[i*tmpTopicAmount : (i+1)*tmpTopicAmount]
  1017. }
  1018. //sets the pirQuery for the client in case whe are not archiving, and not Auditing
  1019. if subPhase != -1 {
  1020. clientKeys.PirQuery = pirQuerys
  1021. }
  1022. return clientKeys, pirQuerys, false
  1023. }
  // transformBytesToStringArray splits a comma-terminated list of topics into
  // its entries. Every comma ends one topic, so "a,b," yields ["a", "b"] and
  // two adjacent commas yield an empty topic. Bytes after the last comma are
  // not returned. The result is nil when the input contains no comma.
  //
  // Topics are cut directly out of the byte slice, so multi-byte UTF-8
  // sequences are preserved as-is.
  //
  // NOTE(review): a trailing topic without a closing comma is dropped -
  // confirm that all producers terminate the list with a comma.
  func transformBytesToStringArray(topicsAsBytes []byte) []string {
  	var topics []string
  	start := 0
  	for i, b := range topicsAsBytes {
  		if b != ',' {
  			continue
  		}
  		topics = append(topics, string(topicsAsBytes[start:i]))
  		start = i + 1
  	}
  	return topics
  }
  // byteToInt decodes the first four bytes of myBytes as a little-endian
  // integer (myBytes[0] is the least significant byte). Bytes beyond the
  // fourth are ignored; fewer than four bytes cause an index panic.
  func byteToInt(myBytes []byte) (x int) {
  	// Walk from the most significant byte down, shifting as we go.
  	for i := 3; i >= 0; i-- {
  		x = x<<8 | int(myBytes[i])
  	}
  	return x
  }
  1043. //returns true if error occured
  1044. func sendTopicLists(clientConnection, followerConnection net.Conn, setup bool) bool {
  1045. for i := 0; i < 2; i++ {
  1046. var topicList []byte
  1047. if i == 0 {
  1048. topicList, topicAmount = lib.GetTopicList(i)
  1049. } else {
  1050. topicList, archiveTopicAmount = lib.GetTopicList(i)
  1051. }
  1052. topicListLengthBytes := intToByte(len(topicList))
  1053. if !setup {
  1054. err := writeToWError(clientConnection, topicListLengthBytes, followerConnection, 5)
  1055. if err {
  1056. return true
  1057. }
  1058. err = writeToWError(clientConnection, topicList, followerConnection, 5)
  1059. if err {
  1060. return true
  1061. }
  1062. } else {
  1063. _, err := clientConnection.Write(topicListLengthBytes)
  1064. if err != nil {
  1065. return true
  1066. }
  1067. _, err = clientConnection.Write(topicList)
  1068. if err != nil {
  1069. return true
  1070. }
  1071. }
  1072. }
  1073. return false
  1074. }
  1075. //sends the array to the connection
  1076. func writeTo(connection net.Conn, array []byte) {
  1077. remainingLength := len(array)
  1078. for remainingLength > 0 {
  1079. if remainingLength >= mtu {
  1080. _, err := connection.Write(array[:mtu])
  1081. if err != nil {
  1082. panic(err)
  1083. }
  1084. array = array[mtu:]
  1085. remainingLength -= mtu
  1086. } else {
  1087. _, err := connection.Write(array)
  1088. if err != nil {
  1089. panic(err)
  1090. }
  1091. remainingLength = 0
  1092. }
  1093. }
  1094. }
  1095. func writeToWError(connection net.Conn, array []byte, followerConnection net.Conn, size int) bool {
  1096. if connection.RemoteAddr().String() == follower {
  1097. arrayWError := make([]byte, 1)
  1098. arrayWError = append(arrayWError, array[:]...)
  1099. remainingLength := len(arrayWError)
  1100. for remainingLength > 0 {
  1101. if remainingLength >= mtu {
  1102. _, err := connection.Write(arrayWError[:mtu])
  1103. if err != nil {
  1104. return handleError(connection, followerConnection, size, err)
  1105. }
  1106. arrayWError = arrayWError[mtu:]
  1107. remainingLength -= mtu
  1108. } else {
  1109. _, err := connection.Write(arrayWError)
  1110. if err != nil {
  1111. return handleError(connection, followerConnection, size, err)
  1112. }
  1113. remainingLength = 0
  1114. }
  1115. }
  1116. } else {
  1117. remainingLength := len(array)
  1118. for remainingLength > 0 {
  1119. if remainingLength >= mtu {
  1120. _, err := connection.Write(array[:mtu])
  1121. if err != nil {
  1122. return handleError(connection, followerConnection, size, err)
  1123. }
  1124. array = array[mtu:]
  1125. remainingLength -= mtu
  1126. } else {
  1127. _, err := connection.Write(array)
  1128. if err != nil {
  1129. return handleError(connection, followerConnection, size, err)
  1130. }
  1131. remainingLength = 0
  1132. }
  1133. }
  1134. }
  1135. return false
  1136. }
  1137. func handleError(connection, followerConnection net.Conn, size int, err error) bool {
  1138. if err != nil {
  1139. //lets follower know that client has disconnected unexpectedly
  1140. if connection.RemoteAddr().String() != follower {
  1141. if size > mtu {
  1142. fmt.Println("have a look here")
  1143. }
  1144. fmt.Println("handleError", err)
  1145. array := make([]byte, size)
  1146. array[0] = 1
  1147. _, err = followerConnection.Write(array)
  1148. if err != nil {
  1149. panic(err)
  1150. }
  1151. return true
  1152. } else {
  1153. panic(err)
  1154. }
  1155. }
  1156. return false
  1157. }
  1158. //reads an array which is returned and of size "size" from the connection
  1159. //returns true if error occured
  1160. func readFrom(connection net.Conn, size int, followerConnection net.Conn, sizeError int) ([]byte, bool) {
  1161. var array []byte
  1162. remainingSize := size
  1163. for remainingSize > 0 {
  1164. var err error
  1165. toAppend := make([]byte, mtu)
  1166. if remainingSize > mtu {
  1167. _, err = connection.Read(toAppend)
  1168. array = append(array, toAppend...)
  1169. remainingSize -= mtu
  1170. } else {
  1171. _, err = connection.Read(toAppend[:remainingSize])
  1172. array = append(array, toAppend[:remainingSize]...)
  1173. remainingSize = 0
  1174. }
  1175. if err != nil {
  1176. //lets follower know that client has disconnected unexpectedly
  1177. if connection.RemoteAddr().String() != follower {
  1178. fmt.Println(err)
  1179. array := make([]byte, sizeError)
  1180. array[0] = 1
  1181. _, err = followerConnection.Write(array)
  1182. if err != nil {
  1183. panic(err)
  1184. }
  1185. return nil, true
  1186. } else {
  1187. panic(err)
  1188. }
  1189. }
  1190. }
  1191. return array, false
  1192. }
  // intToByte encodes the low 32 bits of myInt as four bytes in little-endian
  // order (retBytes[0] is the least significant byte).
  func intToByte(myInt int) (retBytes []byte) {
  	retBytes = make([]byte, 4)
  	for i := range retBytes {
  		// Converting to byte keeps only the lowest eight bits.
  		retBytes[i] = byte(myInt >> (8 * uint(i)))
  	}
  	return retBytes
  }