Collecting usage info, change sqlite lib.
continuous-integration/drone/push Build is passing

* X-User-ID header is processed to get user ID.
* Time of last request for a user is saved into DB.
* Time of last upload and download is stored for a user.
* Added DB migration to add columns into users table to store the times and app version.
* Backward fix of datatype of the deadline column in features table.
* Switched from crawshaw.io/sqlite to zombiezen.com/go/sqlite.
  * Refactored DB handling.
  * Used migration routine from zombiezen in favour of manual one.
  * Runtime DB reinit simply deletes the db file and initializes the db anew.

Fix #6
This commit is contained in:
zegkljan
2023-06-11 18:06:33 +02:00
parent 63e79c657c
commit 8440e3b7d7
11 changed files with 740 additions and 475 deletions
+2 -1
View File
@@ -20,5 +20,6 @@ const (
URITileserver = URITileserverRoot + "/*x"
URITileTemplate = URITileserverRoot + "/map/tiles/{z}/{x}/{y}.pbf"
AppName = "OKO"
AppName = "OKO"
UserIDHeader = "X-User-ID"
)
+229
View File
@@ -0,0 +1,229 @@
package server
import (
"embed"
"fmt"
"os"
"path"
"regexp"
"sort"
"strconv"
"time"
"zombiezen.com/go/sqlite"
"zombiezen.com/go/sqlite/sqlitemigration"
"zombiezen.com/go/sqlite/sqlitex"
)
func (s *Server) cleanupDb() {
close(s.checkpointNotice)
s.log.Info("Closing db connection pool...")
s.dbpool.Close()
// manually force truncate checkpoint
conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", s.config.DbPath), 0)
if err != nil {
s.log.WithError(err).Error("Failed to open connection for final checkpoint.")
return
}
err = sqlitex.Execute(conn, "vacuum", nil)
if err != nil {
s.log.WithError(err).Error("Failed to vacuum db.")
}
s.checkpointDb(conn, true)
conn.Close()
}
func (s *Server) getDbConn() *sqlite.Conn {
conn, err := s.dbpool.Get(s.ctx)
if err != nil {
panic(err)
}
return conn
}
func (s *Server) returnDbConn(conn *sqlite.Conn) {
s.dbpool.Put(conn)
}
func withDbConn[T any](s *Server, f func(conn *sqlite.Conn) T) T {
conn := s.getDbConn()
defer s.returnDbConn(conn)
return f(conn)
}
func (s *Server) checkpointDb(conn *sqlite.Conn, truncate bool) {
var query string
if truncate {
query = "PRAGMA wal_checkpoint(TRUNCATE)"
} else {
query = "PRAGMA wal_checkpoint(RESTART)"
}
stmt, _, err := conn.PrepareTransient(query)
if err != nil {
s.log.WithError(err).Error("Failed to prepare checkpoint query.")
return
}
defer stmt.Finalize()
has, err := stmt.Step()
if err != nil {
s.log.WithError(err).Error("Failed to step through checkpoint query.")
return
}
if !has {
s.log.Error("Checkpoint query returned no rows.")
return
}
blocked := stmt.ColumnInt(0)
noWalPages := stmt.ColumnInt(1)
noReclaimedPages := stmt.ColumnInt(2)
if blocked == 1 {
s.log.Warn("Checkpoint query was blocked.")
}
s.log.Debugf("Checkpoint complete. %d pages written to WAL, %d pages written back to DB.", noWalPages, noReclaimedPages)
}
func (s *Server) setupDB() error {
s.dbAvailable.Store(false)
s.log.Debugf("Using db %s", s.config.DbPath)
ready := make(chan struct{})
migErr := make(chan error)
s.dbpool = sqlitemigration.NewPool(fmt.Sprintf("file:%s", s.config.DbPath), sqlSchema, sqlitemigration.Options{
PoolSize: 10,
PrepareConn: func(conn *sqlite.Conn) error {
return sqlitex.ExecuteTransient(conn, "PRAGMA foreign_keys = ON;", nil)
},
OnReady: func() {
close(ready)
},
OnError: func(err error) {
migErr <- err
},
})
select {
case <-ready:
case err := <-migErr:
return fmt.Errorf("error during db migration: %w", err)
}
s.checkpointNotice = make(chan struct{})
// aggressively checkpoint the database on idle times
go func() {
s.log.Debug("Starting manual restart checkpointing.")
defer s.log.Debug("Manual restart checkpointing stopped.")
delay := time.Minute * 15
var (
timer <-chan time.Time
ok bool
)
for {
select {
case _, ok = <-s.checkpointNotice:
if !ok {
return
}
timer = time.After(delay)
case <-timer:
withDbConn(s, func(conn *sqlite.Conn) any {
s.checkpointDb(conn, false)
return nil
})
timer = nil
}
}
}()
s.dbAvailable.Store(true)
return nil
}
func (s *Server) requestCheckpoint() {
go func() {
s.checkpointNotice <- struct{}{}
}()
}
func (s *Server) reinitDb() error {
s.log.Debug("Reinitializing db.")
s.dbAvailable.Store(false)
defer s.dbAvailable.Store(true)
close(s.checkpointNotice)
err := s.dbpool.Close()
if err != nil {
return fmt.Errorf("failed to close db: %w", err)
}
s.log.Debug("Removing main db file.")
err = os.Remove(s.config.DbPath)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove db file %s: %w", s.config.DbPath, err)
}
s.log.Debug("Removing WAL file.")
err = os.Remove(s.config.DbPath + "-wal")
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove db-wal file %s-wal: %w", s.config.DbPath, err)
}
s.log.Debug("Initializing db.")
err = s.setupDB()
if err != nil {
return fmt.Errorf("failed to setup db during reinit")
}
s.log.Debug("DB reinitialized.")
return nil
}
// SQL schema
//go:embed sql_schema/V*.sql
var sqlSchemaFiles embed.FS
var sqlSchema sqlitemigration.Schema = func() sqlitemigration.Schema {
type migration struct {
content string
version int
name string
}
entries, err := sqlSchemaFiles.ReadDir("sql_schema")
if err != nil {
panic(fmt.Errorf("failed to read sql_schema migrations: %w", err))
}
pattern := regexp.MustCompile("^V([0-9]+)_(.*)[.][sS][qQ][lL]$")
migrations := []*migration{}
for _, entry := range entries {
name := entry.Name()
if entry.IsDir() {
panic(fmt.Errorf("embedded sql migration '%s' is a directory", name))
}
matches := pattern.FindStringSubmatch(name)
if matches == nil {
panic(fmt.Errorf("embedded sql migration '%s' does not match the filename pattern", name))
}
if len(matches) != 3 {
panic(fmt.Errorf("embedded sql migration '%s' does not have the correct number of submatches", name))
}
version, err := strconv.Atoi(matches[1])
if err != nil {
panic(fmt.Errorf("failed to parse version number of migration '%s': %w", name, err))
}
migName := matches[2]
file := path.Join("sql_schema", name)
content, err := sqlSchemaFiles.ReadFile(file)
if err != nil {
panic(fmt.Errorf("failed to read embedded migration %s", entry.Name()))
}
migrations = append(migrations, &migration{
content: string(content),
version: version,
name: migName,
})
}
sort.Slice(migrations, func(i, j int) bool {
return migrations[i].version < migrations[j].version
})
return sqlitemigration.Schema{
Migrations: Map(func(m *migration) string { return m.content }, migrations),
}
}()
+151 -50
View File
@@ -17,6 +17,8 @@ import (
"github.com/gin-gonic/gin"
"github.com/mssola/user_agent"
"github.com/sirupsen/logrus"
"zombiezen.com/go/sqlite"
"zombiezen.com/go/sqlite/sqlitex"
)
func internalError(gc *gin.Context, err error) {
@@ -39,50 +41,104 @@ func (s *Server) setupRouter() *gin.Engine {
if err != nil {
hostname = "unknown"
}
router.Use(func(gc *gin.Context) {
path := gc.Request.URL.Path
start := time.Now()
gc.Next()
stop := time.Since(start)
latency := int(math.Ceil(float64(stop.Nanoseconds()) / 1_000_000.0))
statusCode := gc.Writer.Status()
clientIP := gc.ClientIP()
clientUserAgent := gc.Request.UserAgent()
referer := gc.Request.Referer()
dataLength := gc.Writer.Size()
if dataLength < 0 {
dataLength = 0
}
entry := s.log.WithFields(logrus.Fields{
"hostname": hostname,
"statusCode": statusCode,
"latency": latency,
"clientIP": clientIP,
"method": gc.Request.Method,
"path": path,
"referer": referer,
"dataLength": dataLength,
"userAgent": clientUserAgent,
})
if len(gc.Errors) > 0 {
entry.Error(gc.Errors.ByType(gin.ErrorTypePrivate).String())
} else {
msg := fmt.Sprintf(
"%s - %s [%s] \"%s %s\" %d %d \"%s\" \"%s\" (%dms)",
clientIP, hostname, time.Now().Format(time.RFC3339), gc.Request.Method, path, statusCode, dataLength, referer, clientUserAgent, latency,
)
if statusCode >= 500 {
entry.Error(msg)
} else if statusCode >= 400 {
entry.Warn(msg)
} else if path == URIPing {
entry.Debug(msg)
} else {
entry.Info(msg)
router.Use(
func(gc *gin.Context) {
path := gc.Request.URL.Path
start := time.Now()
gc.Next()
stop := time.Since(start)
latency := int(math.Ceil(float64(stop.Nanoseconds()) / 1_000_000.0))
statusCode := gc.Writer.Status()
clientIP := gc.ClientIP()
clientUserAgent := gc.Request.UserAgent()
userId := gc.GetHeader(UserIDHeader)
referer := gc.Request.Referer()
dataLength := gc.Writer.Size()
if dataLength < 0 {
dataLength = 0
}
}
})
entry := s.log.WithFields(logrus.Fields{
"hostname": hostname,
"statusCode": statusCode,
"latency": latency,
"clientIP": clientIP,
"method": gc.Request.Method,
"path": path,
"referer": referer,
"dataLength": dataLength,
"userAgent": clientUserAgent,
"userID": userId,
})
if len(gc.Errors) > 0 {
entry.Error(gc.Errors.ByType(gin.ErrorTypePrivate).String())
} else {
msg := fmt.Sprintf(
"%s - %s [%s] \"%s %s\" %d %d \"%s\" \"%s\" (%dms)",
clientIP, hostname, time.Now().Format(time.RFC3339), gc.Request.Method, path, statusCode, dataLength, referer, clientUserAgent, latency,
)
if statusCode >= 500 {
entry.Error(msg)
} else if statusCode >= 400 {
entry.Warn(msg)
} else if path == URIPing {
entry.Debug(msg)
} else {
entry.Info(msg)
}
}
},
func(gc *gin.Context) {
if !s.dbAvailable.Load() {
gc.AbortWithError(http.StatusServiceUnavailable, fmt.Errorf("server database is not ready/available"))
gc.Header("Retry-After", "60")
return
}
gc.Next()
},
func(gc *gin.Context) {
userIdStr := gc.GetHeader(UserIDHeader)
userIdPresent := userIdStr != ""
var userId int64
var userIdErr error
if userIdPresent {
userId, userIdErr = strconv.ParseInt(userIdStr, 10, 0)
}
appVersion, appVersionErr := extractAppVersion(gc)
if userIdErr != nil {
gc.Error(fmt.Errorf("malformed %s: %w", UserIDHeader, userIdErr))
} else if userIdPresent {
gc.Set("uid", userId)
var err error
if appVersion == nil {
err = withDbConn(s, func(conn *sqlite.Conn) error {
err := sqlitex.Execute(conn, "update users set last_seen_time = ? where id = ?", &sqlitex.ExecOptions{
Args: []interface{}{time.Now().Unix(), userId},
})
return err
})
} else {
err = withDbConn(s, func(conn *sqlite.Conn) error {
err := sqlitex.Execute(conn, "update users set app_version = ?, last_seen_time = ? where id = ?", &sqlitex.ExecOptions{
Args: []interface{}{appVersion, time.Now().Unix(), userId},
})
return err
})
}
if err != nil {
gc.AbortWithError(http.StatusInternalServerError, fmt.Errorf("failed to store user data into db: %w", err))
}
}
if appVersion != nil {
gc.Set("app-version", appVersion)
}
if appVersionErr != nil {
gc.Error(appVersionErr)
gc.Set("app-version-error", appVersionErr.Error())
}
gc.Next()
},
)
// tileserver
router.GET(URITileserver, gin.WrapH(s.tileserverSvSet.Handler()))
@@ -118,6 +174,7 @@ func (s *Server) setupRouter() *gin.Engine {
router.GET(URIAppVersion, s.handleGETAppVersion)
router.DELETE(URIAppVersion, s.handleDELETEAppVersion)
router.POST(URIReinit, s.handlePOSTReset)
router.GET(URIUsageInfo, s.handleGETUsageInfo)
return router
}
@@ -137,9 +194,19 @@ func extractAppVersion(gc *gin.Context) (*semver.Version, error) {
}
func (s *Server) handleGETPing(gc *gin.Context) {
version, err := extractAppVersion(gc)
if err != nil {
badRequest(gc, err)
versionRaw, exists := gc.Get("app-version")
if !exists {
versionErr := gc.GetString("app-version-error")
if versionErr != "" {
badRequest(gc, fmt.Errorf("malformed app version: %v", &versionErr))
} else {
badRequest(gc, fmt.Errorf("app version not specified"))
}
return
}
version, ok := versionRaw.(*semver.Version)
if !ok {
internalError(gc, fmt.Errorf("malformed app version extracted"))
return
}
@@ -220,12 +287,12 @@ func (s *Server) handleDELETEAppVersion(gc *gin.Context) {
}
func (s *Server) handlePOSTReset(gc *gin.Context) {
err := s.initDB(true)
err := s.reinitDb()
if err != nil {
internalError(gc, err)
return
}
gc.Status(http.StatusOK)
gc.Status(http.StatusNoContent)
}
func (s *Server) handleGETTilepack(gc *gin.Context) {
@@ -268,6 +335,8 @@ func (s *Server) handlePOSTHandshake(gc *gin.Context) {
}
func (s *Server) handleGETData(gc *gin.Context) {
uidRaw, uidExists := gc.Get("uid")
uid, _ := uidRaw.(int64)
accept := gc.GetHeader("Accept")
if accept == "application/json" {
data, err := s.getDataOnly()
@@ -276,7 +345,6 @@ func (s *Server) handleGETData(gc *gin.Context) {
return
}
gc.JSON(http.StatusOK, data)
return
} else if accept == "application/zip" {
file, err := s.getDataWithPhotos()
defer func() {
@@ -299,12 +367,25 @@ func (s *Server) handleGETData(gc *gin.Context) {
return
}
gc.DataFromReader(http.StatusOK, size, "application/zip", file, nil)
} else {
gc.String(http.StatusNotAcceptable, "%s is not acceptable", accept)
return
}
gc.String(http.StatusNotAcceptable, "%s is not acceptable", accept)
if uidExists {
err := withDbConn(s, func(conn *sqlite.Conn) error {
return sqlitex.Execute(conn, "update users set last_download_time = ? where id = ?", &sqlitex.ExecOptions{
Args: []interface{}{time.Now().Unix(), uid},
})
})
if err != nil {
gc.Error(fmt.Errorf("failed to store last download time: %w", err))
}
}
}
func (s *Server) handlePOSTData(gc *gin.Context) {
uidRaw, uidExists := gc.Get("uid")
uid, _ := uidRaw.(int64)
switch gc.ContentType() {
case "application/json":
s.handlePOSTDataJSON(gc)
@@ -312,6 +393,17 @@ func (s *Server) handlePOSTData(gc *gin.Context) {
s.handlePOSTDataMultipart(gc)
default:
badRequest(gc, fmt.Errorf("unsupported Content-Type"))
return
}
if uidExists {
err := withDbConn(s, func(conn *sqlite.Conn) error {
return sqlitex.Execute(conn, "update users set last_upload_time = ? where id = ?", &sqlitex.ExecOptions{
Args: []interface{}{time.Now().Unix(), uid},
})
})
if err != nil {
gc.Error(fmt.Errorf("failed to store last upload time: %w", err))
}
}
}
@@ -470,3 +562,12 @@ func (s *Server) handleGETDataProposals(gc *gin.Context) {
}
gc.JSON(http.StatusOK, proposals)
}
func (s *Server) handleGETUsageInfo(gc *gin.Context) {
usage, err := s.getUsageInfo(nil)
if err != nil {
internalError(gc, err)
return
}
gc.JSON(http.StatusOK, usage)
}
+233 -420
View File
@@ -4,35 +4,30 @@ import (
"archive/zip"
"bytes"
"context"
"embed"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"regexp"
"sort"
"strconv"
"sync/atomic"
"time"
"cernobor.cz/oko-server/errs"
"cernobor.cz/oko-server/models"
"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
mbsh "github.com/consbio/mbtileserver/handlers"
"github.com/coreos/go-semver/semver"
geojson "github.com/paulmach/go.geojson"
"github.com/sirupsen/logrus"
"zombiezen.com/go/sqlite"
"zombiezen.com/go/sqlite/sqlitemigration"
"zombiezen.com/go/sqlite/sqlitex"
)
//go:embed sql_schema/V*.sql
var sqlSchema embed.FS
type Server struct {
config ServerConfig
dbpool *sqlitex.Pool
dbpool *sqlitemigration.Pool
dbAvailable atomic.Bool
checkpointNotice chan struct{}
log *logrus.Logger
ctx context.Context
@@ -66,8 +61,13 @@ func (s *Server) Run(ctx context.Context) {
s.log.SetLevel(logrus.DebugLevel)
}
s.dbAvailable.Store(false)
s.ctx = ctx
s.setupDB()
err := s.setupDB()
if err != nil {
panic(err)
}
defer s.cleanupDb()
s.setupTiles()
@@ -96,116 +96,6 @@ func (s *Server) Run(ctx context.Context) {
}
}
func (s *Server) cleanupDb() {
close(s.checkpointNotice)
s.log.Info("Closing db connection pool...")
s.dbpool.Close()
// manually force truncate checkpoint
conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", s.config.DbPath), 0)
if err != nil {
s.log.WithError(err).Error("Failed to open connection for final checkpoint.")
return
}
err = sqlitex.Exec(conn, "vacuum", nil)
if err != nil {
s.log.WithError(err).Error("Failed to vacuum db.")
}
s.checkpointDb(conn, true)
conn.Close()
}
func (s *Server) getDbConn() *sqlite.Conn {
conn := s.dbpool.Get(s.ctx)
_, err := conn.Prep("PRAGMA foreign_keys = ON").Step()
if err != nil {
panic(err)
}
return conn
}
func (s *Server) checkpointDb(conn *sqlite.Conn, truncate bool) {
var query string
if truncate {
query = "PRAGMA wal_checkpoint(TRUNCATE)"
} else {
query = "PRAGMA wal_checkpoint(RESTART)"
}
stmt, _, err := conn.PrepareTransient(query)
if err != nil {
s.log.WithError(err).Error("Failed to prepare checkpoint query.")
return
}
defer stmt.Finalize()
has, err := stmt.Step()
if err != nil {
s.log.WithError(err).Error("Failed to step through checkpoint query.")
return
}
if !has {
s.log.Error("Checkpoint query returned no rows.")
return
}
blocked := stmt.ColumnInt(0)
noWalPages := stmt.ColumnInt(1)
noReclaimedPages := stmt.ColumnInt(2)
if blocked == 1 {
s.log.Warn("Checkpoint query was blocked.")
}
s.log.Debugf("Checkpoint complete. %d pages written to WAL, %d pages written back to DB.", noWalPages, noReclaimedPages)
}
func (s *Server) setupDB() {
sqlitex.PoolCloseTimeout = time.Second * 10
s.log.Debugf("Using db %s", s.config.DbPath)
dbpool, err := sqlitex.Open(fmt.Sprintf("file:%s", s.config.DbPath), 0, 10)
if err != nil {
s.log.WithError(err).Fatal("Failed to open/create DB.")
}
s.dbpool = dbpool
s.checkpointNotice = make(chan struct{})
err = s.initDB(s.config.ReinitDB)
if err != nil {
s.log.WithError(err).Fatal("init DB transaction failed")
}
// aggressively checkpoint the database on idle times
go func() {
s.log.Debug("Starting manual restart checkpointing.")
defer s.log.Debug("Manual restart checkpointing stopped.")
delay := time.Minute * 15
var (
timer <-chan time.Time
ok bool
)
for {
select {
case _, ok = <-s.checkpointNotice:
if !ok {
return
}
timer = time.After(delay)
case <-timer:
func() {
conn := s.dbpool.Get(s.ctx)
defer s.dbpool.Put(conn)
s.checkpointDb(conn, false)
timer = nil
}()
}
}
}()
}
func (s *Server) requestCheckpoint() {
go func() {
s.checkpointNotice <- struct{}{}
}()
}
func (s *Server) setupTiles() {
tsRootURL, err := url.Parse(URITileserverRoot)
if err != nil {
@@ -234,146 +124,6 @@ func (s *Server) setupTiles() {
s.mapPackSize = info.Size()
}
func (s *Server) initDB(reinit bool) error {
s.log.Info("Initializing DB.")
conn := s.getDbConn()
defer s.dbpool.Put(conn)
defer s.requestCheckpoint()
if reinit {
s.log.Warn("Reinitializing DB.")
tables := []string{}
err := sqlitex.Exec(conn, "select name from sqlite_master where type = 'table'", func(stmt *sqlite.Stmt) error {
tables = append(tables, stmt.ColumnText(0))
return nil
})
if err != nil {
return fmt.Errorf("failed to get table names: %w", err)
}
for _, table := range tables {
err = sqlitex.Exec(conn, "drop table "+table, nil)
if err != nil {
return fmt.Errorf("failed to drop tables: %w", err)
}
}
err = sqlitex.Exec(conn, "PRAGMA user_version = 0", nil)
if err != nil {
return fmt.Errorf("failed to reset user version: %w", err)
}
}
err := s.migrateDb(conn)
if err != nil {
return fmt.Errorf("failed to migrate db: %w", err)
}
return nil
}
func (s *Server) migrateDb(conn *sqlite.Conn) error {
var version int
err := sqlitex.Exec(conn, "PRAGMA user_version", func(stmt *sqlite.Stmt) error {
version = stmt.ColumnInt(0)
return nil
})
if err != nil {
return fmt.Errorf("failed to get user version: %w", err)
}
s.log.Debugf("Current db version: %d", version)
entries, err := sqlSchema.ReadDir("sql_schema")
if err != nil {
return fmt.Errorf("failed to read sql_schema migrations: %w", err)
}
type migration struct {
file string
version int
name string
}
pattern := regexp.MustCompile("^V([0-9]+)_(.*)[.][sS][qQ][lL]$")
migrations := []migration{}
for _, entry := range entries {
name := entry.Name()
if entry.IsDir() {
return fmt.Errorf("embedded sql migration '%s' is a directory", name)
}
matches := pattern.FindStringSubmatch(name)
if matches == nil {
return fmt.Errorf("embedded sql migration '%s' does not match the filename pattern", name)
}
if len(matches) != 3 {
return fmt.Errorf("embedded sql migration '%s' does not have the correct number of submatches", name)
}
version, err := strconv.Atoi(matches[1])
if err != nil {
return fmt.Errorf("failed to parse version number of migration '%s': %w", name, err)
}
migName := matches[2]
file := path.Join("sql_schema", name)
migrations = append(migrations, migration{
file: file,
version: version,
name: migName,
})
}
sort.Slice(migrations, func(i, j int) bool {
return migrations[i].version < migrations[j].version
})
for _, migration := range migrations {
if version >= migration.version {
s.log.Debugf("Skipping migration version %d because current version %d is not smaller.", migration.version, version)
continue
}
migContent, err := sqlSchema.ReadFile(migration.file)
if err != nil {
return fmt.Errorf("failed to read embedded migration '%s': %w", migration.file, err)
}
err = func() (err error) {
rollback := sqlitex.Save(conn)
defer func() {
if err != nil {
s.log.Info("Rolling back last migration attempt.")
}
rollback(&err)
}()
s.log.Infof("Executing migration V%d - %s", migration.version, migration.name)
err = sqlitex.ExecScript(conn, string(migContent))
if err != nil {
return fmt.Errorf("failed to execute migration '%s': %w", migration.name, err)
}
err = sqlitex.Exec(conn, fmt.Sprintf("PRAGMA user_version = %d", migration.version), nil)
if err != nil {
return fmt.Errorf("failed to set user_version in db: %w", err)
}
err = sqlitex.Exec(conn, "PRAGMA user_version", func(stmt *sqlite.Stmt) error {
version = stmt.ColumnInt(0)
return nil
})
if err != nil {
return fmt.Errorf("failed to get user_version: %w", err)
}
s.log.Infof("Migrated db to version: %d", version)
return nil
}()
if err != nil {
return err
}
}
return nil
}
func (s *Server) getLatestVersion(v *semver.Version) (*models.AppVersionInfo, error) {
conn := s.getDbConn()
defer s.dbpool.Put(conn)
@@ -398,20 +148,22 @@ func (s *Server) getAppVersions() ([]*models.AppVersionInfo, error) {
defer s.dbpool.Put(conn)
versions := []*models.AppVersionInfo{}
err := sqlitex.Exec(conn, "select version, address from app_versions", func(stmt *sqlite.Stmt) error {
verStr := stmt.ColumnText(0)
addr := stmt.ColumnText(1)
err := sqlitex.Execute(conn, "select version, address from app_versions", &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
verStr := stmt.ColumnText(0)
addr := stmt.ColumnText(1)
ver, err := semver.NewVersion(verStr)
if err != nil {
return fmt.Errorf("failed to parse version: %w", err)
}
ver, err := semver.NewVersion(verStr)
if err != nil {
return fmt.Errorf("failed to parse version: %w", err)
}
versions = append(versions, &models.AppVersionInfo{
Version: *ver,
Address: addr,
})
return nil
versions = append(versions, &models.AppVersionInfo{
Version: *ver,
Address: addr,
})
return nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to insert/retrieve user from db: %w", err)
@@ -424,7 +176,9 @@ func (s *Server) putAppVersion(versionInfo *models.AppVersionInfo) error {
conn := s.getDbConn()
defer s.dbpool.Put(conn)
err := sqlitex.Exec(conn, "insert into app_versions(version, address) values(?, ?) on conflict(version) do update set address = excluded.address", nil, versionInfo.Version.String(), versionInfo.Address)
err := sqlitex.Execute(conn, "insert into app_versions(version, address) values(?, ?) on conflict(version) do update set address = excluded.address", &sqlitex.ExecOptions{
Args: []interface{}{versionInfo.Version.String(), versionInfo.Address},
})
if err != nil {
return fmt.Errorf("failed to insert app version into db: %w", err)
}
@@ -437,19 +191,22 @@ func (s *Server) getAppVersion(version string) (*models.AppVersionInfo, error) {
defer s.dbpool.Put(conn)
var v *models.AppVersionInfo
err := sqlitex.Exec(conn, "select version, address from app_versions where version = ?", func(stmt *sqlite.Stmt) error {
verStr := stmt.ColumnText(0)
ver, err := semver.NewVersion(verStr)
if err != nil {
return fmt.Errorf("failed to parse version string %s from db: %w", verStr, err)
}
addr := stmt.ColumnText(1)
v = &models.AppVersionInfo{
Version: *ver,
Address: addr,
}
return nil
}, version)
err := sqlitex.Execute(conn, "select version, address from app_versions where version = ?", &sqlitex.ExecOptions{
Args: []interface{}{version},
ResultFunc: func(stmt *sqlite.Stmt) error {
verStr := stmt.ColumnText(0)
ver, err := semver.NewVersion(verStr)
if err != nil {
return fmt.Errorf("failed to parse version string %s from db: %w", verStr, err)
}
addr := stmt.ColumnText(1)
v = &models.AppVersionInfo{
Version: *ver,
Address: addr,
}
return nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to retrieve app version %s from db: %w", version, err)
}
@@ -461,7 +218,9 @@ func (s *Server) deleteAppVersion(version string) (bool, error) {
conn := s.getDbConn()
defer s.dbpool.Put(conn)
err := sqlitex.Exec(conn, "delete from app_versions where version = ?", nil, version)
err := sqlitex.Execute(conn, "delete from app_versions where version = ?", &sqlitex.ExecOptions{
Args: []interface{}{version},
})
if err != nil {
return false, fmt.Errorf("failed to delete app version %s from db: %w", version, err)
}
@@ -479,11 +238,14 @@ func (s *Server) handshake(hc models.HandshakeChallenge) (models.UserID, error)
var id *int64
if hc.Exists {
err = sqlitex.Exec(conn, "select id from users where name = ?", func(stmt *sqlite.Stmt) error {
id = ptr(stmt.ColumnInt64(0))
return nil
}, hc.Name)
if sqlite.ErrCode(err) != sqlite.SQLITE_OK {
err = sqlitex.Execute(conn, "select id from users where name = ?", &sqlitex.ExecOptions{
Args: []interface{}{hc.Name},
ResultFunc: func(stmt *sqlite.Stmt) error {
id = ptr(stmt.ColumnInt64(0))
return nil
},
})
if sqlite.ErrCode(err) != sqlite.ResultOK {
return 0, err
}
if id == nil {
@@ -493,14 +255,17 @@ func (s *Server) handshake(hc models.HandshakeChallenge) (models.UserID, error)
return 0, errs.ErrAttemptedSystemUser
}
} else {
err = sqlitex.Exec(conn, "insert into users(name) values(?)", func(stmt *sqlite.Stmt) error {
id = ptr(stmt.ColumnInt64(0))
return nil
}, hc.Name)
if sqlite.ErrCode(err) == sqlite.SQLITE_CONSTRAINT_UNIQUE {
err = sqlitex.Execute(conn, "insert into users(name) values(?)", &sqlitex.ExecOptions{
Args: []interface{}{hc.Name},
ResultFunc: func(stmt *sqlite.Stmt) error {
id = ptr(stmt.ColumnInt64(0))
return nil
},
})
if sqlite.ErrCode(err) == sqlite.ResultConstraintUnique {
return 0, errs.ErrUserAlreadyExists
}
if sqlite.ErrCode(err) != sqlite.SQLITE_OK {
if sqlite.ErrCode(err) != sqlite.ResultOK {
return 0, err
}
id = ptr(conn.LastInsertRowID())
@@ -566,17 +331,19 @@ func (s *Server) getDataWithPhotos() (file *os.File, err error) {
}
data.PhotoMetadata = make(map[string]models.PhotoMetadata, 100)
err = sqlitex.Exec(conn, "select id, content_type, length(contents) from feature_photos", func(stmt *sqlite.Stmt) error {
id := models.FeaturePhotoID(stmt.ColumnInt64(0))
contentType := stmt.ColumnText(1)
fileSize := stmt.ColumnInt64(2)
data.PhotoMetadata[makePhotoFilename(id)] = models.PhotoMetadata{
ContentType: contentType,
Size: fileSize,
ID: id,
ThumbnailFilename: makeThumbnailFilename(id),
}
return nil
err = sqlitex.Execute(conn, "select id, content_type, length(contents) from feature_photos", &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
id := models.FeaturePhotoID(stmt.ColumnInt64(0))
contentType := stmt.ColumnText(1)
fileSize := stmt.ColumnInt64(2)
data.PhotoMetadata[makePhotoFilename(id)] = models.PhotoMetadata{
ContentType: contentType,
Size: fileSize,
ID: id,
ThumbnailFilename: makeThumbnailFilename(id),
}
return nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to collect photo metadata: %w", err)
@@ -603,50 +370,52 @@ func (s *Server) getDataWithPhotos() (file *os.File, err error) {
return nil, fmt.Errorf("failed to write data zip entry: %w", err)
}
err = sqlitex.Exec(conn, "select id from feature_photos fp where exists (select 1 from features f where f.id = fp.feature_id)", func(stmt *sqlite.Stmt) error {
id := stmt.ColumnInt64(0)
err = sqlitex.Execute(conn, "select id from feature_photos fp where exists (select 1 from features f where f.id = fp.feature_id)", &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
id := stmt.ColumnInt64(0)
blob, err := conn.OpenBlob("", "feature_photos", "thumbnail_contents", id, false)
if err != nil {
return fmt.Errorf("failed to open photo ID %d thumbnail content blob: %w", id, err)
}
err = func() error {
defer blob.Close()
w, err := zw.Create(makeThumbnailFilename(models.FeaturePhotoID(id)))
blob, err := conn.OpenBlob("", "feature_photos", "thumbnail_contents", id, false)
if err != nil {
return fmt.Errorf("failed to create zip entry: %w", err)
return fmt.Errorf("failed to open photo ID %d thumbnail content blob: %w", id, err)
}
_, err = io.Copy(w, blob)
err = func() error {
defer blob.Close()
w, err := zw.Create(makeThumbnailFilename(models.FeaturePhotoID(id)))
if err != nil {
return fmt.Errorf("failed to create zip entry: %w", err)
}
_, err = io.Copy(w, blob)
if err != nil {
return fmt.Errorf("failed to write zip entry: %w", err)
}
return nil
}()
if err != nil {
return fmt.Errorf("failed to write zip entry: %w", err)
return fmt.Errorf("failed to write photo ID %d thumbnail: %w", id, err)
}
blob, err = conn.OpenBlob("", "feature_photos", "contents", id, false)
if err != nil {
return fmt.Errorf("failed to open photo ID %d photo content blob: %w", id, err)
}
err = func() error {
defer blob.Close()
w, err := zw.Create(makePhotoFilename(models.FeaturePhotoID(id)))
if err != nil {
return fmt.Errorf("failed to create zip entry: %w", err)
}
_, err = io.Copy(w, blob)
if err != nil {
return fmt.Errorf("failed to write zip entry: %w", err)
}
return nil
}()
if err != nil {
return fmt.Errorf("failed to write photo ID %d photo: %w", id, err)
}
return nil
}()
if err != nil {
return fmt.Errorf("failed to write photo ID %d thumbnail: %w", id, err)
}
blob, err = conn.OpenBlob("", "feature_photos", "contents", id, false)
if err != nil {
return fmt.Errorf("failed to open photo ID %d photo content blob: %w", id, err)
}
err = func() error {
defer blob.Close()
w, err := zw.Create(makePhotoFilename(models.FeaturePhotoID(id)))
if err != nil {
return fmt.Errorf("failed to create zip entry: %w", err)
}
_, err = io.Copy(w, blob)
if err != nil {
return fmt.Errorf("failed to write zip entry: %w", err)
}
return nil
}()
if err != nil {
return fmt.Errorf("failed to write photo ID %d photo: %w", id, err)
}
return nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to collect photo files: %w", err)
@@ -661,7 +430,7 @@ func (s *Server) update(data models.Update, photos map[string]models.Photo) erro
defer s.dbpool.Put(conn)
return func() (err error) {
defer sqlitex.Save(conn)(&err)
defer sqlitex.Transaction(conn)(&err)
var createdIDMapping map[models.FeatureID]models.FeatureID
if data.Create != nil {
@@ -716,14 +485,10 @@ func (s *Server) update(data models.Update, photos map[string]models.Photo) erro
}
func (s *Server) deleteExpiredFeatures(conn *sqlite.Conn) error {
stmt, err := conn.Prepare("delete from features where deadline < ?")
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer stmt.Finalize()
now := time.Now().Unix()
err = sqlitex.Exec(conn, "delete from features where deadline < ?", func(stmt *sqlite.Stmt) error { return nil }, now)
err := sqlitex.Execute(conn, "delete from features where deadline < ?", &sqlitex.ExecOptions{
Args: []interface{}{now},
})
if err != nil {
return fmt.Errorf("failed to delete expired features: %w", err)
}
@@ -734,16 +499,18 @@ func (s *Server) deleteExpiredFeatures(conn *sqlite.Conn) error {
func (s *Server) getPeople(conn *sqlite.Conn) ([]models.User, error) {
if conn == nil {
conn = s.getDbConn()
defer s.dbpool.Put(conn)
defer s.returnDbConn(conn)
}
users := make([]models.User, 0, 50)
err := sqlitex.Exec(conn, "select id, name from users", func(stmt *sqlite.Stmt) error {
users = append(users, models.User{
ID: models.UserID(stmt.ColumnInt(0)),
Name: stmt.ColumnText(1),
})
return nil
err := sqlitex.Execute(conn, "select id, name from users", &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
users = append(users, models.User{
ID: models.UserID(stmt.ColumnInt(0)),
Name: stmt.ColumnText(1),
})
return nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to get users from db: %w", err)
@@ -758,56 +525,58 @@ func (s *Server) getFeatures(conn *sqlite.Conn) ([]models.Feature, error) {
}
features := make([]models.Feature, 0, 100)
err := sqlitex.Exec(conn, `select f.id, f.owner_id, f.name, f.deadline, f.properties, f.geom, '[' || coalesce(group_concat(p.id, ', '), '') || ']'
err := sqlitex.Execute(conn, `select f.id, f.owner_id, f.name, f.deadline, f.properties, f.geom, '[' || coalesce(group_concat(p.id, ', '), '') || ']'
from features f
left join feature_photos p on f.id = p.feature_id
group by f.id, f.owner_id, f.name, f.properties, f.geom`, func(stmt *sqlite.Stmt) error {
group by f.id, f.owner_id, f.name, f.properties, f.geom`, &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
id := stmt.ColumnInt64(0)
id := stmt.ColumnInt64(0)
ownerID := stmt.ColumnInt64(1)
ownerID := stmt.ColumnInt64(1)
name := stmt.ColumnText(2)
name := stmt.ColumnText(2)
var deadline *time.Time
if stmt.ColumnType(3) != sqlite.SQLITE_NULL {
dl := time.Unix(stmt.ColumnInt64(3), 0)
deadline = &dl
}
var deadline *time.Time
if stmt.ColumnType(3) != sqlite.TypeNull {
dl := time.Unix(stmt.ColumnInt64(3), 0)
deadline = &dl
}
propertiesRaw := stmt.ColumnText(4)
var properties map[string]interface{}
err := json.Unmarshal([]byte(propertiesRaw), &properties)
if err != nil {
return fmt.Errorf("failed to parse properties for feature id=%d: %w", id, err)
}
propertiesRaw := stmt.ColumnText(4)
var properties map[string]interface{}
err := json.Unmarshal([]byte(propertiesRaw), &properties)
if err != nil {
return fmt.Errorf("failed to parse properties for feature id=%d: %w", id, err)
}
geomRaw := stmt.ColumnText(5)
var geom geojson.Geometry
err = json.Unmarshal([]byte(geomRaw), &geom)
if err != nil {
return fmt.Errorf("failed to parse geometry for feature id=%d: %w", id, err)
}
geomRaw := stmt.ColumnText(5)
var geom geojson.Geometry
err = json.Unmarshal([]byte(geomRaw), &geom)
if err != nil {
return fmt.Errorf("failed to parse geometry for feature id=%d: %w", id, err)
}
photosRaw := stmt.ColumnText(6)
var photos []models.FeaturePhotoID
err = json.Unmarshal([]byte(photosRaw), &photos)
if err != nil {
return fmt.Errorf("failed to parse list of photo IDs: %w", err)
}
photosRaw := stmt.ColumnText(6)
var photos []models.FeaturePhotoID
err = json.Unmarshal([]byte(photosRaw), &photos)
if err != nil {
return fmt.Errorf("failed to parse list of photo IDs: %w", err)
}
feature := models.Feature{
ID: models.FeatureID(id),
OwnerID: models.UserID(ownerID),
Name: name,
Deadline: deadline,
Properties: properties,
Geometry: geom,
PhotoIDs: photos,
}
feature := models.Feature{
ID: models.FeatureID(id),
OwnerID: models.UserID(ownerID),
Name: name,
Deadline: deadline,
Properties: properties,
Geometry: geom,
PhotoIDs: photos,
}
features = append(features, feature)
return nil
features = append(features, feature)
return nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to get users from db: %w", err)
@@ -1101,16 +870,19 @@ func (s *Server) getPhoto(featureID models.FeatureID, photoID models.FeaturePhot
var contentType *string = nil
var data []byte = nil
found := false
err := sqlitex.Exec(conn, "select content_type, contents from feature_photos where id = ? and feature_id = ?", func(stmt *sqlite.Stmt) error {
if found {
return fmt.Errorf("multiple photos returned for feature id %d, photo id %d", featureID, photoID)
}
contentType = ptr(stmt.ColumnText(0))
data = make([]byte, stmt.ColumnLen(1))
stmt.ColumnBytes(1, data)
found = true
return nil
}, photoID, featureID)
err := sqlitex.Execute(conn, "select content_type, contents from feature_photos where id = ? and feature_id = ?", &sqlitex.ExecOptions{
Args: []interface{}{photoID, featureID},
ResultFunc: func(stmt *sqlite.Stmt) error {
if found {
return fmt.Errorf("multiple photos returned for feature id %d, photo id %d", featureID, photoID)
}
contentType = ptr(stmt.ColumnText(0))
data = make([]byte, stmt.ColumnLen(1))
stmt.ColumnBytes(1, data)
found = true
return nil
},
})
if err != nil {
return nil, "", fmt.Errorf("photo db query failed: %w", err)
}
@@ -1159,16 +931,57 @@ func (s *Server) getProposals(conn *sqlite.Conn) ([]models.Proposal, error) {
}
proposals := make([]models.Proposal, 0, 100)
err := sqlitex.Exec(conn, "select owner_id, description, how from proposals", func(stmt *sqlite.Stmt) error {
proposals = append(proposals, models.Proposal{
OwnerID: models.UserID(stmt.ColumnInt(0)),
Description: stmt.ColumnText(1),
How: stmt.ColumnText(2),
})
return nil
err := sqlitex.Execute(conn, "select owner_id, description, how from proposals", &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
proposals = append(proposals, models.Proposal{
OwnerID: models.UserID(stmt.ColumnInt(0)),
Description: stmt.ColumnText(1),
How: stmt.ColumnText(2),
})
return nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to get proposals from db: %w", err)
}
return proposals, nil
}
func (s *Server) getUsageInfo(conn *sqlite.Conn) ([]models.UserInfo, error) {
if conn == nil {
conn = s.getDbConn()
defer s.returnDbConn(conn)
}
users := make([]models.UserInfo, 0, 50)
err := sqlitex.Execute(conn, "select id, name, app_version, last_seen_time, last_upload_time, last_download_time from users", &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
ver, _ := semver.NewVersion(stmt.ColumnText(2))
var lstp, lutp, ldtp *time.Time
if stmt.ColumnType(3) != sqlite.TypeNull {
lstp = ptr(time.Unix(stmt.ColumnInt64(3), 0))
}
if stmt.ColumnType(4) != sqlite.TypeNull {
lutp = ptr(time.Unix(stmt.ColumnInt64(3), 0))
}
if stmt.ColumnType(5) != sqlite.TypeNull {
ldtp = ptr(time.Unix(stmt.ColumnInt64(3), 0))
}
users = append(users, models.UserInfo{
User: models.User{
ID: models.UserID(stmt.ColumnInt(0)),
Name: stmt.ColumnText(1),
},
AppVersion: ver,
LastSeenTime: lstp,
LastUploadTime: lutp,
LastDownloadTime: ldtp,
})
return nil
},
})
if err != nil {
return nil, fmt.Errorf("failed to get users from db: %w", err)
}
return users, nil
}
+1 -1
View File
@@ -8,7 +8,7 @@ CREATE TABLE features (
id integer PRIMARY KEY AUTOINCREMENT,
owner_id integer NOT NULL REFERENCES users(id) ON DELETE CASCADE,
name text NOT NULL,
deadline text,
deadline integer,
properties text NOT NULL,
geom text NOT NULL
);
+4
View File
@@ -0,0 +1,4 @@
ALTER TABLE users ADD COLUMN app_version text;
ALTER TABLE users ADD COLUMN last_seen_time integer;
ALTER TABLE users ADD COLUMN last_upload_time integer;
ALTER TABLE users ADD COLUMN last_download_time integer;
+8
View File
@@ -17,6 +17,14 @@ func ptr[T any](x T) *T {
return &x
}
func Map[T any, U any](f func(T) U, x []T) []U {
res := make([]U, len(x))
for i, e := range x {
res[i] = f(e)
}
return res
}
var contentTypes map[string]struct{} = map[string]struct{}{"image/jpeg": {}, "image/png": {}}
func checkImageContentType(contentType string) bool {