Подключение к Apache Kafka в HDInsight с помощью виртуальной сети Azure
Узнайте, как подключиться напрямую к Apache Kafka в HDInsight с помощью виртуальной сети Azure. В этой статье приведены сведения о подключении к Kafka с использованием приведенных ниже конфигураций.
- Из ресурсов в локальной сети. Это подключение устанавливается с помощью VPN-устройства (программного или аппаратного) в локальной сети.
- Из среды разработки с помощью программного VPN-клиента.
Примечание.
Мы рекомендуем использовать модуль Azure Az PowerShell для взаимодействия с Azure. Чтобы начать работу, см. статью Установка Azure PowerShell. Дополнительные сведения см. в статье Перенос Azure PowerShell с AzureRM на Az.
Архитектура и планирование
HDInsight не разрешает прямое подключение к Kafka через общедоступный Интернет. Вместо этого клиенты Kafka (производители и потребители) должны использовать один из следующих методов подключения.
Запустите клиент в той же виртуальной сети, что и Kafka в HDInsight. Сведения об использовании этой конфигурации см. в статье Приступая к работе с Apache Kafka в HDInsight. Клиент работает непосредственно на узлах кластера HDInsight или на другой виртуальной машине в той же сети.
Подключите частную сеть, например локальную, к виртуальной сети. Эта конфигурация разрешает клиентам в локальной сети напрямую работать с Kafka. Чтобы включить эту конфигурацию, сделайте следующее:
Создайте виртуальную сеть.
Создайте VPN-шлюз, использующий конфигурацию типа "сеть — сеть". Конфигурация, используемая в этой статье, подключается к устройству шлюза VPN в локальной сети.
Создайте DNS-сервер в виртуальной сети.
Настройте переадресацию между DNS-серверами в каждой сети.
Создайте Kafka в кластере HDInsight в виртуальной сети.
Дополнительные сведения см. в разделе Подключение к Apache Kafka из локальной сети.
Подключите отдельные виртуальные машины к виртуальной сети с помощью VPN-шлюза и VPN-клиента. Чтобы включить эту конфигурацию, сделайте следующее:
Создайте виртуальную сеть.
Создайте VPN-шлюз, использующий конфигурацию типа "точка — сеть". Эту конфигурацию можно использовать как с клиентами Windows, так и с macOS.
Создайте Kafka в кластере HDInsight в виртуальной сети.
Настройте Kafka для объявления IP-адресов. Такая конфигурация позволяет клиенту подключаться с помощью IP-адресов брокера вместо использования доменных имен.
Скачайте и используйте VPN-клиент в системе разработки.
Дополнительные сведения см. в разделе Подключение к Apache Kafka с помощью VPN-клиента.
Предупреждение
Эта конфигурация рекомендуется только для целей разработки из-за следующих ограничений:
- Каждый клиент должен подключиться с помощью программного VPN-клиента.
- Клиент VPN не передает запросы на разрешения имен в виртуальную сеть, поэтому для обмена данными с Kafka необходимо использовать IP-адрес. Для взаимодействия с IP-адресом требуется дополнительная настройка кластера Kafka.
Дополнительные сведения об использовании HDInsight в виртуальной сети см. в статье Планирование виртуальной сети для кластеров Azure HDInsight.
Подключение к Apache Kafka из локальной сети
Чтобы создать кластер Kafka, который взаимодействует с локальной сетью, выполните действия, описанные в статье Подключение HDInsight к локальной сети.
Внимание
При создании кластера HDInsight выберите тип кластера Kafka.
Выполнив эти действия, вы создадите следующую конфигурацию:
- Виртуальная сеть Azure
- VPN-шлюз типа "сеть — сеть";
- учетная запись хранения Azure (используемая HDInsight);
- Kafka в HDInsight
Чтобы убедиться, что клиент Kafka может подключиться к кластеру из локальной сети, ознакомьтесь с разделом Пример: клиент Python.
Подключение к Apache Kafka с помощью VPN-клиента
В этом разделе описаны действия по созданию следующей конфигурации:
- Виртуальная сеть Azure
- VPN-шлюз типа "точка — сеть"
- Учетная запись хранения Azure (используется HDInsight)
- Kafka в HDInsight
Дополнительные сведения см. в статье Создание и экспорт сертификатов для подключений типа "точка — сеть" с помощью PowerShell в Windows 10. Там приведены действия по созданию сертификатов, необходимых для шлюза.
Откройте командную строку PowerShell и используйте следующий код, чтобы войти в подписку Azure:
Connect-AzAccount # If you have multiple subscriptions, uncomment to set the subscription #Select-AzSubscription -SubscriptionName "name of your subscription"
Используйте следующий код, чтобы создать переменные, которые содержат сведения о конфигурации:
# Prompt for generic information $resourceGroupName = Read-Host "What is the resource group name?" $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':" $location = Read-Host "What Azure Region do you want to create the resources in?" $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway." # Prompt for HDInsight credentials $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin" $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser" # Names for Azure resources $networkName = "net-$baseName" $clusterName = "kafka-$baseName" $storageName = "store$baseName" # Can't use dashes in storage names $defaultContainerName = $clusterName $defaultSubnetName = "default" $gatewaySubnetName = "GatewaySubnet" $gatewayPublicIpName = "GatewayIp" $gatewayIpConfigName = "GatewayConfig" $vpnRootCertName = "rootcert" $vpnName = "VPNGateway" # Network settings $networkAddressPrefix = "10.0.0.0/16" $defaultSubnetPrefix = "10.0.0.0/24" $gatewaySubnetPrefix = "10.0.1.0/24" $vpnClientAddressPool = "172.16.201.0/24" # HDInsight settings $hdiWorkerNodes = 4 $hdiVersion = "3.6" $hdiType = "Kafka"
Используйте следующий код, чтобы создать группу ресурсов и виртуальную сеть Azure:
# Create the resource group that contains everything New-AzResourceGroup -Name $resourceGroupName -Location $location # Create the subnet configuration $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName ` -AddressPrefix $defaultSubnetPrefix $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName ` -AddressPrefix $gatewaySubnetPrefix # Create the subnet New-AzVirtualNetwork -Name $networkName ` -ResourceGroupName $resourceGroupName ` -Location $location ` -AddressPrefix $networkAddressPrefix ` -Subnet $defaultSubnetConfig, $gatewaySubnetConfig # Get the network & subnet that were created $network = Get-AzVirtualNetwork -Name $networkName ` -ResourceGroupName $resourceGroupName $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName ` -VirtualNetwork $network $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName ` -VirtualNetwork $network # Set a dynamic public IP address for the gateway subnet $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName ` -ResourceGroupName $resourceGroupName ` -Location $location ` -AllocationMethod Dynamic $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName ` -Subnet $gatewaySubnet ` -PublicIpAddress $gatewayPublicIp # Get the certificate info # Get the full path in case a relative path was passed $rootCertFile = Get-ChildItem $rootCert $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile) $certBase64 = [System.Convert]::ToBase64String($cert.RawData) $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName ` -PublicCertData $certBase64 # Create the VPN gateway New-AzVirtualNetworkGateway -Name $vpnName ` -ResourceGroupName $resourceGroupName ` -Location $location ` -IpConfigurations $gatewayIpConfig ` -GatewayType Vpn ` -VpnType RouteBased ` -EnableBgp $false ` -GatewaySku Standard ` -VpnClientAddressPool $vpnClientAddressPool ` -VpnClientRootCertificates $p2sRootCert
Предупреждение
Этот процесс может занять несколько минут.
Используйте следующий код, чтобы создать учетную запись хранения Azure и контейнер BLOB-объектов:
# Create the storage account New-AzStorageAccount ` -ResourceGroupName $resourceGroupName ` -Name $storageName ` -SkuName Standard_GRS ` -Location $location ` -Kind StorageV2 ` -EnableHttpsTrafficOnly 1 # Get the storage account keys and create a context $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName ` -Name $storageName)[0].Value $storageContext = New-AzStorageContext -StorageAccountName $storageName ` -StorageAccountKey $defaultStorageKey # Create the default storage container New-AzStorageContainer -Name $defaultContainerName ` -Context $storageContext
Используйте следующий код, чтобы создать кластер HDInsight:
# Create the HDInsight cluster New-AzHDInsightCluster ` -ResourceGroupName $resourceGroupName ` -ClusterName $clusterName ` -Location $location ` -ClusterSizeInNodes $hdiWorkerNodes ` -ClusterType $hdiType ` -OSType Linux ` -Version $hdiVersion ` -HttpCredential $adminCreds ` -SshCredential $sshCreds ` -DefaultStorageAccountName "$storageName.blob.core.windows.net" ` -DefaultStorageAccountKey $defaultStorageKey ` -DefaultStorageContainer $defaultContainerName ` -DisksPerWorkerNode 2 ` -VirtualNetworkId $network.Id ` -SubnetName $defaultSubnet.Id
Предупреждение
Эта процедура занимает около 15 минут.
Настройка Kafka для объявления IP-адресов
По умолчанию Apache Zookeeper возвращает клиентам доменное имя брокеров Kafka. Эта конфигурация не работает для программного VPN-клиента, так как он не может использовать разрешение имен для сущностей в виртуальной сети. Для этой конфигурации выполните следующие действия, чтобы настроить Kafka для объявления IP-адресов вместо доменных имен:
Откройте веб-браузер и перейдите по адресу
https://CLUSTERNAME.azurehdinsight.net
. ЗаменитеCLUSTERNAME
именем Kafka в кластере HDInsight.При появлении запроса введите имя пользователя и пароль HTTPS для кластера. Отобразится веб-интерфейс Ambari для кластера.
Чтобы просмотреть сведения о Kafka, из списка слева выберите Kafka.
Чтобы просмотреть конфигурацию Kafka, выберите пункт Configs (Конфигурации) в верхней части окна.
Чтобы найти конфигурацию kafka-env, введите
kafka-env
в поле фильтра в правом верхнем углу.Чтобы настроить Kafka для объявления IP-адресов, добавьте следующий текст в нижнюю часть поля kafka-env template (шаблон kafka-env):
# Configure Kafka to advertise IP addresses instead of FQDN IP_ADDRESS=$(hostname -i) echo advertised.listeners=$IP_ADDRESS sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
Чтобы настроить интерфейс, через который Kafka ожидает передачи данных, введите
listeners
в поле фильтра в правом верхнем углу.Чтобы настроить Kafka для ожидания передачи данных через все сетевые интерфейсы, измените значение в поле listeners на
PLAINTEXT://0.0.0.0:9092
.Нажмите кнопку Save (Сохранить), чтобы сохранить изменения в конфигурации. Введите текст, описывающий изменения. После сохранения изменений нажмите кнопку ОК.
Для предотвращения ошибок при перезапуске Kafka нажмите кнопку Service Actions (Действия со службой) и выберите Turn On Maintenance Mode (Включить режим обслуживания). Чтобы завершить эту операцию, нажмите кнопку ОК.
Чтобы перезапустить Kafka, нажмите кнопку Restart (Перезапустить) и выберите Restart All Affected (Перезапустить все затронутые). Подтвердите перезапуск, а после завершения операции нажмите кнопку ОК.
Чтобы отключить режим обслуживания нажмите кнопку Service Actions (Действия со службой) и выберите Turn Off Maintenance Mode (Отключить режим обслуживания). Чтобы завершить эту операцию, нажмите кнопку ОК.
Подключение к VPN-шлюзу
Для подключения к VPN-шлюзу используйте раздел Подключение к Azure статьи Настройка подключения типа "точка — сеть".
Пример: клиент Python
Чтобы проверить подключение к Kafka, выполните следующие действия для создания и запуска производителя и потребителя Python:
Чтобы получить полное доменное имя (FQDN) и IP-адреса узлов кластера Kafka, используйте один из следующих методов.
$resourceGroupName = "The resource group that contains the virtual network used with HDInsight" $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | where-object {$_.Name -like "*node*"} $nodes = @() foreach($nic in $clusterNICs) { $node = new-object System.Object $node | add-member -MemberType NoteProperty -name "Type" -value $nic.Name.Split('-')[1] $node | add-member -MemberType NoteProperty -name "InternalIP" -value $nic.IpConfigurations.PrivateIpAddress $node | add-member -MemberType NoteProperty -name "InternalFQDN" -value $nic.DnsSettings.InternalFqdn $nodes += $node } $nodes | sort-object Type
az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"
В этом сценарии предполагается, что
$resourceGroupName
— это имя группы ресурсов Azure, содержащей виртуальную сеть.Сохраните полученные данные для использования на последующих шагах.
Чтобы установить клиент kafka-python, используйте следующую команду:
pip install kafka-python
Чтобы отправить данные в Kafka, используйте следующий код Python:
from kafka import KafkaProducer # Replace the `ip_address` entries with the IP address of your worker nodes # NOTE: you don't need the full list of worker nodes, just one or two. producer = KafkaProducer(bootstrap_servers=['kafka_broker_1','kafka_broker_2']) for _ in range(50): producer.send('testtopic', b'test message')
Замените записи
'kafka_broker'
адресами, полученными на шаге 1 этого раздела.При использовании программного VPN-клиента замените записи
kafka_broker
IP-адресом рабочих узлов.Если вы включили разрешение имен через пользовательский DNS-сервер, замените записи
kafka_broker
полным доменным именем рабочих узлов.Примечание.
Этот код отправляет строку
test message
в разделtesttopic
. По умолчанию Kafka в HDInsight не создает раздел, если он не существует. См. раздел Настройка автоматического создания разделов в Apache Kafka в HDInsight. Кроме того, можно создавать разделы вручную перед созданием сообщений.
Для получения сообщений от Kafka используйте следующий код Python:
from kafka import KafkaConsumer # Replace the `ip_address` entries with the IP address of your worker nodes # Again, you only need one or two, not the full list. # Note: auto_offset_reset='earliest' resets the starting offset to the beginning # of the topic consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'],auto_offset_reset='earliest') consumer.subscribe(['testtopic']) for msg in consumer: print (msg)
Замените записи
'kafka_broker'
адресами, полученными на шаге 1 этого раздела.При использовании программного VPN-клиента замените записи
kafka_broker
IP-адресом рабочих узлов.Если вы включили разрешение имен через пользовательский DNS-сервер, замените записи
kafka_broker
полным доменным именем рабочих узлов.
Следующие шаги
Дополнительные сведения об использовании HDInsight в виртуальной сети см. в статье Планирование развертывания виртуальной сети для кластеров Azure HDInsight.
Дополнительные сведения о создании виртуальной сети Azure с VPN-шлюзом типа "точка — сеть" см. в следующих документах:
Настройка подключения типа "точка — сеть" к виртуальной сети с помощью портала Azure
Настройка подключения типа "точка — сеть" к виртуальной сети с помощью PowerShell
Дополнительные сведения о работе с Apache Kafka HDInsight см. в следующих документах:
- Get started with Apache Kafka on HDInsight (preview) (Приступая к работе с Apache Kafka в HDInsight (предварительная версия))
- Использование репликации разделов Apache Kafka с помощью Kafka в HDInsight и MirrorMaker