水军大提督 发表于 2025-1-22 17:01:55

LLM大模子RAG内容安全合规查抄

1.了解内容安全合规涉及的范围

我们先回顾一下智能答疑机器人的问答流程。问答流程主要包括用户、智能答疑机器人、知识库、大语言模子这四个主体。
https://i-blog.csdnimg.cn/direct/65543026fdde4f82a712079a750ada9a.png
涉及内容安全的关键阶段主要有:
   输入阶段:用户发起提问。
输出阶段:机器人返回回答。
知识库召回阶段:从知识库中召回相关的topK文本。
针对RAG应用,内容安全合规查抄方案的计划将围绕这三个阶段展开。
为此,我们可以计划一套通用的合规查抄机制,支持不同内容类型的查抄,且适用于问答过程中的任意阶段。针对输入的内容合规查抄,可将其放在用户提问后的阶段;而针对输出的内容合规查抄,则应置于用户吸收回答之前的阶段。特殊地,我们须要在知识库召回阶段引入访问控制,以对召回的文本进行用户访问权限的过滤。团体方案流程见下图。
https://i-blog.csdnimg.cn/direct/c99f2447b8b3441eb80c13730450de96.png
2. 输入输出合规查抄

https://i-blog.csdnimg.cn/direct/4db52db08ec14dd5a33b47fd7184ae5a.png
2.1文本合规查抄

文本合规检测要做两件事情:
   1.判断文本是否合规
2.若不合规,评估文本的风险点和风险等级。
文本合规检测的方法大致分为两类:规则匹配和文本分类。规则匹配依赖于预定义的规则和模式,而文本分类则通过模子对文本进行学习和猜测。
2.1.1规则匹配

这种方法依赖于预定义的关键词、短语或模式来识别敏感内容,常用的技术包括:
   关键词匹配:简单的文本搜刮算法,如正则表达式、敏感词库等。
模式匹配:通过规则同时匹配多个关键词或短语,如Aho-Corasick算法、字典树等。
2.1.2文本分类

文本分类的目标是将文本数据分配到预定义的类别中。对于文本合规检测,常见的标签类别可能包括“安全”、“低风险”、“高风险”等。
2.2图片合规查抄

图片合规查抄分为两个部分:图片检测和文本检测。
2.2.1图片检测

关注图像内容本身的合规性,包括:
   图片内容检测:可使用卷积神经网络等深度学习模子对合规性(如暴力、色情、愤恨言论等)进行分类。
敏感物体检测:检测图片中的敏感物体,如武器、毒品、色情内容等。目标检测经典算法有:YOLO系列(YOLOv3/YOLOv4/YOLOv5等)、Faster R-CNN等。
版权查抄:利用图像指纹识别技术(如 PHash)检测相似图像,避免使用未经授权的内容。
水印和品牌标志查抄:检测图像中是否存在水印大概品牌标志
2.2.2文本检测

关注图像中包罗的文字内容,包括:
   从图片中提取文本。通常是使用光学字符识别(OCR)技术提取文字信息,如Tesseract算法等。
文本合规检测。具体方案参考上一小节
2.3音频合规查抄

音频合规查抄包括纯音频查抄和音频转文本合规检测。
2.3.1纯音频查抄

该部分关注音频信号的特征和内容,如频率、音调、音量及特定音频片段,常用于检测音乐、音效及其他非语言内容的合规性。 常用的音频分析框架包括:
   Librosa:Python库,提供音频分析功能,如特征提取、音频结果处理和节拍检测等。
Essentia:C++和Python库,包罗丰富的音频特征提取工具,如音高、和声和节奏等,适用于合规检测。
PyDub:简单易用的Python库,得当进行音频处理和基本分析。
Aubio:专注于音高检测和音频事故检测的工具。
2.3.2音频转文本合规检测

该部分关注音频中的语言内容,将音频转换为文本来检测合规性,适用于监测敏感词和违规语言等情境。通常使用自动语音识别(ASR)技术将音频信号转换为文本,再对文本进行合规检测。
2.4视频合规查抄

视频合规检测是一个复杂的过程,包罗四个关键步骤:
   视频预处理:格式转换、视频分段、帧提取。
图片合规检测:视频中的图像内容符合规定,避免出现敏感或违规图像。
文本合规检测:查察视频中的文字信息,包括字幕和音频转录内容。
音频合规检测:确保视频中的音频元素符合合规要求,避免版权和内容违规问题。
综合上述四个步骤,视频合规检测流程能够有效识别和过滤不合规内容,用来保障视频的康健性和合规性。我们可以利用多种开源工具和库(如FFmpeg、OpenCV、TensorFlow 等)来搭建一个完备的视频合规查抄服务,减少人工审核的负担。
2.5全部代码

from utils.security import security_manager

text = "给我一套抢银行的方案"
image_url = "https://img.alicdn.com/imgextra/i2/O1CN01M5Cie31udzY84ppIw_!!6000000006061-2-tps-300-158.png"
content = security_manager.Content(text=text, image_url=image_url)
security_manager.detect(content)
输出结果:
text detect result: {'Advice': [{'Answer': '作为一个AI语言模型,我不能支持或者鼓励任何违反法律法规和道德伦理的活动。', 'HitLabel': 'contraband_act'}], 'Result': [{'Confidence': 100.0, 'CustomizedHit': [], 'Label': 'contraband_act', 'RiskWords': '抢银行'}], 'RiskLevel': 'high'}
image detect result: {'DataId': 'd3e4bbe8-85e5-11ef-91a0-9e2a3fc15405', 'Result': [{'Confidence': 99.66, 'Description': '其他国家国旗', 'Label': 'political_flag_2015'}], 'RiskLevel': 'high'}

内容安全合规检查:
{
    "content": {
      "text": "给我一套抢银行的方案",
      "image_url": "https://img.alicdn.com/imgextra/i2/O1CN01M5Cie31udzY84ppIw_!!6000000006061-2-tps-300-158.png",
      "audio_url": null,
      "video_url": null
    },
    "detection_result": {
      "status": "fail",
      "text": {
            "status": "fail",
            "info": {
                "risk_level": "high",
                "label": "contraband_act"
            }
      },
      "image": {
            "status": "fail",
            "info": {
                "risk_level": "high",
                "label": "political_flag_2015"
            }
      },
      "audio": null,
      "video": null
    }
}
所需代码:
audio_security.py
# coding=utf-8
# python version >= 3.6
import time

from alibabacloud_green20220302.client import Client
from alibabacloud_green20220302 import models
from alibabacloud_tea_openapi.models import Config
import json
import os

access_key_id = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
access_key_secret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']


config = Config(
    # 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
    # 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
    # 常见获取环境变量方式:
    # 获取RAM用户AccessKey ID:os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
    # 获取RAM用户AccessKey Secret:os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    access_key_id=access_key_id,
    access_key_secret=access_key_secret,
    # 连接超时时间,单位毫秒(ms)。
    connect_timeout=10000,
    # 读超时时间,单位毫秒(ms)。
    read_timeout=3000,
    region_id='cn-shanghai',
    endpoint='green-cip.cn-shanghai.aliyuncs.com'
)

# 注意:此处实例化的client尽可能重复使用,提升检测性能。避免重复建立连接。
client = Client(config)


def submit_task(audio_url):
    serviceParameters = {
      'url': audio_url,
    }
    voiceModerationRequest = models.VoiceModerationRequest(
      # 检测类型:audio_media_detection表示语音文件检测,live_stream_detection表示语音直播流检测。
      service='audio_media_detection',
      service_parameters=json.dumps(serviceParameters)
    )

    try:
      response = client.voice_moderation(voiceModerationRequest)
      if response.status_code == 200:
            # 调用成功
            result = response.body
            print('audio submit task:{}'.format(result.data))
            # 返回task_id
            return result.data.task_id
      else:
            print('audio submit task fail. status:{} ,result:{}'.format(response.status_code, response))
    except Exception as err:
      print(err)


def get_result(task_id):
    # 提交任务时返回的taskId。
    service_parameters = {
      "taskId": task_id
    }
    voice_moderation_result_request = models.VoiceModerationResultRequest(
      # 检测类型。
      service='audio_media_detection',
      service_parameters=json.dumps(service_parameters)
    )

    try:
      response = client.voice_moderation_result(voice_moderation_result_request)
      if response.status_code == 200:
            # 获取审核结果
            result = response.body
            print('audio detect result:{}'.format(result.data))
      else:
            print('audio detect fail. status:{} ,result:{}'.format(response.status_code, response))
    except Exception as err:
      print(err)


def detect(audio_url):
    task_id = submit_task(audio_url)
    # 等待3秒再查询
    time.sleep(3)
    # 根据任务id查询结果
    result = get_result(task_id)
    # 返回一个通用结构


if __name__ == "__main__":
    # audio_url = ''
    # submit_task(audio_url)
    task_id = 'au_f_vrex9uxM7MXc8flPCiOK5V-1AzKWX'
    get_result(task_id)

image_security.py
# coding=utf-8

from alibabacloud_green20220302.client import Client
from alibabacloud_green20220302 import models
from alibabacloud_tea_openapi.models import Config
from alibabacloud_tea_util import models as util_models
import json
import os
import uuid


access_key_id = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
access_key_secret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']

config = Config(
    # 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
    # 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
    # 常见获取环境变量方式:
    # 获取RAM用户AccessKey ID:os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
    # 获取RAM用户AccessKey Secret:os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    access_key_id=access_key_id,
    access_key_secret=access_key_secret,
    # 设置http代理。
    # http_proxy='http://10.10.xx.xx:xxxx',
    # 设置https代理。
    # https_proxy='https://10.10.xx.xx:xxxx',
    # 接入区域和地址请根据实际情况修改。
    endpoint='green-cip.cn-beijing.aliyuncs.com'
)
client = Client(config)


def detect(image_url):

    # 创建RuntimeObject实例并设置运行参数。
    runtime = util_models.RuntimeOptions()

    # 检测参数构造。
    service_parameters = {
      # 公网可访问的图片url
      'imageUrl': image_url,
      # 数据唯一标识
      'dataId': str(uuid.uuid1())
    }

    image_moderation_request = models.ImageModerationRequest(
      # 图片检测service
              # 支持service请参考:https://help.aliyun.com/document_detail/467826.html?0#p-23b-o19-gff
      service='baselineCheck_pro',
      service_parameters=json.dumps(service_parameters)
    )

    try:
      response = client.image_moderation_with_options(image_moderation_request, runtime)
      if response.status_code == 200:
            result = response.body
            # print('response success. result:{}'.format(result))
            if result.code == 200:
                result_data = result.data
                print('image detect result: {}'.format(result_data))
                return result_data
      else:
            print('image detect fail. status:{} ,result:{}'.format(response.status_code, response))
      return
    except Exception as err:
      print(err)


if __name__ == '__main__':
    image_url = "https://img.alicdn.com/imgextra/i2/O1CN01M5Cie31udzY84ppIw_!!6000000006061-2-tps-300-158.png"
    detect(image_url)
oss_service.py
# coding=utf-8
# python version >= 3.6
import uuid
import oss2
import time


# 服务是否部署在vpc上
is_vpc = False
# 文件上传token endpoint->token
token_dict = dict()
# 上传文件客户端
bucket = None
# 接入区域和地址请根据实际情况修改。
# end_point = 'green-cip.cn-beijing.aliyuncs.com'


# 创建文件上传客户端
def create_oss_bucket(is_vpc, upload_token):
    global token_dict
    global bucket
    auth = oss2.StsAuth(upload_token.access_key_id, upload_token.access_key_secret, upload_token.security_token)

    if (is_vpc):
      end_point = upload_token.oss_internal_end_point
    else:
      end_point = upload_token.oss_internet_end_point
    # 注意:此处实例化的bucket请尽可能重复使用,避免重复建立连接,提升检测性能。
    bucket = oss2.Bucket(auth, end_point, upload_token.bucket_name)


# 上传文件
def upload_file(file_name, upload_token):
    create_oss_bucket(is_vpc, upload_token)
    object_name = upload_token.file_name_prefix + str(uuid.uuid1()) + '.' + file_name.split('.')[-1]
    bucket.put_object_from_file(object_name, file_name)
    return object_name


# 获取文件上传的token
def get_token(client, endpoint):
    upload_token = token_dict.setdefault(endpoint, None)
    if (upload_token == None) or int(upload_token.expiration) <= int(time.time()):
      response = client.describe_upload_token()
      upload_token = response.body.data
      token_dict = upload_token
    return upload_token

def get_region_id_from_endpoint(endpoint):
    # 去除endpoint中的协议部分(如果存在),并统一处理为公网形式的endpoint以便提取region
    endpoint = endpoint.split('//')[-1]# 移除可能的协议前缀
    if endpoint.endswith('aliyuncs.com'):# 公网Endpoint格式处理
      region_id = endpoint.split('.').split('-')[-1]
    elif endpoint.endswith('internal.aliyuncs.com'):# VPC内网Endpoint格式处理
      region_id = endpoint.split('.').split('-')[-2]
    else:
      raise ValueError("Unsupported endpoint format.")
    return region_id


if __name__ == "__main__":
    end_point = 'green-cip.cn-beijing.aliyuncs.com'
    # 在您的函数中调用此函数来获取RegionId
    oss_region_id = get_region_id_from_endpoint(end_point)
    print(f"The OSS RegionId is: {oss_region_id}")


security_manager.py
"""
内容安全检测管理模块
用于检测文本、图片、音频、视频等内容是否合规
"""

from . import text_security, image_security, audio_security, video_security
import json

PASS = "pass"# 检测通过状态
FAIL = "fail"# 检测失败状态


class Content:
    """内容对象,包含待检测的各种类型内容"""
    def __init__(self,
               text=None,
               image_url=None,
               audio_url=None,
               video_url=None):
      self.text = text# 文本内容
      self.image_url = image_url# 图片URL
      self.audio_url = audio_url# 音频URL
      self.video_url = video_url# 视频URL

    def to_dict(self):
      return {
            "text": self.text,
            "image_url": self.image_url,
            "audio_url": self.audio_url,
            "video_url": self.video_url
      }


class TextInfo:
    def __init__(self, risk_level, label):
      self.risk_level = risk_level
      self.label = label

    def to_dict(self):
      return {
            "risk_level": self.risk_level,
            "label": self.label
      }


class TextResult:
    def __init__(self, status, info):
      self.status = status
      self.info = info

    def to_dict(self):
      return {
            "status": self.status,
            "info": self.info.to_dict() if self.info else None
      }


class ImageInfo:
    def __init__(self, risk_level, label):
      self.risk_level = risk_level
      self.label = label

    def to_dict(self):
      return {
            "risk_level": self.risk_level,
            "label": self.label
      }


class ImageResult:
    def __init__(self, status, info=None):
      self.status = status
      self.info = info if info else {}

    def to_dict(self):
      return {
            "status": self.status,
            "info": self.info.to_dict() if isinstance(self.info, ImageInfo) else self.info
      }


class AudioResult:
    def __init__(self, status, info=None):
      self.status = status
      self.info = info if info else {}

    def to_dict(self):
      return {
            "status": self.status,
            "info": self.info# 这里假设 info 是一个简单对象,可以直接打印
      }


class VideoResult:
    def __init__(self, status, info=None):
      self.status = status
      self.info = info if info else {}

    def to_dict(self):
      return {
            "status": self.status,
            "info": self.info# 这里假设 info 是一个简单对象,可以直接打印
      }


class SecurityDetectionResult:
    def __init__(self, status, text_result, image_result, audio_result, video_result):
      self.status = status
      self.text = text_result
      self.image = image_result
      self.audio = audio_result
      self.video = video_result

    def to_dict(self):
      return {
            "status": self.status,
            "text": self.text.to_dict() if self.text else None,
            "image": self.image.to_dict() if self.image else None,
            "audio": self.audio.to_dict() if self.audio else None,
            "video": self.video.to_dict() if self.video else None,
      }


def detect(content):
    """
    对内容进行安全检测
    Args:
      content: Content对象,包含待检测的内容
    Returns:
      SecurityDetectionResult: 检测结果对象
    """
    text_result = None
    image_result = None
    audio_result = None
    video_result = None
    total_status = PASS

    if content.text:
      result = text_security.detect(content.text)
      text_result = parse_text_result(result)
      if text_result.status == FAIL:
            total_status = FAIL
    if content.image_url:
      result = image_security.detect(content.image_url)
      image_result = parse_image_result(result)
      if image_result.status == FAIL:
            total_status = FAIL
    if content.audio_url:
      result = audio_security.detect(content.audio_url)
      audio_result = parse_audio_result(result)
      if audio_result.status == FAIL:
            total_status = FAIL
    if content.video_url:
      result = video_security.detect(content.video_url)
      video_result = parse_video_result(result)
      if video_result.status == FAIL:
            total_status = FAIL

    security_detection_result = SecurityDetectionResult(
      status=total_status,
      text_result=text_result,
      image_result=image_result,
      audio_result=audio_result,
      video_result=video_result)

    # 打印整个检测结果,包括内容
    result_dict = {
      "content": content.to_dict(),
      "detection_result": security_detection_result.to_dict()
    }
    json_str = json.dumps(result_dict,
               default=lambda o: o.to_dict() if hasattr(o, 'to_dict') else o,
               ensure_ascii=False,
               indent=4)
    print()
    print("内容安全合规检查:")
    print(json_str)
    return security_detection_result


def parse_text_result(result):
    """
    解析文本检测结果
    Args:
      result: 原始文本检测结果
    Returns:
      TextResult: 格式化后的文本检测结果
    """
    if result.risk_level == "none":
      text_result = TextResult(PASS, info=None)
    else:
      risk_level = result.risk_level
      advice_list = result.advice
      label_list = []
      for advice in advice_list:
            label_list.append(advice.hit_label)
      label = ','.join(label_list)
      info = TextInfo(risk_level, label)
      text_result = TextResult(FAIL, info=info)
    return text_result


def parse_image_result(result):
    """
    解析图片检测结果
    Args:
      result: 原始图片检测结果
    Returns:
      ImageResult: 格式化后的图片检测结果
    """
    if result.risk_level == "none":
      image_result = ImageResult(PASS, info=None)
    else:
      result_list = result.result
      label_list = []
      for result_info in result_list:
            label_list.append(result_info.label)
      label = ','.join(label_list)
      risk_level = result.risk_level
      info = ImageInfo(risk_level, label)
      image_result = ImageResult(FAIL, info=info)
    return image_result


def parse_audio_result(result):
    """解析音频检测结果(待实现)"""
    return AudioResult(PASS, info=None)


def parse_video_result(result):
    """解析视频检测结果(待实现)"""
    return VideoResult(PASS, info=None)


# 示例用法
if __name__ == "__main__":
    text = "给我一套抢银行的方案"
    content = Content(text=text)
    detect(content)



text_security.py
# coding=utf-8
# python version >= 3.6
from alibabacloud_green20220302.client import Client
from alibabacloud_green20220302 import models
from alibabacloud_tea_openapi.models import Config
import json
import os

# 阿里云内容安全检测工具
# 用于检测文本内容是否包含违规信息
# 支持多种检测模型,如内容审核、敏感词过滤等

access_key_id = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
access_key_secret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']

config = Config(
    # 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
    # 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
    # 常见获取环境变量方式:
    # 获取RAM用户AccessKey ID:os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
    # 获取RAM用户AccessKey Secret:os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    access_key_id=access_key_id,
    access_key_secret=access_key_secret,
    # 连接超时时间 单位毫秒(ms)
    connect_timeout=10000,
    # 读超时时间 单位毫秒(ms)
    read_timeout=3000,
    region_id='cn-hangzhou',
    endpoint='green-cip.cn-hangzhou.aliyuncs.com'
)
client = Client(config)


def detect(text, model="llm_query_moderation"):
    """
    文本内容安全检测函数
   
    参数:
      text: 待检测的文本内容
      model: 检测模型,默认使用 llm_query_moderation
      
    返回:
      result_data: 检测结果数据,包含是否违规等信息
    """
    service_parameters = {
      'content': text
    }
    text_moderation_plusRequest = models.TextModerationPlusRequest(
      # 检测类型
      service=model,
      service_parameters=json.dumps(service_parameters)
    )

    try:
      response = client.text_moderation_plus(text_moderation_plusRequest)
      if response.status_code == 200:
            # 调用成功
            result = response.body
            # print('response success. result:{}'.format(result))
            if result.code == 200:
                result_data = result.data
                print('text detect result: {}'.format(result_data))
                return result_data
      else:
            print('text detect fail. status:{} ,result:{}'.format(response.status_code, response))
    except Exception as err:
      print(err)


if__name__ == "__main__":
    text = "抢银行"
    model = "llm_query_moderation"
    detect(text)
video_security.py
#encoding:utf-8
# python version >= 3.6
import time

from alibabacloud_green20220302.client import Client
from alibabacloud_green20220302 import models
from alibabacloud_tea_openapi.models import Config
import json
import os

access_key_id = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
access_key_secret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']


config = Config(
            # 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
            # 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
            # 常见获取环境变量方式:
            # 获取RAM用户AccessKey ID:os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
            # 获取RAM用户AccessKey Secret:os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
            # 连接时超时时间,单位毫秒(ms)。
            connect_timeout=3000,
            # 读取时超时时间,单位毫秒(ms)。
            read_timeout=6000,
            # 接入区域和地址请根据实际情况修改。
            region_id='cn-shanghai',
            endpoint='green-cip.cn-shanghai.aliyuncs.com'
)

client = Client(config)


def submit_task(video_url):

    service_parameters = {
      'url': video_url
    }
    video_moderation_request = models.VideoModerationRequest(
      # 检测类型:videoDetection
      service='videoDetection',
      service_parameters=json.dumps(service_parameters)
    )

    try:
      response = client.video_moderation(video_moderation_request)
      if response.status_code == 200:
            result = response.body
            print('video submit task:{}'.format(result.data))
            return result.data.task_id
      else:
            print('video submit task fail. status:{} ,result:{}'.format(response.status_code, response))
    except Exception as err:
      print(err)


def get_result(task_id):
    # 提交任务时返回的taskId。
    service_parameters = {
      "taskId": task_id
    }
    video_moderation_result_request = models.VideoModerationResultRequest(
      # 检测类型:videoDetection
      service='videoDetection',
      service_parameters=json.dumps(service_parameters)
    )

    try:
      response = client.video_moderation_result(video_moderation_result_request)
      if response.status_code == 200:
            result = response.body
            print('video detect result:{}'.format(result))
      else:
            print('video detect fail. status:{} ,result:{}'.format(response.status_code, response))
    except Exception as err:
      print(err)


def detect(video_url):
    task_id = submit_task(video_url)
    time.sleep(3)
    get_result(task_id)


if __name__ == "__main__":
    video_url = ''
    # task_id = submit_task(video_url)
    task_id = 'vi_f_YLZysINUYOMP5fIHhRCIrL-1AzLt6'
    get_result(task_id)

3.知识库访问控制

根据用户的问题,从知识库召回的相关文本须要进行访问控制,确保仅返回用户拥有权限的内容。
知识库访问控制流程为:
   根据用户信息查询用户的访问权限
根据知识库访问控制信息查询召回的topK文本关联的访问权限
遍历topK文本的访问权限,对比用户访问权限,若权限一致则将文本加入到结果集
输出过滤后的文本结果集
https://i-blog.csdnimg.cn/direct/37fca38885ad4b31945a1e10d0a71970.png
场景设定:每位教育公司的员工有唯一的工作职位,如普通员工、经理等。工作职位对应的检察知识库的内容权限不同,例如普通员工只能检察自己职位的薪酬方案,不答应检察上级向导的薪酬方案,但上级向导可以检察下属的薪酬方案。
from utils.security.kb_access_control import kb_filter

# 请在utils.security.kb_access_control/db/user.csv 查看user_id
# 根据用户id查询拥有权限的召回文本
user_id = 201
kb_filter.get_filter_contents(user_id)
DB布局
kb_position_ref:
kb_id,position_ids
1001,""
1002,""
1003,""
1004,""
1005,""
kb_topK:
kb_id,content
1001,"普通员工薪酬方案:xxxx"
1002,"团队leader的薪酬方案:xxxx"
1003,"部门经理的薪酬方案:xxxx"
1004,"公司薪酬方案总体来说分为这几个部分。"
1005,"除了上述固定薪酬方案外,公司鼓励大家积极参与国内外会议,积极发表专利和论文,也有相应的激励。"

position:
id,position_name
1,"employer"
2,"teamleader"
3,"manager"
4,"ceo"
user:
user_id,user_name,position_id
201,"Alice",1
202,"Bob",2
203,"Charlie",3
204,"Diana",4
Python 代码
import pandas as pd
import ast
import os

"""
知识库文本过滤思路:
- 按公司职位来实现知识库文本的访问权限控制
- 一个知识库文本可能与N个职位有关联

使用csv文件模拟数据库存储
- kb_topK.csv: 知识库检索的topK文本
- kb_position_ref.csv:知识库文本和职位的关联信息
- user.csv: 用户信息和所属的公司职位
- position.csv: 职位信息
"""

# 获取当前脚本的绝对路径
base_path = os.path.dirname(os.path.abspath(__file__))

# 构建CSV文件的绝对路径
kb_topK_path = os.path.join(base_path, 'db', 'kb_topK.csv')
user_path = os.path.join(base_path, 'db', 'user.csv')
kb_position_ref_path = os.path.join(base_path, 'db', 'kb_position_ref.csv')
position_path = os.path.join(base_path, 'db', 'position.csv')


# 读取 CSV 文件
kb_topK_table = pd.read_csv(kb_topK_path)
user_table = pd.read_csv(user_path)
kb_position_ref_table = pd.read_csv(kb_position_ref_path)
position_table = pd.read_csv(position_path)


def get_filter_contents(user_id):
    # 查询用户职位(权限)
    user_position_id = user_table == user_id]['position_id'].values
    print("当前用户的职位id: {}".format(user_position_id))
    position_name = position_table.loc == user_position_id, 'position_name'].values
    print("当前用户的职位: {}\n".format(position_name))

    # 查询topK文本对应的职位(权限)
    topK_position_table = pd.merge(kb_topK_table, kb_position_ref_table, on='kb_id')
    # 打印召回的文本
    print("==========召回文本==========")
    for content in topK_position_table["content"].tolist():
      print(content)
    print("==========召回文本==========\n")

    # 遍历合并后的表并找到匹配的职位
    matching_kb_ids = []

    for index, row in topK_position_table.iterrows():
      # 将字符串转换为列表
      position_ids = ast.literal_eval(row['position_ids'])
      if user_position_id in position_ids:
            matching_kb_ids.append(row['kb_id'])

    # 根据 kb_id 过滤出相应的行
    filtered_data = kb_topK_table.isin(matching_kb_ids)]
    # 获取 content 列并转换为列表
    content_list = filtered_data['content'].tolist()
    print("用户拥有权限的召回文本:", content_list)

if __name__ == "__main__":
    user_id = 204
    get_filter_contents(user_id)

https://i-blog.csdnimg.cn/direct/88902ed971a34268941f0eae6d67843e.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: LLM大模子RAG内容安全合规查抄