changed a lot of thread locking code in the server, rooms are working with ISL now

This commit is contained in:
Max-Wilhelm Bruker 2012-03-17 16:09:00 +01:00
parent 572e4eaafa
commit c23af44749
16 changed files with 449 additions and 221 deletions

View file

@ -11,6 +11,10 @@
#include "pb/event_user_message.pb.h"
#include "pb/event_user_joined.pb.h"
#include "pb/event_user_left.pb.h"
#include "pb/event_join_room.pb.h"
#include "pb/event_leave_room.pb.h"
#include "pb/event_room_say.pb.h"
#include "pb/event_list_games.pb.h"
#include <google/protobuf/descriptor.h>
void IslInterface::sharedCtor(const QSslCertificate &cert, const QSslKey &privateKey)
@ -19,7 +23,7 @@ void IslInterface::sharedCtor(const QSslCertificate &cert, const QSslKey &privat
socket->setLocalCertificate(cert);
socket->setPrivateKey(privateKey);
connect(socket, SIGNAL(readyRead()), this, SLOT(readClient()));
connect(socket, SIGNAL(readyRead()), this, SLOT(readClient()), Qt::QueuedConnection);
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(catchSocketError(QAbstractSocket::SocketError)));
connect(this, SIGNAL(outputBufferChanged()), this, SLOT(flushOutputBuffer()), Qt::QueuedConnection);
}
@ -42,18 +46,31 @@ IslInterface::~IslInterface()
flushOutputBuffer();
QStringList usersToDelete;
server->serverMutex.lock();
QMapIterator<QString, Server_AbstractUserInterface *> extUsers = server->getExternalUsers();
// As these signals are connected with Qt::QueuedConnection implicitly,
// we don't need to worry about them modifying the lists while we're iterating.
server->clientsLock.lockForRead();
QMapIterator<QString, Server_AbstractUserInterface *> extUsers(server->getExternalUsers());
while (extUsers.hasNext()) {
extUsers.next();
if (extUsers.value()->getUserInfo()->server_id() == serverId)
usersToDelete.append(extUsers.key());
emit externalUserLeft(extUsers.key());
}
server->serverMutex.unlock();
server->clientsLock.unlock();
for (int i = 0; i < usersToDelete.size(); ++i)
emit externalUserLeft(usersToDelete[i]);
server->roomsLock.lockForRead();
QMapIterator<int, Server_Room *> roomIterator(server->getRooms());
while (roomIterator.hasNext()) {
Server_Room *room = roomIterator.next().value();
room->usersLock.lockForRead();
QMapIterator<QString, ServerInfo_User_Container> roomUsers(room->getExternalUsers());
while (roomUsers.hasNext()) {
roomUsers.next();
if (roomUsers.value().getUserInfo()->server_id() == serverId)
emit externalRoomUserLeft(room->getId(), roomUsers.key());
}
room->usersLock.unlock();
}
server->roomsLock.unlock();
}
void IslInterface::initServer()
@ -98,15 +115,18 @@ void IslInterface::initServer()
Event_ServerCompleteList event;
event.set_server_id(server->getServerId());
server->serverMutex.lock();
server->clientsLock.lockForRead();
QMapIterator<QString, Server_ProtocolHandler *> userIterator(server->getUsers());
while (userIterator.hasNext())
event.add_user_list()->CopyFrom(userIterator.next().value()->copyUserInfo(true, true));
server->clientsLock.unlock();
server->roomsLock.lockForRead();
QMapIterator<int, Server_Room *> roomIterator(server->getRooms());
while (roomIterator.hasNext()) {
Server_Room *room = roomIterator.next().value();
room->roomMutex.lock();
room->usersLock.lockForRead();
room->gamesMutex.lock();
event.add_room_list()->CopyFrom(room->getInfo(true, true, false, false));
}
@ -126,9 +146,12 @@ void IslInterface::initServer()
server->islLock.unlock();
roomIterator.toFront();
while (roomIterator.hasNext())
roomIterator.next().value()->roomMutex.unlock();
server->serverMutex.unlock();
while (roomIterator.hasNext()) {
roomIterator.next();
roomIterator.value()->gamesMutex.unlock();
roomIterator.value()->usersLock.unlock();
}
server->roomsLock.unlock();
}
void IslInterface::initClient()
@ -161,7 +184,7 @@ void IslInterface::initClient()
deleteLater();
return;
}
server->addIslInterface(serverId, this);
server->islLock.unlock();
}
@ -242,11 +265,24 @@ void IslInterface::sessionEvent_ServerCompleteList(const Event_ServerCompleteLis
temp.set_server_id(serverId);
emit externalUserJoined(temp);
}
for (int i = 0; i < event.room_list_size(); ++i) {
const ServerInfo_Room &room = event.room_list(i);
for (int j = 0; j < room.user_list_size(); ++j) {
ServerInfo_User userInfo(room.user_list(j));
userInfo.set_server_id(serverId);
emit externalRoomUserJoined(room.room_id(), userInfo);
}
for (int j = 0; j < room.game_list_size(); ++j) {
ServerInfo_Game gameInfo(room.game_list(j));
gameInfo.set_server_id(serverId);
emit externalRoomGameListChanged(room.room_id(), gameInfo);
}
}
}
void IslInterface::sessionEvent_UserMessage(const SessionEvent &sessionEvent, const Event_UserMessage &event)
{
QMutexLocker locker(&server->serverMutex);
QReadLocker locker(&server->clientsLock);
Server_ProtocolHandler *userInterface = server->getUsers().value(QString::fromStdString(event.receiver_name()));
if (userInterface)
@ -255,7 +291,9 @@ void IslInterface::sessionEvent_UserMessage(const SessionEvent &sessionEvent, co
void IslInterface::sessionEvent_UserJoined(const Event_UserJoined &event)
{
emit externalUserJoined(event.user_info());
ServerInfo_User userInfo(event.user_info());
userInfo.set_server_id(serverId);
emit externalUserJoined(userInfo);
}
void IslInterface::sessionEvent_UserLeft(const Event_UserLeft &event)
@ -263,6 +301,32 @@ void IslInterface::sessionEvent_UserLeft(const Event_UserLeft &event)
emit externalUserLeft(QString::fromStdString(event.name()));
}
void IslInterface::roomEvent_UserJoined(int roomId, const Event_JoinRoom &event)
{
ServerInfo_User userInfo(event.user_info());
userInfo.set_server_id(serverId);
emit externalRoomUserJoined(roomId, userInfo);
}
void IslInterface::roomEvent_UserLeft(int roomId, const Event_LeaveRoom &event)
{
emit externalRoomUserLeft(roomId, QString::fromStdString(event.name()));
}
void IslInterface::roomEvent_Say(int roomId, const Event_RoomSay &event)
{
emit externalRoomSay(roomId, QString::fromStdString(event.name()), QString::fromStdString(event.message()));
}
void IslInterface::roomEvent_ListGames(int roomId, const Event_ListGames &event)
{
for (int i = 0; i < event.game_list_size(); ++i) {
ServerInfo_Game gameInfo(event.game_list(i));
gameInfo.set_server_id(serverId);
emit externalRoomGameListChanged(roomId, gameInfo);
}
}
void IslInterface::processSessionEvent(const SessionEvent &event)
{
switch (getPbExtension(event)) {
@ -274,6 +338,17 @@ void IslInterface::processSessionEvent(const SessionEvent &event)
}
}
void IslInterface::processRoomEvent(const RoomEvent &event)
{
switch (getPbExtension(event)) {
case RoomEvent::JOIN_ROOM: roomEvent_UserJoined(event.room_id(), event.GetExtension(Event_JoinRoom::ext)); break;
case RoomEvent::LEAVE_ROOM: roomEvent_UserLeft(event.room_id(), event.GetExtension(Event_LeaveRoom::ext)); break;
case RoomEvent::ROOM_SAY: roomEvent_Say(event.room_id(), event.GetExtension(Event_RoomSay::ext)); break;
case RoomEvent::LIST_GAMES: roomEvent_ListGames(event.room_id(), event.GetExtension(Event_ListGames::ext)); break;
default: ;
}
}
void IslInterface::processMessage(const IslMessage &item)
{
qDebug() << QString::fromStdString(item.DebugString());
@ -290,6 +365,7 @@ void IslInterface::processMessage(const IslMessage &item)
break;
}
case IslMessage::ROOM_EVENT: {
processRoomEvent(item.room_event()); break;
break;
}
default: ;

View file

@ -5,6 +5,8 @@
#include <QSslCertificate>
#include <QWaitCondition>
#include "pb/serverinfo_user.pb.h"
#include "pb/serverinfo_room.pb.h"
#include "pb/serverinfo_game.pb.h"
class Servatrice;
class QSslSocket;
@ -15,6 +17,10 @@ class Event_ServerCompleteList;
class Event_UserMessage;
class Event_UserJoined;
class Event_UserLeft;
class Event_JoinRoom;
class Event_LeaveRoom;
class Event_RoomSay;
class Event_ListGames;
class IslInterface : public QObject {
Q_OBJECT
@ -26,7 +32,11 @@ signals:
void outputBufferChanged();
void externalUserJoined(ServerInfo_User userInfo);
void externalUserLeft(QString name);
void externalUserLeft(QString userName);
void externalRoomUserJoined(int roomId, ServerInfo_User userInfo);
void externalRoomUserLeft(int roomId, QString userName);
void externalRoomSay(int roomId, QString userName, QString message);
void externalRoomGameListChanged(int roomId, ServerInfo_Game gameInfo);
private:
int serverId;
int socketDescriptor;
@ -47,7 +57,13 @@ private:
void sessionEvent_UserJoined(const Event_UserJoined &event);
void sessionEvent_UserLeft(const Event_UserLeft &event);
void roomEvent_UserJoined(int roomId, const Event_JoinRoom &event);
void roomEvent_UserLeft(int roomId, const Event_LeaveRoom &event);
void roomEvent_Say(int roomId, const Event_RoomSay &event);
void roomEvent_ListGames(int roomId, const Event_ListGames &event);
void processSessionEvent(const SessionEvent &event);
void processRoomEvent(const RoomEvent &event);
void processMessage(const IslMessage &item);
void sharedCtor(const QSslCertificate &cert, const QSslKey &privateKey);

View file

@ -35,7 +35,7 @@
RNG_Abstract *rng;
ServerLogger *logger;
ServerLoggerThread *loggerThread;
QThread *loggerThread;
void testRNG()
{
@ -100,7 +100,11 @@ void sigSegvHandler(int sig)
logger->logMessage("CRASH: SIGSEGV");
else if (sig == SIGABRT)
logger->logMessage("CRASH: SIGABRT");
logger->deleteLater();
loggerThread->wait();
delete loggerThread;
raise(sig);
}
#endif
@ -121,10 +125,13 @@ int main(int argc, char *argv[])
QSettings *settings = new QSettings("servatrice.ini", QSettings::IniFormat);
loggerThread = new ServerLoggerThread(settings->value("server/logfile").toString());
loggerThread = new QThread;
logger = new ServerLogger;
logger->moveToThread(loggerThread);
QObject::connect(logger, SIGNAL(destroyed()), loggerThread, SLOT(quit()));
loggerThread->start();
loggerThread->waitForInit();
logger = loggerThread->getLogger();
QMetaObject::invokeMethod(logger, "startLog", Qt::BlockingQueuedConnection, Q_ARG(QString, settings->value("server/logfile").toString()));
qInstallMsgHandler(myMessageOutput2);
#ifdef Q_OS_UNIX
@ -166,6 +173,9 @@ int main(int argc, char *argv[])
delete rng;
delete settings;
logger->deleteLater();
loggerThread->wait();
delete loggerThread;
return retval;

View file

@ -87,30 +87,55 @@ Servatrice::Servatrice(QSettings *_settings, QObject *parent)
updateServerList();
clearSessionTables();
int size = settings->beginReadArray("rooms");
for (int i = 0; i < size; ++i) {
settings->setArrayIndex(i);
QStringList gameTypes;
int size2 = settings->beginReadArray("game_types");
for (int j = 0; j < size2; ++j) {
settings->setArrayIndex(j);
gameTypes.append(settings->value("name").toString());
const QString roomMethod = settings->value("rooms/method").toString();
if (roomMethod == "sql") {
QSqlQuery query;
query.prepare("select id, name, descr, auto_join, join_message from " + dbPrefix + "_rooms order by id asc");
execSqlQuery(query);
while (query.next()) {
QSqlQuery query2;
query2.prepare("select name from " + dbPrefix + "_rooms_gametypes where id_room = :id_room");
query2.bindValue(":id_room", query.value(0).toInt());
execSqlQuery(query2);
QStringList gameTypes;
while (query2.next())
gameTypes.append(query2.value(0).toString());
addRoom(new Server_Room(query.value(0).toInt(),
query.value(1).toString(),
query.value(2).toString(),
query.value(3).toInt(),
query.value(4).toString(),
gameTypes,
this
));
}
} else {
int size = settings->beginReadArray("rooms/roomlist");
for (int i = 0; i < size; ++i) {
settings->setArrayIndex(i);
QStringList gameTypes;
int size2 = settings->beginReadArray("game_types");
for (int j = 0; j < size2; ++j) {
settings->setArrayIndex(j);
gameTypes.append(settings->value("name").toString());
}
settings->endArray();
Server_Room *newRoom = new Server_Room(
i,
settings->value("name").toString(),
settings->value("description").toString(),
settings->value("autojoin").toBool(),
settings->value("joinmessage").toString(),
gameTypes,
this
);
addRoom(newRoom);
}
settings->endArray();
Server_Room *newRoom = new Server_Room(
i,
settings->value("name").toString(),
settings->value("description").toString(),
settings->value("autojoin").toBool(),
settings->value("joinmessage").toString(),
gameTypes,
this
);
addRoom(newRoom);
}
settings->endArray();
updateLoginMessage();
@ -499,8 +524,8 @@ ServerInfo_User Servatrice::getUserData(const QString &name, bool withId)
int Servatrice::getUsersWithAddress(const QHostAddress &address) const
{
QMutexLocker locker(&serverMutex);
int result = 0;
QReadLocker locker(&clientsLock);
for (int i = 0; i < clients.size(); ++i)
if (static_cast<ServerSocketInterface *>(clients[i])->getPeerAddress() == address)
++result;
@ -509,8 +534,8 @@ int Servatrice::getUsersWithAddress(const QHostAddress &address) const
QList<ServerSocketInterface *> Servatrice::getUsersWithAddressAsList(const QHostAddress &address) const
{
QMutexLocker locker(&serverMutex);
QList<ServerSocketInterface *> result;
QReadLocker locker(&clientsLock);
for (int i = 0; i < clients.size(); ++i)
if (static_cast<ServerSocketInterface *>(clients[i])->getPeerAddress() == address)
result.append(static_cast<ServerSocketInterface *>(clients[i]));
@ -519,6 +544,8 @@ QList<ServerSocketInterface *> Servatrice::getUsersWithAddressAsList(const QHost
void Servatrice::clearSessionTables()
{
qDebug() << "Clearing previous sessions...";
lockSessionTables();
QSqlQuery query;
query.prepare("update " + dbPrefix + "_sessions set end_time=now() where end_time is null and id_server = :id_server");
@ -740,13 +767,13 @@ void Servatrice::storeGameInformation(int secondsElapsed, const QSet<QString> &a
SessionEvent *sessionEvent = Server_ProtocolHandler::prepareSessionEvent(replayEvent);
allUsersIterator.toFront();
serverMutex.lock();
clientsLock.lockForRead();
while (allUsersIterator.hasNext()) {
Server_ProtocolHandler *userHandler = users.value(allUsersIterator.next());
if (userHandler)
userHandler->sendProtocolItem(*sessionEvent);
}
serverMutex.unlock();
clientsLock.unlock();
delete sessionEvent;
QMutexLocker locker(&dbMutex);
@ -790,8 +817,6 @@ void Servatrice::storeGameInformation(int secondsElapsed, const QSet<QString> &a
void Servatrice::scheduleShutdown(const QString &reason, int minutes)
{
QMutexLocker locker(&serverMutex);
shutdownReason = reason;
shutdownMinutes = minutes + 1;
if (minutes > 0) {
@ -818,8 +843,6 @@ void Servatrice::incRxBytes(quint64 num)
void Servatrice::shutdownTimeout()
{
QMutexLocker locker(&serverMutex);
--shutdownMinutes;
SessionEvent *se;
@ -833,9 +856,11 @@ void Servatrice::shutdownTimeout()
event.set_reason(Event_ConnectionClosed::SERVER_SHUTDOWN);
se = Server_ProtocolHandler::prepareSessionEvent(event);
}
clientsLock.lockForRead();
for (int i = 0; i < clients.size(); ++i)
clients[i]->sendProtocolItem(*se);
clientsLock.unlock();
delete se;
if (!shutdownMinutes)
@ -856,7 +881,10 @@ void Servatrice::addIslInterface(int serverId, IslInterface *interface)
islInterfaces.insert(serverId, interface);
connect(interface, SIGNAL(externalUserJoined(ServerInfo_User)), this, SLOT(externalUserJoined(ServerInfo_User)));
connect(interface, SIGNAL(externalUserLeft(QString)), this, SLOT(externalUserLeft(QString)));
connect(interface, SIGNAL(externalRoomUpdated(ServerInfo_Room)), this, SLOT(externalRoomUpdated(const ServerInfo_Room &)));
connect(interface, SIGNAL(externalRoomUserJoined(int, ServerInfo_User)), this, SLOT(externalRoomUserJoined(int, ServerInfo_User)));
connect(interface, SIGNAL(externalRoomUserLeft(int, QString)), this, SLOT(externalRoomUserLeft(int, QString)));
connect(interface, SIGNAL(externalRoomSay(int, QString, QString)), this, SLOT(externalRoomSay(int, QString, QString)));
connect(interface, SIGNAL(externalRoomGameListChanged(int, ServerInfo_Game)), this, SLOT(externalRoomGameListChanged(int, ServerInfo_Game)));
}
void Servatrice::removeIslInterface(int serverId)

View file

@ -26,7 +26,6 @@
#include <QSslKey>
#include <QHostAddress>
#include <QReadWriteLock>
#include <QMetaType>
#include "server.h"
class QSqlDatabase;
@ -167,6 +166,4 @@ private:
QMap<int, IslInterface *> islInterfaces;
};
Q_DECLARE_METATYPE(ServerInfo_User)
#endif

View file

@ -8,8 +8,17 @@
# include <sys/socket.h>
#endif
ServerLogger::ServerLogger(const QString &logFileName, QObject *parent)
ServerLogger::ServerLogger(QObject *parent)
: QObject(parent), flushRunning(false)
{
}
ServerLogger::~ServerLogger()
{
flushBuffer();
}
void ServerLogger::startLog(const QString &logFileName)
{
if (!logFileName.isEmpty()) {
logFile = new QFile("server.log", this);
@ -26,11 +35,6 @@ ServerLogger::ServerLogger(const QString &logFileName, QObject *parent)
connect(this, SIGNAL(sigFlushBuffer()), this, SLOT(flushBuffer()), Qt::QueuedConnection);
}
ServerLogger::~ServerLogger()
{
flushBuffer();
}
void ServerLogger::logMessage(QString message, void *caller)
{
if (!logFile)
@ -98,34 +102,3 @@ void ServerLogger::handleSigHup()
QFile *ServerLogger::logFile;
int ServerLogger::sigHupFD[2];
ServerLoggerThread::ServerLoggerThread(const QString &_fileName, QObject *parent)
: QThread(parent), fileName(_fileName)
{
}
ServerLoggerThread::~ServerLoggerThread()
{
quit();
wait();
}
void ServerLoggerThread::run()
{
logger = new ServerLogger(fileName);
usleep(100);
initWaitCondition.wakeAll();
exec();
delete logger;
}
void ServerLoggerThread::waitForInit()
{
QMutex mutex;
mutex.lock();
initWaitCondition.wait(&mutex);
mutex.unlock();
}

View file

@ -14,10 +14,11 @@ class Server_ProtocolHandler;
class ServerLogger : public QObject {
Q_OBJECT
public:
ServerLogger(const QString &logFileName, QObject *parent = 0);
ServerLogger(QObject *parent = 0);
~ServerLogger();
static void hupSignalHandler(int unused);
public slots:
void startLog(const QString &logFileName);
void logMessage(QString message, void *caller = 0);
private slots:
void handleSigHup();
@ -33,19 +34,4 @@ private:
QMutex bufferMutex;
};
class ServerLoggerThread : public QThread {
Q_OBJECT
private:
QString fileName;
ServerLogger *logger;
QWaitCondition initWaitCondition;
protected:
void run();
public:
ServerLoggerThread(const QString &_fileName, QObject *parent = 0);
~ServerLoggerThread();
ServerLogger *getLogger() const { return logger; }
void waitForInit();
};
#endif

View file

@ -642,6 +642,6 @@ Response::ResponseCode ServerSocketInterface::cmdUpdateServerMessage(const Comma
Response::ResponseCode ServerSocketInterface::cmdShutdownServer(const Command_ShutdownServer &cmd, ResponseContainer & /*rc*/)
{
servatrice->scheduleShutdown(QString::fromStdString(cmd.reason()), cmd.minutes());
QMetaObject::invokeMethod(server, "scheduleShutdown", Q_ARG(QString, QString::fromStdString(cmd.reason())), Q_ARG(int, cmd.minutes()));
return Response::RespOk;
}