1.Nacos API
本代码包括列出服务列表、列出指定服务下实例列表、服务上下线、实例注册、服务信息修改等方法。更多关于 Nacos API 的详细信息,请参考 https://nacos.io/zh-cn/docs/open-api.html 。
import requests


class nacosAPI(object):
    """Small client for the Nacos v1 HTTP Open API.

    Covers listing services, listing the instances of a service, taking an
    instance online/offline, registering an instance and updating an instance.
    Each call prints the raw response body.

    NOTE(review): this section arrived with its keywords and identifiers
    machine-translated. The class name and every method name except
    ``servicesList`` are reconstructed -- confirm them against the original.
    """

    def __init__(self, url, username, password):
        """Log in to Nacos and keep the access token for later calls.

        Args:
            url: Base address the ``/nacos/v1/...`` paths are appended to.
            username: Nacos login name.
            password: Nacos login password.

        Raises:
            KeyError: If the login response carries no ``accessToken``.
        """
        self.auth, self.url = {'username': username, 'password': password}, url
        self.response = requests.post(
            '%s/nacos/v1/auth/users/login' % self.url,
            data=self.auth,
            verify=False,
        ).json()
        # Sent as a query parameter on every authenticated request.
        self.params = (('accessToken', self.response['accessToken']),)

    def servicesList(self):
        """Print the first page (100 entries) of the service catalog."""
        data = requests.get(
            '%s/nacos/v1/ns/catalog/services?pageNo=1&pageSize=100' % self.url,
            params=self.params,
        ).text
        print(data)

    def instanceList(self, service):
        """Print the instances registered under ``service``."""
        data = requests.get(
            '%s/nacos/v1/ns/instance/list?serviceName=%s' % (self.url, service),
            params=self.params,
        ).text
        print(data)

    def changeStatus(self, serviceName, ip, port, enabled):
        """Bring an instance online or take it offline.

        Args:
            serviceName: Name of the service the instance belongs to.
            ip: Instance IP address.
            port: Instance port.
            enabled: True to bring the instance online, False to take it
                offline.
        """
        self.data = {
            'serviceName': serviceName,
            'ip': ip,
            'port': port,
            'enabled': enabled,
        }
        data = requests.put(
            '%s/nacos/v1/ns/instance' % self.url,
            params=self.params,
            data=self.data,
            verify=False,
        ).text
        print(data)

    def register(self, serviceName, ip, port):
        """Register a new instance of ``serviceName`` at ``ip``:``port``."""
        url = '%s/nacos/v1/ns/instance' % self.url
        # The token is sent here too, matching the other authenticated calls.
        data = requests.post(
            url,
            params=self.params,
            data={'ip': ip, 'port': port, 'serviceName': serviceName},
        )
        print(data.text)

    def instanceChange(self):
        """Update one instance using the hard-coded sample payload below."""
        data = {
            'serviceName': 'merchant-order',
            'ip': '10.42.4.129',
            'port': 8080,
            'clusterName': 'DEFAULT',
            'weight': 1,
            'enabled': False,
        }
        response = requests.put(
            url='%s/nacos/v1/ns/instance' % self.url,
            params=self.params,
            data=data,
        )
        print(response.text)


if __name__ == '__main__':
    # NOTE(review): the body of the original demo was lost in the source;
    # nothing is reconstructed here.
    pass

# 2. Apollo portal (front-end UI) API
因开放平台未支持授权,故抓取了前台界面的 API(接口)实现批量授权功能;开放平台的管理需要提前授权才能管理。
import json

import requests


class apollo_web():
    """Client for the Apollo portal's front-end (browser) endpoints.

    Signs in with a portal account, keeps the resulting ``SESSION`` cookie and
    uses it to grant an open-platform consumer token access to applications.
    """

    def __init__(self, potalurl, username, password):
        """Sign in to the portal and store the session cookie.

        Args:
            potalurl: Base URL of the Apollo portal.
            username: Portal login name.
            password: Portal login password.

        Raises:
            KeyError: If the sign-in response sets no ``SESSION`` cookie.
        """
        self.potalurl, self.username, self.password = potalurl, username, password
        data = {'username': username, 'password': password}
        # Redirects are not followed so the cookies of the sign-in response
        # itself are the ones inspected below.
        response = requests.post(
            f'{self.potalurl}/signin', data=data, allow_redirects=False
        )
        cookies = dict(response.cookies)
        if 'SESSION' not in cookies:
            raise KeyError(
                'SESSION cookie missing from the sign-in response '
                f'(HTTP {response.status_code}); check the credentials'
            )
        self.SESSION = cookies['SESSION']

    def Authorize_App(self, Token, app: str):
        """Grant the consumer ``Token`` the AppRole on ``app`` for DEV and SIT.

        Args:
            Token: Open-platform consumer token to authorize.
            app: Apollo appId the token should be allowed to manage.

        Prints the raw response body.
        """
        cookies = {'NG_TRANSLATE_LANG_KEY': 'zh-CN', 'SESSION': self.SESSION}
        params = (('envs', ['DEV', 'SIT']), ('type', 'AppRole'))
        headers = {'Content-Type': 'application/json'}
        # json.dumps escapes quotes and backslashes in the appId, which plain
        # string formatting would not.
        data = json.dumps({'appId': app}, separators=(',', ':'))
        response = requests.post(
            f'{self.potalurl}/consumers/{Token}/assign-role',
            params=params,
            headers=headers,
            cookies=cookies,
            data=data,
            verify=False,
        )
        print(response.text)


if __name__ == "__main__":
    # Placeholder values: replace with the real portal address and account.
    apollo = apollo_web("http://apollo-portal.example.com", "username", "password")
    apollo.Authorize_App("Token", "sms")

# 3. Apollo Open Platform API
# This code covers fetching, modifying, publishing and batch-publishing
# configuration; in my day-to-day work batch publishing is used very often.
import json

import requests
from loguru import logger


class ApolloAPI(object):
    """Client for the Apollo Open Platform API.

    NOTE(review): only the start of ``__init__`` is present in the source; the
    rest of the class is truncated and is not reconstructed here.
    """

    def __init__(self, apolloenv):
        """Set up the per-environment connection table.

        Args:
            apolloenv: Environment name; the table below has the keys
                ``"aliyun"`` and ``"local"``. It is not used by the visible
                code.
        """
        # SECURITY: the access tokens are hard-coded; consider loading them
        # from the environment or a secret store instead.
        apollo_info = {
            "aliyun": {
                "url": "http://apollo.ali.cn",
                "token": "a2814e7476b64fbdb08d9e9ec0b2d2d76e621cac",
            },
            "local": {
                "url": "http://apollo.tenx.cn",
                "token": "a2814e7476b64fbdb08d9e9ec0b2d2d76e621cac",
            },
        }
        # NOTE(review): this stores the whole table. The statement looks cut
        # off -- presumably it selected the entry for ``apolloenv``; confirm
        # against the original.
        self.apollourl = apollo_info
本代码支持文件和文本认证信息,方便根据实际情况开发
import yaml
from kubernetes import client, config
from kubernetes.client.rest import ApiException


class KubernetesAPI:
    """Convenience wrapper around the official Kubernetes Python client.

    Query methods return a plain dict: ``{"code": 200, "data": ...}`` on
    success, or ``{"code": 500, "msg": "<error text>"}`` when the API call
    raises ``ApiException``.
    """

    def __init__(self, kube_conf=None, kube_conf_text=None):
        """Load cluster credentials from a kubeconfig file or from YAML text.

        Args:
            kube_conf: Path to a kubeconfig file. When both arguments are
                omitted, ``None`` is passed to ``load_kube_config``.
            kube_conf_text: Content of a kubeconfig file as a YAML string.

        Raises:
            ValueError: If both ``kube_conf`` and ``kube_conf_text`` are given.
        """
        if kube_conf is not None and kube_conf_text is not None:
            raise ValueError("pass either kube_conf or kube_conf_text, not both")
        if kube_conf_text is not None:
            # safe_load: the text may come from outside, so it must not be
            # able to construct arbitrary Python objects.
            config.load_kube_config_from_dict(yaml.safe_load(kube_conf_text))
        else:
            config.load_kube_config(kube_conf)

    def list_node(self):
        """List the cluster's nodes."""
        v1 = client.CoreV1Api()
        try:
            ret = v1.list_node(watch=False)
        except ApiException as e:
            return {"code": 500, "msg": str(e)}
        return {
            "code": 200,
            "data": [
                {
                    "uid": x.metadata.uid,
                    "name": x.metadata.name,
                    "creattime": x.metadata.creation_timestamp,
                    "pod_cid_rs": x.spec.pod_cid_rs,
                    "pod_cidr": x.spec.pod_cidr,
                    "unschedulable": x.spec.unschedulable,
                }
                for x in ret.items
            ],
        }

    def list_namespace(self):
        """List the cluster's namespaces."""
        v1 = client.CoreV1Api()
        try:
            ret = v1.list_namespace(watch=False)
        except ApiException as e:
            return {"code": 500, "msg": str(e)}
        return {
            "code": 200,
            "data": [
                {
                    "uid": x.metadata.uid,
                    "name": x.metadata.name,
                    "creattime": x.metadata.creation_timestamp,
                }
                for x in ret.items
            ],
        }

    def list_deploymet(self, namespaces):
        """List the deployments in the namespace ``namespaces``."""
        v1 = client.AppsV1Api()
        try:
            ret = v1.list_namespaced_deployment(namespaces, watch=False)
        except ApiException as e:
            return {"code": 500, "msg": str(e)}
        return {
            "code": 200,
            "data": [
                {
                    "uid": x.metadata.uid,
                    "name": x.metadata.name,
                    "namespace": x.metadata.namespace,
                    "replicas": x.spec.replicas,
                    # "status" carries the number of available replicas.
                    "status": x.status.available_replicas,
                    "creattime": x.metadata.creation_timestamp,
                }
                for x in ret.items
            ],
        }

    def list_pods(self, namespace):
        """List the pods in ``namespace``."""
        v1 = client.CoreV1Api()
        try:
            ret = v1.list_namespaced_pod(namespace)
        except ApiException as e:
            return {"code": 500, "msg": str(e)}
        return {
            "code": 200,
            "data": [
                {
                    "uid": x.metadata.uid,
                    "name": x.metadata.name,
                    "pod_ip": x.status.pod_ip,
                    "host_ip": x.status.host_ip,
                    "creattime": x.metadata.creation_timestamp,
                }
                for x in ret.items
            ],
        }

    def list_pods_deployment(self, namespace, deployment):
        """List the pods of ``deployment`` in ``namespace``.

        Pods are selected by the label ``app.kubernetes.io/name=<deployment>``.
        """
        v1 = client.CoreV1Api()
        try:
            ret = v1.list_namespaced_pod(
                namespace,
                label_selector="app.kubernetes.io/name=%s" % deployment,
            )
        except ApiException as e:
            return {"code": 500, "msg": str(e)}
        return {
            "code": 200,
            "data": [
                {
                    "uid": x.metadata.uid,
                    "name": x.metadata.name,
                    "pod_ip": x.status.pod_ip,
                    "host_ip": x.status.host_ip,
                    "creattime": x.metadata.creation_timestamp,
                }
                for x in ret.items
            ],
        }

    def query_pod_stdoutlogs(self, namespaces, pod):
        """Read the stdout log of ``pod`` in the namespace ``namespaces``."""
        v1 = client.CoreV1Api()
        try:
            return {"code": 200, "data": v1.read_namespaced_pod_log(pod, namespaces)}
        except ApiException as e:
            return {"code": 500, "msg": str(e)}

    def query_deployment_stdoutlogs(self, namespace, deployment):
        """Collect the stdout logs of every pod of ``deployment``.

        Returns:
            On success, a list of ``{"name": <pod name>, "logs": <result dict
            of query_pod_stdoutlogs>}``. If the pod lookup fails, a
            ``{"code": 500, "msg": ...}`` dict instead.
        """
        querypods = self.list_pods_deployment(namespace, deployment)
        if querypods["code"] != 200:
            return {"code": 500, "msg": "查询pod出错:%s" % querypods["msg"]}
        return [
            {
                "name": x["name"],
                "logs": self.query_pod_stdoutlogs(namespace, x["name"]),
            }
            for x in querypods["data"]
        ]

    def remove_pod(self, namespace, podname):
        """Delete the pod ``podname`` in ``namespace``."""
        v1 = client.CoreV1Api()
        try:
            v1.delete_namespaced_pod(podname, namespace)
            return {"code": 200, "msg": "删除成功"}
        except ApiException as e:
            return {"code": 500, "msg": str(e)}

    def deployment_setvar(self, namespace, deployment, key, value):
        """Set the environment variable ``key``=``value`` on a deployment.

        The patch addresses the container whose name equals ``deployment``.
        """
        v1 = client.AppsV1Api()
        body = {
            "spec": {
                "template": {
                    "spec": {
                        "containers": [
                            {
                                "env": [{"name": key, "value": value}],
                                "name": deployment,
                            }
                        ]
                    }
                }
            }
        }
        try:
            v1.patch_namespaced_deployment(
                name=deployment, namespace=namespace, body=body
            )
            return {"code": 200, "msg": "设置成功"}
        except ApiException as e:
            return {"code": 500, "msg": str(e)}


if __name__ == "__main__":
    # Raw string: the Windows path contains backslashes.
    ss = KubernetesAPI(r'D:\Software\config')
    sss = ss.query_deployment_stdoutlogs('dev', 'qtz')
    print(sss)