本文共 6126 字,大约阅读时间需要 20 分钟。
本文主要研究一下kingbus的binlog_server_handler.go
kingbus/api/binlog_server_handler.go
//StartBinlogServer implements start a binlog serverfunc (h *BinlogServerHandler) StartBinlogServer(echoCtx echo.Context) error { h.l.Lock() defer h.l.Unlock() var args config.BinlogServerConfig var err error defer func() { if err != nil { log.Log.Errorf("StartBinlogServer error,err: %s", err) echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error())) } }() err = echoCtx.Bind(&args) if err != nil { return err } kingbusIP := h.svr.GetIP() //check args err = args.Check(kingbusIP) if err != nil { return err } //start syncer server err = h.svr.StartServer(config.BinlogServerType, &args) if err != nil { log.Log.Errorf("start server error,err:%s,args:%v", err, args) return err } return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))}复制代码
kingbus/server/server.go
//StartServer start sub servers:syncer server or binlog master serverfunc (s *KingbusServer) StartServer(svrType config.SubServerType, args interface{}) error { var err error switch svrType { case config.SyncerServerType: if s.IsSyncerStarted() { return ErrStarted } syncerArgs, ok := args.(*config.SyncerArgs) if !ok { log.Log.Errorf("StartServer args is illegal,args:%v", args) return ErrArgs } err = s.startSyncerServer(syncerArgs) if err != nil { log.Log.Errorf("startSyncerServer error,err:%s,args:%v", err, *syncerArgs) return ErrArgs } //start to propose binlog event to raft cluster s.StartProposeBinlog(s.syncer.ctx) log.Log.Debugf("start syncer,and propose!!!") return nil case config.BinlogServerType: if s.IsBinlogServerStarted() { return ErrStarted } masterArgs, ok := args.(*config.BinlogServerConfig) if !ok { log.Log.Errorf("StartServer args is illegal,args:%v", args) return ErrArgs } err = s.startMasterServer(masterArgs) if err != nil { log.Log.Errorf("startMasterServer error,err:%s,args:%v", err, *masterArgs) return ErrArgs } return nil default: log.Log.Fatalf("StartServer:server type not support,serverType:%v", svrType) } return nil}复制代码
kingbus/api/binlog_server_handler.go
//StopBinlogServer implements stop binlog serverfunc (h *BinlogServerHandler) StopBinlogServer(echoCtx echo.Context) error { h.l.Lock() defer h.l.Unlock() h.svr.StopServer(config.BinlogServerType) return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))}复制代码
kingbus/server/server.go
//StopServer stop sub serverfunc (s *KingbusServer) StopServer(svrType config.SubServerType) { switch svrType { case config.SyncerServerType: if s.IsSyncerStarted() { s.syncer.Stop() } case config.BinlogServerType: if s.IsBinlogServerStarted() { s.master.Stop() } default: log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType) }}复制代码
kingbus/api/binlog_server_handler.go
//GetBinlogServerStatus implements get binlog server status in the runtime statefunc (h *BinlogServerHandler) GetBinlogServerStatus(echoCtx echo.Context) error { h.l.Lock() defer h.l.Unlock() status := h.svr.GetServerStatus(config.BinlogServerType) if masterStatus, ok := status.(*config.BinlogServerStatus); ok { return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(masterStatus)) } return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))}复制代码
//GetServerStatus get the sub server statusfunc (s *KingbusServer) GetServerStatus(svrType config.SubServerType) interface{} { switch svrType { case config.SyncerServerType: var syncerStatus config.SyncerStatus if s.IsSyncerStarted() { cfg := s.syncer.cfg syncerStatus.MysqlAddr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) syncerStatus.MysqlUser = cfg.User syncerStatus.MysqlPassword = cfg.Password syncerStatus.SemiSync = cfg.SemiSyncEnabled syncerStatus.Status = config.ServerRunningStatus syncerStatus.CurrentGtid = s.CurrentGtidStr() syncerStatus.LastBinlogFile = s.LastBinlogFile() syncerStatus.LastFilePosition = s.LastFilePosition() syncerStatus.ExecutedGtidSet = s.ExecutedGtidSetStr() purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey) if err != nil { log.Log.Fatalf("get PurgedGtidSet error,err:%s", err) } syncerStatus.PurgedGtidSet = purgedGtids.String() } else { syncerStatus.Status = config.ServerStoppedStatus } return &syncerStatus case config.BinlogServerType: var status config.BinlogServerStatus if s.IsBinlogServerStarted() { cfg := s.master.cfg status.Addr = cfg.Addr status.User = cfg.User status.Password = cfg.Password status.Slaves = make([]*mysql.Slave, 0, 2) slaves := s.master.GetSlaves() for _, s := range slaves { status.Slaves = append(status.Slaves, s) } status.CurrentGtid = s.CurrentGtidStr() status.LastBinlogFile = s.LastBinlogFile() status.LastFilePosition = s.LastFilePosition() status.ExecutedGtidSet = s.ExecutedGtidSetStr() purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey) if err != nil { log.Log.Fatalf("get PurgedGtidSet error,err:%s", err) } status.PurgedGtidSet = purgedGtids.String() status.Status = config.ServerRunningStatus } else { status.Status = config.ServerStoppedStatus } return &status default: log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType) } return nil}复制代码
kingbus的binlog_server_handler提供了StartBinlogServer、StopBinlogServer、GetBinlogServerStatus，它们均委托给了server.go的对应方法。
转载地址:http://tyfvi.baihongyu.com/