There are two implementations for capturing Hive metadata changes:
1) Hook-based, implemented by the HiveHook class
2) MetaStoreEventListener-based, implemented by the HiveMetastoreHookImpl class
Accordingly there are two configuration options: register a hook function, or register a listener. We currently use the second approach. The Atlas hook class diagram is shown below.
Steps to develop a Hive hook:
1) Create a class that implements ExecuteWithHookContext
2) Package it and copy the jar to HIVE_HOME/lib
3) Configure it, either temporarily in the Hive client or permanently in hive-site.xml
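For step 3, registration uses standard Hive properties. A sketch of the hive-site.xml fragment, assuming the stock Atlas hook class names (hook mode uses hive.exec.post.hooks; listener mode uses hive.metastore.event.listeners):

```xml
<!-- hook mode: run HiveHook after each query (client-side hive-site.xml) -->
<property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.atlas.hive.hook.HiveHook</value>
</property>

<!-- listener mode: register HiveMetastoreHookImpl in the metastore -->
<property>
    <name>hive.metastore.event.listeners</name>
    <value>org.apache.atlas.hive.hook.HiveMetastoreHookImpl</value>
</property>
```

For a temporary, session-only setup, the hook can instead be set in the Hive client: `set hive.exec.post.hooks=org.apache.atlas.hive.hook.HiveHook;`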
HiveHook implements the run method of ExecuteWithHookContext: based on the Hive operation type it creates an event, then sends the event message via AtlasHook#notifyEntities.
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
    @Override
    public void run(HookContext hookContext) throws Exception {
        // ...
        try {
            // resolve the HiveOperation from the hookContext
            HiveOperation       oper    = OPERATION_MAP.get(hookContext.getOperationName());
            AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects());
            BaseHiveEvent       event   = null;

            // create the event matching the HiveOperation
            switch (oper) {
                case CREATEDATABASE:
                    event = new CreateDatabase(context);
                    break;
                case DROPDATABASE:
                    event = new DropDatabase(context);
                    break;
                // ...
                case ALTERTABLE_RENAME:
                case ALTERVIEW_RENAME:
                    event = new AlterTableRename(context);
                    break;
                case ALTERTABLE_RENAMECOL:
                    event = new AlterTableRenameCol(context);
                    break;
                default:
                    // ...
                    break;
            }

            if (event != null) {
                // resolve user/group information
                final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();

                // delegate to AtlasHook#notifyEntities
                super.notifyEntities(event.getNotificationMessages(), ugi);
            }
        } catch (Throwable t) {
            LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
        }
        // ...
    }
}
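The dispatch pattern in run() — look up the operation by name, then map it to an event object via a switch — can be sketched independently of Hive. All class and method names below are simplified stand-ins for illustration, not the actual Atlas ones:

```java
import java.util.HashMap;
import java.util.Map;

public class HookDispatchSketch {
    // simplified stand-in for Hive's operation enum
    enum Operation { CREATEDATABASE, DROPDATABASE, ALTERTABLE_RENAME, UNKNOWN }

    // mirrors OPERATION_MAP: operation name -> enum constant
    private static final Map<String, Operation> OPERATION_MAP = new HashMap<>();
    static {
        for (Operation op : Operation.values()) {
            OPERATION_MAP.put(op.name(), op);
        }
    }

    // stand-in for event creation: each supported operation yields an event,
    // unsupported operations yield null and are ignored
    static String toEvent(String operationName) {
        Operation oper = OPERATION_MAP.getOrDefault(operationName, Operation.UNKNOWN);
        switch (oper) {
            case CREATEDATABASE:    return "CreateDatabase";
            case DROPDATABASE:      return "DropDatabase";
            case ALTERTABLE_RENAME: return "AlterTableRename";
            default:                return null; // ignored operation
        }
    }

    public static void main(String[] args) {
        System.out.println(toEvent("CREATEDATABASE")); // CreateDatabase
        System.out.println(toEvent("SOMETHING_ELSE")); // null
    }
}
```

The null check on the event (mirroring `if (event != null)`) is what lets the hook silently skip operations Atlas does not track.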
Hive event class diagram
The constants in BaseHiveEvent fall into the following categories:
1) Hive types, prefixed HIVE_TYPE_
2) HBase types, prefixed HBASE_TYPE_
3) Hive attributes, prefixed ATTRIBUTE_
4) HBase attributes, prefixed HBASE_
5) Hive relationship attributes, prefixed RELATIONSHIP_
6) Others
notifyEntities supports two delivery strategies, synchronous and asynchronous, and delegates to notifyEntitiesInternal:
public static void notifyEntities(List<String> messages, UserGroupInformation ugi, int maxRetries) {
    if (executor == null) { // send synchronously
        notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
    } else {
        // asynchronous notification
        executor.submit(new Runnable() {
            @Override
            public void run() {
                notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
            }
        });
    }
}
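The sync/async dispatch above hinges on one thing: whether an ExecutorService has been configured. A minimal self-contained sketch of that pattern (names simplified; the real method also carries UGI and retry arguments):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NotifySketch {
    // when null, messages are sent on the caller's thread (synchronous)
    static ExecutorService executor = null;

    // records delivered messages, standing in for the Kafka transport
    static final List<String> sent = new CopyOnWriteArrayList<>();

    static void send(List<String> messages) { sent.addAll(messages); }

    public static void notifyEntities(List<String> messages) {
        if (executor == null) {
            send(messages);                        // synchronous: blocks the Hive query
        } else {
            executor.submit(() -> send(messages)); // asynchronous: query is not delayed
        }
    }

    public static void main(String[] args) throws Exception {
        notifyEntities(List.of("m1"));             // sync path
        executor = Executors.newSingleThreadExecutor();
        notifyEntities(List.of("m2"));             // async path
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(sent);
    }
}
```

The trade-off: the synchronous path guarantees the message is handed off before the query returns, while the asynchronous path keeps Kafka latency out of the Hive query's critical path.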
notifyEntitiesInternal implements the retry policy and calls notificationInterface.send to deliver the messages:
@VisibleForTesting
static void notifyEntitiesInternal(List<String> messages, int maxRetries, UserGroupInformation ugi,
                                   NotificationInterface notificationInterface,
                                   boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
    // ...
    final int maxAttempts         = maxRetries < 1 ? 1 : maxRetries;
    Exception notificationFailure = null;

    // try up to maxAttempts times
    for (int numAttempt = 1; numAttempt <= maxAttempts; numAttempt++) {
        if (numAttempt > 1) { // retry attempt: sleep before trying again
            try {
                LOG.debug("Sleeping for {} ms before retry", notificationRetryInterval);

                Thread.sleep(notificationRetryInterval);
            } catch (InterruptedException ie) {
                LOG.error("Notification hook thread sleep interrupted");
                break;
            }
        }

        try {
            // send the notification (consumed on the other side by the Atlas server)
            if (ugi == null) {
                notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
            } else {
                PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
                    @Override
                    public Object run() throws Exception {
                        notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
                        return messages;
                    }
                };

                ugi.doAs(privilegedNotify);
            }
            // ...
        } catch (Exception e) {
            // ...
        }
    }
    // ...
}
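The retry policy — bounded attempts with a fixed sleep between them — can be sketched on its own. This is a simplified stand-in, not the Atlas code; names like sendWithRetry are hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {
    // stand-in for notificationRetryInterval (kept short for the demo)
    static final long RETRY_INTERVAL_MS = 10;

    // attempt `send` up to maxRetries times, sleeping between attempts;
    // returns true as soon as one attempt succeeds
    static boolean sendWithRetry(Runnable send, int maxRetries) throws InterruptedException {
        final int maxAttempts = maxRetries < 1 ? 1 : maxRetries; // at least one attempt
        for (int numAttempt = 1; numAttempt <= maxAttempts; numAttempt++) {
            if (numAttempt > 1) {
                Thread.sleep(RETRY_INTERVAL_MS); // back off before retrying
            }
            try {
                send.run();
                return true;
            } catch (RuntimeException e) {
                // swallow and retry; the real code records and logs the failure
            }
        }
        return false; // all attempts exhausted
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // fails on the first two attempts, succeeds on the third
        boolean ok = sendWithRetry(() -> {
            if (calls.incrementAndGet() < 3) throw new RuntimeException("broker unavailable");
        }, 3);
        System.out.println(ok + " after " + calls.get() + " attempts");
    }
}
```

Note the `maxRetries < 1 ? 1 : maxRetries` guard mirrored from the source: even a misconfigured retry count still yields one send attempt.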
The notification messages are ultimately delivered to Kafka by KafkaNotification. notificationInterface is initialized in AtlasHook's static initializer via NotificationProvider, and its implementation class is KafkaNotification; the Atlas server consumes the messages from Kafka and persists them to its store.
public abstract class AtlasHook {
    // ...
    protected static Configuration         atlasProperties;
    protected static NotificationInterface notificationInterface;
    // ...
    private static ExecutorService executor = null;

    static {
        try {
            atlasProperties = ApplicationProperties.get();
        } catch (Exception e) {
            LOG.info("Failed to load application properties", e);
        }
        // ...
        metadataNamespace         = getMetadataNamespace(atlasProperties);
        placeCode                 = getPlaceCode(atlasProperties);
        notificationMaxRetries    = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
        notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);

        // obtain the NotificationInterface implementation
        notificationInterface = NotificationProvider.get();
        // ...
    }
    // ...
}
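The static-initializer-plus-provider combination gives every hook instance in the JVM a single shared transport. A minimal sketch of that pattern, with simplified stand-in types (Notification here plays the role of NotificationInterface):

```java
public class ProviderSketch {
    // stand-in for NotificationInterface
    interface Notification { void send(String msg); }

    // mirrors NotificationProvider: a lazily created process-wide singleton
    static class NotificationProvider {
        private static Notification instance;

        public static synchronized Notification get() {
            if (instance == null) {
                // the real provider constructs a KafkaNotification here
                instance = msg -> System.out.println("sent: " + msg);
            }
            return instance;
        }
    }

    // mirrors AtlasHook's static initializer: resolved once when the class loads
    static final Notification notificationInterface = NotificationProvider.get();

    public static void main(String[] args) {
        // every hook in the process shares the same transport instance
        System.out.println(NotificationProvider.get() == notificationInterface); // true
    }
}
```

Because the field is resolved in static initialization, the Kafka producer is created once per JVM, not once per Hive query.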
HiveMetastoreHookImpl extends MetaStoreEventListener, overrides methods such as onCreateDatabase and onDropDatabase, and delegates each to HiveMetastoreHook#handleEvent:
public class HiveMetastoreHookImpl extends MetaStoreEventListener {
    private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreHookImpl.class);

    private final HiveHook          hiveHook;
    private final HiveMetastoreHook hook;

    public HiveMetastoreHookImpl(Configuration config) {
        super(config);

        this.hiveHook = new HiveHook();
        this.hook     = new HiveMetastoreHook();
    }

    // ...
    @Override
    public void onCreateDatabase(CreateDatabaseEvent dbEvent) {
        HiveOperationContext context = new HiveOperationContext(CREATEDATABASE, dbEvent);

        // delegate to HiveMetastoreHook
        hook.handleEvent(context);
    }

    @Override
    public void onDropDatabase(DropDatabaseEvent dbEvent) {
        HiveOperationContext context = new HiveOperationContext(DROPDATABASE, dbEvent);

        hook.handleEvent(context);
    }
    // ...
}
handleEvent forwards the change message through AtlasHook#notifyEntities; the message is sent to Kafka, where the Atlas server consumes it and persists it to its store.
public void handleEvent(HiveOperationContext operContext) {
    ListenerEvent listenerEvent = operContext.getEvent();

    // ...
    try {
        HiveOperation        oper    = operContext.getOperation();
        AtlasHiveHookContext context = new AtlasHiveHookContext(hiveHook, oper, hiveHook.getKnownObjects(), this, listenerEvent);
        BaseHiveEvent        event   = null;

        switch (oper) {
            case CREATEDATABASE:
                event = new CreateDatabase(context);
                break;
            // ...
            case ALTERTABLE_RENAMECOL:
                FieldSchema columnOld = operContext.getColumnOld();
                FieldSchema columnNew = operContext.getColumnNew();

                event = new AlterTableRenameCol(columnOld, columnNew, context);
                break;
            default:
                if (LOG.isDebugEnabled()) {
                    LOG.debug("HiveMetastoreHook.handleEvent({}): operation ignored.", listenerEvent);
                }
                break;
        }

        if (event != null) {
            final UserGroupInformation ugi = SecurityUtils.getUGI() == null ? Utils.getUGI() : SecurityUtils.getUGI();

            // notify via AtlasHook#notifyEntities
            super.notifyEntities(event.getNotificationMessages(), ugi);
        }
    } catch (Throwable t) {
        LOG.error("HiveMetastoreHook.handleEvent({}): failed to process operation {}", listenerEvent, t);
    }
}
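The listener side thus follows a single shape: each callback wraps its event in an operation context and hands it to one shared handler. A simplified stand-in sketch of that delegation pattern (all names hypothetical, return values added only to make the flow observable):

```java
public class ListenerSketch {
    // stand-ins for Hive's listener events and Atlas's operation context
    static class ListenerEvent {
        final String name;
        ListenerEvent(String n) { name = n; }
    }

    static class HiveOperationContext {
        final String operation;
        final ListenerEvent event;
        HiveOperationContext(String op, ListenerEvent ev) { operation = op; event = ev; }
    }

    // the shared handler every callback delegates to (cf. HiveMetastoreHook#handleEvent)
    static String handleEvent(HiveOperationContext ctx) {
        return ctx.operation + ":" + ctx.event.name;
    }

    // each listener callback only wraps its event in a context and delegates
    static String onCreateDatabase(ListenerEvent dbEvent) {
        return handleEvent(new HiveOperationContext("CREATEDATABASE", dbEvent));
    }

    static String onDropDatabase(ListenerEvent dbEvent) {
        return handleEvent(new HiveOperationContext("DROPDATABASE", dbEvent));
    }

    public static void main(String[] args) {
        System.out.println(onCreateDatabase(new ListenerEvent("db1"))); // CREATEDATABASE:db1
        System.out.println(onDropDatabase(new ListenerEvent("db1")));   // DROPDATABASE:db1
    }
}
```

Keeping the callbacks thin and centralizing the event construction, UGI resolution, and notification in handleEvent is what lets both hook implementations reuse the same BaseHiveEvent classes and AtlasHook#notifyEntities path.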