百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

ubuntu每五分钟执行企业微信机器人推送

nanshan 2025-05-21 15:22 4 浏览 0 评论

# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 09:05:46 2025

@author: 1
"""

import requests
import time
import schedule
import sched
from datetime import datetime, timedelta
import threading
import pymysql  # 用于连接MySQL数据库
from mysql.connector import Error

# 信息等级定义
INFO_LEVELS = {
    '紧急': {'retry_interval': 60, 'max_retries': 3},
    '重要': {'retry_interval': 60, 'max_retries': 3},
    '一般': {'retry_interval': 60, 'max_retries': 3}
}

def send_message(group, message, info_level):
    headers = {'Content-Type': 'application/json'}
    payload = {
        "msgtype": "text",
        "text": {
            "content": message
        }
    }
    try:
        print(13)
        print(group)
        response = requests.post(group['robot_webhook'], json=payload, headers=headers)
        print(14)
        if response.status_code == 200 and response.json().get('errcode') == 0:
            print(15)
            print(f"[{datetime.now()}] 成功发送到群 {group['group_name']}")
            return True
        else:
            print(16)
            print(f"[{datetime.now()}] 发送到群 {group['group_name']} 失败,状态码: {response.status_code}")
            return False
    except Exception as e:
        print(17)
        print(f"[{datetime.now()}] 发送到群 {group['group_name']} 异常: {e}")
        return False

def retry_send(group, message, info_level, attempt=1):
    max_retries = INFO_LEVELS[info_level]['max_retries']
    retry_interval = INFO_LEVELS[info_level]['retry_interval']
    print(6)
    if attempt > max_retries:
        print(f"[{datetime.now()}] 最终发送失败到群 {group['group_name']},将在1小时后重试")
        time.sleep(3600)  # 等待1小时后重试
        attempt = 1
    print(7)
    success = send_message(group, message, info_level)
    print(8)
    if not success:
        time.sleep(retry_interval)
        retry_send(group, message, info_level, attempt + 1)

def send_to_superior_groups(group, message, info_level):
    current_group = group
    while current_group['parent_group_id'] is not None:
        superior = find_group_by_id(current_group['parent_group_id'])
        if superior:
            retry_send(superior, message, info_level)
            current_group = superior
        else:
            break

def find_group_by_id(group_id):
    try:
        # 连接数据库
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询未发送的紧急消息
            sql = "select id,group_name,level,robot_webhook,parent_group_id from wechat_groups where id = %s"
            cursor.execute(sql, (group_id,))
            results = cursor.fetchall()
            
            for row in results:
                target_group = row['id']  # 根据 upno 找到对应的群组
                if target_group == group_id:
                    return row
            return None
    except Exception as e:
        print(f"[{datetime.now()}] 遍历机器人时出错: {e}")
        return None
    finally:
        if connection:
            connection.close()
            
def find_group_by_name(group_name):
    try:
        # 连接数据库
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询未发送的紧急消息
            sql = "select id,group_name,level,robot_webhook,parent_group_id from wechat_groups where group_name = %s"
            cursor.execute(sql, (group_name,))
            results = cursor.fetchall()
            
            for row in results:
                target_group = row['group_name']  # 根据 upno 找到对应的群组
                if target_group == group_name:
                    return row
            return None
    except Exception as e:
        print(f"[{datetime.now()}] 遍历机器人时出错: {e}")
        return None
    finally:
        if connection:
            connection.close()

def schedule_message(group, message, info_level, send_time):
    print(9)
    delay = (send_time - datetime.now()).total_seconds()
    print(10)
    if delay > 0:
        print(11)
        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(send_time.timestamp(), 1, retry_send, argument=(group, message, info_level))
        scheduler.run()
    else:
        print(12)
        retry_send(group, message, info_level)

def send_message_with_scheduling(group, message, info_level, send_time=None):
    if send_time:
        schedule_message(group, message, info_level, send_time)
    else:
        retry_send(group, message, info_level)
    if group['parent_group_id'] is not None:
        send_to_superior_groups(group, message, info_level)

def run_scheduler():
    while True:
        schedule.run_pending()
        time.sleep(1)

# 数据库连接配置
DB_CONFIG = {
    'host': '*********',
    'user': 'root',
    'password': '************',
    'database': '*************',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

def check_and_send_urgent_messages():
    try:
        # 连接数据库
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            # 查询未发送的紧急消息
            sql = "SELECT id, sqlcmd, result, createdate, role, details, levelname, upno FROM ai_dayairesult WHERE levelname = '紧急' AND isend = 0"
            cursor.execute(sql)
            results = cursor.fetchall()
            print(1)
            
            for row in results:
                message = f"紧急通知: {row['result']}"  # 只发送result字段内容
                target_group = find_group_by_name(row['role'])  # 根据role字段找到对应的群组
                print(2)
                if target_group:
                    print(3)
                    print(target_group)
                    print(31)
                    send_message_with_scheduling(target_group, message, "紧急")
                    print(4)
                    # 更新数据库状态为已发送
                    update_sql = "UPDATE ai_dayairesult SET isend = 1 WHERE id = %s"
                    cursor.execute(update_sql, (row['id'],))
                    print(5)
            # 提交事务
            connection.commit()
    except Exception as e:
        print(f"[{datetime.now()}] 检查并发送紧急消息时出错: {e}")
    finally:
        if connection:
            connection.close()

def schedule_next_check():
    # 设置每5分钟检查一次紧急消息
    schedule.every(5).minutes.do(check_and_send_urgent_messages)

# 示例用法
if __name__ == "__main__":
    # 启动调度器线程
    scheduler_thread = threading.Thread(target=run_scheduler)
    scheduler_thread.daemon = True
    scheduler_thread.start()
    
    # 设置定时检查紧急消息
    schedule_next_check()
    
    # 立即执行一次检查
    check_and_send_urgent_messages()
    
    # 保持主线程运行
    while True:
        schedule.run_pending()
        time.sleep(1)

相关推荐

虚拟机“播放器”:VMware Player 12.0.1下载

IT之家讯免费虚拟机软件VMwareWorkstationPlayer更新至12.0.1,本次更新为维护性更新,同今天更新的WorkstationPro版本一样,解决了某些情况下Skylake...

ZLG嵌入式笔记 | 移动硬盘和虚拟机的那些事儿

在Linux开发中,编译内核是一项常见任务,但不少开发者在移动硬盘或虚拟机环境下尝试时会遭遇失败。本文将简要探讨这些问题的成因,并介绍一些虚拟机使用技巧,帮助大家更好地应对相关问题。在移动硬盘里编译...

IT 技术小课堂「虚拟机」

你知道什么是虚拟机吗?虚拟机就是可以用一台电脑变成多台电脑的计算机魔法。推荐上手最快的虚拟机软件VMwareWorkstation。如果你的物理机是Win10以下的可以安装VMwareWo...

最简单的虚拟机安装黑苹果

给所有还没体验过Macos的人们。本文会以最简单的方式介绍虚拟机安装黑苹果的教程,准备四个工具,一次性安装黑苹果,中间基本不会有出现任何问题。一、准备工具,先把以下工具都准备好,网上搜索即可。1、...

VMware虚拟机与物理机文件交互,这8种方法送给你

上一期我们讲解过用VMWare虚拟机打造一个金刚不坏之身的操作系统,小伙伴们反应说方法不错,只是虚拟机与物理机文件交互方式不好操作,这期应小伙伴们的要求,给大家讲解下虚拟机与物理机文件交互的8种方式,...

VMware虚拟机:第二节安装VMware并创建Win10虚拟机

VMware虚拟机:第二节安装VMware并创建Windows10虚拟机本节我们介绍如何安装VMwareWorkStationPro17.6.2,以及利用VMware创建Windows10虚拟机。...

vSphere环境虚拟机安装VMware Tools(Windows操作系统)

本文描述Windows操作系统的VMwareTools安装。Windows操作系统的VMwareTools安装操作步骤Windows操作系统安装完成后,VMwarevSphere控制台会提示“该...

windows7虚拟机VMware tools报错——一招解决

前言最近,把VMwareworkstation更新到版本17后,在一次安装windows7虚拟机的时候,发现VMwaretools各种安装报错,安装无法正常进行,这可愁坏了我,让我把自己本来不多的...

博通紧急修复VMware Tools高危认证绕过漏洞

IT之家3月26日消息,博通昨日(3月25日)发布安全公告,修复VMwareToolsforWindows中存在的高危认证绕过漏洞(CVE-2025-22230)。该漏洞由俄罗...

linux中如何安装VMwaretools--一定要保存

安装好VMware后,有时候我想把真机的文件放到虚拟机了,除了使用远程工具外,也会用到VMwaretools,今天给大家说说如何安装VMwaretools。其实在网上也可以搜到,只是网上的东西太乱了。...

精品博文解决安装Ubuntu14.04不弹出图像界面的问题

今天遇到一个糗事,不写出来不足以解恨。暑期开始了,给一个辅导班做嵌入式系统的讲座。问题就出在讲完以后系统的安装这个环境。班里面好些同学安装好以后进入不了图像界面,举手问我啊,oh,mygod!就到了这...

虚拟机VMware Workstation 17教程,从入门到精通No.3-vmwaretools

安装VMwareTools是创建新的虚拟机过程的一部分,而升级VMwareTools是使虚拟机更易于使用的关键。尽管客户机操作系统在未安装VMwareTools的情况下仍可运行,但许...

国产操作系统虚拟机安装vm-tools

使用vmwareworkstation安装国产化操作系统虚拟机后,不会像windows一样自动安装vmtools工具,按vmware默认的由虚拟机菜单中安装的方法步骤比较复杂,且最终结果不理想,经实...

VmwareTools的安装 这个功能很强大 #计算机

vmwareTools的安装。现在装完之后,这么点看起来也别扭,怎么办?装工具:vmwaretools。·在这个地方选择虚拟机,虚拟机这里边有安装vmwaretools,安装这个工具点击一下。·下边这...

VMware Tools安装失败?手把手教你本地安装

VMwareTools安装失败?作为虚拟机与物理机之间的"桥梁",VMwareTools能实现文件拖拽、剪贴板共享、分辨率自适应等关键功能。没有它,VMware虚拟机就变得非常不好...

取消回复欢迎 发表评论: