前言

众说周知,微信未经过认证的订阅号在接口权限上面有非常大的限制,这里主要做的事是:用户通过关注订阅号发送消息到后台,处理调用chatgpt接口,缓存到redis中。因为chatgpt接口延迟问题,如果微信三次回调均没有收到chatgpt回复,就将数据存到redis通过openid 和msgid绑定用户和会话,用户输入 “继续” 查询返回回答。

本项目已经在github上面开源,地址附在末尾

大家可以关注本人订阅号体验一下

微信订阅号二维码

一、准备工作

  • 申请一个个人订阅号(很简单,不说了)
  • 到微信公众号的管理界面,点击 设置与开发 —> 基本配置
    开启服务器配置,自定义令牌,选择明文模式。
    微信服务器配置图片
  • 如下所示,填写到配置文件中
#wechatmp
wechatmp:
  #这里就是服务器配置中自己填写的 令牌(Token)
  token: xxxxxxxxxxxx

#chatgpt
chatgpt:
  model: gpt-3.5-turbo-1106
  # openAI 的接口
  apikey:
    - sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  #oepnai 接口基础地址 https://openai.xxx.com/ 或者使用自己的代理地址
  baseUrl: https://openai.xxxx.com/
  # 多轮会话携带的历史会话条数
  messageSize: 5

二、验证服务器配置中的服务器URL

当我们填写服务器配置的URL时候,是需要验证URL地址的,验证代码如下:

  • Controller
    @GetMapping("")
    public ResponseEntity<Object> checkSignature(WeChatBean weChatBean) {
        //验证是否为微信消息
        String signatureHashcode = weChatService.checkWeChatSignature(weChatBean);
        if (!signatureHashcode.equals(weChatBean.getSignature())) {
            return ResponseEntity.ok("非法数据");
        }
        //微信公众号接口认证
        if (StringUtils.isNotBlank(weChatBean.getEchostr())) {
            return ResponseEntity.ok(weChatBean.getEchostr());
        }
        return ResponseEntity.ok(null);
    }
  • Service
    @Override
    public String checkWeChatSignature(WeChatBean weChatBean) {
        String hashSignature = null;
        if (StringUtils.isBlank(weChatBean.getTimestamp()) || StringUtils.isBlank(weChatBean.getNonce())) {
            return hashSignature;
        }
        hashSignature = SignatureUtils.generateEventMessageSignature(wechatMpConfig.getToken(),
                weChatBean.getTimestamp(), weChatBean.getNonce());
        return hashSignature;
    }

三、 接受订阅号用户对话,调用chatgpt接口

1、微信回复的处理

  • 过滤掉不是文本的对话信息(暂时只处理文本对话)
if (!params.get("MsgType").equals("text")) {
    return getReplyWeChat(weChatBean.getOpenid(), params.get("ToUserName"), "暂时只支持接收文本信息");
}
  • chatgpt在微信三次推送对话信息还没有回复时,输出提示,让用户输入 “继续” 再从redis取出回复
String msgId = params.get("MsgId");
messageCountMap.merge(msgId, 1, Integer::sum);
if (messageCountMap.get(msgId) == 3) {
	messageCountMap.remove(msgId);
    return getReplyWeChat(weChatBean.getOpenid(), params.get("ToUserName"), "结果处理中,请稍后输入\"继续\"查看AI处理结果(只支持查询最新的一条处理记录)");
}
  • 用户输入继续,查询chatgpt是否完成回复,没有就输入继续等待,有就返回chatgpt回复,对于回复,因为订阅号信息限制,做了 580字符的截断,需要再次输入 “继续” 才能完整显示
final String content = params.get("Content");
if (StringUtils.isNotBlank(content) && content.equals("继续")) {
	messageCountMap.remove(msgId);
    if (redisCacheUtils.hasKey(waitKey)) {
        Object o = redisCacheUtils.getCacheObject(waitKey);
        Integer contentLength = getByteSize(String.valueOf(o));
        if (contentLength < 2048) {
             redisCacheUtils.deleteObject(waitKey);
             return getReplyWeChat(weChatBean.getOpenid(), params.get("ToUserName"), String.valueOf(o));
        } else {
             String replyContent = String.valueOf(o).substring(0, 580);
             redisCacheUtils.setCacheObject(waitKey, String.valueOf(o).replace(replyContent, ""), 60, TimeUnit.MINUTES);
             return getReplyWeChat(weChatBean.getOpenid(), params.get("ToUserName"), replyContent + "\n  (公众号回复字符限制,输入\"继续\"查看后续内容)");
        }
     } else {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (Exception e) {
            log.error("", e);
        }
        return getReplyWeChat(weChatBean.getOpenid(), params.get("ToUserName"), "结果处理中,请稍后输入\"继续\"查看AI处理结果(只支持查询最新的一条处理记录)");
     }
}
  • 判断是否有当前会话的缓存,没有的话新增一个缓存,并且开启一个线程,删除历史会话等待,调用chatgpt多轮会话(也可调用不带上下文的会话)
if (!redisCacheUtils.hasKey(msgKey)) {
    redisCacheUtils.setCacheObject(msgKey, "success", 60, TimeUnit.SECONDS);
    new Thread(() -> {
        if (StringUtils.isNotBlank(content)) {
            redisCacheUtils.deleteObject(waitKey);
            chatgptService.multiChatStreamToWX(weChatBean.getOpenid(), msgId, content);
        }
    }).start();
}
  • 循环等待 5秒钟,这里是为了防止在微信多次发送信息,没有及时回复加的
    private Object putOff(String msgKey) {
        for (int i = 0; i < 5; i++) {
            Object o = redisCacheUtils.getCacheObject(msgKey);
            if ("success".equals(String.valueOf(o))) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (Exception e) {
                    log.error("", e);
                }
            } else {
                return o;
            }
        }
        return null;
    }
  • 判断chatgpt是否回复,并且时间要小于 5秒钟,可以直接将数据推给微信回复,否则就回复一个 “success”
        Object o = redisCacheUtils.getCacheObject(msgKey);
        long time = System.currentTimeMillis() - start;
        if (!"success".equals(String.valueOf(o)) && time < 4900L) {
            redisCacheUtils.deleteObject(waitKey);
            messageCountMap.remove(msgId);
            return getReplyWeChat(weChatBean.getOpenid(), params.get("ToUserName"), String.valueOf(o));
        }
        return String.valueOf(o);

2、调用chatgpt接口处理

  • 调用接口分为单轮对话和多轮对话(这里使用了开源的openai调用sdk)
		<dependency>
			<groupId>com.unfbx</groupId>
			<artifactId>chatgpt-java</artifactId>
			<version>1.1.5</version>
		</dependency>
    /**
     * 多轮会话
     * @param openId 用户openid 
     * @param msgId 消息id
     * @param content 问话内容
     */
    @Override
    public void multiChatStreamToWX(String openId, String msgId, String content) {
        OpenAiStreamClient streamClient = getStreamClient();
        WeChatEventSourceListener weChatEventSourceListener = new WeChatEventSourceListener(openId, msgId);
        //获取历史会话记录
        List<Message> messages = getWxMessageList(openId, content);
        ChatCompletion chatCompletion = ChatCompletion.builder().stream(true).messages(messages).build();
        streamClient.streamChatCompletion(chatCompletion, weChatEventSourceListener);
    }

    /**
     * 单轮会话
     * @param openId 用户openid 
     * @param msgId 消息id
     * @param content 问话内容
     */
    @Override
    public void singleChatStreamToWX(String openId, String msgId, String content) {
        OpenAiStreamClient streamClient = getStreamClient();
        WeChatEventSourceListener weChatEventSourceListener = new WeChatEventSourceListener(openId, msgId);
        Message message = Message.builder().role(BaseMessage.Role.USER).content(content).build();
        ChatCompletion chatCompletion = ChatCompletion.builder().stream(true).messages(Arrays.asList(message)).build();
        streamClient.streamChatCompletion(chatCompletion, weChatEventSourceListener);
    }

  • 这里处理opeAI接口接口返回,都是使用sse的形式,因此所有的结果处理都是在WeChatEventSourceListener中
    public WeChatEventSourceListener(String openId, String msgId) {
        this.openId = openId;
        this.msgId = msgId;
        this.sb = new StringBuffer();
    }

    @Override
    public void onClosed(@NotNull EventSource eventSource) {
        log.info("OpenAI关闭sse连接...");
        //缓存回复到redis
        redisCacheUtils.setCacheObject(waitKey, sb.toString(),  60, TimeUnit.MINUTES);
        redisCacheUtils.setCacheObject(msgKey, sb.toString(), 60, TimeUnit.SECONDS);
        //缓存上下文到redis
        cacheReply();
        eventSource.cancel();
    }

    @Override
    public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) {
        log.debug("OpenAI返回数据:{}", data);
        if (!"[DONE]".equals(data)) {
            ChatCompletionResponse response = JSON.parseObject(data, ChatCompletionResponse.class);
            if (null == response.getChoices().get(0).getFinishReason()) {
                String content = response.getChoices().get(0).getDelta().getContent();
                sb.append(content);
            }
        } else {
            log.info("OpenAI返回数据结束了");
        }
    }

结束语

以上只是实现了简单的订阅号对话chatgpt,项目开源地址:https://github.com/cmyang-it/wechatgpt