Browse Source

started adding error handling for clientDC

Simon 2 years ago
parent
commit
b70f0b4e40
3 changed files with 437 additions and 147 deletions
  1. 22 21
      client/client.go
  2. 92 33
      follower/follower.go
  3. 323 93
      leader/leader.go

+ 22 - 21
client/client.go

@@ -41,7 +41,7 @@ const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
 const neededSubscriptions = 1
-const numClients = 2
+const numClients = 1
 const dataLength int = 64
 const numThreads int = 12
 
@@ -112,25 +112,25 @@ func client(clientNumber int) {
 	followerPublicKey = &tmpFollowerPubKey
 
 	//sends own public key
-	writeToConn(leaderConn, clientPublicKey[:])
+	writeTo(leaderConn, clientPublicKey[:])
 
 	//setup ends above
 	//while client is active he is always connected and has to participate
 
 	for {
 		//gets current phase
-		phase := readFromConn(leaderConn, 1)
+		phase := readFrom(leaderConn, 1)
 
 		fmt.Println("Phase: ", phase[0])
 
 		if phase[0] == 1 {
 
 			//gets current dbWriteSize from leader
-			dbWriteSizeBytes := readFromConn(leaderConn, 4)
+			dbWriteSizeBytes := readFrom(leaderConn, 4)
 			dbWriteSize = byteToInt(dbWriteSizeBytes)
 
 			//todo! put into tweet creation
-			//roundAsBytes := readFromConn(leaderConn, 4)
+			//roundAsBytes := readFrom(leaderConn, 4)
 			roundAsBytes := make([]byte, 4)
 			_, err = leaderConn.Read(roundAsBytes)
 			if err != nil {
@@ -180,11 +180,11 @@ func client(clientNumber int) {
 
 			//writes the dpfQuery to the leader
 			dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
-			writeToConn(leaderConn, dpfLengthBytes)
+			writeTo(leaderConn, dpfLengthBytes)
 
-			writeToConn(leaderConn, dpfQueryAEncrypted)
+			writeTo(leaderConn, dpfQueryAEncrypted)
 
-			writeToConn(leaderConn, dpfQueryBEncrypted)
+			writeTo(leaderConn, dpfQueryBEncrypted)
 
 			C.free(unsafe.Pointer(dpfQueryA))
 			C.free(unsafe.Pointer(dpfQueryB))
@@ -198,7 +198,7 @@ func client(clientNumber int) {
 				2 : no update needed
 					nothing
 			*/
-			subPhase := readFromConn(leaderConn, 1)
+			subPhase := readFrom(leaderConn, 1)
 
 			var encryptedQueryLeader, encryptedQueryFollower []byte
 			//first time participating
@@ -231,7 +231,7 @@ func client(clientNumber int) {
 				wantsArchive[0] = 0
 			}
 
-			writeToConn(leaderConn, wantsArchive)
+			writeTo(leaderConn, wantsArchive)
 
 			if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
 				encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
@@ -354,21 +354,21 @@ func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn
 	encryptedLength := len(encryptedQueryLeader)
 
 	//sends the pirQuerysLength to the leader
-	writeToConn(leaderConn, intToByte(encryptedLength))
+	writeTo(leaderConn, intToByte(encryptedLength))
 
 	//sends the pirQuerys to the leader
-	writeToConn(leaderConn, encryptedQueryLeader)
+	writeTo(leaderConn, encryptedQueryLeader)
 
-	writeToConn(leaderConn, encryptedQueryFollower)
+	writeTo(leaderConn, encryptedQueryFollower)
 
 	if getArchive {
-		writeToConn(leaderConn, intToByte(len(archiveInterests)))
+		writeTo(leaderConn, intToByte(len(archiveInterests)))
 	}
 }
 
 func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
 
-	virtualAddressByte := readFromConn(leaderConn, 4)
+	virtualAddressByte := readFrom(leaderConn, 4)
 
 	//xores the sharedSecret
 	for h := 0; h < 2; h++ {
@@ -393,10 +393,10 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 	}
 	for i := 0; i < tmpNeededSubscriptions; i++ {
 		//client receives tweets
-		tweetsLengthBytes := readFromConn(leaderConn, 4)
+		tweetsLengthBytes := readFrom(leaderConn, 4)
 		tweetsLength := byteToInt(tweetsLengthBytes)
 
-		tweets := readFromConn(leaderConn, tweetsLength)
+		tweets := readFrom(leaderConn, tweetsLength)
 
 		//expand sharedSecret so it is of right length
 		expandBy := len(tweets) / 32
@@ -452,6 +452,7 @@ func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
 	copy(pirQuerys[1], pirQuerys[0])
 
 	//the positon the virtual address will be taken from
+	//todo! invalid argument error
 	pos := mr.Intn(dbWriteSize)
 	pirQuerys[0][pos] = 1
 	pirQuerys[1][pos] = 0
@@ -536,9 +537,9 @@ func inList(number int, list []int) bool {
 
 func receiveTopicLists(leaderConn net.Conn) {
 	for i := 0; i < 2; i++ {
-		topicListLength := readFromConn(leaderConn, 4)
+		topicListLength := readFrom(leaderConn, 4)
 
-		recTopicList := readFromConn(leaderConn, byteToInt(topicListLength))
+		recTopicList := readFrom(leaderConn, byteToInt(topicListLength))
 
 		var tmpTopicList []string
 		arrayReader := bytes.NewReader(recTopicList[:])
@@ -572,7 +573,7 @@ func getTweet(clientNumber int) []byte {
 }
 
 //sends the array to the connection
-func writeToConn(connection net.Conn, array []byte) {
+func writeTo(connection net.Conn, array []byte) {
 	_, err := connection.Write(array)
 	if err != nil {
 		panic(err)
@@ -580,7 +581,7 @@ func writeToConn(connection net.Conn, array []byte) {
 }
 
 //reads an array which is returned and of size "size" from the connection
-func readFromConn(connection net.Conn, size int) []byte {
+func readFrom(connection net.Conn, size int) []byte {
 	array := make([]byte, size)
 	_, err := connection.Read(array)
 	if err != nil {

+ 92 - 33
follower/follower.go

@@ -56,6 +56,8 @@ var maxTimePerRound time.Duration = 5 * time.Second
 var round int = 1
 var startTime time.Time
 
+var ignoreMe []byte
+
 func main() {
 
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
@@ -124,7 +126,7 @@ func main() {
 	}
 
 	//send publicKey to leader
-	writeToConn(leaderConnection, followerPublicKey[:])
+	writeTo(leaderConnection, followerPublicKey[:])
 
 	//receives leader PublicKey
 	var tmpLeaderPubKey [32]byte
@@ -154,7 +156,7 @@ func main() {
 		//receives the virtualAddresses
 		virtualAddresses := make([]int, dbWriteSize+1)
 		for i := 0; i <= dbWriteSize; i++ {
-			virtualAddress := readFromConn(leaderConnection, 4)
+			virtualAddress := readFrom(leaderConnection, 4)
 			virtualAddresses[i] = byteToInt(virtualAddress)
 		}
 
@@ -219,7 +221,7 @@ func phase1(id int, leaderWorkerConnection net.Conn, m *sync.RWMutex, wg *sync.W
 	gotClient := make([]byte, 1)
 
 	for {
-		gotClient = readFromConn(leaderWorkerConnection, 1)
+		gotClient = readFrom(leaderWorkerConnection, 1)
 
 		//this worker is done
 		if gotClient[0] == 0 {
@@ -239,6 +241,7 @@ func phase1(id int, leaderWorkerConnection net.Conn, m *sync.RWMutex, wg *sync.W
 		var tmpClientPublicKey [32]byte
 		_, err := leaderWorkerConnection.Read(tmpClientPublicKey[:])
 		if err != nil {
+			fmt.Println("no error handling")
 			panic(err)
 		}
 		clientPublicKey = &tmpClientPublicKey
@@ -247,7 +250,10 @@ func phase1(id int, leaderWorkerConnection net.Conn, m *sync.RWMutex, wg *sync.W
 		clientKeys := clientData[tmpClientPublicKey]
 		m.RUnlock()
 
-		clientKeys, pirQuery := handlePirQuery(clientKeys, leaderWorkerConnection, 0, tmpClientPublicKey, true)
+		clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, leaderWorkerConnection, 0, tmpClientPublicKey, true)
+		if errorBool {
+			continue
+		}
 		getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, leaderWorkerConnection)
 
 		m.Lock()
@@ -255,12 +261,18 @@ func phase1(id int, leaderWorkerConnection net.Conn, m *sync.RWMutex, wg *sync.W
 		m.Unlock()
 
 		//gets dpfQuery from leader
-		dpfLengthBytes := readFromConn(leaderWorkerConnection, 4)
+		dpfLengthBytes, errorBool := readFromWError(leaderWorkerConnection, 4)
+		if errorBool {
+			continue
+		}
 		dpfLength := byteToInt(dpfLengthBytes)
 
 		dpfQueryBEncrypted := make([]byte, dpfLength)
 
-		dpfQueryBEncrypted = readFromConn(leaderWorkerConnection, dpfLength)
+		dpfQueryBEncrypted, errorBool = readFromWError(leaderWorkerConnection, dpfLength)
+		if errorBool {
+			continue
+		}
 
 		//decrypt dpfQueryB for sorting into db
 		var decryptNonce [24]byte
@@ -277,9 +289,12 @@ func phase1(id int, leaderWorkerConnection net.Conn, m *sync.RWMutex, wg *sync.W
 
 		dataShareLeader := make([]byte, ds)
 
-		writeToConn(leaderWorkerConnection, dataShareFollower)
+		writeTo(leaderWorkerConnection, dataShareFollower)
 
-		dataShareLeader = readFromConn(leaderWorkerConnection, ds)
+		dataShareLeader, errorBool = readFromWError(leaderWorkerConnection, ds)
+		if errorBool {
+			continue
+		}
 
 		auditXOR := make([]byte, ds)
 		passedAudit := true
@@ -331,7 +346,7 @@ func phase2(leaderWorkerConnection net.Conn) {
 	}
 
 	//receive seed from leader
-	seedLeader := readFromConn(leaderWorkerConnection, 16)
+	seedLeader := readFrom(leaderWorkerConnection, 16)
 
 	//receive data from leader
 	tmpdbLeader := make([][]byte, dbSize)
@@ -339,15 +354,15 @@ func phase2(leaderWorkerConnection net.Conn) {
 		tmpdbLeader[i] = make([]byte, dataLength)
 	}
 	for i := 0; i < dbSize; i++ {
-		tmpdbLeader[i] = readFromConn(leaderWorkerConnection, dataLength)
+		tmpdbLeader[i] = readFrom(leaderWorkerConnection, dataLength)
 	}
 
 	//writes seed to leader
-	writeToConn(leaderWorkerConnection, seedFollower)
+	writeTo(leaderWorkerConnection, seedFollower)
 
 	//write data to leader
 	for i := 0; i < dbSize; i++ {
-		writeToConn(leaderWorkerConnection, tmpdbFollower[i])
+		writeTo(leaderWorkerConnection, tmpdbFollower[i])
 	}
 	//put together the db
 	tmpdb := make([][]byte, dbSize)
@@ -375,7 +390,7 @@ func phase2(leaderWorkerConnection net.Conn) {
 
 	//send own Ciphers to leader
 	for i := 0; i < dbSize; i++ {
-		writeToConn(leaderWorkerConnection, C.GoBytes(unsafe.Pointer(ciphersFollowers[i]), 16))
+		writeTo(leaderWorkerConnection, C.GoBytes(unsafe.Pointer(ciphersFollowers[i]), 16))
 	}
 
 	//put in ciphers from leader
@@ -434,7 +449,7 @@ func phase2(leaderWorkerConnection net.Conn) {
 	C.resetDb()
 
 	//gets current dbWriteSize from leader
-	dbWriteSizeBytes := readFromConn(leaderWorkerConnection, 4)
+	dbWriteSizeBytes := readFrom(leaderWorkerConnection, 4)
 	dbWriteSize = byteToInt(dbWriteSizeBytes)
 }
 
@@ -458,10 +473,11 @@ func addTestTweets() {
 
 func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex) {
 
-	gotClient := make([]byte, 1)
-
 	for {
-		gotClient = readFromConn(leaderWorkerConnection, 1)
+		gotClient, errorBool := readFromWError(leaderWorkerConnection, 1)
+		if errorBool {
+			continue
+		}
 
 		//this worker is done
 		if gotClient[0] == 0 {
@@ -469,11 +485,15 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex
 			return
 		}
 
-		subPhase := readFromConn(leaderWorkerConnection, 1)
+		subPhase, errorBool := readFromWError(leaderWorkerConnection, 1)
+		if errorBool {
+			continue
+		}
 
 		var clientPublicKey [32]byte
 		_, err := leaderWorkerConnection.Read(clientPublicKey[:])
 		if err != nil {
+			fmt.Println("no error handling")
 			panic(err)
 		}
 
@@ -483,17 +503,25 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex
 		m.RUnlock()
 
 		if subPhase[0] == 0 || subPhase[0] == 1 {
-			clientKeys, _ = handlePirQuery(clientKeys, leaderWorkerConnection, int(subPhase[0]), clientPublicKey, false)
+			clientKeys, _, errorBool = handlePirQuery(clientKeys, leaderWorkerConnection, int(subPhase[0]), clientPublicKey, false)
+			if errorBool {
+				continue
+			}
 		}
 
 		getSendTweets(clientKeys, nil, leaderWorkerConnection)
 
-		wantsArchive := readFromConn(leaderWorkerConnection, 1)
+		wantsArchive, errorBool := readFromWError(leaderWorkerConnection, 1)
+		if errorBool {
+			continue
+		}
 
 		if wantsArchive[0] == 1 {
-			_, archiveQuerys := handlePirQuery(clientKeys, leaderWorkerConnection, -1, clientPublicKey, false)
+			_, archiveQuerys, errorBool := handlePirQuery(clientKeys, leaderWorkerConnection, -1, clientPublicKey, false)
+			if errorBool {
+				continue
+			}
 			getSendTweets(clientKeys, archiveQuerys, leaderWorkerConnection)
-
 		}
 
 		//saves clientKeys
@@ -503,6 +531,7 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex
 	}
 }
 
+//gets tweet from db and sends them to leader
 func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerConnection net.Conn) {
 	tmpNeededSubscriptions := neededSubscriptions
 	if archiveQuerys != nil {
@@ -532,25 +561,39 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerCo
 		//sends tweets to leader
 		tweetsLengthBytes := intToByte(len(tweets))
 
-		writeToConn(leaderWorkerConnection, tweetsLengthBytes)
+		writeTo(leaderWorkerConnection, tweetsLengthBytes)
 
-		writeToConn(leaderWorkerConnection, tweets)
+		writeTo(leaderWorkerConnection, tweets)
 	}
 }
 
-func handlePirQuery(clientKeys clientKeys, leaderWorkerConnection net.Conn, subPhase int, clientPublicKey [32]byte, doAuditing bool) (clientKeys, [][]byte) {
+//returns true if client connection is lost
+func handlePirQuery(clientKeys clientKeys, leaderWorkerConnection net.Conn, subPhase int, clientPublicKey [32]byte, doAuditing bool) (clientKeys, [][]byte, bool) {
+
+	test := readFrom(leaderWorkerConnection, 1)
+	fmt.Println("test", test)
 
 	archiveNeededSubscriptions := make([]byte, 4)
 	if subPhase == -1 {
-		archiveNeededSubscriptions = readFromConn(leaderWorkerConnection, 4)
+		archiveNeededSubscriptions, errorBool := readFromWError(leaderWorkerConnection, 4)
+		if errorBool {
+			return clientKeys, nil, true
+		}
+		ignoreMe = archiveNeededSubscriptions
 	}
 
 	//gets the msg length
-	msgLengthBytes := readFromConn(leaderWorkerConnection, 4)
+	msgLengthBytes, errorBool := readFromWError(leaderWorkerConnection, 4)
+	if errorBool {
+		return clientKeys, nil, true
+	}
 	msgLength := byteToInt(msgLengthBytes)
 
 	//gets the message
-	message := readFromConn(leaderWorkerConnection, msgLength)
+	message, errorBool := readFromWError(leaderWorkerConnection, msgLength)
+	if errorBool {
+		return clientKeys, nil, true
+	}
 
 	var decryptNonce [24]byte
 	copy(decryptNonce[:], message[:24])
@@ -572,7 +615,7 @@ func handlePirQuery(clientKeys clientKeys, leaderWorkerConnection net.Conn, subP
 		if doAuditing {
 			result := make([][]byte, 1)
 			result[0] = decrypted
-			return clientKeys, result
+			return clientKeys, result, false
 		}
 
 		//follower updates sharedSecret
@@ -606,7 +649,7 @@ func handlePirQuery(clientKeys clientKeys, leaderWorkerConnection net.Conn, subP
 		clientKeys.PirQuery = pirQuerys
 	}
 
-	return clientKeys, pirQuerys
+	return clientKeys, pirQuerys, false
 }
 
 func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, leaderWorkerConnection net.Conn) {
@@ -627,11 +670,11 @@ func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret
 		virtualAddress[i] = virtualAddress[i] ^ sharedSecret[i]
 	}
 
-	writeToConn(leaderWorkerConnection, virtualAddress)
+	writeTo(leaderWorkerConnection, virtualAddress)
 }
 
 //sends the array to the connection
-func writeToConn(connection net.Conn, array []byte) {
+func writeTo(connection net.Conn, array []byte) {
 	_, err := connection.Write(array)
 	if err != nil {
 		panic(err)
@@ -639,7 +682,8 @@ func writeToConn(connection net.Conn, array []byte) {
 }
 
 //reads an array which is returned and of size "size" from the connection
-func readFromConn(connection net.Conn, size int) []byte {
+//returned bool is one if connection to client was lost
+func readFrom(connection net.Conn, size int) []byte {
 	array := make([]byte, size)
 	_, err := connection.Read(array)
 	if err != nil {
@@ -648,6 +692,21 @@ func readFromConn(connection net.Conn, size int) []byte {
 	return array
 }
 
+//reads an array which is returned and of size "size" from the connection
+//returns true if connection to client is lost
+func readFromWError(connection net.Conn, size int) ([]byte, bool) {
+	array := make([]byte, size+1)
+	_, err := connection.Read(array)
+	if err != nil {
+		panic(err)
+	}
+	if array[0] == 1 {
+		return nil, true
+	}
+	fmt.Println(array)
+	return array, false
+}
+
 func transformBytesToStringArray(topicsAsBytes []byte) []string {
 	var topics []string
 	var topic string

+ 323 - 93
leader/leader.go

@@ -125,7 +125,7 @@ func main() {
 	followerPublicKey = &tmpFollowerPubKey
 
 	//send publicKey to follower
-	writeToConn(followerConnection, leaderPublicKey[:])
+	writeTo(followerConnection, leaderPublicKey[:], nil, 0)
 
 	//goroutine for accepting new clients
 	go func() {
@@ -184,7 +184,10 @@ func main() {
 			clientConnection.SetDeadline(time.Time{})
 
 			//sends topicList so client can participate in phase 3 asap
-			sendTopicLists(clientConnection)
+			errorBool := sendTopicLists(clientConnection, followerConnection, true)
+			if errorBool {
+				break
+			}
 
 			//send leader publicKey
 			_, err = clientConnection.Write(leaderPublicKey[:])
@@ -252,7 +255,7 @@ func main() {
 		virtualAddresses := createVirtualAddresses()
 		//send all virtualAddresses to follower
 		for i := 0; i <= dbWriteSize; i++ {
-			writeToConn(followerConnection, intToByte(virtualAddresses[i]))
+			writeTo(followerConnection, intToByte(virtualAddresses[i]), nil, 0)
 		}
 
 		for id := 0; id < numThreads; id++ {
@@ -323,7 +326,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	for len(phase1Channel) == 0 {
 		if time.Since(startTime) > maxTimePerRound {
 			//tells follower that this worker is done
-			writeToConn(followerConnection, gotClient)
+			writeTo(followerConnection, gotClient, nil, 0)
 			wg.Done()
 			return
 		}
@@ -334,13 +337,13 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		gotClient[0] = 1
 		//tells follower that this worker got a clientConnection
-		writeToConn(followerConnection, gotClient)
+		writeTo(followerConnection, gotClient, nil, 0)
 
 		//sends clients publicKey to follower
 		m.RLock()
 		clientPublicKey := clientData[clientConnection.RemoteAddr()].PublicKey
 		m.RUnlock()
-		writeToConn(followerConnection, clientPublicKey[:])
+		writeTo(followerConnection, clientPublicKey[:], nil, 0)
 
 		//setup the worker-specific db
 		dbSize := int(C.dbSize)
@@ -350,40 +353,101 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		//tells client that phase 1 has begun
-		writeToConn(clientConnection, phase)
+		errorBool := writeTo(clientConnection, phase, followerConnection, 5)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
 		//tells client current dbWriteSize
-		writeToConn(clientConnection, intToByte(dbWriteSize))
+		errorBool = writeTo(clientConnection, intToByte(dbWriteSize), followerConnection, 5)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
 		//tells client current round
-		writeToConn(clientConnection, roundAsBytes)
+		errorBool = writeTo(clientConnection, roundAsBytes, followerConnection, 5)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
 		m.RLock()
 		var clientKeys = clientData[clientConnection.RemoteAddr()]
 		m.RUnlock()
-		clientKeys, pirQuery := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
-		getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
+		clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
+		errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
 		m.Lock()
 		clientData[clientConnection.RemoteAddr()] = clientKeys
 		m.Unlock()
 
 		//accept dpfQuery from client
-		dpfLengthBytes := make([]byte, 4)
-		dpfLengthBytes = readFromConn(clientConnection, 4)
+		dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
 		dpfLength := byteToInt(dpfLengthBytes)
 
-		dpfQueryAEncrypted := make([]byte, dpfLength)
-		dpfQueryBEncrypted := make([]byte, dpfLength)
-
-		dpfQueryAEncrypted = readFromConn(clientConnection, dpfLength)
+		dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
-		dpfQueryBEncrypted = readFromConn(clientConnection, dpfLength)
+		dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase1Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
-		writeToConn(followerConnection, dpfLengthBytes)
+		//handledc
+		writeTo(followerConnection, dpfLengthBytes, nil, 0)
 
-		writeToConn(followerConnection, dpfQueryBEncrypted)
+		writeTo(followerConnection, dpfQueryBEncrypted, nil, 0)
 
 		//decrypt dpfQueryA for sorting into db
 		var decryptNonce [24]byte
@@ -398,11 +462,9 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		pos := C.getUint128_t(C.int(virtualAddresses[dbWriteSize]))
 		C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShareLeader[0]))
 
-		dataShareFollower := make([]byte, ds)
-
-		dataShareFollower = readFromConn(followerConnection, ds)
+		dataShareFollower, _ := readFrom(followerConnection, ds, nil, 0)
 
-		writeToConn(followerConnection, dataShareLeader)
+		writeTo(followerConnection, dataShareLeader, nil, 0)
 
 		auditXOR := make([]byte, ds)
 		passedAudit := true
@@ -452,7 +514,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 				//tells follower that this worker is done
 				gotClient[0] = 0
 
-				writeToConn(followerConnection, gotClient)
+				writeTo(followerConnection, gotClient, nil, 0)
 
 				wg.Done()
 				return
@@ -478,17 +540,16 @@ func phase2(followerConnection net.Conn) {
 	}
 
 	//writes seed to follower
-	writeToConn(followerConnection, seedLeader)
+	writeTo(followerConnection, seedLeader, nil, 0)
 
 	//write data to follower
 	//this is surely inefficent
 	for i := 0; i < dbSize; i++ {
-		writeToConn(followerConnection, tmpdbLeader[i])
+		writeTo(followerConnection, tmpdbLeader[i], nil, 0)
 	}
 
 	//receive seed from follower
-	seedFollower := make([]byte, 16)
-	seedFollower = readFromConn(followerConnection, 16)
+	seedFollower, _ := readFrom(followerConnection, 16, nil, 0)
 
 	//receive data from follower
 	tmpdbFollower := make([][]byte, dbSize)
@@ -496,7 +557,7 @@ func phase2(followerConnection net.Conn) {
 		tmpdbFollower[i] = make([]byte, dataLength)
 	}
 	for i := 0; i < dbSize; i++ {
-		tmpdbFollower[i] = readFromConn(followerConnection, dataLength)
+		tmpdbFollower[i], _ = readFrom(followerConnection, dataLength, nil, 0)
 	}
 
 	//put together the db
@@ -516,7 +577,7 @@ func phase2(followerConnection net.Conn) {
 
 	//send own Ciphers to follower
 	for i := 0; i < dbSize; i++ {
-		writeToConn(followerConnection, C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16))
+		writeTo(followerConnection, C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16), nil, 0)
 	}
 
 	//receive ciphers from follower
@@ -605,7 +666,7 @@ func phase2(followerConnection net.Conn) {
 	dbWriteSize = int(math.Ceil(19.5 * float64(publisherAverage)))
 
 	//writes dbWriteSize of current round to follower
-	writeToConn(followerConnection, intToByte(dbWriteSize))
+	writeTo(followerConnection, intToByte(dbWriteSize), nil, 0)
 }
 
 func addTestTweets() {
@@ -636,7 +697,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	for len(phase3Channel) == 0 {
 		if time.Since(startTime) > maxTimePerRound {
 			//tells follower that this worker is done
-			writeToConn(followerConnection, gotClient)
+			writeTo(followerConnection, gotClient, nil, 0)
 			wg.Done()
 			return
 		}
@@ -647,10 +708,18 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		gotClient[0] = 1
 		//tells follower that this worker got a clientConnection
-		writeToConn(followerConnection, gotClient)
+		writeTo(followerConnection, gotClient, nil, 0)
 
 		//tells client current phase
-		writeToConn(clientConnection, phase)
+		errorBool := writeTo(clientConnection, phase, followerConnection, 2)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase3Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
 		/*
 			possible Values
@@ -679,13 +748,22 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		//tells client what leader expects
-		writeToConn(clientConnection, subPhase)
+		errorBool = writeTo(clientConnection, subPhase, followerConnection, 2)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase3Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
+		//handledc switch order
 		//tells follower what will happen
-		writeToConn(followerConnection, subPhase)
+		writeTo(followerConnection, subPhase, nil, 0)
 
 		//sends clients publicKey so follower knows which client is being served
-		writeToConn(followerConnection, clientKeys.PublicKey[:])
+		writeTo(followerConnection, clientKeys.PublicKey[:], nil, 0)
 
 		//increases rounds participating for client
 		clientKeys.roundsParticipating = roundsParticipating + 1
@@ -696,28 +774,90 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		m.RUnlock()
 
 		if subPhase[0] == 0 {
-			sendTopicLists(clientConnection)
-			clientKeys, _ = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
+			errorBool := sendTopicLists(clientConnection, followerConnection, false)
+			if errorBool {
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				if contBool {
+					continue
+				} else {
+					return
+				}
+			}
+			clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
+			if errorBool {
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				if contBool {
+					continue
+				} else {
+					return
+				}
+			}
 		} else if subPhase[0] == 1 {
-			sendTopicLists(clientConnection)
-
+			errorBool := sendTopicLists(clientConnection, followerConnection, false)
+			if errorBool {
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				if contBool {
+					continue
+				} else {
+					return
+				}
+			}
 			//updates sharedSecret
 			sharedSecret = sha256.Sum256(sharedSecret[:])
 			clientKeys.SharedSecret = sharedSecret
 
-			clientKeys, _ = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
+			clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
+			if errorBool {
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				if contBool {
+					continue
+				} else {
+					return
+				}
+			}
 		}
 
-		getSendTweets(clientKeys, nil, clientConnection, followerConnection)
+		errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase3Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
-		wantsArchive := make([]byte, 1)
-		wantsArchive = readFromConn(clientConnection, 1)
+		wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2)
+		if errorBool {
+			contBool := handleClientDC(wg, followerConnection, phase3Channel)
+			if contBool {
+				continue
+			} else {
+				return
+			}
+		}
 
-		writeToConn(followerConnection, wantsArchive)
+		writeTo(followerConnection, wantsArchive, nil, 0)
 
 		if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
-			_, archiveQuerys := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
-			getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection)
+			_, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
+			if errorBool {
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				if contBool {
+					continue
+				} else {
+					return
+				}
+			}
+			errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection)
+			if errorBool {
+				contBool := handleClientDC(wg, followerConnection, phase3Channel)
+				if contBool {
+					continue
+				} else {
+					return
+				}
+			}
 		}
 
 		//saves all changes for client
@@ -740,7 +880,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			} else {
 				//tells follower that this worker is done
 				gotClient[0] = 0
-				writeToConn(followerConnection, gotClient)
+				writeTo(followerConnection, gotClient, nil, 0)
 				wg.Done()
 				return
 			}
@@ -748,6 +888,32 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	}
 }
 
+//returns true if there is another client
+func handleClientDC(wg *sync.WaitGroup, followerConnection net.Conn, channel chan net.Conn) bool {
+	//loop that waits for new client or leaves phase1 if time is up
+	for {
+		if time.Since(startTime) < maxTimePerRound {
+			//this worker handles the next client
+			if len(channel) > 0 {
+				return true
+				//this worker waits for next client
+			} else {
+				time.Sleep(1 * time.Second)
+			}
+			//times up
+		} else {
+			//tells follower that this worker is done
+			gotClient := make([]byte, 1)
+			gotClient[0] = 0
+
+			writeTo(followerConnection, gotClient, nil, 0)
+
+			wg.Done()
+			return false
+		}
+	}
+}
+
 func createVirtualAddresses() []int {
 	//array will be filled with unique random ascending values
 	//adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers
@@ -770,7 +936,7 @@ func createVirtualAddresses() []int {
 	return virtualAddresses
 }
 
-func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, clientConnection, followerConnection net.Conn) {
+func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, clientConnection, followerConnection net.Conn) bool {
 	//xores all requested addresses into virtuallAddress
 	virtualAddress := make([]byte, 4)
 	for index, num := range pirQuery {
@@ -787,29 +953,30 @@ func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret
 		virtualAddress[i] = virtualAddress[i] ^ sharedSecret[i]
 	}
 
-	virtualAddressFollower := make([]byte, 4)
-	virtualAddressFollower = readFromConn(followerConnection, 4)
+	virtualAddressFollower, _ := readFrom(followerConnection, 4, nil, 0)
 
 	//xores the data from follower
 	for i := 0; i < 4; i++ {
 		virtualAddress[i] = virtualAddress[i] ^ virtualAddressFollower[i]
 	}
 
-	writeToConn(clientConnection, virtualAddress)
+	errorBool := writeTo(clientConnection, virtualAddress, followerConnection, 5)
+
+	return errorBool
 }
 
-func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) {
+func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) bool {
 	tmpNeededSubscriptions := neededSubscriptions
 	if archiveQuerys != nil {
 		tmpNeededSubscriptions = len(archiveQuerys)
 	}
+	tweets := make([][]byte, tmpNeededSubscriptions)
 	for i := 0; i < tmpNeededSubscriptions; i++ {
 		//gets all requested tweets
-		var tweets []byte
 		if archiveQuerys == nil {
-			tweets = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0)
+			tweets[i] = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0)
 		} else {
-			tweets = lib.GetTweets(archiveQuerys[i], dataLength, 1)
+			tweets[i] = lib.GetTweets(archiveQuerys[i], dataLength, 1)
 		}
 
 		//expand sharedSecret so it is of right length
@@ -820,52 +987,69 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnecti
 		}
 
 		//Xor's sharedSecret with all tweets
-		lib.Xor(expandedSharedSecret[:], tweets)
+		lib.Xor(expandedSharedSecret[:], tweets[i])
 
 		//receives tweets from follower and Xor's them in
-		tweetsLengthBytes := make([]byte, 4)
-
-		tweetsLengthBytes = readFromConn(followerConnection, 4)
+		tweetsLengthBytes, _ := readFrom(followerConnection, 4, nil, 0)
 
 		tweetsReceivedLength := byteToInt(tweetsLengthBytes)
 
-		receivedTweets := make([]byte, tweetsReceivedLength)
-		receivedTweets = readFromConn(followerConnection, tweetsReceivedLength)
+		receivedTweets, _ := readFrom(followerConnection, tweetsReceivedLength, nil, 0)
 
-		lib.Xor(receivedTweets, tweets)
+		lib.Xor(receivedTweets, tweets[i])
+	}
 
-		//sends tweets to client
-		tweetsLengthBytes = intToByte(len(tweets))
-		writeToConn(clientConnection, tweetsLengthBytes)
-		writeToConn(clientConnection, tweets)
+	//sends tweets to client
+	for i := 0; i < tmpNeededSubscriptions; i++ {
+		tweetsLengthBytes := intToByte(len(tweets[i]))
+		errorBool := writeTo(clientConnection, tweetsLengthBytes, followerConnection, 2)
+		if errorBool {
+			return true
+		}
+		errorBool = writeTo(clientConnection, tweets[i], followerConnection, 2)
+		if errorBool {
+			return true
+		}
 	}
+	return false
 }
 
-func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int, doAuditing bool) (clientKeys, [][]byte) {
+func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int, doAuditing bool) (clientKeys, [][]byte, bool) {
 
 	clientPublicKey := clientKeys.PublicKey
 
+	test := make([]byte, 1)
+	test[0] = 1
+	followerConnection.Write(test)
+
 	//gets the msg length
-	msgLengthBytes := make([]byte, 4)
-	msgLengthBytes = readFromConn(clientConnection, 4)
+	msgLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
+	if errorBool {
+		return clientKeys, nil, true
+	}
 	msgLength := byteToInt(msgLengthBytes)
 
-	leaderBox := make([]byte, msgLength)
-	followerBox := make([]byte, msgLength)
-
 	//gets the leader box
-	leaderBox = readFromConn(clientConnection, msgLength)
+	leaderBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
+	if errorBool {
+		return clientKeys, nil, true
+	}
 
 	//gets the follower box
-	followerBox = readFromConn(clientConnection, msgLength)
+	followerBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
+	if errorBool {
+		return clientKeys, nil, true
+	}
 
 	tmpNeededSubscriptions := neededSubscriptions
 	tmpTopicAmount := topicAmount
 	if subPhase == -1 {
-		archiveNeededSubscriptions := make([]byte, 4)
-		archiveNeededSubscriptions = readFromConn(clientConnection, 4)
+		archiveNeededSubscriptions, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
+		if errorBool {
+			return clientKeys, nil, true
+		}
 
-		writeToConn(followerConnection, archiveNeededSubscriptions)
+		writeTo(followerConnection, archiveNeededSubscriptions, nil, 0)
 		tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions)
 		tmpTopicAmount = archiveTopicAmount
 	}
@@ -875,10 +1059,11 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 	}
 
 	//send length to follower
-	writeToConn(followerConnection, msgLengthBytes)
+	fmt.Println(msgLengthBytes)
+	writeTo(followerConnection, msgLengthBytes, nil, 0)
 
 	//send box to follower
-	writeToConn(followerConnection, followerBox)
+	writeTo(followerConnection, followerBox, nil, 0)
 
 	var decryptNonce [24]byte
 	copy(decryptNonce[:], leaderBox[:24])
@@ -900,7 +1085,7 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 	if doAuditing {
 		result := make([][]byte, 1)
 		result[0] = decrypted
-		return clientKeys, result
+		return clientKeys, result, false
 	}
 
 	//transforms byteArray to ints of wanted topics
@@ -918,7 +1103,7 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 		clientKeys.PirQuery = pirQuerys
 	}
 
-	return clientKeys, pirQuerys
+	return clientKeys, pirQuerys, false
 }
 
 func transformBytesToStringArray(topicsAsBytes []byte) []string {
@@ -942,7 +1127,8 @@ func byteToInt(myBytes []byte) (x int) {
 	return
 }
 
-func sendTopicLists(clientConnection net.Conn) {
+//returns true if error occured
+func sendTopicLists(clientConnection, followerConnection net.Conn, setup bool) bool {
 	for i := 0; i < 2; i++ {
 		var topicList []byte
 		if i == 0 {
@@ -952,28 +1138,72 @@ func sendTopicLists(clientConnection net.Conn) {
 		}
 		topicListLengthBytes := intToByte(len(topicList))
 
-		writeToConn(clientConnection, topicListLengthBytes)
-
-		writeToConn(clientConnection, topicList)
+		if !setup {
+			err := writeTo(clientConnection, topicListLengthBytes, followerConnection, 5)
+			if err {
+				return true
+			}
+			err = writeTo(clientConnection, topicList, followerConnection, 5)
+			if err {
+				return true
+			}
+		} else {
+			_, err := clientConnection.Write(topicListLengthBytes)
+			if err != nil {
+				return true
+			}
+			_, err = clientConnection.Write(topicList)
+			if err != nil {
+				return true
+			}
+		}
 	}
+	return false
 }
 
 //sends the array to the connection
-func writeToConn(connection net.Conn, array []byte) {
+//todo! need to split into WErro and WOError
+func writeTo(connection net.Conn, array []byte, followerConnection net.Conn, size int) bool {
 	_, err := connection.Write(array)
 	if err != nil {
-		panic(err)
+		//lets follower know that client has disconnected unexpectedly
+		if connection.RemoteAddr().String() != follower {
+			fmt.Println(err)
+			array := make([]byte, size)
+			array[0] = 1
+			_, err = followerConnection.Write(array)
+			if err != nil {
+				panic(err)
+			}
+			return true
+		} else {
+			panic(err)
+		}
 	}
+	return false
 }
 
 //reads an array which is returned and of size "size" from the connection
-func readFromConn(connection net.Conn, size int) []byte {
+//returns true if error occured
+func readFrom(connection net.Conn, size int, followerConnection net.Conn, sizeError int) ([]byte, bool) {
 	array := make([]byte, size)
 	_, err := connection.Read(array)
 	if err != nil {
-		panic(err)
+		//lets follower know that client has disconnected unexpectedly
+		if connection.RemoteAddr().String() != follower {
+			fmt.Println(err)
+			array := make([]byte, sizeError)
+			array[0] = 1
+			_, err = followerConnection.Write(array)
+			if err != nil {
+				panic(err)
+			}
+			return nil, true
+		} else {
+			panic(err)
+		}
 	}
-	return array
+	return array, false
 }
 
 func intToByte(myInt int) (retBytes []byte) {