瀏覽代碼

probably could start eval

Simon 2 年之前
父節點
當前提交
04ecfbb0c6
共有 4 個文件被更改,包括 260 次插入85 次删除
  1. 66 20
      client/client.go
  2. 79 25
      follower/follower.go
  3. 110 34
      leader/leader.go
  4. 5 6
      lib/databaseRead.go

+ 66 - 20
client/client.go

@@ -12,7 +12,7 @@ import "C"
 
 //sssssssssssss
 import (
-	lib "2PPS/lib"
+	"2PPS/lib"
 	"bytes"
 	"crypto/rand"
 	"crypto/sha256"
@@ -40,10 +40,13 @@ type tweet struct {
 const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
-const numClients = 50
+const numClients = 200
 const dataLength int = 128
 const numThreads int = 12
 
+//Maximum Transport Unit
+const mtu int = 1100
+
 var dbWriteSize int
 var round int
 var topicList []string
@@ -228,7 +231,7 @@ func client(clientNumber int) {
 
 			}
 
-			receiveTweets(sharedSecret[clientNumber], leaderConn, false)
+			receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
 
 			if len(archiveTopicList) > 0 {
 				wantsArchive[0] = 0 //archive test
@@ -241,10 +244,11 @@ func client(clientNumber int) {
 			if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
 				encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
 				sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
-				receiveTweets(sharedSecret[clientNumber], leaderConn, true)
+				receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber)
 			}
 
 		} else {
+			fmt.Println("Phase", phase)
 			panic("somethin went wrong")
 		}
 	}
@@ -254,7 +258,7 @@ func client(clientNumber int) {
 func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 	//later this will be taken from gui, this is only for testing
 	topicsOFInterest := make([]int, 10)
-	topicsOFInterest[0] = 1
+	topicsOFInterest[0] = 0
 	topicsOFInterest[1] = 1
 	topicsOFInterest[9] = 1
 	archiveInterests[0] = 1
@@ -391,7 +395,7 @@ func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
 	return byteToInt(virtualAddressByte)
 }
 
-func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool) {
+func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) {
 
 	tmpNeededSubscriptions := neededSubscriptions
 	if tmpNeededSubscriptions > len(topicList) {
@@ -427,14 +431,23 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 			lib.Xor(expandedSharedSecrets[i][:], tweets)
 		}
 
+		//fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
+
 		index := strings.Index(string(tweets), ";;;")
 		if index != -1 {
-			text := string(tweets)[:index]
-			fmt.Println("received in round", round, "Text", text, len(tweets))
+			fmt.Println("Round", round, "Tweets length", len(tweets))
+			/*
+				fmt.Println("Correct")
+				fmt.Println("PubKey", clientPublicKey[clientNumber])
+				text := string(tweets)[:index]
+				fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets))
+			*/
 		} else {
-			fmt.Println("Round", round, "Text:", string(tweets))
-			fmt.Println("länge", byteToInt(tweetsLengthBytes))
-			panic("received text not of correct format")
+			fmt.Println("error")
+			fmt.Println("pubKey", clientPublicKey[clientNumber])
+			fmt.Println("round", round, "text:", string(tweets[:5]), "length", len(tweets))
+			return
+			//panic("received text not of correct format")
 		}
 	}
 }
@@ -578,7 +591,7 @@ func getTweet(clientNumber int) []byte {
 	if maxTopics == 0 {
 		maxTopics = 1
 	}
-	maxInt := 10
+	maxInt := numClients * 5
 	topicNumbers := make([]int, maxTopics)
 	//fills the array with unique random ascending values ranging from 0 to maxInt
 	for i := 0; i < maxTopics; i++ {
@@ -608,7 +621,13 @@ func getTweet(clientNumber int) []byte {
 
 	topics = append(topics, []byte(";")[0])
 
-	text := []byte(strconv.Itoa(r.Intn(10000)) + ";")
+	num := r.Intn(100)
+	if num == 0 {
+		num = 1
+	}
+
+	text := []byte(strconv.Itoa(num) + ";")
+
 	tweet = append(tweet, topics...)
 	tweet = append(tweet, text...)
 	tweet = append(tweet, []byte(";")[0])
@@ -626,18 +645,45 @@ func getTweet(clientNumber int) []byte {
 
 //sends the array to the connection
 func writeTo(connection net.Conn, array []byte) {
-	_, err := connection.Write(array)
-	if err != nil {
-		panic(err)
+	remainingLength := len(array)
+	for remainingLength > 0 {
+		if remainingLength >= mtu {
+			_, err := connection.Write(array[:mtu])
+			if err != nil {
+				panic(err)
+			}
+			array = array[mtu:]
+			remainingLength -= mtu
+		} else {
+			_, err := connection.Write(array)
+			if err != nil {
+				panic(err)
+			}
+			remainingLength = 0
+		}
 	}
 }
 
 //reads an array which is returned and of size "size" from the connection
 func readFrom(connection net.Conn, size int) []byte {
-	array := make([]byte, size)
-	_, err := connection.Read(array)
-	if err != nil {
-		panic(err)
+	var array []byte
+	remainingSize := size
+	for remainingSize > 0 {
+		var err error
+		toAppend := make([]byte, mtu)
+		if remainingSize > mtu {
+			_, err = connection.Read(toAppend)
+			array = append(array, toAppend...)
+			remainingSize -= mtu
+		} else {
+			_, err = connection.Read(toAppend[:remainingSize])
+			array = append(array, toAppend[:remainingSize]...)
+			remainingSize = 0
+
+		}
+		if err != nil {
+			panic(err)
+		}
 	}
 
 	return array

+ 79 - 25
follower/follower.go

@@ -50,10 +50,13 @@ var leaderPublicKey *[32]byte
 const dataLength = 128
 const numThreads = 12
 
-var dbWriteSize int = 10
+//Maximum Transport Unit
+const mtu int = 1100
+
+var dbWriteSize int = 100
 var neededSubscriptions int
 
-var maxTimePerRound time.Duration = 1000 * time.Millisecond
+var maxTimePerRound time.Duration = 5 * time.Second
 
 var round int = 0
 var startTime time.Time
@@ -505,7 +508,7 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex
 			}
 		}
 
-		getSendTweets(clientKeys, nil, leaderWorkerConnection)
+		getSendTweets(clientKeys, nil, leaderWorkerConnection, m, clientPublicKey)
 
 		wantsArchive, errorBool := readFromWError(leaderWorkerConnection, 1)
 		if errorBool {
@@ -513,11 +516,12 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex
 		}
 
 		if wantsArchive[0] == 1 {
+			fmt.Println("shouldnt be called")
 			_, archiveQuerys, errorBool := handlePirQuery(clientKeys, leaderWorkerConnection, -1, clientPublicKey, false)
 			if errorBool {
 				continue
 			}
-			getSendTweets(clientKeys, archiveQuerys, leaderWorkerConnection)
+			getSendTweets(clientKeys, archiveQuerys, leaderWorkerConnection, m, clientPublicKey)
 		}
 
 		//saves clientKeys
@@ -528,7 +532,7 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex
 }
 
 //gets tweet from db and sends them to leader
-func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerConnection net.Conn) {
+func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerConnection net.Conn, m *sync.RWMutex, pubKey [32]byte) {
 	//todo! repeat for archive
 	tmpNeededSubscriptions := neededSubscriptions
 	if tmpNeededSubscriptions > topicAmount {
@@ -541,11 +545,10 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerCo
 		//gets all requested tweets
 		var tweets []byte
 		if archiveQuerys == nil {
-			tweets = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0)
+			tweets = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0, pubKey)
 		} else {
-			tweets = lib.GetTweets(archiveQuerys[i], dataLength, 1)
+			tweets = lib.GetTweets(archiveQuerys[i], dataLength, 1, pubKey)
 		}
-
 		//expand sharedSecret so it is of right length
 		expandBy := len(tweets) / 32
 		var expandedSharedSecret []byte
@@ -556,13 +559,9 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerCo
 		//Xor's sharedSecret with all tweets
 		lib.Xor(expandedSharedSecret[:], tweets)
 
-		lib.Xor(tweets, expandedSharedSecret[:])
-
+		//fmt.Println("tweetsLen", len(tweets))
 		//sends tweets to leader
-		tweetsLengthBytes := intToByte(len(tweets))
-
-		writeTo(leaderWorkerConnection, tweetsLengthBytes)
-
+		//fmt.Println("pubKey", pubKey, "Bytes", tweets)
 		writeTo(leaderWorkerConnection, tweets)
 	}
 }
@@ -676,35 +675,90 @@ func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret
 
 //sends the array to the connection
 func writeTo(connection net.Conn, array []byte) {
-	_, err := connection.Write(array)
-	if err != nil {
-		panic(err)
+	remainingLength := len(array)
+	for remainingLength > 0 {
+		if remainingLength >= mtu {
+			_, err := connection.Write(array[:mtu])
+			if err != nil {
+				panic(err)
+			}
+			array = array[mtu:]
+			remainingLength -= mtu
+		} else {
+			_, err := connection.Write(array)
+			if err != nil {
+				panic(err)
+			}
+			remainingLength = 0
+		}
 	}
 }
 
 //reads an array which is returned and of size "size" from the connection
 //returned bool is one if connection to client was lost
 func readFrom(connection net.Conn, size int) []byte {
-	array := make([]byte, size)
-	_, err := connection.Read(array)
-	if err != nil {
-		panic(err)
+	var array []byte
+	remainingSize := size
+	for remainingSize > 0 {
+		var err error
+		toAppend := make([]byte, mtu)
+		if remainingSize > mtu {
+			_, err = connection.Read(toAppend)
+			array = append(array, toAppend...)
+			remainingSize -= mtu
+		} else {
+			_, err = connection.Read(toAppend[:remainingSize])
+			array = append(array, toAppend[:remainingSize]...)
+			remainingSize = 0
+
+		}
+		if err != nil {
+			panic(err)
+		}
 	}
+
 	return array
 }
 
 //reads an array which is returned and of size "size" from the connection
 //returns true if connection to client is lost
 func readFromWError(connection net.Conn, size int) ([]byte, bool) {
-	array := make([]byte, size+1)
-	_, err := connection.Read(array)
-	if err != nil {
-		panic(err)
+	var array []byte
+	remainingSize := size + 1
+	for remainingSize > 0 {
+		var err error
+		toAppend := make([]byte, mtu)
+		if remainingSize > mtu {
+			_, err = connection.Read(toAppend)
+			array = append(array, toAppend...)
+			remainingSize -= mtu
+		} else {
+			_, err = connection.Read(toAppend[:remainingSize])
+			array = append(array, toAppend[:remainingSize]...)
+			remainingSize = 0
+		}
+		if err != nil {
+			panic(err)
+		}
 	}
 	if array[0] == 1 {
 		return nil, true
 	}
 	return array[1:], false
+
+	/*
+
+		array := make([]byte, size+1)
+		_, err := connection.Read(array)
+		if err != nil {
+			panic(err)
+		}
+		if array[0] == 1 {
+			return nil, true
+		}
+		return array[1:], false
+
+	*/
 }
 
 func transformBytesToStringArray(topicsAsBytes []byte) []string {

+ 110 - 34
leader/leader.go

@@ -47,6 +47,9 @@ var clientData = make(map[net.Addr]clientKeys)
 
 const follower string = "127.0.0.1:4442"
 
+//Maximum Transport Unit
+const mtu int = 1100
+
 var leaderPrivateKey *[32]byte
 var leaderPublicKey *[32]byte
 var followerPublicKey *[32]byte
@@ -59,15 +62,18 @@ var archiveTopicAmount int
 
 // every roundsBeforeUpdate the client updates his pirQuery
 const roundsBeforeUpdate = 1
-const neededSubscriptions = 10
+const neededSubscriptions = 1
 const numThreads = 12
 const dataLength = 128
 
-var dbWriteSize int = 10
+var dbWriteSize int = 100
 
 const minDBWriteSize int = 10
 
-var maxTimePerRound time.Duration = 1000 * time.Millisecond
+var maxTimePerRound time.Duration = 5 * time.Second
+
+var clientsServedPhase1 int = 0
+var clientsServedPhase3 int = 0
 
 //counts the number of rounds
 var round int = 0
@@ -248,6 +254,13 @@ func main() {
 
 		round++
 
+		fmt.Println("clientsServedPhase1", clientsServedPhase1)
+		fmt.Println("clientsServedPhase3", clientsServedPhase3)
+		fmt.Println("dbWriteSize", dbWriteSize)
+
+		clientsServedPhase1 = 0
+		clientsServedPhase3 = 0
+
 		fmt.Println("Phase 1 Round", round)
 
 		//creates a new write Db for this round
@@ -332,6 +345,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	}
 
 	for clientConnection := range phase1Channel {
+		clientsServedPhase1++
 
 		gotClient[0] = 1
 		//tells follower that this worker got a clientConnection
@@ -709,6 +723,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	}
 
 	for clientConnection := range phase3Channel {
+		clientsServedPhase3++
 
 		gotClient[0] = 1
 		//tells follower that this worker got a clientConnection
@@ -820,7 +835,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 		}
 
-		errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection)
+		errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection, m)
 		if errorBool {
 			contBool := handleClientDC(wg, followerConnection, phase3Channel)
 			if contBool {
@@ -852,7 +867,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 					return
 				}
 			}
-			errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection)
+			errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection, m)
 			if errorBool {
 				contBool := handleClientDC(wg, followerConnection, phase3Channel)
 				if contBool {
@@ -968,7 +983,7 @@ func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret
 	return errorBool
 }
 
-func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) bool {
+func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn, m *sync.RWMutex) bool {
 	//todo! repeat for archive
 	tmpNeededSubscriptions := neededSubscriptions
 	if tmpNeededSubscriptions > topicAmount {
@@ -983,9 +998,9 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnecti
 	for i := 0; i < tmpNeededSubscriptions; i++ {
 		//gets all requested tweets
 		if archiveQuerys == nil {
-			tweets[i] = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0)
+			tweets[i] = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0, *clientKeys.PublicKey)
 		} else {
-			tweets[i] = lib.GetTweets(archiveQuerys[i], dataLength, 1)
+			tweets[i] = lib.GetTweets(archiveQuerys[i], dataLength, 1, *clientKeys.PublicKey)
 		}
 
 		//expand sharedSecret so it is of right length
@@ -1000,18 +1015,15 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnecti
 
 		//fmt.Println(tweets[0])
 
-		//receives tweets from follower and Xor's them in
-		tweetsLengthBytes, _ := readFrom(followerConnection, 4, nil, 0)
-
-		tweetsReceivedLength := byteToInt(tweetsLengthBytes)
+		blockLength := len(tweets[i])
 
-		receivedTweets, _ := readFrom(followerConnection, tweetsReceivedLength, nil, 0)
+		receivedTweets, _ := readFrom(followerConnection, blockLength, nil, 0)
 
+		//fmt.Println("receivedTweets", blockLength, len(receivedTweets))
+		//fmt.Println("pubKey", *clientKeys.PublicKey, "Bytes", tweets, "follower", receivedTweets)
 		lib.Xor(receivedTweets, tweets[i])
 	}
 
-	//fmt.Println("Bytes", tweets[0][:10])
-
 	//sends tweets to client
 	for i := 0; i < tmpNeededSubscriptions; i++ {
 		tweetsLengthBytes := intToByte(len(tweets[i]))
@@ -1179,25 +1191,75 @@ func sendTopicLists(clientConnection, followerConnection net.Conn, setup bool) b
 
 //sends the array to the connection
 func writeTo(connection net.Conn, array []byte) {
-	_, err := connection.Write(array)
-	if err != nil {
-		panic(err)
+	remainingLength := len(array)
+	for remainingLength > 0 {
+		if remainingLength >= mtu {
+			_, err := connection.Write(array[:mtu])
+			if err != nil {
+				panic(err)
+			}
+			array = array[mtu:]
+			remainingLength -= mtu
+		} else {
+			_, err := connection.Write(array)
+			if err != nil {
+				panic(err)
+			}
+			remainingLength = 0
+		}
 	}
 }
 
 func writeToWError(connection net.Conn, array []byte, followerConnection net.Conn, size int) bool {
-	var err error
 	if connection.RemoteAddr().String() == follower {
 		arrayWError := make([]byte, 1)
 		arrayWError = append(arrayWError, array[:]...)
-		_, err = connection.Write(arrayWError)
+		remainingLength := len(arrayWError)
+		for remainingLength > 0 {
+			if remainingLength >= mtu {
+				_, err := connection.Write(arrayWError[:mtu])
+				if err != nil {
+					return handleError(connection, followerConnection, size, err)
+				}
+				arrayWError = arrayWError[mtu:]
+				remainingLength -= mtu
+			} else {
+				_, err := connection.Write(arrayWError)
+				if err != nil {
+					return handleError(connection, followerConnection, size, err)
+				}
+				remainingLength = 0
+			}
+		}
 	} else {
-		_, err = connection.Write(array)
+		remainingLength := len(array)
+		for remainingLength > 0 {
+			if remainingLength >= mtu {
+				_, err := connection.Write(array[:mtu])
+				if err != nil {
+					return handleError(connection, followerConnection, size, err)
+				}
+				array = array[mtu:]
+				remainingLength -= mtu
+			} else {
+				_, err := connection.Write(array)
+				if err != nil {
+					return handleError(connection, followerConnection, size, err)
+				}
+				remainingLength = 0
+			}
+		}
 	}
+	return false
+}
 
+func handleError(connection, followerConnection net.Conn, size int, err error) bool {
 	if err != nil {
 		//lets follower know that client has disconnected unexpectedly
 		if connection.RemoteAddr().String() != follower {
+			if size > mtu {
+				fmt.Println("have a look here")
+			}
 			fmt.Println("error", err)
 			array := make([]byte, size)
 			array[0] = 1
@@ -1216,23 +1278,37 @@ func writeToWError(connection net.Conn, array []byte, followerConnection net.Con
 //reads an array which is returned and of size "size" from the connection
 //returns true if error occured
 func readFrom(connection net.Conn, size int, followerConnection net.Conn, sizeError int) ([]byte, bool) {
-	array := make([]byte, size)
-	_, err := connection.Read(array)
-	if err != nil {
-		//lets follower know that client has disconnected unexpectedly
-		if connection.RemoteAddr().String() != follower {
-			fmt.Println("error", err)
-			array := make([]byte, sizeError)
-			array[0] = 1
-			_, err = followerConnection.Write(array)
-			if err != nil {
+	var array []byte
+	remainingSize := size
+	for remainingSize > 0 {
+		var err error
+		toAppend := make([]byte, mtu)
+		if remainingSize > mtu {
+			_, err = connection.Read(toAppend)
+			array = append(array, toAppend...)
+			remainingSize -= mtu
+		} else {
+			_, err = connection.Read(toAppend[:remainingSize])
+			array = append(array, toAppend[:remainingSize]...)
+			remainingSize = 0
+		}
+		if err != nil {
+			//lets follower know that client has disconnected unexpectedly
+			if connection.RemoteAddr().String() != follower {
+				fmt.Println(err)
+				array := make([]byte, sizeError)
+				array[0] = 1
+				_, err = followerConnection.Write(array)
+				if err != nil {
+					panic(err)
+				}
+				return nil, true
+			} else {
 				panic(err)
 			}
-			return nil, true
-		} else {
-			panic(err)
 		}
 	}
+
 	return array, false
 }
 

+ 5 - 6
lib/databaseRead.go

@@ -70,7 +70,8 @@ func NewEntries(inputTweets []Tweet, whereTo int) {
 }
 
 //todo! add round to pirquery only get tweets that have been posted from that round onward
-func GetTweets(pirQuery []byte, dataLength int, whereFrom int) []byte {
+func GetTweets(pirQuery []byte, dataLength int, whereFrom int, pubKey [32]byte) []byte {
+	//fmt.Println(pirQuery, pubKey)
 	tmpdb := dbR
 	if whereFrom == 1 {
 		tmpdb = archive
@@ -80,6 +81,7 @@ func GetTweets(pirQuery []byte, dataLength int, whereFrom int) []byte {
 	tweetsToReturn := make([][]Tweet, len(wantedTopics))
 	for index, wantedTopic := range wantedTopics {
 		for _, tweet := range tmpdb[wantedTopic] {
+			//fmt.Println(tweet)
 			//new Tweet
 			if tweet.Text != "" {
 				tweet.RoundPosted = 0
@@ -149,16 +151,13 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet) []byte {
 		//todo! replace with grouping
 		padding := bytes.Repeat([]byte(";"), length)
 		tweetToAppend = append(tweetToAppend, padding...)
-		//Xor(tweetToAppend, tweetsAsBytes)
-
-		for index := range tweetToAppend {
-			tweetsAsBytes[index] = tweetsAsBytes[index] ^ tweetToAppend[index]
-		}
+		Xor(tweetToAppend, tweetsAsBytes)
 
 		//fmt.Println(tweetsAsBytes)
 
 	}
 
+	//fmt.Println("length Returned", len(tweetsAsBytes))
 	return tweetsAsBytes
 }