Compare commits

...

2 Commits

Author SHA1 Message Date
Frédéric Guillot 0d0395b4e3 Do not try to update a duplicated feed after a refresh 4 years ago
Frédéric Guillot e6c6ee441a Use a transaction to refresh and create entries 4 years ago
  1. 2
      database/migration.go
  2. 2
      database/sql.go
  3. 1
      database/sql/schema_version_36.sql
  4. 9
      reader/feed/handler.go
  5. 27
      storage/enclosure.go
  6. 56
      storage/entry.go
  7. 23
      storage/feed.go

@ -12,7 +12,7 @@ import (
"miniflux.app/logger"
)
const schemaVersion = 35
const schemaVersion = 36
// Migrate executes database migrations.
func Migrate(db *sql.DB) {

@ -189,6 +189,7 @@ create index entries_user_feed_idx on entries (user_id, feed_id);
"schema_version_34": `CREATE INDEX entries_id_user_status_idx ON entries USING btree (id, user_id, status);`,
"schema_version_35": `alter table feeds add column fetch_via_proxy bool default false;
`,
"schema_version_36": `CREATE INDEX entries_feed_id_status_hash_idx ON entries USING btree (feed_id, status, hash);`,
"schema_version_4": `create type entry_sorting_direction as enum('asc', 'desc');
alter table users add column entry_direction entry_sorting_direction default 'asc';
`,
@ -247,6 +248,7 @@ var SqlMapChecksums = map[string]string{
"schema_version_33": "bf38514efeb6c12511f41b1cc484f92722240b0a6ae874c32a958dfea3433d02",
"schema_version_34": "1a3e036f652fc98b7564a27013f04e1eb36dd0d68893c723168f134dc1065822",
"schema_version_35": "162a55df78eed4b9c9c141878132d5f1d97944b96f35a79e38f55716cdd6b3d2",
"schema_version_36": "8164be7818268ad3d4bdcad03a7868b58e32b27cde9b4f056cd82f7b182a0722",
"schema_version_4": "216ea3a7d3e1704e40c797b5dc47456517c27dbb6ca98bf88812f4f63d74b5d9",
"schema_version_5": "46397e2f5f2c82116786127e9f6a403e975b14d2ca7b652a48cd1ba843e6a27c",
"schema_version_6": "9d05b4fb223f0e60efc716add5048b0ca9c37511cf2041721e20505d6d798ce4",

@ -0,0 +1 @@
CREATE INDEX entries_feed_id_status_hash_idx ON entries USING btree (feed_id, status, hash);

@ -127,6 +127,13 @@ func (h *Handler) RefreshFeed(userID, feedID int64) error {
return requestErr
}
if h.store.AnotherFeedURLExists(userID, originalFeed.ID, response.EffectiveURL) {
storeErr := errors.NewLocalizedError(errDuplicate, response.EffectiveURL)
originalFeed.WithError(storeErr.Error())
h.store.UpdateFeedError(originalFeed)
return storeErr
}
if originalFeed.IgnoreHTTPCache || response.IsModified(originalFeed.EtagHeader, originalFeed.LastModifiedHeader) {
logger.Debug("[Handler:RefreshFeed] Feed #%d has been modified", feedID)
@ -141,7 +148,7 @@ func (h *Handler) RefreshFeed(userID, feedID int64) error {
processor.ProcessFeedEntries(h.store, originalFeed)
// We don't update existing entries when the crawler is enabled (we crawl only inexisting entries).
if storeErr := h.store.UpdateEntries(originalFeed.UserID, originalFeed.ID, originalFeed.Entries, !originalFeed.Crawler); storeErr != nil {
if storeErr := h.store.RefreshFeedEntries(originalFeed.UserID, originalFeed.ID, originalFeed.Entries, !originalFeed.Crawler); storeErr != nil {
originalFeed.WithError(storeErr.Error())
h.store.UpdateFeedError(originalFeed)
return storeErr

@ -5,6 +5,7 @@
package storage // import "miniflux.app/storage"
import (
"database/sql"
"fmt"
"miniflux.app/model"
@ -55,8 +56,7 @@ func (s *Storage) GetEnclosures(entryID int64) (model.EnclosureList, error) {
return enclosures, nil
}
// CreateEnclosure creates a new attachment.
func (s *Storage) CreateEnclosure(enclosure *model.Enclosure) error {
func (s *Storage) createEnclosure(tx *sql.Tx, enclosure *model.Enclosure) error {
if enclosure.URL == "" {
return nil
}
@ -69,7 +69,7 @@ func (s *Storage) CreateEnclosure(enclosure *model.Enclosure) error {
RETURNING
id
`
err := s.db.QueryRow(
err := tx.QueryRow(
query,
enclosure.URL,
enclosure.Size,
@ -85,22 +85,15 @@ func (s *Storage) CreateEnclosure(enclosure *model.Enclosure) error {
return nil
}
// IsEnclosureExists checks if an attachment exists.
func (s *Storage) IsEnclosureExists(enclosure *model.Enclosure) bool {
var result int
query := `SELECT 1 FROM enclosures WHERE user_id=$1 AND entry_id=$2 AND url=$3`
s.db.QueryRow(query, enclosure.UserID, enclosure.EntryID, enclosure.URL).Scan(&result)
return result >= 1
}
func (s *Storage) updateEnclosures(tx *sql.Tx, userID, entryID int64, enclosures model.EnclosureList) error {
// We delete all attachments in the transaction to keep only the ones visible in the feeds.
if _, err := tx.Exec(`DELETE FROM enclosures WHERE user_id=$1 AND entry_id=$2`, userID, entryID); err != nil {
return err
}
// UpdateEnclosures add missing attachments while updating a feed.
func (s *Storage) UpdateEnclosures(enclosures model.EnclosureList) error {
for _, enclosure := range enclosures {
if !s.IsEnclosureExists(enclosure) {
err := s.CreateEnclosure(enclosure)
if err != nil {
return err
}
if err := s.createEnclosure(tx, enclosure); err != nil {
return err
}
}

@ -5,6 +5,7 @@
package storage // import "miniflux.app/storage"
import (
"database/sql"
"errors"
"fmt"
"time"
@ -74,7 +75,7 @@ func (s *Storage) UpdateEntryContent(entry *model.Entry) error {
}
// createEntry add a new entry.
func (s *Storage) createEntry(entry *model.Entry) error {
func (s *Storage) createEntry(tx *sql.Tx, entry *model.Entry) error {
query := `
INSERT INTO entries
(title, hash, url, comments_url, published_at, content, author, user_id, feed_id, changed_at, document_vectors)
@ -83,7 +84,7 @@ func (s *Storage) createEntry(entry *model.Entry) error {
RETURNING
id, status
`
err := s.db.QueryRow(
err := tx.QueryRow(
query,
entry.Title,
entry.Hash,
@ -103,7 +104,7 @@ func (s *Storage) createEntry(entry *model.Entry) error {
for i := 0; i < len(entry.Enclosures); i++ {
entry.Enclosures[i].EntryID = entry.ID
entry.Enclosures[i].UserID = entry.UserID
err := s.CreateEnclosure(entry.Enclosures[i])
err := s.createEnclosure(tx, entry.Enclosures[i])
if err != nil {
return err
}
@ -115,7 +116,7 @@ func (s *Storage) createEntry(entry *model.Entry) error {
// updateEntry updates an entry when a feed is refreshed.
// Note: we do not update the published date because some feeds do not contains any date,
// it default to time.Now() which could change the order of items on the history page.
func (s *Storage) updateEntry(entry *model.Entry) error {
func (s *Storage) updateEntry(tx *sql.Tx, entry *model.Entry) error {
query := `
UPDATE
entries
@ -131,7 +132,7 @@ func (s *Storage) updateEntry(entry *model.Entry) error {
RETURNING
id
`
err := s.db.QueryRow(
err := tx.QueryRow(
query,
entry.Title,
entry.URL,
@ -152,15 +153,19 @@ func (s *Storage) updateEntry(entry *model.Entry) error {
enclosure.EntryID = entry.ID
}
return s.UpdateEnclosures(entry.Enclosures)
return s.updateEnclosures(tx, entry.UserID, entry.ID, entry.Enclosures)
}
// entryExists checks if an entry already exists based on its hash when refreshing a feed.
func (s *Storage) entryExists(entry *model.Entry) bool {
var result int
query := `SELECT 1 FROM entries WHERE user_id=$1 AND feed_id=$2 AND hash=$3`
s.db.QueryRow(query, entry.UserID, entry.FeedID, entry.Hash).Scan(&result)
return result == 1
func (s *Storage) entryExists(tx *sql.Tx, entry *model.Entry) bool {
var result bool
tx.QueryRow(
`SELECT true FROM entries WHERE user_id=$1 AND feed_id=$2 AND hash=$3`,
entry.UserID,
entry.FeedID,
entry.Hash,
).Scan(&result)
return result
}
// cleanupEntries deletes from the database entries marked as "removed" and not visible anymore in the feed.
@ -180,31 +185,44 @@ func (s *Storage) cleanupEntries(feedID int64, entryHashes []string) error {
return nil
}
// UpdateEntries updates a list of entries while refreshing a feed.
func (s *Storage) UpdateEntries(userID, feedID int64, entries model.Entries, updateExistingEntries bool) (err error) {
// RefreshFeedEntries updates feed entries while refreshing a feed.
func (s *Storage) RefreshFeedEntries(userID, feedID int64, entries model.Entries, updateExistingEntries bool) (err error) {
var entryHashes []string
for _, entry := range entries {
entry.UserID = userID
entry.FeedID = feedID
if s.entryExists(entry) {
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf(`store: unable to start transaction: %v`, err)
}
if s.entryExists(tx, entry) {
if updateExistingEntries {
err = s.updateEntry(entry)
err = s.updateEntry(tx, entry)
}
} else {
err = s.createEntry(entry)
err = s.createEntry(tx, entry)
}
if err != nil {
tx.Rollback()
return err
}
if err := tx.Commit(); err != nil {
return fmt.Errorf(`store: unable to commit transaction: %v`, err)
}
entryHashes = append(entryHashes, entry.Hash)
}
if err := s.cleanupEntries(feedID, entryHashes); err != nil {
logger.Error(`store: feed #%d: %v`, feedID, err)
}
go func() {
if err := s.cleanupEntries(feedID, entryHashes); err != nil {
logger.Error(`store: feed #%d: %v`, feedID, err)
}
}()
return nil
}

@ -68,6 +68,14 @@ func (s *Storage) FeedURLExists(userID int64, feedURL string) bool {
return result
}
// AnotherFeedURLExists checks if the user a duplicated feed.
func (s *Storage) AnotherFeedURLExists(userID, feedID int64, feedURL string) bool {
var result bool
query := `SELECT true FROM feeds WHERE id <> $1 AND user_id=$2 AND feed_url=$3`
s.db.QueryRow(query, feedID, userID, feedURL).Scan(&result)
return result
}
// CountFeeds returns the number of feeds that belongs to the given user.
func (s *Storage) CountFeeds(userID int64) int {
var result int
@ -435,12 +443,21 @@ func (s *Storage) CreateFeed(feed *model.Feed) error {
feed.Entries[i].FeedID = feed.ID
feed.Entries[i].UserID = feed.UserID
if !s.entryExists(feed.Entries[i]) {
err := s.createEntry(feed.Entries[i])
if err != nil {
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf(`store: unable to start transaction: %v`, err)
}
if !s.entryExists(tx, feed.Entries[i]) {
if err := s.createEntry(tx, feed.Entries[i]); err != nil {
tx.Rollback()
return err
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf(`store: unable to commit transaction: %v`, err)
}
}
return nil

Loading…
Cancel
Save