标签:settings python consul args server grpc 注册 import port
注册http
import requests
headers = {
"contentType": "application/json"
}
def register(name, id, address, port):
url = "http://192.168.190.129:8500/v1/agent/service/register"
rsp = requests.put(url, headers=headers, json={
"Name": name,
"ID": id,
"Tags": ["mxshop", "bobby", "imocc", "web"],
"Address": address,
"Port": port,
"Check": {
"HTTP": f"http://{address}:{port}/health",
"Timeout": "5s",
"Interval": "5s",
"DeregisterCriticalServiceAfter": "5s",
}
})
if rsp.status_code == 200:
print("注册成功")
else:
print(f"注册失败:{rsp.status_code}")
def deregister(id):
url = f"http://192.168.190.129:8500/v1/agent/service/deregister/{id}"
rsp = requests.put(url, headers=headers)
if rsp.status_code == 200:
print("注销成功")
else:
print(f"注销失败:{rsp.status_code}")
if __name__ == '__main__':
register("mxshop-web", "mxshop-web", "192.168.230.1", 8021)
# deregister("mxshop-web")
pass
注册grpc
import socket
import sys
from concurrent import futures
import signal
import argparse
import os
import grpc
from loguru import logger
from user_srv.settings.settings import client
BASE_DIR = os.path.dirname(os.path.abspath(os.path.dirname(file)))
sys.path.insert(0, BASE_DIR)
from user_srv.proto import user_pb2_grpc
from user_srv.handler.user import UserServicer
from common.grpc_health.v1 import health,health_pb2_grpc
from common.register import consul
from user_srv.settings import settings
def on_exit(signo, frame):
logger.info("进程中端")
sys.exit(0)
def get_free_tcp_port():
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.bind(("", 0))
_, port = tcp.getsockname()
tcp.close()
return port
def serve():
parse = argparse.ArgumentParser()
parse.add_argument("--ip",
nargs="?",
type=str,
default="192.168.230.1",
help="binding ip"
)
parse.add_argument("--port",
nargs="?",
type=int,
default=0,
help="the listening port"
)
args = parse.parse_args()
if args.port == 0:
args.port = get_free_tcp_port()
# logger.add("logs/user_srv_{time}.log")
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# 注册用户服务
user_pb2_grpc.add_UserServicer_to_server(UserServicer(), server)
# 注册健康检查
health_pb2_grpc.add_HealthServicer_to_server(health.HealthServicer(), server)
server.add_insecure_port(f"{args.ip}:{(args.port)}")
server.start()
logger.info(f"启动服务: {args.ip}:{args.port}]")
"""
SIGINT ctrl+C
SIGTERM kill 发出的终止
"""
signal.signal(signal.SIGINT, on_exit)
signal.signal(signal.SIGTERM, on_exit)
logger.info(f"服务注册开始")
register = consul.ConsulRegister(settings.CONSUL_HOST, settings.CONSUL_PORT)
if not register.register(name=settings.SERVICE_NAME, id=settings.SERVICE_ID, address=args.ip, port=args.port, tags=settings.SERVICE_TAGS, check=None):
logger.info(f"服务注册失败")
sys.exit(0)
logger.info(f"服务注册成功")
server.wait_for_termination()
def test_cb(args):
print("配置文件产生变化")
print(args)
if name == 'main':
# print(get_free_tcp_port())
client.add_config_watcher(settings.NACOS["DataId"], settings.NACOS["Group"], test_cb)
serve()
标签:settings,python,consul,args,server,grpc,注册,import,port 来源: https://www.cnblogs.com/myboran/p/15414304.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。