如何用 Python 手撸一个 GitLab 代码安全审计工具?

打印 上一主题 下一主题

主题 1017|帖子 1017|积分 3051

本文分享了极狐GitLab 的代码安全审计 & 审计事件流功能,而且演示如何用 Python 编写一个安全审计流接收器,通过接收安全审计日记并分析后发出通知。
极狐GitLab 为 GitLab 中文发行版,中文版本对中国用户更友好,可以一键私有化部署,也可以直接使用 SaaS(JihuLab.com)。本文讲述的安全审计 & 审计事件流属于专业版 & 旗舰版功能。可以申请 60 天专业版免费试用 https://dl.gitlab.cn/xup0wa40 来体验该功能功能
本文内容比力丰富,主要分为以下几个部分:

  • 关于代码安全审计
  • 极狐GitLab 安全审计
  • 极狐GitLab 安全审计流
  • 用 Python 构建审计流目的地担当器
  • 结束语
  • 代码附录
代码安全审计

所谓代码安全审计,就是对代码堆栈的所有操作进行相应的记录,目的是为了方便安全部门对代码堆栈的操作进行安全审计,或者是代码堆栈出问题以后,通过审计日记发现问题所在。大白话说就是看看谁对堆栈做了什么操作,比如常规的堆栈克隆、拉取、推送,当然最可骇的就是传说中的删除跑路或者修改堆栈的可见性(从私有修改为公开,很多著名的信息泄漏就是由堆栈可见性修改引起的)。另有这些年很常见的,有员工在离职前疯狂下载代码,然后看成自己的知识产权,从而带离公司。
这每一件发生在公司内部都是一件大事,毕竟如今数字化时代,很多企业的焦点资产就是“一坨坨”的代码。那能够避免这种事情发生或者在事件发生后能及时找到“肇事者”的方法实在就是代码安全审计,这玩意的英文名称叫做Audit event。当然,国内很多开发者可能也叫做“代码追踪”,“代码泄漏之类的”。Whatever,不管叫什么,焦点就是希望能够用一些手段来保护代码的安全,不要被偷、不要被删,所有的操作都要留下痕迹,而且这痕迹至少要包含三个要素:

  • Who:事件的操作主体。主要是指对代码进行操作的人,一般来讲当然就是公司内部的研发人员啦;
  • When:事件发生的时间。主要是指操作是什么时间段发生的;
  • What:操作主体做了什么具体操作。主要就是看看对堆栈代码都做了啥,克隆还是推送,拉取还是删库等。
说半天,这玩意到底咋做呢?
说白了,只能依靠平台自身,平台要是自带了这个功能,那就方便很多,要是不带就没办法了。
极狐GitLab 安全审计 & 安全审计流

好巧不巧的是,GitLab 本身就自带了这个功能,而且随着版本的迭代更新,审计的事件也越来越多,到现在为止(最新为 17.4 版本)审计事件已经多到130+ 项,从实例到群组、到项目,都有。

需要注意的是:安全审计和安全审计流都属于极狐GitLab 专业版及以上功能,但是当前可以申请免费试用 60天 https://dl.gitlab.cn/xup0wa40。在官网申请后会立马收到一个 license,导入即可!
安全审计功能

极狐GitLab 审计事件可以在实例、群组、项目三个级别查看,路径分别为(以 17.4 为例):

  • 实例:管理中心 --> 监控 --> 审计事件
  • 群组:群组 --> 安全 --> 审计事件
  • 项目:项目 --> 安全 --> 审计事件
比如添加一个项目,会产生对应的审计事件:

安全审计事件流

极狐GitLab 审计事件流功能可以将审计事件流发送到外部的流数据系统(可以担当并处理 JSON 格式的数据),然后再由流数据系统对数据进行分析、存储、可视化及告警等操作。
  1. {
  2.     "severity": "INFO",
  3.     "time": "2024-09-26T08:54:16.339Z",
  4.     "correlation_id": "01J8PRKGB20R989VA752DN9ES4",
  5.     "meta.caller_id": "PostReceive",
  6.     "meta.remote_ip": "127.0.0.1",
  7.     "meta.feature_category": "source_code_management",
  8.     "meta.user": "root",
  9.     "meta.user_id": 1,
  10.     "meta.project": "devsecops/ai",
  11.     "meta.root_namespace": "devsecops",
  12.     "meta.client_id": "user/1",
  13.     "meta.root_caller_id": "POST /api/:version/internal/post_receive",
  14.     "id": 274,
  15.     "author_id": 1,
  16.     "entity_id": 7,
  17.     "entity_type": "Project",
  18.     "details": {
  19.     "push_access_levels": ["Maintainers"],
  20.     "merge_access_levels": ["Maintainers"],
  21.     "allow_force_push": false,
  22.     "code_owner_approval_required": false,
  23.     "event_name": "protected_branch_created",
  24.     "author_name": "Administrator",
  25.     "author_class": "User",
  26.     "target_id": 7,
  27.     "target_type": "ProtectedBranch",
  28.     "target_details": "main",
  29.     "custom_message": "Added protected branch with [allowed to push: ["Maintainers"], allowed to merge: ["Maintainers"], allow force push: false, code owner approval required: false]",
  30.     "ip_address": "218.60.118.175",
  31.     "entity_path": "devsecops/ai"
  32.     },
  33.     "ip_address": "218.60.118.175",
  34.     "author_name": "Administrator",
  35.     "entity_path": "devsecops/ai",
  36.     "target_details": "main",
  37.     "created_at": "2024-09-26T08:54:16.308Z",
  38.     "target_type": "ProtectedBranch",
  39.     "target_id": 7,
  40.     "push_access_levels": ["Maintainers"],
  41.     "merge_access_levels": ["Maintainers"],
  42.     "allow_force_push": false,
  43.     "code_owner_approval_required": false,
  44.     "event_name": "protected_branch_created",
  45.     "author_class": "User",
  46.     "custom_message": "Added protected branch with [allowed to push: ["Maintainers"], allowed to merge: ["Maintainers"], allow force push: false, code owner approval required: false]"
  47. }
复制代码
极狐GitLab 可以将审计日记以 JSON 的方式往外发,只要有一个服务能够担当这些 JSON 格式的数据就可以。而且极狐GitLab 本身支持添加第三方的流接收器。
可以在实例、群组级别添加事件流外部接收器:

  • 实例:管理中心 --> 监控 --> 审计事件 --> 事件流
  • 群组:群组 --> 安全 --> 审计事件 --> 事件流
比如在实例级别添加了一个事件流外部接收器:

主要参数:

  • 目的地名称:写明事件流目的地名称,由于可以添加多个,因此需要用差别的名称来区分
  • 目的地 URL:事件流目的地的地址,也就是担当 JSON 数据的服务地址。这也是本文的焦点,这个服务可以自己构建一个。
用 Python 构建审计流目的地担当器

用 Python 主流的 web 框架都可以构建此类接收器,本文使用常用的 fastapi 来构建,代码如下:
  1. from fastapi import FastAPI
  2. import uvicorn
  3. app = FastAPI()
  4. @app.post("/jh-gitlab")
  5. async def gitlab_payload(data: dict):
  6.     audit_event_info = {
  7.         "Action": data['details']['custom_message'],
  8.         "Author": data['details']['author_name'],
  9.         "IP Address": data['details']['ip_address'],
  10.         "Entity Path": data['details']['entity_path'],
  11.         "Target Details": data['target_details']
  12.     }
  13.     print(audit_event_info)
  14. if __name__ == "__main__":
  15.     uvicorn.run(app, host="0.0.0.0", port=8000)
复制代码
前面看到实际的审计事件日记有很多信息,但是一般想要的就是开头提到的Who、When、What,对应日记里面的字段基本就是action、author、ipaddress、entity_path、target_details。以是,接收到数据以后,先把这些数据取出来,然后做下一步。
将上面的代码存到一个 python 文件里面,然后在服务器上运行起来即可:
  1. python3 main.py
  2. INFO:     Started server process [2140728]
  3. INFO:     Waiting for application startup.
  4. INFO:     Application startup complete.
  5. INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
复制代码
这时间对代码库做一次变动,比如来个暴力的,直接删除堆栈,看看能接收到什么数据:

可以看到,将堆栈删除的话,有两个动作:

  • 修改堆栈的名称
  1. {
  2.     "Action": "Changed name from DevSecOps / ai to DevSecOps / ai-deleted-7",
  3.     "Author": "Administrator",
  4.     "IP Address": "36.133.246.166",
  5.     "Entity Path": "devsecops/ai-deleted-7",
  6.     "Target Details": "devsecops/ai-deleted-7"
  7. }
复制代码
从上面的信息就能看出,是 adminstor(对,也就是管理员)把 devsecops群组下面的 ai项目删除了。

  • 将堆栈标记为等待删除
  1. {
  2.     "Action": "Project marked for deletion",
  3.     "Author": "Administrator",
  4.     "IP Address": "36.133.246.166",
  5.     "Entity Path": "devsecops/ai-deleted-7",
  6.     "Target Details": "ai-deleted-7"
  7. }
复制代码
从上面的信息就能看出,项目 ai被标记为等待删除,这个可以在项目界面上看到:

接下来就要对差别的操作做一些区分了。由于差别的操作 action 的内容也不尽相同。当然,重要的是这些事件发生以后,假如想特殊关注,那就搞一个通知发送机制。下面是一个发送到钉钉群的参考代码:
  1. def notification(payload: dict):
  2.     webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=你的钉钉token"
  3.     # 发送消息的内容
  4.     message = {
  5.         "msgtype": "text",
  6.         "text": {
  7.             "content" : "GitLab: {}".format(json.dumps(payload))
  8.         }
  9.     }
  10.     # 发送 POST 请求
  11.     headers = {'Content-Type': 'application/json'}
  12.     response = requests.post(webhook_url, data=json.dumps(message), headers=headers)
  13.     # 对结果进行判断
  14.     if json.loads(response.text)['errcode'] == 0:
  15.         print("Send Message Success!")
  16.     else:
  17.         print("Send Message Failed!")
复制代码
然后对堆栈做一些操作,比如新建项目、删除项目、克隆项目、推送代码等,就可以看到对应的消息发送到了钉钉群:

当然,假如以为上面的这种方式不太容易理解的话,就做一个转换,把 Action 的内容转化成任何人都能看懂的消息,毕竟 git-upload-pack对很多人来说都不是很常见。就把这个任务交给对此感兴趣的小伙伴吧。
结束语

代码安全审计是安全合规非常重要的一环,但是同时也是很多企业容易忽略的一环,究其原因是能够具备如此完整功能的产物不是很多,由于这需要产物不断地持续迭代更新,而且得从早期就做好产物规划。而在这一点上,GitLab 是值得歌颂的。当然,说再多也不去亲自去体验。接待感兴趣的小伙伴申请专业版免费使用 license 来体验完整的功能。
附录

把这个测试用的代码完整附录如下:
  1. from fastapi import FastAPI
  2. import uvicorn
  3. import requests
  4. import json
  5. app = FastAPI()
  6. @app.post("/jh-gitlab")
  7. async def gitlab_payload(data: dict):
  8.     # 抓取审计事件中的主要信息
  9.     audit_event_info = {
  10.         "Action": data['details']['custom_message'],
  11.         "Author": data['details']['author_name'],
  12.         "IP Address": data['details']['ip_address'],
  13.         "Entity Path": data['details']['entity_path'],
  14.         "Target Details": data['target_details']
  15.     }
  16.     print(audit_event_info)
  17.     # 发送消息通知
  18.     notification(audit_event_info)
  19. def notification(payload: dict):
  20.     webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=你的钉钉 webhook token"
  21.     # 发送消息的内容
  22.     message = {
  23.         "msgtype": "text",
  24.         "text": {
  25.             "content" : "GitLab: {}".format(json.dumps(payload))
  26.         }
  27.     }
  28.     # 发送 POST 请求
  29.     headers = {'Content-Type': 'application/json'}
  30.     response = requests.post(webhook_url, data=json.dumps(message), headers=headers)
  31.     print(response.text)
  32.     if json.loads(response.text)['errcode'] == 0:
  33.         print("Send Message Success!")
  34.         return True
  35.     else:
  36.         print("Send Message Failed!")
  37.         return json.loads(response.text)['errmsg']
  38. if __name__ == "__main__":
  39.     uvicorn.run(app, host="0.0.0.0", port=8000)
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

熊熊出没

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表