import {
  Observable,
  from,
  of,
  concat,
  MonoTypeOperatorFunction,
  Subject,
  ReplaySubject, BehaviorSubject, interval,
} from 'rxjs'
import AWS from 'aws-sdk/global'
import S3 from 'aws-sdk/clients/s3'
import {
  map,
  tap,
  filter,
  reduce,
  retryWhen,
  delay,
  take,
  mergeMap,
  shareReplay,
  endWith,
  switchMap, concatMap,
} from 'rxjs/operators'
import { v4 as uuidv4 } from 'uuid'

AWS.config.correctClockSkew = true

let awsIdentityId = ''
let bucketName = ''
let lastAwsCredRefresh = 0
const awsCredRefreshTime = 1000 * 60 * 20

interface AwsCredentials {
  identityId: string
  token: string
  identityPoolId: string
  developerProvidedName: string
  bucketName: string
  region: string
}

function getS3(): S3 {
  return new S3({
    params: { Bucket: bucketName },
  })
}

export function authenticateAws(): Promise<void> {
  if (Date.now() - lastAwsCredRefresh < awsCredRefreshTime) {
    return Promise.resolve()
  }
  return fetch('/users/cognito/', {
    method: 'GET',
    credentials: 'same-origin',
    headers: {
      'Content-Type': 'application/json',
    },
  })
    .then(x => x.json())
    .then(x => x as AwsCredentials)
    .then(rawCredentials => {
      awsIdentityId = rawCredentials.identityId
      bucketName = rawCredentials.bucketName

      AWS.config.region = rawCredentials.region
      const credentials = new AWS.CognitoIdentityCredentials({
        IdentityId: rawCredentials.identityId,
        IdentityPoolId: rawCredentials.identityPoolId,
        Logins: {
          'cognito-identity.amazonaws.com': rawCredentials.token,
        },
      })
      AWS.config.credentials = credentials
      AWS.config.maxRetries = 1
      lastAwsCredRefresh = Date.now()
      return credentials.getPromise()
    })
}

export function ensureAwsAuthenticated<T>(
  observable: Observable<T>
): Observable<T> {
  return observable.pipe(
    switchMap(x => from(authenticateAws()).pipe(map(() => x)))
  )
}

const mb5 = 5242880
// const mb5 = 10000000
// cons      5242880

export function blob5mb(blobs: Observable<Blob>): Observable<Blob> {
  let size = 0
  let position = 0
  let current: Array<Blob> = []
  let positionStart = 0
  return concat(
    blobs.pipe(
      tap(x =>
        console.log(`Added block from ${position} to ${position + x.size}`)
      ),
      tap(x => (size += x.size)),
      tap(x => (position += x.size)),
      tap(x => current.push(x)),
      filter(() => size > mb5),
      map((): Blob[] => {
        const result = current
        current = []
        size = 0
        console.log(`Emitting multiblock from ${positionStart} to ${position} of bytes ${result.reduce((a: number, b) => a + b.size, 0)}`)
        positionStart = position
        return result
      })
    ),
    of(1).pipe(
      map((): Blob[] => current),
      filter(x => x.length > 0),
      tap((x): void =>
        console.log(`Emitting final block from ${positionStart} to ${position} of bytes ${x.reduce((a: number, b) => a + b.size, 0)}`)
      )
    )
  ).pipe(map(x => new Blob(x, { type: x[0] ? x[0].type : undefined })))
}

interface UploadStartAndParts {
  start: S3.Types.CreateMultipartUploadOutput
  parts: Array<S3.CompletedPart>
}

export function retryWithDelay<T>(): MonoTypeOperatorFunction<T> {
  return (x): Observable<T> => {
    return x.pipe(retryWhen(e => e.pipe(delay(5000))))
  }
}

export interface VideoUploading {
  fullUrl: Observable<string>
  partCompletion: Observable<Observable<number>>
  info: Observable<AwsUploadInfo>
  awaiting: Observable<number>
  blobs: Array<Blob>
}

export interface WholeVideoUploading {
  fullUrl: Observable<string>
  progress: Observable<number>
}

export function s3AccessLink(keyOrUrl: string): string {
  const s3 = getS3()
  const index = keyOrUrl.indexOf('.com/')
  const key = keyOrUrl.substring(index + 4)
  return s3.getSignedUrl('getObject', { Bucket: bucketName, Key: key })
}

const virtualMode = false

export function s3StreamedUpload(
  blobs: Observable<Blob>,
  contentType: string,
  extension: string
): VideoUploading {
  if (virtualMode) {
    return alt(of({
      Bucket: 'S3.BucketName',
      Key: 'S3.ObjectKey',
      UploadId: 'S3.MultipartUploadId'
    }), blobs)
  }
  const s3 = getS3()
  const fileName = uuidv4() + '.' + extension
  const key = 'media/' + awsIdentityId + '/' + fileName
  return resumeS3StreamedUpload(
    from(authenticateAws()).pipe(
      switchMap(() =>
        s3
          .createMultipartUpload({
            Bucket: bucketName,
            Key: key,
            ContentType: contentType,
          })
          .promise()
      ),
      map(x => {
        return {
          Bucket: x.Bucket!,
          Key: x.Key!,
          UploadId: x.UploadId!,
        }
      })
    ),
    blobs
  )
}

export function s3StreamedUploadFromExisting(
  key: string,
  existingUploadId: string,
  blobs: Observable<Blob>
): VideoUploading {
  if (virtualMode)
    return {
      fullUrl: blobs.pipe(
        filter(() => false),
        endWith('fullUrl'),
        map(x => x as string),
        shareReplay(1)
      ),
      partCompletion: of(of(1)),
      info: of({
        Bucket: 'bucket',
        Key: key,
        UploadId: existingUploadId,
      }),
      awaiting: of(0),
      blobs: []
    }
  console.log('Continuing existing recording: ', key, existingUploadId)
  const s3 = getS3()
  return resumeS3StreamedUpload(
    from(authenticateAws()).pipe(
      switchMap(() =>
        from(
          s3
            .listParts({
              Bucket: bucketName,
              Key: key,
              UploadId: existingUploadId,
            })
            .promise()
        )
      ),
      map(x => {
        console.log('We have existing parts: ', x.Parts)
        return {
          Bucket: x.Bucket!,
          Key: x.Key!,
          UploadId: x.UploadId!,
          Parts: x.Parts!.map(y => {
            return {
              ETag: y.ETag,
              PartNumber: y.PartNumber,
            }
          }),
        } as AwsUploadInfo
      })
    ),
    blobs
  )
}

export interface AwsUploadInfo {
  Bucket: S3.BucketName
  Key: S3.ObjectKey
  UploadId: S3.MultipartUploadId
  Parts?: Array<S3.CompletedPart>
}

export function resumeS3StreamedUpload(
  startInfo: Observable<AwsUploadInfo>,
  blobs: Observable<Blob>
): VideoUploading {
  const s3 = getS3()
  let index = 1
  let knownData: AwsUploadInfo | null = null
  const infoSubject = new Subject<AwsUploadInfo>()
  const partCompletion = new Subject<Observable<number>>()
  const url = new Subject<string>()
  const awaiting = new BehaviorSubject(0)
  const gatheredBodies: Array<Blob> = []
  startInfo
    .pipe(
      take(1),
      tap(
        x => {
          console.log(`Started upload: `, x)
          knownData = x
          infoSubject.next(x)
          index = (x.Parts?.length ?? 0) + 1
        },
        e => {
          console.error(`Got error on start: ${e}`)
        }
      ),
      // retryWithDelay(),
      mergeMap(ongoing => {
        return blobs.pipe(
          tap(x => gatheredBodies.push(x)),
          blob5mb,
          ensureAwsAuthenticated,
          tap(() => {
            awaiting.next(awaiting.value + 1)
          }),
          concatMap(x => {
            const partNumber = index++
            console.log(`Uploading part ${partNumber}... ${x.size} bytes`)
            const upload = s3.uploadPart({
              UploadId: knownData!.UploadId,
              PartNumber: partNumber,
              Body: x,
              Bucket: knownData!.Bucket,
              Key: knownData!.Key,
            })
            const thisPartProgress = new Subject<number>()
            upload.on('httpError', err => {
              console.error('Got an error ', err)
            })
            upload.on('error', err => {
              console.error('Got an error ', err)
            })
            upload.on('complete', () => {
              thisPartProgress.complete()
              console.log('complete')
            })
            upload.on('httpUploadProgress', prog => {
              console.log('httpUploadProgress: ', prog.loaded / prog.total)
              thisPartProgress.next((prog.loaded / prog.total) * 0.95)
            })
            upload.on('httpDownloadProgress', prog => {
              console.log('httpDownloadProgress: ', prog.loaded / prog.total)
              if (prog.total == 0) {
                thisPartProgress.next(1.0)
              } else {
                thisPartProgress.next((prog.loaded / prog.total) * 0.05 + 0.95)
              }
            })
            partCompletion.next(thisPartProgress)
            return from(upload.promise()).pipe(
              take(1),
              // catchError(err => EMPTY),
              tap(
                x => {
                  console.log(`Uploaded part ${partNumber}: `, x)
                  awaiting.next(awaiting.value - 1)
                },
                e => {
                  console.error(`Got error on part ${partNumber} upload: `, e)
                  awaiting.next(awaiting.value - 1)
                }
              ),
              // retryWithDelay(),
              map((x): S3.CompletedPart => {
                return {
                  ETag: x.ETag!.replace('"', ''),
                  PartNumber: partNumber,
                }
              })
            )
          }),
          reduce<S3.CompletedPart, Array<S3.CompletedPart>>((a, b) => {
            a.push(b)
            return a
          }, []),
          map(newParts => {
            console.log('Combining old parts: ', knownData?.Parts, newParts)
            if (knownData?.Parts) {
              return knownData.Parts.concat(newParts)
            } else {
              return newParts
            }
          }),
          map(parts =>
            parts.sort((a, b) => (a.PartNumber || 1) - (b.PartNumber || 1))
          ),
          map(parts => {
            console.log('All parts: ', parts)
            return { start: ongoing, parts: parts } as UploadStartAndParts
          })
        )
      }),
      ensureAwsAuthenticated,
      mergeMap(ongoing =>
        from(
          s3
            .completeMultipartUpload({
              UploadId: ongoing.start.UploadId!,
              Bucket: ongoing.start.Bucket!,
              Key: ongoing.start.Key!,
              MultipartUpload: {
                Parts: ongoing.parts,
              },
            })
            .promise()
        ).pipe(
          take(1),
          tap(
            x => console.log(`Marking finish: ${x}`),
            e => console.error(`Got error on finish ${e}`)
          )
          // retryWithDelay()
        )
      ),
      tap({
        next: x => console.log(`Finished upload: `, x),
      }),
      map(x => x.Key!)
    )
    .subscribe(
      x => url.next(x),
      err => url.error(err),
      () => url.complete()
    )
  return {
    fullUrl: url.pipe(shareReplay(1)),
    partCompletion: partCompletion.pipe(shareReplay(1)),
    awaiting: awaiting,
    info: infoSubject.pipe(shareReplay(1)),
    blobs: gatheredBodies
  }
}

function alt(
  startInfo: Observable<AwsUploadInfo>,
  blobs: Observable<Blob>
): VideoUploading {
  let index = 1
  const partCompletion = new Subject<Observable<number>>()
  const infoSubject = new Subject<AwsUploadInfo>()
  const url = new Subject<string>()
  const awaiting = new BehaviorSubject(0)
  const gatheredBodies: Array<Blob> = []
  startInfo
    .pipe(
      take(1),
      tap(
        x => {
          console.log(`Started upload: `, x)
          index = (x.Parts?.length ?? 0) + 1
        },
        e => {
          console.error(`Got error on start: ${e}`)
        }
      ),
      // retryWithDelay(),
      mergeMap(ongoing => {
        return blobs.pipe(
          tap(x => gatheredBodies.push(x)),
          blob5mb,
          ensureAwsAuthenticated,
          tap(x => {
            console.log(`New part to upload enqueued`)
            awaiting.next(awaiting.value + 1)
          }),
          concatMap(x => {
            const partNumber = index++
            console.log(`Uploading part ${partNumber}... ${x.size} bytes`)
            const seconds = 15
            partCompletion.next(interval(1000).pipe(take(seconds), map(x => (x + 1) / seconds)))
            return of(true).pipe(
              delay(seconds * 1000),
              take(1),
              // catchError(err => EMPTY),
              tap(
                x => {
                  console.log(`Uploaded part ${partNumber}: `, x)
                  awaiting.next(awaiting.value - 1)
                },
                e => {
                  console.error(`Got error on part ${partNumber} upload: `, e)
                  awaiting.next(awaiting.value - 1)
                }
              ),
              // retryWithDelay(),
              map((x): S3.CompletedPart => {
                return {
                  ETag: '',
                  PartNumber: partNumber,
                }
              })
            )
          }),
          reduce<S3.CompletedPart, Array<S3.CompletedPart>>((a, b) => {
            a.push(b)
            return a
          }, []),
          map(parts =>
            parts.sort((a, b) => (a.PartNumber || 1) - (b.PartNumber || 1))
          ),
          map(parts => {
            console.log('All parts: ', parts)
            return { start: ongoing, parts: parts } as UploadStartAndParts
          })
        )
      }),
      ensureAwsAuthenticated,
      mergeMap(ongoing =>
        of(ongoing).pipe(
          take(1),
          tap(
            x => console.log(`Marking finish: ${x}`),
            e => console.error(`Got error on finish ${e}`)
          )
          // retryWithDelay()
        )
      ),
      tap({
        next: x => console.log(`Finished upload: `, x),
      }),
      map(x => 'key')
    )
    .subscribe(
      x => url.next(x),
      err => url.error(err),
      () => url.complete()
    )
  return {
    fullUrl: url.pipe(shareReplay(1)),
    partCompletion: partCompletion.pipe(shareReplay(1)),
    awaiting: awaiting,
    info: infoSubject.pipe(shareReplay(1)),
    blobs: gatheredBodies
  }
}

export function s3UploadAtOnce(
  blob: Blob,
  type: string,
  extension: string
): WholeVideoUploading {
  const s3 = getS3()
  const fileName = uuidv4() + '.' + extension
  const key = 'media/' + awsIdentityId + '/' + fileName
  const partCompletion = new ReplaySubject<number>(1)
  const url = new ReplaySubject<string>(1)
  const managedUpload = s3.upload({
    Bucket: bucketName,
    Key: key,
    ContentType: type,
    Body: blob,
  })
  managedUpload.on('httpUploadProgress', ev => {
    console.log('Progress: ', ev.loaded / ev.total)
    partCompletion.next(ev.loaded / ev.total)
  })
  from(managedUpload.promise()).subscribe(x => {
    console.log('Got', x.Key)
    url.next(x.Key!)
  })
  return { fullUrl: url, progress: partCompletion }
}
