package liber import ( "bytes" "encoding/json" "fmt" "io" "log" "mime/multipart" "net/http" "os" "path/filepath" "sync" "time" ) const ( diffChunkSize = 1000 uploadConcurrency = 3 ) type SyncClient interface { DiffRequest(*diffRequest) (*diffResponse, error) SendBook(*Book, []*File) error } type remoteServer struct { client *http.Client remoteURL string } func NewRemoteServer(remoteURL string) *remoteServer { return &remoteServer{ remoteURL: remoteURL, client: &http.Client{}, } } // DiffRequest sends a diffRequest to the remote server. func (r *remoteServer) DiffRequest(diffreq *diffRequest) (*diffResponse, error) { var body bytes.Buffer if err := json.NewEncoder(&body).Encode(diffreq); err != nil { return nil, err } req, err := http.NewRequest("POST", r.remoteURL+"/api/sync/diff", &body) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") resp, err := r.client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != 200 { return nil, fmt.Errorf("HTTP error: %s", resp.Status) } var diffresp diffResponse if err := json.NewDecoder(resp.Body).Decode(&diffresp); err != nil { return nil, err } return &diffresp, nil } func addFilePart(w *multipart.Writer, varname, filename, mimeFilename string) error { file, err := os.Open(filename) if err != nil { return err } defer file.Close() part, err := w.CreateFormFile(varname, mimeFilename) if err != nil { return err } if _, err := io.Copy(part, file); err != nil { return err } return nil } // SendBook uploads a book to the remote server. func (r *remoteServer) SendBook(book *Book, files []*File) error { // Create a multipart request with the JSON-encoded metadata // and the actual file contents as two separate mime/multipart // sections. var body bytes.Buffer w := multipart.NewWriter(&body) part, err := w.CreateFormFile("meta", "meta.json") if err := json.NewEncoder(part).Encode(book.Metadata); err != nil { return err } for i, f := range files { varname := fmt.Sprintf("book%d", i) filename := fmt.Sprintf("%d%s", book.Id, f.FileType) if err := addFilePart(w, varname, f.Path, filename); err != nil { w.Close() return err } } if book.CoverPath != "" { if err := addFilePart(w, "cover", book.CoverPath, "cover.jpg"); err != nil { w.Close() return err } } if err := w.Close(); err != nil { return err } req, err := http.NewRequest("POST", r.remoteURL+"/api/sync/upload", &body) if err != nil { return err } req.Header.Add("Content-Type", w.FormDataContentType()) resp, err := r.client.Do(req) if err != nil { return err } resp.Body.Close() if resp.StatusCode != 200 { return fmt.Errorf("HTTP error: %s", resp.Status) } return nil } type uniqueIds struct { Id string Unique []string } type diffRequest struct { Candidates []uniqueIds } type diffResponse struct { Missing []string } func (db *Database) findMissing(srv SyncClient) chan string { missing := make(chan string, diffChunkSize) doRequest := func(req *diffRequest) { resp, err := srv.DiffRequest(req) if err != nil { log.Printf("DiffRequest(): %v", err) } else { for _, id := range resp.Missing { missing <- id } } } go func() { n := 0 var req diffRequest for iter := db.Scan(BookBucket); iter.Valid(); iter.Next() { var book Book if err := iter.Value(&book); err != nil { continue } req.Candidates = append(req.Candidates, uniqueIds{ Id: book.Id.String(), Unique: book.Metadata.Uniques(), }) n++ if n%diffChunkSize == 0 { doRequest(&req) req.Candidates = nil } } if req.Candidates != nil { doRequest(&req) } close(missing) }() return missing } // Sync the local database with a remote one. This is a one-way // synchronization: files missing on the remote side will be uploaded // to it. func (db *Database) Sync(remote SyncClient) error { var wg sync.WaitGroup ch := db.findMissing(remote) for i := 0; i < uploadConcurrency; i++ { wg.Add(1) go func() { for id := range ch { bookid := ParseID(id) if book, err := db.GetBook(bookid); err == nil { if files, err := db.GetBookFiles(bookid); err == nil { if err := remote.SendBook(book, files); err != nil { log.Printf("SendBook(%d): %v", id, err) } } } } wg.Done() }() } wg.Wait() return nil } type syncServer struct { db *Database storage *FileStorage } func (l *syncServer) handleDiffRequest(w http.ResponseWriter, req *http.Request) { if req.Header.Get("Content-Type") != "application/json" { http.Error(w, "Bad Request", http.StatusBadRequest) return } var diffreq diffRequest if err := json.NewDecoder(req.Body).Decode(&diffreq); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } var resp diffResponse for _, c := range diffreq.Candidates { // For every unique ID, decode it into a template // Metadata object and see if we can find a match in // the database. found := false for _, unique := range c.Unique { meta, err := parseUniqueId(unique) if err != nil { continue } if _, err := l.db.Find(meta); err == nil { found = true break } } if !found { resp.Missing = append(resp.Missing, c.Id) } } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(&resp) } func (l *syncServer) handleSyncUpload(w http.ResponseWriter, req *http.Request) { mf, _, err := req.FormFile("meta") if err != nil { log.Printf("request with no 'meta' field") http.Error(w, err.Error(), http.StatusBadRequest) return } var md Metadata if err := json.NewDecoder(mf).Decode(&md); err != nil { log.Printf("error decoding metadata: %v", err) http.Error(w, err.Error(), http.StatusBadRequest) return } // Check again that we don't have this book. if _, err := l.db.Find(&md); err == nil { log.Printf("attempt to upload duplicate: %#v", &md) http.Error(w, "Duplicate", http.StatusConflict) return } // Create a new Book with a local ID and path. bookid := NewID() book := &Book{ Id: bookid, Metadata: &md, } // Save the file data to our local storage. for i := 0; i < 10; i++ { // Use a temporary file, we'll know the right // extension to use only after having parsed the // file's MIME header. tmppath := l.storage.Path(fmt.Sprintf("%d.%d.tmp", bookid, i)) varname := fmt.Sprintf("book%d", i) size, hdr, err := savePart(req, varname, l.storage, tmppath) if err == http.ErrMissingFile { break } else if err != nil { log.Printf("error saving local file: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } filetype := filepath.Ext(hdr.Filename) path := l.storage.Path(fmt.Sprintf("%d.%d%s", bookid, i, filetype)) if err := l.storage.Rename(tmppath, path); err != nil { log.Printf("error moving local file: %v", err) } file := &File{ Path: path, FileType: filetype, Mtime: time.Now(), Size: size, Id: bookid, } if err := l.db.PutFile(file); err != nil { log.Printf("error saving file to the database: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } } // If the request contains a cover image, save that as well. coverPath := l.storage.Path(fmt.Sprintf("%s.cover.png", book.Id)) if _, _, err := savePart(req, "cover", l.storage, coverPath); err == nil { book.CoverPath = coverPath } if err := l.db.PutBook(book); err != nil { log.Printf("error saving book to the database: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } // Empty 200 OK response. w.WriteHeader(200) } func savePart(req *http.Request, fieldname string, storage *FileStorage, outname string) (int64, *multipart.FileHeader, error) { f, hdr, err := req.FormFile(fieldname) if err != nil { return 0, nil, err } outf, err := storage.Create(outname) if err != nil { return 0, nil, err } defer outf.Close() n, err := io.Copy(outf, f) if err != nil { return 0, nil, err } return n, hdr, nil }