Browse Source

added support for more clients

Simon 2 years ago
parent
commit
08d58d6eb1
2 changed files with 66 additions and 54 deletions
  1. 24 20
      client/client.go
  2. 42 34
      leader/leader.go

+ 24 - 20
client/client.go

@@ -42,8 +42,11 @@ type tweet struct {
 const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
-const numClients = 4000
-const dataLength int = 256
+const numClients = 10000
+
+//mylimit=8000
+//sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
+const dataLength int = 64
 const numThreads int = 12
 
 //Maximum Transport Unit
@@ -55,7 +58,7 @@ var topicList []string
 var archiveTopicList []string
 var neededSubscriptions int
 
-var maxTimePerRound time.Duration = 100 * time.Second
+var maxTimePerRound time.Duration = 10 * time.Second
 var startTime int
 
 //todo! expand this for multiple clients
@@ -75,7 +78,7 @@ func main() {
 	for i := 0; i < numClients; i++ {
 		wg.Add(1)
 		go client(i)
-		time.Sleep(10 * time.Millisecond)
+		time.Sleep(1 * time.Millisecond)
 	}
 	wg.Wait()
 }
@@ -98,6 +101,7 @@ func client(clientNumber int) {
 
 	leaderConn, err := tls.Dial("tcp", leader, conf)
 	if err != nil {
+		fmt.Println("clientNumber", clientNumber)
 		panic(err)
 	}
 	leaderConn.SetDeadline(time.Time{})
@@ -161,7 +165,7 @@ func client(clientNumber int) {
 			sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
 			pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
 
-			tweet := getRandomTweet(clientNumber)
+			tweet := getRealTweet(clientNumber)
 
 			//prep the query
 			dataSize := len(tweet)
@@ -445,18 +449,14 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 		//fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
 
 		index := strings.Index(string(tweets), ";;;")
-		if index != -1 {
+		if index != -1 && clientNumber == 0 {
 			//fmt.Println("Round", round, "Tweets length", len(tweets))
-			/*
-				fmt.Println("Correct")
-				fmt.Println("PubKey", clientPublicKey[clientNumber])
-				text := string(tweets)[:index]
-				fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets))
-			*/
+			fmt.Println("Correct")
+			text := string(tweets)[:index]
+			fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets))
 		} else {
 			fmt.Println("error")
-			fmt.Println("pubKey", clientPublicKey[clientNumber])
-			fmt.Println("round", round, "text:", string(tweets[:5]), "length", len(tweets))
+			fmt.Println("round", round, "text:", string(tweets), "length", len(tweets))
 			return
 			//panic("received text not of correct format")
 		}
@@ -595,7 +595,7 @@ func receiveTopicLists(leaderConn net.Conn) {
 
 func getRealTweet(clientNumber int) []byte {
 
-	fUserList, err := os.Create("tweets/userList")
+	fUserList, err := os.Open("/home/simon/goCode/tweets/userList")
 	if err != nil {
 		panic(err)
 	}
@@ -608,7 +608,7 @@ func getRealTweet(clientNumber int) []byte {
 	userID := ""
 
 	for scanner.Scan() {
-		if currentLine == clientNumber {
+		if currentLine-1 == clientNumber {
 			userID = scanner.Text()
 			break
 		}
@@ -619,7 +619,7 @@ func getRealTweet(clientNumber int) []byte {
 		panic("no userID picked")
 	}
 
-	fTweets, err := os.Open("tweets/userTweets/" + userID)
+	fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID)
 	if err != nil {
 		panic(err)
 	}
@@ -636,7 +636,7 @@ func getRealTweet(clientNumber int) []byte {
 	for scanner.Scan() {
 		lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
 		lineArr = strings.Split(lineArr[0], ": ")
-		timestamp, _ := strconv.Atoi(lineArr[len(lineArr)])
+		timestamp, _ := strconv.Atoi(lineArr[len(lineArr)-1])
 
 		//transforms timestamp to current time
 		timestamp -= 1351742400
@@ -666,9 +666,13 @@ func getRealTweet(clientNumber int) []byte {
 				num = 1
 			}
 			tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
-
+			break
 		}
 	}
+
+	//todo! doesnt work
+	fmt.Println("tweet", string(tweet))
+
 	//adds padding
 	length := dataLength - len(tweet)
 	padding := make([]byte, length)
@@ -687,7 +691,7 @@ func getRandomTweet(clientNumber int) []byte {
 	if maxTopics == 0 {
 		maxTopics = 1
 	}
-	maxInt := numClients * 5
+	maxInt := 10000
 	topicNumbers := make([]int, maxTopics)
 	//fills the array with unique random ascending values ranging from 0 to maxInt
 	for i := 0; i < maxTopics; i++ {

+ 42 - 34
leader/leader.go

@@ -70,7 +70,9 @@ const minDBWriteSize = 1000
 var dbWriteSize float64 = 20000
 var collisionCounter []float64
 
-var maxTimePerRound time.Duration = 100 * time.Second
+var clientsConnected int
+
+var maxTimePerRound time.Duration = 10 * time.Second
 
 var clientsServedPhase1 []int
 var clientsServedPhase3 []int
@@ -85,8 +87,9 @@ var round int = 0
 var startTime time.Time
 
 //channel for goroutine communication with clients
-var phase1Channel = make(chan net.Conn, maxNumberOfClients)
-var phase3Channel = make(chan net.Conn, maxNumberOfClients)
+var waiCthannel = make(chan net.Conn, maxNumberOfClients)
+
+//var waiCthannel = make(chan net.Conn, maxNumberOfClients)
 
 //variables for calculating the dbWrite size
 const publisherRounds int = 10
@@ -100,8 +103,8 @@ func main() {
 	//prevents race conditions for wrtiting
 	m := &sync.RWMutex{}
 
-	clientsServedPhase1 := make([]int, 1000)
-	clientsServedPhase3 := make([]int, 1000)
+	clientsServedPhase1 = make([]int, 1000)
+	clientsServedPhase3 = make([]int, 1000)
 
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
 	if err != nil {
@@ -252,7 +255,12 @@ func main() {
 			clientData[remoteAddress] = keys
 			m.Unlock()
 
-			phase1Channel <- clientConnection
+			waiCthannel <- clientConnection
+			clientsConnected++
+
+			if clientsConnected%1000 == 0 {
+				fmt.Println("clientsConnected", clientsConnected)
+			}
 		}
 	}()
 
@@ -356,7 +364,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	gotClient[0] = 0
 
 	//wait until time is up
-	for len(phase1Channel) == 0 {
+	for len(waiCthannel) == 0 {
 		if time.Since(startTime) > maxTimePerRound {
 			//tells follower that this worker is done
 			writeTo(followerConnection, gotClient)
@@ -366,7 +374,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		time.Sleep(1 * time.Second)
 	}
 
-	for clientConnection := range phase1Channel {
+	for clientConnection := range waiCthannel {
 		clientsServedPhase1[round] = clientsServedPhase1[round] + 1
 
 		if clientsServedPhase1[round]%1000 == 0 {
@@ -394,7 +402,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client that phase 1 has begun
 		errorBool := writeToWError(clientConnection, phase, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -405,7 +413,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current dbWriteSize
 		errorBool = writeToWError(clientConnection, intToByte(int(dbWriteSize)), followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -416,7 +424,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current round
 		errorBool = writeToWError(clientConnection, roundAsBytes, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -425,13 +433,13 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		//begin auditing
-		auditingStart := time.Now()
+		//auditingStart := time.Now()
 		m.RLock()
 		var clientKeys = clientData[clientConnection.RemoteAddr()]
 		m.RUnlock()
 		clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -440,7 +448,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 		errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -449,7 +457,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		if id == 0 {
-			fmt.Println("Auditing duration", time.Since(auditingStart))
+			//fmt.Println("Auditing duration", time.Since(auditingStart).Seconds())
 		}
 
 		m.Lock()
@@ -459,7 +467,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//accept dpfQuery from client
 		dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -471,7 +479,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -481,7 +489,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -540,14 +548,14 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 				C.xorIn(C.int(i), (*C.uchar)(&db[i][0]))
 				m.Unlock()
 			}
-			phase3Channel <- clientConnection
+			waiCthannel <- clientConnection
 		}
 
 		//loop that waits for new client or leaves phase1 if time is up
 		for {
 			if time.Since(startTime) < maxTimePerRound {
 				//this worker handles the next client
-				if len(phase1Channel) > 0 {
+				if len(waiCthannel) > 0 {
 					break
 					//this worker waits for next client
 				} else {
@@ -762,7 +770,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	gotClient[0] = 0
 
 	//wait until time is up
-	for len(phase3Channel) == 0 {
+	for len(waiCthannel) == 0 {
 		if time.Since(startTime) > maxTimePerRound*2 {
 			//tells follower that this worker is done
 			writeToWError(followerConnection, gotClient, nil, 0)
@@ -772,7 +780,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		time.Sleep(1 * time.Second)
 	}
 
-	for clientConnection := range phase3Channel {
+	for clientConnection := range waiCthannel {
 		clientsServedPhase3[round] = clientsServedPhase3[round] + 1
 
 		if clientsServedPhase3[round]%1000 == 0 {
@@ -787,7 +795,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current phase
 		errorBool := writeToWError(clientConnection, phase, followerConnection, 2)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase3Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -824,7 +832,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client what leader expects
 		errorBool = writeToWError(clientConnection, subPhase, followerConnection, 2)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase3Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -849,7 +857,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		if subPhase[0] == 0 {
 			errorBool := sendTopicLists(clientConnection, followerConnection, false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				contBool := handleClientDC(wg, followerConnection, waiCthannel)
 				if contBool {
 					continue
 				} else {
@@ -858,7 +866,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 			clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				contBool := handleClientDC(wg, followerConnection, waiCthannel)
 				if contBool {
 					continue
 				} else {
@@ -868,7 +876,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		} else if subPhase[0] == 1 {
 			errorBool := sendTopicLists(clientConnection, followerConnection, false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				contBool := handleClientDC(wg, followerConnection, waiCthannel)
 				if contBool {
 					continue
 				} else {
@@ -881,7 +889,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 			clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				contBool := handleClientDC(wg, followerConnection, waiCthannel)
 				if contBool {
 					continue
 				} else {
@@ -892,7 +900,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection, m)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase3Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -902,7 +910,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, phase3Channel)
+			contBool := handleClientDC(wg, followerConnection, waiCthannel)
 			if contBool {
 				continue
 			} else {
@@ -915,7 +923,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
 			_, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				contBool := handleClientDC(wg, followerConnection, waiCthannel)
 				if contBool {
 					continue
 				} else {
@@ -924,7 +932,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 			errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection, m)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				contBool := handleClientDC(wg, followerConnection, waiCthannel)
 				if contBool {
 					continue
 				} else {
@@ -938,12 +946,12 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		clientData[clientConnection.RemoteAddr()] = clientKeys
 		m.Unlock()
 
-		phase1Channel <- clientConnection
+		waiCthannel <- clientConnection
 
 		for {
 			if time.Since(startTime) < 2*maxTimePerRound {
 				//this worker handles the next client
-				if len(phase3Channel) > 0 {
+				if len(waiCthannel) > 0 {
 					break
 					//this worker waits for next client
 				} else {