案例分析
报错信息
- {
- "L":"ERROR",
- "T":"2024-12-17T11:11:33.005+0800",
- "file":"*/log.go:61",
- "message":"sql_trace",
- "__type":"sql",
- "trace_id":"6ab69b5d333de40c8327d8572336fa2c",
- "error":"context canceled; invalid connection",
- "elapsed":"2.292ms",
- "rows":0,
- "sql":"UPDATE `logs` SET `response_time`=1734405092,`status`='success' WHERE id = 226081"
- }
复制代码 案发代码:
- func Sync(c *gin.Context) {
- var params services.Params
- // 参数绑定
- c.ShouldBindBodyWith(¶ms, binding.JSON)
- // 参数效验
-
- // 记录日志
- ...
-
- // 开协程 更新日志
- go func() {
- defer helpers.Recovery(c)
- models.Log{Ctx: c.Request.Context()}.UpdateLog(logId, res)
- }()
- c.JSON(200, response.Success(nil))
- return
- }
- func UpdateLog(id uint, r *services.ResJson) bool {
- exec := models.DefaultDB().WithContext(s.Ctx).Where("id = ?", id).Model(&Log{}).Updates(map[string]interface{}{
- "status": StatusSuccess,
- "response_time": time.Now().Unix(),
- })
- return exec.RowsAffected > 0
- }
复制代码 在更新数据库时,开了一个协程去更新
gorm源码解读
gorm Find、Update方法会触发GORM内部的处理器链,其中包括构建SQL语句、预备参数等。
终极,会调用到processor.Execute(db *DB)方法,这个方法会遍历并实行一系列注册的回调函数。
gorm.io/gorm@v1.25.11/finisher_api.go
- // Update updates column with value using callbacks. Reference: https://gorm.io/docs/update.html#Update-Changed-Fields
- func (db *DB) Update(column string, value interface{}) (tx *DB) {
- tx = db.getInstance()
- tx.Statement.Dest = map[string]interface{}{column: value}
- return tx.callbacks.Update().Execute(tx)
- }
- // gorm.io/gorm@v1.25.11/callbacks.go
- func (p *processor) Execute(db *DB) *DB {
- ...
- for _, f := range p.fns {
- f(db)
- }
- }
复制代码 // 注册回调函数
gorm@v1.25.11/callbacks/callbacks.go
- func RegisterDefaultCallbacks(db *gorm.DB, config *Config) {
- enableTransaction := func(db *gorm.DB) bool {
- return !db.SkipDefaultTransaction
- }
- if len(config.CreateClauses) == 0 {
- config.CreateClauses = createClauses
- }
- if len(config.QueryClauses) == 0 {
- config.QueryClauses = queryClauses
- }
- if len(config.DeleteClauses) == 0 {
- config.DeleteClauses = deleteClauses
- }
- if len(config.UpdateClauses) == 0 {
- config.UpdateClauses = updateClauses
- }
- createCallback := db.Callback().Create()
- createCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
- createCallback.Register("gorm:before_create", BeforeCreate)
- createCallback.Register("gorm:save_before_associations", SaveBeforeAssociations(true))
- createCallback.Register("gorm:create", Create(config))
- createCallback.Register("gorm:save_after_associations", SaveAfterAssociations(true))
- createCallback.Register("gorm:after_create", AfterCreate)
- createCallback.Match(enableTransaction).Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
- createCallback.Clauses = config.CreateClauses
- queryCallback := db.Callback().Query()
- queryCallback.Register("gorm:query", Query)
- queryCallback.Register("gorm:preload", Preload)
- queryCallback.Register("gorm:after_query", AfterQuery)
- queryCallback.Clauses = config.QueryClauses
- deleteCallback := db.Callback().Delete()
- deleteCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
- deleteCallback.Register("gorm:before_delete", BeforeDelete)
- deleteCallback.Register("gorm:delete_before_associations", DeleteBeforeAssociations)
- deleteCallback.Register("gorm:delete", Delete(config))
- deleteCallback.Register("gorm:after_delete", AfterDelete)
- deleteCallback.Match(enableTransaction).Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
- deleteCallback.Clauses = config.DeleteClauses
- updateCallback := db.Callback().Update()
- updateCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
- updateCallback.Register("gorm:setup_reflect_value", SetupUpdateReflectValue)
- updateCallback.Register("gorm:before_update", BeforeUpdate)
- updateCallback.Register("gorm:save_before_associations", SaveBeforeAssociations(false))
- updateCallback.Register("gorm:update", Update(config))
- updateCallback.Register("gorm:save_after_associations", SaveAfterAssociations(false))
- updateCallback.Register("gorm:after_update", AfterUpdate)
- ....
- }
复制代码 gorm.io/gorm@v1.25.11/callbacks/update.go
- // Update update hook
- func Update(config *Config) func(db *gorm.DB) {
- supportReturning := utils.Contains(config.UpdateClauses, "RETURNING")
- return func(db *gorm.DB) {
- if db.Error != nil {
- return
- }
- if db.Statement.Schema != nil {
- for _, c := range db.Statement.Schema.UpdateClauses {
- db.Statement.AddClause(c)
- }
- }
- if db.Statement.SQL.Len() == 0 {
- db.Statement.SQL.Grow(180)
- db.Statement.AddClauseIfNotExists(clause.Update{})
- if _, ok := db.Statement.Clauses["SET"]; !ok {
- if set := ConvertToAssignments(db.Statement); len(set) != 0 {
- defer delete(db.Statement.Clauses, "SET")
- db.Statement.AddClause(set)
- } else {
- return
- }
- }
- db.Statement.Build(db.Statement.BuildClauses...)
- }
- checkMissingWhereConditions(db)
- if !db.DryRun && db.Error == nil {
- if ok, mode := hasReturning(db, supportReturning); ok {
- // Update函数最终会调用到底层数据库驱动的QueryContext方法,这个方法接受一个context.Context对象作为参数。
- if rows, err := db.Statement.ConnPool.QueryContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...); db.AddError(err) == nil {
- dest := db.Statement.Dest
- db.Statement.Dest = db.Statement.ReflectValue.Addr().Interface()
- gorm.Scan(rows, db, mode)
- db.Statement.Dest = dest
- db.AddError(rows.Close())
- }
- } else {
- result, err := db.Statement.ConnPool.ExecContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...)
- if db.AddError(err) == nil {
- db.RowsAffected, _ = result.RowsAffected()
- }
- }
- }
- }
- }
复制代码 调用数据库驱动:
Update函数终极会调用到底层数据库驱动的QueryContext方法,这个方法接受一个context.Context对象作为参数。
go1.22.3/src/database/sql/sql.go:1727
- // QueryContext executes a query that returns rows, typically a SELECT.
- // The args are for any placeholder parameters in the query.
- func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
- var rows *Rows
- var err error
- err = db.retry(func(strategy connReuseStrategy) error {
- rows, err = db.query(ctx, query, args, strategy)
- return err
- })
- return rows, err
- }
复制代码 底层数据库毗连:
QueryContext方法会进一步调用query方法,这个方法会处理数据库毗连的重试逻辑。
在query方法中,会调用conn方法来获取一个数据库毗连,并在这个毗连上实行查询。
conn方法会处理context的取消和超时信号,如果context被取消或超时,它会停止数据库毗连操作并返回错误。
go1.22.3/src/database/sql/sql.go:1748
- func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
- dc, err := db.conn(ctx, strategy)
- if err != nil {
- return nil, err
- }
- return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
- }
- // conn returns a newly-opened or cached *driverConn.
- func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
- db.mu.Lock()
- if db.closed {
- db.mu.Unlock()
- return nil, errDBClosed
- }
- // Check if the context is expired.
- select {
- default:
- case <-ctx.Done():
- db.mu.Unlock()
- return nil, ctx.Err()
- }
复制代码 那为什么会出现context canceled?
gin context 生命周期
大多数情况下,context一直能连续到请求结束
当请求发生错误的时间,context会立刻被cancel掉
context什么时间cancel的
server端接受新请求时会起一个协程go c.serve(connCtx)
- func (srv *Server) Serve(l net.Listener) error {
- // ...
- for {
- rw, err := l.Accept()
- connCtx := ctx
- // ...
- go c.serve(connCtx)
- }
- }
复制代码 协程里面for循环从链接中读取请求,重点是这里每次读取到请求的时间都会启动后台协程(w.conn.r.startBackgroundRead())继续从链接中读取。
- // Serve a new connection.
- func (c *conn) serve(ctx context.Context) {
- // ...
- // HTTP/1.x from here on.
- ctx, cancelCtx := context.WithCancel(ctx)
- c.cancelCtx = cancelCtx
- defer cancelCtx()
- // ...
- for {
- // 从链接中读取请求
- w, err := c.readRequest(ctx)
- if c.r.remain != c.server.initialReadLimitSize() {
- // If we read any bytes off the wire, we're active.
- c.setState(c.rwc, StateActive, runHooks)
- }
- // ....
- // 启动协程后台读取链接
- if requestBodyRemains(req.Body) {
- registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
- } else {
- w.conn.r.startBackgroundRead()
- }
- // ...
- // 这里转到gin里面的serverHttp方法
- serverHandler{c.server}.ServeHTTP(w, w.req)
- // 请求结束之后cancel掉context
- w.cancelCtx()
- // ...
- }
- }
复制代码 gin中实行ServeHttp方法
- // ServeHTTP conforms to the http.Handler interface.
- func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- // ...
- // 执行我们写的handle方法
- engine.handleHTTPRequest(c)
- // ...
- }
复制代码 正常请求结束之后gin框架会主动cancel掉context, ctx会清空,采取到ctx pool中。
- // github.com/gin-gonic/gin@v1.7.7/gin.go
- // ServeHTTP conforms to the http.Handler interface.
- func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- c := engine.pool.Get().(*Context)
- c.writermem.reset(w)
- c.Request = req
- c.reset()
- engine.handleHTTPRequest(c)
- engine.pool.Put(c)
- }
- // github.com/gin-gonic/gin@v1.7.7/context.go
- func (c *Context) reset() {
- c.Writer = &c.writermem
- c.Params = c.Params[0:0]
- c.handlers = nil
- c.index = -1
- c.fullPath = ""
- c.Keys = nil
- c.Errors = c.Errors[0:0]
- c.Accepted = nil
- c.queryCache = nil
- c.formCache = nil
- *c.params = (*c.params)[:0]
- *c.skippedNodes = (*c.skippedNodes)[:0]
- }
复制代码 什么时间context会被动cancel掉呢?
秘密就在w.conn.r.startBackgroundRead()这个后台读取的协程里了。
- func (cr *connReader) startBackgroundRead() {
- // ...
- go cr.backgroundRead()
- }
- func (cr *connReader) backgroundRead() {
- n, err := cr.conn.rwc.Read(cr.byteBuf[:])
- // ...
- if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
- // Ignore this error. It's the expected error from
- // another goroutine calling abortPendingRead.
- } else if err != nil {
- cr.handleReadError(err)
- }
- // ...
- }
- func (cr *connReader) handleReadError(_ error) {
- // 这里cancel了context
- cr.conn.cancelCtx()
- cr.closeNotify()
- }
复制代码 startBackgroundRead -> backgroundRead -> handleReadError。在handleReadError函数里面会把context cancel掉。
当服务端在处理业务的同时,后台有个协程监控链接的状态,如果链接有题目就会把context cancel掉。(cancel的目的就是快速失败——业务不消处理了,就算服务端返回结果了,客户端也不处理了)
野生协程如何处理
- http请求如有野生协程,不能利用request context(因为response之后context就会被cancel掉了),应当利用独立的context(比如context.Background())
- 禁用野生协程,控制协程生命周期
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |