Commit 8bc3b959 authored by ale's avatar ale

Initial commit

parents
package main
import (
"errors"
"log"
"net/mail"
"net/url"
"strings"
"time"
imap "github.com/emersion/go-imap"
"github.com/emersion/go-imap/client"
)
type connectionParams struct {
user string
password string
server string
mboxPrefix string
}
func parseImapURL(s string) (*connectionParams, error) {
u, err := url.Parse(s)
if err != nil {
return nil, err
}
if u.Scheme != "imaps" {
return nil, errors.New("unsupported scheme")
}
if u.User == nil {
return nil, errors.New("no user information in URL")
}
pw, ok := u.User.Password()
if !ok {
return nil, errors.New("no password specified")
}
pfx := strings.TrimSuffix(u.Path, "/")
if pfx == "" {
pfx = "INBOX"
}
return &connectionParams{
user: u.User.Username(),
password: pw,
server: u.Host,
mboxPrefix: pfx,
}, nil
}
func hasPrefix(s, prefix string) bool {
// Match IMAP mbox names.
return s == prefix || strings.HasPrefix(s, prefix+"/")
}
func stripPrefix(s, prefix string) string {
return strings.TrimPrefix(s, prefix+"/")
}
func addPrefix(s, prefix string) string {
if s == "" {
return prefix
}
return prefix + "/" + s
}
type imapSource struct {
params *connectionParams
client *client.Client
}
func connectIMAP(params *connectionParams) (*client.Client, error) {
c, err := client.DialTLS(params.server, nil)
if err != nil {
return nil, err
}
c.ErrorLog = &nilLogger{}
if err := c.Login(params.user, params.password); err != nil {
c.Logout()
return nil, err
}
return c, nil
}
func newIMAPSource(u string) (*imapSource, error) {
params, err := parseImapURL(u)
if err != nil {
return nil, err
}
client, err := connectIMAP(params)
if err != nil {
return nil, err
}
return &imapSource{
params: params,
client: client,
}, nil
}
func (s *imapSource) Close() {
s.client.Logout()
}
func (s *imapSource) ListFolders() ([]string, error) {
mailboxCh := make(chan *imap.MailboxInfo, 10)
done := make(chan error, 1)
go func() {
done <- s.client.List("", "*", mailboxCh)
}()
var mboxes []string
for m := range mailboxCh {
if !hasPrefix(m.Name, s.params.mboxPrefix) {
continue
}
relName := stripPrefix(m.Name, s.params.mboxPrefix)
mboxes = append(mboxes, relName)
}
return mboxes, <-done
}
type imapMessage struct {
msg *imap.Message
messageID string
}
func (m *imapMessage) Date() time.Time {
return m.msg.InternalDate
}
func (m *imapMessage) MessageID() string {
return m.messageID
}
func (s *imapSource) ListMessages(folder string) ([]message, error) {
mbox, err := s.client.Select(addPrefix(folder, s.params.mboxPrefix), true)
if err != nil {
log.Printf("Select error for %s: %v", folder, err)
return nil, err
}
if mbox.Messages == 0 {
return nil, nil
}
// Fetch attributes for all messages so we can check the destination.
seqset := new(imap.SeqSet)
seqset.AddRange(1, mbox.Messages)
attrs := []string{"INTERNALDATE", "BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)]"}
messages := make(chan *imap.Message, 100)
done := make(chan error, 1)
go func() {
done <- s.client.Fetch(seqset, attrs, messages)
}()
var out []message
for msg := range messages {
body := msg.GetBody("BODY[HEADER.FIELDS (MESSAGE-ID)]")
if body == nil {
continue
}
parsed, err := mail.ReadMessage(body)
if err != nil {
log.Printf("ReadMessage error: %v", err)
continue
}
messageID := parseMessageID(parsed.Header.Get("Message-Id"))
if messageID == "" {
//log.Printf("message without ID: %v", parsed.Header)
continue
}
out = append(out, &imapMessage{
msg: msg,
messageID: messageID,
})
}
return out, <-done
}
func (s *imapSource) fetchBatch(batch []message, ch chan *mail.Message) {
set := new(imap.SeqSet)
for _, msg := range batch {
set.AddNum(msg.(*imapMessage).msg.SeqNum)
}
attrs := []string{"BODY.PEEK[]"}
messages := make(chan *imap.Message, 100)
done := make(chan error, 1)
go func() {
done <- s.client.Fetch(set, attrs, messages)
}()
for msg := range messages {
body := msg.GetBody("BODY[]")
if body == nil {
continue
}
parsed, err := mail.ReadMessage(body)
if err != nil {
log.Printf("ReadMessage error: %v", err)
continue
}
ch <- parsed
}
if err := <-done; err != nil {
log.Printf("imap Fetch error: %v", err)
}
}
func (s *imapSource) FetchMessages(folder string, msgs []message) <-chan *mail.Message {
_, err := s.client.Select(addPrefix(folder, s.params.mboxPrefix), true)
if err != nil {
log.Printf("Select error for %s: %v", folder, err)
return nil
}
batchSize := 30
ch := make(chan *mail.Message, batchSize*2)
go func() {
defer close(ch)
for i := 0; i < len(msgs); i += batchSize {
j := i + batchSize
if j > len(msgs) {
j = len(msgs)
}
batch := msgs[i:j]
s.fetchBatch(batch, ch)
}
}()
return ch
}
type nilLogger struct{}
func (l nilLogger) Printf(format string, v ...interface{}) {}
func (l nilLogger) Println(v ...interface{}) {}
package main
import (
"bufio"
"bytes"
"crypto/rand"
"encoding/hex"
"io"
"net/mail"
"net/textproto"
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"
)
// Maildir type, inspired by github.com/luksen/maildir but without the
// accidentally quadratic behavior.
type Maildir string
// Create a maildir if it does not already exist.
func (d Maildir) Create() error {
for _, sub := range []string{"cur", "new", "tmp"} {
p := filepath.Join(string(d), sub)
if _, err := os.Stat(p); os.IsNotExist(err) {
if err := os.MkdirAll(p, 0700); err != nil {
return err
}
}
}
return nil
}
// Unseen moves messages from the 'new' directory to 'cur', and
// returns their list.
func (d Maildir) Unseen() ([]string, error) {
f, err := os.Open(filepath.Join(string(d), "new"))
if err != nil {
return nil, err
}
defer f.Close()
names, err := f.Readdirnames(0)
if err != nil {
return nil, err
}
var keys []string
for _, n := range names {
if n[0] == '.' {
continue
}
if err := os.Rename(filepath.Join(string(d), "new", n), filepath.Join(string(d), "cur", n)); err == nil {
keys = append(keys, n)
}
}
return keys, nil
}
// Keys returns keys for all messages in a folder.
func (d Maildir) Keys() ([]string, error) {
f, err := os.Open(filepath.Join(string(d), "cur"))
if err != nil {
return nil, err
}
defer f.Close()
names, err := f.Readdirnames(0)
return names, err
}
// Header returns a parsed message header.
func (d Maildir) Header(key string) (mail.Header, error) {
filename := filepath.Join(string(d), "cur", key)
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
tp := textproto.NewReader(bufio.NewReader(file))
hdr, err := tp.ReadMIMEHeader()
if err != nil {
return nil, err
}
return mail.Header(hdr), nil
}
// Message returns a Message by key.
func (d Maildir) Message(key string) (*mail.Message, error) {
filename := filepath.Join(string(d), "cur", key)
r, err := os.Open(filename)
if err != nil {
return nil, err
}
defer r.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, r)
if err != nil {
return nil, err
}
return mail.ReadMessage(buf)
}
// Delivery tracks the delivery of a single message to a maildir.
type Delivery struct {
curPath, dstPath string
f *os.File
}
// Close the delivery successfully.
func (d *Delivery) Close() error {
if err := d.f.Close(); err != nil {
return err
}
return os.Rename(d.curPath, d.dstPath)
}
// Abort the delivery (remove temporary file).
func (d *Delivery) Abort() {
d.f.Close()
os.Remove(d.curPath)
}
// Write the message body.
func (d *Delivery) Write(b []byte) (int, error) {
return d.f.Write(b)
}
// NewDelivery starts writing a new message to the maildir.
func (d Maildir) NewDelivery() (*Delivery, error) {
key, err := maildirKey()
if err != nil {
return nil, err
}
curPath := filepath.Join(string(d), "tmp", key)
dstPath := filepath.Join(string(d), "new", key)
f, err := os.Create(curPath)
if err != nil {
return nil, err
}
return &Delivery{
f: f,
curPath: curPath,
dstPath: dstPath,
}, nil
}
var id int64
// maildirKey generates a new unique key as described in the Maildir
// specification. For the third part of the key (delivery identifier)
// it uses an internal counter, the process id and a cryptographical
// random number to ensure uniqueness among messages delivered in the
// same second.
func maildirKey() (string, error) {
var key string
key += strconv.FormatInt(time.Now().Unix(), 10)
key += "."
host, err := os.Hostname()
if err != nil {
return "", err
}
host = strings.Replace(host, "/", "\057", -1)
host = strings.Replace(host, ":", "\072", -1)
key += host
key += "."
key += strconv.FormatInt(int64(os.Getpid()), 10)
key += strconv.FormatInt(id, 10)
atomic.AddInt64(&id, 1)
bs := make([]byte, 10)
_, err = io.ReadFull(rand.Reader, bs)
if err != nil {
return "", err
}
key += hex.EncodeToString(bs)
return key, nil
}
type maildirDest struct {
*maildirSource
}
func newMaildirDest(root string) *maildirDest {
return &maildirDest{
&maildirSource{
root: root,
},
}
}
func (d *maildirDest) CreateFolder(folder string) error {
dir := Maildir(filepath.Join(d.root, folder))
return dir.Create()
}
func (d *maildirDest) WriteMessage(folder string, body []byte) error {
dir := Maildir(filepath.Join(d.root, folder))
delivery, err := dir.NewDelivery()
if err != nil {
return err
}
l := len(body)
for l > 0 {
n, err := delivery.Write(body)
if err != nil {
delivery.Abort()
return err
}
l -= n
body = body[n:]
}
return delivery.Close()
}
type maildirMessage struct {
key string
hdr mail.Header
}
func (m *maildirMessage) Date() time.Time {
t, _ := mail.ParseDate(m.hdr.Get("Date"))
return t
}
func (m *maildirMessage) MessageID() string {
return parseMessageID(m.hdr.Get("Message-ID"))
}
type maildirSource struct {
root string
}
func (s *maildirSource) Close() {}
func isMaildir(path string) bool {
stat, err := os.Stat(filepath.Join(path, "cur"))
return err == nil && stat.IsDir()
}
func (s *maildirSource) ListFolders() ([]string, error) {
var mboxes []string
root := strings.TrimSuffix(s.root, "/")
err := filepath.Walk(
root,
func(path string, fi os.FileInfo, err error) error {
if err != nil {
return nil
}
var relPath string
if path != root {
relPath = strings.TrimPrefix(path, root+"/")
}
if isMaildir(path) {
mboxes = append(mboxes, relPath)
}
return nil
},
)
return mboxes, err
}
func (s *maildirSource) ListMessages(folder string) ([]message, error) {
dir := Maildir(filepath.Join(s.root, folder))
var msgs []message
keys, err := dir.Keys()
if err != nil {
return nil, err
}
unseen, _ := dir.Unseen()
if len(unseen) > 0 {
keys = append(keys, unseen...)
}
for _, key := range keys {
hdr, err := dir.Header(key)
if err != nil {
continue
}
msgs = append(msgs, &maildirMessage{key: key, hdr: hdr})
}
return msgs, nil
}
func (s *maildirSource) FetchMessages(folder string, msgs []message) <-chan *mail.Message {
dir := Maildir(filepath.Join(s.root, folder))
ch := make(chan *mail.Message, 100)
go func() {
defer close(ch)
for _, m := range msgs {
msg, err := dir.Message(m.(*maildirMessage).key)
if err == nil {
ch <- msg
}
}
}()
return ch
}
package main
import (
"bytes"
"errors"
"flag"
"fmt"
"io"
"log"
"net/mail"
"os"
"regexp"
"runtime/pprof"
"strings"
"sync"
"time"
)
var (
srcURL = flag.String("src", "", "source URL")
dstURL = flag.String("dst", "", "destination URL")
excludes = flag.String("exclude", "", "folders to exclude (comma-sep regexp list)")
cpuprofile = flag.String("cpuprofile", "", "write CPU profile here")
)
// A source of messages.
type mailboxSource interface {
ListFolders() ([]string, error)
ListMessages(string) ([]message, error)
FetchMessages(string, []message) <-chan *mail.Message
Close()
}
// A destination for messages.
type mailboxDestination interface {
mailboxSource
CreateFolder(string) error
WriteMessage(string, []byte) error
}
// An opaque message type (that is passed between ListMessages and
// FetchMessages).
type message interface {
Date() time.Time
MessageID() string
}
func newSource(u string) (mailboxSource, error) {
switch {
case strings.HasPrefix(u, "imaps://"):
return newIMAPSource(u)
case strings.HasPrefix(u, "maildir:"):
return &maildirSource{root: u[8:]}, nil
default:
return nil, errors.New("unsupported URL scheme")
}
}
func newDestination(u string) (mailboxDestination, error) {
switch {
case strings.HasPrefix(u, "maildir:"):
return newMaildirDest(u[8:]), nil
default:
return nil, errors.New("unsupported URL scheme")
}
}
type regexpList []*regexp.Regexp
func newRegexpList(patterns ...string) regexpList {
var l regexpList
for _, s := range patterns {
p := regexp.MustCompile("^" + s + "$")
l = append(l, p)
}
return l
}
func (l regexpList) Add(s string) regexpList {
return append(l, regexp.MustCompile("^"+s+"$"))
}
func (l regexpList) Match(s string) bool {
for _, p := range l {
if p.MatchString(s) {
return true
}
}
return false
}
func parseMessageID(s string) string {
if s != "" && s[0] == '<' {
return s[1 : len(s)-2]
}
return s
}
func messagesInFolder(src mailboxSource, folder string) (map[string]message, error) {
set := make(map[string]message)
msgs, err := src.ListMessages(folder)
if err != nil {
return nil, err
}
for _, m := range msgs {
set[m.MessageID()] = m
}
return set, nil
}
func dumpMessage(msg *mail.Message) []byte {
var buf bytes.Buffer
for hdr, values := range msg.Header {
for _, value := range values {
fmt.Fprintf(&buf, "%s: %s\r\n", hdr, value)
}
}
fmt.Fprintf(&buf, "\r\n")
if _, err := io.Copy(&buf, msg.Body); err != nil {
return nil
}
return buf.Bytes()
}
func syncFolder(src mailboxSource, dst mailboxDestination, folder string) error {
// Build set of Message-IDs on source and destination in
// parallel, then compute set difference.
var srcMsgs, dstMsgs map[string]message
var err error
var wg sync.WaitGroup
wg.Add(2)
go func() {
srcMsgs, err = messagesInFolder(src, folder)
wg.Done()
}()
go func() {
// Ignore errors on destination, it's ok if the folder
// does not exist for example.
dstMsgs, _ = messagesInFolder(dst, folder)
wg.Done()
}()
wg.Wait()
if err != nil {
return err
}
srcMsgCount := len(srcMsgs)
dstMsgCount := len(dstMsgs)
for _, m := range dstMsgs {
delete(srcMsgs, m.MessageID())
}
// Copy what's left.
var toCopy []message
for _, m := range srcMsgs {
toCopy = append(toCopy, m)
}
if len(toCopy) == 0 {
log.Printf("mailbox %s: nothing to do (%d src, %d dst)", folder, srcMsgCount, dstMsgCount)
return nil
}
if err := dst.CreateFolder(folder); err != nil {
log.Printf("warning: create folder %s: %v", folder, err)
}
copiedMsgCount := 0
for m := range src.FetchMessages(folder, toCopy) {
if err := dst.WriteMessage(folder, dumpMessage(m)); err != nil {
return err
}
copiedMsgCount++
}
log.Printf("synced mailbox %s (%d src, %d dst, %d/%d copied)", folder, srcMsgCount, dstMsgCount, copiedMsgCount, len(toCopy))
return nil
}
var workerWg sync.WaitGroup
func syncWorker(srcurl, dsturl string, mboxCh <-chan string) {
defer workerWg.Done()
src, err := newSource(srcurl)
if err != nil {
log.Printf("source connection error: %v", err)
return
}
defer src.Close()
dst, err := newDestination(dsturl)
if err != nil {
log.Printf("target connection error: %v", err)
return
}
defer dst.Close()
for folder := range mboxCh {
if err := syncFolder(src, dst, folder); err != nil {
log.Printf("error processing mailbox %s: %v", folder, err)
}
}
}
var (
syncConcurrency = 5
excludeMboxes = newRegexpList("Trash", "[sS]pam", "Junk", "Drafts")
)
func listSourceFolders(srcurl string) ([]string, error) {
src, err := newSource(srcurl)
if err != nil {
return nil