服务端Skynet(一)——源码浅析
创始人
2024-01-28 14:28:21
0

服务端Skynet(一)——源码浅析

文章目录

  • 服务端Skynet(一)——源码浅析
    • 1、skynet的本质
    • 2、skynet基本的数据结构
      • 1、skynet_modules管理模块
      • 2、skynet_context模块
      • 3、skynet_message模块
    • 3、skynet启动服务步骤
    • 4、启动服务例子(logger)

参考文献:

skynet设计综述

skynet源码赏析

1、skynet的本质

Skynet 仅解决一个问题:把一个符合规范的 C 模块,从动态库(so 文件)中启动起来,绑定一个永不重复(即使模块退出)的数字 id 做为其 handle 。模块被称为服务(Service),服务间可以自由发送消息。每个模块可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。每个服务都是被一个个消息包驱动,当没有包到来的时候,它们就会处于挂起状态,对 CPU 资源零消耗。如果需要自主逻辑,则可以利用 Skynet 系统提供的 timeout 消息,定期触发。(Skynet 提供了名字服务,还可以给特定的服务起一个易读的名字,而不是用 id 来指代它。id 和运行时态相关,无法保证每次启动服务,都有一致的 id ,但名字可以。)

从上面的意思来看,skynet可以让我们写的不同的业务逻辑,独立运行在不同的上下文环境中,并且能够通过某种方式,相互协作,最终共同服务(actor模型)

skynet机制

在这里插入图片描述

  • 我们编写好的c文件,在编译成so库以后,在某个时机,调用该so库api的句柄,会被加载到一个modules列表中,一般这样的模块会被要求定义4种接口create,init,signal和release
  • 我们要创建一个新的,运行该业务逻辑的上下文环境时,则从modules列表中,找到对应的so库句柄,并且调用create接口,创建一个该类业务模块的数据实例,并且创建一个上下文环境(context),引用该类业务的接口和数据实例,该context会被存放在一个统一存放context的列表中,这种context被称之为服务
  • 一个服务,默认不会执行任何逻辑,需要别人向它发出请求时,才会执行对应的逻辑(定时器也是通过消息队列,告诉指定服务,要执行定时事件),并在需要时返回结果给请求者。请求者往往也是其他服务。服务间的请求、响应和推送,并不是直接调用对方的api来执行,而是通过一个消息队列,也就是说,不论是请求、回应还是推送,都需要通过这个消息队列转发到另一个服务中。skynet的消息队列,分为两级,一个全局消息队列,他包含一个头尾指针,分别指向两个隶属于指定服务的次级消息队列。skynet中的每一个服务,都有一个唯一的、专属的次级消息队列。
  • skynet一共有4种线程,monitor线程用于检测节点内的消息是否堵住,timer线程运行定时器,socket线程进行网络数据的收发,worker线程则负责对消息队列进行调度(worker线程的数量,可以通过配置表指定)。消息调度规则是,每条worker线程,每次从全局消息队列中pop出一个次级消息队列,并从次级消息队列中pop出一条消息,并找到该次级消息队列的所属服务,将消息传给该服务的callback函数,执行指定业务,当逻辑执行完毕时,再将次级消息队列push回全局消息队列中。因为每个服务只有一个次级消息队列,每当一条worker线程,从全局消息队列中pop出一个次级消息队列时,其他线程是拿不到同一个服务,并调用callback函数,因此不用担心一个服务同时在多条线程内消费不同的消息,一个服务执行,不存在并发,线程是安全的
  • socket线程、timer线程甚至是worker线程,都有可能会往指定服务的次级消息队列中push消息,push函数内有加一个自旋锁,避免同时多条线程同时向一个次级消息队列push消息的惨局。

2、skynet基本的数据结构

1、skynet_modules管理模块

/*一个模块被加载以后,将被放置到modules的skynet_module数组中,当要创建该module的实例时,将会从skynet_module中取出对应的模块,并调用create函数创建实例,然后将实例指针传入init函数完成初始化以后,赋值给context。一个C服务,定义以上四个接口时,一定要以文件名作为前缀,然后通过下划线和对应函数连接起来,因为skynet加载的时候,就是通过这种方式去寻找对应函数的地址的,比如一个c服务文件名为logger,那么对应的4个函数名则为logger_create、logger_init、logger_signal、logger_release(在程序中动态加载到skynet_module列表中,这里通过dlopen函数来获取so库的访问句柄,并通过dlsym将so库中对应的函数绑定到函数指针中)
*/
// skynet_module.h
typedef void * (*skynet_dl_create)(void);												//create
//skynet_context 对象会注册在 skynet_context list
typedef int (*skynet_dl_init)(void * inst, struct skynet_context *, const char * parm);		//init   
typedef void (*skynet_dl_release)(void * inst);											//release								
typedef void (*skynet_dl_signal)(void * inst, int signal);								 //signalstruct skynet_module {const char * name;          // C服务名称,一般是C服务的文件名void * module;              // 访问该so库的dl句柄,该句柄通过dlopen函数获得skynet_dl_create create;    // 绑定so库中的xxx_create函数,通过dlsym函数实现绑定,调用该create即是调用xxx_createskynet_dl_init init;        // 绑定so库中的xxx_init函数,调用该init即是调用xxx_initskynet_dl_release release;  // 绑定so库中的xxx_release函数,调用该release即是调用xxx_releaseskynet_dl_signal signal;    // 绑定so库中的xxx_signal函数,调用该signal即是调用xxx_signal
};// skynet_module.c
#define MAX_MODULE_TYPE 32struct modules {int count;                  // modules的数量struct spinlock lock;       // 自旋锁,避免多个线程同时向skynet_module写入数据,保证线程安全const char * path;          // 由skynet配置表中的cpath指定,一般包含./cservice/?.so路径struct skynet_module m[MAX_MODULE_TYPE];  // 存放服务模块的数组,最多32类
};static struct modules * M = NULL;

2、skynet_context模块

/*对于一个新服务的创建流程:对应的module -> module实例化和初始化 -> 创建skynet_context上下文环境 -> module实例和模块与skynet_context关联 -> 放置到skynet_context list当一个消息送达一个context时,其callback函数就会被调用,callback函数一般在module的init函数里指定,调用callback函数时,会传入userdata(一般是instance指针),source(发送方的服务id),type(消息类型),msg和sz(数据及其大小),每个服务的callback处理各自的逻辑
*/// skynet_server.c
struct skynet_context {void * instance;                // 由指定module的create函数,创建的数据实例指针,同一类服务可能有多个实例,// 因此每个服务都应该有自己的数据struct skynet_module * mod;     // 引用服务module的指针,方便后面对create、init、signal和release函数进行调用void * cb_ud;                   // 调用callback函数时,回传给callback的userdata,一般是instance指针skynet_cb cb;                   // 服务的消息回调函数,一般在skynet_module的init函数里指定struct message_queue *queue;    // 服务专属的次级消息队列指针FILE * logfile;                 // 日志句柄char result[32];                // 操作skynet_context的返回值,会写到这里uint32_t handle;                // 标识唯一context的服务idint session_id;                 // 在发出请求后,收到对方的返回消息时,通过session_id来匹配一个返回,对应哪个请求int ref;                        // 引用计数变量,当为0时,表示内存可以被释放bool init;                      // 是否完成初始化bool endless;                   // 消息是否堵住CHECKCALLING_DECL
};// skynet_handle.c
// 这个结构用于记录,服务对应的别名,当应用层为某个服务命名时,会写到这里来
struct handle_name {char * name;                   // 服务别名uint32_t handle;               // 服务id
};struct handle_storage {struct rwlock lock;            // 读写锁uint32_t harbor;               // harbor iduint32_t handle_index;         // 创建下一个服务时,该服务的slot idx,一般会先判断该slot是否被占用,后面会详细讨论int slot_size;                 // slot的大小,一定是2^n,初始值是4struct skynet_context ** slot; // skynet_context listint name_cap;                  // 别名列表大小,大小为2^nint name_count;                // 别名数量struct handle_name *name;      // 别名列表
};static struct handle_storage *H = NULL;

3、skynet_message模块

/*skynet包含两级消息队列1、global_mq		包含一个head和tail指针  分别指向次级消息队列的头部和尾部2、次级消息队列(mq)	单线链表次级消息队列,实际上是一个数组,并且用两个int型数据,分别指向他的头部和尾部(head和tail),不论是head还是tail,当他们的值>=数组尺寸时,都会进行回绕(即从下标为0开始,比如值为数组的size时,会被重新赋值为0),在push操作后,head等于tail意味着队列已满(此时,队列会扩充两倍,并从头到尾重新赋值,此时head指向0,而tail为扩充前,数组的大小),在pop操作后,head等于tail意味着队列已经空了(后面他会从skynet全局消息队列中,被剔除掉)。
*//*1、消息驱动消息派发的机制:(worker线程 --> global_mq --(pop mq)--> mq --(pop msg)-->context的callback函数 --(push mq)-->global_mq)工作线程,会从global_mq里pop一个次级消息队列来,然后从次级消息队列中,pop出一个消息,并传给context的callback函数,在完成驱动以后,再将次级消息队列push回global_mq中
*/
// skynet_mq.h
struct skynet_message {uint32_t source;            // 消息发送方的服务地址// 如果这是一个回应消息,那么要通过session找回对应的一次请求,在lua层,我们每次调用call的时候,都会往对  // 方的消息队列中,push一个消息,并且生成一个session,然后将本地的协程挂起,挂起时,会以session为key,协程句  // 柄为值,放入一个table中,当回应消息送达时,通过session找到对应的协程,并将其唤醒。后面章节会详细讨论int session; void * data;        // 消息地址size_t sz;          // 消息大小
};// skynet_mq.c
#define DEFAULT_QUEUE_SIZE 64
#define MAX_GLOBAL_MQ 0x10000// 0 means mq is not in global mq.
// 1 means mq is in global mq , or the message is dispatching.#define MQ_IN_GLOBAL 1
#define MQ_OVERLOAD 1024struct message_queue {// 自旋锁,可能存在多个线程,向同一个队列写入的情况,加上自旋锁避免并发带来的发现,//后面会讨论互斥锁,自旋锁,读写锁和条件变量的区别struct spinlock lock;     uint32_t handle;                // 拥有此消息队列的服务的idint cap;                        // 消息大小int head;                       // 头部indexint tail;                       // 尾部indexint release;                    // 是否能释放消息int in_global;                  // 是否在全局消息队列中,0表示不是,1表示是int overload;                   // 是否过载int overload_threshold;struct skynet_message *queue;   // 消息队列struct message_queue *next;     // 下一个次级消息队列的指针
};struct global_queue {struct message_queue *head;struct message_queue *tail;struct spinlock lock;
};static struct global_queue *Q = NULL;/*
2、消息写入
我们要向一个服务发消息,最终是通过调用skynet.send接口,将消息插入到该服务专属的次级消息队列的,次级消息队列的内容,并不是context结构的一部分(context只是引用了他的指针),因此,在一个服务执行callback的同时,其他服务(可能是多个线程内执行callback的其他服务)可以向它的消息队列里push消息,而mq的push操作,是加了一个自旋锁,以避免多个线程,同时操作一个消息队列。lua层的skynet.send接口,最终会调到c层的skynet_context_push。这个接口实质上,是通过handle将context指针取出来,然后再往消息队列里push消息:
*/
// skynet_server.c
int skynet_context_push(uint32_t handle, struct skynet_message *message) {struct skynet_context * ctx = skynet_handle_grab(handle);if (ctx == NULL) {return -1;}skynet_mq_push(ctx->queue, message);skynet_context_release(ctx);return 0;
}// skynet_handle.c
struct skynet_context * 
skynet_handle_grab(uint32_t handle) {struct handle_storage *s = H;struct skynet_context * result = NULL;rwlock_rlock(&s->lock);uint32_t hash = handle & (s->slot_size-1);struct skynet_context * ctx = s->slot[hash];if (ctx && skynet_context_handle(ctx) == handle) {result = ctx;skynet_context_grab(result);}/*因为我们访问一个服务的机会,远大于创建一个服务并写入列表的机会,因此这里用了读写锁,在通过handle获取context指针时,加了一个读取锁,这样当在读取的过程中,同时有新的服务创建,并且存在要扩充skynet_context list容量的风险,因此不论如何,他都应当被阻塞住,直到所有的读取锁都释放掉。*/rwlock_runlock(&s->lock);return result;
}

3、skynet启动服务步骤

  1. 从modules列表中,查找对应的服务模块,如果找到则返回,否则到modules的path中去查找对应的so库,创建一个skynet_module对象(skynet_modules管理模块所示),将so库加载到内存,并将访问该so库的句柄和skynet_module对象关联(_try_open做了这件事),并将so库中的xxx_create,xxx_init,xxx_signal,xxx_release四个函数地址赋值给skynet_module的create、init、signal和release四个函数中,这样这个skynet_module对象,就能调用so库中,对应的四个接口(_open_sym做了这件事)。
  2. 创建一个服务实例即skynet_context对象,他包含一个次级消息队列指针,服务模块指针(skynet_module对象,便于他访问module自定义的create、init、signal和release函数),由服务模块调用create接口创建的数据实例等。
  3. 将新创建的服务实例(skynet_context对象)注册到全局的服务列表中(skynet_modules管理模块所示)。
  4. 初始化服务模块(skynet_module创建的数据实例),并在初始化函数中,注册新创建的skynet_context实例的callback函数。
  5. 将该服务实例(skynet_context实例)的次级消息队列,插入到全局消息队列中。
    经过上面的步骤,一个c服务模块就被创建出来了,在回调函数被指定以后,其他服务发送给他的消息,会被pop出来,最终传给服务对应的callback函数,最后达到驱动服务的目的。

创建c服务的工作,一般在c层进行,一般会调用skynet_context_new接口,如下所示:

// skynet_server.c
struct skynet_context * 
skynet_context_new(const char * name, const char *param) {struct skynet_module * mod = skynet_module_query(name);if (mod == NULL)return NULL;void *inst = skynet_module_instance_create(mod);if (inst == NULL)return NULL;struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));CHECKCALLING_INIT(ctx)ctx->mod = mod;ctx->instance = inst;ctx->ref = 2;ctx->cb = NULL;ctx->cb_ud = NULL;ctx->session_id = 0;ctx->logfile = NULL;ctx->init = false;ctx->endless = false;// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handlectx->handle = 0;	ctx->handle = skynet_handle_register(ctx);struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);// init function maybe use ctx->handle, so it must init at lastcontext_inc();CHECKCALLING_BEGIN(ctx)int r = skynet_module_instance_init(mod, inst, ctx, param);CHECKCALLING_END(ctx)if (r == 0) {struct skynet_context * ret = skynet_context_release(ctx);if (ret) {ctx->init = true;}skynet_globalmq_push(queue);if (ret) {skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");}return ret;} else {skynet_error(ctx, "FAILED launch %s", name);uint32_t handle = ctx->handle;skynet_context_release(ctx);skynet_handle_retire(handle);struct drop_t d = { handle };skynet_mq_release(queue, drop_message, &d);return NULL;}
}// skynet_module.c
struct skynet_module * 
skynet_module_query(const char * name) {struct skynet_module * result = _query(name);if (result)return result;SPIN_LOCK(M)result = _query(name); // double checkif (result == NULL && M->count < MAX_MODULE_TYPE) {int index = M->count;void * dl = _try_open(M,name);if (dl) {M->m[index].name = name;M->m[index].module = dl;if (_open_sym(&M->m[index]) == 0) {M->m[index].name = skynet_strdup(name);M->count ++;result = &M->m[index];}}}SPIN_UNLOCK(M)return result;
}static void *
_try_open(struct modules *m, const char * name) {const char *l;const char * path = m->path;size_t path_size = strlen(path);size_t name_size = strlen(name);int sz = path_size + name_size;//search pathvoid * dl = NULL;char tmp[sz];do{memset(tmp,0,sz);while (*path == ';') path++;if (*path == '\0') break;l = strchr(path, ';');if (l == NULL) l = path + strlen(path);int len = l - path;int i;for (i=0;path[i]!='?' && i < len ;i++) {tmp[i] = path[i];}memcpy(tmp+i,name,name_size);if (path[i] == '?') {strncpy(tmp+i+name_size,path+i+1,len - i - 1);} else {fprintf(stderr,"Invalid C service path\n");exit(1);}dl = dlopen(tmp, RTLD_NOW | RTLD_GLOBAL);path = l;}while(dl == NULL);if (dl == NULL) {fprintf(stderr, "try open %s failed : %s\n",name,dlerror());}return dl;
}_open_sym(struct skynet_module *mod) {size_t name_size = strlen(mod->name);char tmp[name_size + 9]; // create/init/release/signal , longest name is release (7)memcpy(tmp, mod->name, name_size);strcpy(tmp+name_size, "_create");mod->create = dlsym(mod->module, tmp);strcpy(tmp+name_size, "_init");mod->init = dlsym(mod->module, tmp);strcpy(tmp+name_size, "_release");mod->release = dlsym(mod->module, tmp);strcpy(tmp+name_size, "_signal");mod->signal = dlsym(mod->module, tmp);return mod->init == NULL;
}

4、启动服务例子(logger)

1、启动skynet节点时,会启动一个logger c服务

// skynet_start.c
void
skynet_start(struct skynet_config * config) {...struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);if (ctx == NULL) {fprintf(stderr, "Can't launch %s service\n", config->logservice);exit(1);}...
}

2、此时,skynet_module list列表中,搜索logger服务模块,如果没找到则在so库的输出路径中,寻找名为logger的so库,找到则将该so库加载到内存中,并将对应的logger_create,logger_init,logger_release函数地址分别赋值给logger模块中的create,init,release函数指针,此时skynet_module list中,多了一个logger模块。

3、创建服务实例,即创建一个skynet_context实例,为了使skynet_context实例拥有访问logger服务内部函数的权限,这里将logger模块指针,赋值给skynet_context实例的mod变量中。

4、创建一个logger服务的数据实例,调用logger服务的create函数:

// service_logger.c
struct logger {FILE * handle;int close;
};
struct logger *
logger_create(void) {struct logger * inst = skynet_malloc(sizeof(*inst));inst->handle = NULL;inst->close = 0;return inst;
}

此时,将新创建的数据实例赋值给skynet_context的instance变量,此时,一个服务对象运行时,所要用到的逻辑,能够通过mod变量,访问logger服务对应的函数,而通过instance可以找到该服务自己的数据块。

5、将新创建的skynet_context对象,注册skynet_context list中,此时skynet_context list多了一个logger服务实例

6、初始化logger服务,注册logger服务的callback函数:

// service_logger.c
static int
_logger(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz) {struct logger * inst = ud;fprintf(inst->handle, "[:%08x] ",source);fwrite(msg, sz , 1, inst->handle);fprintf(inst->handle, "\n");fflush(inst->handle);return 0;
}
int
logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) {if (parm) {inst->handle = fopen(parm,"w");if (inst->handle == NULL) {return 1;}inst->close = 1;} else {inst->handle = stdout;}if (inst->handle) {skynet_callback(ctx, inst, _logger);skynet_command(ctx, "REG", ".logger");return 0;}return 1;
}
// skynet_server.c
void 
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {context->cb = cb;context->cb_ud = ud;
}

上面这段逻辑,将skynet_context的callback函数设置为logger服务的_logger函数,并将调用callback时,传入的userdata设置为先前创建的数据实例

7、为logger服务实例创建一个次级消息队列,并将队列插入到全局消息队列中

简单的启动初始化流程:

在这里插入图片描述

相关内容

热门资讯

谁能绝地反击?《歌手2025》...   《歌手2025》突围赛名单官宣,歌王之战名额仅剩4席,谁能绝地反击,成功进入歌王之战?(编辑 孟...
雨霁抢通“生命线”:徒步3小时...   央广网北京7月30日消息(记者 庞婷 朱冠安)持续暴雨过后,7月29日,北京迎来久违的晴空,为抢...
俄罗斯勘察加地震后鲸鱼被冲上日...   据日本广播协会(NHK)等媒体报道,7月30日在日本千叶县馆山市平砂浦海岸发现4头鲸搁浅。有目击...
【职引未来】全国各地护航青年就...   稳就业,高校毕业生等青年群体就业是重中之重。当前,高校毕业生就业工作进入离校后接续服务的新阶段。...
湖北上半年经济跑出加速度 锻造...   风险挑战越大,越能考验经济韧性。  2025年经济“期中考”各地陆续放榜,这也为观察中国经济的强...
卖气球大妈和小孩发生口角 大妈...   7月28日,山东威海。卖气球大妈和小孩发生口角,大妈破口大骂并报警。警方到场认定气球易燃易爆,依...
复兴号智能动车组“变脸”,帅爆...   暑运高峰期  深圳北站站台  一列复兴号智能动车组到站后  白灯变红灯,霸气“变脸”  这酷炫的...
注意!海啸预计对我国部分沿岸地...   自然资源部海啸预警中心发布海啸黄色警报。北京时间7月30日7时24分,堪察加东岸远海海域(52....
蚊媒传染病怎么防?——国家疾控...   近期,我国南方个别城市发生基孔肯雅热输入疫情并引发本地传播。针对当前公众关心的诸多蚊媒传染病防治...