标签:请求 get app send django 接口 import id channel
背景
消息服务通过 WebSocket 发送消息,其他同事在测试过程中手动发送消息不太方便,因此编写了一个带页面的消息发送脚本。
项目结构
- urls.py
- 路由配置
"""ims_send_msg URL Configuration

The `urlpatterns` list routes URLs to views. For more information please see:
    https://docs.djangoproject.com/en/3.0/topics/http/urls/
Examples:
Function views
    1. Add an import:  from my_app import views
    2. Add a URL to urlpatterns:  path('', views.home, name='home')
Class-based views
    1. Add an import:  from other_app.views import Home
    2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')
Including another URLconf
    1. Import the include() function: from django.urls import include, path
    2. Add a URL to urlpatterns:  path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path
from send_msg import views

# Route table: the Django admin site, plus the message-sending page served
# by `views.index` under /index/.
urlpatterns = [
    path('admin/', admin.site.urls),
    path('index/', views.index),
]
- tools.py
- 一些工具方法
"""Helpers for the message-sending test page.

Provides generators for default ids/timestamps, `send_msg` (HTTP node lookup
followed by a socket.io-style WebSocket send) and `query_msg` (HTTP query).
"""
import json
import re
import time
from urllib.parse import urlencode

import requests
import urllib3
from websocket import create_connection

# Silence urllib3 warnings (requests below are made with verify=False).
urllib3.disable_warnings()
requests.packages.urllib3.disable_warnings()


def _scaled_timestamp(scale):
    """Return the current Unix time times *scale*, rounded, as a decimal string."""
    return str(int(round(time.time() * scale)))


def get_send_id():
    """Return a default sender id: 'send_id_' plus a high-resolution timestamp."""
    return 'send_id_' + _scaled_timestamp(10000000)


def get_target_id():
    """Return a default receiver id: 'target_id_' plus a high-resolution timestamp."""
    return 'target_id_' + _scaled_timestamp(10000000)


def get_message():
    """Return a default message text: 'message_' plus a high-resolution timestamp."""
    return 'message_' + _scaled_timestamp(10000000)


def msg_id():
    """Return a message id made of a high-resolution timestamp string."""
    return _scaled_timestamp(10000000)


def get_timestamp_ten():
    """Return the current Unix time in seconds as a string (10-digit timestamp)."""
    return _scaled_timestamp(1)


def get_timestamp_thirteen():
    """Return the current Unix time in milliseconds as a string (13-digit timestamp)."""
    return _scaled_timestamp(1000)


def send_msg(url, app_id, channel_id, send_id, target_id, message):
    """Send one text chat message over WebSocket and return what was sent.

    First POSTs to ``http://<url>`` to obtain connection details, then opens
    a ``wss://`` connection built from that response, joins the ``channel_id``
    namespace and emits a ``message`` event.

    Args:
        url: Host (and path, if any) of the HTTP endpoint, without scheme.
            NOTE(review): no endpoint path is appended here -- it may have
            been removed from this copy of the code; confirm.
        app_id: Application id sent in both the HTTP and socket requests.
        channel_id: Channel id; also used as the socket namespace.
        send_id: Id of the sending user.
        target_id: Id of the receiving user.
        message: Text content of the message.

    Returns:
        The JSON text of the message payload on success, otherwise a string
        starting with ``'ERROR: '`` describing the failure. Exceptions are
        caught, printed and reported as such a string rather than raised.
    """
    url = 'http://{}'.format(url)
    data = {
        'app_id': app_id,
        'channel_id': channel_id,
        'access_token': '',
        'client': 'pc_browser',
        'package_check': 'package_check',
        'third_party_user_id': send_id,
        'context': 'e30',
        'hide': '0',
    }
    try:
        r = requests.post(url, data, verify=False)
        node = r.json()
        if node.get('code') != 200:
            return 'ERROR: {}'.format(node)
        print('\nNode: {}'.format(node))

        # The socket host and client ip are scraped from the raw response text.
        socket_domain = ''.join(re.findall(r'connect_str\":\"https://(.*?com)', r.text))
        ip = ''.join(re.findall(r'ip=(.*?)&', r.text))
        socket_params = {
            'ip': ip,
            'channelId': channel_id,
            'appId': app_id,
            'userId': send_id,
            'context': 'e30',
            'version': 'v4',
            'hide': '0',
            'client': 'pc_browser',
            'requestId': node['request_id'],
            'EIO': '3',
            'transport': 'websocket',
        }
        # urlencode escapes reserved characters and accepts non-string values,
        # which plain string concatenation did not.
        socket_url = 'wss://' + socket_domain + '/socket.io/?' + urlencode(socket_params)
        print('Socket: {}'.format(socket_url))

        payload = {
            "userId": send_id,
            "targetId": target_id,
            "channelId": channel_id,
            "msgData": {
                "type": "text",
                "text_content": message
            },
            "terminal": "pc_browser",
            "context": {},
            "version": "v4",
            "appId": app_id,
            "msgType": "chat",
            "msgId": msg_id(),
            "dataTime": get_timestamp_thirteen()
        }
        msg = ["message", payload]

        # NOTE(review): `verify=False` is kept from the original call; the
        # websocket-client docs describe `sslopt` for TLS options -- confirm
        # whether `verify` has any effect here.
        wss = create_connection(socket_url, verify=False)
        try:
            if wss.getstatus() != 101:
                return 'ERROR: {}'.format(wss.recv())
            # Join the channel namespace, wait for a reply, then emit the event.
            wss.send('40/' + channel_id)
            msg_json = json.dumps(payload)
            socket_msg = "42/" + channel_id + "," + json.dumps(msg)
            if wss.recv() is None:
                return 'ERROR: no reply after joining channel {}'.format(channel_id)
            wss.send(socket_msg)
            print('Message: {}\n'.format(msg_json))
            return msg_json
        finally:
            # Always release the connection, on success and on every error path.
            wss.close()
    except Exception as error:
        print('ERROR: {}\n'.format(error))
        # Report failures in the same 'ERROR: ...' string form as the other paths.
        return 'ERROR: {}'.format(error)


def query_msg(url, app_id, channel_id, send_id):
    """POST a message query to ``http://<url>`` and return the response text.

    Args:
        url: Host (and path, if any) of the query endpoint, without scheme.
        app_id: Application id to query.
        channel_id: Channel id to query.
        send_id: Sender id to query.

    Returns:
        The raw response text when the JSON ``code`` field is 200, otherwise
        ``'ERROR: '`` followed by the response text.

    Raises:
        Whatever `requests.post` or `Response.json` raise; nothing is caught.
    """
    # NOTE(review): the appended suffix is empty -- presumably the endpoint
    # path was removed from this copy; fill it in before use.
    url = url + ''
    data = {
        'app_id': app_id,
        'signed_at': get_timestamp_ten(),
        'channel_id': channel_id,
        'sign': 'vhall',
        'send_id': send_id,
    }
    r = requests.post('http://' + url, data, verify=False)
    if r.json().get('code') == 200:
        return r.text
    return 'ERROR: {}'.format(r.text)
- views.py
- 视图层逻辑
import json
import time

from django.shortcuts import render

from send_msg.tools import (
    get_message,
    get_send_id,
    get_target_id,
    query_msg,
    send_msg,
)

# Maps the form's `env` choice to the host handed to send_msg/query_msg.
# NOTE(review): every host is an empty string in this copy -- presumably
# removed before publishing; fill in the real hosts.
ENV_HOSTS = {
    'online': '',
    'standby': '',
    'pre': '',
    'test': '',
    'b3': '',
}


def _parse_count(raw):
    """Return *raw* as an int, or 1 when it is missing or not a number."""
    if not raw:
        return 1
    try:
        return int(raw)
    except (TypeError, ValueError):
        return 1


# Create your views here.
def index(request):
    """Render the send-message page and, on POST, send and then query messages.

    A non-POST request only renders the empty form. On POST the form fields
    are read (missing send_id/target_id/message get generated defaults), the
    message is sent `num` times, and after a one-second pause the messages
    are queried. Both outcomes are passed to ``index.html``.

    Args:
        request: The incoming Django request.

    Returns:
        The rendered ``index.html`` response.
    """
    # Opening the page must not send anything; only a submitted form does.
    if request.method != 'POST':
        return render(request, 'index.html', {})

    form = request.POST
    env = form.get('env')
    # Unknown values are passed through unchanged.
    env = ENV_HOSTS.get(env, env)
    app_id = form.get('app_id')
    channel_id = form.get('channel_id')
    send_id = form.get('send_id') or get_send_id()
    target_id = form.get('target_id') or get_target_id()
    message = form.get('message') or get_message()
    num = _parse_count(form.get('num'))

    context = {
        'env': env,
        'app_id': app_id,
        'channel_id': channel_id,
        'send_id': send_id,
        'target_id': target_id,
        'message': message,
        'num': num,
    }

    # Only the outcome of the last send is kept in `result`.
    for _ in range(num):
        try:
            context['result'] = send_msg(env, app_id, channel_id, send_id, target_id, message)
            print('Send Request parameters : {}'.format(context))
        except Exception as requ_error:
            context['result'] = requ_error

    # Pause before querying what was just sent.
    time.sleep(1)
    try:
        context['query_result'] = query_msg(env, app_id, channel_id, send_id)
        print('Query Request parameters : {}'.format(context))
    except Exception as query_error:
        context['query_result'] = query_error
    return render(request, 'index.html', context)
- index.html
- 主页面html代码
{% load static %}
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>ims</title>
    <!-- Page-wide reset and fixed-width body. -->
    <style> *{ margin:0; padding:0; } .body{ margin:0 auto; width:1650px; } </style>
    <!-- Shared sizing for the form labels and text inputs. -->
    <style> .div{ width: 150px; height: 25px; font-weight:bold; font-size:16px; text-align:center; } </style>
</head>
<body class="body">
<!-- Form posted to /index/; field names match what the view reads from request.POST. -->
<form action="/index/" method="post">
    {% csrf_token %} <!-- add this line -->
    <div>
        <br><br>
        <!-- Target environment selector. -->
        <label>
            <select style="height: 35px; width: 350px; font-weight:bold; font-size:18px; text-align:left;" name="env">
                <option value="online">线上: </option>
                <option value="standby">备用集群 2: </option>
                <option value="b3">备用集群 3: </option>
                <option value="pre">预发布: </option>
                <option value="test">测试: </option>
            </select>
        </label>
        <!-- Message parameters. -->
        <div>
            <br>
            <label class="div"> app_id : <input type="text" value="" class="div" name="app_id"/> </label>
            <br><br>
            <label class="div"> channel_id : <input type="text" value="" class="div" name="channel_id"/> </label>
            <br><br>
            <label class="div"> send_id : <input type="text" value="" class="div" name="send_id"/> </label>
            <br><br>
            <label class="div"> target_id : <input type="text" value="" class="div" name="target_id"/> </label>
            <br><br>
            <label class="div"> message : <input type="text" value="" class="div" name="message"/> </label>
            <br><br>
            <label class="div"> num : <input type="text" value="1" class="div" name="num"/> </label>
            <br><br>
        </div>
        <input type="submit" style="width: 90px; height: 30px" value="提交" />
    </div>
</form>
<br>
<h3>请求参数</h3>
<label>
    <!-- Request parameters and the send result. -->
    <textarea wrap="soft" type="text" style="width: 1600px; height: 210px; font-size:16px; color: blueviolet;"> 环境: {{ env }} app_id: {{ app_id }} channel_id: {{ channel_id }} 消息发送者: {{ send_id }} 消息接收者: {{ target_id }} 消息体: {{ result }} </textarea>
    <br><br>
    <h3>查询结果</h3>
    <!-- Request parameters and the query result. -->
    <textarea wrap="soft" type="text" style="width: 1600px; height: 210px; font-size:16px; color: blueviolet;"> 环境: {{ env }} app_id: {{ app_id }} channel_id: {{ channel_id }} 消息发送者: {{ send_id }} 查询结果: {{ query_result }} </textarea>
</label>
</body>
</html>
- run.bat
- Windows平台启动脚本
rem When called with the argument "hide", skip straight to starting the server.
if "%1"=="hide" goto CmdBegin
rem Otherwise use mshta/VBScript to re-run this script (%~0) with the "hide"
rem argument and window style 0, then exit this instance.
start mshta vbscript:createobject("wscript.shell").run("""%~0"" hide",0)(window.close)&&exit
:CmdBegin
rem Start the Django server on 127.0.0.1:8888.
python manage.py runserver 127.0.0.1:8888
- run.sh
- Linux平台启动脚本
# Start the Django server on 0.0.0.0:80 in the background, detached from the
# terminal (nohup), with unbuffered Python output (-u); stdout and stderr are
# both written to out.log.
# NOTE(review): binding port 80 usually requires elevated privileges -- confirm.
nohup python3 -u manage.py runserver 0.0.0.0:80 > out.log 2>&1 &
页面
标签:请求,get,app,send,django,接口,import,id,channel 来源: https://www.cnblogs.com/Echo-Mikasa/p/15880065.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。