Browse Source

bugfixes for archive

Simon 1 year ago
parent
commit
c30f4e37d9
4 changed files with 39 additions and 25 deletions
  1. 8 4
      client/client.go
  2. 2 2
      follower/follower.go
  3. 3 3
      leader/leader.go
  4. 26 16
      lib/databaseRead.go

+ 8 - 4
client/client.go

@@ -42,10 +42,11 @@ type tweet struct {
 const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
-const numClients = 500
+const numClients = 5
 
 //mylimit=8000
 //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
+//for every terminal
 const dataLength int = 256
 
 //Maximum Transport Unit
@@ -60,7 +61,7 @@ var publisherAmount int
 var timeBounds []int
 var speedUp int = 500
 
-var maxTimePerRound time.Duration = 10 * time.Second
+var maxTimePerRound time.Duration = 3 * time.Second
 var startTime int
 
 //todo! expand this for multiple clients
@@ -256,7 +257,7 @@ func client(clientNumber int) {
 			receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
 
 			if len(archiveTopicList) > 0 {
-				wantsArchive[0] = 0 //archive test
+				wantsArchive[0] = 1 //archive test
 			} else {
 				wantsArchive[0] = 0
 			}
@@ -452,6 +453,9 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 		//fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
 
 		index := strings.Index(string(tweets), ";;")
+		if getArchive {
+			fmt.Println("gettingArchive")
+		}
 		if index != -1 && clientNumber == 0 {
 			//fmt.Println("Correct")
 
@@ -472,7 +476,7 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 			}
 			*/
 
-		} else if index == -1 {
+		} else if index == -1 && tweets[0] != 0 {
 			fmt.Println("error")
 			fmt.Println("round", round, string(tweets), "length", len(tweets))
 			return

+ 2 - 2
follower/follower.go

@@ -59,8 +59,6 @@ var neededSubscriptions int
 var round int = 0
 var startTime time.Time
 
-var ignoreMe []byte
-
 func main() {
 
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
@@ -572,6 +570,8 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerCo
 	}
 }
 
+var ignoreMe []byte
+
 //returns true if client connection is lost
 func handlePirQuery(clientKeys clientKeys, leaderWorkerConnection net.Conn, subPhase int, clientPublicKey [32]byte, doAuditing bool) (clientKeys, [][]byte, bool) {
 

+ 3 - 3
leader/leader.go

@@ -65,14 +65,14 @@ const roundsBeforeUpdate = 1
 const neededSubscriptions = 3
 const numThreads = 12
 const dataLength = 256
-const minDBWriteSize = 10
+const minDBWriteSize = 1000
 
 var dbWriteSize float64 = 20000
 var collisionCounter []float64
 
 var clientsConnected int
 
-var maxTimePerRound time.Duration = 10 * time.Second
+var maxTimePerRound time.Duration = 3 * time.Second
 
 var clientsServedPhase1 []int
 var clientsServedPhase3 []int
@@ -734,7 +734,7 @@ func phase2(followerConnection net.Conn) {
 	collisionCounter = append(collisionCounter, collisions)
 
 	if collisions/dbWriteSize > 0.05 {
-		fmt.Println("Collisions this round", collisions, "dbWriteSize", dbWriteSize)
+		fmt.Println("Collision % this round", collisions/dbWriteSize, "dbWriteSize", dbWriteSize)
 	}
 
 	//fmt.Println("tweets recovered: ", tweets)

+ 26 - 16
lib/databaseRead.go

@@ -3,7 +3,7 @@ package lib
 import (
 	"bytes"
 	"encoding/json"
-	"fmt"
+	"sort"
 )
 
 //topicPointer and textPointer should not be exported
@@ -23,6 +23,7 @@ var archive = make(map[string][]Tweet)
 //has to be dividable by 32
 var minimumBlockSize int
 
+//needs to be dividable by roundsBeforUpdate
 var roundsBeforeArchiving = 2
 
 var topicList []string
@@ -98,7 +99,7 @@ func GetTweets(pirQuery []byte, dataLength int, whereFrom int, pubKey [32]byte)
 		}
 	}
 
-	return tweetsToByteArray(tweetsToReturn, whereFrom)
+	return tweetsToByteArray(tweetsToReturn, whereFrom, wantedTopics)
 }
 
 func maxTweetAmount(whereFrom int) int {
@@ -131,18 +132,18 @@ func getNamesForTopics(wantedIndices []byte, whereFrom int) []string {
 }
 
 //transform struct to byte array for sending
-func tweetsToByteArray(tweetsToReturn [][]Tweet, whereFrom int) []byte {
+func tweetsToByteArray(tweetsToReturn [][]Tweet, whereFrom int, wantedTopics []string) []byte {
 	tweetsAsBytes := make([]byte, minimumBlockSize)
 	for blockIndex, block := range tweetsToReturn {
 		var topicPadding []string
 		var blockToAppend []byte
-		for _, tweet := range block {
+		for index, tweet := range block {
 			for topicIndex, topic := range tweet.Topics {
 				blockToAppend = append(blockToAppend, []byte(topic)...)
 				blockToAppend = append(blockToAppend, ","...)
 
 				//gets the topic used for padding
-				if topicIndex > 0 {
+				if topicIndex > 0 && index < len(wantedTopics) && topic != wantedTopics[index] {
 					topicPadding = append(topicPadding, topic)
 				}
 			}
@@ -163,8 +164,8 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet, whereFrom int) []byte {
 		}
 
 		//grouping using topic from recovered tweets
-		index := 0
-		for len(topicPadding) > index && remainingLength > 0 && topicPadding[index] != "" {
+		index := len(topicPadding) - 1
+		for index >= 0 && remainingLength > 0 && topicPadding[index] != "" {
 			paddingTweet, err := getNextTweet(topicPadding[index], index, whereFrom)
 			if err {
 				break
@@ -175,7 +176,7 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet, whereFrom int) []byte {
 			blockToAppend = append(blockToAppend, paddingTweet...)
 
 			remainingLength -= len(paddingTweet)
-			index++
+			index--
 		}
 
 		if blockIndex == 0 {
@@ -255,12 +256,20 @@ func GetTopicList(whereFrom int) ([]byte, int) {
 //iterates through full dbR and moves old tweets to archive
 func CleanUpdbR(round int) {
 
-	//doesnt work
-	return
 	if roundsBeforeArchiving-round == 0 {
+
+		keys := make([]string, len(dbR))
+		i := 0
+		for k := range dbR {
+			keys[i] = k
+			i++
+		}
+		sort.Strings(keys)
+
 		var tweetsToArchive []Tweet
-		for j := range dbR {
-			tweets := dbR[j]
+
+		for _, topic := range keys {
+			tweets := dbR[topic]
 			for i := len(tweets) - 1; i >= 0; i-- {
 				if round-roundsBeforeArchiving+1 == tweets[i].RoundPosted {
 					//only adds the tweet to the archive when there is text
@@ -272,7 +281,7 @@ func CleanUpdbR(round int) {
 					tweets = row
 				}
 			}
-			dbR[j] = tweets
+			dbR[topic] = tweets
 		}
 		NewEntries(tweetsToArchive, 1)
 
@@ -281,10 +290,11 @@ func CleanUpdbR(round int) {
 		//redoes the whole dbR to correct pointers
 		var tweetsToMain []Tweet
 
-		for i := range dbR {
-			tweets := dbR[i]
+		for _, topic := range keys {
+			tweets := dbR[topic]
 			for _, tweet := range tweets {
 				if tweet.Text != "" {
+					//fmt.Println("tweet", tweet)
 					tweetsToMain = append(tweetsToMain, tweet)
 				}
 			}
@@ -294,7 +304,7 @@ func CleanUpdbR(round int) {
 		dbR = make(map[string][]Tweet)
 		topicList = nil
 
-		fmt.Println("tweetsToMain", tweetsToMain)
 		NewEntries(tweetsToMain, 0)
 	}
+
 }