Forráskód Böngészése

grouping by topics works

Simon 2 éve
szülő
commit
76a68e216e
4 módosított fájl, 81 hozzáadás és 59 törlés
  1. 43 35
      client/client.go
  2. 5 2
      follower/follower.go
  3. 16 12
      leader/leader.go
  4. 17 10
      lib/databaseRead.go

+ 43 - 35
client/client.go

@@ -42,12 +42,11 @@ type tweet struct {
 const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
-const numClients = 1
+const numClients = 500
 
 //mylimit=8000
 //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
 const dataLength int = 256
-const numThreads int = 12
 
 //Maximum Transport Unit
 const mtu int = 1100
@@ -59,9 +58,9 @@ var archiveTopicList []string
 var neededSubscriptions int
 var publisherAmount int
 var timeBounds []int
-var speedUp int = 50
+var speedUp int = 500
 
-var maxTimePerRound time.Duration = 5 * time.Second
+var maxTimePerRound time.Duration = 10 * time.Second
 var startTime int
 
 //todo! expand this for multiple clients
@@ -170,7 +169,7 @@ func client(clientNumber int) {
 			sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
 			pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
 
-			tweet := getRandomTweet(clientNumber)
+			tweet := getRealTweet(clientNumber)
 			if clientNumber == numClients-1 {
 				fmt.Println("publisherAmount", publisherAmount)
 			}
@@ -280,11 +279,8 @@ func client(clientNumber int) {
 //creates and sends the pirQuerys for each server
 func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 	//later this will be taken from gui, this is only for testing
-	topicsOFInterest := make([]int, 10)
-	topicsOFInterest[0] = 0
-	topicsOFInterest[1] = 1
-	topicsOFInterest[9] = 1
-	archiveInterests[0] = 1
+	topicsOfInterest := make([]int, 1)
+	topicsOfInterest[0] = mr.Intn(10)
 
 	//todo! repeat for archive
 	tmpNeededSubscriptions := neededSubscriptions
@@ -292,8 +288,8 @@ func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 		tmpNeededSubscriptions = len(topicList)
 	}
 
-	tmptopicsOfInterest := make([]int, len(topicList))
-	copy(tmptopicsOfInterest, topicsOFInterest)
+	tmptopicsOfInterest := make([]int, len(topicsOfInterest))
+	copy(tmptopicsOfInterest, topicsOfInterest)
 
 	tmpTopicList := make([]string, len(topicList))
 	copy(tmpTopicList, topicList)
@@ -311,7 +307,7 @@ func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 		tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
 	}
 
-	//pirQuery [topicsofinterest][serverAmount][topicAmount]byte
+	//pirQuery [topicsOfInterest][serverAmount][topicAmount]byte
 	pirQuerys := make([][][]byte, len(tmptopicsOfInterest))
 	for i := range pirQuerys {
 		pirQuerys[i] = make([][]byte, 2)
@@ -332,29 +328,28 @@ func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 		}
 	}
 
-	tmptopicsOfInterestBytes := make([]byte, tmpNeededSubscriptions)
-	for index := range tmptopicsOfInterestBytes {
+	tmptopicsOfInterestBytes := make([]byte, len(tmpTopicList))
+	for index := range tmptopicsOfInterest {
 		if tmptopicsOfInterest[index] == 1 {
 			tmptopicsOfInterestBytes[index] = 1
 		}
 	}
 
-	for topic := range tmptopicsOfInterest {
+	for topicIndex, topic := range tmptopicsOfInterest {
 		for index := range tmpTopicList {
 			if topic == index {
-				if pirQuerys[topic][0][index] == 1 {
-					pirQuerys[topic][1][index] = 0
+				if pirQuerys[topicIndex][0][index] == 1 {
+					pirQuerys[topicIndex][1][index] = 0
 				} else {
-					pirQuerys[topic][1][index] = 1
+					pirQuerys[topicIndex][1][index] = 1
 				}
 			} else {
-				if pirQuerys[topic][0][index] == 0 {
-					pirQuerys[topic][1][index] = 0
+				if pirQuerys[topicIndex][0][index] == 0 {
+					pirQuerys[topicIndex][1][index] = 0
 				} else {
-					pirQuerys[topic][1][index] = 1
+					pirQuerys[topicIndex][1][index] = 1
 				}
 			}
-
 		}
 	}
 
@@ -458,15 +453,28 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 
 		index := strings.Index(string(tweets), ";;")
 		if index != -1 && clientNumber == 0 {
+			//fmt.Println("Correct")
+
+			textArr := strings.Split(string(tweets), ";;;")
+			text := textArr[:len(textArr)-1]
+
+			fmt.Println("Round", round, text[0], "Length", len(tweets))
+			fmt.Println("padding", text[1])
 
-			fmt.Println("Correct")
-			//textArr := strings.Split(string(tweets), ";;;;;;;;")
-			//text := string(tweets)[:len(textArr)-1]
-			fmt.Println("Round", round, "Text", string(tweets), "Length", len(tweets))
+			/* doesnt work
+			goodPadding := strings.Index(text[0], text[1])
+			if text[0] == text[1] || text[1] == "" {
+				goodPadding = -1
+			}
+			if goodPadding != -1 {
+				fmt.Println("Round", round, text[0], "Length", len(tweets))
+				fmt.Println("padding", text[1])
+			}
+			*/
 
 		} else if index == -1 {
 			fmt.Println("error")
-			fmt.Println("round", round, "text:", string(tweets), "length", len(tweets))
+			fmt.Println("round", round, string(tweets), "length", len(tweets))
 			return
 			//panic("received text not of correct format")
 		}
@@ -542,15 +550,15 @@ func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
 		tmpNeededSubscriptions = len(topicList)
 	}
 
-	fakeTopicsOfInterest := make([]int, tmpNeededSubscriptions)
+	faketopicsOfInterest := make([]int, tmpNeededSubscriptions)
 	maxInt := max
 
 	//fills the array with unique random ascending values ranging from 0 to max
 	for i := 0; i < tmpNeededSubscriptions; i++ {
-		fakeTopicsOfInterest[i] = mr.Intn(maxInt)
+		faketopicsOfInterest[i] = mr.Intn(maxInt)
 
 		for j := 0; j < i; j++ {
-			if fakeTopicsOfInterest[i] == fakeTopicsOfInterest[j] {
+			if faketopicsOfInterest[i] == faketopicsOfInterest[j] {
 				i--
 				break
 			}
@@ -558,12 +566,12 @@ func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
 	}
 
 	if doAuditing {
-		sort.Ints(fakeTopicsOfInterest)
-		return fakeTopicsOfInterest
+		sort.Ints(faketopicsOfInterest)
+		return faketopicsOfInterest
 	}
 
 	//adds unique and new random numbers to topicOfInterests until length is satisfied
-	for _, number := range fakeTopicsOfInterest {
+	for _, number := range faketopicsOfInterest {
 		if !inList(number, topicsOfInterest) {
 			topicsOfInterest = append(topicsOfInterest, number)
 		}
@@ -728,7 +736,7 @@ func getRandomTweet(clientNumber int) []byte {
 	if maxTopics == 0 {
 		maxTopics = 1
 	}
-	maxInt := 3
+	maxInt := 100
 	topicNumbers := make([]int, maxTopics)
 	//fills the array with unique random ascending values ranging from 0 to maxInt
 	for i := 0; i < maxTopics; i++ {

+ 5 - 2
follower/follower.go

@@ -424,9 +424,9 @@ func phase2(leaderWorkerConnection net.Conn) {
 						} else {
 							//if topics are
 							//int
-							topic = topic + fmt.Sprint(int(letter))
+							//topic = topic + fmt.Sprint(int(letter))
 							//string
-							//topic = topic + string(letter)
+							topic = topic + string(letter)
 						}
 					} else if position == 1 {
 						text = text + string(letter)
@@ -453,6 +453,9 @@ func phase2(leaderWorkerConnection net.Conn) {
 	//gets current dbWriteSize from leader
 	dbWriteSizeBytes := readFrom(leaderWorkerConnection, 4)
 	dbWriteSize = byteToInt(dbWriteSizeBytes)
+
+	lib.CleanUpdbR(round)
+
 }
 
 func addTestTweets() {

+ 16 - 12
leader/leader.go

@@ -61,8 +61,8 @@ var topicAmount int
 var archiveTopicAmount int
 
 // every roundsBeforeUpdate the client updates his pirQuery
-const roundsBeforeUpdate = 5
-const neededSubscriptions = 1
+const roundsBeforeUpdate = 1
+const neededSubscriptions = 3
 const numThreads = 12
 const dataLength = 256
 const minDBWriteSize = 10
@@ -72,7 +72,7 @@ var collisionCounter []float64
 
 var clientsConnected int
 
-var maxTimePerRound time.Duration = 5 * time.Second
+var maxTimePerRound time.Duration = 10 * time.Second
 
 var clientsServedPhase1 []int
 var clientsServedPhase3 []int
@@ -193,7 +193,9 @@ func main() {
 		for {
 			clientConnection, err := lnClients.Accept()
 			if err != nil {
-				fmt.Println("error", err)
+				fmt.Println("Client connection error 1", err)
+				clientConnection.Close()
+				break
 			}
 			clientConnection.SetDeadline(time.Time{})
 
@@ -206,7 +208,7 @@ func main() {
 			//send leader publicKey
 			_, err = clientConnection.Write(leaderPublicKey[:])
 			if err != nil {
-				fmt.Println("error", err)
+				fmt.Println("Client connection error 2", err)
 				clientConnection.Close()
 				break
 			}
@@ -214,7 +216,7 @@ func main() {
 			//send follower publicKey
 			_, err = clientConnection.Write(followerPublicKey[:])
 			if err != nil {
-				fmt.Println("error", err)
+				fmt.Println("Client connection error 3", err)
 				clientConnection.Close()
 				break
 			}
@@ -224,21 +226,21 @@ func main() {
 			//gets publicKey from client
 			_, err = clientConnection.Read(tmpClientPublicKey[:])
 			if err != nil {
-				fmt.Println("error", err)
+				fmt.Println("Client connection error 4", err)
 				clientConnection.Close()
 				break
 			}
 
 			_, err = clientConnection.Write(intToByte(neededSubscriptions))
 			if err != nil {
-				fmt.Println("error", err)
+				fmt.Println("Client connection error 5", err)
 				clientConnection.Close()
 				break
 			}
 
 			_, err = clientConnection.Write(intToByte(int(startTime.Unix())))
 			if err != nil {
-				fmt.Println("error", err)
+				fmt.Println("Client connection error 6", err)
 				clientConnection.Close()
 				break
 			}
@@ -708,9 +710,9 @@ func phase2(followerConnection net.Conn) {
 						} else {
 							//if topics are
 							//int
-							topic = topic + fmt.Sprint(int(letter))
+							//topic = topic + fmt.Sprint(int(letter))
 							//string
-							//topic = topic + string(letter)
+							topic = topic + string(letter)
 						}
 					} else if position == 1 {
 						text = text + string(letter)
@@ -770,6 +772,8 @@ func phase2(followerConnection net.Conn) {
 
 	//writes dbWriteSize of current round to follower
 	writeTo(followerConnection, intToByte(int(dbWriteSize)))
+
+	lib.CleanUpdbR(round)
 }
 
 func addTestTweets() {
@@ -1350,7 +1354,7 @@ func handleError(connection, followerConnection net.Conn, size int, err error) b
 			if size > mtu {
 				fmt.Println("have a look here")
 			}
-			fmt.Println("error", err)
+			fmt.Println("handleError", err)
 			array := make([]byte, size)
 			array[0] = 1
 			_, err = followerConnection.Write(array)

+ 17 - 10
lib/databaseRead.go

@@ -23,7 +23,7 @@ var archive = make(map[string][]Tweet)
 //has to be dividable by 32
 var minimumBlockSize int
 
-var roundsBeforeArchiving = 3
+var roundsBeforeArchiving = 2
 
 var topicList []string
 
@@ -73,7 +73,7 @@ func NewEntries(inputTweets []Tweet, whereTo int) {
 //todo! add round to pirquery only get tweets that have been posted from that round onward
 func GetTweets(pirQuery []byte, dataLength int, whereFrom int, pubKey [32]byte) []byte {
 	//fmt.Println("query", pirQuery)
-	fmt.Println("dbR", dbR)
+	//fmt.Println("dbR", dbR)
 	tmpdb := dbR
 	if whereFrom == 1 {
 		tmpdb = archive
@@ -154,10 +154,14 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet, whereFrom int) []byte {
 		}
 
 		//adds padding
-		blockToAppend = append(blockToAppend, []byte(";;")[:]...)
+		blockToAppend = append(blockToAppend, []byte(";;;")[:]...)
 		remainingLength := minimumBlockSize - len(blockToAppend)
 		//fmt.Println("block", string(blockToAppend))
 
+		if blockIndex == 0 {
+			//	fmt.Println("tweet", string(blockToAppend))
+		}
+
 		//grouping using topic from recovered tweets
 		index := 0
 		for len(topicPadding) > index && remainingLength > 0 && topicPadding[index] != "" {
@@ -175,7 +179,7 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet, whereFrom int) []byte {
 		}
 
 		if blockIndex == 0 {
-			fmt.Println("0000", string(blockToAppend))
+			//	fmt.Println("tweetPadded", string(blockToAppend))
 		}
 
 		padding := bytes.Repeat([]byte(";"), remainingLength)
@@ -219,7 +223,7 @@ func getNextTweet(wantedTopic string, index, whereFrom int) ([]byte, bool) {
 		}
 	}
 
-	fmt.Println("nextTweet", tweetToReturn)
+	//fmt.Println("nextTweet", tweetToReturn)
 	var tweetToReturnBytes []byte
 	for _, topic := range tweetToReturn.Topics {
 		tweetToReturnBytes = append(tweetToReturnBytes, []byte(topic)...)
@@ -250,28 +254,30 @@ func GetTopicList(whereFrom int) ([]byte, int) {
 
 //iterates through full dbR and moves old tweets to archive
 func CleanUpdbR(round int) {
-	//is broken
-	return
 
+	//doesnt work
+	return
 	if roundsBeforeArchiving-round == 0 {
-		roundsBeforeArchiving = roundsBeforeArchiving + roundsBeforeArchiving
 		var tweetsToArchive []Tweet
 		for j := range dbR {
 			tweets := dbR[j]
 			for i := len(tweets) - 1; i >= 0; i-- {
-				if round-roundsBeforeArchiving == tweets[i].RoundPosted {
+				if round-roundsBeforeArchiving+1 == tweets[i].RoundPosted {
 					//only adds the tweet to the archive when there is text
 					if tweets[i].Text != "" {
 						tweetsToArchive = append(tweetsToArchive, tweets[i])
 					}
 					//delets the tweet from the array
-					tweets = append(tweets[:i], tweets[i+1:]...)
+					row := append(tweets[:i], tweets[i+1:]...)
+					tweets = row
 				}
 			}
 			dbR[j] = tweets
 		}
 		NewEntries(tweetsToArchive, 1)
 
+		roundsBeforeArchiving = roundsBeforeArchiving + roundsBeforeArchiving
+
 		//redoes the whole dbR to correct pointers
 		var tweetsToMain []Tweet
 
@@ -288,6 +294,7 @@ func CleanUpdbR(round int) {
 		dbR = make(map[string][]Tweet)
 		topicList = nil
 
+		fmt.Println("tweetsToMain", tweetsToMain)
 		NewEntries(tweetsToMain, 0)
 	}
 }