代码如下:
private async Task CustomNamedPipeServer(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
var pipeServer = CreatePipeServer();
await pipeServer.WaitForConnectionAsync(token)
.ConfigureAwait(false);
// 交给后台处理
_ = HandleClientLongConnectionAsync(pipeServer, token);
}
catch (OperationCanceledException) { break; }
catch (Exception ex)
{
logger.Error($"[CustomPipe Server Error] {ex.Message}");
await Task.Delay(1000, token);
}
}
}
private NamedPipeServerStream CreatePipeServer()
{
return new NamedPipeServerStream(
"YourPipeName",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous,
1024 * 32,
1024 * 32);
}
private async Task HandleClientLongConnectionAsync(
NamedPipeServerStream pipeServer,
CancellationToken token)
{
byte[] buffer = new byte[65536];
var ms = new MemoryStream();
try
{
using (var writer = new StreamWriter(pipeServer, new UTF8Encoding(false)) { AutoFlush = true })
{
while (!token.IsCancellationRequested &&
pipeServer.IsConnected)
{
int bytesReadLen = await pipeServer.ReadAsync(buffer, 0, buffer.Length, token);
if (bytesReadLen == 0)
{
logger.Warn("收到空,客户端可能断开");
break;
}
ms.Write(buffer, 0, bytesReadLen);
if (!pipeServer.IsMessageComplete && pipeServer.TransmissionMode == PipeTransmissionMode.Message)
{
continue;
}
string msg = Encoding.UTF8.GetString(ms.ToArray());
logger.Debug("收到数据:" + msg);
ms.SetLength(0); // 重置
// 你的逻辑
await writer.WriteLineAsync("ReceiveOK")
.ConfigureAwait(false);
}
}
}
}
catch (Exception ex)
{
}
finally
{
try { pipeServer.Dispose(); } catch { }
}
}
2. 调用
CancellationTokenSource _cts = new CancellationTokenSource();
Task _receTask = Task.Run(() => CustomNamedPipeServer(_cts.Token));
// 停止
_cts.Cancel();
