import {Observable} from 'rxjs';
import {finalize, flatMap} from 'rxjs/operators';

/**
 * Value emitted by the gate observable that `ParallelExecutor.wrap` builds
 * once a queued job has been started.
 */
interface ActionDone{
    /** Callback that reports the job as finished; `wrap` invokes it when the wrapped source is finalized. */
    done(): void
}

/**
 * Limits how many jobs hold an execution slot at the same time.
 *
 * Jobs are dequeued in FIFO order. At most `maxConcurrency` jobs are kept in
 * `running`; the rest wait in `queue`. A job keeps its slot until its `done`
 * callback has been invoked.
 */
export class ParallelExecutor{
    /** Jobs waiting for a free slot, oldest first. */
    queue: Job[] = [];
    /** Jobs that currently hold a slot (started, or about to start on the next timer tick). */
    running: Job[] = [];

    /**
     * @param maxConcurrency Maximum number of jobs allowed to hold a slot at
     *     once. With a value of 0 or less no job is ever started.
     */
    constructor(public readonly maxConcurrency: number){}

    /**
     * Returns an observable that mirrors `source`, but only subscribes to it
     * once this executor has granted a slot. The slot is released when the
     * subscription to `source` is finalized (complete, error or unsubscribe).
     *
     * @param source Observable whose subscription should be throttled.
     * @returns An observable emitting the same values as `source`.
     */
    wrap<T>(source: Observable<T>): Observable<T>{
        return new Observable<ActionDone>(subscriber => {
            this.enqueue(done => {
                if(subscriber.closed){
                    // The consumer unsubscribed while the job was still waiting.
                    // Emitting now would be dropped and `finalize` would never
                    // run, so release the slot here instead of leaking it.
                    done();
                    return;
                }
                subscriber.next({done});
                subscriber.complete()
            })
        }).pipe(
            flatMap(slot => source.pipe(
                // Runs on completion, error and unsubscribe alike.
                finalize(() => slot.done())
            ))
        )
    }

    /**
     * Queues a job and starts it as soon as a slot is available.
     *
     * @param fn Work to run. It receives a `done` callback that must be
     *     invoked to give the slot back.
     */
    enqueue(fn: (done: () => void) => void): void{
        this.queue.push(new Job(this, fn));
        this.scheduleNext();
    }

    /**
     * Removes a finished job from `running` and lets the next queued job start.
     * Jobs that are not in `running` are ignored.
     *
     * @param job The job that has finished.
     */
    jobDone(job: Job): void{
        const index = this.running.indexOf(job);
        if(index < 0){
            return;
        }
        this.running.splice(index, 1);
        this.scheduleNext();
    }

    /**
     * Moves jobs from `queue` to `running` while slots are free. Each job is
     * started on a zero-delay timer, not synchronously inside this call.
     */
    private scheduleNext(): void{
        while (this.running.length < this.maxConcurrency){
            const scheduled = this.queue.shift();
            if(scheduled === undefined){
                // Queue drained.
                break;
            }
            this.running.push(scheduled);
            setTimeout(() => {
                scheduled.run()
            }, 0)
        }
    }
}


/**
 * A single unit of work tracked by a `ParallelExecutor`.
 *
 * The job reports back to its owner exactly once, no matter how many times
 * `done` is invoked.
 */
class Job{

    /** Set once the owner has been notified, so repeated `done` calls are no-ops. */
    private isDone = false;

    /**
     * @param owner Executor to notify through `jobDone` when the job finishes.
     * @param fn Work to run; it receives the callback that marks the job as finished.
     */
    constructor(private owner: ParallelExecutor, private fn: (done: () => void) => void) {
    }

    /** Marks the job as finished and notifies the owner; only the first call has an effect. */
    done(): void{
        if(this.isDone){
            return;
        }
        this.isDone = true;
        this.owner.jobDone(this)
    }

    /**
     * Runs the work, handing it the completion callback.
     *
     * @throws Rethrows whatever `fn` throws synchronously, after marking the
     *     job as done so the owner is not left waiting for it.
     */
    run(): void{
        try{
            this.fn(() => this.done())
        }catch(error){
            // `fn` will never get the chance to call `done` itself any more.
            this.done();
            throw error
        }
    }

}
