Browse Source

dbWriteSize now has average history
improved robustness for clientSetup failing

Simon 2 years ago
parent
commit
9d5d4cb8b4
2 changed files with 35 additions and 21 deletions
  1. 2 2
      client/client.go
  2. 33 19
      leader/leader.go

+ 2 - 2
client/client.go

@@ -37,7 +37,7 @@ const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
 const neededSubscriptions = 1
-const numClients = 2
+const numClients = 1
 
 var topicList []string
 var archiveTopicList []string
@@ -188,7 +188,7 @@ func client(tweet []byte, clientNumber int) {
 			var dpfQueryA *C.uchar
 			var dpfQueryB *C.uchar
 
-			C.prepQuery(C.int(clientNumber), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
+			C.prepQuery(C.int(1), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
 
 			intQuerySize := int(cQuerySize) //byteToInt(querySize)
 

+ 33 - 19
leader/leader.go

@@ -83,8 +83,11 @@ var maxTimePerRound time.Duration = 5 * time.Second
 var phase1Channel = make(chan net.Conn, maxNumberOfClients)
 var phase3Channel = make(chan net.Conn, maxNumberOfClients)
 
-//variable for calculating the dbWrite size
-var publisherNumber float64
+//variables for calculating the dbWrite size
+const publisherRounds int = 3
+
+var publisherAmount float64
+var publisherHistory [publisherRounds]int
 
 func main() {
 
@@ -184,7 +187,7 @@ func main() {
 		for {
 			clientConnection, err := lnClients.Accept()
 			if err != nil {
-				panic(err)
+				fmt.Println(err)
 			}
 			clientConnection.SetDeadline(time.Time{})
 
@@ -194,13 +197,17 @@ func main() {
 			//send leader publicKey
 			_, err = clientConnection.Write(leaderPublicKey[:])
 			if err != nil {
-				panic(err)
+				fmt.Println(err)
+				clientConnection.Close()
+				break
 			}
 
 			//send follower publicKey
 			_, err = clientConnection.Write(followerPublicKey[:])
 			if err != nil {
-				panic(err)
+				fmt.Println(err)
+				clientConnection.Close()
+				break
 			}
 
 			var clientPublicKey *[32]byte
@@ -208,7 +215,9 @@ func main() {
 			//gets publicKey from client
 			_, err = clientConnection.Read(tmpClientPublicKey[:])
 			if err != nil {
-				panic(err)
+				fmt.Println(err)
+				clientConnection.Close()
+				break
 			}
 
 			clientPublicKey = &tmpClientPublicKey
@@ -277,10 +286,6 @@ func main() {
 
 		fmt.Println("phase3")
 
-		if round == 1 {
-			//addTestTweets()
-		}
-
 		//no tweets -> continue to phase 1 and mb get tweets
 		topicList, topicAmount = lib.GetTopicList(0)
 		if len(topicList) == 0 {
@@ -364,8 +369,6 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			panic(err)
 		}
 
-		fmt.Println("SizeToClient", dbWriteSize)
-
 		//tells client current round
 		_, err = clientConnection.Write(roundAsBytes)
 		if err != nil {
@@ -677,13 +680,13 @@ func phase2(followerConnection net.Conn) {
 	}
 
 	var tweets []lib.Tweet
-	publisherNumber = 0
+	var currentPublisherAmount int = 0
 	for i := 0; i < dbSize; i++ {
 		//discard cover message
 		if tmpdb[i][0] == 0 {
 			continue
 		} else {
-			publisherNumber++
+			currentPublisherAmount++
 			//reconstruct tweet
 			var position int = 0
 			var topics []string
@@ -723,14 +726,25 @@ func phase2(followerConnection net.Conn) {
 
 	C.resetDb()
 
-	//calculates the dbWriteSize for this round
-	if publisherNumber > 0 {
-		dbWriteSize = int(math.Ceil(19.5 * publisherNumber))
+	//calculates the publisherAverage over the last publisherRounds rounds
+	index := round % publisherRounds
+	publisherHistory[index] = currentPublisherAmount
+
+	var publisherAmount int
+	for _, num := range publisherHistory {
+		publisherAmount += num
+	}
+
+	publisherAverage := 0
+	if round < publisherRounds {
+		publisherAverage = publisherAmount / round
 	} else {
-		//default, handles 100 clients
-		dbWriteSize = 1950
+		publisherAverage = publisherAmount / publisherRounds
 	}
 
+	//calculates the dbWriteSize for this round
+	dbWriteSize = int(math.Ceil(19.5 * float64(publisherAverage)))
+
 	//writes dbWriteSize of current round to follower
 	_, err = followerConnection.Write(intToByte(dbWriteSize))
 	if err != nil {