【BUG】记一次context canceled的报错

打印 上一主题 下一主题

主题 852|帖子 852|积分 2556

案例分析

报错信息
  1. {
  2.     "L":"ERROR",
  3.     "T":"2024-12-17T11:11:33.005+0800",
  4.     "file":"*/log.go:61",
  5.     "message":"sql_trace",
  6.     "__type":"sql",
  7.     "trace_id":"6ab69b5d333de40c8327d8572336fa2c",
  8.     "error":"context canceled; invalid connection",
  9.     "elapsed":"2.292ms",
  10.     "rows":0,
  11.     "sql":"UPDATE `logs` SET `response_time`=1734405092,`status`='success' WHERE id = 226081"
  12. }
复制代码
案发代码:
  1. func Sync(c *gin.Context) {
  2.         var params services.Params
  3.         // 参数绑定
  4.         c.ShouldBindBodyWith(&params, binding.JSON)
  5.         // 参数效验
  6.        
  7.         // 记录日志
  8.         ...
  9.        
  10.         // 开协程 更新日志
  11.         go func() {
  12.                         defer helpers.Recovery(c)
  13.                         models.Log{Ctx: c.Request.Context()}.UpdateLog(logId, res)
  14.                 }()
  15.         c.JSON(200, response.Success(nil))
  16.         return
  17. }
  18. func UpdateLog(id uint, r *services.ResJson) bool {
  19.         exec := models.DefaultDB().WithContext(s.Ctx).Where("id  = ?", id).Model(&Log{}).Updates(map[string]interface{}{
  20.                         "status":        StatusSuccess,
  21.                         "response_time": time.Now().Unix(),
  22.                 })
  23.         return exec.RowsAffected > 0
  24. }
复制代码
在更新数据库时,开了一个协程去更新
gorm源码解读

gorm Find、Update方法会触发GORM内部的处理器链,其中包括构建SQL语句、预备参数等。
终极,会调用到processor.Execute(db *DB)方法,这个方法会遍历并实行一系列注册的回调函数。
gorm.io/gorm@v1.25.11/finisher_api.go
  1. // Update updates column with value using callbacks. Reference: https://gorm.io/docs/update.html#Update-Changed-Fields
  2. func (db *DB) Update(column string, value interface{}) (tx *DB) {
  3.         tx = db.getInstance()
  4.         tx.Statement.Dest = map[string]interface{}{column: value}
  5.         return tx.callbacks.Update().Execute(tx)
  6. }
  7. // gorm.io/gorm@v1.25.11/callbacks.go
  8. func (p *processor) Execute(db *DB) *DB {
  9.         ...
  10.         for _, f := range p.fns {
  11.                 f(db)
  12.         }
  13. }
复制代码
// 注册回调函数
gorm@v1.25.11/callbacks/callbacks.go
  1. func RegisterDefaultCallbacks(db *gorm.DB, config *Config) {
  2.         enableTransaction := func(db *gorm.DB) bool {
  3.                 return !db.SkipDefaultTransaction
  4.         }
  5.         if len(config.CreateClauses) == 0 {
  6.                 config.CreateClauses = createClauses
  7.         }
  8.         if len(config.QueryClauses) == 0 {
  9.                 config.QueryClauses = queryClauses
  10.         }
  11.         if len(config.DeleteClauses) == 0 {
  12.                 config.DeleteClauses = deleteClauses
  13.         }
  14.         if len(config.UpdateClauses) == 0 {
  15.                 config.UpdateClauses = updateClauses
  16.         }
  17.         createCallback := db.Callback().Create()
  18.         createCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
  19.         createCallback.Register("gorm:before_create", BeforeCreate)
  20.         createCallback.Register("gorm:save_before_associations", SaveBeforeAssociations(true))
  21.         createCallback.Register("gorm:create", Create(config))
  22.         createCallback.Register("gorm:save_after_associations", SaveAfterAssociations(true))
  23.         createCallback.Register("gorm:after_create", AfterCreate)
  24.         createCallback.Match(enableTransaction).Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
  25.         createCallback.Clauses = config.CreateClauses
  26.         queryCallback := db.Callback().Query()
  27.         queryCallback.Register("gorm:query", Query)
  28.         queryCallback.Register("gorm:preload", Preload)
  29.         queryCallback.Register("gorm:after_query", AfterQuery)
  30.         queryCallback.Clauses = config.QueryClauses
  31.         deleteCallback := db.Callback().Delete()
  32.         deleteCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
  33.         deleteCallback.Register("gorm:before_delete", BeforeDelete)
  34.         deleteCallback.Register("gorm:delete_before_associations", DeleteBeforeAssociations)
  35.         deleteCallback.Register("gorm:delete", Delete(config))
  36.         deleteCallback.Register("gorm:after_delete", AfterDelete)
  37.         deleteCallback.Match(enableTransaction).Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
  38.         deleteCallback.Clauses = config.DeleteClauses
  39.         updateCallback := db.Callback().Update()
  40.         updateCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
  41.         updateCallback.Register("gorm:setup_reflect_value", SetupUpdateReflectValue)
  42.         updateCallback.Register("gorm:before_update", BeforeUpdate)
  43.         updateCallback.Register("gorm:save_before_associations", SaveBeforeAssociations(false))
  44.         updateCallback.Register("gorm:update", Update(config))
  45.         updateCallback.Register("gorm:save_after_associations", SaveAfterAssociations(false))
  46.         updateCallback.Register("gorm:after_update", AfterUpdate)
  47.         ....
  48. }
复制代码
gorm.io/gorm@v1.25.11/callbacks/update.go
  1. // Update update hook
  2. func Update(config *Config) func(db *gorm.DB) {
  3.         supportReturning := utils.Contains(config.UpdateClauses, "RETURNING")
  4.         return func(db *gorm.DB) {
  5.                 if db.Error != nil {
  6.                         return
  7.                 }
  8.                 if db.Statement.Schema != nil {
  9.                         for _, c := range db.Statement.Schema.UpdateClauses {
  10.                                 db.Statement.AddClause(c)
  11.                         }
  12.                 }
  13.                 if db.Statement.SQL.Len() == 0 {
  14.                         db.Statement.SQL.Grow(180)
  15.                         db.Statement.AddClauseIfNotExists(clause.Update{})
  16.                         if _, ok := db.Statement.Clauses["SET"]; !ok {
  17.                                 if set := ConvertToAssignments(db.Statement); len(set) != 0 {
  18.                                         defer delete(db.Statement.Clauses, "SET")
  19.                                         db.Statement.AddClause(set)
  20.                                 } else {
  21.                                         return
  22.                                 }
  23.                         }
  24.                         db.Statement.Build(db.Statement.BuildClauses...)
  25.                 }
  26.                 checkMissingWhereConditions(db)
  27.                 if !db.DryRun && db.Error == nil {
  28.                         if ok, mode := hasReturning(db, supportReturning); ok {
  29.                                 // Update函数最终会调用到底层数据库驱动的QueryContext方法,这个方法接受一个context.Context对象作为参数。
  30.                                 if rows, err := db.Statement.ConnPool.QueryContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...); db.AddError(err) == nil {
  31.                                         dest := db.Statement.Dest
  32.                                         db.Statement.Dest = db.Statement.ReflectValue.Addr().Interface()
  33.                                         gorm.Scan(rows, db, mode)
  34.                                         db.Statement.Dest = dest
  35.                                         db.AddError(rows.Close())
  36.                                 }
  37.                         } else {
  38.                                 result, err := db.Statement.ConnPool.ExecContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...)
  39.                                 if db.AddError(err) == nil {
  40.                                         db.RowsAffected, _ = result.RowsAffected()
  41.                                 }
  42.                         }
  43.                 }
  44.         }
  45. }
复制代码
调用数据库驱动:
Update函数终极会调用到底层数据库驱动的QueryContext方法,这个方法接受一个context.Context对象作为参数。
go1.22.3/src/database/sql/sql.go:1727
  1. // QueryContext executes a query that returns rows, typically a SELECT.
  2. // The args are for any placeholder parameters in the query.
  3. func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
  4.         var rows *Rows
  5.         var err error
  6.         err = db.retry(func(strategy connReuseStrategy) error {
  7.                 rows, err = db.query(ctx, query, args, strategy)
  8.                 return err
  9.         })
  10.         return rows, err
  11. }
复制代码
底层数据库毗连:
QueryContext方法会进一步调用query方法,这个方法会处理数据库毗连的重试逻辑。
在query方法中,会调用conn方法来获取一个数据库毗连,并在这个毗连上实行查询。
conn方法会处理context的取消和超时信号,如果context被取消或超时,它会停止数据库毗连操作并返回错误。
go1.22.3/src/database/sql/sql.go:1748
  1. func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
  2.         dc, err := db.conn(ctx, strategy)
  3.         if err != nil {
  4.                 return nil, err
  5.         }
  6.         return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
  7. }
  8. // conn returns a newly-opened or cached *driverConn.
  9. func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
  10.         db.mu.Lock()
  11.         if db.closed {
  12.                 db.mu.Unlock()
  13.                 return nil, errDBClosed
  14.         }
  15.         // Check if the context is expired.
  16.         select {
  17.         default:
  18.         case <-ctx.Done():
  19.                 db.mu.Unlock()
  20.                 return nil, ctx.Err()
  21.         }
复制代码
那为什么会出现context canceled?
gin context 生命周期


大多数情况下,context一直能连续到请求结束
当请求发生错误的时间,context会立刻被cancel掉
context什么时间cancel的

server端接受新请求时会起一个协程go c.serve(connCtx)
  1. func (srv *Server) Serve(l net.Listener) error {
  2. // ...
  3.     for {
  4.         rw, err := l.Accept()
  5.         connCtx := ctx
  6.      // ...
  7.         go c.serve(connCtx)
  8.     }
  9. }
复制代码
协程里面for循环从链接中读取请求,重点是这里每次读取到请求的时间都会启动后台协程(w.conn.r.startBackgroundRead())继续从链接中读取。
  1. // Serve a new connection.
  2. func (c *conn) serve(ctx context.Context) {
  3.                 // ...
  4.    // HTTP/1.x from here on.
  5.    ctx, cancelCtx := context.WithCancel(ctx)
  6.    c.cancelCtx = cancelCtx
  7.    defer cancelCtx()
  8.              // ...
  9.    for {
  10.       // 从链接中读取请求
  11.       w, err := c.readRequest(ctx)
  12.       if c.r.remain != c.server.initialReadLimitSize() {
  13.          // If we read any bytes off the wire, we're active.
  14.          c.setState(c.rwc, StateActive, runHooks)
  15.       }
  16.                             // ....
  17.       // 启动协程后台读取链接
  18.       if requestBodyRemains(req.Body) {
  19.          registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
  20.       } else {
  21.          w.conn.r.startBackgroundRead()
  22.       }
  23.       // ...
  24.       // 这里转到gin里面的serverHttp方法
  25.       serverHandler{c.server}.ServeHTTP(w, w.req)
  26.       // 请求结束之后cancel掉context
  27.       w.cancelCtx()
  28.       // ...
  29.    }
  30. }
复制代码
gin中实行ServeHttp方法
  1. // ServeHTTP conforms to the http.Handler interface.
  2. func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  3.     // ...
  4.     // 执行我们写的handle方法
  5.           engine.handleHTTPRequest(c)
  6.     // ...
  7. }
复制代码
正常请求结束之后gin框架会主动cancel掉context, ctx会清空,采取到ctx pool中。
  1. // github.com/gin-gonic/gin@v1.7.7/gin.go
  2. // ServeHTTP conforms to the http.Handler interface.
  3. func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  4.         c := engine.pool.Get().(*Context)
  5.         c.writermem.reset(w)
  6.         c.Request = req
  7.         c.reset()
  8.         engine.handleHTTPRequest(c)
  9.         engine.pool.Put(c)
  10. }
  11. // github.com/gin-gonic/gin@v1.7.7/context.go
  12. func (c *Context) reset() {
  13.         c.Writer = &c.writermem
  14.         c.Params = c.Params[0:0]
  15.         c.handlers = nil
  16.         c.index = -1
  17.         c.fullPath = ""
  18.         c.Keys = nil
  19.         c.Errors = c.Errors[0:0]
  20.         c.Accepted = nil
  21.         c.queryCache = nil
  22.         c.formCache = nil
  23.         *c.params = (*c.params)[:0]
  24.         *c.skippedNodes = (*c.skippedNodes)[:0]
  25. }
复制代码
什么时间context会被动cancel掉呢?

秘密就在w.conn.r.startBackgroundRead()这个后台读取的协程里了。
  1. func (cr *connReader) startBackgroundRead() {
  2.     // ...
  3.     go cr.backgroundRead()
  4. }
  5. func (cr *connReader) backgroundRead() {
  6.     n, err := cr.conn.rwc.Read(cr.byteBuf[:])
  7. // ...
  8.     if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
  9.         // Ignore this error. It's the expected error from
  10.         // another goroutine calling abortPendingRead.
  11.     } else if err != nil {
  12.                     cr.handleReadError(err)
  13.     }
  14.     // ...
  15. }
  16. func (cr *connReader) handleReadError(_ error) {
  17.        // 这里cancel了context
  18.     cr.conn.cancelCtx()
  19.     cr.closeNotify()
  20. }
复制代码
startBackgroundRead -> backgroundRead -> handleReadError。在handleReadError函数里面会把context cancel掉。
当服务端在处理业务的同时,后台有个协程监控链接的状态,如果链接有题目就会把context cancel掉。(cancel的目的就是快速失败——业务不消处理了,就算服务端返回结果了,客户端也不处理了)
野生协程如何处理



  • http请求如有野生协程,不能利用request context(因为response之后context就会被cancel掉了),应当利用独立的context(比如context.Background())
  • 禁用野生协程,控制协程生命周期

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

道家人

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表