#include "../include/ioman.h"
#include "../include/global.h"

// NOTE(review): in the reviewed copy of this file the include targets and all
// template arguments were missing. They were reconstructed from how the names
// are used below -- please confirm them against ioman.h / the build.
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>

#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <readline/history.h>
#include <readline/readline.h>

#include <poll.h>
#include <unistd.h>

using std::string;
using std::vector;

using boost::asio::buffer;
using boost::asio::ip::address;
using boost::asio::ip::tcp;

/* this will be provided by main.cpp for the readline callback */
extern IoMan *gIOMAN;

/* debug print hook handed to CmdMan; forwards to the global IoMan instance */
void ioman_externalDebugPrint(string msg) { gIOMAN->printMessage(msg, gIOMAN->OutMsgType::debug); }

/* Sets up the socket, the json reader/writer and (optionally) the ssl context.
 *
 * enablessl - when true an ssl context and ssl stream are created on top of
 *             the tcp socket
 * certfile  - path of the certificate file used to verify the peer, only read
 *             when enablessl is true
 *
 * Throws std::runtime_error when the certificate file cannot be loaded. */
IoMan::IoMan(bool enablessl, const char *certfile) : cmdman(fileman, &ioman_externalDebugPrint), recvbuf(16384) {
  ipstring = "";
  port = 0;
  tcpsock = new tcp::socket(ios);
  connected = false;
  sendtimestampValid = false;

  /* setup json stuff */
  Json::CharReaderBuilder rbuilder;
  wbuilder.settings_["indentation"] = "";
  reader = rbuilder.newCharReader();

  runnetwork = false;
  runinput = false;
  runresponse = false;

  usessl = enablessl;
  if (usessl) {
    sslctx = new boost::asio::ssl::context(boost::asio::ssl::context::sslv23);
    sslctx->set_verify_mode(boost::asio::ssl::verify_peer);
    sslctx->set_options(boost::asio::ssl::context::no_sslv2);
    // a null certfile is turned into an empty path so it fails cleanly below
    // instead of constructing a std::string from a null pointer
    sslctx->load_verify_file(string(certfile ? certfile : ""), errcode);
    if (errcode) {
      // the destructor does not run for an object whose constructor throws,
      // so release everything that was allocated so far
      delete sslctx;
      delete tcpsock;
      delete reader;
      throw std::runtime_error("Couldn't initialize SSL, Boost reports: " + errcode.message());
    }
    sslsock = new boost::asio::ssl::stream<tcp::socket &>(*tcpsock, *sslctx);
  }
}

/* Disconnects if necessary, stops and joins all worker threads and releases
 * the sockets, the ssl context and the json reader. */
IoMan::~IoMan() {
  if (connected) {
    disconnect();
  }

  // disconnect() already clears runnetwork, so the thread has to be joined
  // based on joinable() and not on the flag; destroying a joinable
  // std::thread would call std::terminate
  networkmutex.lock();
  runnetwork = false;
  networkmutex.unlock();
  if (tnetwork.joinable()) {
    tnetwork.join();
  }

  if (runinput) {
    // the flag is part of the predicate inputMain waits on with localmutex,
    // so it is changed under that mutex too; otherwise the notification could
    // arrive between the predicate check and the wait and be lost
    inputmutex.lock();
    localmutex.lock();
    runinput = false;
    localmutex.unlock();
    inputmutex.unlock();
    localcv.notify_all();
    tinput.join();
  }

  if (runresponse) {
    // same reasoning as above, responseMain waits with netmutex
    responsemutex.lock();
    netmutex.lock();
    runresponse = false;
    netmutex.unlock();
    responsemutex.unlock();
    netcv.notify_all();
    tresponse.join();
  }

  // the ssl stream references the tcp socket, so it goes first
  if (usessl)
    delete sslsock;
  delete tcpsock;
  if (usessl)
    delete sslctx;
  delete reader;
}

/* default implementation prints nothing */
void IoMan::printMessage(string nouse, OutMsgType nouse2) {}

/* Splits an input line into the command (element 0) and its arguments.
 *
 * - the command is everything up to the first space; input without any space
 *   is returned as a single element
 * - arguments are separated by one or more spaces
 * - an argument that starts with a double quote runs until the next double
 *   quote and may contain spaces; the quotes are not part of the argument
 * - a quoted argument without closing quote runs until the end of the input
 *
 * The returned vector always contains at least one element. */
vector<string> IoMan::tokenizeInput(string in) {
  vector<string> args;
  const size_t len = in.size();

  size_t pos = in.find(' ');
  if (pos == string::npos) {
    // only command no args
    args.push_back(in);
    return args;
  }
  args.push_back(in.substr(0, pos));

  while (true) {
    // find first char thats not a space
    while (pos < len && in[pos] == ' ') {
      pos++;
    }
    if (pos >= len) {
      // nothing but spaces left, do not emit an empty argument
      break;
    }

    if (in[pos] == '"') {
      // quoted string
      const size_t closing = in.find('"', pos + 1);
      if (closing == string::npos) {
        // unterminated quote, everything that is left belongs to this argument
        args.push_back(in.substr(pos + 1));
        break;
      }
      args.push_back(in.substr(pos + 1, closing - pos - 1));
      pos = closing + 1;
    } else {
      // non-quoted string
      const size_t space = in.find(' ', pos);
      if (space == string::npos) {
        // no spaces, last arg
        args.push_back(in.substr(pos));
        break;
      }
      args.push_back(in.substr(pos, space - pos));
      pos = space;
    }
  }
  return args;
}

// callback for async connect, used to get timeout
void connect_async_handler(const boost::system::error_code &error) { gIOMAN->errcode = error; }

/* Connects to ipstring:port, waiting at most 5 seconds, and performs the ssl
 * handshake when ssl is enabled. The outcome is printed as a json object with
 * the keys command, address, port, error and accept.
 *
 * Returns true when the connection (and handshake) succeeded. */
bool IoMan::connect() {
  Json::Value root;
  root["command"] = "connect";
  root["address"] = ipstring;
  root["port"] = port;

  const address addr = address::from_string(ipstring, errcode);
  if (errcode) {
    root["error"] = errcode.message();
    connected = false;
  } else {
    if (!ios.stopped())
      ios.stop();
    ios.restart();

    // establish connection
    printMessage(string(__PRETTY_FUNCTION__) + string(" connecting to ") + ipstring, debug);
    const tcp::endpoint ep(addr, port);

    // connect never returns would_block, initialize errcode so we can determine whats up
    errcode = boost::asio::error::would_block;
    tcpsock->async_connect(ep, &connect_async_handler);
    // if the handler did not run within the timeout errcode is still would_block
    ios.run_for(std::chrono::seconds(5));

    if (errcode) {
      root["error"] = errcode.message();
      connected = false;
      disconnect();
    } else {
      connected = true;
      root["error"] = "";
    }
  }

  if (connected && usessl) {
    // try to do ssl handshake
    printMessage(string(__PRETTY_FUNCTION__) + string(" doing ssl handshake with ") + ipstring, debug);
    sslsock->handshake(boost::asio::ssl::stream_base::client, errcode);
    if (errcode) {
      root["error"] = string("couldnt connect via ssl: ") + errcode.message();
      connected = false;
      disconnect();
    } else {
      connected = true;
      root["error"] = "";
    }
  }

  root["accept"] = connected;
  printMessage(Json::writeString(wbuilder, root), normal);
  return connected;
}

/* Shuts down and closes the connection and tells cmdman that we are
 * disconnected. Errors of the individual steps are only reported as debug
 * messages. */
void IoMan::disconnect() {
  printMessage("IoMan::disconnect()", debug);
  if (connected) {
    connected = false;
    runnetwork = false;
  }

  if (usessl) {
    sslsock->shutdown(errcode);
    // only look at errcode here when the shutdown actually ran, otherwise a
    // stale error of an earlier operation would be reported
    if (errcode)
      printMessage(string(__PRETTY_FUNCTION__) + string("ssl shutdown says ") + errcode.message(), debug);
  }

  tcpsock->shutdown(tcp::socket::shutdown_both, errcode);
  if (errcode)
    printMessage(string(__PRETTY_FUNCTION__) + string("tcp shutdown says ") + errcode.message(), debug);

  tcpsock->close(errcode);
  if (errcode)
    printMessage(string(__PRETTY_FUNCTION__) + string("tcp close says ") + errcode.message(), debug);

  cmdman.stateSetDisconnected();
}

/* Starts the input and response threads and prints the welcome message.
 * Always returns true. */
bool IoMan::init() {
  printMessage(string(__PRETTY_FUNCTION__) + string(" begin"), debug);

  runinput = true;
  runresponse = true;

  tinput = std::thread(&IoMan::inputMain, this);
  tresponse = std::thread(&IoMan::responseMain, this);

  printWelcomeMessage();
  return true;
}

/* loop to fetch data from the network, doing light preprocessing on it to be
 * handled by responseMain */
void IoMan::networkMain() {
  vector<Json::Value> toput;
  Json::Value root;
  size_t readsize;
  bool firstWasGood = false;

  printMessage("IoMan::networkMain() begin", debug);
  while (true) {
    // The run flag is sampled under its mutex at the top of every iteration.
    // This keeps lock/unlock balanced on every path through the loop
    // (continue, break and the regular exit).
    networkmutex.lock();
    const bool keeprunning = runnetwork;
    networkmutex.unlock();
    if (!keeprunning)
      break;

    /*
    read from network until \n
    try to parse json
    - output error if not ok
    store all ok json in local vector

    get netmutex
    put all local jsons into network vector
    release netmutex
    */

    // read from network
    if (usessl)
      readsize = boost::asio::read_until(*sslsock, recvbuf, '\n', errcode);
    else
      readsize = boost::asio::read_until(*tcpsock, recvbuf, '\n', errcode);

    printMessage(string(__PRETTY_FUNCTION__) + string(" asio::read() ok ") + std::to_string(readsize), debug);
    if (readsize < 1) {
      printMessage(string(__PRETTY_FUNCTION__) + string(" no read size stopping network"), debug);
      networkmutex.lock();
      runnetwork = false;
      networkmutex.unlock();
      break;
    }

    // something arrived, so the server is alive
    timestampmutex.lock();
    sendtimestampValid = false;
    timestampmutex.unlock();

    if (errcode && errcode != boost::asio::error::eof) {
      printMessage("IoMan::networkMain() couldnt read json data\n" + errcode.message(), debug);
      continue;
    }

    // Work on a copy of the received bytes. The readable area of the stream
    // buffer is not NUL terminated and must not be written past its end.
    const string data(boost::asio::buffer_cast<const char *>(recvbuf.data()), recvbuf.size());
    size_t offset = 0;
    size_t newline;
    while ((newline = data.find('\n', offset)) != string::npos) {
      // parse one line, including its newline
      const size_t jsonsize = newline - offset + 1;
      const char *jsonbegin = data.data() + offset;

      printMessage(string(__PRETTY_FUNCTION__) + string(" found jsondata ") + data.substr(offset), debug);

      if (!reader->parse(jsonbegin, jsonbegin + jsonsize, &root, &jsonerror)) {
        printMessage("IoMan::networkMain() couldnt parse json data: " + jsonerror, debug);
        if (firstWasGood) {
          // we found garbage at the end
          break;
        }
        // we found garbage at the beginning
        recvbuf.consume(jsonsize);
        offset += jsonsize;
        continue;
      }
      firstWasGood = true;

      recvbuf.consume(jsonsize);
      offset += jsonsize;
      printMessage(string(__PRETTY_FUNCTION__) + string(" remaining recvbuf ") + data.substr(offset), debug);

      // store locally
      toput.push_back(root);
    }
    firstWasGood = false;

    if (toput.size()) {
      // put into global vector
      netmutex.lock();
      printMessage(string(__PRETTY_FUNCTION__) + string(" get netmutex"), debug);
      netinput.insert(netinput.end(), toput.begin(), toput.end());
      netmutex.unlock();
      printMessage(string(__PRETTY_FUNCTION__) + string(" release netmutex"), debug);
    }
    netcv.notify_all();

    // clean up local stuff
    toput.clear();
    // NOTE(review): this drops everything that is left in the buffer, which
    // includes the beginning of a message whose newline has not arrived yet.
    // Behaviour kept as it was -- confirm that this is intended.
    recvbuf.consume(recvbuf.size());
  }
}

/* loop to handle input from the user and responseMain, sending data via network
 * if required */
void IoMan::inputMain() {
  vector<string> toprocess;
  string command;
  vector<string> args;
  CmdMan::CmdRet cmdret;

  printMessage(string(__PRETTY_FUNCTION__) + string(" begin"), debug);
  while (true) {
    // sample the run flag under its mutex, see networkMain
    inputmutex.lock();
    const bool keeprunning = runinput;
    inputmutex.unlock();
    if (!keeprunning)
      break;

    /*
    get localmutex
    read all input vector into local vector
    release localmutex

    process inputs
    send to server if required
    */

    // read into local vector
    {
      // The lock object is the only owner of localmutex here. Unlocking the
      // mutex directly while the lock object still owns it would unlock it a
      // second time when the lock object is reassigned or destroyed.
      std::unique_lock<std::mutex> ulock(localmutex);
      while (!localinput.size() && runinput) {
        localcv.wait(ulock);
      }
      printMessage(string(__PRETTY_FUNCTION__) + string(" has localmutex"), debug);
      toprocess.clear();
      toprocess.swap(localinput);
    }
    printMessage(string(__PRETTY_FUNCTION__) + string(" release localmutex"), debug);
    localcv.notify_all();

    if (!runinput)
      return;

    // process
    for (const string &cmd : toprocess) {
      args = tokenizeInput(cmd);
      command = args.front();
      args.erase(args.begin());
      cmdret = cmdman.execute(command, args);
      handleInCmdResponse(cmdret);
    }

    // clean up local stuff
    toprocess.clear();
  }
}

/* Acts on the result of a command that was executed for local input. The
 * flags in cmdret.type are handled independently of each other:
 *
 * print   - print cmdret.msg as normal message
 * send    - send cmdret.msg to the server; when sending fails the remaining
 *           flags are not handled
 * error   - print cmdret.msg as error message
 * close   - stop the network thread and disconnect
 * connect - connect to the address/port in cmdret.msg, start the network
 *           thread and queue the "version" command
 * exit    - stop the main loop
 *
 * A non-empty cmdret.nextcommand is queued as new local input. */
void IoMan::handleInCmdResponse(CmdMan::CmdRet cmdret) {
  // determine wether to send something and do so if required
  if (cmdret.type & CmdMan::rettype::print) {
    printMessage(Json::writeString(wbuilder, cmdret.msg), normal);
  }

  if (cmdret.type & CmdMan::rettype::send) {
    const string json = Json::writeString(wbuilder, cmdret.msg);
    printMessage("IoMan::inputMain() sending json \"" + json + "\"", debug);

    timestampmutex.lock();
    // when there will be no answer from the server, do not set a timestamp
    if (!sendtimestampValid && !(cmdret.type & CmdMan::rettype::noanswerexpected)) {
      sendtimestampValid = true;
      time(&sendtimestamp); // set timestamp
    }
    timestampmutex.unlock();

    const string wire = json + "\n";
    if (usessl)
      boost::asio::write(*sslsock, buffer(wire), errcode);
    else
      boost::asio::write(*tcpsock, buffer(wire), errcode);
    if (errcode) {
      printMessage("IoMan::inputMain() couldnt send json data\n" + errcode.message() + "\n", debug);
      return;
    }
  }

  if (cmdret.type & CmdMan::rettype::error) {
    printMessage(Json::writeString(wbuilder, cmdret.msg), error);
  }

  if (cmdret.type & CmdMan::rettype::close) {
    // connection closed, stop network thread and shutdown any operations remaining
    networkmutex.lock();
    runnetwork = false;
    networkmutex.unlock();
    disconnect();
    // join() throws when there is no thread to join, e.g. when it was already
    // joined after a timeout
    if (tnetwork.joinable())
      tnetwork.join();
  }

  if (cmdret.type & CmdMan::rettype::connect) {
    ipstring = cmdret.msg["address"].asString();
    port = cmdret.msg["port"].asUInt();
    if (connect()) {
      // a network thread that stopped on its own is still joinable; assigning
      // a new thread over it would call std::terminate
      if (tnetwork.joinable())
        tnetwork.join();
      runnetwork = true;
      tnetwork = std::thread(&IoMan::networkMain, this);

      // put new commands into global vector
      localmutex.lock();
      printMessage(string(__PRETTY_FUNCTION__) + string(" get localmutex"), debug);
      localinput.push_back("version");
      cmdman.stateSetConnectionOk();
      localmutex.unlock();
      printMessage(string(__PRETTY_FUNCTION__) + string(" release localmutex"), debug);
      localcv.notify_all();
    }
  }

  if (cmdret.type & CmdMan::rettype::exit) {
    mainmutex.lock();
    runmain = false;
    mainmutex.unlock();
  }

  if (cmdret.nextcommand.size()) {
    localmutex.lock();
    printMessage(string(__PRETTY_FUNCTION__) + string(" get localmutex"), debug);
    localinput.push_back(cmdret.nextcommand);
    localmutex.unlock();
    printMessage(string(__PRETTY_FUNCTION__) + string(" release localmutex"), debug);
    localcv.notify_all();
  }
}

/* loop to handle responses that have been fetched by netMain and possibly add
 * new commands to be handled by inputMain */
void IoMan::responseMain() {
  vector<Json::Value> toprocess;
  vector<string> toput;
  CmdMan::CmdRet cmdret;

  printMessage(string(__PRETTY_FUNCTION__) + string(" begin"), debug);
  while (true) {
    // sample the run flag under its mutex, see networkMain
    responsemutex.lock();
    const bool keeprunning = runresponse;
    responsemutex.unlock();
    if (!keeprunning)
      break;

    /*
    get netmutex
    read all network vector into local vector
    release netmutex

    process all jsons

    get localmutex
    place new commands into input vector
    release localmutex
    */

    // read into local vector
    {
      // the lock object is the only owner of netmutex, see inputMain
      std::unique_lock<std::mutex> ulock(netmutex);
      while (!netinput.size() && runresponse) {
        netcv.wait(ulock);
      }
      printMessage(string(__PRETTY_FUNCTION__) + string(" get netmutex"), debug);
      toprocess.clear();
      toprocess.swap(netinput);
    }
    printMessage(string(__PRETTY_FUNCTION__) + string(" release netmutex"), debug);
    netcv.notify_all();

    if (!runresponse)
      return;

    // process jsons
    for (Json::Value root : toprocess) {
      cmdret = cmdman.handle(root);
      handleOutCmdResponse(cmdret, toput);
    }

    if (toput.size()) {
      // put new commands into global vector
      localmutex.lock();
      printMessage(string(__PRETTY_FUNCTION__) + string(" get localmutex"), debug);
      localinput.insert(localinput.end(), toput.begin(), toput.end());
      localmutex.unlock();
      printMessage(string(__PRETTY_FUNCTION__) + string(" release localmutex"), debug);
    }
    localcv.notify_all();

    // clean up local stuff
    toprocess.clear();
    toput.clear();
  }
}

/* Acts on the result of handling a server response. The flags in cmdret.type
 * are handled independently of each other:
 *
 * close - stop the network thread, disconnect and queue cmdret.nextcommand
 * error - print cmdret.msg as error message
 * print - print cmdret.msg as normal message
 * send  - queue cmdret.nextcommand
 * exit  - stop the main loop
 *
 * Queued commands are appended to toput. */
void IoMan::handleOutCmdResponse(CmdMan::CmdRet cmdret, vector<string> &toput) {
  if (cmdret.type & CmdMan::rettype::close) {
    // connection closed, stop network thread and shutdown any operations remaining
    networkmutex.lock();
    runnetwork = false;
    networkmutex.unlock();
    disconnect();
    // join() throws when there is no thread to join
    if (tnetwork.joinable())
      tnetwork.join();
    if (cmdret.nextcommand.size()) {
      toput.push_back(cmdret.nextcommand);
    }
  }

  if (cmdret.type & CmdMan::rettype::error) {
    printMessage(Json::writeString(wbuilder, cmdret.msg), error);
  }

  if (cmdret.type & CmdMan::rettype::print) {
    printMessage(Json::writeString(wbuilder, cmdret.msg), normal);
  }

  if (cmdret.type & CmdMan::rettype::send) {
    if (cmdret.nextcommand.size()) {
      toput.push_back(cmdret.nextcommand);
    }
  }

  if (cmdret.type & CmdMan::rettype::exit) {
    mainmutex.lock();
    runmain = false;
    mainmutex.unlock();
  }
}

/* this is the handler that readlines alternative interface will use to process
 * user input */
void ioman_readlineHandler(char *line) {
  if (!line) {
    // no line at all, stop the main loop
    printf("\nNULLBURGER\n");
    gIOMAN->mainmutex.lock();
    gIOMAN->runmain = false;
    gIOMAN->mainmutex.unlock();
    return;
  }

  // split input line into tokens
  vector<string> tokens;
  const string text(line);
  boost::algorithm::split(tokens, text, boost::algorithm::is_any_of(" "), boost::algorithm::token_compress_on);

  if (strlen(line) && tokens.size()) {
    add_history(line);

    gIOMAN->localmutex.lock();
    gIOMAN->printMessage(string(__PRETTY_FUNCTION__) + string(" get localmutex"), gIOMAN->debug);
    gIOMAN->localinput.push_back(line);
    gIOMAN->localmutex.unlock();
    gIOMAN->printMessage(string(__PRETTY_FUNCTION__) + string(" release localmutex"), gIOMAN->debug);
    gIOMAN->localcv.notify_all();
  }
  // the line is freed here in every case
  free(line);
}

/* main user input loop */
void IoMan::run() {
  printMessage(string(__PRETTY_FUNCTION__) + string(" begin"), debug);

  struct pollfd inpipestatus;
  inpipestatus.fd = STDIN_FILENO;
  inpipestatus.events = POLLIN;
  inpipestatus.revents = 0;

  runmain = true;

  // Install readline handler
  rl_callback_handler_install(getCmdPrompt().c_str(), (rl_vcpfunc_t *)&ioman_readlineHandler);

  mainmutex.lock();
  while (runmain) {
    mainmutex.unlock();

    // Only the decision is taken under timestampmutex. The network thread
    // locks timestampmutex as well, so joining it while holding the mutex
    // could block forever.
    timestampmutex.lock();
    const bool timedout = sendtimestampValid && (!runnetwork || std::difftime(time(nullptr), sendtimestamp) > 15);
    if (timedout)
      sendtimestampValid = false;
    timestampmutex.unlock();

    if (timedout) {
      // answer took more than 15 seconds or network thread stopped
      networkmutex.lock();
      runnetwork = false;
      networkmutex.unlock();
      disconnect();
      // join() throws when there is no thread to join
      if (tnetwork.joinable())
        tnetwork.join();

      // cmdman gets informed inside disconnect method
      // inform the user by giving output
      Json::Value root;
      root["command"] = "connectionerror";
      root["error"] = "The server does not respond. You are now disconnected.";
      printMessage(Json::writeString(wbuilder, root), normal);
    }

    // wait up to 100ms for user input; revents is only looked at when poll
    // reports a ready descriptor
    if (poll(&inpipestatus, 1, 100) > 0 && (inpipestatus.revents & POLLIN)) {
      rl_callback_read_char();
    }

    mainmutex.lock();
  }
  mainmutex.unlock();

  // Clean up the terminal
  rl_set_prompt("");
  rl_replace_line("", 0);
  rl_redisplay();
  rl_clear_history();

  // Remove the handler
  rl_callback_handler_remove();
}