Browse Source

ready for eval

Simon 1 year ago
parent
commit
3b60ac2678
4 changed files with 71 additions and 65 deletions
  1. 23 25
      client/client.go
  2. 23 16
      follower/follower.go
  3. 17 12
      leader/leader.go
  4. 8 12
      lib/databaseRead.go

+ 23 - 25
client/client.go

@@ -10,7 +10,6 @@ package main
 */
 import "C"
 
-//sssssssssssss
 import (
 	"2PPS/lib"
 	"bufio"
@@ -42,7 +41,7 @@ type tweet struct {
 const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
-const numClients = 5
+const numClients = 500
 
 //mylimit=8000
 //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
@@ -58,13 +57,13 @@ var topicList []string
 var archiveTopicList []string
 var neededSubscriptions int
 var publisherAmount int
+var goodPadding int
 var timeBounds []int
-var speedUp int = 500
+var speedUp int = 1000
 
-var maxTimePerRound time.Duration = 3 * time.Second
+var maxTimePerRound time.Duration = 10 * time.Second
 var startTime int
 
-//todo! expand this for multiple clients
 var archiveInterests = make([]int, 1)
 var sharedSecret [numClients][2][32]byte = createSharedSecret()
 
@@ -151,7 +150,6 @@ func client(clientNumber int) {
 			dbWriteSizeBytes := readFrom(leaderConn, 4)
 			dbWriteSize = byteToInt(dbWriteSizeBytes)
 
-			//todo! put into tweet creation
 			//roundAsBytes := readFrom(leaderConn, 4)
 			roundAsBytes := make([]byte, 4)
 			_, err = leaderConn.Read(roundAsBytes)
@@ -173,6 +171,8 @@ func client(clientNumber int) {
 			tweet := getRealTweet(clientNumber)
 			if clientNumber == numClients-1 {
 				fmt.Println("publisherAmount", publisherAmount)
+				fmt.Println("goodPadding", goodPadding)
+
 			}
 
 			//prep the query
@@ -257,13 +257,14 @@ func client(clientNumber int) {
 			receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
 
 			if len(archiveTopicList) > 0 {
-				wantsArchive[0] = 1 //archive test
+				wantsArchive[0] = 1
 			} else {
 				wantsArchive[0] = 0
 			}
 
 			writeTo(leaderConn, wantsArchive)
 
+			//fmt.Println("list", archiveTopicList)
 			if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
 				encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
 				sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
@@ -282,8 +283,8 @@ func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 	//later this will be taken from gui, this is only for testing
 	topicsOfInterest := make([]int, 1)
 	topicsOfInterest[0] = mr.Intn(10)
+	archiveInterests[0] = mr.Intn(10)
 
-	//todo! repeat for archive
 	tmpNeededSubscriptions := neededSubscriptions
 	if tmpNeededSubscriptions > len(topicList) {
 		tmpNeededSubscriptions = len(topicList)
@@ -299,8 +300,8 @@ func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 		if tmpNeededSubscriptions > len(archiveTopicList) {
 			tmpNeededSubscriptions = len(archiveTopicList)
 		}
-		copy(tmptopicsOfInterest, archiveInterests) //archiveInterests from gui
-		copy(tmpTopicList, archiveTopicList)
+		tmptopicsOfInterest = archiveInterests //!todo take archiveInterests from gui
+		tmpTopicList = archiveTopicList
 	}
 
 	//creates fake topicsOfInterest if client is boooring
@@ -426,6 +427,7 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 			tmpNeededSubscriptions = len(archiveTopicList)
 		}
 	}
+
 	for i := 0; i < tmpNeededSubscriptions; i++ {
 		//client receives tweets
 		tweetsLengthBytes := readFrom(leaderConn, 4)
@@ -453,28 +455,24 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 		//fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
 
 		index := strings.Index(string(tweets), ";;")
-		if getArchive {
-			fmt.Println("gettingArchive")
-		}
-		if index != -1 && clientNumber == 0 {
-			//fmt.Println("Correct")
+
+		if index != -1 {
 
 			textArr := strings.Split(string(tweets), ";;;")
 			text := textArr[:len(textArr)-1]
 
-			fmt.Println("Round", round, text[0], "Length", len(tweets))
-			fmt.Println("padding", text[1])
+			//fmt.Println("Round", round, text[0], "Length", len(tweets))
+
+			if text[1] != "" {
+				text[1] = text[1][1:]
 
-			/* doesnt work
-			goodPadding := strings.Index(text[0], text[1])
-			if text[0] == text[1] || text[1] == "" {
-				goodPadding = -1
 			}
-			if goodPadding != -1 {
-				fmt.Println("Round", round, text[0], "Length", len(tweets))
-				fmt.Println("padding", text[1])
+			ok := strings.Contains(text[0], text[1])
+			if ok {
+				goodPadding++
+				//fmt.Println("Round", round, text[0], "Length", len(tweets))
+				//fmt.Println("padding", text[1])
 			}
-			*/
 
 		} else if index == -1 && tweets[0] != 0 {
 			fmt.Println("error")

+ 23 - 16
follower/follower.go

@@ -8,8 +8,6 @@ package main
 //#include "../c/okv.c"
 import "C"
 
-//sssssssssssssssss
-
 import (
 	"2PPS/lib"
 	"crypto/rand"
@@ -41,6 +39,7 @@ type clientKeys struct {
 var clientData = make(map[[32]byte]clientKeys)
 var topicList []byte
 var topicAmount int
+var archiveTopicAmount int
 
 var followerPrivateKey *[32]byte
 var followerPublicKey *[32]byte
@@ -53,7 +52,7 @@ const numThreads = 12
 //Maximum Transport Unit
 const mtu int = 1100
 
-var dbWriteSize int = 20000
+var dbWriteSize int = 10000
 var neededSubscriptions int
 
 var round int = 0
@@ -512,7 +511,7 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex
 			}
 		}
 
-		getSendTweets(clientKeys, nil, leaderWorkerConnection, m, clientPublicKey)
+		getSendTweets(clientKeys, nil, leaderWorkerConnection, clientPublicKey)
 
 		wantsArchive, errorBool := readFromWError(leaderWorkerConnection, 1)
 		if errorBool {
@@ -520,12 +519,11 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex
 		}
 
 		if wantsArchive[0] == 1 {
-			fmt.Println("shouldnt be called")
 			_, archiveQuerys, errorBool := handlePirQuery(clientKeys, leaderWorkerConnection, -1, clientPublicKey, false)
 			if errorBool {
 				continue
 			}
-			getSendTweets(clientKeys, archiveQuerys, leaderWorkerConnection, m, clientPublicKey)
+			getSendTweets(clientKeys, archiveQuerys, leaderWorkerConnection, clientPublicKey)
 		}
 
 		//saves clientKeys
@@ -536,14 +534,16 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex
 }
 
 //gets tweet from db and sends them to leader
-func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerConnection net.Conn, m *sync.RWMutex, pubKey [32]byte) {
-	//todo! repeat for archive
+func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerConnection net.Conn, pubKey [32]byte) {
 	tmpNeededSubscriptions := neededSubscriptions
 	if tmpNeededSubscriptions > topicAmount {
 		tmpNeededSubscriptions = topicAmount
 	}
 	if archiveQuerys != nil {
 		tmpNeededSubscriptions = len(archiveQuerys)
+		if tmpNeededSubscriptions > archiveTopicAmount {
+			tmpNeededSubscriptions = archiveTopicAmount
+		}
 	}
 	for i := 0; i < tmpNeededSubscriptions; i++ {
 		//gets all requested tweets
@@ -563,25 +563,25 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerCo
 		//Xor's sharedSecret with all tweets
 		lib.Xor(expandedSharedSecret[:], tweets)
 
-		//fmt.Println("tweetsLen", len(tweets))
+		if archiveQuerys != nil {
+			//fmt.Println("length", len(tweets))
+		}
+
 		//sends tweets to leader
-		//fmt.Println("pubKey", pubKey, "Bytes", tweets)
 		writeTo(leaderWorkerConnection, tweets)
 	}
 }
 
-var ignoreMe []byte
-
 //returns true if client connection is lost
 func handlePirQuery(clientKeys clientKeys, leaderWorkerConnection net.Conn, subPhase int, clientPublicKey [32]byte, doAuditing bool) (clientKeys, [][]byte, bool) {
 
 	archiveNeededSubscriptions := make([]byte, 4)
 	if subPhase == -1 {
-		archiveNeededSubscriptions, errorBool := readFromWError(leaderWorkerConnection, 4)
+		var errorBool bool
+		archiveNeededSubscriptions, errorBool = readFromWError(leaderWorkerConnection, 4)
 		if errorBool {
 			return clientKeys, nil, true
 		}
-		ignoreMe = archiveNeededSubscriptions
 	}
 
 	//gets the msg length
@@ -630,7 +630,6 @@ func handlePirQuery(clientKeys clientKeys, leaderWorkerConnection net.Conn, subP
 	//follower expects pirQuery
 
 	//transforms byteArray to ints of wanted topics
-	//todo! repeat for archive
 	tmpNeededSubscriptions := neededSubscriptions
 	if tmpNeededSubscriptions > topicAmount {
 		tmpNeededSubscriptions = topicAmount
@@ -638,7 +637,11 @@ func handlePirQuery(clientKeys clientKeys, leaderWorkerConnection net.Conn, subP
 	tmpTopicAmount := topicAmount
 	if subPhase == -1 {
 		tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions)
-		_, tmpTopicAmount = lib.GetTopicList(1)
+		_, archiveTopicAmount = lib.GetTopicList(1)
+		if tmpNeededSubscriptions > archiveTopicAmount {
+			tmpNeededSubscriptions = archiveTopicAmount
+		}
+		tmpTopicAmount = archiveTopicAmount
 	}
 
 	pirQueryFlattened := decrypted
@@ -655,6 +658,10 @@ func handlePirQuery(clientKeys clientKeys, leaderWorkerConnection net.Conn, subP
 		clientKeys.PirQuery = pirQuerys
 	}
 
+	if subPhase == -1 {
+		//fmt.Println("query", pirQuerys)
+	}
+
 	return clientKeys, pirQuerys, false
 }
 

+ 17 - 12
leader/leader.go

@@ -8,8 +8,6 @@ package main
 //#include "../c/okv.c"
 import "C"
 
-//ssssssssssssssss
-
 import (
 	"crypto/rand"
 	"crypto/rsa"
@@ -61,18 +59,18 @@ var topicAmount int
 var archiveTopicAmount int
 
 // every roundsBeforeUpdate the client updates his pirQuery
-const roundsBeforeUpdate = 1
-const neededSubscriptions = 3
+const roundsBeforeUpdate = 3
+const neededSubscriptions = 4
 const numThreads = 12
 const dataLength = 256
 const minDBWriteSize = 1000
 
-var dbWriteSize float64 = 20000
+var dbWriteSize float64 = 10000
 var collisionCounter []float64
 
 var clientsConnected int
 
-var maxTimePerRound time.Duration = 3 * time.Second
+var maxTimePerRound time.Duration = 10 * time.Second
 
 var clientsServedPhase1 []int
 var clientsServedPhase3 []int
@@ -97,7 +95,6 @@ const publisherRounds int = 10
 var publisherAmount float64
 var publisherHistory [publisherRounds]int
 
-//todo! handle client dc during phase1/3
 func main() {
 
 	//prevents race conditions for wrtiting
@@ -929,7 +926,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 		}
 
-		errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection, m)
+		errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection)
 		if errorBool {
 			contBool := handleClientDC(wg, followerConnection, phase3Channel)
 			if contBool {
@@ -961,7 +958,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 					return
 				}
 			}
-			errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection, m)
+			errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection)
 			if errorBool {
 				contBool := handleClientDC(wg, followerConnection, phase3Channel)
 				if contBool {
@@ -1077,14 +1074,16 @@ func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret
 	return errorBool
 }
 
-func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn, m *sync.RWMutex) bool {
-	//todo! repeat for archive
+func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) bool {
 	tmpNeededSubscriptions := neededSubscriptions
 	if tmpNeededSubscriptions > topicAmount {
 		tmpNeededSubscriptions = topicAmount
 	}
 	if archiveQuerys != nil {
 		tmpNeededSubscriptions = len(archiveQuerys)
+		if tmpNeededSubscriptions > archiveTopicAmount {
+			tmpNeededSubscriptions = archiveTopicAmount
+		}
 	}
 	//fmt.Println("tmpNeededSubscriptions", tmpNeededSubscriptions)
 
@@ -1157,7 +1156,6 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 		return clientKeys, nil, true
 	}
 
-	//todo! repeat for archive
 	tmpNeededSubscriptions := neededSubscriptions
 	if tmpNeededSubscriptions > topicAmount {
 		tmpNeededSubscriptions = topicAmount
@@ -1173,6 +1171,10 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 		writeToWError(followerConnection, archiveNeededSubscriptions, nil, 0)
 		tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions)
 		tmpTopicAmount = archiveTopicAmount
+
+		if tmpNeededSubscriptions > archiveTopicAmount {
+			tmpNeededSubscriptions = archiveTopicAmount
+		}
 	}
 
 	if doAuditing {
@@ -1210,6 +1212,9 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 		return clientKeys, result, false
 	}
 
+	if subPhase == -1 {
+	}
+
 	//transforms byteArray to ints of wanted topics
 	pirQueryFlattened := decrypted
 	pirQuerys := make([][]byte, tmpNeededSubscriptions)

+ 8 - 12
lib/databaseRead.go

@@ -24,7 +24,7 @@ var archive = make(map[string][]Tweet)
 var minimumBlockSize int
 
 //needs to be dividable by roundsBeforUpdate
-var roundsBeforeArchiving = 2
+var roundsBeforeArchiving = -1
 
 var topicList []string
 
@@ -80,6 +80,7 @@ func GetTweets(pirQuery []byte, dataLength int, whereFrom int, pubKey [32]byte)
 		tmpdb = archive
 	}
 	minimumBlockSize = dataLength * maxTweetAmount(whereFrom)
+
 	var wantedTopics = getNamesForTopics(pirQuery, whereFrom)
 	tweetsToReturn := make([][]Tweet, len(wantedTopics))
 	for index, wantedTopic := range wantedTopics {
@@ -109,7 +110,7 @@ func maxTweetAmount(whereFrom int) int {
 	}
 	var max int = 0
 	for i := range tmpdb {
-		nrOfTweets := len(dbR[i])
+		nrOfTweets := len(tmpdb[i])
 		if nrOfTweets > max {
 			max = nrOfTweets
 		}
@@ -134,7 +135,7 @@ func getNamesForTopics(wantedIndices []byte, whereFrom int) []string {
 //transform struct to byte array for sending
 func tweetsToByteArray(tweetsToReturn [][]Tweet, whereFrom int, wantedTopics []string) []byte {
 	tweetsAsBytes := make([]byte, minimumBlockSize)
-	for blockIndex, block := range tweetsToReturn {
+	for _, block := range tweetsToReturn {
 		var topicPadding []string
 		var blockToAppend []byte
 		for index, tweet := range block {
@@ -157,11 +158,6 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet, whereFrom int, wantedTopics []s
 		//adds padding
 		blockToAppend = append(blockToAppend, []byte(";;;")[:]...)
 		remainingLength := minimumBlockSize - len(blockToAppend)
-		//fmt.Println("block", string(blockToAppend))
-
-		if blockIndex == 0 {
-			//	fmt.Println("tweet", string(blockToAppend))
-		}
 
 		//grouping using topic from recovered tweets
 		index := len(topicPadding) - 1
@@ -179,10 +175,6 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet, whereFrom int, wantedTopics []s
 			index--
 		}
 
-		if blockIndex == 0 {
-			//	fmt.Println("tweetPadded", string(blockToAppend))
-		}
-
 		padding := bytes.Repeat([]byte(";"), remainingLength)
 		blockToAppend = append(blockToAppend, padding...)
 		Xor(blockToAppend, tweetsAsBytes)
@@ -256,6 +248,10 @@ func GetTopicList(whereFrom int) ([]byte, int) {
 //iterates through full dbR and moves old tweets to archive
 func CleanUpdbR(round int) {
 
+	if roundsBeforeArchiving == -1 {
+		return
+	}
+
 	if roundsBeforeArchiving-round == 0 {
 
 		keys := make([]string, len(dbR))