当前位置:网站首页>Startup of openstack service
Startup of openstack service
2022-04-23 02:02:00 【qq_ forty-two million five hundred and thirty-three thousand tw】
Basically everything openstack Services all depend on evenlet Complete various concurrent tasks , Its processes can be divided into two categories :
1、 WSGIService: Receive and process http request , rely on eventlet.wsgi
Of wsgi server
Handle http request , such as nova-api
2、 Service: Receive and process rpc request , Such as nova-operation
etc.
Whether it's WSGIService
still Service
Type of process , Every time a request is received (http or rpc), Will allocate a process in the process pool to process the request
One 、WSGIService Start of
Let's say nova Service as an example .
nova-api from nova/cmd/api.py start-up , It initializes a WSGIService( from service.py Definition ) object .
def main():
objects.register_all()
CONF(sys.argv[1:], project='nova',
version=version.version_string())
logging.setup(CONF, "nova")
rpc.init(CONF)
launcher = service.get_launcher()
server = service.WSGIService('osapi_nova')
launcher.launch_service(server, workers=server.workers)
launcher.wait()
api In the from service Layer to get an initiator object , The final will be server Object passed into the initiator object launch_service In the method ,launch_service(server, workers=server.workers) The method is defined as follows :
class Launcher(object):
def __init__(self):
super(Launcher, self).__init__()
self.launch_service = serve
self.wait = wait
This method is referenced to serve Method ,serve The method is defined as follows :
def serve(server, workers=None):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
_launcher = service.launch(CONF, server, workers=workers)
Finally called oslo_service/service.py Under the launch Method ,launch The method is defined as follows :
def launch(conf, service, workers=1, restart_method='reload'):
…
if workers is not None and workers <= 0:
raise ValueError(_("Number of workers should be positive!"))
if workers is None or workers == 1:
launcher = ServiceLauncher(conf, restart_method=restart_method)
else:
launcher = ProcessLauncher(conf, restart_method=restart_method)
launcher.launch_service(service, workers=workers)
You can see that there are two kinds of starters used here , In the process of further explaining the startup, first introduce openstack The starter in
Two 、Openstack Medium Launcher
Openstack One of them is called Launcher The concept of , Specifically used to start the service , This class is placed in oslo_service In this bag ,Launcher Divided into two :
One is ServiceLauncher;
The other is ProcessLauncher.
ServiceLauncher A service used to start a single process ;
and ProcessLauncher Used to start multiple worker Child process services , Such as all kinds api service (nova-api、cinder-api) etc.
oslo_service/service.py
1、ServiceLauncher
ServiceLauncher Inherited from Launcher, An important member of the startup service is launcher_service,ServiceLauncher This member of is inherited from Launcher
def launch_service(self, service, workers=1):
…
if workers is not None and workers != 1:
raise ValueError(_("Launcher asked to start multiple workers"))
_check_service_base(service)
service.backdoor_port = self.backdoor_port
self.services.add(service)
aucher_service Is to add services to self.services Inside the members ,services The type of members is class Services, Look at it. add Method
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
"""Add a service to a list and create a thread to run it.
:param service: service to run
"""
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
Services The initialization of this class is very simple , Create a ThreadGroup,ThreadGroup It's actually eventlet Of GreenPool,Openstack utilize eventlet Implement concurrency ,add Method , take self.run_service This method is put into pool in , and service It's its parameters .run_service It's easy , It's called service Of start Method , This completes the start of the service
2、ProcessLauncher
ProcessLauncher Directly inherited from Object, There are also launch_service Method
def launch_service(self, service, workers=1):
…
_check_service_base(service)
wrap = ServiceWrapper(service, workers)
LOG.info('Starting %d workers', wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
lauch_service Except for acceptance service outside , You also need to accept one workers Parameters , That is, the number of child processes , And then call _start_child Start multiple subprocesses
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info('Forking too fast, sleeping')
time.sleep(1)
wrap.forktimes.pop(0)
wrap.forktimes.append(time.time())
pid = os.fork()
if pid == 0:
self.launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(
self.launcher)
if not _is_sighup_and_daemon(signo):
self.launcher.wait()
break
self.launcher.restart()
os._exit(status)
LOG.debug('Started child %d', pid)
wrap.children.add(pid)
self.children[pid] = wrap
See the familiar fork No, , Just a simple call os.fork(), Then the child process starts running , Subprocess call _child_process
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher(self.conf, restart_method=self.restart_method)
launcher.launch_service(service)
return launcher
_child_process It's very simple , Create a Launcher, call Laucher.launch_service Method , As mentioned earlier , Actually ServiceLauncher Inherited from Launcher, Also called launcher_service Method , Start the service , Therefore, the next steps can refer to the previous , In the end service.start Method start service
3、 ... and 、WSGIService Start of — To continue
Back to the previous startup section , from launcher Section , We know that the startup of the service finally calls service Of start Method , And here it is service That's where we started api.py Created in the service, Then it is transmitted layer by layer into the starter behind , Let's go back to WSGIService Class start(self) Method
def start(self):
…
if self.manager:
self.manager.init_host()
self.server.start()
self.port = self.server.port
Here we call oslo_service/wsgi.py Medium start(self) Method
def start(self):
…
self.dup_socket = self.socket.dup()
if self._use_ssl:
self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)
wsgi_kwargs = {
'func': eventlet.wsgi.server,
'sock': self.dup_socket,
'site': self.app,
'protocol': self._protocol,
'custom_pool': self._pool,
'log': self._logger,
'log_format': self.conf.wsgi_log_format,
'debug': False,
'keepalive': self.conf.wsgi_keep_alive,
'socket_timeout': self.client_socket_timeout
}
if self._max_url_len:
wsgi_kwargs['url_length_limit'] = self._max_url_len
self._server = eventlet.spawn(**wsgi_kwargs)
Be careful wsgi_kwargs Parameters in func, Its value is eventlet.wsgi.server, stay eventlet/wsgi.py Is defined as follows :
def server(sock, site,
…
try:
serv.log.info("(%s) wsgi starting up on %s" % (
serv.pid, socket_repr(sock)))
while is_accepting:
try:
client_socket = sock.accept()
client_socket[0].settimeout(serv.socket_timeout)
serv.log.debug("(%s) accepted %r" % (
serv.pid, client_socket[1]))
try:
pool.spawn_n(serv.process_request, client_socket)
except AttributeError:
warnings.warn("wsgi's pool should be an instance of "
"eventlet.greenpool.GreenPool, is %s. Please convert your"
" call site to use GreenPool instead" % type(pool),
DeprecationWarning, stacklevel=2)
pool.execute_async(serv.process_request, client_socket)
except ACCEPT_EXCEPTIONS as e:
if support.get_errno(e) not in ACCEPT_ERRNO:
raise
except (KeyboardInterrupt, SystemExit):
serv.log.info("wsgi exiting")
break
finally:
pool.waitall()
…
see , Did you see a familiar scene !sock.accept() Monitor requests , Whenever a new request is received , call pool.spawn_n() Start a process to process the request
Four 、Service Start of
Service Types of processes are also controlled by nova/cmd/* Create some files in the directory :
- nova-schedule: nova/cmd/schedule.py
- ……
As a consumer of message oriented middleware , They monitor their own queue, Whenever there is rpc When the request comes , They create a new co process rpc request . With nova-schedule For example , Initialize a at startup Server( from service.py Definition ) object .
Whole Launcher Process follow WSGIServer equally , It's just service Of start() There are just some differences
def start(self):
…
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)
serializer = objects_base.KarborObjectSerializer()
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
Call through , Finally, such a RPCServer object
class RPCServer(msg_server.MessageHandlingServer):
def __init__(self, transport, target, dispatcher, executor='blocking'):
super(RPCServer, self).__init__(transport, dispatcher, executor)
self._target = target
This class inherits from MessageHandlingServer;
notes :nova All components of depend on oslo.messaging Access the message server , adopt oslo/messaging/server.py Initialize a MessageHandlingServer The object of , Listen message queue .
Finally, the service Of start Method
def start(self, override_pool_size=None):
…
if self._started:
LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
'racy. It is deprecated, and will become a noop '
'in a future release of oslo.messaging. If you '
'need to restart MessageHandlingServer you should '
'instantiate a new object.'))
self._started = True
executor_opts = {}
if self.executor_type in ("threading", "eventlet"):
executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size
)
self._work_executor = self._executor_cls(**executor_opts)
try:
self.listener = self._create_listener()
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
# HACK(sileht): We temporary pass the executor to the rabbit
# listener to fix a race with the deprecated blocking executor.
# We do this hack because this is need only for 'synchronous'
# executor like blocking. And this one is deprecated. Making
# driver working in an sync and an async way is complicated
# and blocking have 0% tests coverage.
if hasattr(self.listener, '_poll_style_listener'):
l = self.listener._poll_style_listener
if hasattr(l, "_message_operations_handler"):
l._message_operations_handler._executor = (
self.executor_type)
self.listener.start(self._on_incoming)
The above object initializes another EventletExecutor( from oslo/messaging/_executors/impl_eventlet.py) Type of excuete object , It calls self.listener.poll() monitor rpc request , Every time a request is received , Create a process to handle the request .
class EventletExecutor(base.ExecutorBase):
......
def start(self):
if self._thread is not None:
return
@excutils.forever_retry_uncaught_exceptions
def _executor_thread():
try:
while True:
incoming = self.listener.poll()
spawn_with(ctxt=self.dispatcher(incoming),
pool=self._greenpool)
except greenlet.GreenletExit:
return
self._thread = eventlet.spawn(_executor_thread)
official account : Skilled people with culture
版权声明
本文为[qq_ forty-two million five hundred and thirty-three thousand tw]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204230202212324.html
边栏推荐
- Easyswool environment configuration
- MySQL basic record
- Is CICC fortune a state-owned enterprise and is it safe to open an account
- EBS:PO_ EMPLOYEE_ HIERARCHIES_ ALL
- Unity editor hierarchy drop-down menu extension
- Is the sinking coffee industry a false prosperity or the eve of a broken situation?
- Shardingsphere sub database and sub table
- 中金财富跟中金公司是一家公司吗,安全吗
- W801 / w800 WiFi socket development (II) - UDP Bluetooth control WiFi connection
- Question bank and online simulation examination for safety management personnel of hazardous chemical business units in 2022
猜你喜欢
批处理多个文件合成一个HEX
Server 2019 the available memory of the server is half of the actual memory
The leader / teacher asks to fill in the EXCEL form document. How to edit the word / Excel file on the mobile phone and fill in the Excel / word electronic document?
89 logistic回归用户画像用户响应度预测
leetcode:27. 移除元素【count remove小操作】
使用代理IP是需要注意什么?
Challenges often faced by client project management
What is a dial-up server and what is its use?
C语言中如何“指名道姓”的进行初始化
拨号服务器是什么,有什么用处?
随机推荐
关于局域网浅谈
如何对代理IP进行分类?
Use Xdebug breakpoint debugging in postman
What are the test steps of dynamic proxy IP?
What businesses use physical servers?
How to initialize "naming and surname" in C language
Question bank and online simulation examination for safety management personnel of hazardous chemical business units in 2022
PID精讲
How does Axure set the content of the text box to the current date when the page is loaded
Makefile文件是什么?
Performance introduction of the first new version of cdr2022
[Dahua cloud native] micro service chapter - service mode of five-star hotels
J-Link RTT使用
我国科学家揭示突破水稻产量瓶颈新机制
ThinkPHP kernel development blind box mall source code v2 0 docking easy payment / Alibaba cloud SMS / qiniu cloud storage
拨号服务器是什么,有什么用处?
Problem solving: dpkg DEB: error: package name has characters that are't lowercase alphanums or '- +‘
【汇编语言】从最底层的角度理解“堆栈”
Halo open source project learning (I): project launch
校园转转二手市场源码