// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.hadoop.formats.hbase;

import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.hbase.HBaseStoreManager;
import org.janusgraph.hadoop.config.JanusGraphHadoopConfiguration;
import org.janusgraph.hadoop.formats.util.AbstractBinaryInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;

import com.google.common.collect.BiMap;

/**
 * An input format to read from an HBase snapshot. This consumes a stable,
 * read-only view of an HBase table directly off HDFS, bypassing HBase server
 * calls. The configuration properties required by this input format are:
 * <p>
 * 1. The snapshot name. This points to a pre-created snapshot of the graph
 * table on HBase. {@link HBaseStoreManager#HBASE_SNAPSHOT}
 * <br/>
 * e.g. janusgraphmr.ioformat.conf.storage.hbase.snapshot-name=janusgraph-snapshot
 * <p>
 * 2. The snapshot restore directory. This is a temporary restore directory on
 * the same file system as the HBase root dir. The restore directory is used
 * to restore the table and region structure from the snapshot so the table
 * can be scanned, with no data copying involved.
 * {@link HBaseStoreManager#HBASE_SNAPSHOT_RESTORE_DIR}
 * <br/>
 * e.g. janusgraphmr.ioformat.conf.storage.hbase.snapshot-restore-dir=/tmp
 * <p>
 * It is also required that the Hadoop configuration directory, which contains
 * core-site.xml, is on the classpath for access to the Hadoop cluster. This
 * requirement is similar to the configuration requirement for <a href=
 * "http://tinkerpop.apache.org/docs/current/reference/#hadoop-gremlin">hadoop-gremlin</a>.
 * <p>
 * Additionally, the HBase configuration directory, which contains hbase-site.xml,
 * should be placed on the classpath as well. If it is not, the hbase.rootdir
 * property needs to be set as a pass-through property in the graph properties file.
 * <br/>
 * e.g. janusgraphmr.ioformat.conf.storage.hbase.ext.hbase.rootdir=/hbase
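 * <p>
 * A minimal read configuration might look like the following sketch. Property
 * names follow the usual hadoop-gremlin conventions; the hostname, snapshot
 * name, and restore directory are illustrative values, not defaults:
 * <pre>
 * gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
 * gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.hbase.HBaseSnapshotBinaryInputFormat
 * gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
 * janusgraphmr.ioformat.conf.storage.backend=hbase
 * janusgraphmr.ioformat.conf.storage.hostname=localhost
 * janusgraphmr.ioformat.conf.storage.hbase.snapshot-name=janusgraph-snapshot
 * janusgraphmr.ioformat.conf.storage.hbase.snapshot-restore-dir=/tmp
 * </pre>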
 */
public class HBaseSnapshotBinaryInputFormat extends AbstractBinaryInputFormat {

    private static final Logger log = LoggerFactory.getLogger(HBaseSnapshotBinaryInputFormat.class);

    // Key for specifying the snapshot name. To be replaced by the constant in the hbase
    // package if it becomes public.
    private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
    // Key for specifying the root dir of the restored snapshot. To be replaced by the constant
    // in the hbase package if it becomes public.
    private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

    private final TableSnapshotInputFormat tableSnapshotInputFormat = new TableSnapshotInputFormat();
    private RecordReader<ImmutableBytesWritable, Result> tableReader;
    private byte[] edgeStoreFamily;
    private RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;

    @Override
    public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException {
        return this.tableSnapshotInputFormat.getSplits(jobContext);
    }

    @Override
    public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
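        // Wrap HBase's snapshot record reader so that each row (an HBase Result) is
        // exposed to JanusGraph as a StaticBuffer row key with its Iterable<Entry> columns.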
        tableReader = tableSnapshotInputFormat.createRecordReader(inputSplit, taskAttemptContext);
        janusgraphRecordReader = new HBaseBinaryRecordReader(tableReader, edgeStoreFamily);
        return janusgraphRecordReader;
    }

    @Override
    public void setConf(final Configuration config) {
        HBaseConfiguration.addHbaseResources(config);
        super.setConf(config);

        // Pass the extra pass-through properties directly to HBase/Hadoop config.
        final Map<String, Object> configSub = janusgraphConf.getSubset(HBaseStoreManager.HBASE_CONFIGURATION_NAMESPACE);
        for (Map.Entry<String, Object> entry : configSub.entrySet()) {
            log.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue());
            if (entry.getValue() == null) continue;
            config.set(entry.getKey(), entry.getValue().toString());
        }

        config.set("autotype", "none");
        final Scan scanner = new Scan();
        String cfName = mrConf.get(JanusGraphHadoopConfiguration.COLUMN_FAMILY_NAME);
        // TODO the space-saving short name mapping leaks from HBaseStoreManager here
        if (janusgraphConf.get(HBaseStoreManager.SHORT_CF_NAMES)) {
            try {
                final BiMap<String, String> shortCfMap = HBaseStoreManager.createShortCfMap(janusgraphConf);
                cfName = HBaseStoreManager.shortenCfName(shortCfMap, cfName);
            } catch (PermanentBackendException e) {
                throw new RuntimeException(e);
            }
        }
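        // Restrict the scan to the edge store column family (shortened above when
        // short CF names are in effect).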
        edgeStoreFamily = Bytes.toBytes(cfName);
        scanner.addFamily(edgeStoreFamily);

        // This is a workaround, to be removed when convertScanToString becomes public in the hbase package.
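        // convertScanToString serializes the Scan into a Base64-encoded string so it can be
        // carried through the job configuration under TableInputFormat.SCAN.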
        Method converter;
        try {
            converter = TableMapReduceUtil.class.getDeclaredMethod("convertScanToString", Scan.class);
            converter.setAccessible(true);
            config.set(TableInputFormat.SCAN, (String) converter.invoke(null, scanner));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        final String snapshotName = janusgraphConf.get(HBaseStoreManager.HBASE_SNAPSHOT);
        final String restoreDirString = janusgraphConf.get(HBaseStoreManager.HBASE_SNAPSHOT_RESTORE_DIR);

        final Path restoreDir = new Path(restoreDirString);
        try {
            // This is a workaround. TableSnapshotInputFormat.setInput accepts a Job as a parameter,
            // and Job.getInstance(config) creates a clone of the config rather than setting values
            // on the config passed in, so the snapshot keys are copied back onto it here.
            Job job = Job.getInstance(config);
            TableSnapshotInputFormat.setInput(job, snapshotName, restoreDir);
            config.set(SNAPSHOT_NAME_KEY, job.getConfiguration().get(SNAPSHOT_NAME_KEY));
            config.set(RESTORE_DIR_KEY, job.getConfiguration().get(RESTORE_DIR_KEY));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Configuration getConf() {
        return super.getConf();
    }
}