Forráskód Böngészése

added option to increase timeframe for tweets

Simon 2 éve
szülő
commit
467f61966c
3 módosított fájl, 86 hozzáadás és 41 törlés
  1. 32 9
      client/client.go
  2. 53 32
      leader/leader.go
  3. 1 0
      lib/databaseRead.go

+ 32 - 9
client/client.go

@@ -42,7 +42,7 @@ type tweet struct {
 const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
-const numClients = 2000
+const numClients = 500
 
 //mylimit=8000
 //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
@@ -57,8 +57,11 @@ var round int
 var topicList []string
 var archiveTopicList []string
 var neededSubscriptions int
+var publisherAmount int
+var timeBounds []int
+var speedUp int = 50
 
-var maxTimePerRound time.Duration = 30 * time.Second
+var maxTimePerRound time.Duration = 25 * time.Second
 var startTime int
 
 //todo! expand this for multiple clients
@@ -75,6 +78,8 @@ var clientPublicKey [numClients]*[32]byte
 func main() {
 	wg := &sync.WaitGroup{}
 
+	getTimeBounds()
+
 	for i := 0; i < numClients; i++ {
 		wg.Add(1)
 		go client(i)
@@ -166,6 +171,9 @@ func client(clientNumber int) {
 			pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
 
 			tweet := getRealTweet(clientNumber)
+			if clientNumber == numClients-1 {
+				fmt.Println("publisherAmount", publisherAmount)
+			}
 
 			//prep the query
 			dataSize := len(tweet)
@@ -450,11 +458,12 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 
 		index := strings.Index(string(tweets), ";;")
 		if index != -1 && clientNumber == 0 {
-			//fmt.Println("Round", round, "Tweets length", len(tweets))
-			fmt.Println("Correct")
-			textArr := strings.Split(string(tweets), ";;;;;;;;")
-			text := string(tweets)[:len(textArr)-1]
-			fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets))
+			/*
+				fmt.Println("Correct")
+				textArr := strings.Split(string(tweets), ";;;;;;;;")
+				text := string(tweets)[:len(textArr)-1]
+				fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets))
+			*/
 		} else if index == -1 {
 			fmt.Println("error")
 			fmt.Println("round", round, "text:", string(tweets), "length", len(tweets))
@@ -629,8 +638,8 @@ func getRealTweet(clientNumber int) []byte {
 
 	scanner = bufio.NewScanner(fTweets)
 
-	lowerBound := int(time.Now().Unix())
-	upperBound := lowerBound + int(3*maxTimePerRound.Seconds()) + 10
+	lowerBound := timeBounds[round-1]
+	upperBound := timeBounds[round]
 
 	var tweet []byte
 
@@ -670,6 +679,7 @@ func getRealTweet(clientNumber int) []byte {
 
 			topics = topics[:len(topics)-1]
 
+			//fmt.Println(string(topics))
 			tweet = append(tweet, topics...)
 			tweet = append(tweet, []byte(";")[0])
 
@@ -687,6 +697,7 @@ func getRealTweet(clientNumber int) []byte {
 			rand.Read(padding)
 			tweet = append(tweet, padding...)
 
+			publisherAmount++
 			return tweet
 		}
 	}
@@ -696,6 +707,18 @@ func getRealTweet(clientNumber int) []byte {
 	return tweet
 }
 
+func getTimeBounds() {
+	timeBounds = make([]int, 10000)
+	timeBounds[0] = int(time.Now().Unix())
+
+	for index := range timeBounds {
+		if index == 0 {
+			continue
+		}
+		timeBounds[index] = timeBounds[index-1] + speedUp*(int(3*maxTimePerRound.Seconds())+2)
+	}
+}
+
 func getRandomTweet(clientNumber int) []byte {
 	var tweet []byte
 

+ 53 - 32
leader/leader.go

@@ -65,14 +65,14 @@ const roundsBeforeUpdate = 5
 const neededSubscriptions = 1
 const numThreads = 12
 const dataLength = 256
-const minDBWriteSize = 1000
+const minDBWriteSize = 10
 
 var dbWriteSize float64 = 20000
 var collisionCounter []float64
 
 var clientsConnected int
 
-var maxTimePerRound time.Duration = 30 * time.Second
+var maxTimePerRound time.Duration = 25 * time.Second
 
 var clientsServedPhase1 []int
 var clientsServedPhase3 []int
@@ -88,9 +88,8 @@ var startTime time.Time
 var startTimeRound time.Time
 
 //channel for goroutine communication with clients
-var waitChannel = make(chan net.Conn, maxNumberOfClients)
-
-//var waitChannel = make(chan net.Conn, maxNumberOfClients)
+var phase1Channel = make(chan net.Conn, maxNumberOfClients)
+var phase3Channel = make(chan net.Conn, maxNumberOfClients)
 
 //variables for calculating the dbWrite size
 const publisherRounds int = 10
@@ -258,7 +257,7 @@ func main() {
 			clientData[remoteAddress] = keys
 			m.Unlock()
 
-			waitChannel <- clientConnection
+			phase1Channel <- clientConnection
 			clientsConnected++
 
 			if clientsConnected%1000 == 0 {
@@ -298,6 +297,16 @@ func main() {
 			writeTo(followerConnection, intToByte(virtualAddresses[i]))
 		}
 
+		//moves all clients to phase1
+		if len(phase3Channel) > 0 {
+			for client := range phase3Channel {
+				phase1Channel <- client
+				if len(phase3Channel) == 0 {
+					break
+				}
+			}
+		}
+
 		for id := 0; id < numThreads; id++ {
 			wg.Add(1)
 
@@ -330,6 +339,16 @@ func main() {
 
 		//Phase 3
 
+		//moves all clients to phase3
+		if len(phase1Channel) > 0 {
+			for client := range phase1Channel {
+				phase3Channel <- client
+				if len(phase1Channel) == 0 {
+					break
+				}
+			}
+		}
+
 		startPhase3 = time.Now()
 
 		//no tweets -> continue to phase 1 and mb get tweets
@@ -367,7 +386,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	gotClient[0] = 0
 
 	//wait until time is up
-	for len(waitChannel) == 0 {
+	for len(phase1Channel) == 0 {
 		if time.Since(startTimeRound) > maxTimePerRound {
 			//tells follower that this worker is done
 			writeTo(followerConnection, gotClient)
@@ -377,7 +396,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		time.Sleep(1 * time.Second)
 	}
 
-	for clientConnection := range waitChannel {
+	for clientConnection := range phase1Channel {
 		clientsServedPhase1[round] = clientsServedPhase1[round] + 1
 
 		if clientsServedPhase1[round]%1000 == 0 {
@@ -405,7 +424,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client that phase 1 has begun
 		errorBool := writeToWError(clientConnection, phase, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
 			if contBool {
 				continue
 			} else {
@@ -416,7 +435,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current dbWriteSize
 		errorBool = writeToWError(clientConnection, intToByte(int(dbWriteSize)), followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
 			if contBool {
 				continue
 			} else {
@@ -427,7 +446,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current round
 		errorBool = writeToWError(clientConnection, roundAsBytes, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
 			if contBool {
 				continue
 			} else {
@@ -442,7 +461,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		m.RUnlock()
 		clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
 			if contBool {
 				continue
 			} else {
@@ -451,7 +470,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 		errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
 			if contBool {
 				continue
 			} else {
@@ -470,7 +489,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//accept dpfQuery from client
 		dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
 			if contBool {
 				continue
 			} else {
@@ -482,7 +501,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
 			if contBool {
 				continue
 			} else {
@@ -492,7 +511,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
 			if contBool {
 				continue
 			} else {
@@ -551,14 +570,14 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 				C.xorIn(C.int(i), (*C.uchar)(&db[i][0]))
 				m.Unlock()
 			}
-			waitChannel <- clientConnection
+			phase3Channel <- clientConnection
 		}
 
 		//loop that waits for new client or leaves phase1 if time is up
 		for {
 			if time.Since(startTimeRound) < maxTimePerRound {
 				//this worker handles the next client
-				if len(waitChannel) > 0 {
+				if len(phase1Channel) > 0 {
 					break
 					//this worker waits for next client
 				} else {
@@ -726,6 +745,8 @@ func phase2(followerConnection net.Conn) {
 	index := round % publisherRounds
 	publisherHistory[index] = currentPublisherAmount
 
+	fmt.Println("currentPublisherAmount", currentPublisherAmount)
+
 	var publisherAmount int
 	for _, num := range publisherHistory {
 		publisherAmount += num
@@ -774,7 +795,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	gotClient[0] = 0
 
 	//wait until time is up
-	for len(waitChannel) == 0 {
+	for len(phase3Channel) == 0 {
 		if time.Since(startTimeRound) > maxTimePerRound*2 {
 			//tells follower that this worker is done
 			writeToWError(followerConnection, gotClient, nil, 0)
@@ -784,7 +805,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		time.Sleep(1 * time.Second)
 	}
 
-	for clientConnection := range waitChannel {
+	for clientConnection := range phase3Channel {
 		clientsServedPhase3[round] = clientsServedPhase3[round] + 1
 
 		if clientsServedPhase3[round]%1000 == 0 {
@@ -799,7 +820,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current phase
 		errorBool := writeToWError(clientConnection, phase, followerConnection, 2)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase3Channel)
 			if contBool {
 				continue
 			} else {
@@ -836,7 +857,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client what leader expects
 		errorBool = writeToWError(clientConnection, subPhase, followerConnection, 2)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase3Channel)
 			if contBool {
 				continue
 			} else {
@@ -861,7 +882,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		if subPhase[0] == 0 {
 			errorBool := sendTopicLists(clientConnection, followerConnection, false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waitChannel)
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
 				if contBool {
 					continue
 				} else {
@@ -870,7 +891,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 			clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waitChannel)
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
 				if contBool {
 					continue
 				} else {
@@ -880,7 +901,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		} else if subPhase[0] == 1 {
 			errorBool := sendTopicLists(clientConnection, followerConnection, false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waitChannel)
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
 				if contBool {
 					continue
 				} else {
@@ -893,7 +914,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 			clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waitChannel)
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
 				if contBool {
 					continue
 				} else {
@@ -904,7 +925,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection, m)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase3Channel)
 			if contBool {
 				continue
 			} else {
@@ -914,7 +935,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waitChannel)
+			contBool := handleClientDC(wg, followerConnection, phase3Channel)
 			if contBool {
 				continue
 			} else {
@@ -927,7 +948,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
 			_, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waitChannel)
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
 				if contBool {
 					continue
 				} else {
@@ -936,7 +957,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 			errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection, m)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waitChannel)
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
 				if contBool {
 					continue
 				} else {
@@ -950,12 +971,12 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		clientData[clientConnection.RemoteAddr()] = clientKeys
 		m.Unlock()
 
-		waitChannel <- clientConnection
+		phase1Channel <- clientConnection
 
 		for {
 			if time.Since(startTimeRound) < 2*maxTimePerRound {
 				//this worker handles the next client
-				if len(waitChannel) > 0 {
+				if len(phase3Channel) > 0 {
 					break
 					//this worker waits for next client
 				} else {

+ 1 - 0
lib/databaseRead.go

@@ -151,6 +151,7 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet) []byte {
 		length := minimumBlockSize - len(tweetToAppend)
 		//fmt.Println("len", len(tweetToAppend))
 		//todo! replace with grouping
+		//grouping using topics from recovered tweets
 		if length < 0 {
 			fmt.Println("<0", string(tweetToAppend))
 		}