Merge pull request #569 from MihailRis/fix-resume-dead-coroutine
fix "cannot resume dead coroutine"
This commit is contained in:
commit
00c7c986bb
@ -46,22 +46,66 @@ local Socket = {__index={
|
|||||||
get_address=function(self) return network.__get_address(self.id) end,
|
get_address=function(self) return network.__get_address(self.id) end,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
network.tcp_connect = function(address, port, callback)
|
|
||||||
local socket = setmetatable({id=0}, Socket)
|
|
||||||
socket.id = network.__connect(address, port, function(id)
|
|
||||||
callback(socket)
|
|
||||||
end)
|
|
||||||
return socket
|
|
||||||
end
|
|
||||||
|
|
||||||
local ServerSocket = {__index={
|
local ServerSocket = {__index={
|
||||||
close=function(self) return network.__closeserver(self.id) end,
|
close=function(self) return network.__closeserver(self.id) end,
|
||||||
is_open=function(self) return network.__is_serveropen(self.id) end,
|
is_open=function(self) return network.__is_serveropen(self.id) end,
|
||||||
get_port=function(self) return network.__get_serverport(self.id) end,
|
get_port=function(self) return network.__get_serverport(self.id) end,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
|
|
||||||
|
local _tcp_server_callbacks = {}
|
||||||
|
local _tcp_client_callbacks = {}
|
||||||
|
|
||||||
network.tcp_open = function (port, handler)
|
network.tcp_open = function (port, handler)
|
||||||
return setmetatable({id=network.__open(port, function(id)
|
local socket = setmetatable({id=network.__open(port)}, ServerSocket)
|
||||||
|
|
||||||
|
_tcp_server_callbacks[socket.id] = function(id)
|
||||||
handler(setmetatable({id=id}, Socket))
|
handler(setmetatable({id=id}, Socket))
|
||||||
end)}, ServerSocket)
|
end
|
||||||
|
return socket
|
||||||
|
end
|
||||||
|
|
||||||
|
network.tcp_connect = function(address, port, callback)
|
||||||
|
local socket = setmetatable({id=0}, Socket)
|
||||||
|
socket.id = network.__connect(address, port)
|
||||||
|
_tcp_client_callbacks[socket.id] = function() callback(socket) end
|
||||||
|
return socket
|
||||||
|
end
|
||||||
|
|
||||||
|
network.__process_events = function()
|
||||||
|
local CLIENT_CONNECTED = 1
|
||||||
|
local CONNECTED_TO_SERVER = 2
|
||||||
|
|
||||||
|
local cleaned = false
|
||||||
|
local events = network.__pull_events()
|
||||||
|
for i, event in ipairs(events) do
|
||||||
|
local etype, sid, cid = unpack(event)
|
||||||
|
|
||||||
|
if etype == CLIENT_CONNECTED then
|
||||||
|
local callback = _tcp_server_callbacks[sid]
|
||||||
|
if callback then
|
||||||
|
callback(cid)
|
||||||
|
end
|
||||||
|
elseif etype == CONNECTED_TO_SERVER then
|
||||||
|
local callback = _tcp_client_callbacks[cid]
|
||||||
|
if callback then
|
||||||
|
callback()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
-- remove dead servers
|
||||||
|
if not cleaned then
|
||||||
|
for sid, _ in pairs(_tcp_server_callbacks) do
|
||||||
|
if not network.__is_serveropen(sid) then
|
||||||
|
_tcp_server_callbacks[sid] = nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
for cid, _ in pairs(_tcp_client_callbacks) do
|
||||||
|
if not network.__is_alive(cid) then
|
||||||
|
_tcp_client_callbacks[cid] = nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
cleaned = true
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@ -34,7 +34,10 @@ local function complete_app_lib(app)
|
|||||||
app.reconfig_packs = core.reconfig_packs
|
app.reconfig_packs = core.reconfig_packs
|
||||||
app.get_setting = core.get_setting
|
app.get_setting = core.get_setting
|
||||||
app.set_setting = core.set_setting
|
app.set_setting = core.set_setting
|
||||||
app.tick = coroutine.yield
|
app.tick = function()
|
||||||
|
coroutine.yield()
|
||||||
|
network.__process_events()
|
||||||
|
end
|
||||||
app.get_version = core.get_version
|
app.get_version = core.get_version
|
||||||
app.get_setting_info = core.get_setting_info
|
app.get_setting_info = core.get_setting_info
|
||||||
app.load_content = function()
|
app.load_content = function()
|
||||||
|
|||||||
@ -89,20 +89,6 @@ static int l_post(lua::State* L, network::Network& network) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int l_connect(lua::State* L, network::Network& network) {
|
|
||||||
std::string address = lua::require_string(L, 1);
|
|
||||||
int port = lua::tointeger(L, 2);
|
|
||||||
lua::pushvalue(L, 3);
|
|
||||||
auto callback = lua::create_lambda_nothrow(L);
|
|
||||||
u64id_t id = network.connect(address, port, [callback](u64id_t id) {
|
|
||||||
engine->postRunnable([=]() {
|
|
||||||
callback({id});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
return lua::pushinteger(L, id);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static int l_close(lua::State* L, network::Network& network) {
|
static int l_close(lua::State* L, network::Network& network) {
|
||||||
u64id_t id = lua::tointeger(L, 1);
|
u64id_t id = lua::tointeger(L, 1);
|
||||||
if (auto connection = network.getConnection(id)) {
|
if (auto connection = network.getConnection(id)) {
|
||||||
@ -182,14 +168,32 @@ static int l_available(lua::State* L, network::Network& network) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum NetworkEventType {
|
||||||
|
CLIENT_CONNECTED = 1,
|
||||||
|
CONNECTED_TO_SERVER
|
||||||
|
};
|
||||||
|
|
||||||
|
struct NetworkEvent {
|
||||||
|
NetworkEventType type;
|
||||||
|
u64id_t server;
|
||||||
|
u64id_t client;
|
||||||
|
};
|
||||||
|
|
||||||
|
static std::vector<NetworkEvent> events_queue {};
|
||||||
|
|
||||||
|
static int l_connect(lua::State* L, network::Network& network) {
|
||||||
|
std::string address = lua::require_string(L, 1);
|
||||||
|
int port = lua::tointeger(L, 2);
|
||||||
|
u64id_t id = network.connect(address, port, [](u64id_t cid) {
|
||||||
|
events_queue.push_back({CONNECTED_TO_SERVER, 0, cid});
|
||||||
|
});
|
||||||
|
return lua::pushinteger(L, id);
|
||||||
|
}
|
||||||
|
|
||||||
static int l_open(lua::State* L, network::Network& network) {
|
static int l_open(lua::State* L, network::Network& network) {
|
||||||
int port = lua::tointeger(L, 1);
|
int port = lua::tointeger(L, 1);
|
||||||
lua::pushvalue(L, 2);
|
u64id_t id = network.openServer(port, [](u64id_t sid, u64id_t id) {
|
||||||
auto callback = lua::create_lambda_nothrow(L);
|
events_queue.push_back({CLIENT_CONNECTED, sid, id});
|
||||||
u64id_t id = network.openServer(port, [callback](u64id_t id) {
|
|
||||||
engine->postRunnable([=]() {
|
|
||||||
callback({id});
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
return lua::pushinteger(L, id);
|
return lua::pushinteger(L, id);
|
||||||
}
|
}
|
||||||
@ -250,6 +254,26 @@ static int l_get_total_download(lua::State* L, network::Network& network) {
|
|||||||
return lua::pushinteger(L, network.getTotalDownload());
|
return lua::pushinteger(L, network.getTotalDownload());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int l_pull_events(lua::State* L, network::Network& network) {
|
||||||
|
lua::createtable(L, events_queue.size(), 0);
|
||||||
|
for (size_t i = 0; i < events_queue.size(); i++) {
|
||||||
|
lua::createtable(L, 3, 0);
|
||||||
|
|
||||||
|
lua::pushinteger(L, events_queue[i].type);
|
||||||
|
lua::rawseti(L, 1);
|
||||||
|
|
||||||
|
lua::pushinteger(L, events_queue[i].server);
|
||||||
|
lua::rawseti(L, 2);
|
||||||
|
|
||||||
|
lua::pushinteger(L, events_queue[i].client);
|
||||||
|
lua::rawseti(L, 3);
|
||||||
|
|
||||||
|
lua::rawseti(L, i + 1);
|
||||||
|
}
|
||||||
|
events_queue.clear();
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
template <int(*func)(lua::State*, network::Network&)>
|
template <int(*func)(lua::State*, network::Network&)>
|
||||||
int wrap(lua_State* L) {
|
int wrap(lua_State* L) {
|
||||||
int result = 0;
|
int result = 0;
|
||||||
@ -267,13 +291,13 @@ int wrap(lua_State* L) {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
const luaL_Reg networklib[] = {
|
const luaL_Reg networklib[] = {
|
||||||
{"get", wrap<l_get>},
|
{"get", wrap<l_get>},
|
||||||
{"get_binary", wrap<l_get_binary>},
|
{"get_binary", wrap<l_get_binary>},
|
||||||
{"post", wrap<l_post>},
|
{"post", wrap<l_post>},
|
||||||
{"get_total_upload", wrap<l_get_total_upload>},
|
{"get_total_upload", wrap<l_get_total_upload>},
|
||||||
{"get_total_download", wrap<l_get_total_download>},
|
{"get_total_download", wrap<l_get_total_download>},
|
||||||
|
{"__pull_events", wrap<l_pull_events>},
|
||||||
{"__open", wrap<l_open>},
|
{"__open", wrap<l_open>},
|
||||||
{"__closeserver", wrap<l_closeserver>},
|
{"__closeserver", wrap<l_closeserver>},
|
||||||
{"__connect", wrap<l_connect>},
|
{"__connect", wrap<l_connect>},
|
||||||
|
|||||||
@ -90,7 +90,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void update() override {
|
void update() override {
|
||||||
if (id == 0) {
|
if (!alive || id == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (lua::requireglobal(L, "__vc_resume_coroutine")) {
|
if (lua::requireglobal(L, "__vc_resume_coroutine")) {
|
||||||
|
|||||||
@ -477,6 +477,7 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
class SocketTcpSServer : public TcpServer {
|
class SocketTcpSServer : public TcpServer {
|
||||||
|
u64id_t id;
|
||||||
Network* network;
|
Network* network;
|
||||||
SOCKET descriptor;
|
SOCKET descriptor;
|
||||||
std::vector<u64id_t> clients;
|
std::vector<u64id_t> clients;
|
||||||
@ -485,14 +486,14 @@ class SocketTcpSServer : public TcpServer {
|
|||||||
std::unique_ptr<std::thread> thread = nullptr;
|
std::unique_ptr<std::thread> thread = nullptr;
|
||||||
int port;
|
int port;
|
||||||
public:
|
public:
|
||||||
SocketTcpSServer(Network* network, SOCKET descriptor, int port)
|
SocketTcpSServer(u64id_t id, Network* network, SOCKET descriptor, int port)
|
||||||
: network(network), descriptor(descriptor), port(port) {}
|
: id(id), network(network), descriptor(descriptor), port(port) {}
|
||||||
|
|
||||||
~SocketTcpSServer() {
|
~SocketTcpSServer() {
|
||||||
closeSocket();
|
closeSocket();
|
||||||
}
|
}
|
||||||
|
|
||||||
void startListen(consumer<u64id_t> handler) override {
|
void startListen(ConnectCallback handler) override {
|
||||||
thread = std::make_unique<std::thread>([this, handler]() {
|
thread = std::make_unique<std::thread>([this, handler]() {
|
||||||
while (open) {
|
while (open) {
|
||||||
logger.info() << "listening for connections";
|
logger.info() << "listening for connections";
|
||||||
@ -518,7 +519,7 @@ public:
|
|||||||
std::lock_guard lock(clientsMutex);
|
std::lock_guard lock(clientsMutex);
|
||||||
clients.push_back(id);
|
clients.push_back(id);
|
||||||
}
|
}
|
||||||
handler(id);
|
handler(this->id, id);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -558,7 +559,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
static std::shared_ptr<SocketTcpSServer> openServer(
|
static std::shared_ptr<SocketTcpSServer> openServer(
|
||||||
Network* network, int port, consumer<u64id_t> handler
|
u64id_t id, Network* network, int port, ConnectCallback handler
|
||||||
) {
|
) {
|
||||||
SOCKET descriptor = socket(
|
SOCKET descriptor = socket(
|
||||||
AF_INET, SOCK_STREAM, 0
|
AF_INET, SOCK_STREAM, 0
|
||||||
@ -585,7 +586,7 @@ public:
|
|||||||
}
|
}
|
||||||
logger.info() << "opened server at port " << port;
|
logger.info() << "opened server at port " << port;
|
||||||
auto server =
|
auto server =
|
||||||
std::make_shared<SocketTcpSServer>(network, descriptor, port);
|
std::make_shared<SocketTcpSServer>(id, network, descriptor, port);
|
||||||
server->startListen(std::move(handler));
|
server->startListen(std::move(handler));
|
||||||
return server;
|
return server;
|
||||||
}
|
}
|
||||||
@ -645,9 +646,9 @@ u64id_t Network::connect(const std::string& address, int port, consumer<u64id_t>
|
|||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
u64id_t Network::openServer(int port, consumer<u64id_t> handler) {
|
u64id_t Network::openServer(int port, ConnectCallback handler) {
|
||||||
u64id_t id = nextServer++;
|
u64id_t id = nextServer++;
|
||||||
auto server = SocketTcpSServer::openServer(this, port, handler);
|
auto server = SocketTcpSServer::openServer(id, this, port, handler);
|
||||||
servers[id] = std::move(server);
|
servers[id] = std::move(server);
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,6 +12,7 @@
|
|||||||
namespace network {
|
namespace network {
|
||||||
using OnResponse = std::function<void(std::vector<char>)>;
|
using OnResponse = std::function<void(std::vector<char>)>;
|
||||||
using OnReject = std::function<void(int)>;
|
using OnReject = std::function<void(int)>;
|
||||||
|
using ConnectCallback = std::function<void(u64id_t, u64id_t)>;
|
||||||
|
|
||||||
class Requests {
|
class Requests {
|
||||||
public:
|
public:
|
||||||
@ -64,7 +65,7 @@ namespace network {
|
|||||||
class TcpServer {
|
class TcpServer {
|
||||||
public:
|
public:
|
||||||
virtual ~TcpServer() {}
|
virtual ~TcpServer() {}
|
||||||
virtual void startListen(consumer<u64id_t> handler) = 0;
|
virtual void startListen(ConnectCallback handler) = 0;
|
||||||
virtual void close() = 0;
|
virtual void close() = 0;
|
||||||
virtual bool isOpen() = 0;
|
virtual bool isOpen() = 0;
|
||||||
virtual int getPort() const = 0;
|
virtual int getPort() const = 0;
|
||||||
@ -106,7 +107,7 @@ namespace network {
|
|||||||
|
|
||||||
u64id_t connect(const std::string& address, int port, consumer<u64id_t> callback);
|
u64id_t connect(const std::string& address, int port, consumer<u64id_t> callback);
|
||||||
|
|
||||||
u64id_t openServer(int port, consumer<u64id_t> handler);
|
u64id_t openServer(int port, ConnectCallback handler);
|
||||||
|
|
||||||
u64id_t addConnection(const std::shared_ptr<Connection>& connection);
|
u64id_t addConnection(const std::shared_ptr<Connection>& connection);
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user