leader.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433
  1. package main
  2. //#cgo CFLAGS: -fopenmp -O2
  3. //#cgo LDFLAGS: -lcrypto -lm -fopenmp
  4. //#include "../c/dpf.h"
  5. //#include "../c/okv.h"
  6. //#include "../c/dpf.c"
  7. //#include "../c/okv.c"
  8. import "C"
  9. import (
  10. "crypto/rand"
  11. "crypto/rsa"
  12. "crypto/sha256"
  13. "crypto/tls"
  14. "crypto/x509"
  15. "crypto/x509/pkix"
  16. "encoding/pem"
  17. "fmt"
  18. "log"
  19. "math"
  20. "math/big"
  21. mr "math/rand"
  22. "net"
  23. "os"
  24. "sort"
  25. "strings"
  26. "sync"
  27. "time"
  28. lib "2PPS/lib"
  29. "unsafe"
  30. "golang.org/x/crypto/nacl/box"
  31. )
  32. //this stores all neccessary information for each client
  33. type clientKeys struct {
  34. roundsParticipating int
  35. PublicKey *[32]byte
  36. SharedSecret [32]byte
  37. PirQuery [][]byte
  38. }
  39. var clientData = make(map[net.Addr]clientKeys)
  40. const follower string = "127.0.0.1:4442"
  41. //Maximum Transport Unit
  42. const mtu int = 1100
  43. var leaderPrivateKey *[32]byte
  44. var leaderPublicKey *[32]byte
  45. var followerPublicKey *[32]byte
  46. const maxNumberOfClients = 1000000
  47. var topicList []byte
  48. var topicAmount int
  49. var archiveTopicAmount int
  50. // every roundsBeforeUpdate the client updates his pirQuery
  51. const roundsBeforeUpdate = 2
  52. const neededSubscriptions = 1
  53. const numThreads = 12
  54. const dataLength = 256
  55. const numClients = 1000
  56. const minDBWriteSize = numClients * 0.5
  57. //riposte says 19.5
  58. const dbSizeFactor = 5
  59. const publisherGuess = 0.2
  60. var dbWriteSize float64 = numClients * dbSizeFactor * publisherGuess
  61. //this is the number of positions for auditing
  62. var extraPositions int = 10
  63. var collisionCounter []float64
  64. var clientsConnected float64
  65. var maxTimePerRound time.Duration = 1000 * time.Second
  66. //counts the number of rounds
  67. var round int
  68. var clientsServedPhase1 []int
  69. var clientsServedPhase3 []int
  70. var startPhase1 time.Time
  71. var startPhase2 time.Time
  72. var startPhase3 time.Time
  73. //only prints auditing time for one client per round
  74. var firstAuditPrint bool
  75. var auditingStart time.Time
  76. var auditingEnd time.Time
  77. var startTime time.Time
  78. var startTimeRound time.Time
  79. //channel for goroutine communication with clients
  80. var phase1Channel = make(chan net.Conn, maxNumberOfClients)
  81. var phase3Channel = make(chan net.Conn, maxNumberOfClients)
  82. //variables for calculating the dbWrite size
  83. const publisherRounds int = 10
  84. var publisherAmount float64
  85. var publisherHistory [publisherRounds]int
  86. var firstTweetSend bool
  87. func main() {
  88. f, err := os.OpenFile("evalData", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  89. if err != nil {
  90. log.Fatalf("error opening file: %v", err)
  91. }
  92. defer f.Close()
  93. log.SetOutput(f)
  94. log.Println("roundsBeforeUpdate", roundsBeforeUpdate)
  95. log.Println("neededSubscriptions", neededSubscriptions)
  96. log.Println("dataLength", dataLength)
  97. log.Println("maxTimePerRound~", maxTimePerRound*3)
  98. log.Println("extraPositions", extraPositions)
  99. log.Println("numClients", numClients)
  100. log.Println("Archiving is done every 24h, no Clients wants Archive")
  101. //prevents race conditions for wrtiting
  102. m := &sync.RWMutex{}
  103. startTime = time.Now()
  104. clientsServedPhase1 = make([]int, 1000)
  105. clientsServedPhase3 = make([]int, 1000)
  106. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  107. if err != nil {
  108. panic(err)
  109. }
  110. //why is this neccessary?
  111. leaderPrivateKey = generatedPrivateKey
  112. leaderPublicKey = generatedPublicKey
  113. C.initializeServer(C.int(numThreads))
  114. //calls follower for setup
  115. conf := &tls.Config{
  116. InsecureSkipVerify: true,
  117. }
  118. followerConnection, err := tls.Dial("tcp", follower, conf)
  119. if err != nil {
  120. panic(err)
  121. }
  122. followerConnection.SetDeadline(time.Time{})
  123. //receives follower publicKey
  124. var tmpFollowerPubKey [32]byte
  125. _, err = followerConnection.Read(tmpFollowerPubKey[:])
  126. if err != nil {
  127. panic(err)
  128. }
  129. followerPublicKey = &tmpFollowerPubKey
  130. //send publicKey to follower
  131. writeTo(followerConnection, leaderPublicKey[:])
  132. writeTo(followerConnection, intToByte(neededSubscriptions))
  133. writeTo(followerConnection, intToByte(int(dbWriteSize)))
  134. //goroutine for accepting new clients
  135. go func() {
  136. leaderConnectionPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048)
  137. if err != nil {
  138. panic(err)
  139. }
  140. // Generate a pem block with the private key
  141. keyPem := pem.EncodeToMemory(&pem.Block{
  142. Type: "RSA PRIVATE KEY",
  143. Bytes: x509.MarshalPKCS1PrivateKey(leaderConnectionPrivateKey),
  144. })
  145. tml := x509.Certificate{
  146. // you can add any attr that you need
  147. NotBefore: time.Now(),
  148. NotAfter: time.Now().AddDate(5, 0, 0),
  149. // you have to generate a different serial number each execution
  150. SerialNumber: big.NewInt(123123),
  151. Subject: pkix.Name{
  152. CommonName: "New Name",
  153. Organization: []string{"New Org."},
  154. },
  155. BasicConstraintsValid: true,
  156. }
  157. cert, err := x509.CreateCertificate(rand.Reader, &tml, &tml, &leaderConnectionPrivateKey.PublicKey, leaderConnectionPrivateKey)
  158. if err != nil {
  159. panic(err)
  160. }
  161. // Generate a pem block with the certificate
  162. certPem := pem.EncodeToMemory(&pem.Block{
  163. Type: "CERTIFICATE",
  164. Bytes: cert,
  165. })
  166. tlsCert, err := tls.X509KeyPair(certPem, keyPem)
  167. if err != nil {
  168. panic(err)
  169. }
  170. config := &tls.Config{Certificates: []tls.Certificate{tlsCert}}
  171. //listens for clients
  172. lnClients, err := tls.Listen("tcp", ":4441", config)
  173. if err != nil {
  174. panic(err)
  175. }
  176. defer lnClients.Close()
  177. for {
  178. clientConnection, err := lnClients.Accept()
  179. if err != nil {
  180. fmt.Println("Client connection error 1", err)
  181. clientConnection.Close()
  182. break
  183. }
  184. clientConnection.SetDeadline(time.Time{})
  185. //sends topicList so client can participate in phase 3 asap
  186. errorBool := sendTopicLists(clientConnection, followerConnection, true)
  187. if errorBool {
  188. break
  189. }
  190. //send leader publicKey
  191. _, err = clientConnection.Write(leaderPublicKey[:])
  192. if err != nil {
  193. fmt.Println("Client connection error 2", err)
  194. clientConnection.Close()
  195. break
  196. }
  197. //send follower publicKey
  198. _, err = clientConnection.Write(followerPublicKey[:])
  199. if err != nil {
  200. fmt.Println("Client connection error 3", err)
  201. clientConnection.Close()
  202. break
  203. }
  204. var clientPublicKey *[32]byte
  205. var tmpClientPublicKey [32]byte
  206. //gets publicKey from client
  207. _, err = clientConnection.Read(tmpClientPublicKey[:])
  208. if err != nil {
  209. fmt.Println("Client connection error 4", err)
  210. clientConnection.Close()
  211. break
  212. }
  213. _, err = clientConnection.Write(intToByte(neededSubscriptions))
  214. if err != nil {
  215. fmt.Println("Client connection error 5", err)
  216. clientConnection.Close()
  217. break
  218. }
  219. _, err = clientConnection.Write(intToByte(int(startTime.Unix())))
  220. if err != nil {
  221. fmt.Println("Client connection error 6", err)
  222. clientConnection.Close()
  223. break
  224. }
  225. clientPublicKey = &tmpClientPublicKey
  226. //this is the key for map(client data)
  227. remoteAddress := clientConnection.RemoteAddr()
  228. //pirQuery will be added in phase 3
  229. //bs! only want to set roundsParticipating and answerAmount to 0, mb there is a better way
  230. //will work for now
  231. var emptyArray [32]byte
  232. var emptyByteArray [][]byte
  233. keys := clientKeys{0, clientPublicKey, emptyArray, emptyByteArray}
  234. m.Lock()
  235. clientData[remoteAddress] = keys
  236. m.Unlock()
  237. phase1Channel <- clientConnection
  238. clientsConnected++
  239. }
  240. }()
  241. wg := &sync.WaitGroup{}
  242. //the current phase
  243. phase := make([]byte, 1)
  244. for {
  245. startPhase1 = time.Now()
  246. startTimeRound = time.Now()
  247. firstAuditPrint = true
  248. phase[0] = 1
  249. round++
  250. fmt.Println("Round", round)
  251. fmt.Println("clientsServedPhase1", clientsServedPhase1[round-1])
  252. fmt.Println("clientsServedPhase3", clientsServedPhase3[round-1])
  253. log.Println()
  254. log.Println("Phase 1 Round", round)
  255. log.Println()
  256. bytesSaved := lib.GetBytesSaved()
  257. log.Println("bytesSaved Percentage", bytesSaved)
  258. //creates a new write Db for this round
  259. for i := 0; i < int(dbWriteSize); i++ {
  260. C.createDb(C.int(1), C.int(dataLength))
  261. }
  262. //creates a new db containing virtual addresses for auditing
  263. virtualAddresses := createVirtualAddresses()
  264. //send all virtualAddresses to follower
  265. for i := 0; i <= int(dbWriteSize); i++ {
  266. writeTo(followerConnection, intToByte(virtualAddresses[i]))
  267. }
  268. //moves all clients to phase1
  269. if len(phase3Channel) > 0 {
  270. for client := range phase3Channel {
  271. phase1Channel <- client
  272. if len(phase3Channel) == 0 {
  273. break
  274. }
  275. }
  276. }
  277. for id := 0; id < numThreads; id++ {
  278. wg.Add(1)
  279. followerConnection, err := tls.Dial("tcp", follower, conf)
  280. if err != nil {
  281. panic(err)
  282. }
  283. followerConnection.SetDeadline(time.Time{})
  284. go phase1(id, phase, followerConnection, wg, m, virtualAddresses)
  285. }
  286. wg.Wait()
  287. log.Println("fullDurationPhase1", time.Since(startPhase1).Seconds())
  288. log.Println("fullAuditingDuration~", auditingEnd.Sub(auditingStart).Seconds()*clientsConnected)
  289. log.Println("auditingPercentage", (auditingEnd.Sub(auditingStart).Seconds()*clientsConnected)/(time.Since(startPhase1).Seconds()))
  290. //Phase 2
  291. startPhase2 = time.Now()
  292. followerConnection, err := tls.Dial("tcp", follower, conf)
  293. if err != nil {
  294. panic(err)
  295. }
  296. followerConnection.SetDeadline(time.Time{})
  297. phase2(followerConnection)
  298. log.Println("fullDurationPhase2", time.Since(startPhase2).Seconds())
  299. //Phase 3
  300. //moves all clients to phase3
  301. if len(phase1Channel) > 0 {
  302. for client := range phase1Channel {
  303. phase3Channel <- client
  304. if len(phase1Channel) == 0 {
  305. break
  306. }
  307. }
  308. }
  309. startPhase3 = time.Now()
  310. //no tweets -> continue to phase 1 and mb get tweets
  311. topicList, topicAmount = lib.GetTopicList(0)
  312. if len(topicList) == 0 {
  313. continue
  314. }
  315. firstTweetSend = true
  316. phase[0] = 3
  317. startTimeRound = time.Now()
  318. for id := 0; id < numThreads; id++ {
  319. wg.Add(1)
  320. followerConnection, err := tls.Dial("tcp", follower, conf)
  321. if err != nil {
  322. panic(err)
  323. }
  324. followerConnection.SetDeadline(time.Time{})
  325. go phase3(id, phase, followerConnection, wg, m)
  326. }
  327. wg.Wait()
  328. log.Println("fullDurationPhase3", time.Since(startPhase3).Seconds())
  329. lib.CleanUpdbR(round)
  330. }
  331. }
  332. func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex, virtualAddresses []int) {
  333. roundAsBytes := intToByte(round)
  334. gotClient := make([]byte, 1)
  335. gotClient[0] = 0
  336. //wait until time is up
  337. for len(phase1Channel) == 0 {
  338. if time.Since(startTimeRound) > maxTimePerRound {
  339. //tells follower that this worker is done
  340. writeTo(followerConnection, gotClient)
  341. wg.Done()
  342. return
  343. }
  344. time.Sleep(1 * time.Second)
  345. }
  346. for clientConnection := range phase1Channel {
  347. clientsServedPhase1[round] = clientsServedPhase1[round] + 1
  348. gotClient[0] = 1
  349. //tells follower that this worker got a clientConnection
  350. writeTo(followerConnection, gotClient)
  351. //sends clients publicKey to follower
  352. m.RLock()
  353. clientPublicKey := clientData[clientConnection.RemoteAddr()].PublicKey
  354. m.RUnlock()
  355. writeTo(followerConnection, clientPublicKey[:])
  356. //setup the worker-specific db
  357. dbSize := int(C.dbSize)
  358. db := make([][]byte, dbSize)
  359. for i := 0; i < dbSize; i++ {
  360. db[i] = make([]byte, int(C.db[i].dataSize))
  361. }
  362. //tells client that phase 1 has begun
  363. errorBool := writeToWError(clientConnection, phase, followerConnection, 5)
  364. if errorBool {
  365. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  366. if contBool {
  367. continue
  368. } else {
  369. return
  370. }
  371. }
  372. //tells client current dbWriteSize
  373. errorBool = writeToWError(clientConnection, intToByte(int(dbWriteSize)), followerConnection, 5)
  374. if errorBool {
  375. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  376. if contBool {
  377. continue
  378. } else {
  379. return
  380. }
  381. }
  382. //tells client current round
  383. errorBool = writeToWError(clientConnection, roundAsBytes, followerConnection, 5)
  384. if errorBool {
  385. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  386. if contBool {
  387. continue
  388. } else {
  389. return
  390. }
  391. }
  392. //begin auditing
  393. if id == 0 && firstAuditPrint {
  394. auditingStart = time.Now()
  395. }
  396. m.RLock()
  397. var clientKeys = clientData[clientConnection.RemoteAddr()]
  398. m.RUnlock()
  399. clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
  400. if errorBool {
  401. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  402. if contBool {
  403. continue
  404. } else {
  405. return
  406. }
  407. }
  408. errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
  409. if errorBool {
  410. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  411. if contBool {
  412. continue
  413. } else {
  414. return
  415. }
  416. }
  417. if id == 0 && firstAuditPrint {
  418. firstAuditPrint = false
  419. auditingEnd = time.Now()
  420. log.Println("Auditing duration", time.Since(auditingStart).Seconds(), "numVirtualAddresses", len(virtualAddresses))
  421. }
  422. m.Lock()
  423. clientData[clientConnection.RemoteAddr()] = clientKeys
  424. m.Unlock()
  425. //accept dpfQuery from client
  426. dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  427. if errorBool {
  428. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  429. if contBool {
  430. continue
  431. } else {
  432. return
  433. }
  434. }
  435. dpfLength := byteToInt(dpfLengthBytes)
  436. dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
  437. if errorBool {
  438. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  439. if contBool {
  440. continue
  441. } else {
  442. return
  443. }
  444. }
  445. dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
  446. if errorBool {
  447. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  448. if contBool {
  449. continue
  450. } else {
  451. return
  452. }
  453. }
  454. writeToWError(followerConnection, dpfLengthBytes, nil, 0)
  455. writeToWError(followerConnection, dpfQueryBEncrypted, nil, 0)
  456. //decrypt dpfQueryA for sorting into db
  457. var decryptNonce [24]byte
  458. copy(decryptNonce[:], dpfQueryAEncrypted[:24])
  459. dpfQueryA, ok := box.Open(nil, dpfQueryAEncrypted[24:], &decryptNonce, clientPublicKey, leaderPrivateKey)
  460. if !ok {
  461. panic("dpfQueryA decryption not ok")
  462. }
  463. ds := int(C.db[0].dataSize)
  464. dataShareLeader := make([]byte, ds)
  465. pos := C.getUint128_t(C.int(virtualAddresses[int(dbWriteSize)]))
  466. C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShareLeader[0]))
  467. dataShareFollower, _ := readFrom(followerConnection, ds, nil, 0)
  468. writeToWError(followerConnection, dataShareLeader, nil, 0)
  469. auditXOR := make([]byte, ds)
  470. passedAudit := true
  471. for i := 0; i < ds; i++ {
  472. auditXOR[i] = dataShareLeader[i] ^ dataShareFollower[i]
  473. //client tried to write to a position that is not a virtuallAddress
  474. if auditXOR[i] != 0 {
  475. clientConnection.Close()
  476. passedAudit = false
  477. }
  478. }
  479. if passedAudit {
  480. //run dpf, xor into local db
  481. for i := 0; i < dbSize; i++ {
  482. ds := int(C.db[i].dataSize)
  483. dataShare := make([]byte, ds)
  484. pos := C.getUint128_t(C.int(virtualAddresses[i]))
  485. C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
  486. for j := 0; j < ds; j++ {
  487. db[i][j] = db[i][j] ^ dataShare[j]
  488. }
  489. }
  490. //xor the worker's DB into the main DB
  491. for i := 0; i < dbSize; i++ {
  492. m.Lock()
  493. C.xorIn(C.int(i), (*C.uchar)(&db[i][0]))
  494. m.Unlock()
  495. }
  496. phase3Channel <- clientConnection
  497. }
  498. //loop that waits for new client or leaves phase1 if time is up
  499. //todo! remove && len(phase1Channel) > 0
  500. for {
  501. if time.Since(startTimeRound) < maxTimePerRound && len(phase1Channel) > 0 {
  502. //this worker handles the next client
  503. if len(phase1Channel) > 0 {
  504. break
  505. //this worker waits for next client
  506. } else {
  507. time.Sleep(1 * time.Second)
  508. }
  509. //times up
  510. } else {
  511. //tells follower that this worker is done
  512. gotClient[0] = 0
  513. writeTo(followerConnection, gotClient)
  514. wg.Done()
  515. return
  516. }
  517. }
  518. }
  519. }
  520. func phase2(followerConnection net.Conn) {
  521. //gets current seed
  522. seedLeader := make([]byte, 16)
  523. C.readSeed((*C.uchar)(&seedLeader[0]))
  524. //get data
  525. dbSize := int(C.dbSize)
  526. tmpdbLeader := make([][]byte, dbSize)
  527. for i := range tmpdbLeader {
  528. tmpdbLeader[i] = make([]byte, dataLength)
  529. }
  530. for i := 0; i < dbSize; i++ {
  531. C.readData(C.int(i), (*C.uchar)(&tmpdbLeader[i][0]))
  532. }
  533. //writes seed to follower
  534. writeTo(followerConnection, seedLeader)
  535. //write data to follower
  536. //this is surely inefficent
  537. for i := 0; i < dbSize; i++ {
  538. writeTo(followerConnection, tmpdbLeader[i])
  539. }
  540. //receive seed from follower
  541. seedFollower, _ := readFrom(followerConnection, 16, nil, 0)
  542. //receive data from follower
  543. tmpdbFollower := make([][]byte, dbSize)
  544. for i := range tmpdbFollower {
  545. tmpdbFollower[i] = make([]byte, dataLength)
  546. }
  547. for i := 0; i < dbSize; i++ {
  548. tmpdbFollower[i], _ = readFrom(followerConnection, dataLength, nil, 0)
  549. }
  550. //put together the db
  551. tmpdb := make([][]byte, dbSize)
  552. for i := range tmpdb {
  553. tmpdb[i] = make([]byte, dataLength)
  554. }
  555. //get own Ciphers
  556. ciphersLeader := make([]*C.uchar, dbSize)
  557. for i := 0; i < dbSize; i++ {
  558. ciphersLeader[i] = (*C.uchar)(C.malloc(16))
  559. }
  560. for i := 0; i < dbSize; i++ {
  561. C.getCipher(1, C.int(i), ciphersLeader[i])
  562. }
  563. //send own Ciphers to follower
  564. for i := 0; i < dbSize; i++ {
  565. writeTo(followerConnection, C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16))
  566. }
  567. //receive ciphers from follower
  568. ciphersFollower := make([]byte, dbSize*16)
  569. for i := 0; i < dbSize; i++ {
  570. _, err := followerConnection.Read(ciphersFollower[i*16:])
  571. if err != nil {
  572. panic(err)
  573. }
  574. }
  575. //put in ciphers from follower
  576. for i := 0; i < dbSize; i++ {
  577. C.putCipher(1, C.int(i), (*C.uchar)(&ciphersFollower[i*16]))
  578. }
  579. //decrypt each row
  580. for i := 0; i < dbSize; i++ {
  581. C.decryptRow(C.int(i), (*C.uchar)(&tmpdb[i][0]), (*C.uchar)(&tmpdbLeader[i][0]), (*C.uchar)(&tmpdbFollower[i][0]), (*C.uchar)(&seedLeader[0]), (*C.uchar)(&seedFollower[0]))
  582. }
  583. var tweets []lib.Tweet
  584. var currentPublisherAmount float64
  585. var collisions float64
  586. for i := 0; i < dbSize; i++ {
  587. //discard cover message
  588. if tmpdb[i][1] == 0 {
  589. continue
  590. //collision
  591. } else if -1 == strings.Index(string(tmpdb[i]), ";;") {
  592. currentPublisherAmount++
  593. currentPublisherAmount++
  594. collisions++
  595. continue
  596. } else {
  597. currentPublisherAmount++
  598. //reconstruct tweet
  599. var position int = 0
  600. var topics []string
  601. var topic string
  602. var text string
  603. for _, letter := range tmpdb[i] {
  604. if string(letter) == ";" {
  605. if topic != "" {
  606. topics = append(topics, topic)
  607. topic = ""
  608. }
  609. position++
  610. } else {
  611. if position == 0 {
  612. if string(letter) == "," {
  613. topics = append(topics, topic)
  614. topic = ""
  615. } else {
  616. //if topics are
  617. //int
  618. //topic = topic + fmt.Sprint(int(letter))
  619. //string
  620. topic = topic + string(letter)
  621. }
  622. } else if position == 1 {
  623. text = text + string(letter)
  624. }
  625. }
  626. }
  627. tweet := lib.Tweet{"", -1, topics, text, round}
  628. if text != "" {
  629. tweets = append(tweets, tweet)
  630. } else {
  631. //this is a odd(number) way collisions
  632. collisions++
  633. }
  634. }
  635. }
  636. collisionCounter = append(collisionCounter, collisions)
  637. log.Println("Collision percentage this round", collisions/dbWriteSize, "dbWriteSize", dbWriteSize)
  638. lib.NewEntries(tweets, 0)
  639. C.resetDb()
  640. //calculates the publisherAverage over the last publisherRounds rounds
  641. index := round % publisherRounds
  642. publisherHistory[index] = int(currentPublisherAmount)
  643. log.Println("currentPublisherAmount", currentPublisherAmount, "publisher percentage", currentPublisherAmount/clientsConnected)
  644. var publisherAmount int
  645. for _, num := range publisherHistory {
  646. publisherAmount += num
  647. }
  648. publisherAverage := 0
  649. if round < publisherRounds {
  650. publisherAverage = publisherAmount / round
  651. } else {
  652. publisherAverage = publisherAmount / publisherRounds
  653. }
  654. //calculates the dbWriteSize for this round
  655. dbWriteSize = math.Ceil(dbSizeFactor * float64(publisherAverage))
  656. if dbWriteSize < minDBWriteSize {
  657. dbWriteSize = minDBWriteSize
  658. }
  659. //writes dbWriteSize of current round to follower
  660. writeTo(followerConnection, intToByte(int(dbWriteSize)))
  661. lib.CleanUpdbR(round)
  662. }
  663. //opti! mb it is quicker to send updated topicLists to clients first so pirQuerys are ready
  664. func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex) {
  665. gotClient := make([]byte, 1)
  666. gotClient[0] = 0
  667. //wait until time is up
  668. for len(phase3Channel) == 0 {
  669. if time.Since(startTimeRound) > maxTimePerRound*2 {
  670. //tells follower that this worker is done
  671. writeToWError(followerConnection, gotClient, nil, 0)
  672. wg.Done()
  673. return
  674. }
  675. time.Sleep(1 * time.Second)
  676. }
  677. for clientConnection := range phase3Channel {
  678. clientsServedPhase3[round] = clientsServedPhase3[round] + 1
  679. gotClient[0] = 1
  680. //tells follower that this worker got a clientConnection
  681. writeToWError(followerConnection, gotClient, nil, 0)
  682. //tells client current phase
  683. errorBool := writeToWError(clientConnection, phase, followerConnection, 2)
  684. if errorBool {
  685. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  686. if contBool {
  687. continue
  688. } else {
  689. return
  690. }
  691. }
  692. /*
  693. possible Values
  694. 0 : new client
  695. leader expects sharedSecrets, expects pirQuery
  696. 1 : update needed
  697. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  698. 2 : no update needed
  699. nothing
  700. */
  701. subPhase := make([]byte, 1)
  702. //gets the data for the current client
  703. m.RLock()
  704. var clientKeys = clientData[clientConnection.RemoteAddr()]
  705. m.RUnlock()
  706. var roundsParticipating = clientKeys.roundsParticipating
  707. //client participates for the first time
  708. if roundsParticipating == 0 {
  709. subPhase[0] = 0
  710. } else if roundsParticipating%roundsBeforeUpdate == 0 {
  711. subPhase[0] = 1
  712. } else {
  713. subPhase[0] = 2
  714. }
  715. //tells client what leader expects
  716. errorBool = writeToWError(clientConnection, subPhase, followerConnection, 2)
  717. if errorBool {
  718. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  719. if contBool {
  720. continue
  721. } else {
  722. return
  723. }
  724. }
  725. //tells follower what will happen
  726. writeToWError(followerConnection, subPhase, nil, 0)
  727. //sends clients publicKey so follower knows which client is being served
  728. writeTo(followerConnection, clientKeys.PublicKey[:])
  729. //increases rounds participating for client
  730. clientKeys.roundsParticipating = roundsParticipating + 1
  731. //declaring variables here to prevent dupclicates later
  732. m.RLock()
  733. var sharedSecret [32]byte = clientData[clientConnection.RemoteAddr()].SharedSecret
  734. m.RUnlock()
  735. if subPhase[0] == 0 {
  736. errorBool := sendTopicLists(clientConnection, followerConnection, false)
  737. if errorBool {
  738. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  739. if contBool {
  740. continue
  741. } else {
  742. return
  743. }
  744. }
  745. clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
  746. if errorBool {
  747. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  748. if contBool {
  749. continue
  750. } else {
  751. return
  752. }
  753. }
  754. } else if subPhase[0] == 1 {
  755. errorBool := sendTopicLists(clientConnection, followerConnection, false)
  756. if errorBool {
  757. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  758. if contBool {
  759. continue
  760. } else {
  761. return
  762. }
  763. }
  764. //updates sharedSecret
  765. sharedSecret = sha256.Sum256(sharedSecret[:])
  766. clientKeys.SharedSecret = sharedSecret
  767. clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
  768. if errorBool {
  769. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  770. if contBool {
  771. continue
  772. } else {
  773. return
  774. }
  775. }
  776. }
  777. errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection)
  778. if errorBool {
  779. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  780. if contBool {
  781. continue
  782. } else {
  783. return
  784. }
  785. }
  786. wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2)
  787. if errorBool {
  788. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  789. if contBool {
  790. continue
  791. } else {
  792. return
  793. }
  794. }
  795. writeToWError(followerConnection, wantsArchive, nil, 0)
  796. if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
  797. _, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
  798. if errorBool {
  799. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  800. if contBool {
  801. continue
  802. } else {
  803. return
  804. }
  805. }
  806. errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection)
  807. if errorBool {
  808. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  809. if contBool {
  810. continue
  811. } else {
  812. return
  813. }
  814. }
  815. }
  816. //saves all changes for client
  817. m.Lock()
  818. clientData[clientConnection.RemoteAddr()] = clientKeys
  819. m.Unlock()
  820. phase1Channel <- clientConnection
  821. //todo! remove && len(phase3Channel) > 0
  822. for {
  823. if time.Since(startTimeRound) < 2*maxTimePerRound && len(phase3Channel) > 0 {
  824. //this worker handles the next client
  825. if len(phase3Channel) > 0 {
  826. break
  827. //this worker waits for next client
  828. } else {
  829. fmt.Print("sleepi")
  830. time.Sleep(1 * time.Second)
  831. }
  832. //times up
  833. } else {
  834. //tells follower that this worker is done
  835. gotClient[0] = 0
  836. writeToWError(followerConnection, gotClient, nil, 0)
  837. wg.Done()
  838. return
  839. }
  840. }
  841. }
  842. }
  843. //returns true if there is another client
  844. func handleClientDC(wg *sync.WaitGroup, followerConnection net.Conn, channel chan net.Conn) bool {
  845. //loop that waits for new client or leaves phase1 if time is up
  846. for {
  847. if time.Since(startTimeRound) < maxTimePerRound {
  848. //this worker handles the next client
  849. if len(channel) > 0 {
  850. return true
  851. //this worker waits for next client
  852. } else {
  853. fmt.Print("sleepydc")
  854. time.Sleep(1 * time.Second)
  855. }
  856. //times up
  857. } else {
  858. //tells follower that this worker is done
  859. gotClient := make([]byte, 1)
  860. gotClient[0] = 0
  861. writeTo(followerConnection, gotClient)
  862. wg.Done()
  863. return false
  864. }
  865. }
  866. }
  867. func createVirtualAddresses() []int {
  868. //array will be filled with unique random ascending values
  869. //adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers
  870. //+extraPositions to have a position to evaluate each received message
  871. arraySize := int(dbWriteSize) + extraPositions
  872. var maxInt int = int(math.Pow(2, 31))
  873. virtualAddresses := make([]int, arraySize)
  874. for i := 0; i < arraySize; i++ {
  875. virtualAddresses[i] = mr.Intn(maxInt)
  876. for j := 0; j < i; j++ {
  877. if virtualAddresses[i] == virtualAddresses[j] {
  878. i--
  879. break
  880. }
  881. }
  882. }
  883. sort.Ints(virtualAddresses)
  884. return virtualAddresses
  885. }
  886. func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, clientConnection, followerConnection net.Conn) bool {
  887. //xores all requested addresses into virtuallAddress
  888. virtualAddress := make([]byte, 4)
  889. for index, num := range pirQuery {
  890. if num == 1 {
  891. currentAddress := intToByte(virtualAddresses[index])
  892. for i := 0; i < 4; i++ {
  893. virtualAddress[i] = virtualAddress[i] ^ currentAddress[i]
  894. }
  895. }
  896. }
  897. //xores the sharedSecret
  898. for i := 0; i < 4; i++ {
  899. virtualAddress[i] = virtualAddress[i] ^ sharedSecret[i]
  900. }
  901. virtualAddressFollower, _ := readFrom(followerConnection, 4, nil, 0)
  902. //xores the data from follower
  903. for i := 0; i < 4; i++ {
  904. virtualAddress[i] = virtualAddress[i] ^ virtualAddressFollower[i]
  905. }
  906. errorBool := writeToWError(clientConnection, virtualAddress, followerConnection, 5)
  907. return errorBool
  908. }
  909. func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) bool {
  910. tmpNeededSubscriptions := neededSubscriptions
  911. if tmpNeededSubscriptions > topicAmount {
  912. tmpNeededSubscriptions = topicAmount
  913. }
  914. if archiveQuerys != nil {
  915. tmpNeededSubscriptions = len(archiveQuerys)
  916. if tmpNeededSubscriptions > archiveTopicAmount {
  917. tmpNeededSubscriptions = archiveTopicAmount
  918. }
  919. }
  920. tweets := make([][]byte, tmpNeededSubscriptions)
  921. for i := 0; i < tmpNeededSubscriptions; i++ {
  922. //gets all requested tweets
  923. if archiveQuerys == nil {
  924. tweets[i] = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0, *clientKeys.PublicKey)
  925. } else {
  926. tweets[i] = lib.GetTweets(archiveQuerys[i], dataLength, 1, *clientKeys.PublicKey)
  927. }
  928. //expand sharedSecret so it is of right length
  929. expandBy := len(tweets[i]) / 32
  930. var expandedSharedSecret []byte
  931. for i := 0; i < expandBy; i++ {
  932. expandedSharedSecret = append(expandedSharedSecret, clientKeys.SharedSecret[:]...)
  933. }
  934. //Xor's sharedSecret with all tweets
  935. lib.Xor(expandedSharedSecret[:], tweets[i])
  936. blockLength := len(tweets[i])
  937. receivedTweets, _ := readFrom(followerConnection, blockLength, nil, 0)
  938. lib.Xor(receivedTweets, tweets[i])
  939. }
  940. //sends tweets to client
  941. for i := 0; i < tmpNeededSubscriptions; i++ {
  942. tweetsLengthBytes := intToByte(len(tweets[i]))
  943. if firstTweetSend && archiveQuerys == nil {
  944. firstTweetSend = false
  945. log.Println("sending", len(tweets[0]), "bytes of data")
  946. }
  947. errorBool := writeToWError(clientConnection, tweetsLengthBytes, followerConnection, 2)
  948. if errorBool {
  949. return true
  950. }
  951. errorBool = writeToWError(clientConnection, tweets[i], followerConnection, 2)
  952. if errorBool {
  953. return true
  954. }
  955. }
  956. return false
  957. }
  958. func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int, doAuditing bool) (clientKeys, [][]byte, bool) {
  959. clientPublicKey := clientKeys.PublicKey
  960. //gets the msg length
  961. msgLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  962. if errorBool {
  963. return clientKeys, nil, true
  964. }
  965. msgLength := byteToInt(msgLengthBytes)
  966. //gets the leader box
  967. leaderBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
  968. if errorBool {
  969. return clientKeys, nil, true
  970. }
  971. //gets the follower box
  972. followerBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
  973. if errorBool {
  974. return clientKeys, nil, true
  975. }
  976. tmpNeededSubscriptions := neededSubscriptions
  977. if tmpNeededSubscriptions > topicAmount {
  978. tmpNeededSubscriptions = topicAmount
  979. }
  980. tmpTopicAmount := topicAmount
  981. if subPhase == -1 {
  982. archiveNeededSubscriptions, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  983. if errorBool {
  984. return clientKeys, nil, true
  985. }
  986. writeToWError(followerConnection, archiveNeededSubscriptions, nil, 0)
  987. tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions)
  988. tmpTopicAmount = archiveTopicAmount
  989. if tmpNeededSubscriptions > archiveTopicAmount {
  990. tmpNeededSubscriptions = archiveTopicAmount
  991. }
  992. }
  993. if doAuditing {
  994. tmpNeededSubscriptions = 1
  995. tmpTopicAmount = int(dbWriteSize)
  996. }
  997. //send length to follower
  998. writeToWError(followerConnection, msgLengthBytes, nil, 0)
  999. //send box to follower
  1000. writeToWError(followerConnection, followerBox, nil, 0)
  1001. var decryptNonce [24]byte
  1002. copy(decryptNonce[:], leaderBox[:24])
  1003. decrypted, ok := box.Open(nil, leaderBox[24:], &decryptNonce, clientPublicKey, leaderPrivateKey)
  1004. if !ok {
  1005. fmt.Println("pirQuery decryption not ok")
  1006. return clientKeys, nil, true
  1007. }
  1008. //if sharedSecret is send
  1009. if subPhase == 0 {
  1010. var tmpSharedSecret [32]byte
  1011. for index := 0; index < 32; index++ {
  1012. tmpSharedSecret[index] = decrypted[index]
  1013. }
  1014. clientKeys.SharedSecret = tmpSharedSecret
  1015. decrypted = decrypted[32:]
  1016. }
  1017. if doAuditing {
  1018. result := make([][]byte, 1)
  1019. result[0] = decrypted
  1020. return clientKeys, result, false
  1021. }
  1022. if subPhase == -1 {
  1023. }
  1024. //transforms byteArray to ints of wanted topics
  1025. pirQueryFlattened := decrypted
  1026. pirQuerys := make([][]byte, tmpNeededSubscriptions)
  1027. for i := range pirQuerys {
  1028. pirQuerys[i] = make([]byte, tmpTopicAmount)
  1029. }
  1030. for i := 0; i < tmpNeededSubscriptions; i++ {
  1031. pirQuerys[i] = pirQueryFlattened[i*tmpTopicAmount : (i+1)*tmpTopicAmount]
  1032. }
  1033. //sets the pirQuery for the client in case whe are not archiving, and not Auditing
  1034. if subPhase != -1 {
  1035. clientKeys.PirQuery = pirQuerys
  1036. }
  1037. return clientKeys, pirQuerys, false
  1038. }
  1039. func transformBytesToStringArray(topicsAsBytes []byte) []string {
  1040. var topics []string
  1041. var topic string
  1042. var position int = 0
  1043. for _, letter := range topicsAsBytes {
  1044. if string(letter) == "," {
  1045. topics[position] = topic
  1046. topic = ""
  1047. position++
  1048. } else {
  1049. topic = topic + string(letter)
  1050. }
  1051. }
  1052. return topics
  1053. }
  1054. func byteToInt(myBytes []byte) (x int) {
  1055. x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0])
  1056. return
  1057. }
  1058. //returns true if error occured
  1059. func sendTopicLists(clientConnection, followerConnection net.Conn, setup bool) bool {
  1060. for i := 0; i < 2; i++ {
  1061. var topicList []byte
  1062. if i == 0 {
  1063. topicList, topicAmount = lib.GetTopicList(i)
  1064. } else {
  1065. topicList, archiveTopicAmount = lib.GetTopicList(i)
  1066. }
  1067. topicListLengthBytes := intToByte(len(topicList))
  1068. if !setup {
  1069. err := writeToWError(clientConnection, topicListLengthBytes, followerConnection, 5)
  1070. if err {
  1071. return true
  1072. }
  1073. err = writeToWError(clientConnection, topicList, followerConnection, 5)
  1074. if err {
  1075. return true
  1076. }
  1077. } else {
  1078. _, err := clientConnection.Write(topicListLengthBytes)
  1079. if err != nil {
  1080. return true
  1081. }
  1082. _, err = clientConnection.Write(topicList)
  1083. if err != nil {
  1084. return true
  1085. }
  1086. }
  1087. }
  1088. return false
  1089. }
  1090. //sends the array to the connection
  1091. func writeTo(connection net.Conn, array []byte) {
  1092. remainingLength := len(array)
  1093. for remainingLength > 0 {
  1094. if remainingLength >= mtu {
  1095. _, err := connection.Write(array[:mtu])
  1096. if err != nil {
  1097. panic(err)
  1098. }
  1099. array = array[mtu:]
  1100. remainingLength -= mtu
  1101. } else {
  1102. _, err := connection.Write(array)
  1103. if err != nil {
  1104. panic(err)
  1105. }
  1106. remainingLength = 0
  1107. }
  1108. }
  1109. }
  1110. func writeToWError(connection net.Conn, array []byte, followerConnection net.Conn, size int) bool {
  1111. if connection.RemoteAddr().String() == follower {
  1112. arrayWError := make([]byte, 1)
  1113. arrayWError = append(arrayWError, array[:]...)
  1114. remainingLength := len(arrayWError)
  1115. for remainingLength > 0 {
  1116. if remainingLength >= mtu {
  1117. _, err := connection.Write(arrayWError[:mtu])
  1118. if err != nil {
  1119. return handleError(connection, followerConnection, size, err)
  1120. }
  1121. arrayWError = arrayWError[mtu:]
  1122. remainingLength -= mtu
  1123. } else {
  1124. _, err := connection.Write(arrayWError)
  1125. if err != nil {
  1126. return handleError(connection, followerConnection, size, err)
  1127. }
  1128. remainingLength = 0
  1129. }
  1130. }
  1131. } else {
  1132. remainingLength := len(array)
  1133. for remainingLength > 0 {
  1134. if remainingLength >= mtu {
  1135. _, err := connection.Write(array[:mtu])
  1136. if err != nil {
  1137. return handleError(connection, followerConnection, size, err)
  1138. }
  1139. array = array[mtu:]
  1140. remainingLength -= mtu
  1141. } else {
  1142. _, err := connection.Write(array)
  1143. if err != nil {
  1144. return handleError(connection, followerConnection, size, err)
  1145. }
  1146. remainingLength = 0
  1147. }
  1148. }
  1149. }
  1150. return false
  1151. }
  1152. func handleError(connection, followerConnection net.Conn, size int, err error) bool {
  1153. if err != nil {
  1154. //lets follower know that client has disconnected unexpectedly
  1155. if connection.RemoteAddr().String() != follower {
  1156. if size > mtu {
  1157. fmt.Println("have a look here")
  1158. }
  1159. fmt.Println("handleError", err)
  1160. array := make([]byte, size)
  1161. array[0] = 1
  1162. _, err = followerConnection.Write(array)
  1163. if err != nil {
  1164. panic(err)
  1165. }
  1166. return true
  1167. } else {
  1168. panic(err)
  1169. }
  1170. }
  1171. return false
  1172. }
  1173. //reads an array which is returned and of size "size" from the connection
  1174. //returns true if error occured
  1175. func readFrom(connection net.Conn, size int, followerConnection net.Conn, sizeError int) ([]byte, bool) {
  1176. var array []byte
  1177. remainingSize := size
  1178. for remainingSize > 0 {
  1179. var err error
  1180. toAppend := make([]byte, mtu)
  1181. if remainingSize > mtu {
  1182. _, err = connection.Read(toAppend)
  1183. array = append(array, toAppend...)
  1184. remainingSize -= mtu
  1185. } else {
  1186. _, err = connection.Read(toAppend[:remainingSize])
  1187. array = append(array, toAppend[:remainingSize]...)
  1188. remainingSize = 0
  1189. }
  1190. if err != nil {
  1191. //lets follower know that client has disconnected unexpectedly
  1192. if connection.RemoteAddr().String() != follower {
  1193. fmt.Println(err)
  1194. array := make([]byte, sizeError)
  1195. array[0] = 1
  1196. _, err = followerConnection.Write(array)
  1197. if err != nil {
  1198. panic(err)
  1199. }
  1200. return nil, true
  1201. } else {
  1202. panic(err)
  1203. }
  1204. }
  1205. }
  1206. return array, false
  1207. }
  1208. func intToByte(myInt int) (retBytes []byte) {
  1209. retBytes = make([]byte, 4)
  1210. retBytes[3] = byte((myInt >> 24) & 0xff)
  1211. retBytes[2] = byte((myInt >> 16) & 0xff)
  1212. retBytes[1] = byte((myInt >> 8) & 0xff)
  1213. retBytes[0] = byte(myInt & 0xff)
  1214. return
  1215. }