当前位置:网站首页>Startup of openstack service
Startup of openstack service
2022-04-23 02:02:00 【qq_ forty-two million five hundred and thirty-three thousand tw】
Basically everything openstack Services all depend on evenlet Complete various concurrent tasks , Its processes can be divided into two categories :
1、 WSGIService: Receive and process http request , rely on eventlet.wsgi Of wsgi server Handle http request , such as nova-api
2、 Service: Receive and process rpc request , Such as nova-operation etc.
Whether it's WSGIService still Service Type of process , Every time a request is received (http or rpc), Will allocate a process in the process pool to process the request
One 、WSGIService Start of
Let's say nova Service as an example .
nova-api from nova/cmd/api.py start-up , It initializes a WSGIService( from service.py Definition ) object .
def main():
objects.register_all()
CONF(sys.argv[1:], project='nova',
version=version.version_string())
logging.setup(CONF, "nova")
rpc.init(CONF)
launcher = service.get_launcher()
server = service.WSGIService('osapi_nova')
launcher.launch_service(server, workers=server.workers)
launcher.wait()
api In the from service Layer to get an initiator object , The final will be server Object passed into the initiator object launch_service In the method ,launch_service(server, workers=server.workers) The method is defined as follows :
class Launcher(object):
def __init__(self):
super(Launcher, self).__init__()
self.launch_service = serve
self.wait = wait
This method is referenced to serve Method ,serve The method is defined as follows :
def serve(server, workers=None):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
_launcher = service.launch(CONF, server, workers=workers)
Finally called oslo_service/service.py Under the launch Method ,launch The method is defined as follows :
def launch(conf, service, workers=1, restart_method='reload'):
…
if workers is not None and workers <= 0:
raise ValueError(_("Number of workers should be positive!"))
if workers is None or workers == 1:
launcher = ServiceLauncher(conf, restart_method=restart_method)
else:
launcher = ProcessLauncher(conf, restart_method=restart_method)
launcher.launch_service(service, workers=workers)
You can see that there are two kinds of starters used here , In the process of further explaining the startup, first introduce openstack The starter in
Two 、Openstack Medium Launcher
Openstack One of them is called Launcher The concept of , Specifically used to start the service , This class is placed in oslo_service In this bag ,Launcher Divided into two :
One is ServiceLauncher;
The other is ProcessLauncher.
ServiceLauncher A service used to start a single process ;
and ProcessLauncher Used to start multiple worker Child process services , Such as all kinds api service (nova-api、cinder-api) etc.
oslo_service/service.py
1、ServiceLauncher
ServiceLauncher Inherited from Launcher, An important member of the startup service is launcher_service,ServiceLauncher This member of is inherited from Launcher
def launch_service(self, service, workers=1):
…
if workers is not None and workers != 1:
raise ValueError(_("Launcher asked to start multiple workers"))
_check_service_base(service)
service.backdoor_port = self.backdoor_port
self.services.add(service)
aucher_service Is to add services to self.services Inside the members ,services The type of members is class Services, Look at it. add Method
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
"""Add a service to a list and create a thread to run it.
:param service: service to run
"""
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
Services The initialization of this class is very simple , Create a ThreadGroup,ThreadGroup It's actually eventlet Of GreenPool,Openstack utilize eventlet Implement concurrency ,add Method , take self.run_service This method is put into pool in , and service It's its parameters .run_service It's easy , It's called service Of start Method , This completes the start of the service
2、ProcessLauncher
ProcessLauncher Directly inherited from Object, There are also launch_service Method
def launch_service(self, service, workers=1):
…
_check_service_base(service)
wrap = ServiceWrapper(service, workers)
LOG.info('Starting %d workers', wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
lauch_service Except for acceptance service outside , You also need to accept one workers Parameters , That is, the number of child processes , And then call _start_child Start multiple subprocesses
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info('Forking too fast, sleeping')
time.sleep(1)
wrap.forktimes.pop(0)
wrap.forktimes.append(time.time())
pid = os.fork()
if pid == 0:
self.launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(
self.launcher)
if not _is_sighup_and_daemon(signo):
self.launcher.wait()
break
self.launcher.restart()
os._exit(status)
LOG.debug('Started child %d', pid)
wrap.children.add(pid)
self.children[pid] = wrap
See the familiar fork No, , Just a simple call os.fork(), Then the child process starts running , Subprocess call _child_process
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher(self.conf, restart_method=self.restart_method)
launcher.launch_service(service)
return launcher
_child_process It's very simple , Create a Launcher, call Laucher.launch_service Method , As mentioned earlier , Actually ServiceLauncher Inherited from Launcher, Also called launcher_service Method , Start the service , Therefore, the next steps can refer to the previous , In the end service.start Method start service
3、 ... and 、WSGIService Start of — To continue
Back to the previous startup section , from launcher Section , We know that the startup of the service finally calls service Of start Method , And here it is service That's where we started api.py Created in the service, Then it is transmitted layer by layer into the starter behind , Let's go back to WSGIService Class start(self) Method
def start(self):
…
if self.manager:
self.manager.init_host()
self.server.start()
self.port = self.server.port
Here we call oslo_service/wsgi.py Medium start(self) Method
def start(self):
…
self.dup_socket = self.socket.dup()
if self._use_ssl:
self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)
wsgi_kwargs = {
'func': eventlet.wsgi.server,
'sock': self.dup_socket,
'site': self.app,
'protocol': self._protocol,
'custom_pool': self._pool,
'log': self._logger,
'log_format': self.conf.wsgi_log_format,
'debug': False,
'keepalive': self.conf.wsgi_keep_alive,
'socket_timeout': self.client_socket_timeout
}
if self._max_url_len:
wsgi_kwargs['url_length_limit'] = self._max_url_len
self._server = eventlet.spawn(**wsgi_kwargs)
Be careful wsgi_kwargs Parameters in func, Its value is eventlet.wsgi.server, stay eventlet/wsgi.py Is defined as follows :
def server(sock, site,
…
try:
serv.log.info("(%s) wsgi starting up on %s" % (
serv.pid, socket_repr(sock)))
while is_accepting:
try:
client_socket = sock.accept()
client_socket[0].settimeout(serv.socket_timeout)
serv.log.debug("(%s) accepted %r" % (
serv.pid, client_socket[1]))
try:
pool.spawn_n(serv.process_request, client_socket)
except AttributeError:
warnings.warn("wsgi's pool should be an instance of "
"eventlet.greenpool.GreenPool, is %s. Please convert your"
" call site to use GreenPool instead" % type(pool),
DeprecationWarning, stacklevel=2)
pool.execute_async(serv.process_request, client_socket)
except ACCEPT_EXCEPTIONS as e:
if support.get_errno(e) not in ACCEPT_ERRNO:
raise
except (KeyboardInterrupt, SystemExit):
serv.log.info("wsgi exiting")
break
finally:
pool.waitall()
…
see , Did you see a familiar scene !sock.accept() Monitor requests , Whenever a new request is received , call pool.spawn_n() Start a process to process the request
Four 、Service Start of
Service Types of processes are also controlled by nova/cmd/* Create some files in the directory :
- nova-schedule: nova/cmd/schedule.py
- ……
As a consumer of message oriented middleware , They monitor their own queue, Whenever there is rpc When the request comes , They create a new co process rpc request . With nova-schedule For example , Initialize a at startup Server( from service.py Definition ) object .
Whole Launcher Process follow WSGIServer equally , It's just service Of start() There are just some differences
def start(self):
…
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)
serializer = objects_base.KarborObjectSerializer()
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
Call through , Finally, such a RPCServer object
class RPCServer(msg_server.MessageHandlingServer):
def __init__(self, transport, target, dispatcher, executor='blocking'):
super(RPCServer, self).__init__(transport, dispatcher, executor)
self._target = target
This class inherits from MessageHandlingServer;
notes :nova All components of depend on oslo.messaging Access the message server , adopt oslo/messaging/server.py Initialize a MessageHandlingServer The object of , Listen message queue .
Finally, the service Of start Method
def start(self, override_pool_size=None):
…
if self._started:
LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
'racy. It is deprecated, and will become a noop '
'in a future release of oslo.messaging. If you '
'need to restart MessageHandlingServer you should '
'instantiate a new object.'))
self._started = True
executor_opts = {}
if self.executor_type in ("threading", "eventlet"):
executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size
)
self._work_executor = self._executor_cls(**executor_opts)
try:
self.listener = self._create_listener()
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
# HACK(sileht): We temporary pass the executor to the rabbit
# listener to fix a race with the deprecated blocking executor.
# We do this hack because this is need only for 'synchronous'
# executor like blocking. And this one is deprecated. Making
# driver working in an sync and an async way is complicated
# and blocking have 0% tests coverage.
if hasattr(self.listener, '_poll_style_listener'):
l = self.listener._poll_style_listener
if hasattr(l, "_message_operations_handler"):
l._message_operations_handler._executor = (
self.executor_type)
self.listener.start(self._on_incoming)
The above object initializes another EventletExecutor( from oslo/messaging/_executors/impl_eventlet.py) Type of excuete object , It calls self.listener.poll() monitor rpc request , Every time a request is received , Create a process to handle the request .
class EventletExecutor(base.ExecutorBase):
......
def start(self):
if self._thread is not None:
return
@excutils.forever_retry_uncaught_exceptions
def _executor_thread():
try:
while True:
incoming = self.listener.poll()
spawn_with(ctxt=self.dispatcher(incoming),
pool=self._greenpool)
except greenlet.GreenletExit:
return
self._thread = eventlet.spawn(_executor_thread)
official account : Skilled people with culture
版权声明
本文为[qq_ forty-two million five hundred and thirty-three thousand tw]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204230202212324.html
边栏推荐
- 如何选择一台好的拨号服务器?
- Thinkphp内核开发盲盒商城源码v2.0 对接易支付/阿里云短信/七牛云存储
- What categories do you need to know before using proxy IP?
- Under the pressure of sales, domestic mobile phones began to reduce prices, but they haven't put down their final face
- Is CICC fortune a company with CICC? Is it safe
- Esp32 message queue using FreeRTOS
- 【动手学深度学习V2】循环神经网络-1.序列模型
- A simple and open source navigation website source code
- keil mdk中文乱码,两种解决方法,字体不再难看
- 一加一为什么等于二
猜你喜欢

The sixth season of 2022, the perfect children's model IPA national race leads the yuanuniverse track

《维C中国》乡村助农暖人心第三站嘉宝果农场

世界读书日 | 技术人不要错过的好书(IT前沿技术)

How to "gracefully" measure system performance

ThinkPHP kernel development blind box mall source code v2 0 docking easy payment / Alibaba cloud SMS / qiniu cloud storage

Halo open source project learning (I): project launch

Batch multiple files into one hex

What is a proxy IP pool and how to build it?

2022第六季完美童模 IPA國民賽領跑元宇宙賽道

Virtual serial port function of j-link V9 using skills
随机推荐
Performance introduction of the first new version of cdr2022
什么是代理IP池,如何构建?
About how to import C4d animation into lumion
What categories do you need to know before using proxy IP?
Encrypted compressed backup bat script
Problem solving: dpkg DEB: error: package name has characters that are't lowercase alphanums or '- +‘
Is CICC fortune a state-owned enterprise and is it safe to open an account
How to install mysql-5.7.9 in RPM mode under Linux system
Heap overflow of kernel PWN basic tutorial
MySQL basic record
Redis memory recycling strategy
简洁开源的一款导航网站源码
EBS:PO_ EMPLOYEE_ HIERARCHIES_ ALL
紫光国微财报一枝独秀 2021年净利润三位数增长靠什么
What are the benefits of writing unit tests using the unit test framework?
Use Xdebug breakpoint debugging in postman
[leetcode daily question] 396 Rotation function
CC2541的仿真器CC Debugger使用教程
揭秘被Arm编译器所隐藏的浮点运算
《维C中国》乡村助农暖人心第三站嘉宝果农场