Simon 2 years ago
parent
commit
bffdd23bd2
3 changed files with 78 additions and 188 deletions
  1. 7 2
      client/client.go
  2. 70 185
      leader/leader.go
  3. 1 1
      lib/databaseRead.go

+ 7 - 2
client/client.go

@@ -23,6 +23,7 @@ import (
 	mr "math/rand"
 	"net"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 	"unsafe"
@@ -70,7 +71,7 @@ func main() {
 	for i := 0; i < numClients; i++ {
 		var tweet []byte
 		if i == 0 {
-			topics := []byte("house; mouse")
+			topics := []byte("house, mouse;")
 			text := []byte("I am a house in a mouse;")
 			tweet = append(tweet, topics...)
 			tweet = append(tweet, text...)
@@ -489,7 +490,11 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 		}
 
 		//tweets can be displayed
-		fmt.Println("final result: ", string(tweets))
+		split := strings.Split(string(tweets), ";")
+		topic := split[0]
+		text := split[1]
+		fmt.Println("Topics: ", topic)
+		fmt.Println("Text: ", text)
 	}
 	return 0
 }

+ 70 - 185
leader/leader.go

@@ -133,10 +133,7 @@ func main() {
 	followerPublicKey = &tmpFollowerPubKey
 
 	//send publicKey to follower
-	_, err = followerConnection.Write(leaderPublicKey[:])
-	if err != nil {
-		panic(err)
-	}
+	writeToConn(followerConnection, leaderPublicKey[:])
 
 	//goroutine for accepting new clients
 	go func() {
@@ -264,10 +261,7 @@ func main() {
 		virtualAddresses := createVirtualAddresses()
 		//send all virtualAddresses to follower
 		for i := 0; i <= dbWriteSize; i++ {
-			_, err = followerConnection.Write(intToByte(virtualAddresses[i]))
-			if err != nil {
-				panic(err)
-			}
+			writeToConn(followerConnection, intToByte(virtualAddresses[i]))
 		}
 
 		for id := 0; id < numThreads; id++ {
@@ -338,10 +332,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	for len(phase1Channel) == 0 {
 		if time.Since(startTime) > maxTimePerRound {
 			//tells follower that this worker is done
-			_, err := followerConnection.Write(gotClient)
-			if err != nil {
-				panic(err)
-			}
+			writeToConn(followerConnection, gotClient)
 			wg.Done()
 			return
 		}
@@ -352,17 +343,11 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		gotClient[0] = 1
 		//tells follower that this worker got a clientConnection
-		_, err := followerConnection.Write(gotClient)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, gotClient)
 
 		//sends clients publicKey to follower
 		clientPublicKey := clientData[clientConnection.RemoteAddr()].PublicKey
-		_, err = followerConnection.Write(clientPublicKey[:])
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, clientPublicKey[:])
 
 		//setup the worker-specific db
 		dbSize := int(C.dbSize)
@@ -372,22 +357,13 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		//tells client that phase 1 has begun
-		_, err = clientConnection.Write(phase)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(clientConnection, phase)
 
 		//tells client current dbWriteSize
-		_, err = clientConnection.Write(intToByte(dbWriteSize))
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(clientConnection, intToByte(dbWriteSize))
 
 		//tells client current round
-		_, err = clientConnection.Write(roundAsBytes)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(clientConnection, roundAsBytes)
 
 		var clientKeys = clientData[clientConnection.RemoteAddr()]
 		clientKeys, pirQuery := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
@@ -397,34 +373,20 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		//accept dpfQuery from client
 		dpfLengthBytes := make([]byte, 4)
-		_, err = clientConnection.Read(dpfLengthBytes)
-		if err != nil {
-			panic(err)
-		}
+		dpfLengthBytes = readFromConn(clientConnection, 4)
+
 		dpfLength := byteToInt(dpfLengthBytes)
 
 		dpfQueryAEncrypted := make([]byte, dpfLength)
 		dpfQueryBEncrypted := make([]byte, dpfLength)
 
-		_, err = clientConnection.Read(dpfQueryAEncrypted)
-		if err != nil {
-			panic(err)
-		}
+		dpfQueryAEncrypted = readFromConn(clientConnection, dpfLength)
 
-		_, err = clientConnection.Read(dpfQueryBEncrypted)
-		if err != nil {
-			panic(err)
-		}
+		dpfQueryBEncrypted = readFromConn(clientConnection, dpfLength)
 
-		_, err = followerConnection.Write(dpfLengthBytes)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, dpfLengthBytes)
 
-		_, err = followerConnection.Write(dpfQueryBEncrypted)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, dpfQueryBEncrypted)
 
 		//decrypt dpfQueryA for sorting into db
 		var decryptNonce [24]byte
@@ -441,15 +403,9 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		dataShareFollower := make([]byte, ds)
 
-		_, err = followerConnection.Read(dataShareFollower)
-		if err != nil {
-			panic(err)
-		}
+		dataShareFollower = readFromConn(followerConnection, ds)
 
-		_, err = followerConnection.Write(dataShareLeader)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, dataShareLeader)
 
 		auditXOR := make([]byte, ds)
 		passedAudit := true
@@ -498,10 +454,9 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			} else {
 				//tells follower that this worker is done
 				gotClient[0] = 0
-				_, err := followerConnection.Write(gotClient)
-				if err != nil {
-					panic(err)
-				}
+
+				writeToConn(followerConnection, gotClient)
+
 				wg.Done()
 				return
 			}
@@ -526,26 +481,17 @@ func phase2(followerConnection net.Conn) {
 	}
 
 	//writes seed to follower
-	_, err := followerConnection.Write(seedLeader)
-	if err != nil {
-		panic(err)
-	}
+	writeToConn(followerConnection, seedLeader)
 
 	//write data to follower
 	//this is surely inefficent
 	for i := 0; i < dbSize; i++ {
-		_, err = followerConnection.Write(tmpdbLeader[i])
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, tmpdbLeader[i])
 	}
 
 	//receive seed from follower
 	seedFollower := make([]byte, 16)
-	_, err = followerConnection.Read(seedFollower)
-	if err != nil {
-		panic(err)
-	}
+	seedFollower = readFromConn(followerConnection, 16)
 
 	//receive data from follower
 	tmpdbFollower := make([][]byte, dbSize)
@@ -553,10 +499,7 @@ func phase2(followerConnection net.Conn) {
 		tmpdbFollower[i] = make([]byte, dataLength)
 	}
 	for i := 0; i < dbSize; i++ {
-		_, err = followerConnection.Read(tmpdbFollower[i])
-		if err != nil {
-			panic(err)
-		}
+		tmpdbFollower[i] = readFromConn(followerConnection, dataLength)
 	}
 
 	//put together the db
@@ -576,16 +519,13 @@ func phase2(followerConnection net.Conn) {
 
 	//send own Ciphers to follower
 	for i := 0; i < dbSize; i++ {
-		_, err = followerConnection.Write(C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16))
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16))
 	}
 
 	//receive ciphers from follower
 	ciphersFollower := make([]byte, dbSize*16)
 	for i := 0; i < dbSize; i++ {
-		_, err = followerConnection.Read(ciphersFollower[i*16:])
+		_, err := followerConnection.Read(ciphersFollower[i*16:])
 		if err != nil {
 			panic(err)
 		}
@@ -668,10 +608,7 @@ func phase2(followerConnection net.Conn) {
 	dbWriteSize = int(math.Ceil(19.5 * float64(publisherAverage)))
 
 	//writes dbWriteSize of current round to follower
-	_, err = followerConnection.Write(intToByte(dbWriteSize))
-	if err != nil {
-		panic(err)
-	}
+	writeToConn(followerConnection, intToByte(dbWriteSize))
 }
 
 func addTestTweets() {
@@ -702,10 +639,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	for len(phase3Channel) == 0 {
 		if time.Since(startTime) > maxTimePerRound {
 			//tells follower that this worker is done
-			_, err := followerConnection.Write(gotClient)
-			if err != nil {
-				panic(err)
-			}
+			writeToConn(followerConnection, gotClient)
 			wg.Done()
 			return
 		}
@@ -716,14 +650,10 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		gotClient[0] = 1
 		//tells follower that this worker got a clientConnection
-		_, err := followerConnection.Write(gotClient)
-		if err != nil {
-			panic(err)
-		}
-		_, err = clientConnection.Write(phase)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, gotClient)
+
+		//tells client current phase
+		writeToConn(clientConnection, phase)
 
 		/*
 			possible Values
@@ -749,22 +679,13 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		//tells client what leader expects
-		_, err = clientConnection.Write(subPhase)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(clientConnection, subPhase)
 
 		//tells follower what will happen
-		_, err = followerConnection.Write(subPhase)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, subPhase)
 
 		//sends clients publicKey so follower knows which client is being served
-		_, err = followerConnection.Write(clientKeys.PublicKey[:])
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, clientKeys.PublicKey[:])
 
 		//increases rounds participating for client
 		clientKeys.roundsParticipating = roundsParticipating + 1
@@ -788,15 +709,9 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		getSendTweets(clientKeys, nil, clientConnection, followerConnection)
 
 		wantsArchive := make([]byte, 1)
-		_, err = clientConnection.Read(wantsArchive)
-		if err != nil {
-			panic(err)
-		}
+		wantsArchive = readFromConn(clientConnection, 1)
 
-		followerConnection.Write(wantsArchive)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(followerConnection, wantsArchive)
 
 		if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
 			_, archiveQuerys := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
@@ -821,10 +736,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			} else {
 				//tells follower that this worker is done
 				gotClient[0] = 0
-				_, err := followerConnection.Write(gotClient)
-				if err != nil {
-					panic(err)
-				}
+				writeToConn(followerConnection, gotClient)
 				wg.Done()
 				return
 			}
@@ -872,20 +784,14 @@ func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret
 	}
 
 	virtualAddressFollower := make([]byte, 4)
-	_, err := followerConnection.Read(virtualAddressFollower)
-	if err != nil {
-		panic(err)
-	}
+	virtualAddressFollower = readFromConn(followerConnection, 4)
 
 	//xores the data from follower
 	for i := 0; i < 4; i++ {
 		virtualAddress[i] = virtualAddress[i] ^ virtualAddressFollower[i]
 	}
 
-	_, err = clientConnection.Write(virtualAddress)
-	if err != nil {
-		panic(err)
-	}
+	writeToConn(clientConnection, virtualAddress)
 }
 
 func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) {
@@ -915,30 +821,19 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnecti
 		//receives tweets from follower and Xor's them in
 		tweetsLengthBytes := make([]byte, 4)
 
-		_, err := followerConnection.Read(tweetsLengthBytes)
-		if err != nil {
-			panic(err)
-		}
+		tweetsLengthBytes = readFromConn(followerConnection, 4)
+
 		tweetsReceivedLength := byteToInt(tweetsLengthBytes)
 
 		receivedTweets := make([]byte, tweetsReceivedLength)
-		_, err = followerConnection.Read(receivedTweets)
-		if err != nil {
-			panic(err)
-		}
+		receivedTweets = readFromConn(followerConnection, tweetsReceivedLength)
 
 		lib.Xor(receivedTweets, tweets)
 
 		//sends tweets to client
 		tweetsLengthBytes = intToByte(len(tweets))
-		_, err = clientConnection.Write(tweetsLengthBytes)
-		if err != nil {
-			panic(err)
-		}
-		_, err = clientConnection.Write(tweets)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(clientConnection, tweetsLengthBytes)
+		writeToConn(clientConnection, tweets)
 	}
 }
 
@@ -948,39 +843,25 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 
 	//gets the msg length
 	msgLengthBytes := make([]byte, 4)
-	_, err := clientConnection.Read(msgLengthBytes)
-	if err != nil {
-		panic(err)
-	}
+	msgLengthBytes = readFromConn(clientConnection, 4)
 	msgLength := byteToInt(msgLengthBytes)
 
 	leaderBox := make([]byte, msgLength)
 	followerBox := make([]byte, msgLength)
 
 	//gets the leader box
-	_, err = clientConnection.Read(leaderBox)
-	if err != nil {
-		panic(err)
-	}
+	leaderBox = readFromConn(clientConnection, msgLength)
 
 	//gets the follower box
-	_, err = clientConnection.Read(followerBox)
-	if err != nil {
-		panic(err)
-	}
+	followerBox = readFromConn(clientConnection, msgLength)
 
 	tmpNeededSubscriptions := neededSubscriptions
 	tmpTopicAmount := topicAmount
 	if subPhase == -1 {
 		archiveNeededSubscriptions := make([]byte, 4)
-		_, err = clientConnection.Read(archiveNeededSubscriptions)
-		if err != nil {
-			panic(err)
-		}
-		_, err = followerConnection.Write(archiveNeededSubscriptions)
-		if err != nil {
-			panic(err)
-		}
+		archiveNeededSubscriptions = readFromConn(clientConnection, 4)
+
+		writeToConn(followerConnection, archiveNeededSubscriptions)
 		tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions)
 		tmpTopicAmount = archiveTopicAmount
 	}
@@ -990,16 +871,10 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 	}
 
 	//send length to follower
-	_, err = followerConnection.Write(msgLengthBytes)
-	if err != nil {
-		panic(err)
-	}
+	writeToConn(followerConnection, msgLengthBytes)
 
 	//send box to follower
-	_, err = followerConnection.Write(followerBox)
-	if err != nil {
-		panic(err)
-	}
+	writeToConn(followerConnection, followerBox)
 
 	var decryptNonce [24]byte
 	copy(decryptNonce[:], leaderBox[:24])
@@ -1073,16 +948,26 @@ func sendTopicLists(clientConnection net.Conn) {
 		}
 		topicListLengthBytes := intToByte(len(topicList))
 
-		_, err := clientConnection.Write(topicListLengthBytes)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(clientConnection, topicListLengthBytes)
 
-		_, err = clientConnection.Write(topicList)
-		if err != nil {
-			panic(err)
-		}
+		writeToConn(clientConnection, topicList)
+	}
+}
+
+func writeToConn(connection net.Conn, array []byte) {
+	_, err := connection.Write(array)
+	if err != nil {
+		panic(err)
+	}
+}
+
+func readFromConn(connection net.Conn, size int) []byte {
+	array := make([]byte, size)
+	_, err := connection.Read(array)
+	if err != nil {
+		panic(err)
 	}
+	return array
 }
 
 func intToByte(myInt int) (retBytes []byte) {

+ 1 - 1
lib/databaseRead.go

@@ -134,7 +134,7 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet) []byte {
 				tweetToAppend = append(tweetToAppend, []byte(topic)...)
 				tweetToAppend = append(tweetToAppend, ","...)
 			}
-			//replaces last with ";", bc there is text following and not another topic
+			//replaces last "," with ";" bc there is text following and not another topic
 			tweetToAppend = tweetToAppend[:len(tweetToAppend)-1]
 			tweetToAppend = append(tweetToAppend, []byte(";")[0])
 			tweetToAppend = append(tweetToAppend, []byte(tweet.Text)...)