mirror of
https://github.com/Cernobor/oko-server.git
synced 2025-02-24 08:27:17 +00:00
Upgraded go & alpine, fix db checkpointing.
* Go version set to 1.19 in go.mod. * Alpine upgraded to 3.16 in Dockerfile. * DB restart checkpoint is forced manually after 15 mins of inactivity. * DB truncate checkpoint is forced manually at exit.
This commit is contained in:
+101
-7
@@ -26,12 +26,13 @@ import (
|
||||
var initDB string
|
||||
|
||||
type Server struct {
|
||||
config ServerConfig
|
||||
dbpool *sqlitex.Pool
|
||||
log *logrus.Logger
|
||||
ctx context.Context
|
||||
tileserverSvSet *mbsh.ServiceSet
|
||||
mapPackSize int64
|
||||
config ServerConfig
|
||||
dbpool *sqlitex.Pool
|
||||
checkpointNotice chan struct{}
|
||||
log *logrus.Logger
|
||||
ctx context.Context
|
||||
tileserverSvSet *mbsh.ServiceSet
|
||||
mapPackSize int64
|
||||
}
|
||||
|
||||
type ServerConfig struct {
|
||||
@@ -75,15 +76,25 @@ func (s *Server) Run(ctx context.Context) {
|
||||
|
||||
<-s.ctx.Done()
|
||||
s.log.Info("Shutting down server...")
|
||||
defer s.log.Info("Server exitting.")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
if err := server.Shutdown(ctx); err != nil {
|
||||
s.log.WithError(err).Fatal("Server forced to shutdown.")
|
||||
}
|
||||
close(s.checkpointNotice)
|
||||
s.log.Info("Closing db connection pool...")
|
||||
s.dbpool.Close()
|
||||
|
||||
s.log.Info("Server exitting.")
|
||||
// manually force truncate checkpoint
|
||||
conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", s.config.DbPath), 0)
|
||||
if err != nil {
|
||||
s.log.WithError(err).Error("Failed to open connection for final checkpoint.")
|
||||
return
|
||||
}
|
||||
s.checkpointDb(conn, true)
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
func (s *Server) getDbConn() *sqlite.Conn {
|
||||
@@ -95,12 +106,47 @@ func (s *Server) getDbConn() *sqlite.Conn {
|
||||
return conn
|
||||
}
|
||||
|
||||
func (s *Server) checkpointDb(conn *sqlite.Conn, truncate bool) {
|
||||
var query string
|
||||
if truncate {
|
||||
query = "PRAGMA wal_checkpoint(TRUNCATE)"
|
||||
} else {
|
||||
query = "PRAGMA wal_checkpoint(RESTART)"
|
||||
}
|
||||
stmt, _, err := conn.PrepareTransient(query)
|
||||
if err != nil {
|
||||
s.log.WithError(err).Error("Failed to prepare checkpoint query.")
|
||||
return
|
||||
}
|
||||
defer stmt.Finalize()
|
||||
|
||||
has, err := stmt.Step()
|
||||
if err != nil {
|
||||
s.log.WithError(err).Error("Failed to step through checkpoint query.")
|
||||
return
|
||||
}
|
||||
if !has {
|
||||
s.log.Error("Checkpoint query returned no rows.")
|
||||
return
|
||||
}
|
||||
|
||||
blocked := stmt.ColumnInt(0)
|
||||
noWalPages := stmt.ColumnInt(1)
|
||||
noReclaimedPages := stmt.ColumnInt(2)
|
||||
if blocked == 1 {
|
||||
s.log.Warn("Checkpoint query was blocked.")
|
||||
}
|
||||
s.log.Debugf("Checkpoint complete. %d pages written to WAL, %d pages written back to DB.", noWalPages, noReclaimedPages)
|
||||
}
|
||||
|
||||
func (s *Server) setupDB() {
|
||||
sqlitex.PoolCloseTimeout = time.Second * 10
|
||||
dbpool, err := sqlitex.Open(fmt.Sprintf("file:%s", s.config.DbPath), 0, 10)
|
||||
if err != nil {
|
||||
s.log.WithError(err).Fatal("Failed to open/create DB.")
|
||||
}
|
||||
s.dbpool = dbpool
|
||||
s.checkpointNotice = make(chan struct{})
|
||||
|
||||
if s.config.ReinitDB {
|
||||
err = s.reinitDB()
|
||||
@@ -108,6 +154,39 @@ func (s *Server) setupDB() {
|
||||
s.log.WithError(err).Fatal("init DB transaction failed")
|
||||
}
|
||||
}
|
||||
|
||||
// aggressively checkpoint the database on idle times
|
||||
go func() {
|
||||
s.log.Debug("Starting manual restart checkpointing.")
|
||||
defer s.log.Debug("Manual restart checkpointing stopped.")
|
||||
delay := time.Minute * 15
|
||||
var (
|
||||
timer <-chan time.Time
|
||||
ok bool
|
||||
)
|
||||
for {
|
||||
select {
|
||||
case _, ok = <-s.checkpointNotice:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
timer = time.After(delay)
|
||||
case <-timer:
|
||||
func() {
|
||||
conn := s.dbpool.Get(s.ctx)
|
||||
defer s.dbpool.Put(conn)
|
||||
s.checkpointDb(conn, false)
|
||||
timer = nil
|
||||
}()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Server) requestCheckpoint() {
|
||||
go func() {
|
||||
s.checkpointNotice <- struct{}{}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Server) setupTiles() {
|
||||
@@ -143,6 +222,7 @@ func (s *Server) reinitDB() error {
|
||||
conn := s.getDbConn()
|
||||
defer s.dbpool.Put(conn)
|
||||
|
||||
defer s.requestCheckpoint()
|
||||
return sqlitex.ExecScript(conn, initDB)
|
||||
}
|
||||
|
||||
@@ -180,6 +260,7 @@ func (s *Server) handshake(hc models.HandshakeChallenge) (models.UserID, error)
|
||||
return 0, err
|
||||
}
|
||||
id = ptrInt64(conn.LastInsertRowID())
|
||||
s.requestCheckpoint()
|
||||
}
|
||||
return *id, nil
|
||||
}()
|
||||
@@ -395,6 +476,7 @@ func (s *Server) deleteExpiredFeatures(conn *sqlite.Conn) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete expired features: %w", err)
|
||||
}
|
||||
s.requestCheckpoint()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -536,6 +618,7 @@ func (s *Server) addFeatures(conn *sqlite.Conn, features []models.Feature) (map[
|
||||
|
||||
localIDMapping[feature.ID] = models.FeatureID(conn.LastInsertRowID())
|
||||
}
|
||||
s.requestCheckpoint()
|
||||
return localIDMapping, nil
|
||||
}
|
||||
|
||||
@@ -613,18 +696,26 @@ func (s *Server) addPhotos(conn *sqlite.Conn, createdFeatureMapping, addedFeatur
|
||||
return nil
|
||||
}
|
||||
|
||||
changed := false
|
||||
defer func() {
|
||||
if changed {
|
||||
s.requestCheckpoint()
|
||||
}
|
||||
}()
|
||||
for localFeatureID, photoNames := range createdFeatureMapping {
|
||||
featureID, ok := createdIDMapping[localFeatureID]
|
||||
if !ok {
|
||||
return errs.NewErrFeatureForPhotoNotExists(int64(localFeatureID))
|
||||
}
|
||||
err = uploadPhotos(featureID, photoNames)
|
||||
changed = true
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload photos for created features: %w", err)
|
||||
}
|
||||
}
|
||||
for featureID, photoNames := range addedFeatureMapping {
|
||||
err = uploadPhotos(featureID, photoNames)
|
||||
changed = true
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload photos for existing features: %w", err)
|
||||
}
|
||||
@@ -682,6 +773,7 @@ func (s *Server) updateFeatures(conn *sqlite.Conn, features []models.Feature) er
|
||||
return fmt.Errorf("failed to evaluate prepared statement: %w", err)
|
||||
}
|
||||
}
|
||||
s.requestCheckpoint()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -709,6 +801,7 @@ func (s *Server) deleteFeatures(conn *sqlite.Conn, featureIDs []models.FeatureID
|
||||
return fmt.Errorf("failed to evaluate prepared statement: %w", err)
|
||||
}
|
||||
}
|
||||
s.requestCheckpoint()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -736,6 +829,7 @@ func (s *Server) deletePhotos(conn *sqlite.Conn, photoIDs []models.FeaturePhotoI
|
||||
return fmt.Errorf("failed to evaluate prepared statement: %w", err)
|
||||
}
|
||||
}
|
||||
s.requestCheckpoint()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user