package tp.intro_streaming;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:tp/intro_streaming/Producer.class */
/**
 * Worker thread that drains messages from an internal queue and hands them to a
 * {@link SocketWriter}, spacing consecutive messages by the difference between their
 * timestamps as reported by a {@link TimestampGetter}.
 *
 * <p>Timestamp gaps are compared against {@link System#currentTimeMillis()} deltas, so
 * timestamps are treated as millisecond values.
 *
 * <p>The thread runs until it is interrupted.
 *
 * @param <T> type of the messages carried by this producer
 */
public class Producer<T> extends Thread {
    final int id;
    private final TimestampGetter<T> tsGetter;
    private final SocketWriter<T> writer;
    /** Timestamp of the last message written; {@code -1} means nothing has been written yet. */
    private long lastMsgTimestamp = -1;
    BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    /**
     * Returns the queue this producer consumes from.
     *
     * @return the internal queue itself (not a copy)
     */
    public BlockingQueue<T> getQueue() {
        return this.queue;
    }

    /**
     * Creates a producer.
     *
     * @param i               identifier stored in {@code id}
     * @param timestampGetter extracts the timestamp of each message
     * @param socketWriter    receives each message once its pacing delay has elapsed
     */
    public Producer(int i, TimestampGetter<T> timestampGetter, SocketWriter<T> socketWriter) {
        this.id = i;
        this.tsGetter = timestampGetter;
        this.writer = socketWriter;
    }

    /**
     * Appends a message to the queue consumed by {@link #run()}.
     *
     * @param t the message to enqueue
     * @return the result of {@link BlockingQueue#add(Object)}
     */
    public boolean addMessage(T t) {
        return this.queue.add(t);
    }

    /**
     * Consumes queued messages until the thread is interrupted.
     *
     * <p>Each message after the first is held back until the wall-clock time elapsed since
     * it was dequeued exceeds the gap between its timestamp and the previous message's
     * timestamp. The first message is written immediately.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (true) {
            try {
                // Timed poll so the loop wakes up periodically even when the queue is idle.
                T message = this.queue.poll(1L, TimeUnit.SECONDS);
                if (message != null) {
                    long timestamp = this.tsGetter.getTimestamp(message);
                    if (this.lastMsgTimestamp != -1) {
                        waitForGap(timestamp - this.lastMsgTimestamp, System.currentTimeMillis());
                    }
                    this.writer.printToSocket(message);
                    this.lastMsgTimestamp = timestamp;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt status so code observing this thread can see it.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Sleeps until strictly more than {@code gapMillis} milliseconds have elapsed since
     * {@code startMillis}. Returns immediately when that is already the case (including
     * for negative gaps).
     */
    private static void waitForGap(long gapMillis, long startMillis) throws InterruptedException {
        long remaining = gapMillis - (System.currentTimeMillis() - startMillis);
        while (remaining >= 0) {
            // +1 because the wait ends only once elapsed time is strictly greater than the gap.
            Thread.sleep(remaining + 1);
            remaining = gapMillis - (System.currentTimeMillis() - startMillis);
        }
    }
}
