What is RPC?
RPC (Remote Procedure Call) is a protocol that allows a program to invoke a procedure on a remote host as if it were a local call.
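In Flume, the simplest way to see this in practice is the default Avro RPC client from the client SDK (flume-ng-sdk). Below is a minimal sketch, assuming a Flume agent with an Avro source is listening on example.org:41414 (both the hostname and the port are placeholders); the class name BasicRpcExample is likewise only illustrative:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
public class BasicRpcExample
{
  public static void main(String[] args) throws EventDeliveryException
  {
    // Placeholder host/port of a Flume agent running an Avro source
    RpcClient client = RpcClientFactory.getDefaultInstance("example.org", 41414);
    try
    {
      // Wrap the payload in a Flume Event and send it over Avro RPC
      Event event = EventBuilder.withBody("Hello Flume!", Charset.forName("UTF-8"));
      client.append(event);
    }
    finally
    {
      // Always release the underlying connection
      client.close();
    }
  }
}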
Secure RPC client - Thrift
As of version 1.6, the Thrift source and sink support Kerberos authentication. The client needs to use the getThriftInstance method of SecureRpcClientFactory to obtain a SecureThriftRpcClient. SecureThriftRpcClient extends ThriftRpcClient, which implements the RpcClient interface. Using SecureRpcClientFactory requires the flume-ng-auth module. The client principal and client keytab must be passed in as properties; they are used as credentials to authenticate the client against the Kerberos KDC. In addition, the server principal of the destination Thrift source must be provided. Here is an example of using SecureRpcClientFactory:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.api.SecureRpcClientFactory;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClient;
import java.nio.charset.Charset;
import java.util.Properties;
public class MyApp
{
  public static void main(String[] args)
  {
    MySecureRpcClientFacade client = new MySecureRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    Properties props = new Properties();
    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "server.example.org" + ":" + String.valueOf(41414));

    // Initialize client with the kerberos authentication related properties
    props.setProperty("kerberos", "true");
    props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
    props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
    props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
    client.init(props);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with a ThriftSource in kerberos mode.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++)
    {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MySecureRpcClientFacade
{
  private RpcClient client;
  private Properties properties;

  public void init(Properties properties)
  {
    // Set up the RPC connection
    this.properties = properties;
    // Create the SecureThriftRpcClient instance by using SecureRpcClientFactory
    this.client = SecureRpcClientFactory.getThriftInstance(properties);
  }

  public void sendDataToFlume(String data)
  {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try
    {
      client.append(event);
    }
    catch (EventDeliveryException e)
    {
      // Clean up and recreate the client
      client.close();
      client = null;
      client = SecureRpcClientFactory.getThriftInstance(properties);
    }
  }

  public void cleanUp()
  {
    // Close the RPC connection
    client.close();
  }
}
On the server side, the ThriftSource needs to be configured in kerberos mode:
a1.channels = c1
a1.sources = r1
a1.sinks = k1
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
a1.sources.r1.agent-keytab = /tmp/flume.keytab
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
Failover
The failover client wraps the default Avro RPC client to provide failover capability over a group of hosts specified as a space-separated list of <host>:<port> pairs. The failover RPC client does not currently support Thrift. If communication with the currently selected agent fails, the client automatically fails over to the next host in the list. For example:
// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");
// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);
// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);
For greater flexibility, the failover Flume client implementation (FailoverRpcClient) can also be configured with the following properties:
client.type = default_failover
hosts = h1 h2 h3 # at least one is required, but 2 or
# more makes better sense
hosts.h1 = host1.example.org:41414
hosts.h2 = host2.example.org:41414
hosts.h3 = host3.example.org:41414
max-attempts = 3 # Must be >=0 (default: number of hosts
# specified, 3 in this case). A '0'
# value doesn't make much sense because
# it will just cause an append call to
# immediately fail. A '1' value means
# that the failover client will try only
# once to send the Event, and if it
# fails then there will be no failover
# to a second client, so this value
# causes the failover client to
# degenerate into just a default client.
# It makes sense to set this value to at
# least the number of hosts that you
# specified.
batch-size = 100 # Must be >=1 (default: 100)
connect-timeout = 20000 # Must be >=1000 (default: 20000)
request-timeout = 20000 # Must be >=1000 (default: 20000)
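As a sketch of how such a properties file can be used, the failover client can be created by loading the file with java.util.Properties and handing it to RpcClientFactory.getInstance, exactly as in the inline example above. The file name client.properties is a placeholder, and note that java.util.Properties only treats lines starting with # as comments, so the inline comments shown in the listing above would have to be removed from a real file:
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.Properties;
public class FailoverFromPropertiesFile
{
  public static void main(String[] args) throws Exception
  {
    // Load the failover configuration (placeholder file name)
    Properties props = new Properties();
    try (FileInputStream in = new FileInputStream("client.properties"))
    {
      props.load(in);
    }
    // client.type = default_failover in the file selects the failover client
    RpcClient client = RpcClientFactory.getInstance(props);
    try
    {
      // If the current host fails, the client transparently retries h2, h3, ...
      client.append(EventBuilder.withBody("Hello Flume!", Charset.forName("UTF-8")));
    }
    finally
    {
      client.close();
    }
  }
}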
Load balancing
The Flume client SDK also supports load balancing across multiple hosts. The client is configured with a load-balancing group specified as a space-separated list of <host>:<port> pairs, together with a host selection strategy that either picks one of the configured hosts at random or rotates through them in round-robin order. You can also specify your own selection strategy by implementing the LoadBalancingRpcClient$HostSelector interface; in that case the FQCN of the custom class is supplied as the host selector. The load-balancing RPC client does not currently support Thrift.
If backoff is enabled, hosts that fail are temporarily blacklisted and excluded from selection. When the backoff timeout elapses, if the host is still unresponsive this is treated as a sequential failure, and the timeout is increased exponentially to avoid getting stuck in long waits on unresponsive hosts.
The maximum backoff time is configured with maxBackoff, in milliseconds. The default is 30 seconds (specified in the OrderSelector class). The backoff timeout grows exponentially with each consecutive failure, up to a hard limit of 65536 seconds (about 18.2 hours). For example:
// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");
// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);
props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
// // selection
props.put("backoff", "true"); // Disabled by default.
props.put("maxBackoff", "10000"); // Defaults 0, which effectively
// becomes 30000 ms
// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);
Alternatively, the load-balancing client can be configured directly with the following properties:
client.type = default_loadbalance
hosts = h1 h2 h3 # At least 2 hosts are required
hosts.h1 = host1.example.org:41414
hosts.h2 = host2.example.org:41414
hosts.h3 = host3.example.org:41414
backoff = false # Specifies whether the client should
# back-off from (i.e. temporarily
# blacklist) a failed host
# (default: false).
maxBackoff = 0 # Max timeout in millis that a failed
# host will remain inactive (blacklisted) due
# to a previous failure with that host (default: 0,
# which effectively becomes 30000)
host-selector = round_robin # The host selection strategy used
# when load-balancing among hosts
# (default: round_robin).
# Other values include "random"
# or the FQCN of a custom class
# that implements
# LoadBalancingRpcClient$HostSelector
batch-size = 100 # Must be >=1 (default: 100)
connect-timeout = 20000 # Must be >=1000 (default: 20000)
request-timeout = 20000 # Must be >=1000 (default: 20000)
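To make the backoff behaviour described above more concrete, here is a purely illustrative sketch of an exponentially growing, capped backoff schedule. The 2-second starting delay and the doubling factor are assumptions made for the illustration, not values taken from OrderSelector; only the 30-second default cap and the 65536-second hard limit come from the text above:
public class BackoffIllustration
{
  public static void main(String[] args)
  {
    long delayMs = 2000;                     // assumed starting delay (illustrative only)
    final long maxBackoffMs = 30000;         // the 30 s default cap described above
    final long hardLimitMs = 65536L * 1000L; // absolute limit: 65536 s, about 18.2 hours
    for (int failures = 1; failures <= 8; failures++)
    {
      System.out.printf("consecutive failure %d -> host blacklisted for %d ms%n", failures, delayMs);
      // Double the delay after each consecutive failure, never exceeding the caps
      delayMs = Math.min(Math.min(delayMs * 2, maxBackoffMs), hardLimitMs);
    }
  }
}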
Embedded agent
Flume has an embedded agent API that lets you embed an agent inside an application. The agent is meant to be lightweight, so not all sources, sinks, and channels are allowed: the application itself acts as the source and puts events into the agent through the put and putAll methods of the EmbeddedAgent object; only the File Channel and Memory Channel are supported as channels; and the Avro Sink is the only supported sink.
Configuring an embedded agent is similar to configuring a regular agent. The additional configuration options are listed below, followed by a usage example:
Property Name | Default Value | Description |
---|---|---|
source.type | embedded | The only available source is the embedded source |
channel.type | - | Either memory or file, corresponding to MemoryChannel and FileChannel respectively |
channel.* | - | Configuration parameters for the chosen channel type, e.g. channel.capacity |
sinks | - | Space-separated list of sink names |
sink.type | - | Must be avro |
sink.* | - | Configuration parameters for the sink |
processor.type | - | Either failover or load_balance, corresponding to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively |
processor.* | - | Configuration parameters for the selected sink processor |
source.interceptors | - | Space-separated list of interceptors |
source.interceptors.* | - | Configuration parameters for individual interceptors |
Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port", "5565");
properties.put("processor.type", "load_balance");
properties.put("source.interceptors", "i1");
properties.put("source.interceptors.i1.type", "static");
properties.put("source.interceptors.i1.key", "key1");
properties.put("source.interceptors.i1.value", "value1");
EmbeddedAgent agent = new EmbeddedAgent("myagent");
agent.configure(properties);
agent.start();
// Build a sample event to put into the agent
Event event = EventBuilder.withBody("Hello Flume!", Charset.forName("UTF-8"));
List<Event> events = Lists.newArrayList();
events.add(event);
events.add(event);
events.add(event);
events.add(event);
agent.putAll(events);
...
agent.stop();
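For completeness, here is a self-contained sketch of the same idea, showing the imports the snippet above relies on (EmbeddedAgent comes from the flume-ng-embedded-agent module) and the single-event put method mentioned earlier. The class name EmbeddedAgentExample is illustrative, and the collector hostnames and ports are the same placeholders used above:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.agent.embedded.EmbeddedAgent;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
public class EmbeddedAgentExample
{
  public static void main(String[] args) throws EventDeliveryException
  {
    Map<String, String> properties = new HashMap<String, String>();
    properties.put("channel.type", "memory");
    properties.put("channel.capacity", "200");
    properties.put("sinks", "sink1 sink2");
    properties.put("sink1.type", "avro");
    properties.put("sink2.type", "avro");
    properties.put("sink1.hostname", "collector1.apache.org");
    properties.put("sink1.port", "5564");
    properties.put("sink2.hostname", "collector2.apache.org");
    properties.put("sink2.port", "5565");
    properties.put("processor.type", "load_balance");

    EmbeddedAgent agent = new EmbeddedAgent("myagent");
    agent.configure(properties);
    agent.start();
    try
    {
      // put() delivers a single event; putAll() (shown above) delivers a batch
      Event event = EventBuilder.withBody("Hello Flume!", Charset.forName("UTF-8"));
      agent.put(event);
    }
    finally
    {
      // Always stop the agent so channel and sink resources are released
      agent.stop();
    }
  }
}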