Startup of OpenStack services
2022-04-23 02:02:00 【qq_ forty-two million five hundred and thirty-three thousand tw】
Almost all OpenStack services rely on eventlet to run their concurrent tasks. Their processes fall into two categories:
1. WSGIService: receives and handles HTTP requests, relying on the WSGI server in eventlet.wsgi. nova-api is an example.
2. Service: receives and handles RPC requests, for example nova-operation.
For both WSGIService and Service processes, every incoming request (HTTP or RPC) is handed to a coroutine (an eventlet green thread) taken from a pool.
I. Starting a WSGIService
We use the nova service as the example.
nova-api is started from nova/cmd/api.py, which initializes a WSGIService object (defined in service.py).
def main():
    objects.register_all()
    CONF(sys.argv[1:], project='nova',
         version=version.version_string())
    logging.setup(CONF, "nova")
    rpc.init(CONF)
    launcher = service.get_launcher()
    server = service.WSGIService('osapi_nova')
    launcher.launch_service(server, workers=server.workers)
    launcher.wait()
api.py obtains a launcher object from the service layer and then passes the server object to the launcher's launch_service method. launch_service(server, workers=server.workers) is defined as follows:
class Launcher(object):
    def __init__(self):
        super(Launcher, self).__init__()
        self.launch_service = serve
        self.wait = wait
launch_service is simply bound to the serve function, which is defined as follows:
def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))
    _launcher = service.launch(CONF, server, workers=workers)
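The one-shot guard in serve, and the way Launcher exposes it as an attribute, can be tried out in isolation. The following is a minimal sketch rather than nova's code: `fake_launch` is a hypothetical stand-in for `oslo_service.service.launch`, and translation of the error message is left out.

```python
_launcher = None  # module-level singleton, as in the excerpt above


def fake_launch(server, workers=None):
    """Hypothetical stand-in for oslo_service.service.launch()."""
    return {"server": server, "workers": workers}


def serve(server, workers=None):
    """Start the server once; a second call raises RuntimeError."""
    global _launcher
    if _launcher:
        raise RuntimeError("serve() can only be called once")
    _launcher = fake_launch(server, workers=workers)


class Launcher(object):
    """Facade whose launch_service attribute is just the serve function."""

    def __init__(self):
        self.launch_service = serve
```

Because the guard lives in a module-level variable, creating a second Launcher object does not allow a second launch within the same process.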
This ends up calling the launch function in oslo_service/service.py, which is defined as follows:
def launch(conf, service, workers=1, restart_method='reload'):
    …
    if workers is not None and workers <= 0:
        raise ValueError(_("Number of workers should be positive!"))
    if workers is None or workers == 1:
        launcher = ServiceLauncher(conf, restart_method=restart_method)
    else:
        launcher = ProcessLauncher(conf, restart_method=restart_method)
    launcher.launch_service(service, workers=workers)
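The branch on workers reduces to a small decision rule. As a rough sketch (the name `pick_launcher` is made up for illustration, and it returns class names as strings rather than real launcher objects):

```python
def pick_launcher(workers=1):
    """Return the name of the launcher class that launch() would choose."""
    if workers is not None and workers <= 0:
        raise ValueError("Number of workers should be positive!")
    if workers is None or workers == 1:
        return "ServiceLauncher"   # everything runs in a single process
    return "ProcessLauncher"       # the parent forks `workers` children
```

So `workers=None` and `workers=1` both stay in one process, and only a value of 2 or more leads to forked worker processes.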
Two kinds of launcher are used here. Before going further into the startup process, let's first look at the launchers in OpenStack.
II. Launchers in OpenStack
OpenStack has a concept called a Launcher, which exists specifically to start services. The classes live in the oslo_service package (oslo_service/service.py), and there are two kinds:
ServiceLauncher, which starts a service in a single process;
ProcessLauncher, which starts a service with multiple worker child processes, as used by the various API services (nova-api, cinder-api, and so on).
1. ServiceLauncher
ServiceLauncher inherits from Launcher. The key method for starting a service is launch_service, which ServiceLauncher inherits from Launcher:
def launch_service(self, service, workers=1):
    …
    if workers is not None and workers != 1:
        raise ValueError(_("Launcher asked to start multiple workers"))
    _check_service_base(service)
    service.backdoor_port = self.backdoor_port
    self.services.add(service)
launch_service adds the service to the self.services member, which is an instance of the Services class. Here is its add method:
class Services(object):
    def __init__(self):
        self.services = []
        self.tg = threadgroup.ThreadGroup()
        self.done = event.Event()
    def add(self, service):
        """Add a service to a list and create a thread to run it.
        :param service: service to run
        """
        self.services.append(service)
        self.tg.add_thread(self.run_service, service, self.done)
Initializing Services is simple: it creates a ThreadGroup, which is essentially eventlet's GreenPool (OpenStack uses eventlet for concurrency). The add method puts self.run_service into the pool, with the service as its argument. run_service itself is simple: it calls the service's start method, which completes the startup of the service.
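To make the pattern concrete, here is a minimal stand-in that uses standard-library threads in place of eventlet green threads. `EchoService` and `MiniServices` are invented names, and the sketch leaves out the `done` event that the real add method passes along.

```python
import threading


class EchoService(object):
    """Hypothetical service: start() just records that it ran."""

    def __init__(self):
        self.started = threading.Event()

    def start(self):
        self.started.set()


class MiniServices(object):
    """Keeps a list of services and runs each one's start() in a thread."""

    def __init__(self):
        self.services = []
        self.threads = []

    def add(self, service):
        self.services.append(service)
        thread = threading.Thread(target=self.run_service, args=(service,))
        thread.start()
        self.threads.append(thread)

    @staticmethod
    def run_service(service):
        service.start()

    def wait(self):
        for thread in self.threads:
            thread.join()
```

The point of the design is that add returns immediately: the service's start method runs inside the pool, so the launcher can register several services and then block in a single wait call.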
2. ProcessLauncher
ProcessLauncher inherits directly from object and also has a launch_service method:
def launch_service(self, service, workers=1):
    …
    _check_service_base(service)
    wrap = ServiceWrapper(service, workers)
    LOG.info('Starting %d workers', wrap.workers)
    while self.running and len(wrap.children) < wrap.workers:
        self._start_child(wrap)
Besides the service, launch_service also accepts a workers parameter, the number of child processes, and then calls _start_child repeatedly to start them:
def _start_child(self, wrap):
    if len(wrap.forktimes) > wrap.workers:
        # Limit ourselves to one process a second (over the period of
        # number of workers * 1 second). This will allow workers to
        # start up quickly but ensure we don't fork off children that
        # die instantly too quickly.
        if time.time() - wrap.forktimes[0] < wrap.workers:
            LOG.info('Forking too fast, sleeping')
            time.sleep(1)
        wrap.forktimes.pop(0)
    wrap.forktimes.append(time.time())
    pid = os.fork()
    if pid == 0:
        self.launcher = self._child_process(wrap.service)
        while True:
            self._child_process_handle_signal()
            status, signo = self._child_wait_for_exit_or_signal(
                self.launcher)
            if not _is_sighup_and_daemon(signo):
                self.launcher.wait()
                break
            self.launcher.restart()
        os._exit(status)
    LOG.debug('Started child %d', pid)
    wrap.children.add(pid)
    self.children[pid] = wrap
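The throttling check at the top of _start_child can be pulled out as a pure function to see when it triggers. This is only an illustrative sketch: `fork_delay` is a hypothetical helper, and the optional `now` argument exists so the behaviour can be checked without waiting on a real clock.

```python
import time


def fork_delay(forktimes, workers, now=None):
    """Return how many seconds to sleep before the next fork (0 or 1).

    Mirrors the check in _start_child: once more than `workers` forks
    are on record, sleep if the oldest one is under `workers` seconds old.
    """
    now = time.time() if now is None else now
    if len(forktimes) > workers and now - forktimes[0] < workers:
        return 1
    return 0
```

With `workers=2`, three forks recorded within the last half second give a delay of one second, while the same three forks seen five seconds later give no delay.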
Here is the familiar fork: a plain call to os.fork(). The child process then starts running and calls _child_process:
def _child_process(self, service):
    self._child_process_handle_signal()
    # Reopen the eventlet hub to make sure we don't share an epoll
    # fd with parent and/or siblings, which would be bad
    eventlet.hubs.use_hub()
    # Close write to ensure only parent has it open
    os.close(self.writepipe)
    # Create greenthread to watch for parent to close pipe
    eventlet.spawn_n(self._pipe_watcher)
    # Reseed random number generator
    random.seed()
    launcher = Launcher(self.conf, restart_method=self.restart_method)
    launcher.launch_service(service)
    return launcher
_child_process is straightforward: it creates a Launcher and calls Launcher.launch_service. As noted earlier, ServiceLauncher inherits from Launcher and uses the same launch_service method, so the remaining steps are the same as described above, and the service is ultimately started by its service.start method.
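The comments about the pipe in _child_process describe a common trick: the parent keeps the write end open, each child holds only the read end, and a read that returns end-of-file means no writer is left, so the parent has gone away. The standalone sketch below shows only that end-of-file behaviour inside a single process, without forking; `parent_is_gone` is an illustrative helper, not an oslo.service function.

```python
import os


def parent_is_gone(read_fd):
    """Block until the pipe yields data or EOF; EOF means no writer is left."""
    return os.read(read_fd, 1) == b""


# Simulate the parent exiting by closing the only write end.
read_fd, write_fd = os.pipe()
os.close(write_fd)
gone = parent_is_gone(read_fd)
os.close(read_fd)
```

This also suggests why the child closes its inherited copy of the write end first: as long as any write end stays open, the read would block and never report end-of-file.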
III. Starting a WSGIService, continued
Back to the startup flow. From the launcher section we know that starting a service ultimately calls the service's start method. That service is the one created in api.py at the beginning and then passed down layer by layer into the launcher. So let's return to the start(self) method of the WSGIService class:
def start(self):
    …
    if self.manager:
        self.manager.init_host()
    self.server.start()
    self.port = self.server.port
This calls the start(self) method in oslo_service/wsgi.py:
def start(self):
    …
    self.dup_socket = self.socket.dup()
    if self._use_ssl:
        self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)
    wsgi_kwargs = {
        'func': eventlet.wsgi.server,
        'sock': self.dup_socket,
        'site': self.app,
        'protocol': self._protocol,
        'custom_pool': self._pool,
        'log': self._logger,
        'log_format': self.conf.wsgi_log_format,
        'debug': False,
        'keepalive': self.conf.wsgi_keep_alive,
        'socket_timeout': self.client_socket_timeout
    }
    if self._max_url_len:
        wsgi_kwargs['url_length_limit'] = self._max_url_len
    self._server = eventlet.spawn(**wsgi_kwargs)
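Passing the server function through a 'func' key works because eventlet.spawn takes the callable as its first parameter, named func, and forwards the remaining arguments to it. The sketch below imitates that calling convention with plain functions; `fake_spawn` and `fake_server` are stand-ins that run synchronously instead of in a green thread, and the string values are placeholders for the real socket and WSGI app.

```python
def fake_spawn(func, *args, **kwargs):
    """Stand-in for eventlet.spawn: call func with the remaining arguments."""
    return func(*args, **kwargs)


def fake_server(sock, site, debug=True, keepalive=True):
    """Stand-in for eventlet.wsgi.server: report what it was given."""
    return {"sock": sock, "site": site, "debug": debug, "keepalive": keepalive}


wsgi_kwargs = {
    "func": fake_server,          # consumed by fake_spawn as the callable
    "sock": "listening-socket",   # placeholder for the duplicated socket
    "site": "wsgi-app",           # placeholder for the WSGI application
    "debug": False,
}
result = fake_spawn(**wsgi_kwargs)
```

Every key other than func reaches the server function as a keyword argument, which is how settings such as the socket, the app and the debug flag arrive there.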
Note the func entry in wsgi_kwargs. Its value is eventlet.wsgi.server, which is defined in eventlet/wsgi.py as follows:
def server(sock, site,
           …
    try:
        serv.log.info("(%s) wsgi starting up on %s" % (
            serv.pid, socket_repr(sock)))
        while is_accepting:
            try:
                client_socket = sock.accept()
                client_socket[0].settimeout(serv.socket_timeout)
                serv.log.debug("(%s) accepted %r" % (
                    serv.pid, client_socket[1]))
                try:
                    pool.spawn_n(serv.process_request, client_socket)
                except AttributeError:
                    warnings.warn("wsgi's pool should be an instance of "
                                  "eventlet.greenpool.GreenPool, is %s. Please convert your"
                                  " call site to use GreenPool instead" % type(pool),
                                  DeprecationWarning, stacklevel=2)
                    pool.execute_async(serv.process_request, client_socket)
            except ACCEPT_EXCEPTIONS as e:
                if support.get_errno(e) not in ACCEPT_ERRNO:
                    raise
            except (KeyboardInterrupt, SystemExit):
                serv.log.info("wsgi exiting")
                break
    finally:
        pool.waitall()
        …
This should look familiar: sock.accept() waits for incoming connections, and each time a new request arrives, pool.spawn_n() starts a coroutine (green thread) to handle it.
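The accept-then-dispatch loop can be reproduced with the standard library alone. The sketch below is an analogy, not eventlet code: a ThreadPoolExecutor stands in for the GreenPool, `pool.submit` plays the role of `pool.spawn_n`, and `handle`, `serve_n` and `demo` are names invented for the example.

```python
import socket
import threading
from concurrent.futures import ThreadPoolExecutor


def handle(client_socket):
    """Handle one connection: read a little, answer, close."""
    conn, _addr = client_socket
    with conn:
        conn.recv(1024)
        conn.sendall(b"ok")


def serve_n(sock, pool, count):
    """Accept `count` connections and hand each one to the pool."""
    for _ in range(count):
        client_socket = sock.accept()       # blocks until a client connects
        pool.submit(handle, client_socket)  # analogous to pool.spawn_n(...)


def demo():
    """Run the loop for a single request on a loopback socket."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(5)
    port = listener.getsockname()[1]
    with ThreadPoolExecutor(max_workers=4) as pool:
        acceptor = threading.Thread(target=serve_n, args=(listener, pool, 1))
        acceptor.start()
        with socket.create_connection(("127.0.0.1", port), timeout=5) as client:
            client.sendall(b"ping")
            reply = client.recv(1024)
        acceptor.join()
    listener.close()
    return reply
```

The accepting loop never processes a request itself; it only hands the accepted socket to the pool and goes back to accept, which is what lets one process serve many connections concurrently.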
IV. Starting a Service
Service-type processes are also created by files under the nova/cmd/* directory:
- nova-scheduler: nova/cmd/scheduler.py
- ……
As consumers of the message middleware, they listen on their own queues, and whenever an RPC request arrives they create a new coroutine to handle it. Taking nova-scheduler as the example, it initializes a Service object (defined in service.py) at startup.
The whole launcher flow is the same as for WSGIService; only the service's start() differs:
def start(self):
    …
    target = messaging.Target(topic=self.topic, server=self.host)
    endpoints = [self.manager]
    endpoints.extend(self.manager.additional_endpoints)
    serializer = objects_base.KarborObjectSerializer()
    self.rpcserver = rpc.get_server(target, endpoints, serializer)
    self.rpcserver.start()
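The endpoints list is what incoming RPC calls are dispatched against: the server looks for an endpoint object that exposes a method with the requested name. A rough, library-free sketch of that lookup follows. `dispatch`, `SchedulerManager` and `ExtraEndpoint` are hypothetical names, and real oslo.messaging dispatch also deals with the request context, namespaces and versions, which are ignored here.

```python
class SchedulerManager(object):
    """Hypothetical manager exposing one RPC-callable method."""

    def select_destinations(self, spec):
        return "host-for-%s" % spec


class ExtraEndpoint(object):
    """Hypothetical additional endpoint."""

    def ping(self):
        return "pong"


def dispatch(endpoints, method, **kwargs):
    """Call `method` on the first endpoint that defines it."""
    for endpoint in endpoints:
        func = getattr(endpoint, method, None)
        if callable(func):
            return func(**kwargs)
    raise AttributeError("no endpoint implements %r" % method)


endpoints = [SchedulerManager()]
endpoints.extend([ExtraEndpoint()])  # plays the role of additional_endpoints
```

For instance, `dispatch(endpoints, "select_destinations", spec="vm1")` is answered by the manager, while `dispatch(endpoints, "ping")` falls through to the additional endpoint.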
Following the calls through, this ends up creating an RPCServer object:
class RPCServer(msg_server.MessageHandlingServer):
    def __init__(self, transport, target, dispatcher, executor='blocking'):
        super(RPCServer, self).__init__(transport, dispatcher, executor)
        self._target = target
This class inherits from MessageHandlingServer.
Note: all nova components rely on oslo.messaging to access the message server. oslo/messaging/server.py initializes a MessageHandlingServer object that listens on the message queue.
Finally, the server's start method is called:
def start(self, override_pool_size=None):
    …
    if self._started:
        LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
                        'racy. It is deprecated, and will become a noop '
                        'in a future release of oslo.messaging. If you '
                        'need to restart MessageHandlingServer you should '
                        'instantiate a new object.'))
    self._started = True
    executor_opts = {}
    if self.executor_type in ("threading", "eventlet"):
        executor_opts["max_workers"] = (
            override_pool_size or self.conf.executor_thread_pool_size
        )
    self._work_executor = self._executor_cls(**executor_opts)
    try:
        self.listener = self._create_listener()
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)
    # HACK(sileht): We temporary pass the executor to the rabbit
    # listener to fix a race with the deprecated blocking executor.
    # We do this hack because this is need only for 'synchronous'
    # executor like blocking. And this one is deprecated. Making
    # driver working in an sync and an async way is complicated
    # and blocking have 0% tests coverage.
    if hasattr(self.listener, '_poll_style_listener'):
        l = self.listener._poll_style_listener
        if hasattr(l, "_message_operations_handler"):
            l._message_operations_handler._executor = (
                self.executor_type)
    self.listener.start(self._on_incoming)
The object above initializes an executor object of type EventletExecutor (from oslo/messaging/_executors/impl_eventlet.py). It calls self.listener.poll() to listen for RPC requests, and each time a request arrives it creates a coroutine to handle it.
class EventletExecutor(base.ExecutorBase):
    ......
    def start(self):
        if self._thread is not None:
            return
        @excutils.forever_retry_uncaught_exceptions
        def _executor_thread():
            try:
                while True:
                    incoming = self.listener.poll()
                    spawn_with(ctxt=self.dispatcher(incoming),
                               pool=self._greenpool)
            except greenlet.GreenletExit:
                return
        self._thread = eventlet.spawn(_executor_thread)
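The poll-and-spawn loop has the same shape as the WSGI accept loop, with a message listener in place of a socket. Below is a minimal sketch using only the standard library, under stated assumptions: `QueueListener` and `run_executor` are invented for illustration, a thread pool stands in for the green pool, and a `None` message is used as a stop signal, which the real executor does not do.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


class QueueListener(object):
    """Hypothetical listener: poll() blocks until a message is available."""

    def __init__(self):
        self._messages = queue.Queue()

    def put(self, message):
        self._messages.put(message)

    def poll(self):
        return self._messages.get()


def run_executor(listener, handler, pool):
    """Poll in a loop; a None message is this sketch's stop signal."""
    futures = []
    while True:
        incoming = listener.poll()
        if incoming is None:
            return futures
        # Hand the message to the pool and go straight back to polling.
        futures.append(pool.submit(handler, incoming))
```

A short usage example, with `str.upper` standing in for the RPC handler:

```python
listener = QueueListener()
for message in ("a", "b", None):
    listener.put(message)
with ThreadPoolExecutor(max_workers=2) as pool:
    results = [f.result() for f in run_executor(listener, str.upper, pool)]
```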
Copyright notice
This article was written by [qq_ forty-two million five hundred and thirty-three thousand tw]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204230202212324.html