Skip to content
Snippets Groups Projects
Commit 014a2c08 authored by ale's avatar ale
Browse files

first feedsaver implementation

parents
Branches
No related tags found
No related merge requests found
atom.go 0 → 100644
package feedsaver
import (
"encoding/xml"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
)
type Feed struct {
XMLName xml.Name `xml:"http://www.w3.org/2005/Atom feed"`
Title string `xml:"title"`
Entry []*Entry `xml:"entry"`
}
type Entry struct {
Title string `xml:"title"`
Links []Link `xml:"link"`
}
type Link struct {
Rel string `xml:"rel,attr"`
Href string `xml:"href,attr"`
}
func (f *Feed) Dump() {
fmt.Printf("[*] %s\n", f.Title)
for _, t := range f.Entry {
fmt.Printf("[-] %s\n\t%s\n", t.Links[0].Href, t.Title)
}
}
func parse_feed(body []byte) (*Feed, error) {
var feed Feed
err := xml.Unmarshal(body, &feed)
if err != nil {
return nil, err
}
return &feed, nil
}
func get_feed(url string) (*Feed, error) {
log.Printf("downloading Atom feed from %s", url)
r, err := http.Get(url)
defer r.Body.Close()
if err != nil {
return nil, err
}
if r.StatusCode != http.StatusOK {
return nil, errors.New(
fmt.Sprintf("http status %d", r.StatusCode))
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
}
return parse_feed(body)
}
// Copyright 2012 Autistici/Inventati (info@autistici.org).
// A fake pubsubhubbub server that downloads HTML pages.
//
// The intended usage for this service involves an anti-censorship context.
// If you connect your blog to a feedsaver instance via pubsub, it will
// happily download an HTML page for each blog post you make -- this content
// can be easily made available anonymously, for instance running a simple
// HTTP server on a Tor Hidden Service.
package feedsaver
import (
"errors"
"expvar"
"flag"
"fmt"
"log"
"net/http"
"sync"
)
var (
num_parallel_fetches = flag.Int("parallel_fetches", 10, "Download concurrency")
publish_counter = expvar.NewInt("publish-requests")
download_counter = expvar.NewInt("total-downloads")
failures_counter = expvar.NewInt("failed-downloads")
)
type seen_map struct {
sync.Mutex
entries map[string]bool
}
type Feedsaver struct {
new_feeds chan string
new_urls chan string
quit chan bool
seen *seen_map
storage *Storage
}
// Create a new Feedsaver instance.
func NewFeedsaver(storage *Storage) *Feedsaver {
seen := &seen_map{entries: make(map[string]bool)}
f := Feedsaver{new_feeds: make(chan string, 100*(*num_parallel_fetches)),
new_urls: make(chan string, *num_parallel_fetches),
storage: storage,
seen: seen,
quit: make(chan bool)}
return &f
}
func (f *Feedsaver) scan_feed(url string) error {
// Download and parse the Atom feed.
feed, err := get_feed(url)
if err != nil {
log.Printf("error downloading feed %s (%s)",
url, err.Error())
failures_counter.Add(1)
return err
}
// Create a list of new entries (urls we haven't seen before).
f.seen.Lock()
new_links := []string{}
for _, t := range feed.Entry {
if len(t.Links) > 0 {
link_url := t.Links[0].Href
if !f.seen.entries[link_url] {
new_links = append(new_links, link_url)
}
}
}
f.seen.Unlock()
// Now submit them to the queue (this operation can block).
for _, link_url := range new_links {
f.new_urls <- link_url
}
return nil
}
func (f *Feedsaver) download(url string) error {
log.Printf("downloading %s", url)
// If we already have the file, don't download it again.
if f.storage.Check(url) {
log.Printf("%s already saved", url)
// On the other hand, if we got here it means that our
// 'seen' cache does not contain the url... so let's
// fall through instead of returning.
} else {
// Download the URL. If there are fatal errors at this
// stage, we won't mark the url as 'seen'. This gives
// us a chance to retry later (when the next post is
// submitted).
r, err := http.Get(url)
defer r.Body.Close()
if err != nil {
log.Printf("error downloading %s (%s)", url, err.Error())
failures_counter.Add(1)
return err
}
if r.StatusCode != http.StatusOK {
log.Printf("error downloading %s (http status %d)",
url, r.StatusCode)
failures_counter.Add(1)
return errors.New(
fmt.Sprintf("http status %d", r.StatusCode))
}
// Save the file to storage.
err = f.storage.Store(url, r.Body)
if err != nil {
log.Printf("error saving %s (%s)", url, err.Error())
failures_counter.Add(1)
return err
}
log.Printf("stored %s", url)
download_counter.Add(1)
}
// Register the url in the 'seen' database.
f.seen.Lock()
f.seen.entries[url] = true
f.seen.Unlock()
return nil
}
func (f *Feedsaver) run_feed_scanner(quit chan bool) {
for {
var url string
select {
case url = <-f.new_feeds:
go f.scan_feed(url)
case <-quit:
return
}
}
}
func (f *Feedsaver) run_downloader(quit chan bool) {
for {
var url string
select {
case url = <-f.new_urls:
go f.download(url)
case <-quit:
return
}
}
}
func (f *Feedsaver) publish_endpoint(w http.ResponseWriter, req *http.Request) {
log.Printf("got publish request from %s", req.RemoteAddr)
feed_url := req.FormValue("hub.url")
if feed_url == "" || req.FormValue("hub.mode") != "publish" {
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
log.Printf("received new publish notification for %s", feed_url)
publish_counter.Add(1)
f.new_feeds <- feed_url
http.Error(w, "", 204)
}
func (f *Feedsaver) start_server(addr string) error {
http.HandleFunc(
"/publish",
func(w http.ResponseWriter, req *http.Request) {
f.publish_endpoint(w, req)
})
http.HandleFunc(
"/subscribe",
func(w http.ResponseWriter, req *http.Request) {
http.Error(w, "This hub does not accept subscriptions.",
http.StatusForbidden)
})
http.HandleFunc(
"/quit",
func(w http.ResponseWriter, req *http.Request) {
f.quit <- true
})
return http.ListenAndServe(addr, nil)
}
// Run the Feedsaver instance. Starts the HTTP server and does not return.
func (f *Feedsaver) Run(addr string) error {
feed_scanner_quit := make(chan bool)
downloader_quit := make(chan bool)
go f.run_feed_scanner(feed_scanner_quit)
go f.run_downloader(downloader_quit)
log.Printf("starting server on %s", addr)
go f.start_server(addr)
select {
case <-f.quit:
feed_scanner_quit <- true
downloader_quit <- true
}
log.Println("exiting...")
return nil
}
package feedsaver
import (
"fmt"
"io"
"os"
"path"
"strings"
)
type Storage struct {
Root string
}
func NewStorage(root string) *Storage {
s := Storage{Root: root}
return &s
}
func (s *Storage) Store(url string, r io.Reader) error {
dst_path := s.path_for_url(url)
// Create the parent directory if needed.
err := os.MkdirAll(path.Dir(dst_path), 0755)
if err != nil {
return err
}
// Open the file for writing and dump the request in it.
f, err := os.Create(dst_path)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(f, r)
return err
}
func (s *Storage) Check(url string) bool {
if _, err := os.Stat(s.path_for_url(url)); err != nil {
return false
}
return true
}
func (s *Storage) path_for_url(url string) string {
if url[:7] == "http://" {
url = url[7:]
} else if url[:8] == "https://" {
url = url[8:]
}
url = strings.TrimRight(url, "/")
if len(url) > 5 && url[len(url)-5:len(url)] != ".html" {
url = fmt.Sprintf("%s.html", url)
}
return fmt.Sprintf("%s/%s", s.Root, url)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment