package org.apache.flink.addons.hbase;

import java.io.IOException;
import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/addons/hbase/HBaseLookupFunction.class */
public class HBaseLookupFunction extends TableFunction<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupFunction.class);
    private static final long serialVersionUID = 1;
    private final String hTableName;
    private final byte[] serializedConfig;
    private final HBaseTableSchema hbaseTableSchema;
    private transient HBaseReadWriteHelper readHelper;
    private transient Connection hConnection;
    private transient HTable table;

    public HBaseLookupFunction(Configuration configuration, String str, HBaseTableSchema hBaseTableSchema) {
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
        this.hTableName = str;
        this.hbaseTableSchema = hBaseTableSchema;
    }

    public void eval(Object obj) throws IOException {
        Result result = this.table.get(this.readHelper.createGet(obj));
        if (result.isEmpty()) {
            return;
        }
        collect(this.readHelper.parseToRow(result, obj));
    }

    public TypeInformation<Row> getResultType() {
        return this.hbaseTableSchema.convertsToTableSchema().toRowType();
    }

    private Configuration prepareRuntimeConfiguration() {
        Configuration deserializeConfiguration = HBaseConfigurationUtil.deserializeConfiguration(this.serializedConfig, HBaseConfiguration.create());
        if (!StringUtils.isNullOrWhitespaceOnly(deserializeConfiguration.get("hbase.zookeeper.quorum"))) {
            return deserializeConfiguration;
        }
        LOG.error("can not connect to HBase without {} configuration", "hbase.zookeeper.quorum");
        throw new IllegalArgumentException("check HBase configuration failed, lost: 'hbase.zookeeper.quorum'!");
    }

    public void open(FunctionContext functionContext) {
        LOG.info("start open ...");
        try {
            this.hConnection = ConnectionFactory.createConnection(prepareRuntimeConfiguration());
            this.table = this.hConnection.getTable(TableName.valueOf(this.hTableName));
            this.readHelper = new HBaseReadWriteHelper(this.hbaseTableSchema);
            LOG.info("end open.");
        } catch (IOException e) {
            LOG.error("Exception while creating connection to HBase.", e);
            throw new RuntimeException("Cannot create connection to HBase.", e);
        } catch (TableNotFoundException e2) {
            LOG.error("Table '{}' not found ", this.hTableName, e2);
            throw new RuntimeException("HBase table '" + this.hTableName + "' not found.", e2);
        }
    }

    public void close() {
        LOG.info("start close ...");
        if (null != this.table) {
            try {
                this.table.close();
                this.table = null;
            } catch (IOException e) {
                LOG.warn("exception when close table", e);
            }
        }
        if (null != this.hConnection) {
            try {
                this.hConnection.close();
                this.hConnection = null;
            } catch (IOException e2) {
                LOG.warn("exception when close connection", e2);
            }
        }
        LOG.info("end close.");
    }

    @VisibleForTesting
    String getHTableName() {
        return this.hTableName;
    }
}
